Conversation
📝 Walkthrough

This pull request introduces AWS Bedrock LLM integration to the flo_ai library.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant AWSBedrock
    participant BedrockRuntime as Bedrock Runtime
    participant ToolHandler as Tool Handler
    Client->>AWSBedrock: generate(messages, functions, output_schema)
    AWSBedrock->>AWSBedrock: _convert_messages(messages, output_schema)
    AWSBedrock->>AWSBedrock: format_tools_for_llm(functions)
    AWSBedrock->>BedrockRuntime: invoke_model(request)
    BedrockRuntime-->>AWSBedrock: response (content + usage)
    AWSBedrock->>AWSBedrock: _strip_reasoning(content)
    alt Tool Call Detected
        AWSBedrock->>ToolHandler: get_assistant_message_for_tool_call(response)
        ToolHandler-->>AWSBedrock: tool_call payload
    else Text Response
        AWSBedrock->>AWSBedrock: get_message_content(response)
    end
    AWSBedrock-->>Client: response (text or tool_call)
```

```mermaid
sequenceDiagram
    participant Client
    participant AWSBedrock
    participant BedrockRuntime as Bedrock Runtime
    Client->>AWSBedrock: stream(messages, functions)
    AWSBedrock->>AWSBedrock: _convert_messages(messages)
    AWSBedrock->>AWSBedrock: format_tools_for_llm(functions)
    AWSBedrock->>BedrockRuntime: invoke_model_with_response_stream(request)
    BedrockRuntime-->>AWSBedrock: event stream
    loop Process Events
        AWSBedrock->>AWSBedrock: parse chunk/SSE data
        AWSBedrock->>AWSBedrock: accumulate content
    end
    AWSBedrock->>AWSBedrock: _strip_reasoning(accumulated_content)
    AWSBedrock-->>Client: yield final content
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 3
♻️ Duplicate comments (1)
flo_ai/flo_ai/llm/aws_bedrock_llm.py (1)
177-178: Empty `except` blocks silently swallow errors — add debug logging.

Both `except json.JSONDecodeError: pass` blocks silently discard parse failures with no trace. Add a module-level logger and log at debug level for observability. This was already flagged in a prior review.

Also applies to: 190-191
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 177 - 178, The two empty except blocks catching json.JSONDecodeError in the aws_bedrock_llm module should not swallow errors silently: add a module-level logger (import logging; logger = logging.getLogger(__name__)) and in each except json.JSONDecodeError block log the failure at debug level, e.g. logger.debug("Failed to decode JSON in <function_or_method_name>: %s", e) while capturing the exception as a variable (except json.JSONDecodeError as e), so the parse errors in the aws_bedrock_llm module (e.g., in the AWSBedrockLLM class methods where the JSON is parsed) are visible for debugging.
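As a concrete illustration of the suggested fix, here is a minimal sketch of the logger-plus-debug-log pattern the review describes. `parse_delta` is a hypothetical helper, not flo_ai code; it mirrors the shape of the chunk parsing in the reviewed module.

```python
import json
import logging

logger = logging.getLogger(__name__)


def parse_delta(text: str):
    """Extract a streamed content delta from a chunk, logging bad JSON."""
    try:
        data = json.loads(text)
        return data.get('choices', [{}])[0].get('delta', {}).get('content')
    except json.JSONDecodeError as e:
        # Previously a bare `pass`; now the failure is visible under DEBUG.
        logger.debug('Failed to decode JSON in parse_delta: %s', e)
        return None
```

With `logging.basicConfig(level=logging.DEBUG)` enabled, malformed chunks leave a trace instead of disappearing silently.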
🧹 Nitpick comments (2)
flo_ai/flo_ai/llm/aws_bedrock_llm.py (2)
35-37: Use explicit `Optional` type annotation (PEP 484).

Static analysis flags the implicit `Optional` on `output_schema: dict = None`. Per PEP 484, use an explicit union type.

♻️ Proposed fix
```diff
 def _convert_messages(
-    self, messages: list[dict], output_schema: dict = None
+    self, messages: list[dict], output_schema: Optional[dict] = None
 ) -> list[dict]:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 35 - 37, The parameter output_schema in method _convert_messages should use an explicit Optional type per PEP 484; import Optional from typing and change the signature of _convert_messages to use output_schema: Optional[dict] = None (keeping return type list[dict] unchanged) so static analyzers no longer treat the None default as an implicit Optional; update any related type hints in that scope if present.
110-135: Simplify the content-existence check (Ruff RUF019).

Line 115 performs an unnecessary key check before accessing. `message.get('content')` already returns `None` if the key is absent, so the truthiness check covers both cases.

♻️ Proposed fix

```diff
-        if 'content' in message and message['content']:
-            message['content'] = self._strip_reasoning(message['content'])
+        if message.get('content'):
+            message['content'] = self._strip_reasoning(message['content'])
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 110 - 135, The check "if 'content' in message and message['content']" is redundant; replace it by reading content = message.get('content') and only call self._strip_reasoning when content is truthy, e.g. if content: message['content'] = self._strip_reasoning(content). This simplifies the logic around the local variable message in the AWS Bedrock response parsing (refer to the message variable and the _strip_reasoning method) and preserves the later use of message.get('content', '') to compute text_content.
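A quick standalone check of the claim behind RUF019 (this snippet is illustrative, not flo_ai code): `dict.get` returns `None` for a missing key, so one truthiness test covers "key absent", `None`, and empty-string content.

```python
def has_content(message: dict) -> bool:
    # dict.get returns None when the key is absent, so a single
    # truthiness test replaces the `'content' in message and ...` pair.
    return bool(message.get('content'))
```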
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py`:
- Around line 163-195: The stream() method currently accumulates all deltas into
a local buffer and yields only once; change it to yield each content delta as it
is parsed so consumers receive streaming updates: offload the blocking iteration
over response['body'] (which comes from invoke_model_with_response_stream) to a
thread or executor (e.g., asyncio.to_thread/run_in_executor) so the event loop
isn't blocked, parse each event/chunk as it arrives, apply self._strip_reasoning
to each incremental piece (or accumulate minimally and strip per-yield if
needed) and yield {'content': cleaned_chunk} immediately for every non-empty
content delta instead of returning a single final yield.
- Around line 20-29: The __init__ in AWS Bedrock LLM is missing a production
dependency and contains a suspicious default model plus redundant assignments:
add "boto3>=1.36.1,<2" to the package's main dependencies in pyproject.toml so
boto3.client('bedrock-runtime') in the __init__ won't raise ImportError; replace
the default model string 'openai.gpt-oss-20b-1:0' in the AWSBedrock __init__
signature with a valid Bedrock model ID (e.g., 'amazon.titan-text-express-v1')
or document the intentional placeholder in the constructor docstring; and remove
the redundant assignments to self.model and self.kwargs in AWSBedrock.__init__
since BaseLLM.__init__ already sets them.
- Around line 63-88: The AWSBedrock class is implemented to talk to
OpenAI-compatible Bedrock models (generate uses OpenAI-style request/response
shapes and boto_client.invoke_model), so either (A) make this explicit by
updating the class docstring of AWSBedrock (or rename the class to
AWSBedrockOpenAICompat) and keep the current OpenAI-compatible behavior, or (B)
migrate generate to use Bedrock's broader Converse API and model-specific
request/response handling (replace current request_body keys like 'model',
'messages', 'temperature', 'max_completion_tokens', 'tools' and adjust response
parsing that expects choices[].message). In either path remove the redundant
'model' key from request_body (you already pass modelId=self.model to
boto_client.invoke_model) and update generate (and any usage of
_convert_messages and response parsing) so behavior matches the chosen
compatibility scope.
---
Duplicate comments:
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py`:
- Around line 177-178: The two empty except blocks catching json.JSONDecodeError
in the aws_bedrock_llm module should not swallow errors silently: add a
module-level logger (import logging; logger = logging.getLogger(__name__)) and
in each except json.JSONDecodeError block log the failure at debug level, e.g.
logger.debug("Failed to decode JSON in <function_or_method_name>: %s", e) while
capturing the exception as a variable (except json.JSONDecodeError as e), so the
parse errors in the aws_bedrock_llm module (e.g., in the AWSBedrockLLM class
methods where the JSON is parsed) are visible for debugging.
---
Nitpick comments:
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py`:
- Around line 35-37: The parameter output_schema in method _convert_messages
should use an explicit Optional type per PEP 484; import Optional from typing
and change the signature of _convert_messages to use output_schema:
Optional[dict] = None (keeping return type list[dict] unchanged) so static
analyzers no longer treat the None default as an implicit Optional; update any
related type hints in that scope if present.
- Around line 110-135: The check "if 'content' in message and
message['content']" is redundant; replace it by reading content =
message.get('content') and only call self._strip_reasoning when content is
truthy, e.g. if content: message['content'] = self._strip_reasoning(content).
This simplifies the logic around the local variable message in the AWS Bedrock
response parsing (refer to the message variable and the _strip_reasoning method)
and preserves the later use of message.get('content', '') to compute
text_content.
```python
def __init__(
    self,
    model: str = 'openai.gpt-oss-20b-1:0',
    temperature: float = 0.7,
    **kwargs,
):
    super().__init__(model=model, temperature=temperature, **kwargs)
    self.boto_client = boto3.client('bedrock-runtime')
    self.model = model
    self.kwargs = kwargs
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Check for setup.py and pyproject.toml, and look for boto3 dependency
echo "=== Looking for setup.py ==="
find . -maxdepth 3 -name "setup.py" -type f | head -5
echo -e "\n=== Looking for pyproject.toml ==="
find . -maxdepth 3 -name "pyproject.toml" -type f | head -5
echo -e "\n=== Looking for setup.cfg ==="
find . -maxdepth 3 -name "setup.cfg" -type f | head -5
echo -e "\n=== Checking boto3 in requirements files ==="
find . -maxdepth 3 \( -name "*requirements*.txt" -o -name "setup.py" -o -name "pyproject.toml" -o -name "setup.cfg" \) -type f | xargs grep -l boto3 2>/dev/null | head -10
```

Repository: rootflo/wavefront
Length of output: 302
🏁 Script executed:

```bash
#!/bin/bash
# Read the aws_bedrock_llm.py file to see the full context
echo "=== Reading aws_bedrock_llm.py ==="
cat -n flo_ai/flo_ai/llm/aws_bedrock_llm.py | head -50
```

Repository: rootflo/wavefront
Length of output: 1890
🏁 Script executed:

```bash
#!/bin/bash
# Check the BaseLLM class to understand what super().__init__() does
rg -n "class BaseLLM" -A 30 --type=py
```

Repository: rootflo/wavefront
Length of output: 4198
🏁 Script executed:

```bash
#!/bin/bash
# Read the pyproject.toml to check if boto3 is in install_requires
echo "=== Checking flo_ai/pyproject.toml for boto3 ==="
cat flo_ai/pyproject.toml | grep -A 20 -B 2 "install_requires\|dependencies"
echo -e "\n=== Full pyproject.toml if needed ==="
cat flo_ai/pyproject.toml
```

Repository: rootflo/wavefront
Length of output: 3205
🏁 Script executed:

```bash
#!/bin/bash
# Also check setup.py if it exists
echo "=== Checking flo_ai/setup.py ==="
cat flo_ai/setup.py | grep -A 30 "install_requires"
```

Repository: rootflo/wavefront
Length of output: 120
🏁 Script executed:

```bash
#!/bin/bash
# Search for boto3 mentions to understand dependency status
echo "=== Looking for boto3 references in config files ==="
grep -r "boto3" flo_ai/ --include="*.toml" --include="*.py" --include="*.txt" --include="*.cfg" 2>/dev/null | grep -v "__pycache__" | grep -v ".pyc" | head -20
```

Repository: rootflo/wavefront
Length of output: 305
Add boto3 to production dependencies and fix suspicious default model ID.

- `boto3` is missing from the production `dependencies` list in `pyproject.toml`; it's currently only in the `[dependency-groups]` dev section. Users installing this package will get an `ImportError` at runtime when trying to use `AWSBedrock`. Add `"boto3>=1.36.1,<2"` to the main `dependencies` list.
- The default model `'openai.gpt-oss-20b-1:0'` does not match typical AWS Bedrock model ID patterns (e.g., `anthropic.claude-3-sonnet-20240229-v1:0`, `amazon.titan-text-express-v1`). This appears to be a placeholder or incorrect value. Provide a valid Bedrock model ID or document why this unusual format is intentional.
- Minor: `self.model` and `self.kwargs` on lines 28–29 are redundantly assigned; `BaseLLM.__init__()` already sets these (harmless but unnecessary).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 20 - 29, The __init__ in
AWS Bedrock LLM is missing a production dependency and contains a suspicious
default model plus redundant assignments: add "boto3>=1.36.1,<2" to the
package's main dependencies in pyproject.toml so boto3.client('bedrock-runtime')
in the __init__ won't raise ImportError; replace the default model string
'openai.gpt-oss-20b-1:0' in the AWSBedrock __init__ signature with a valid
Bedrock model ID (e.g., 'amazon.titan-text-express-v1') or document the
intentional placeholder in the constructor docstring; and remove the redundant
assignments to self.model and self.kwargs in AWSBedrock.__init__ since
BaseLLM.__init__ already sets them.
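A runnable sketch of the tightened constructor the prompt describes. The `BaseLLM` stand-in and the `client_factory` parameter are illustrative assumptions (they are not part of flo_ai); the factory only exists so the constructor can be exercised here without boto3 or AWS credentials — in production the line would be `boto3.client('bedrock-runtime')`.

```python
from typing import Any, Callable, Optional


class BaseLLM:
    """Stand-in for flo_ai's BaseLLM, illustrative only."""

    def __init__(self, model: str, temperature: float, **kwargs: Any):
        self.model = model
        self.temperature = temperature
        self.kwargs = kwargs


class AWSBedrock(BaseLLM):
    def __init__(
        self,
        model: str = 'openai.gpt-oss-20b-1:0',
        temperature: float = 0.7,
        client_factory: Optional[Callable[[], Any]] = None,
        **kwargs: Any,
    ):
        super().__init__(model=model, temperature=temperature, **kwargs)
        # Production code would call boto3.client('bedrock-runtime') here;
        # the injectable factory keeps this sketch import-free and testable.
        self.boto_client = (client_factory or (lambda: None))()
        # Note: no re-assignment of self.model / self.kwargs here, since
        # BaseLLM.__init__ already sets both.
```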
```python
@trace_llm_call(provider='bedrock')
async def generate(
    self,
    messages: list[dict],
    functions: Optional[List[Dict[str, Any]]] = None,
    output_schema: Optional[Dict[str, Any]] = None,
    **kwargs,
) -> Any:
    converted = self._convert_messages(messages, output_schema)

    request_body: Dict[str, Any] = {
        'model': self.model,
        'messages': converted,
        'temperature': self.temperature,
    }
    if 'max_tokens' in self.kwargs:
        request_body['max_completion_tokens'] = self.kwargs['max_tokens']
    if functions:
        request_body['tools'] = functions

    response = await asyncio.to_thread(
        self.boto_client.invoke_model,
        modelId=self.model,
        body=json.dumps(request_body),
    )
    response_body = json.loads(response['body'].read().decode('utf-8'))
```
🧩 Analysis chain
🌐 Web query:
AWS Bedrock invoke_model OpenAI compatible profiles 2025
💡 Result:
In Amazon Bedrock, “profiles” in the context of InvokeModel typically means Inference Profiles (cross‑region or application profiles), and they’re usable from both native Bedrock APIs and (in many cases) OpenAI‑compatible APIs.
- `InvokeModel` supports inference profiles directly: the `modelId` path parameter can be a base model ID/ARN or an inference profile ID/ARN. The API reference explicitly calls this out. [1]
- OpenAI-compatible endpoints (2025+):
  - `bedrock-mantle.{region}.amazonaws.com`: OpenAI-compatible Responses API + Chat Completions API. [2]
  - `bedrock-runtime.{region}.amazonaws.com`: native InvokeModel/Converse, and also supports OpenAI-compatible Chat Completions. [2]
- OpenAI "GPT-OSS" models in Bedrock (released Aug 5, 2025) have Bedrock model IDs like:
  - `openai.gpt-oss-20b-1:0`
  - `openai.gpt-oss-120b-1:0` [3]

  They support InvokeModel and OpenAI-compatible Chat Completions (and are also listed as supporting Responses in Bedrock's API compatibility matrix). [3][4]
- Responses API availability (announced Dec 4, 2025): Bedrock added OpenAI Responses API support on new OpenAI-compatible service endpoints (i.e., `bedrock-mantle`). [5]
- Example inference profile IDs you might pass as `modelId` to `InvokeModel` include global/us geo routing profiles (AWS provides lists and examples, e.g., `global.amazon.nova-2-lite-v1:0`). [6]
Sources:
[1] InvokeModel API (modelId can be an inference profile) https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html
[2] Bedrock endpoints (bedrock-runtime vs bedrock-mantle) https://docs.aws.amazon.com/bedrock/latest/userguide/endpoints.html
[3] OpenAI models in Bedrock (IDs + release date) https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-openai.html
[4] Model/API compatibility matrix (Invoke/Chat Completions/Responses per model) https://docs.aws.amazon.com/bedrock/latest/userguide/models-api-compatibility.html
[5] AWS “What’s New” (Dec 4, 2025 Responses API on Bedrock) https://aws.amazon.com/about-aws/whats-new/2025/12/amazon-bedrock-responses-api-from-openai/
[6] Inference profiles support + example IDs https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-support.html
🌐 Web query:
AWS Bedrock Converse API vs invoke_model difference 2025
💡 Result:
In Amazon Bedrock Runtime, Converse is the newer "standard chat/messages" interface; InvokeModel is the original "raw model payload" interface. (Still true in 2025.)

Key differences

- Abstraction / portability
  - `Converse`: consistent messages-based request/response across supported chat-capable models; you can generally switch models with minimal code changes. [1][2]
  - `InvokeModel`: you send the provider/model-specific JSON body (field names and shapes differ by model family), so switching models often means rewriting payload parsing/building. [3][4]
- Conversation handling
  - `Converse`: designed for multi-turn; you pass structured `messages` (including history) in one standardized format. [1][2]
  - `InvokeModel`: can do multi-turn only if the specific model's schema supports it (for example, Anthropic "messages" format), but it's not standardized across providers. [3]
- Tool use (function calling)
  - `Converse`: has first-class, standardized `toolConfig`/tool use for supported models. [1][2]
  - `InvokeModel`: tool/function calling is provider-specific (different field names/structures depending on model family). [4]
- Modalities & "non-chat" workloads
  - `InvokeModel`: explicitly supports text, images, and embeddings (whatever the selected model exposes), using the model's native schema. [3]
  - `Converse`: only works with models that support messages (chat-style). [2]
- Structured outputs (JSON schema)
  - Both can do structured outputs, but the parameter locations/names differ; `Converse` uses Bedrock-standard fields, while `InvokeModel` varies by provider (and can vary again for open-weight models). [4]
- Endpoints / operations
  - `Converse`: `POST /model/{modelId}/converse` (plus `ConverseStream`). [2]
  - `InvokeModel`: `POST /model/{modelId}/invoke` (plus streaming variant). [3]

Practical guidance

- Prefer `Converse`/`ConverseStream` for chat apps, multi-turn assistants, tool/function calling, and easier model switching. [1][2]
- Use `InvokeModel`/`InvokeModelWithResponseStream` when you need full provider-native control, or you're calling non-messages models (for example embeddings or many image models). [3][4]
Sources: AWS “What’s New” (Converse API announcement) [1], Bedrock Runtime API refs for Converse [2] and InvokeModel [3], and AWS guidance comparing parameter formats (structured outputs / Converse vs InvokeModel) [4].
🌐 Web query:
AWS Bedrock invoke_model request body schema format
💡 Result:
InvokeModel does not have one universal JSON schema. The body is model-specific JSON (sent as raw bytes) and must match the inference parameters for the particular model/provider you’re invoking. You must set Content-Type: application/json (and typically Accept: application/json). (docs.aws.amazon.com)
Generic REST shape
- Endpoint: `POST /model/{modelId}/invoke`
- Headers: `Content-Type: application/json`, `Accept: application/json`
- Body: JSON for that model (max request body size is very large, up to 25,000,000 bytes). (docs.aws.amazon.com)

Example request bodies (common ones)

Anthropic Claude (Messages API format)

```json
{
  "anthropic_version": "bedrock-2023-05-31",
  "max_tokens": 1024,
  "messages": [
    { "role": "user", "content": [ { "type": "text", "text": "Hello world" } ] }
  ]
}
```

Amazon Titan Text (text generation)

```json
{
  "inputText": "Write a short poem about apples.",
  "textGenerationConfig": {
    "maxTokenCount": 256,
    "temperature": 0,
    "topP": 0.9,
    "stopSequences": []
  }
}
```

Amazon Titan Text Embeddings

```json
{
  "inputText": "Text to embed"
}
```

If you tell me the exact modelId (e.g., anthropic.claude-..., amazon.titan-..., stability.stable-diffusion-...), I can paste the exact request/response schema for that model from the "Inference request parameters…" reference. (docs.aws.amazon.com)
Citations:
- 1: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html?utm_source=openai
- 2: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html?utm_source=openai
- 3: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html?utm_source=openai
- 4: https://aws.amazon.com/blogs/aws/build-rag-and-agent-based-generative-ai-applications-with-new-amazon-titan-text-premier-model-available-in-amazon-bedrock/?utm_source=openai
- 5: https://aws.amazon.com/blogs/machine-learning/getting-started-with-amazon-titan-text-embeddings/?utm_source=openai
- 6: https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html?utm_source=openai
🏁 Script executed:
```bash
cd flo_ai && head -60 flo_ai/llm/aws_bedrock_llm.py | cat -n
```

Repository: rootflo/wavefront
Length of output: 2260
🏁 Script executed:
```bash
cd flo_ai && rg -A 5 -B 5 "class.*Bedrock" flo_ai/llm/aws_bedrock_llm.py
```

Repository: rootflo/wavefront
Length of output: 294
🏁 Script executed:
```bash
cd flo_ai && rg "invoke_model\|InvokeModel\|Converse\|modelId" flo_ai/llm/aws_bedrock_llm.py
```

Repository: rootflo/wavefront
Length of output: 43
🏁 Script executed:
```bash
cd flo_ai && sed -n '63,90p' flo_ai/llm/aws_bedrock_llm.py | cat -n
```

Repository: rootflo/wavefront
Length of output: 1226
🏁 Script executed:
```bash
cd flo_ai && rg -i "openai\|converse\|model.*compatibility\|supported.*model" flo_ai/llm/aws_bedrock_llm.py
```

Repository: rootflo/wavefront
Length of output: 43
🏁 Script executed:
```bash
cd flo_ai && grep -n "invoke_model\|ConversUrl" flo_ai/llm/aws_bedrock_llm.py
```

Repository: rootflo/wavefront
Length of output: 174
🏁 Script executed:
```bash
cd flo_ai && head -100 flo_ai/llm/aws_bedrock_llm.py | tail -40
```

Repository: rootflo/wavefront
Length of output: 1440
🏁 Script executed:
```bash
cd flo_ai && grep -n "class\|def\|#" flo_ai/llm/aws_bedrock_llm.py | head -30
```

Repository: rootflo/wavefront
Length of output: 861
This implementation is coupled to OpenAI-compatible Bedrock models, not general Bedrock support.
The request body uses OpenAI-style schema (model, messages, temperature, max_completion_tokens, tools) and response parsing expects choices[].message format. While the default model openai.gpt-oss-20b-1:0 is OpenAI-compatible and works correctly, the generic class name AWSBedrock implies broader Bedrock model support.
Users attempting to use native Bedrock models (e.g., anthropic.claude-3-5-sonnet-*, amazon.titan-text-*, meta.llama-*) will encounter failures, as those models use model-specific request/response schemas via invoke_model or require the standardized Converse API.
Recommended fixes:

- Add a docstring explicitly documenting that this class only supports OpenAI-compatible Bedrock models, or
- Rename to `AWSBedrockOpenAICompat` to clarify scope, or
- Migrate to the `Converse` API for broader Bedrock model compatibility.
Also, the 'model' key in the request body (line 74) is required for OpenAI but redundant with the modelId parameter (line 85) and may cause issues if model-specific validation is added.
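If option (B) is chosen, the request would shift from the OpenAI chat shape to the Converse shape. A rough sketch of the message conversion under that assumption: `to_converse_request` is a hypothetical helper (not flo_ai code), while the `messages`, `system`, and `inferenceConfig` field names follow Bedrock's Converse API.

```python
from typing import Optional


def to_converse_request(
    messages: list[dict], temperature: float, max_tokens: Optional[int] = None
) -> dict:
    """Map OpenAI-style {'role', 'content'} messages to Converse kwargs."""
    # Converse takes system prompts separately from the conversation turns.
    system = [{'text': m['content']} for m in messages if m['role'] == 'system']
    turns = [
        {'role': m['role'], 'content': [{'text': m['content']}]}
        for m in messages
        if m['role'] in ('user', 'assistant')
    ]
    request: dict = {
        'messages': turns,
        'inferenceConfig': {'temperature': temperature},
    }
    if system:
        request['system'] = system
    if max_tokens is not None:
        request['inferenceConfig']['maxTokens'] = max_tokens
    return request


# The result would be passed roughly as:
#   boto_client.converse(modelId=self.model, **to_converse_request(...))
```

Note that `modelId` is supplied once to `converse()`, which also resolves the review's point about the redundant `'model'` key in the request body.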
🧰 Tools
🪛 Ruff (0.15.2)
[warning] 69-69: Unused method argument: kwargs
(ARG002)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 63 - 88, The AWSBedrock
class is implemented to talk to OpenAI-compatible Bedrock models (generate uses
OpenAI-style request/response shapes and boto_client.invoke_model), so either
(A) make this explicit by updating the class docstring of AWSBedrock (or rename
the class to AWSBedrockOpenAICompat) and keep the current OpenAI-compatible
behavior, or (B) migrate generate to use Bedrock's broader Converse API and
model-specific request/response handling (replace current request_body keys like
'model', 'messages', 'temperature', 'max_completion_tokens', 'tools' and adjust
response parsing that expects choices[].message). In either path remove the
redundant 'model' key from request_body (you already pass modelId=self.model to
boto_client.invoke_model) and update generate (and any usage of
_convert_messages and response parsing) so behavior matches the chosen
compatibility scope.
```python
buffer = ''

for event in response['body']:
    chunk_bytes = event.get('chunk', {}).get('bytes', b'')
    if not chunk_bytes:
        continue
    text = chunk_bytes.decode('utf-8').strip()
    # Try direct JSON first (some Bedrock models skip SSE envelope)
    try:
        data = json.loads(text)
        content = data.get('choices', [{}])[0].get('delta', {}).get('content')
        if content:
            buffer += content
        continue
    except json.JSONDecodeError:
        pass
    # Fall back to SSE format: "data: {...}"
    for line in text.split('\n'):
        line = line.strip()
        if line.startswith('data: ') and line != 'data: [DONE]':
            try:
                data = json.loads(line[6:])
                content = (
                    data.get('choices', [{}])[0].get('delta', {}).get('content')
                )
                if content:
                    buffer += content
            except json.JSONDecodeError:
                pass

clean = self._strip_reasoning(buffer)
if clean:
    yield {'content': clean}
```
stream() accumulates all content and yields once — not actually streaming.
The method collects the entire response into buffer (lines 163–191), then yields a single chunk at the end (lines 193–195). This defeats the purpose of streaming, as callers will block until the full response is generated before receiving anything. Each content delta should be yielded as it arrives:
Additionally, the for event in response['body'] loop (line 165) runs synchronously on the async event loop thread, blocking it for the entire duration of the stream. Only the initial invoke_model_with_response_stream call is offloaded via asyncio.to_thread.
🐛 Proposed fix — yield incremental chunks and offload blocking iteration
```diff
-buffer = ''
-
-for event in response['body']:
-    chunk_bytes = event.get('chunk', {}).get('bytes', b'')
-    if not chunk_bytes:
-        continue
-    text = chunk_bytes.decode('utf-8').strip()
-    # Try direct JSON first (some Bedrock models skip SSE envelope)
-    try:
-        data = json.loads(text)
-        content = data.get('choices', [{}])[0].get('delta', {}).get('content')
-        if content:
-            buffer += content
-        continue
-    except json.JSONDecodeError:
-        pass
-    # Fall back to SSE format: "data: {...}"
-    for line in text.split('\n'):
-        line = line.strip()
-        if line.startswith('data: ') and line != 'data: [DONE]':
-            try:
-                data = json.loads(line[6:])
-                content = (
-                    data.get('choices', [{}])[0].get('delta', {}).get('content')
-                )
-                if content:
-                    buffer += content
-            except json.JSONDecodeError:
-                pass
-
-clean = self._strip_reasoning(buffer)
-if clean:
-    yield {'content': clean}
+queue: asyncio.Queue = asyncio.Queue()
+
+def _iter_events():
+    for event in response['body']:
+        chunk_bytes = event.get('chunk', {}).get('bytes', b'')
+        if chunk_bytes:
+            queue.put_nowait(chunk_bytes)
+    queue.put_nowait(None)  # sentinel
+
+loop = asyncio.get_event_loop()
+loop.run_in_executor(None, _iter_events)
+
+while True:
+    chunk_bytes = await queue.get()
+    if chunk_bytes is None:
+        break
+    text = chunk_bytes.decode('utf-8').strip()
+    content = None
+    try:
+        data = json.loads(text)
+        content = data.get('choices', [{}])[0].get('delta', {}).get('content')
+    except json.JSONDecodeError:
+        for line in text.split('\n'):
+            line = line.strip()
+            if line.startswith('data: ') and line != 'data: [DONE]':
+                try:
+                    data = json.loads(line[6:])
+                    content = data.get('choices', [{}])[0].get('delta', {}).get('content')
+                except json.JSONDecodeError:
+                    logger.debug("Skipping malformed SSE line: %s", line)
+    if content:
+        clean = self._strip_reasoning(content)
+        if clean:
+            yield {'content': clean}
```

Note: This sketch uses a queue + executor to avoid blocking the event loop. The exact approach may need refinement depending on the framework's async patterns, but the key point is: yield each chunk as it arrives.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 163 - 195, The stream()
method currently accumulates all deltas into a local buffer and yields only
once; change it to yield each content delta as it is parsed so consumers receive
streaming updates: offload the blocking iteration over response['body'] (which
comes from invoke_model_with_response_stream) to a thread or executor (e.g.,
asyncio.to_thread/run_in_executor) so the event loop isn't blocked, parse each
event/chunk as it arrives, apply self._strip_reasoning to each incremental piece
(or accumulate minimally and strip per-yield if needed) and yield {'content':
cleaned_chunk} immediately for every non-empty content delta instead of
returning a single final yield.
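The heart of that change is bridging a blocking boto3 event iterator into an async generator. A minimal, self-contained sketch of the pattern (the `fake_body` list stands in for `response['body']`; all names here are illustrative, and `call_soon_threadsafe` is used because `asyncio.Queue` is not safe to touch directly from a worker thread):

```python
import asyncio


async def stream_events(blocking_iterable):
    """Yield items from a blocking iterator without stalling the event loop."""
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    def pump():
        # Runs in a worker thread; hand items back to the loop thread-safely.
        for item in blocking_iterable:
            loop.call_soon_threadsafe(queue.put_nowait, item)
        loop.call_soon_threadsafe(queue.put_nowait, None)  # end-of-stream sentinel

    pump_future = loop.run_in_executor(None, pump)
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    await pump_future  # surface any exception raised in the worker thread


async def collect():
    fake_body = iter([b'chunk-1', b'chunk-2', b'chunk-3'])  # stands in for response['body']
    return [chunk.decode() async for chunk in stream_events(fake_body)]
```

In the real method, each decoded chunk would be parsed and `{'content': ...}` yielded immediately, giving consumers true incremental output.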
Summary by CodeRabbit
New Features