-
Notifications
You must be signed in to change notification settings - Fork 650
RFC: [JS] Bidirectional Actions, Flows, and Models #4210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,201 @@ | ||||||
| # RFC: Bidirectional Actions, Flows, and Models | ||||||
|
|
||||||
| ## Summary | ||||||
|
|
||||||
| Introduces bidirectional streaming capabilities to Genkit Actions, Flows, and Models. This allows these primitives to accept a continuous input stream and an initialization payload, in addition to producing an output stream. | ||||||
|
|
||||||
| ## Motivation | ||||||
|
|
||||||
| Core primitives need to support advanced interaction patterns beyond simple request/response: | ||||||
| - **Real-time interactions**: Voice or text chat where input and output happen simultaneously (e.g., Gemini Live, OpenAI Realtime). | ||||||
| - **Session management**: Long-running flows that maintain state across inputs. | ||||||
| - **Tool interruptions**: Human-in-the-loop scenarios where execution pauses for input. | ||||||
|
|
||||||
| ## Design | ||||||
|
|
||||||
| ### 1. Bidi Action Primitive | ||||||
|
|
||||||
| The `Action` interface is extended to support bidirectional communication. This is non-breaking; existing actions remain compatible. | ||||||
|
|
||||||
| - **`inputStream`**: An `AsyncIterable` of input chunks. | ||||||
| - **`init`**: An optional payload for setup context available before streaming begins. | ||||||
|
|
||||||
| #### Definition (`defineBidiAction`) | ||||||
|
|
||||||
| ```typescript | ||||||
| import { defineBidiAction } from '@genkit-ai/core'; | ||||||
| import { z } from 'zod'; | ||||||
|
|
||||||
| export const bidiChat = defineBidiAction( | ||||||
| { | ||||||
| name: 'bidiChat', | ||||||
| actionType: 'custom', | ||||||
| inputSchema: z.string(), // Schema for items in inputStream | ||||||
| outputSchema: z.string(), // Schema for the final return value | ||||||
| streamSchema: z.string(), // Schema for items in output stream | ||||||
| initSchema: z.object({ // Schema for initialization data | ||||||
| userId: z.string() | ||||||
| }), | ||||||
| }, | ||||||
| async function* ({ inputStream, init }) { | ||||||
| console.log(`Starting chat for ${init.userId}`); | ||||||
|
|
||||||
| for await (const chunk of inputStream) { | ||||||
| yield `Echo: ${chunk}`; | ||||||
| } | ||||||
| return 'Conversation ended'; | ||||||
| } | ||||||
| ); | ||||||
| ``` | ||||||
|
|
||||||
| ### 2. Bidi Flows | ||||||
|
|
||||||
| `defineBidiFlow` exposes this capability in the Genkit public API, wrapping the action with observability and tracing. | ||||||
|
|
||||||
| #### Type Signature | ||||||
| `Flow<Input, Output, Stream, Init>` | ||||||
|
|
||||||
| #### Usage | ||||||
|
|
||||||
| ```typescript | ||||||
| import { genkit, z } from 'genkit'; | ||||||
|
|
||||||
| const ai = genkit({ ... }); | ||||||
|
|
||||||
| export const flow = ai.defineBidiFlow( | ||||||
| { | ||||||
| name: 'chatFlow', | ||||||
| inputSchema: z.string(), | ||||||
| streamSchema: z.string(), | ||||||
| initSchema: z.object({ topic: z.string() }), | ||||||
| }, | ||||||
| async function* ({ inputStream, init }) { | ||||||
| yield `Welcome to ${init.topic}`; | ||||||
|
|
||||||
| for await (const msg of inputStream) { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just to make sure I understand this example - if there is no "return" - then this stream will continue to be open?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, in this example the output stream will continue to be open while the input stream is open. Once the input stream closes, this function will end, the flow will return void/undefined and the output stream will close. |
||||||
| if (msg === 'bye') break; | ||||||
| yield `You said: ${msg}`; | ||||||
| } | ||||||
| } | ||||||
| ); | ||||||
| ``` | ||||||
|
|
||||||
| ### 3. Bidi Models | ||||||
|
|
||||||
| `defineBidiModel` creates a specialized bidi action for LLMs, aimed at real-time model APIs. | ||||||
|
|
||||||
| #### The Role of `init` | ||||||
| For real-time sessions, the connection to the model API often requires configuration (temperature, system prompt, tools) to be established *before* the first user message is received. The `init` payload fulfills this requirement, separating session configuration from the conversation stream. | ||||||
|
|
||||||
| - **`init`**: `GenerateRequest` (contains config, tools, system prompt). | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| - **`inputStream`**: Stream of `GenerateRequest` (contains user messages/turns). | ||||||
| - **`stream`**: Stream of `GenerateResponseChunk`. | ||||||
|
|
||||||
| #### Definition | ||||||
|
|
||||||
| ```typescript | ||||||
| export const myRealtimeModel = defineBidiModel( | ||||||
| { name: 'myRealtimeModel' }, | ||||||
| async function* ({ inputStream, init }) { | ||||||
| // 1. Establish session using configuration from init | ||||||
| const session = await upstreamApi.connect({ | ||||||
| model: 'my-model', | ||||||
| config: init?.config, | ||||||
| systemPrompt: init?.messages?.find(m => m.role === 'system'), | ||||||
| tools: init?.tools, | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tool requests are also streamed out, unless I'm missing something, we have no way of handling them automatically, the client will have to handle these?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's fine. Tool requests will be handled out of the output stream and responses injected back into the input stream. |
||||||
| }); | ||||||
|
|
||||||
| // 2. Handle conversation stream | ||||||
| for await (const request of inputStream) { | ||||||
| // Send new user input to the upstream session | ||||||
| session.send(request.messages); | ||||||
|
|
||||||
| // Yield responses from the upstream session | ||||||
| for await (const response of session.receive()) { | ||||||
| yield { content: [{ text: response.text }] }; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // 3. Return final result (usage stats, etc.) | ||||||
| return { | ||||||
| usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 }, | ||||||
| custom: session.getFinalMetadata(), | ||||||
| }; | ||||||
| } | ||||||
| ); | ||||||
| ``` | ||||||
|
|
||||||
| #### Usage (`generateBidi`) | ||||||
|
|
||||||
| `generateBidi` is the high-level API for interacting with bidi models. It is a subset of the standard `generate` API, currently supporting tool calling. | ||||||
|
|
||||||
| ```typescript | ||||||
| const session = await ai.generateBidi({ | ||||||
| model: myRealtimeModel, | ||||||
| config: { temperature: 0.7 }, // Passed via init | ||||||
| system: 'You are a helpful assistant', // Passed via init | ||||||
| }); | ||||||
|
|
||||||
| // The session is established. Now we can stream inputs. | ||||||
| session.send('Hello!'); | ||||||
|
|
||||||
| // Listen for responses (simultaneously) | ||||||
| for await (const chunk of session.stream) { | ||||||
| console.log(chunk.content); | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| ### 4. Execution API (`streamBidi`) | ||||||
|
|
||||||
| Actions and Flows expose a `streamBidi` method that returns a `BidiStreamingResponse`. | ||||||
|
|
||||||
| ```typescript | ||||||
| interface BidiStreamingResponse<O, S, I> { | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| stream: AsyncGenerator<S>; // Output stream | ||||||
| output: Promise<O>; // Final result | ||||||
| send(chunk: I): void; // Push input | ||||||
| close(): void; // End input stream | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| #### Push Style (Manual Send) | ||||||
|
|
||||||
| ```typescript | ||||||
| const session = flow.streamBidi(undefined, { | ||||||
| init: { topic: 'Support' } | ||||||
| }); | ||||||
|
|
||||||
| // Send inputs | ||||||
| session.send('Hello'); | ||||||
| session.send('Help'); | ||||||
| session.close(); | ||||||
|
|
||||||
| // Consume output | ||||||
| for await (const chunk of session.stream) { | ||||||
| console.log(chunk); | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| #### Pull Style (Generator) | ||||||
|
|
||||||
| ```typescript | ||||||
| async function* inputSource() { | ||||||
| yield 'Hello'; | ||||||
| yield 'World'; | ||||||
| } | ||||||
|
|
||||||
| const session = flow.streamBidi(inputSource(), { | ||||||
| init: { topic: 'Greeting' } | ||||||
| }); | ||||||
|
|
||||||
| for await (const chunk of session.stream) { | ||||||
| console.log(chunk); | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| ## Integration with Reflection API | ||||||
|
|
||||||
| These features align with **Reflection API V2**, which uses WebSockets to support bidirectional streaming between the Runtime and the CLI/Manager. | ||||||
|
|
||||||
| - `runAction` now supports an `input` stream. | ||||||
| - `streamChunk` notifications are bidirectional (Manager <-> Runtime). | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this run at the beginning of the flow basically everytime? @pavelgj
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.