From d3d3ac2e229b018734c2c874fdf409242401adef Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 1 Jan 2026 12:57:21 -0500 Subject: [PATCH 1/8] feat(js): added support for bidi actions and flows --- js/core/src/action.ts | 254 ++++++++++++++++++++++++++---- js/core/src/flow.ts | 63 +++++++- js/core/src/index.ts | 2 + js/core/src/registry.ts | 2 + js/core/tests/bidi-action_test.ts | 241 ++++++++++++++++++++++++++++ js/core/tests/bidi-flow_test.ts | 112 +++++++++++++ 6 files changed, 639 insertions(+), 35 deletions(-) create mode 100644 js/core/tests/bidi-action_test.ts create mode 100644 js/core/tests/bidi-flow_test.ts diff --git a/js/core/src/action.ts b/js/core/src/action.ts index 27d746f761..c87ea58db4 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -17,7 +17,7 @@ import type { JSONSchema7 } from 'json-schema'; import type * as z from 'zod'; import { getAsyncContext } from './async-context.js'; -import { lazy } from './async.js'; +import { Channel, lazy } from './async.js'; import { getContext, runWithContext, type ActionContext } from './context.js'; import type { ActionType, Registry } from './registry.js'; import { parseSchema } from './schema.js'; @@ -76,7 +76,7 @@ export interface ActionResult { /** * Options (side channel) data to pass to the model. */ -export interface ActionRunOptions { +export interface ActionRunOptions { /** * Streaming callback (optional). */ @@ -106,10 +106,20 @@ export interface ActionRunOptions { onTraceStart?: (traceInfo: { traceId: string; spanId: string }) => void; } +export interface ActionBidiRunOptions + extends ActionRunOptions { + /** + * Streaming input (optional). + */ + inputStream?: AsyncIterable; + + init?: Init; +} + /** * Options (side channel) data to pass to the model. */ -export interface ActionFnArg { +export interface ActionFnArg { /** * Whether the caller of the action requested streaming. */ @@ -141,6 +151,16 @@ export interface ActionFnArg { registry?: Registry; } +export interface BidiActionFnArg + extends Omit, 'inputStream'> { + /** + * Streaming input. + */ + inputStream: AsyncIterable; + + init?: Init; +} + /** * Streaming response from an action. */ @@ -154,6 +174,15 @@ export interface StreamingResponse< output: Promise>; } +export interface BidiStreamingResponse< + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + I extends z.ZodTypeAny = z.ZodTypeAny, +> extends StreamingResponse { + send(chunk: z.infer): void; + close(): void; +} + /** * Self-describing, validating, observable, locally and remotely callable function. */ @@ -161,19 +190,28 @@ export type Action< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, - RunOptions extends ActionRunOptions = ActionRunOptions, + RunOptions extends ActionRunOptions< + z.infer, + z.infer + > = ActionRunOptions, z.infer>, + Init extends z.ZodTypeAny = z.ZodTypeAny, > = ((input?: z.infer, options?: RunOptions) => Promise>) & { __action: ActionMetadata; __registry?: Registry; run( input?: z.infer, - options?: ActionRunOptions> + options?: ActionRunOptions, z.infer> ): Promise>>; stream( input?: z.infer, - opts?: ActionRunOptions> + opts?: ActionRunOptions, z.infer> ): StreamingResponse; + + streamBidi( + input?: AsyncIterable>, + opts?: ActionBidiRunOptions, z.infer, z.infer> + ): BidiStreamingResponse; }; /** @@ -201,6 +239,16 @@ export type ActionParams< actionType: ActionType; }; +export interface BidiActionParams< + I extends z.ZodTypeAny, + O extends z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, +> extends ActionParams { + initSchema?: Init; + initJsonSchema?: JSONSchema7; +} + export type ActionAsyncParams< I extends z.ZodTypeAny, O extends z.ZodTypeAny, @@ -208,7 +256,7 @@ export type ActionAsyncParams< > = ActionParams & { fn: ( input: z.infer, - options: ActionFnArg> + options: ActionFnArg, z.infer> ) => Promise>; }; @@ -219,8 +267,8 @@ export type SimpleMiddleware = ( export type MiddlewareWithOptions = ( req: I, - options: ActionRunOptions | undefined, - next: (req?: I, options?: ActionRunOptions) => Promise + options: ActionRunOptions | undefined, + next: (req?: I, options?: ActionRunOptions) => Promise ) => Promise; /** @@ -243,20 +291,20 @@ export function actionWithMiddleware< ): Action { const wrapped = (async ( req: z.infer, - options?: ActionRunOptions> + options?: ActionRunOptions, z.infer> ) => { return (await wrapped.run(req, options)).result; }) as Action; wrapped.__action = action.__action; wrapped.run = async ( req: z.infer, - options?: ActionRunOptions> + options?: ActionRunOptions, z.infer> ): Promise>> => { let telemetry; const dispatch = async ( index: number, req: z.infer, - opts?: ActionRunOptions> + opts?: ActionRunOptions, z.infer> ) => { if (index === middleware.length) { // end of the chain, call the original model action @@ -283,6 +331,7 @@ export function actionWithMiddleware< } }; wrapped.stream = action.stream; + wrapped.streamBidi = action.streamBidi; return { result: await dispatch(0, req, options), telemetry }; }; @@ -297,10 +346,10 @@ export function action< O extends z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, >( - config: ActionParams, + config: BidiActionParams, fn: ( input: z.infer, - options: ActionFnArg> + options: ActionFnArg, z.infer> ) => Promise> ): Action> { const actionName = @@ -321,7 +370,7 @@ export function action< const actionFn = (async ( input?: I, - options?: ActionRunOptions> + options?: ActionRunOptions, z.infer> ) => { return (await actionFn.run(input, options)).result; }) as Action>; @@ -329,12 +378,42 @@ export function action< actionFn.run = async ( input: z.infer, - options?: ActionRunOptions> + options?: ActionBidiRunOptions, z.infer> ): Promise>> => { - input = parseSchema(input, { - schema: config.inputSchema, - jsonSchema: config.inputJsonSchema, - }); + if (config.inputSchema || config.inputJsonSchema) { + if (!options?.inputStream) { + input = parseSchema(input, { + schema: config.inputSchema, + jsonSchema: config.inputJsonSchema, + }); + } else { + const inputStream = options.inputStream; + options = { + ...options, + inputStream: (async function* () { + for await (const item of inputStream) { + yield parseSchema(item, { + schema: config.inputSchema, + jsonSchema: config.inputJsonSchema, + }); + } + })(), + }; + } + } + + if (config.initSchema || config.initJsonSchema) { + const validatedInit = parseSchema(options?.init, { + schema: config.initSchema, + jsonSchema: config.initJsonSchema, + }); + if (options) { + options.init = validatedInit; + } else { + options = { init: validatedInit }; + } + } + let traceId; let spanId; let output = await runInNewSpan( @@ -379,13 +458,14 @@ export function action< !!options?.onChunk && options.onChunk !== sentinelNoopStreamingCallback, sendChunk: options?.onChunk ?? sentinelNoopStreamingCallback, + inputStream: options?.inputStream, trace: { traceId, spanId, }, registry: actionFn.__registry, abortSignal: options?.abortSignal ?? makeNoopAbortSignal(), - }); + } as BidiActionFnArg>); // if context is explicitly passed in, we run action with the provided context, // otherwise we let upstream context carry through. const output = await runWithContext(options?.context, actFn); @@ -415,7 +495,7 @@ export function action< actionFn.stream = ( input?: z.infer, - opts?: ActionRunOptions> + opts?: ActionBidiRunOptions, z.infer> ): StreamingResponse => { let chunkStreamController: ReadableStreamController>; const chunkStream = new ReadableStream>({ @@ -427,17 +507,24 @@ export function action< }); const invocationPromise = actionFn - .run(config.inputSchema ? config.inputSchema.parse(input) : input, { - onChunk: ((chunk: z.infer) => { - chunkStreamController.enqueue(chunk); - }) as S extends z.ZodVoid ? undefined : StreamingCallback>, - context: { - ...actionFn.__registry?.context, - ...(opts?.context ?? getContext()), - }, - abortSignal: opts?.abortSignal, - telemetryLabels: opts?.telemetryLabels, - }) + .run( + !opts?.inputStream && config.inputSchema + ? config.inputSchema.parse(input) + : input, + { + onChunk: ((chunk: z.infer) => { + chunkStreamController.enqueue(chunk); + }) as S extends z.ZodVoid ? undefined : StreamingCallback>, + context: { + ...actionFn.__registry?.context, + ...(opts?.context ?? getContext()), + }, + inputStream: opts?.inputStream, + abortSignal: opts?.abortSignal, + telemetryLabels: opts?.telemetryLabels, + init: (opts as BidiActionFnArg>)?.init, + } as ActionBidiRunOptions> + ) .then((s) => s.result) .finally(() => { chunkStreamController.close(); @@ -461,6 +548,38 @@ export function action< }; }; + actionFn.streamBidi = ( + inputStream?: AsyncIterable>, + opts?: ActionBidiRunOptions, z.infer> + ): BidiStreamingResponse => { + let channel: Channel> | undefined; + if (!inputStream) { + channel = new Channel>(); + inputStream = channel; + } + + const result = actionFn.stream(undefined, { + ...opts, + inputStream, + } as ActionBidiRunOptions>); + + return { + ...result, + send: (chunk) => { + if (!channel) { + throw new Error('Cannot send to a provided stream.'); + } + channel.send(chunk); + }, + close: () => { + if (!channel) { + throw new Error('Cannot close a provided stream.'); + } + channel.close(); + }, + }; + }; + if (config.use) { return actionWithMiddleware(actionFn, config.use); } @@ -483,7 +602,7 @@ export function defineAction< config: ActionParams, fn: ( input: z.infer, - options: ActionFnArg> + options: ActionFnArg, z.infer> ) => Promise> ): Action { if (isInRuntimeContext()) { @@ -501,6 +620,73 @@ export function defineAction< return act; } +/** + * Defines a bi-directional action with the given config and registers it in the registry. + */ +export function defineBidiAction< + I extends z.ZodTypeAny, + O extends z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, +>( + registry: Registry, + config: BidiActionParams, + fn: ( + input: BidiActionFnArg, z.infer, z.infer> + ) => AsyncGenerator, z.infer, void> +): Action, z.infer>, Init> { + const act = bidiAction(config, fn); + registry.registerAction(config.actionType, act); + return act; +} + +/** + * Creates a bi-directional action with the given config. + */ +export function bidiAction< + I extends z.ZodTypeAny, + O extends z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, +>( + config: BidiActionParams, + fn: ( + input: BidiActionFnArg, z.infer, z.infer> + ) => AsyncGenerator, z.infer, void> +): Action, z.infer>, Init> { + const meta = { ...config.metadata, bidi: true }; + return action({ ...config, metadata: meta }, async (input, options) => { + let stream = (options as BidiActionFnArg>).inputStream; + if (!stream) { + if (input !== undefined) { + stream = (async function* () { + yield input; + })(); + } else { + stream = (async function* () {})(); + } + } + + const outputGen = fn({ + ...options, + inputStream: stream, + }); + + // Manually iterate to get chunks and the return value + const iter = outputGen[Symbol.asyncIterator](); + let result: z.infer; + while (true) { + const { value, done } = await iter.next(); + if (done) { + result = value; + break; + } + options.sendChunk(value); + } + return result; + }); +} + /** * Defines an action with the given config promise and registers it in the registry. */ diff --git a/js/core/src/flow.ts b/js/core/src/flow.ts index 56c3a3d6b9..e4b07c906c 100644 --- a/js/core/src/flow.ts +++ b/js/core/src/flow.ts @@ -15,7 +15,14 @@ */ import type { z } from 'zod'; -import { ActionFnArg, action, type Action } from './action.js'; +import { + ActionFnArg, + BidiActionFnArg, + JSONSchema7, + action, + bidiAction, + type Action, +} from './action.js'; import { Registry, type HasRegistry } from './registry.js'; import { SPAN_TYPE_ATTR, runInNewSpan } from './tracing.js'; @@ -48,6 +55,16 @@ export interface FlowConfig< metadata?: Record; } +export interface BidiFlowConfig< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, +> extends FlowConfig { + initSchema?: Init; + initJsonSchema?: JSONSchema7; +} + /** * Flow execution context for flow to access the streaming callback and * side-channel context data. The context itself is a function, a short-cut @@ -104,6 +121,50 @@ export function defineFlow< return f; } +/** + * Defines a bi-directional flow and registers the flow in the provided registry. + */ +export function defineBidiFlow< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, +>( + registry: Registry, + config: BidiFlowConfig, + fn: ( + input: BidiActionFnArg, z.infer, z.infer> + ) => AsyncGenerator, z.infer, void> +): Flow { + const flow = bidiFlow(config, fn); + registry.registerAction('bidi-flow', flow); + return flow; +} + +/** + * Defines a bi-directional flow. + */ +export function bidiFlow< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, +>( + config: BidiFlowConfig, + fn: ( + input: BidiActionFnArg, z.infer, z.infer> + ) => AsyncGenerator, z.infer, void> +): Flow { + const f = bidiAction( + { + ...config, + actionType: 'bidi-flow', + }, + fn + ); + return f; +} + /** * Registers a flow as an action in the registry. */ diff --git a/js/core/src/index.ts b/js/core/src/index.ts index 632b9b30fc..d0e60921f7 100644 --- a/js/core/src/index.ts +++ b/js/core/src/index.ts @@ -67,6 +67,8 @@ export { type StatusName, } from './error.js'; export { + bidiFlow, + defineBidiFlow, defineFlow, flow, run, diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index 5caa0eed5b..4533b5d6cb 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -44,8 +44,10 @@ const ACTION_TYPES = [ 'evaluator', 'executable-prompt', 'flow', + 'bidi-flow', 'indexer', 'model', + 'bidi-model', 'background-model', 'check-operation', 'cancel-operation', diff --git a/js/core/tests/bidi-action_test.ts b/js/core/tests/bidi-action_test.ts new file mode 100644 index 0000000000..6a2596e125 --- /dev/null +++ b/js/core/tests/bidi-action_test.ts @@ -0,0 +1,241 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { beforeEach, describe, it } from 'node:test'; +import { z } from 'zod'; +import { defineBidiAction } from '../src/action.js'; +import { initNodeFeatures } from '../src/node.js'; +import { Registry } from '../src/registry.js'; + +initNodeFeatures(); + +describe('bidi action', () => { + var registry: Registry; + beforeEach(() => { + registry = new Registry(); + }); + + it('streamBidi ergonomic (push)', async () => { + const act = defineBidiAction( + registry, + { + name: 'chat', + actionType: 'custom', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + yield `echo ${chunk}`; + } + return 'done'; + } + ); + + const session = act.streamBidi(); + session.send('1'); + session.send('2'); + session.close(); + + const chunks: string[] = []; + for await (const chunk of session.stream) { + chunks.push(chunk); + } + + assert.deepStrictEqual(chunks, ['echo 1', 'echo 2']); + assert.strictEqual(await session.output, 'done'); + }); + + it('streamBidi pull (generator)', async () => { + const act = defineBidiAction( + registry, + { + name: 'chat', + actionType: 'custom', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + yield `echo ${chunk}`; + } + return 'done'; + } + ); + + async function* inputGen() { + yield '1'; + yield '2'; + } + + const session = act.streamBidi(inputGen()); + + const chunks: string[] = []; + for await (const chunk of session.stream) { + chunks.push(chunk); + } + + assert.deepStrictEqual(chunks, ['echo 1', 'echo 2']); + assert.strictEqual(await session.output, 'done'); + }); + + it('classic run works on bidi action', async () => { + const act = defineBidiAction( + registry, + { + name: 'chat', + actionType: 'custom', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + // Should receive exactly one chunk + yield `echo ${chunk}`; + } + return 'done'; + } + ); + + const result = await act.run('1'); + assert.strictEqual(result.result, 'done'); + }); + + it('classic run works on bidi action with streaming', async () => { + const act = defineBidiAction( + registry, + { + name: 'chat', + actionType: 'custom', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + yield `echo ${chunk}`; + } + return 'done'; + } + ); + + const chunks: string[] = []; + const result = await act.run('1', { + onChunk: (c) => chunks.push(c), + }); + + assert.deepStrictEqual(chunks, ['echo 1']); + assert.strictEqual(result.result, 'done'); + }); + + it('validates input stream items', async () => { + const act = defineBidiAction( + registry, + { + name: 'chat', + actionType: 'custom', + inputSchema: z.string(), // Input is string + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + yield `echo ${chunk}`; + } + } + ); + + const session = act.streamBidi(); + // Bypass TS check to send invalid data + (session as any).send(123); + session.close(); + + try { + for await (const _ of session.stream) { + // Consume + } + assert.fail('Should have thrown validation error'); + } catch (e: any) { + // Zod validation error or Genkit validation error + assert.ok( + e.message.includes('Expected string, received number') || + e.name === 'ZodError' || + e.code === 'invalid_type' || + e.message.includes('Validation failed') || + e.message.includes('must be string') + ); + } + }); + + it('bidi action receives init data', async () => { + const act = defineBidiAction( + registry, + { + name: 'chatWithInit', + actionType: 'custom', + inputSchema: z.string(), + outputSchema: z.string(), + initSchema: z.object({ prefix: z.string() }), + }, + async function* ({ inputStream, init }) { + const prefix = init?.prefix || ''; + for await (const chunk of inputStream) { + yield `${prefix}${chunk}`; + } + return 'done'; + } + ); + + const session = act.streamBidi(undefined, { init: { prefix: '>> ' } }); + session.send('1'); + session.send('2'); + session.close(); + + const chunks: string[] = []; + for await (const chunk of session.stream) { + chunks.push(chunk); + } + + assert.deepStrictEqual(chunks, ['>> 1', '>> 2']); + assert.strictEqual(await session.output, 'done'); + }); + + it('validates init data', async () => { + const act = defineBidiAction( + registry, + { + name: 'chatWithInitValidation', + actionType: 'custom', + inputSchema: z.string(), + initSchema: z.object({ count: z.number() }), + }, + async function* ({ inputStream, init }) { + yield `count: ${init?.count}`; + } + ); + + // Invalid init (string instead of number) + try { + const session = act.streamBidi(undefined, { + init: { count: '123' } as any, + }); + for await (const _ of session.stream) { + // Consume + } + assert.fail('Should have thrown validation error'); + } catch (e: any) { + assert.ok(e.message.includes('count: must be number')); + } + }); +}); diff --git a/js/core/tests/bidi-flow_test.ts b/js/core/tests/bidi-flow_test.ts new file mode 100644 index 0000000000..b131154c2d --- /dev/null +++ b/js/core/tests/bidi-flow_test.ts @@ -0,0 +1,112 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { describe, it } from 'node:test'; +import { z } from 'zod'; +import { bidiFlow } from '../src/flow.js'; +import { initNodeFeatures } from '../src/node.js'; + +initNodeFeatures(); + +describe('bidi flow', () => { + it('streamBidi ergonomic (push)', async () => { + const flow = bidiFlow( + { + name: 'chatFlow', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + yield `echo ${chunk}`; + } + return 'done'; + } + ); + + const session = flow.streamBidi(); + session.send('1'); + session.send('2'); + session.close(); + + const chunks: string[] = []; + for await (const chunk of session.stream) { + chunks.push(chunk); + } + + assert.deepStrictEqual(chunks, ['echo 1', 'echo 2']); + assert.strictEqual(await session.output, 'done'); + assert.strictEqual(flow.__action.actionType, 'bidi-flow'); + assert.ok(flow.__action.metadata?.bidi); + }); + + it('bidi flow receives init data', async () => { + const flow = bidiFlow( + { + name: 'chatFlowWithInit', + inputSchema: z.string(), + outputSchema: z.string(), + initSchema: z.object({ prefix: z.string() }), + }, + async function* ({ inputStream, init }) { + const prefix = init?.prefix || ''; + for await (const chunk of inputStream) { + yield `${prefix}${chunk}`; + } + return 'done'; + } + ); + + const session = flow.streamBidi(undefined, { init: { prefix: '>> ' } }); + session.send('1'); + session.send('2'); + session.close(); + + const chunks: string[] = []; + for await (const chunk of session.stream) { + chunks.push(chunk); + } + + assert.deepStrictEqual(chunks, ['>> 1', '>> 2']); + assert.strictEqual(await session.output, 'done'); + }); + + it('validates init data in bidi flow', async () => { + const flow = bidiFlow( + { + name: 'chatFlowWithInitValidation', + inputSchema: z.string(), + initSchema: z.object({ count: z.number() }), + }, + async function* ({ init }) { + yield `count: ${init?.count}`; + } + ); + + try { + const session = flow.streamBidi(undefined, { + init: { count: '123' } as any, + }); + for await (const _ of session.stream) { + // consume + } + assert.fail('Should have thrown validation error'); + } catch (e: any) { + assert.ok(e.message.includes('count: must be number')); + } + }); +}); From 0e9843da8333b5454a32728a270a3c734b4a99db Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Sat, 3 Jan 2026 20:43:56 -0500 Subject: [PATCH 2/8] fixed flow typing --- js/core/src/flow.ts | 8 +++++--- js/core/src/index.ts | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/js/core/src/flow.ts b/js/core/src/flow.ts index e4b07c906c..9206f34714 100644 --- a/js/core/src/flow.ts +++ b/js/core/src/flow.ts @@ -17,6 +17,7 @@ import type { z } from 'zod'; import { ActionFnArg, + ActionRunOptions, BidiActionFnArg, JSONSchema7, action, @@ -33,7 +34,8 @@ export interface Flow< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, -> extends Action {} + Init extends z.ZodTypeAny = z.ZodTypeAny, +> extends Action, z.infer>, Init> {} /** * Configuration for a streaming flow. @@ -135,7 +137,7 @@ export function defineBidiFlow< fn: ( input: BidiActionFnArg, z.infer, z.infer> ) => AsyncGenerator, z.infer, void> -): Flow { +): Flow { const flow = bidiFlow(config, fn); registry.registerAction('bidi-flow', flow); return flow; @@ -154,7 +156,7 @@ export function bidiFlow< fn: ( input: BidiActionFnArg, z.infer, z.infer> ) => AsyncGenerator, z.infer, void> -): Flow { +): Flow { const f = bidiAction( { ...config, diff --git a/js/core/src/index.ts b/js/core/src/index.ts index d0e60921f7..e8a7d0b120 100644 --- a/js/core/src/index.ts +++ b/js/core/src/index.ts @@ -72,6 +72,7 @@ export { defineFlow, flow, run, + type BidiFlowConfig, type Flow, type FlowConfig, type FlowFn, From 4dbd70647cc1d391a41c6b611a3db0cf8f89c193 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Mon, 5 Jan 2026 11:43:27 -0500 Subject: [PATCH 3/8] inputStream from input --- js/core/src/action.ts | 43 +++++++++++++++++-------------- js/core/src/flow.ts | 18 ++++--------- js/core/src/index.ts | 1 - js/core/tests/bidi-action_test.ts | 8 +++--- 4 files changed, 32 insertions(+), 38 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index c87ea58db4..a600882f61 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -76,7 +76,7 @@ export interface ActionResult { /** * Options (side channel) data to pass to the model. */ -export interface ActionRunOptions { +export interface ActionRunOptions { /** * Streaming callback (optional). */ @@ -104,10 +104,7 @@ export interface ActionRunOptions { * Note: This only fires once for the root action span, not for nested spans. */ onTraceStart?: (traceInfo: { traceId: string; spanId: string }) => void; -} -export interface ActionBidiRunOptions - extends ActionRunOptions { /** * Streaming input (optional). */ @@ -119,7 +116,7 @@ export interface ActionBidiRunOptions /** * Options (side channel) data to pass to the model. */ -export interface ActionFnArg { +export interface ActionFnArg { /** * Whether the caller of the action requested streaming. */ @@ -149,10 +146,7 @@ export interface ActionFnArg { abortSignal: AbortSignal; registry?: Registry; -} -export interface BidiActionFnArg - extends Omit, 'inputStream'> { /** * Streaming input. */ @@ -210,7 +204,7 @@ export type Action< streamBidi( input?: AsyncIterable>, - opts?: ActionBidiRunOptions, z.infer, z.infer> + opts?: ActionRunOptions, z.infer, z.infer> ): BidiStreamingResponse; }; @@ -378,7 +372,7 @@ export function action< actionFn.run = async ( input: z.infer, - options?: ActionBidiRunOptions, z.infer> + options?: ActionRunOptions, z.infer> ): Promise>> => { if (config.inputSchema || config.inputJsonSchema) { if (!options?.inputStream) { @@ -458,14 +452,15 @@ export function action< !!options?.onChunk && options.onChunk !== sentinelNoopStreamingCallback, sendChunk: options?.onChunk ?? sentinelNoopStreamingCallback, - inputStream: options?.inputStream, + inputStream: + options?.inputStream ?? asyncIteratorFromArray(input), trace: { traceId, spanId, }, registry: actionFn.__registry, abortSignal: options?.abortSignal ?? makeNoopAbortSignal(), - } as BidiActionFnArg>); + } as ActionFnArg>); // if context is explicitly passed in, we run action with the provided context, // otherwise we let upstream context carry through. const output = await runWithContext(options?.context, actFn); @@ -495,7 +490,7 @@ export function action< actionFn.stream = ( input?: z.infer, - opts?: ActionBidiRunOptions, z.infer> + opts?: ActionRunOptions, z.infer> ): StreamingResponse => { let chunkStreamController: ReadableStreamController>; const chunkStream = new ReadableStream>({ @@ -522,8 +517,8 @@ export function action< inputStream: opts?.inputStream, abortSignal: opts?.abortSignal, telemetryLabels: opts?.telemetryLabels, - init: (opts as BidiActionFnArg>)?.init, - } as ActionBidiRunOptions> + init: (opts as ActionFnArg>)?.init, + } as ActionRunOptions> ) .then((s) => s.result) .finally(() => { @@ -550,7 +545,7 @@ export function action< actionFn.streamBidi = ( inputStream?: AsyncIterable>, - opts?: ActionBidiRunOptions, z.infer> + opts?: ActionRunOptions, z.infer> ): BidiStreamingResponse => { let channel: Channel> | undefined; if (!inputStream) { @@ -561,7 +556,7 @@ export function action< const result = actionFn.stream(undefined, { ...opts, inputStream, - } as ActionBidiRunOptions>); + } as ActionRunOptions>); return { ...result, @@ -632,7 +627,7 @@ export function defineBidiAction< registry: Registry, config: BidiActionParams, fn: ( - input: BidiActionFnArg, z.infer, z.infer> + input: ActionFnArg, z.infer, z.infer> ) => AsyncGenerator, z.infer, void> ): Action, z.infer>, Init> { const act = bidiAction(config, fn); @@ -651,12 +646,12 @@ export function bidiAction< >( config: BidiActionParams, fn: ( - input: BidiActionFnArg, z.infer, z.infer> + input: ActionFnArg, z.infer, z.infer> ) => AsyncGenerator, z.infer, void> ): Action, z.infer>, Init> { const meta = { ...config.metadata, bidi: true }; return action({ ...config, metadata: meta }, async (input, options) => { - let stream = (options as BidiActionFnArg>).inputStream; + let stream = (options as ActionFnArg>).inputStream; if (!stream) { if (input !== undefined) { stream = (async function* () { @@ -784,3 +779,11 @@ export function runInActionRuntimeContext(fn: () => R) { export function runOutsideActionRuntimeContext(fn: () => R) { return getAsyncContext().run(runtimeContextAslKey, 'outside', fn); } + +async function* asyncIteratorFromArray(array: T[]): AsyncIterator { + for (const item of array) { + // Optionally, perform some async operation here + await new Promise((resolve) => setTimeout(resolve, 100)); // Example delay + yield item; + } +} diff --git a/js/core/src/flow.ts b/js/core/src/flow.ts index 9206f34714..1a4ff34236 100644 --- a/js/core/src/flow.ts +++ b/js/core/src/flow.ts @@ -18,7 +18,6 @@ import type { z } from 'zod'; import { ActionFnArg, ActionRunOptions, - BidiActionFnArg, JSONSchema7, action, bidiAction, @@ -44,6 +43,7 @@ export interface FlowConfig< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, > { /** Name of the flow. */ name: string; @@ -55,14 +55,6 @@ export interface FlowConfig< streamSchema?: S; /** Metadata of the flow used by tooling. */ metadata?: Record; -} - -export interface BidiFlowConfig< - I extends z.ZodTypeAny = z.ZodTypeAny, - O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, - Init extends z.ZodTypeAny = z.ZodTypeAny, -> extends FlowConfig { initSchema?: Init; initJsonSchema?: JSONSchema7; } @@ -133,9 +125,9 @@ export function defineBidiFlow< Init extends z.ZodTypeAny = z.ZodTypeAny, >( registry: Registry, - config: BidiFlowConfig, + config: FlowConfig, fn: ( - input: BidiActionFnArg, z.infer, z.infer> + input: ActionFnArg, z.infer, z.infer> ) => AsyncGenerator, z.infer, void> ): Flow { const flow = bidiFlow(config, fn); @@ -152,9 +144,9 @@ export function bidiFlow< S extends z.ZodTypeAny = z.ZodTypeAny, Init extends z.ZodTypeAny = z.ZodTypeAny, >( - config: BidiFlowConfig, + config: FlowConfig, fn: ( - input: BidiActionFnArg, z.infer, z.infer> + input: ActionFnArg, z.infer, z.infer> ) => AsyncGenerator, z.infer, void> ): Flow { const f = bidiAction( diff --git a/js/core/src/index.ts b/js/core/src/index.ts index e8a7d0b120..d0e60921f7 100644 --- a/js/core/src/index.ts +++ b/js/core/src/index.ts @@ -72,7 +72,6 @@ export { defineFlow, flow, run, - type BidiFlowConfig, type Flow, type FlowConfig, type FlowFn, diff --git a/js/core/tests/bidi-action_test.ts b/js/core/tests/bidi-action_test.ts index 6a2596e125..3bb3beb612 100644 --- a/js/core/tests/bidi-action_test.ts +++ b/js/core/tests/bidi-action_test.ts @@ -103,16 +103,16 @@ describe('bidi action', () => { outputSchema: z.string(), }, async function* ({ inputStream }) { + const inputs: string[] = []; for await (const chunk of inputStream) { - // Should receive exactly one chunk - yield `echo ${chunk}`; + inputs.push(chunk); } - return 'done'; + return `done: ${inputs.join(', ')}`; } ); const result = await act.run('1'); - assert.strictEqual(result.result, 'done'); + assert.strictEqual(result.result, 'done: 1'); }); it('classic run works on bidi action with streaming', async () => { From bbb29f7dab1f879bf1eb75f98b6ccdd7ffbd03ee Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Mon, 5 Jan 2026 19:51:35 -0500 Subject: [PATCH 4/8] fix --- js/core/src/action.ts | 4 +--- js/core/src/flow.ts | 4 ++-- js/core/src/registry.ts | 1 - js/core/tests/bidi-flow_test.ts | 2 +- js/genkit/src/genkit.ts | 21 +++++++++++++++++++ js/testapps/flow-sample1/src/index.ts | 30 +++++++++++++++++++++++++++ 6 files changed, 55 insertions(+), 7 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index a600882f61..17f10c2038 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -453,7 +453,7 @@ export function action< options.onChunk !== sentinelNoopStreamingCallback, sendChunk: options?.onChunk ?? sentinelNoopStreamingCallback, inputStream: - options?.inputStream ?? asyncIteratorFromArray(input), + options?.inputStream ?? asyncIteratorFromArray([input]), trace: { traceId, spanId, @@ -782,8 +782,6 @@ export function runOutsideActionRuntimeContext(fn: () => R) { async function* asyncIteratorFromArray(array: T[]): AsyncIterator { for (const item of array) { - // Optionally, perform some async operation here - await new Promise((resolve) => setTimeout(resolve, 100)); // Example delay yield item; } } diff --git a/js/core/src/flow.ts b/js/core/src/flow.ts index 1a4ff34236..10aa4ca718 100644 --- a/js/core/src/flow.ts +++ b/js/core/src/flow.ts @@ -131,7 +131,7 @@ export function defineBidiFlow< ) => AsyncGenerator, z.infer, void> ): Flow { const flow = bidiFlow(config, fn); - registry.registerAction('bidi-flow', flow); + registry.registerAction('flow', flow); return flow; } @@ -152,7 +152,7 @@ export function bidiFlow< const f = bidiAction( { ...config, - actionType: 'bidi-flow', + actionType: 'flow', }, fn ); diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index 4533b5d6cb..01c3e640b0 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -44,7 +44,6 @@ const ACTION_TYPES = [ 'evaluator', 'executable-prompt', 'flow', - 'bidi-flow', 'indexer', 'model', 'bidi-model', diff --git a/js/core/tests/bidi-flow_test.ts b/js/core/tests/bidi-flow_test.ts index b131154c2d..7bd150963d 100644 --- a/js/core/tests/bidi-flow_test.ts +++ b/js/core/tests/bidi-flow_test.ts @@ -50,7 +50,7 @@ describe('bidi flow', () => { assert.deepStrictEqual(chunks, ['echo 1', 'echo 2']); assert.strictEqual(await session.output, 'done'); - assert.strictEqual(flow.__action.actionType, 'bidi-flow'); + assert.strictEqual(flow.__action.actionType, 'flow'); assert.ok(flow.__action.metadata?.bidi); }); diff --git a/js/genkit/src/genkit.ts b/js/genkit/src/genkit.ts index 2cd34a4c7a..104b495ee0 100644 --- a/js/genkit/src/genkit.ts +++ b/js/genkit/src/genkit.ts @@ -105,9 +105,11 @@ import { } from '@genkit-ai/ai/tool'; import { ActionFnArg, + ActionRunOptions, GenkitError, Operation, ReflectionServer, + defineBidiFlow, defineDynamicActionProvider, defineFlow, defineJsonSchema, @@ -227,6 +229,25 @@ export class Genkit implements HasRegistry { return flow; } + /** + * Defines and registers a bi-directional flow. + */ + defineBidiFlow< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, + >( + config: FlowConfig, + fn: ( + input: ActionFnArg, z.infer, z.infer> + ) => AsyncGenerator, z.infer, void> + ): Action, z.infer>, Init> { + const flow = defineBidiFlow(this.registry, config, fn); + this.flows.push(flow); + return flow; + } + /** * Defines and registers a tool that can return multiple parts of content. * diff --git a/js/testapps/flow-sample1/src/index.ts b/js/testapps/flow-sample1/src/index.ts index 8c3f55abf1..1c921b4a67 100644 --- a/js/testapps/flow-sample1/src/index.ts +++ b/js/testapps/flow-sample1/src/index.ts @@ -312,3 +312,33 @@ function generateString(length: number) { } return str.substring(0, length); } + +export const chatFlow = ai.defineBidiFlow( + { + name: 'chatFlow', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async function* ({ inputStream }) { + for await (const chunk of inputStream) { + yield `echo ${chunk}`; + } + return 'done'; + } +); + +export const chatFlowWithInit = ai.defineBidiFlow( + { + name: 'chatFlowWithInit', + inputSchema: z.string(), + outputSchema: z.string(), + initSchema: z.object({ prefix: z.string() }), + }, + async function* ({ inputStream, init }) { + const prefix = init?.prefix || ''; + for await (const chunk of inputStream) { + yield `${prefix}${chunk}`; + } + return 'done'; + } +); From cb68c2333d66e213715348257d803c091728b320 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Mon, 26 Jan 2026 19:49:44 -0500 Subject: [PATCH 5/8] jsdocs --- js/core/src/action.ts | 77 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index 17f10c2038..7932cf0cc9 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -51,14 +51,41 @@ export interface ActionMetadata< O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, > { + /** + * The type of action (e.g. 'prompt', 'flow'). + */ actionType?: ActionType; + /** + * The name of the action. + */ name: string; + /** + * Description of the action. + */ description?: string; + /** + * Input Zod schema. + */ inputSchema?: I; + /** + * Input JSON schema. + */ inputJsonSchema?: JSONSchema7; + /** + * Output Zod schema. + */ outputSchema?: O; + /** + * Output JSON schema. + */ outputJsonSchema?: JSONSchema7; + /** + * Stream Zod schema. + */ streamSchema?: S; + /** + * Metadata for the action. + */ metadata?: Record; } @@ -110,6 +137,9 @@ export interface ActionRunOptions { */ inputStream?: AsyncIterable; + /** + * Initialization data provided to the action. + */ init?: Init; } @@ -152,6 +182,9 @@ export interface ActionFnArg { */ inputStream: AsyncIterable; + /** + * Initialization data provided to the action. + */ init?: Init; } @@ -173,7 +206,13 @@ export interface BidiStreamingResponse< S extends z.ZodTypeAny = z.ZodTypeAny, I extends z.ZodTypeAny = z.ZodTypeAny, > extends StreamingResponse { + /** + * Sends a chunk of data to the action (for bi-directional streaming). + */ send(chunk: z.infer): void; + /** + * Closes the input stream to the action. + */ close(): void; } @@ -190,7 +229,9 @@ export type Action< > = ActionRunOptions, z.infer>, Init extends z.ZodTypeAny = z.ZodTypeAny, > = ((input?: z.infer, options?: RunOptions) => Promise>) & { + /** @hidden */ __action: ActionMetadata; + /** @hidden */ __registry?: Registry; run( input?: z.infer, @@ -216,20 +257,50 @@ export type ActionParams< O extends z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, > = { + /** + * Name of the action, or an object with pluginId and actionId. + */ name: | string | { pluginId: string; actionId: string; }; + /** + * Description of the action. + */ description?: string; + /** + * Input Zod schema. + */ inputSchema?: I; + /** + * Input JSON schema. + */ inputJsonSchema?: JSONSchema7; + /** + * Output Zod schema. + */ outputSchema?: O; + /** + * Output JSON schema. + */ outputJsonSchema?: JSONSchema7; + /** + * Metadata for the action. + */ metadata?: Record; + /** + * Middleware to apply to the action. + */ use?: Middleware, z.infer, z.infer>[]; + /** + * Stream Zod schema. + */ streamSchema?: S; + /** + * The type of action. + */ actionType: ActionType; }; @@ -239,7 +310,13 @@ export interface BidiActionParams< S extends z.ZodTypeAny = z.ZodTypeAny, Init extends z.ZodTypeAny = z.ZodTypeAny, > extends ActionParams { + /** + * Zod schema for the initialization data. + */ initSchema?: Init; + /** + * JSON schema for the initialization data. + */ initJsonSchema?: JSONSchema7; } From 0f4ea7f4c3fdacc38019f7a49959072fa815c3ac Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Mon, 26 Jan 2026 19:53:35 -0500 Subject: [PATCH 6/8] jsdocs --- js/core/src/action.ts | 36 +++++++++--------------------------- js/core/src/flow.ts | 2 ++ 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index 7932cf0cc9..0f76744552 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -51,41 +51,23 @@ export interface ActionMetadata< O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, > { - /** - * The type of action (e.g. 'prompt', 'flow'). - */ + /** The type of action (e.g. 'prompt', 'flow'). */ actionType?: ActionType; - /** - * The name of the action. - */ + /** The name of the action. */ name: string; - /** - * Description of the action. - */ + /** Description of the action. */ description?: string; - /** - * Input Zod schema. - */ + /** Input Zod schema. */ inputSchema?: I; - /** - * Input JSON schema. - */ + /** Input JSON schema. */ inputJsonSchema?: JSONSchema7; - /** - * Output Zod schema. - */ + /** Output Zod schema. */ outputSchema?: O; - /** - * Output JSON schema. - */ + /** Output JSON schema. */ outputJsonSchema?: JSONSchema7; - /** - * Stream Zod schema. - */ + /** Stream Zod schema. */ streamSchema?: S; - /** - * Metadata for the action. - */ + /** Metadata for the action. */ metadata?: Record; } diff --git a/js/core/src/flow.ts b/js/core/src/flow.ts index 10aa4ca718..fedb60e784 100644 --- a/js/core/src/flow.ts +++ b/js/core/src/flow.ts @@ -55,7 +55,9 @@ export interface FlowConfig< streamSchema?: S; /** Metadata of the flow used by tooling. */ metadata?: Record; + /** Schema of the initialization data. */ initSchema?: Init; + /** JSON schema of the initialization data. */ initJsonSchema?: JSONSchema7; } From 8df4226fdc73d52d44aeafcbd46e8c8b0e249380 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Mon, 26 Jan 2026 20:06:47 -0500 Subject: [PATCH 7/8] docs: Add JSDoc comments to various action and flow types and functions. --- js/core/src/action.ts | 15 +++++++++++++++ js/core/src/flow.ts | 6 ++++++ 2 files changed, 21 insertions(+) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index 0f76744552..8990b0da53 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -183,6 +183,9 @@ export interface StreamingResponse< output: Promise>; } +/** + * Streaming response from a bi-directional action. + */ export interface BidiStreamingResponse< O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, @@ -286,6 +289,9 @@ export type ActionParams< actionType: ActionType; }; +/** + * Configuration for a bi-directional action. + */ export interface BidiActionParams< I extends z.ZodTypeAny, O extends z.ZodTypeAny, @@ -302,6 +308,9 @@ export interface BidiActionParams< initJsonSchema?: JSONSchema7; } +/** + * Configuration for an async action (lazy loaded). + */ export type ActionAsyncParams< I extends z.ZodTypeAny, O extends z.ZodTypeAny, @@ -313,11 +322,17 @@ export type ActionAsyncParams< ) => Promise>; }; +/** + * Simple middleware that only modifies request/response. + */ export type SimpleMiddleware = ( req: I, next: (req?: I) => Promise ) => Promise; +/** + * Middleware that has access to options (including streaming callback). + */ export type MiddlewareWithOptions = ( req: I, options: ActionRunOptions | undefined, diff --git a/js/core/src/flow.ts b/js/core/src/flow.ts index fedb60e784..bf916cf7ef 100644 --- a/js/core/src/flow.ts +++ b/js/core/src/flow.ts @@ -194,12 +194,18 @@ function flowAction< ); } +/** + * A flow step that executes the provided function. + */ export function run( name: string, func: () => Promise, _?: Registry ): Promise; +/** + * A flow step that executes the provided function with input. + */ export function run( name: string, input: any, From fab14b2d077cc916eab9956531cfb5103b4d931f Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Mon, 26 Jan 2026 20:15:38 -0500 Subject: [PATCH 8/8] feedback --- js/core/src/action.ts | 12 ++++++------ js/core/src/registry.ts | 1 - 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index 8990b0da53..8ac411bcd9 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -527,14 +527,14 @@ export function action< options.onChunk !== sentinelNoopStreamingCallback, sendChunk: options?.onChunk ?? sentinelNoopStreamingCallback, inputStream: - options?.inputStream ?? asyncIteratorFromArray([input]), + options?.inputStream ?? asyncIterableFromArray([input]), trace: { traceId, spanId, }, registry: actionFn.__registry, abortSignal: options?.abortSignal ?? makeNoopAbortSignal(), - } as ActionFnArg>); + } as ActionFnArg, z.infer>); // if context is explicitly passed in, we run action with the provided context, // otherwise we let upstream context carry through. const output = await runWithContext(options?.context, actFn); @@ -592,7 +592,7 @@ export function action< abortSignal: opts?.abortSignal, telemetryLabels: opts?.telemetryLabels, init: (opts as ActionFnArg>)?.init, - } as ActionRunOptions> + } as ActionRunOptions, z.infer> ) .then((s) => s.result) .finally(() => { @@ -630,7 +630,7 @@ export function action< const result = actionFn.stream(undefined, { ...opts, inputStream, - } as ActionRunOptions>); + } as ActionRunOptions, z.infer>); return { ...result, @@ -725,7 +725,7 @@ export function bidiAction< ): Action, z.infer>, Init> { const meta = { ...config.metadata, bidi: true }; return action({ ...config, metadata: meta }, async (input, options) => { - let stream = (options as ActionFnArg>).inputStream; + let stream = options.inputStream; if (!stream) { if (input !== undefined) { stream = (async function* () { @@ -854,7 +854,7 @@ export function runOutsideActionRuntimeContext(fn: () => R) { return getAsyncContext().run(runtimeContextAslKey, 'outside', fn); } -async function* asyncIteratorFromArray(array: T[]): AsyncIterator { +async function* asyncIterableFromArray(array: T[]): AsyncIterable { for (const item of array) { yield item; } diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index 01c3e640b0..5caa0eed5b 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -46,7 +46,6 @@ const ACTION_TYPES = [ 'flow', 'indexer', 'model', - 'bidi-model', 'background-model', 'check-operation', 'cancel-operation',