diff --git a/bin/openfnx b/bin/openfnx new file mode 120000 index 000000000..66adbf53e --- /dev/null +++ b/bin/openfnx @@ -0,0 +1 @@ +../lib/node_modules/@openfn/clix/dist/index.js \ No newline at end of file diff --git a/claude.md b/claude.md new file mode 100644 index 000000000..b2f45c047 --- /dev/null +++ b/claude.md @@ -0,0 +1,139 @@ +# OpenFn Kit + +This monorepo contains the core packages that power OpenFn's workflow automation platform. OpenFn is a Digital Public Good trusted by NGOs and governments in 40+ countries to automate data integration workflows. + +## Architecture + +The repository has three main packages: **CLI**, **Runtime**, and **Worker**. The CLI and Worker are both frontends for executing workflows - the CLI for local development, the Worker for production execution via Lightning (the web platform). Both wrap the Runtime as their execution engine. The Worker uses engine-multi to wrap the Runtime for multi-process execution. + +## Core Packages + +- **[@openfn/cli](packages/cli)** - Command-line interface for local development. Run, test, compile, and deploy workflows. +- **[@openfn/runtime](packages/runtime)** - Core execution engine. Safely executes jobs in a sandboxed VM environment. +- **[@openfn/ws-worker](packages/ws-worker)** - WebSocket worker connecting Lightning to the Runtime. Stateless server that pulls runs from Lightning's queue. See [.claude/event-processor.md](.claude/event-processor.md) for event processing details. +- **[@openfn/engine-multi](packages/engine-multi)** - Multi-process runtime wrapper used by ws-worker for concurrent workflow execution. +- **[@openfn/compiler](packages/compiler)** - Transforms OpenFn job DSL into executable JavaScript modules. 
+ +## Supporting Packages + +- **@openfn/lexicon** - Shared TypeScript types +- **@openfn/logger** - Structured logging utilities +- **@openfn/describe-package** - TypeScript analysis for adaptor docs (to be phased out) +- **@openfn/deploy** - Deployment logic for Lightning (soon to be deprecated) +- **@openfn/project** - Models and understands local OpenFn projects +- **@openfn/lightning-mock** - Mock Lightning server for testing + +## AI Assistant + +- Keep responses terse and do not over-explain. Users will ask for more guidance if they need it. +- Always present users a short action plan and ask for confirmation before doing it +- Keep the human in the loop at all times. Stop regularly and check for guidance. + +## Key Concepts + +**Workflows** are sequences of **jobs** that process data through steps. Each **job** is an array of **operations** (functions that transform state). State flows between jobs based on conditional edges. + +**Adaptors** are npm packages (e.g., `@openfn/language-http`) providing operations for specific systems. The CLI auto-installs them as needed. + +The **Compiler** transforms job DSL code into standard ES modules with imports and operation arrays. 
+ +## Development Setup + +### Prerequisites + +- Node.js 18+ (use `asdf`) +- pnpm (enable with `corepack enable`) + +### Common Commands + +```bash +# Root +pnpm install # Install dependencies +pnpm build # Build all packages +pnpm test # Run all tests +pnpm changeset # Add a changeset for your PR + +# CLI +cd packages/cli +pnpm openfn test # Run from source +pnpm install:global # Install as 'openfnx' for testing + +# Worker +cd packages/ws-worker +pnpm start # Connect to localhost:4000 +pnpm start -l mock # Use mock Lightning +pnpm start --no-loop # Disable auto-fetch +curl -X POST http://localhost:2222/claim # Manual claim +``` + +### Environment Variables + +- `OPENFN_REPO_DIR` - CLI adaptor storage +- `OPENFN_ADAPTORS_REPO` - Local adaptors monorepo path +- `OPENFN_API_KEY` - API key for Lightning deployment +- `OPENFN_ENDPOINT` - Lightning URL (default: app.openfn.org) +- `WORKER_SECRET` - Worker authentication secret + +## Repository Structure + +``` +packages/ +├── cli/ # CLI entry: cli.ts, commands.ts, projects/, options.ts +├── runtime/ # Runtime entry: index.ts, runtime.ts, util/linker +├── ws-worker/ # Worker entry: start.ts, server.ts, api/, events/ +├── compiler/ # Job DSL compiler +├── engine-multi/ # Multi-process wrapper +├── lexicon/ # Shared TypeScript types +└── logger/ # Logging utilities +``` + +## Testing & Releases + +```bash +pnpm test # All tests +pnpm test:types # Type checking +pnpm test:integration # Integration tests +cd packages/cli && pnpm test:watch # Watch mode +``` + +## Testing Best Practice + +- Ensure tests are valuable before generating them. Focus on what's important. +- Treat tests as documentation: they should show how the function is expected to work +- Keep tests focuses: test one thing in each test +- This repo contains extensive testing: check for similar patterns in the same package before improvising + +## Additional Documentation + +**Changesets**: Run `pnpm changeset` when submitting PRs. 
Releases publish automatically to npm on merge to main. + +The [.claude](.claude) folder contains detailed guides: + +- **[command-refactor.md](.claude/command-refactor.md)** - Refactoring CLI commands into project subcommand structure +- **[event-processor.md](.claude/event-processor.md)** - Worker event processing architecture (batching, ordering) + +## Code Standards + +- **Formatting**: Use Prettier (`pnpm format`) +- **TypeScript**: Required for all new code +- **TypeSync**: Run `pnpm typesync` after modifying dependencies +- **Tests**: Write tests and run `pnpm build` before testing (tests run against `dist/`) +- **Independence**: Keep packages loosely coupled where possible + +## Architecture Principles + +- **Separation of Concerns**: CLI and Worker are frontends; Runtime is the shared execution backend +- **Sandboxing**: Runtime uses Node's VM module for isolation +- **State Immutability**: State cannot be mutated between jobs +- **Portability**: Compiled jobs are standard ES modules +- **Zero Persistence (Worker)**: Worker is stateless; Lightning handles persistence +- **Multi-Process Isolation**: Worker uses engine-multi for concurrent workflow execution + +## Contributing + +1. Make changes +2. Run `pnpm test` +3. Add changeset: `pnpm changeset` +4. 
Open PR at https://github.com/openfn/kit + +**Resources**: [docs.openfn.org](https://docs.openfn.org) | [app.openfn.org](https://app.openfn.org) | [github.com/openfn/kit](https://github.com/openfn/kit) diff --git a/integration-tests/cli/CHANGELOG.md b/integration-tests/cli/CHANGELOG.md index 75268c427..922602a28 100644 --- a/integration-tests/cli/CHANGELOG.md +++ b/integration-tests/cli/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/integration-tests-cli +## 1.0.11 + +### Patch Changes + +- Updated dependencies [6d86d9b] +- Updated dependencies [91fe531] +- Updated dependencies [d0d8069] + - @openfn/project@0.12.1 + - @openfn/lightning-mock@2.4.3 + ## 1.0.10 ### Patch Changes diff --git a/integration-tests/cli/package.json b/integration-tests/cli/package.json index 553480cad..dfbbe7b69 100644 --- a/integration-tests/cli/package.json +++ b/integration-tests/cli/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-cli", "private": true, - "version": "1.0.10", + "version": "1.0.11", "description": "CLI integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 5b6e432a4..5c8bc79c9 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,24 @@ # @openfn/cli +## 1.25.0 + +### Minor Changes + +- bb7be02: fetch: allow state files to be writtem to JSON with --format + +### Patch Changes + +- e7b7fdb: Fix an issue where start is not correctly loaded from workflow.yaml +- 297f641: On deploy, skip the check to see if the remote history has diverged. 
History tracking still needs some work and this feature isn't working properly yet
b/packages/cli/src/options.ts index 4dc0e44da..4f214c02f 100644 --- a/packages/cli/src/options.ts +++ b/packages/cli/src/options.ts @@ -28,6 +28,7 @@ export type Opts = { json?: boolean; beta?: boolean; cacheSteps?: boolean; + cachePath?: string; compile?: boolean; configPath?: string; confirm?: boolean; @@ -218,6 +219,13 @@ export const cacheSteps: CLIOption = { }, }; +export const cacheDir: CLIOption = { + name: 'cache-dir', + yargs: { + description: 'Set the path to read/write the state cache', + }, +}; + export const compile: CLIOption = { name: 'no-compile', yargs: { @@ -366,8 +374,8 @@ export const ignoreImports: CLIOption = { }, }; -const getBaseDir = (opts: { path?: string }) => { - const basePath = opts.path ?? '.'; +const getBaseDir = (opts: { path?: string; workspace?: string }) => { + const basePath = opts.path ?? opts.workspace ?? '.'; if (/\.(jso?n?|ya?ml)$/.test(basePath)) { return nodePath.dirname(basePath); } diff --git a/packages/cli/src/projects/checkout.ts b/packages/cli/src/projects/checkout.ts index 4163631a1..33369e34c 100644 --- a/packages/cli/src/projects/checkout.ts +++ b/packages/cli/src/projects/checkout.ts @@ -10,13 +10,14 @@ import * as o from '../options'; import * as po from './options'; import type { Opts } from './options'; +import { tidyWorkflowDir } from './util'; export type CheckoutOptions = Pick< Opts, - 'command' | 'project' | 'workspace' | 'log' + 'command' | 'project' | 'workspace' | 'log' | 'clean' >; -const options = [o.log, po.workspace]; +const options = [o.log, po.workspace, po.clean]; const command: yargs.CommandModule = { command: 'checkout ', @@ -24,7 +25,7 @@ const command: yargs.CommandModule = { handler: ensure('project-checkout', options), builder: (yargs) => build(options, yargs).positional('project', { - describe: 'The id, alias or UUID of the project to chcekout', + describe: 'The id, alias or UUID of the project to checkout', demandOption: true, }), }; @@ -40,6 +41,8 @@ export const handler = async (options: 
CheckoutOptions, logger: Logger) => { // TODO: try to retain the endpoint for the projects const { project: _, ...config } = workspace.getConfig() as any; + const currentProject = workspace.getActiveProject(); + // get the project let switchProject; if (/\.(yaml|json)$/.test(projectIdentifier)) { @@ -60,7 +63,11 @@ export const handler = async (options: CheckoutOptions, logger: Logger) => { } // delete workflow dir before expanding project - await rimraf(path.join(workspacePath, config.workflowRoot ?? 'workflows')); + if (options.clean) { + await rimraf(workspace.workflowsPath); + } else { + await tidyWorkflowDir(currentProject!, switchProject); + } // expand project into directory const files: any = switchProject.serialize('fs'); diff --git a/packages/cli/src/projects/deploy.ts b/packages/cli/src/projects/deploy.ts index abe14dbe8..25fa87a93 100644 --- a/packages/cli/src/projects/deploy.ts +++ b/packages/cli/src/projects/deploy.ts @@ -75,6 +75,9 @@ export async function handler(options: DeployOptions, logger: Logger) { // Note that it's a little wierd to deploy a project you haven't checked out, // so put good safeguards here logger.info('Attempting to load checked-out project from workspace'); + + // TODO this doesn't have a history! 
+ // loading from the fs the history isn't available const localProject = await Project.from('fs', { root: options.workspace || '.', }); @@ -126,7 +129,19 @@ Pass --force to override this error and deploy anyway.`); } // Ensure there's no divergence - if (!localProject.canMergeInto(remoteProject!)) { + + // Skip divergence testing if the remote has no history in its workflows + // (this will only happen on older versions of lightning) + const skipVersionTest = + localProject.workflows.find((wf) => wf.history.length === 0) || + remoteProject.workflows.find((wf) => wf.history.length === 0); + + if (skipVersionTest) { + logger.warn( + 'Skipping compatibility check as no local version history detected' + ); + logger.warn('Pushing these changes may overrite changes made to the app'); + } else if (!localProject.canMergeInto(remoteProject!)) { if (!options.force) { logger.error(`Error: Projects have diverged! @@ -168,6 +183,10 @@ Pass --force to override this error and deploy anyway.`); if (options.dryRun) { logger.always('dryRun option set: skipping upload step'); } else { + // sync summary + // :+1: the remove project has not changed since last sync / the remote project has changed since last sync, and your changes may overwrite these + // The following workflows will be updated + if (options.confirm) { if ( !(await logger.confirm( diff --git a/packages/cli/src/projects/fetch.ts b/packages/cli/src/projects/fetch.ts index d10c4626e..37cf67373 100644 --- a/packages/cli/src/projects/fetch.ts +++ b/packages/cli/src/projects/fetch.ts @@ -14,6 +14,7 @@ import { loadAppAuthConfig, getSerializePath, } from './util'; +import { writeFile } from 'node:fs/promises'; export type FetchOptions = Pick< Opts, @@ -23,6 +24,7 @@ export type FetchOptions = Pick< | 'endpoint' | 'env' | 'force' + | 'format' | 'log' | 'logJson' | 'snapshots' @@ -45,6 +47,7 @@ const options = [ po.outputPath, po.env, po.workspace, + po.format, ]; const command: yargs.CommandModule = { @@ -68,24 +71,72 @@ 
export default command; const printProjectName = (project: Project) => `${project.qname} (${project.id})`; -export const handler = async (options: FetchOptions, logger: Logger) => { +const fetchV1 = async (options: FetchOptions, logger: Logger) => { const workspacePath = options.workspace ?? process.cwd(); logger.debug('Using workspace at', workspacePath); const workspace = new Workspace(workspacePath, logger, false); - const { outputPath } = options; + // TODO we may need to resolve an alias to a UUID and endpoint + const localProject = workspace.get(options.project!); + if (localProject) { + logger.debug( + `Resolved "${options.project}" to local project ${printProjectName( + localProject + )}` + ); + } else { + logger.debug( + `Failed to resolve "${options.project}" to local project. Will send request to app anyway.` + ); + } - const localTargetProject = await resolveOutputProject( - workspace, - options, + const config = loadAppAuthConfig(options, logger); + + const { data } = await fetchProject( + options.endpoint ?? localProject?.openfn?.endpoint!, + config.apiKey, + localProject?.uuid ?? options.project!, logger ); + const finalOutputPath = getSerializePath( + localProject!, + options.workspace, + options.outputPath + ); + + logger.success(`Fetched project file to ${finalOutputPath}`); + await writeFile(finalOutputPath, JSON.stringify(data, null, 2)); + + // TODO should we return a Project or just the raw state? + return data; +}; + +export const handler = async (options: FetchOptions, logger: Logger) => { + if (options.format === 'state') { + return fetchV1(options, logger); + } + return fetchV2(options, logger); +}; + +export const fetchV2 = async (options: FetchOptions, logger: Logger) => { + const workspacePath = options.workspace ?? 
process.cwd(); + logger.debug('Using workspace at', workspacePath); + + const workspace = new Workspace(workspacePath, logger, false); + const { outputPath } = options; + const remoteProject = await fetchRemoteProject(workspace, options, logger); - ensureTargetCompatible(options, remoteProject, localTargetProject); + if (!options.force && options.format !== 'state') { + const localTargetProject = await resolveOutputProject( + workspace, + options, + logger + ); - // TODO should we use the local target project for output? + ensureTargetCompatible(options, remoteProject, localTargetProject); + } // Work out where and how to serialize the project const finalOutputPath = getSerializePath( @@ -94,7 +145,7 @@ export const handler = async (options: FetchOptions, logger: Logger) => { outputPath ); - let format: undefined | 'json' | 'yaml' = undefined; + let format: undefined | 'json' | 'yaml' | 'state' = options.format; if (outputPath) { // If the user gave us a path for output, we need to respect the format we've been given const ext = path.extname(outputPath!).substring(1) as any; @@ -112,12 +163,14 @@ export const handler = async (options: FetchOptions, logger: Logger) => { // TODO report whether we've updated or not // finally, write it! - await serialize(remoteProject, finalOutputPath!, format as any); - - logger.success( - `Fetched project file to ${finalOutputPath}.${format ?? 
'yaml'}` + const finalPathWithExt = await serialize( + remoteProject, + finalOutputPath!, + format as any ); + logger.success(`Fetched project file to ${finalPathWithExt}`); + return remoteProject; }; @@ -193,7 +246,7 @@ export async function fetchRemoteProject( localProject?.openfn?.uuid && localProject.openfn.uuid !== options.project ) { - // ifwe resolve the UUID to something other than what the user gave us, + // if we resolve the UUID to something other than what the user gave us, // debug-log the UUID we're actually going to use projectUUID = localProject.openfn.uuid as string; logger.debug( diff --git a/packages/cli/src/projects/options.ts b/packages/cli/src/projects/options.ts index dc35a74b8..49e620493 100644 --- a/packages/cli/src/projects/options.ts +++ b/packages/cli/src/projects/options.ts @@ -9,6 +9,8 @@ export type Opts = BaseOpts & { removeUnmapped?: boolean | undefined; workflowMappings?: Record | undefined; project?: string; + format?: 'yaml' | 'json' | 'state'; + clean?: boolean; }; // project specific options @@ -28,6 +30,15 @@ export const alias: CLIOption = { }, }; +export const clean: CLIOption = { + name: 'clean', + yargs: { + description: 'Clean the working dir before checking out the new project', + default: false, + boolean: true, + }, +}; + export const dryRun: CLIOption = { name: 'dryRun', yargs: { @@ -36,6 +47,15 @@ export const dryRun: CLIOption = { }, }; +export const format: CLIOption = { + name: 'format', + yargs: { + hidden: true, + description: + 'The format to save the project as - state, yaml or json. 
Use this to download raw state files.', + }, +}; + export const removeUnmapped: CLIOption = { name: 'remove-unmapped', yargs: { diff --git a/packages/cli/src/projects/pull.ts b/packages/cli/src/projects/pull.ts index 8943c0021..6a3492561 100644 --- a/packages/cli/src/projects/pull.ts +++ b/packages/cli/src/projects/pull.ts @@ -1,4 +1,6 @@ import yargs from 'yargs'; +import { Workspace } from '@openfn/project'; + import { build, ensure, override } from '../util/command-builders'; import { handler as fetch } from './fetch'; import { handler as checkout } from './checkout'; @@ -59,6 +61,8 @@ export const command: yargs.CommandModule = { }; export async function handler(options: PullOptions, logger: Logger) { + ensureProjectId(options, logger); + await fetch(options, logger); logger.success(`Downloaded latest project version`); @@ -66,4 +70,23 @@ export async function handler(options: PullOptions, logger: Logger) { logger.success(`Checked out project locally`); } +const ensureProjectId = (options: any, logger?: Logger) => { + if (!options.project) { + logger?.debug( + 'No project ID specified: looking up checked out project in Workspace' + ); + const ws = new Workspace(options.workspace); + if (ws.activeProject) { + options.project = ws.activeProject.uuid; + logger?.info( + `Project id not provided: will default to ${options.project}` + ); + } else { + throw new Error( + 'Project not provided: specify a project UUID, id or alias' + ); + } + } +}; + export default handler; diff --git a/packages/cli/src/projects/util.ts b/packages/cli/src/projects/util.ts index e9202ba50..be6f8b1a0 100644 --- a/packages/cli/src/projects/util.ts +++ b/packages/cli/src/projects/util.ts @@ -7,6 +7,7 @@ import type { Logger } from '@openfn/logger'; import type Project from '@openfn/project'; import { CLIError } from '../errors'; import resolvePath from '../util/resolve-path'; +import { rimraf } from 'rimraf'; type AuthOptions = Pick; @@ -44,26 +45,30 @@ const ensureExt = (filePath: string, 
ext: string) => { }; export const getSerializePath = ( - project: Project, - workspacePath: string, + project?: Project, + workspacePath?: string, outputPath?: string ) => { - const outputRoot = resolvePath(outputPath || workspacePath); + const outputRoot = resolvePath(outputPath || workspacePath || '.'); const projectsDir = project?.config.dirs.projects ?? '.projects'; - return outputPath ?? `${outputRoot}/${projectsDir}/${project.qname}`; + return outputPath ?? `${outputRoot}/${projectsDir}/${project?.qname}`; }; export const serialize = async ( project: Project, outputPath: string, - formatOverride?: 'yaml' | 'json', + formatOverride?: 'yaml' | 'json' | 'state', dryRun = false ) => { const root = path.dirname(outputPath); await mkdir(root, { recursive: true }); const format = formatOverride ?? project.config?.formats.project; - const output = project?.serialize('project', { format }); + + const output = + format === 'state' + ? project?.serialize('state', { format: 'json' }) + : project?.serialize('project', { format }); const maybeWriteFile = (filePath: string, output: string) => { if (!dryRun) { @@ -183,3 +188,31 @@ class DeployError extends Error { super(message); } } + +export async function tidyWorkflowDir( + currentProject: Project | undefined, + incomingProject: Project | undefined, + dryRun = false +) { + if (!currentProject || !incomingProject) { + return []; + } + + const currentFiles = currentProject.serialize('fs'); + const newFiles = incomingProject.serialize('fs'); + + const toRemove: string[] = []; + // any files not in the new list should be removed + for (const path in currentFiles) { + if (!newFiles[path]) { + toRemove.push(path); + } + } + + if (!dryRun) { + await rimraf(toRemove); + } + + // Return and sort for testing + return toRemove.sort(); +} diff --git a/packages/cli/src/util/cache.ts b/packages/cli/src/util/cache.ts index cd69d931c..182ab0ce3 100644 --- a/packages/cli/src/util/cache.ts +++ b/packages/cli/src/util/cache.ts @@ -6,30 
+6,42 @@ import type { ExecutionPlan } from '@openfn/lexicon'; import type { Opts } from '../options'; import type { Logger } from './logger'; -export const getCachePath = async ( - plan: ExecutionPlan, - options: Pick, +export const CACHE_DIR = '.cli-cache'; + +// TODO this is all a bit over complicated tbh +export const getCachePath = ( + options: Pick, + workflowName?: string, stepId?: string ) => { - const { baseDir } = options; - - const { name } = plan.workflow; + const { baseDir, cachePath } = options; + if (cachePath) { + if (stepId) { + return path.resolve(cachePath, `${stepId.replace(/ /, '-')}.json`); + } + return path.resolve(cachePath); + } - const basePath = `${baseDir}/.cli-cache/${name}`; + const basePath = path.resolve( + baseDir ?? process.cwd(), + `${CACHE_DIR}/${workflowName}` + ); if (stepId) { - return path.resolve(`${basePath}/${stepId.replace(/ /, '-')}.json`); + return `${basePath}/${stepId.replace(/ /, '-')}.json`; } - return path.resolve(basePath); + return basePath; }; -const ensureGitIgnore = (options: any) => { +const ensureGitIgnore = (options: any, cachePath: string) => { if (!options._hasGitIgnore) { - const ignorePath = path.resolve( - options.baseDir, - '.cli-cache', - '.gitignore' - ); + // Find the root cache folder + let root = cachePath; + while (root.length > 1 && !root.endsWith(CACHE_DIR)) { + root = path.dirname(root); + } + // From the root cache, look for a .gitignore + const ignorePath = path.resolve(root, '.gitignore'); try { fs.accessSync(ignorePath); } catch (e) { @@ -48,11 +60,11 @@ export const saveToCache = async ( logger: Logger ) => { if (options.cacheSteps) { - const cachePath = await getCachePath(plan, options, stepId); + const cachePath = await getCachePath(options, plan.workflow.name, stepId); // Note that this is sync because other execution order gets messed up fs.mkdirSync(path.dirname(cachePath), { recursive: true }); - ensureGitIgnore(options); + ensureGitIgnore(options, path.dirname(cachePath)); 
logger.info(`Writing ${stepId} output to ${cachePath}`); fs.writeFileSync(cachePath, JSON.stringify(output)); @@ -64,7 +76,7 @@ export const clearCache = async ( options: Pick, logger: Logger ) => { - const cacheDir = await getCachePath(plan, options); + const cacheDir = await getCachePath(options, plan.workflow?.name); try { await rmdir(cacheDir, { recursive: true }); diff --git a/packages/cli/src/util/load-plan.ts b/packages/cli/src/util/load-plan.ts index de773cc4d..203a7acb9 100644 --- a/packages/cli/src/util/load-plan.ts +++ b/packages/cli/src/util/load-plan.ts @@ -12,6 +12,7 @@ import type { Logger } from './logger'; import type { CLIExecutionPlan, CLIJobNode, OldCLIWorkflow } from '../types'; import resolvePath from './resolve-path'; import { CREDENTIALS_KEY } from '../execute/apply-credential-map'; +import { CACHE_DIR } from './cache'; const loadPlan = async ( options: Pick< @@ -27,6 +28,7 @@ const loadPlan = async ( | 'globals' | 'credentials' | 'collectionsEndpoint' + | 'cachePath' > & { workflow?: Opts['workflow']; workspace?: string; // from project opts @@ -61,6 +63,8 @@ const loadPlan = async ( options.credentials ??= workspace.getConfig().credentials; options.collectionsEndpoint ??= proj.openfn?.endpoint; + // Set the cache path to be relative to the workflow + options.cachePath ??= workspace.workflowsPath + `/${name}/${CACHE_DIR}`; } if (options.path && /ya?ml$/.test(options.path)) { @@ -102,7 +106,10 @@ const loadPlan = async ( ); } else { // This is the main route now - just load the workflow from the file - return loadXPlan({ workflow: workflowObj }, options, logger, defaultName); + const { id, start, options: o, ...w } = workflowObj; + const opts = { ...o, start }; + const plan = { id, workflow: w, options: opts }; + return loadXPlan(plan, options, logger, defaultName); } }; diff --git a/packages/cli/src/util/load-state.ts b/packages/cli/src/util/load-state.ts index 9161806a9..fd0c911f9 100644 --- a/packages/cli/src/util/load-state.ts +++ 
b/packages/cli/src/util/load-state.ts @@ -71,7 +71,11 @@ export default async ( const upstreamStepId = getUpstreamStepId(plan, start); if (upstreamStepId) { log.debug(`Input step for "${start}" is "${upstreamStepId}"`); - const cachedStatePath = await getCachePath(plan, opts, upstreamStepId); + const cachedStatePath = await getCachePath( + opts, + plan.workflow.name, + upstreamStepId + ); log.debug('Loading cached state from', cachedStatePath); try { diff --git a/packages/cli/test/execute/execute.test.ts b/packages/cli/test/execute/execute.test.ts index 7fcbbff85..2b500fa1b 100644 --- a/packages/cli/test/execute/execute.test.ts +++ b/packages/cli/test/execute/execute.test.ts @@ -93,6 +93,33 @@ test.serial('run a workflow', async (t) => { t.is(result.data.count, 84); }); +test.serial('run a workflow in new format with start', async (t) => { + const workflow = { + start: 'a', + steps: [ + { + id: 'b', + expression: `${fn}fn((state) => { state.data.count = state.data.count * 2; return state; });`, + }, + { + id: 'a', + expression: `${fn}fn(() => ({ data: { count: 42 } }));`, + next: { b: true }, + }, + ], + }; + mockFs({ + '/workflow.json': JSON.stringify(workflow), + }); + + const options = { + ...defaultOptions, + workflowPath: '/workflow.json', + }; + const result = await handler(options, logger); + t.is(result.data.count, 84); +}); + test.serial('run a workflow with a JSON credential map', async (t) => { const workflow = { workflow: { diff --git a/packages/cli/test/projects/checkout.test.ts b/packages/cli/test/projects/checkout.test.ts index 55a5485bf..44fcb0fa3 100644 --- a/packages/cli/test/projects/checkout.test.ts +++ b/packages/cli/test/projects/checkout.test.ts @@ -246,6 +246,7 @@ test.serial('checkout: switching to and back between projects', async (t) => { command: 'project-checkout', project: 'my-project', workspace: '/ws', + clean: true, }, logger ); diff --git a/packages/cli/test/projects/deploy.test.ts b/packages/cli/test/projects/deploy.test.ts index 
5809fe736..903dabf52 100644 --- a/packages/cli/test/projects/deploy.test.ts +++ b/packages/cli/test/projects/deploy.test.ts @@ -1,10 +1,45 @@ +import { readFile, writeFile } from 'node:fs/promises'; import test from 'ava'; +import mock from 'mock-fs'; +import path from 'node:path'; import Project, { generateWorkflow } from '@openfn/project'; import { createMockLogger } from '@openfn/logger'; -import { reportDiff } from '../../src/projects/deploy'; +import createLightningServer, { + DEFAULT_PROJECT_ID, +} from '@openfn/lightning-mock'; + +import { + handler as deployHandler, + reportDiff, +} from '../../src/projects/deploy'; +import { myProject_yaml, myProject_v1 } from './fixtures'; +import { checkout } from '../../src/projects'; const logger = createMockLogger(undefined, { level: 'debug' }); +const port = 9876; +const ENDPOINT = `http://localhost:${port}`; + +let server: any; + +test.before(async () => { + server = await createLightningServer({ port }); +}); + +test.beforeEach(() => { + server.addProject(myProject_v1); + logger._reset(); + mock.restore(); +}); + +const mockFs = (paths: Record) => { + const pnpm = path.resolve('../../node_modules/.pnpm'); + mock({ + [pnpm]: mock.load(pnpm, {}), + ...paths, + }); +}; + // what will deploy tests look like? // deploy a project for the first time (this doesn't work though?) 
@@ -148,3 +183,98 @@ test('reportDiff: should report mix of added, changed, and removed workflows', ( t.truthy(logger._find('always', /workflows removed/i)); t.truthy(logger._find('always', /- c/i)); }); + +// This doesn't work until local history is tracked properly +test.serial.skip( + 'deploy a change to a project and write the yaml back (compatible histories)', + async (t) => { + mockFs({ + '/ws/.projects/main@app.openfn.org.yaml': myProject_yaml, + '/ws/openfn.yaml': '', + }); + + // first checkout the project + await checkout( + { + project: 'main', + workspace: '/ws', + }, + logger + ); + + // Now change the expression + await writeFile('/ws/workflows/my-workflow/transform-data.js', 'log()'); + + await deployHandler( + { + endpoint: ENDPOINT, + apiKey: 'test-api-key', + workspace: '/ws', + log: 'debug', + } as any, + logger + ); + + // Check what was uploaded to Lightning - the internal app state + // should be the exact state object that was uploaded + const uploadedState = + server.state.projects['e16c5f09-f0cb-4ba7-a4c2-73fcb2f29d00']; + t.truthy(uploadedState); + + const projectYaml = await readFile( + '/ws/.projects/main@app.openfn.org.yaml', + 'utf8' + ); + t.regex(projectYaml, /fn()/); + + const success = logger._find('success', /Updated project at/); + t.truthy(success); + } +); + +// TODO skipping while history checking is messed up +test.serial.skip( + 'Exit early if the remote is not compatible with local', + async (t) => { + mockFs({ + '/ws/.projects/main@app.openfn.org.yaml': myProject_yaml, + '/ws/openfn.yaml': '', + }); + + // Update the server-side project + const changed = JSON.parse(JSON.stringify(myProject_v1)); + changed.workflows['my-workflow'].version_history.push('app:abc'); + + server.addProject('e16c5f09-f0cb-4ba7-a4c2-73fcb2f29d00', changed); + + // checkout the project locally + await checkout( + { + project: 'main', + workspace: '/ws', + }, + logger + ); + + // Now change the expression + await 
writeFile('/ws/workflows/my-workflow/transform-data.js', 'log()'); + + await deployHandler( + { + endpoint: ENDPOINT, + apiKey: 'test-api-key', + workspace: '/ws', + } as any, + logger + ); + + // The remote project should not have changed + const appState = + server.state.projects['e16c5f09-f0cb-4ba7-a4c2-73fcb2f29d00']; + t.deepEqual(appState, myProject_v1); + + // We should log what's going on to the user + const expectedLog = logger._find('error', /projects have diverged/i); + t.truthy(expectedLog); + } +); diff --git a/packages/cli/test/projects/fetch.test.ts b/packages/cli/test/projects/fetch.test.ts index ff6ef9922..301d656e7 100644 --- a/packages/cli/test/projects/fetch.test.ts +++ b/packages/cli/test/projects/fetch.test.ts @@ -480,7 +480,7 @@ test.serial( .replace('fn()', 'fn(x)') // arbitrary edit so that we can track the change .replace(' - a', ' - z'); // change the local history to be incompatible - // Make it look like we've checked out hte project + // Make it look like we've checked out the project mock({ '/ws/.projects': {}, '/ws/openfn.yaml': '', diff --git a/packages/cli/test/projects/util.test.ts b/packages/cli/test/projects/util.test.ts new file mode 100644 index 000000000..681a238f4 --- /dev/null +++ b/packages/cli/test/projects/util.test.ts @@ -0,0 +1,159 @@ +import test from 'ava'; +import Project, { generateWorkflow } from '@openfn/project'; +import { tidyWorkflowDir } from '../../src/projects/util'; + +test('tidyWorkflowDir: removes workflows that no longer exist', async (t) => { + const currentProject = new Project({ + name: 'current', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id B trigger-y'), + ], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id D trigger-w'), + ], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + t.deepEqual(toRemove, ['workflows/B/B.yaml']); +}); + 
+test('tidyWorkflowDir: do nothing when no workflows are removed', async (t) => { + const currentProject = new Project({ + name: 'current', + workflows: [generateWorkflow('@id A trigger-x')], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id B trigger-y'), + ], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + t.deepEqual(toRemove, []); +}); + +test('tidyWorkflowDir: removes all workflows when incoming project is empty', async (t) => { + const currentProject = new Project({ + name: 'current', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id B trigger-y'), + ], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + // All workflows should be removed + t.deepEqual(toRemove, ['workflows/A/A.yaml', 'workflows/B/B.yaml']); +}); + +test('tidyWorkflowDir: both projects empty', async (t) => { + const currentProject = new Project({ + name: 'current', + workflows: [], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + t.deepEqual(toRemove, []); +}); + +test('tidyWorkflowDir: identical projects', async (t) => { + const currentProject = new Project({ + name: 'current', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id B trigger-y'), + ], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id B trigger-y'), + ], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + t.deepEqual(toRemove, []); +}); + +test('tidyWorkflowDir: complete replacement with no overlap', async (t) => { + const currentProject = new Project({ 
+ name: 'current', + workflows: [ + generateWorkflow('@id A trigger-x'), + generateWorkflow('@id B trigger-y'), + ], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [ + generateWorkflow('@id X trigger-x'), + generateWorkflow('@id Y trigger-y'), + ], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + t.deepEqual(toRemove, ['workflows/A/A.yaml', 'workflows/B/B.yaml']); +}); + +test('tidyWorkflowDir: handles undefined projects', async (t) => { + const project = new Project({ + name: 'project', + workflows: [generateWorkflow('@id A trigger-x')], + }); + + // Both undefined + let toRemove = await tidyWorkflowDir(undefined, undefined, true); + t.deepEqual(toRemove, []); + + // Current undefined + toRemove = await tidyWorkflowDir(undefined, project, true); + t.deepEqual(toRemove, []); + + // Incoming undefined + toRemove = await tidyWorkflowDir(project, undefined, true); + t.deepEqual(toRemove, []); +}); + +test('tidyWorkflowDir: removes expression files when workflow steps change', async (t) => { + const currentProject = new Project({ + name: 'current', + workflows: [generateWorkflow('@id A trigger-x(expression=fn)')], + }); + + const incomingProject = new Project({ + name: 'incoming', + workflows: [generateWorkflow('@id A trigger-z(expression=fn)')], + }); + + const toRemove = await tidyWorkflowDir(currentProject, incomingProject, true); + + t.deepEqual(toRemove, ['workflows/A/x.js']); +}); diff --git a/packages/cli/test/util/load-plan.test.ts b/packages/cli/test/util/load-plan.test.ts index 388495100..932354981 100644 --- a/packages/cli/test/util/load-plan.test.ts +++ b/packages/cli/test/util/load-plan.test.ts @@ -1,6 +1,7 @@ import test from 'ava'; import mock from 'mock-fs'; import { createMockLogger } from '@openfn/logger'; +import { omit } from 'lodash-es'; import type { Job } from '@openfn/lexicon'; import loadPlan from '../../src/util/load-plan'; @@ -181,10 +182,10 @@ test.serial('xplan: 
load a new flat plan from workflow path', async (t) => { }; const plan = await loadPlan(opts, logger); - - t.truthy(plan); + t.truthy(plan); t.deepEqual(plan, { - options: {}, // no options here! + id: undefined, + options: { start: undefined }, workflow: sampleXPlan.workflow, }); }); @@ -217,6 +218,36 @@ test.serial('xplan: expand adaptors', async (t) => { t.is(step.adaptor, undefined); }); +test.serial('flat xplan: expand adaptors', async (t) => { + const opts = { + workflowPath: 'test/wf.json', + expandAdaptors: true, + plan: {}, + }; + + const plan = { + start: 'a', + steps: [ + { + id: 'a', + expression: '.', + adaptor: 'common@1.0.0', + }, + ], + }; + mock({ + 'test/wf.json': JSON.stringify(plan), + }); + + const result = await loadPlan(opts, logger); + t.truthy(result); + + const step = result.workflow.steps[0] as Job; + t.is(step.adaptors[0], '@openfn/language-common@1.0.0'); + // @ts-ignore + t.is(step.adaptor, undefined); +}); + test.serial('xplan: do not expand adaptors', async (t) => { const opts = { workflowPath: 'test/wf.json', @@ -490,7 +521,7 @@ test.serial( async (t) => { mock({ 'test/wf.yaml': ` -name: wf +start: a steps: - id: a adaptors: [] @@ -504,8 +535,11 @@ steps: const plan = await loadPlan(opts, logger); t.truthy(plan); - // Note that options are lost in this design!
- t.deepEqual(plan, { workflow: sampleXPlan.workflow, options: {} }); + t.deepEqual(plan, { + id: undefined, + workflow: omit(sampleXPlan.workflow, ['id', 'name']), + options: { start: 'a' }, + }); } ); diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index d927ada96..9c428a3a2 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,12 @@ # engine-multi +## 1.10.2 + +### Patch Changes + +- Updated dependencies [ad7c7c5] + - @openfn/runtime@1.8.2 + ## 1.10.1 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 4f32735af..57a6a3880 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.10.1", + "version": "1.10.2", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index dcff0e2b9..01016b5cc 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.4.3 + +### Patch Changes + +- Updated dependencies [ad7c7c5] + - @openfn/runtime@1.8.2 + - @openfn/engine-multi@1.10.2 + ## 2.4.2 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 1dcabee7c..dd0df420b 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.4.2", + "version": "2.4.3", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/lightning-mock/src/api-rest.ts b/packages/lightning-mock/src/api-rest.ts index 19fe283b7..bed2a562b 100644 --- a/packages/lightning-mock/src/api-rest.ts +++ b/packages/lightning-mock/src/api-rest.ts @@ -110,9 +110,11 @@ export default ( 
router.post('/api/provision', (ctx) => { const proj: any = ctx.request.body; + state.projects[proj.id] = proj; ctx.response.status = 200; + ctx.response.body = { data: proj }; }); // list with query diff --git a/packages/project/CHANGELOG.md b/packages/project/CHANGELOG.md index a1b2d67a4..b1038daf6 100644 --- a/packages/project/CHANGELOG.md +++ b/packages/project/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/project +## 0.12.1 + +### Patch Changes + +- 6d86d9b: Workspace: add getters for project and workflow paths +- 91fe531: Fix an issue where workflow history is dropped during merge +- d0d8069: When checking out new projects, only delete the files necessary + ## 0.12.0 ### Minor Changes diff --git a/packages/project/package.json b/packages/project/package.json index 02035cdf6..f02ad0b6a 100644 --- a/packages/project/package.json +++ b/packages/project/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/project", - "version": "0.12.0", + "version": "0.12.1", "description": "Read, serialize, replicate and sync OpenFn projects", "scripts": { "test": "pnpm ava", diff --git a/packages/project/src/Workflow.ts b/packages/project/src/Workflow.ts index 0f57dc94d..e1066f981 100644 --- a/packages/project/src/Workflow.ts +++ b/packages/project/src/Workflow.ts @@ -191,6 +191,10 @@ class Workflow { this.workflow.history?.push(versionHash); } + get history() { + return this.workflow.history ?? 
[]; + } + // return true if the current workflow can be merged into the target workflow without losing any changes canMergeInto(target: Workflow) { const thisHistory = diff --git a/packages/project/src/Workspace.ts b/packages/project/src/Workspace.ts index 16289bc34..93d44b3f8 100644 --- a/packages/project/src/Workspace.ts +++ b/packages/project/src/Workspace.ts @@ -99,6 +99,14 @@ export class Workspace { return this.projects; } + get projectsPath() { + return path.join(this.root, this.config.dirs.projects); + } + + get workflowsPath() { + return path.join(this.root, this.config.dirs.workflows); + } + /** Get a project by its alias, id or UUID. Can also include a UUID */ get(nameyThing: string) { return matchProject(nameyThing, this.projects); diff --git a/packages/project/src/merge/merge-project.ts b/packages/project/src/merge/merge-project.ts index 0d2e79c38..17abdb488 100644 --- a/packages/project/src/merge/merge-project.ts +++ b/packages/project/src/merge/merge-project.ts @@ -1,7 +1,7 @@ import { defaultsDeep, isEmpty } from 'lodash-es'; import { Project } from '../Project'; -import { mergeWorkflows } from './merge-node'; +import { mergeWorkflows } from './merge-workflow'; import mapUuids from './map-uuids'; import baseMerge from '../util/base-merge'; import getDuplicates from '../util/get-duplicates'; diff --git a/packages/project/src/merge/merge-node.ts b/packages/project/src/merge/merge-workflow.ts similarity index 98% rename from packages/project/src/merge/merge-node.ts rename to packages/project/src/merge/merge-workflow.ts index b9c98fcac..ed7aa7202 100644 --- a/packages/project/src/merge/merge-node.ts +++ b/packages/project/src/merge/merge-workflow.ts @@ -77,6 +77,7 @@ export function mergeWorkflows( return { ...target, ...newSource, + history: source.history ?? 
target.history, openfn: { ...target.openfn, ...source.openfn, diff --git a/packages/project/test/merge/merge-project.test.ts b/packages/project/test/merge/merge-project.test.ts index 63862fd4f..d56f6f5ac 100644 --- a/packages/project/test/merge/merge-project.test.ts +++ b/packages/project/test/merge/merge-project.test.ts @@ -2,9 +2,7 @@ import test from 'ava'; import { randomUUID } from 'node:crypto'; import Project from '../../src'; import { merge, REPLACE_MERGE } from '../../src/merge/merge-project'; -import { join } from 'node:path'; import { generateWorkflow } from '../../src/gen/generator'; -import slugify from '../../src/util/slugify'; let idgen = 0; @@ -149,6 +147,38 @@ test('merge a simple change between single-step workflows with preserved uuids', t.is(step.openfn.uuid, wf_a.steps[0].openfn.uuid); }); +test('merge with history (prefers source history)', (t) => { + // create a base workflow + const wf = { + id: 'wf', + history: ['a'], + steps: [ + { + id: 'x', + name: 'X', + adaptor: 'common', + expression: 'fn(s => s)', + }, + ], + }; + + // set up two copies with UUIDs + const wf_a = assignUUIDs(wf); + const wf_b = assignUUIDs(wf); + wf_b.history.push('b'); + + // change the adaptor + wf_b.steps[0].adaptor = 'http'; + + const main = createProject(wf_a, 'a'); + const staging = createProject(wf_b, 'b'); + + // merge staging into main + const result = merge(staging, main); + + t.deepEqual(result.workflows[0].history, ['a', 'b']); +}); + test('merge a simple change between single-step workflows with preserved numeric uuids', (t) => { // create a base workflow const wf = { diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 2ef24d31b..8500f657a 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.8.2 + +### Patch Changes + +- ad7c7c5: Fix an issue where ExecutionPlan.start is not used properly.
+ ## 1.8.1 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 27d8478e4..887141061 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.8.1", + "version": "1.8.2", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/runtime/src/execute/compile-plan.ts b/packages/runtime/src/execute/compile-plan.ts index e6f8048b0..00f958a36 100644 --- a/packages/runtime/src/execute/compile-plan.ts +++ b/packages/runtime/src/execute/compile-plan.ts @@ -125,7 +125,7 @@ export default (plan: ExecutionPlan) => { }, options: { ...options, - start: options.start ?? workflow.steps[0]?.id!, + start: options.start ?? workflow.start ?? workflow.steps[0]?.id!, }, }; diff --git a/packages/runtime/src/execute/plan.ts b/packages/runtime/src/execute/plan.ts index e04ebed28..879ed8c15 100644 --- a/packages/runtime/src/execute/plan.ts +++ b/packages/runtime/src/execute/plan.ts @@ -28,7 +28,6 @@ const executePlan = async ( throw e; } logger.info(`Executing ${plan.workflow.name || plan.id}`); - const { workflow, options } = compiledPlan; const ctx: ExecutionContext = { @@ -53,8 +52,10 @@ const executePlan = async ( logger.success(`loaded state for ${id} in ${duration}ms`); } + const startNode = options.start ?? workflow.start; + logger.debug('Starting execution from step', startNode); const queue: Array<{ stepName: string; input: any }> = [ - { stepName: options.start ?? 
workflow.start, input }, + { stepName: startNode, input }, ]; // count how many times each step has been called diff --git a/packages/runtime/test/execute/compile-plan.test.ts b/packages/runtime/test/execute/compile-plan.test.ts index b1ed9efa0..ddebddada 100644 --- a/packages/runtime/test/execute/compile-plan.test.ts +++ b/packages/runtime/test/execute/compile-plan.test.ts @@ -61,6 +61,53 @@ test('should preserve the start option', (t) => { t.is(compiledPlan.options.start, 'a'); }); +test('should prefer workflow.start', (t) => { + const compiledPlan = compilePlan({ + id: 'a', + workflow: { + start: 'b', + steps: [ + { id: 'a', expression: 'a' }, + { id: 'b', expression: 'b' }, + ], + }, + }); + + t.is(compiledPlan.options.start, 'b'); +}); + +test('should prefer options.start to workflow.start', (t) => { + const compiledPlan = compilePlan({ + id: 'a', + workflow: { + start: 'b', + steps: [ + { id: 'a', expression: 'a' }, + { id: 'b', expression: 'b' }, + ], + }, + options: { + start: 'a', + }, + }); + + t.is(compiledPlan.options.start, 'a'); +}); + +test('should default start to the first node', (t) => { + const compiledPlan = compilePlan({ + id: 'a', + workflow: { + steps: [ + { id: 'a', expression: 'a' }, + { id: 'b', expression: 'b' }, + ], + }, + }); + + t.is(compiledPlan.options.start, 'a'); +}); + test('should preserve arbitrary options', (t) => { const compiledPlan = compilePlan({ id: 'a', diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 0566877c0..a9393940b 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,13 @@ # ws-worker +## 1.21.2 + +### Patch Changes + +- Updated dependencies [ad7c7c5] + - @openfn/runtime@1.8.2 + - @openfn/engine-multi@1.10.2 + ## 1.21.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 165cb788c..c528225f1 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6
+1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.21.1", + "version": "1.21.2", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7066d0636..cbf61729d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -276,6 +276,9 @@ importers: ava: specifier: 5.3.1 version: 5.3.1 + lodash-es: + specifier: ^4.17.21 + version: 4.17.21 mock-fs: specifier: ^5.5.0 version: 5.5.0