2 changes: 2 additions & 0 deletions flo_ai/flo_ai/llm/__init__.py
@@ -6,6 +6,7 @@
from .openai_vllm import OpenAIVLLM
from .vertexai_llm import VertexAI
from .rootflo_llm import RootFloLLM
from .aws_bedrock_llm import AWSBedrock

__all__ = [
    'BaseLLM',

@@ -16,4 +17,5 @@
    'OpenAIVLLM',
    'VertexAI',
    'RootFloLLM',
    'AWSBedrock',
]
265 changes: 265 additions & 0 deletions flo_ai/flo_ai/llm/aws_bedrock_llm.py
@@ -0,0 +1,265 @@
import json
import re
from typing import Dict, Any, List, AsyncIterator, Optional
import boto3
import asyncio
from .base_llm import BaseLLM
from flo_ai.models.chat_message import ImageMessageContent
from flo_ai.tool.base_tool import Tool
from flo_ai.telemetry.instrumentation import (
    trace_llm_call,
    trace_llm_stream,
    llm_metrics,
    add_span_attributes,
)
from flo_ai.telemetry import get_tracer
from opentelemetry import trace


class AWSBedrock(BaseLLM):
    def __init__(
        self,
        model: str = 'openai.gpt-oss-20b-1:0',
        temperature: float = 0.7,
        **kwargs,
    ):
        super().__init__(model=model, temperature=temperature, **kwargs)
        self.boto_client = boto3.client('bedrock-runtime')
        self.model = model
        self.kwargs = kwargs
Comment on lines +20 to +29

@coderabbitai bot Feb 25, 2026

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for setup.py and pyproject.toml, and look for boto3 dependency
echo "=== Looking for setup.py ==="
find . -maxdepth 3 -name "setup.py" -type f | head -5

echo -e "\n=== Looking for pyproject.toml ==="
find . -maxdepth 3 -name "pyproject.toml" -type f | head -5

echo -e "\n=== Looking for setup.cfg ==="
find . -maxdepth 3 -name "setup.cfg" -type f | head -5

echo -e "\n=== Checking boto3 in requirements files ==="
find . -maxdepth 3 \( -name "*requirements*.txt" -o -name "setup.py" -o -name "pyproject.toml" -o -name "setup.cfg" \) -type f | xargs grep -l boto3 2>/dev/null | head -10

Repository: rootflo/wavefront

Length of output: 302


🏁 Script executed:

#!/bin/bash
# Read the aws_bedrock_llm.py file to see the full context
echo "=== Reading aws_bedrock_llm.py ==="
cat -n flo_ai/flo_ai/llm/aws_bedrock_llm.py | head -50

Repository: rootflo/wavefront

Length of output: 1890


🏁 Script executed:

#!/bin/bash
# Check the BaseLLM class to understand what super().__init__() does
rg -n "class BaseLLM" -A 30 --type=py

Repository: rootflo/wavefront

Length of output: 4198


🏁 Script executed:

#!/bin/bash
# Read the pyproject.toml to check if boto3 is in install_requires
echo "=== Checking flo_ai/pyproject.toml for boto3 ==="
cat flo_ai/pyproject.toml | grep -A 20 -B 2 "install_requires\|dependencies"

echo -e "\n=== Full pyproject.toml if needed ==="
cat flo_ai/pyproject.toml

Repository: rootflo/wavefront

Length of output: 3205


🏁 Script executed:

#!/bin/bash
# Also check setup.py if it exists
echo "=== Checking flo_ai/setup.py ==="
cat flo_ai/setup.py | grep -A 30 "install_requires"

Repository: rootflo/wavefront

Length of output: 120


🏁 Script executed:

#!/bin/bash
# Search for boto3 mentions to understand dependency status
echo "=== Looking for boto3 references in config files ==="
grep -r "boto3" flo_ai/ --include="*.toml" --include="*.py" --include="*.txt" --include="*.cfg" 2>/dev/null | grep -v "__pycache__" | grep -v ".pyc" | head -20

Repository: rootflo/wavefront

Length of output: 305


Add boto3 to production dependencies and fix suspicious default model ID.

  1. boto3 is missing from the production dependencies list in pyproject.toml. It's currently only in the [dependency-groups] dev section. Users installing this package will get an ImportError at runtime when trying to use AWSBedrock. Add "boto3>=1.36.1,<2" to the main dependencies list.

  2. The default model 'openai.gpt-oss-20b-1:0' does not match AWS Bedrock model ID patterns (e.g., anthropic.claude-3-sonnet-20240229-v1:0, amazon.titan-text-express-v1). This appears to be a placeholder or incorrect value. Provide a valid Bedrock model ID or document why this unusual format is intentional.

  3. Minor: self.model and self.kwargs on lines 28–29 are redundantly assigned; BaseLLM.__init__() already sets these (harmless but unnecessary).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 20 - 29, The __init__ in
AWS Bedrock LLM is missing a production dependency and contains a suspicious
default model plus redundant assignments: add "boto3>=1.36.1,<2" to the
package's main dependencies in pyproject.toml so boto3.client('bedrock-runtime')
in the __init__ won't raise ImportError; replace the default model string
'openai.gpt-oss-20b-1:0' in the AWSBedrock __init__ signature with a valid
Bedrock model ID (e.g., 'amazon.titan-text-express-v1') or document the
intentional placeholder in the constructor docstring; and remove the redundant
assignments to self.model and self.kwargs in AWSBedrock.__init__ since
BaseLLM.__init__ already sets them.

Member

1. You need to add it; the current code works because some of our libraries have a boto3 dependency internally.
3. Check 3 as well.


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check BaseLLM.__init__() to see what it sets
echo "=== Checking BaseLLM.__init__() implementation ==="
rg -n "class BaseLLM" -A 40 --type=py flo_ai/flo_ai/llm/base_llm.py | head -50

Length of output: 1513


@vizsatiz, confirmed on both points:

  1. boto3 dependency: You're right that it should be explicitly added to the production dependencies list in pyproject.toml, even though it currently works through transitive dependencies. Add:

    "boto3>=1.36.1,<2"
  2. Redundant assignments on lines 28-29: Confirmed redundant. BaseLLM.__init__() already sets both self.model and self.kwargs (lines 17 and 20 in base_llm.py). These lines can be safely removed:

def __init__(
    self,
    model: str = 'openai.gpt-oss-20b-1:0',
    temperature: float = 0.7,
    **kwargs,
):
    super().__init__(model=model, temperature=temperature, **kwargs)
    self.boto_client = boto3.client('bedrock-runtime')
    # Remove these two lines:
    # self.model = model
    # self.kwargs = kwargs

✏️ Learnings added
Learnt from: vizsatiz
Repo: rootflo/wavefront PR: 228
File: flo_ai/flo_ai/llm/aws_bedrock_llm.py:20-29
Timestamp: 2026-02-26T09:29:30.130Z
Learning: In the `flo_ai` package, boto3 is currently available through transitive dependencies from other internal libraries, but should be added explicitly to the production dependencies in pyproject.toml for the AWS Bedrock integration.



    @staticmethod
    def _strip_reasoning(text: str) -> str:
        return re.sub(r'<reasoning>.*?</reasoning>', '', text, flags=re.DOTALL).strip()
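The reasoning-stripping above can be exercised standalone; a minimal sketch (the function below is a local copy for illustration, not the flo_ai API):

```python
import re

def strip_reasoning(text: str) -> str:
    # Non-greedy match across newlines removes each <reasoning>...</reasoning> block
    return re.sub(r'<reasoning>.*?</reasoning>', '', text, flags=re.DOTALL).strip()

print(strip_reasoning('<reasoning>chain\nof thought</reasoning>Final answer'))  # → Final answer
```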

    def _convert_messages(
        self, messages: list[dict], output_schema: dict = None
    ) -> list[dict]:
        result = []

        if output_schema:
            result.append(
                {
                    'role': 'system',
                    'content': f'Provide output in the following JSON schema:\n{json.dumps(output_schema, indent=2)}',
                }
            )

        for msg in messages:
            if msg['role'] == 'function':
                result.append(
                    {
                        'role': 'tool',
                        'tool_call_id': msg.get('tool_use_id', 'unknown'),
                        'content': msg['content'],
                        'name': msg.get('name', ''),
                    }
                )
            else:
                result.append(msg)

        return result
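The role remapping above can be sketched as a standalone function (a local mirror of `_convert_messages` for illustration):

```python
import json

def convert_messages(messages, output_schema=None):
    # Prepend a system message carrying the JSON schema, then remap
    # OpenAI-style 'function' results to 'tool' messages
    result = []
    if output_schema:
        result.append({
            'role': 'system',
            'content': 'Provide output in the following JSON schema:\n'
            + json.dumps(output_schema, indent=2),
        })
    for msg in messages:
        if msg['role'] == 'function':
            result.append({
                'role': 'tool',
                'tool_call_id': msg.get('tool_use_id', 'unknown'),
                'content': msg['content'],
                'name': msg.get('name', ''),
            })
        else:
            result.append(msg)
    return result

converted = convert_messages(
    [{'role': 'function', 'content': '42', 'name': 'add', 'tool_use_id': 'call_1'}]
)
print(converted[0]['role'], converted[0]['tool_call_id'])  # → tool call_1
```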

    @trace_llm_call(provider='bedrock')
    async def generate(
        self,
        messages: list[dict],
        functions: Optional[List[Dict[str, Any]]] = None,
        output_schema: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Any:
        converted = self._convert_messages(messages, output_schema)

        request_body: Dict[str, Any] = {
            'model': self.model,
            'messages': converted,
            'temperature': self.temperature,
        }
        if 'max_tokens' in self.kwargs:
            request_body['max_completion_tokens'] = self.kwargs['max_tokens']
        if functions:
            request_body['tools'] = functions

        response = await asyncio.to_thread(
            self.boto_client.invoke_model,
            modelId=self.model,
            body=json.dumps(request_body),
        )
        response_body = json.loads(response['body'].read().decode('utf-8'))
Comment on lines +63 to +88

@coderabbitai bot Feb 25, 2026

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

AWS Bedrock invoke_model OpenAI compatible profiles 2025

💡 Result:

In Amazon Bedrock, “profiles” in the context of InvokeModel typically means Inference Profiles (cross‑region or application profiles), and they’re usable from both native Bedrock APIs and (in many cases) OpenAI‑compatible APIs.

  • InvokeModel supports inference profiles directly: the modelId path parameter can be a base model ID/ARN or an inference profile ID/ARN. The API reference explicitly calls this out. [1]
  • OpenAI‑compatible endpoints (2025+):
    • bedrock-mantle.{region}.amazonaws.com: OpenAI‑compatible Responses API + Chat Completions API. [2]
    • bedrock-runtime.{region}.amazonaws.com: native InvokeModel/Converse, and also supports OpenAI‑compatible Chat Completions. [2]
  • OpenAI “GPT‑OSS” models in Bedrock (released Aug 5, 2025) have Bedrock model IDs like:
    • openai.gpt-oss-20b-1:0
    • openai.gpt-oss-120b-1:0 [3]
      They support InvokeModel and OpenAI‑compatible Chat Completions (and are also listed as supporting Responses in Bedrock’s API compatibility matrix). [3][4]
  • Responses API availability (announced Dec 4, 2025): Bedrock added OpenAI Responses API support on new OpenAI‑compatible service endpoints (i.e., bedrock-mantle). [5]
  • Example inference profile IDs you might pass as modelId to InvokeModel include global/us geo routing profiles (AWS provides lists and examples, e.g., global.amazon.nova-2-lite-v1:0). [6]

Sources:
[1] InvokeModel API (modelId can be an inference profile) https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html
[2] Bedrock endpoints (bedrock-runtime vs bedrock-mantle) https://docs.aws.amazon.com/bedrock/latest/userguide/endpoints.html
[3] OpenAI models in Bedrock (IDs + release date) https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-openai.html
[4] Model/API compatibility matrix (Invoke/Chat Completions/Responses per model) https://docs.aws.amazon.com/bedrock/latest/userguide/models-api-compatibility.html
[5] AWS “What’s New” (Dec 4, 2025 Responses API on Bedrock) https://aws.amazon.com/about-aws/whats-new/2025/12/amazon-bedrock-responses-api-from-openai/
[6] Inference profiles support + example IDs https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-support.html


🌐 Web query:

AWS Bedrock Converse API vs invoke_model difference 2025

💡 Result:

In Amazon Bedrock Runtime, Converse is the newer “standard chat/messages” interface; InvokeModel is the original “raw model payload” interface. (Still true in 2025.)

Key differences

  • Abstraction / portability

    • Converse: consistent messages-based request/response across supported chat-capable models; you can generally switch models with minimal code changes. [1][2]
    • InvokeModel: you send the provider/model-specific JSON body (field names and shapes differ by model family), so switching models often means rewriting payload parsing/building. [3][4]
  • Conversation handling

    • Converse: designed for multi-turn; you pass structured messages (including history) in one standardized format. [1][2]
    • InvokeModel: can do multi-turn only if the specific model’s schema supports it (for example, Anthropic “messages” format), but it’s not standardized across providers. [3]
  • Tool use (function calling)

    • Converse: has first-class, standardized toolConfig/tool use for supported models. [1][2]
    • InvokeModel: tool/function calling is provider-specific (different field names/structures depending on model family). [4]
  • Modalities & “non-chat” workloads

    • InvokeModel: explicitly supports text, images, and embeddings (whatever the selected model exposes), using the model’s native schema. [3]
    • Converse: only works with models that support messages (chat-style). [2]
  • Structured outputs (JSON schema)

    • Both can do structured outputs, but the parameter locations/names differ; Converse uses Bedrock-standard fields, while InvokeModel varies by provider (and can vary again for open-weight models). [4]
  • Endpoints / operations

    • Converse: POST /model/{modelId}/converse (plus ConverseStream). [2]
    • InvokeModel: POST /model/{modelId}/invoke (plus streaming variant). [3]

Practical guidance

  • Prefer Converse/ConverseStream for chat apps, multi-turn assistants, tool/function calling, and easier model switching. [1][2]
  • Use InvokeModel/InvokeModelWithResponseStream when you need full provider-native control, or you’re calling non-messages models (for example embeddings or many image models). [3][4]

Sources: AWS “What’s New” (Converse API announcement) [1], Bedrock Runtime API refs for Converse [2] and InvokeModel [3], and AWS guidance comparing parameter formats (structured outputs / Converse vs InvokeModel) [4].


🌐 Web query:

AWS Bedrock invoke_model request body schema format

💡 Result:

InvokeModel does not have one universal JSON schema. The body is model-specific JSON (sent as raw bytes) and must match the inference parameters for the particular model/provider you’re invoking. You must set Content-Type: application/json (and typically Accept: application/json). (docs.aws.amazon.com)

Generic REST shape

  • Endpoint: POST /model/{modelId}/invoke
  • Headers: Content-Type: application/json, Accept: application/json
  • Body: JSON for that model (max request body size is very large, up to 25,000,000 bytes). (docs.aws.amazon.com)

Example request bodies (common ones)

Anthropic Claude (Messages API format)

{
  "anthropic_version": "bedrock-2023-05-31",
  "max_tokens": 1024,
  "messages": [
    { "role": "user", "content": [ { "type": "text", "text": "Hello world" } ] }
  ]
}

(docs.aws.amazon.com)

Amazon Titan Text (text generation)

{
  "inputText": "Write a short poem about apples.",
  "textGenerationConfig": {
    "maxTokenCount": 256,
    "temperature": 0,
    "topP": 0.9,
    "stopSequences": []
  }
}

(aws.amazon.com)

Amazon Titan Text Embeddings

{
  "inputText": "Text to embed"
}

(aws.amazon.com)

If you tell me the exact modelId (e.g., anthropic.claude-..., amazon.titan-..., stability.stable-diffusion-...), I can paste the exact request/response schema for that model from the “Inference request parameters…” reference. (docs.aws.amazon.com)

🏁 Script executed:

cd flo_ai && head -60 flo_ai/llm/aws_bedrock_llm.py | cat -n

Repository: rootflo/wavefront

Length of output: 2260


🏁 Script executed:

cd flo_ai && rg -A 5 -B 5 "class.*Bedrock" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 294


🏁 Script executed:

cd flo_ai && rg "invoke_model\|InvokeModel\|Converse\|modelId" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 43


🏁 Script executed:

cd flo_ai && sed -n '63,90p' flo_ai/llm/aws_bedrock_llm.py | cat -n

Repository: rootflo/wavefront

Length of output: 1226


🏁 Script executed:

cd flo_ai && rg -i "openai\|converse\|model.*compatibility\|supported.*model" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 43


🏁 Script executed:

cd flo_ai && grep -n "invoke_model\|ConversUrl" flo_ai/llm/aws_bedrock_llm.py

Repository: rootflo/wavefront

Length of output: 174


🏁 Script executed:

cd flo_ai && head -100 flo_ai/llm/aws_bedrock_llm.py | tail -40

Repository: rootflo/wavefront

Length of output: 1440


🏁 Script executed:

cd flo_ai && grep -n "class\|def\|#" flo_ai/llm/aws_bedrock_llm.py | head -30

Repository: rootflo/wavefront

Length of output: 861


This implementation is coupled to OpenAI-compatible Bedrock models, not general Bedrock support.

The request body uses OpenAI-style schema (model, messages, temperature, max_completion_tokens, tools) and response parsing expects choices[].message format. While the default model openai.gpt-oss-20b-1:0 is OpenAI-compatible and works correctly, the generic class name AWSBedrock implies broader Bedrock model support.

Users attempting to use native Bedrock models (e.g., anthropic.claude-3-5-sonnet-*, amazon.titan-text-*, meta.llama-*) will encounter failures, as those models use model-specific request/response schemas via invoke_model or require the standardized Converse API.

Recommended fixes:

  • Add a docstring explicitly documenting that this class only supports OpenAI-compatible Bedrock models, or
  • Rename to AWSBedrockOpenAICompat to clarify scope, or
  • Migrate to the Converse API for broader Bedrock model compatibility.

Also, the 'model' key in the request body (line 74) is required for OpenAI but redundant with the modelId parameter (line 85) and may cause issues if model-specific validation is added.
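For the Converse-migration option, the request shape can be sketched as follows; the model ID and prompt below are placeholders, and the actual network call (commented out) needs AWS credentials:

```python
def build_converse_request(model_id: str, user_text: str, temperature: float = 0.7) -> dict:
    # Converse uses one standardized message shape for every model family
    return {
        'modelId': model_id,
        'messages': [{'role': 'user', 'content': [{'text': user_text}]}],
        'inferenceConfig': {'temperature': temperature},
    }

req = build_converse_request('anthropic.claude-3-haiku-20240307-v1:0', 'Hello')
# With credentials configured, the call would look like:
#   import boto3
#   client = boto3.client('bedrock-runtime')
#   response = client.converse(**req)
#   text = response['output']['message']['content'][0]['text']
print(req['inferenceConfig'])  # → {'temperature': 0.7}
```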

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 69-69: Unused method argument: kwargs

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 63 - 88, The AWSBedrock
class is implemented to talk to OpenAI-compatible Bedrock models (generate uses
OpenAI-style request/response shapes and boto_client.invoke_model), so either
(A) make this explicit by updating the class docstring of AWSBedrock (or rename
the class to AWSBedrockOpenAICompat) and keep the current OpenAI-compatible
behavior, or (B) migrate generate to use Bedrock's broader Converse API and
model-specific request/response handling (replace current request_body keys like
'model', 'messages', 'temperature', 'max_completion_tokens', 'tools' and adjust
response parsing that expects choices[].message). In either path remove the
redundant 'model' key from request_body (you already pass modelId=self.model to
boto_client.invoke_model) and update generate (and any usage of
_convert_messages and response parsing) so behavior matches the chosen
compatibility scope.

Member

This looks like a valid issue; maybe you can add a comment on whether it's true.


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Examine the full generate method and response parsing
cd flo_ai && sed -n '63,110p' flo_ai/llm/aws_bedrock_llm.py | cat -n

Length of output: 2124


🏁 Script executed:

#!/bin/bash
# Look at how the response is parsed after invoke_model
cd flo_ai && sed -n '85,130p' flo_ai/llm/aws_bedrock_llm.py | cat -n

Length of output: 2027


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or docstrings explaining model compatibility
cd flo_ai && head -30 flo_ai/llm/aws_bedrock_llm.py | cat -n

Length of output: 1008


@vizsatiz Yes, the issue is valid. Here's the confirmation based on the code:

Evidence from the implementation:

  1. Request body uses OpenAI format (lines 74-81):

    request_body = {
        'model': self.model,
        'messages': converted,
        'temperature': self.temperature,
        'max_completion_tokens': ...,  # OpenAI-style
        'tools': functions,
    }
  2. Response parsing expects OpenAI format (lines 88-105):

    choices = response_body.get('choices', [])
    message = choices[0].get('message', {})
    tool_calls = message.get('tool_calls', [])
    tc['function']['name']  # OpenAI tool call structure
  3. No documentation about model compatibility limits.

What breaks with native Bedrock models:

  • Anthropic Claude (anthropic.claude-3-5-sonnet-*): Expects {"anthropic_version": "...", "max_tokens": ..., "messages": [...]} and returns {"content": [...], "usage": {...}} directly
  • Amazon Titan (amazon.titan-text-*): Expects {"inputText": "...", "textGenerationConfig": {...}} and returns {"results": [...]}
  • Meta Llama (meta.llama-*): Has its own schema structure

Recommendation: Either add a clear docstring/comment stating "This class only supports OpenAI-compatible Bedrock models (openai.*)" or rename to AWSBedrockOpenAI to make the limitation explicit.
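The mismatch can be made concrete with two plain dict builders: one mirroring the OpenAI-style body the class sends today, one following the Anthropic envelope quoted above (both are illustrative sketches, not flo_ai code):

```python
def openai_compat_body(messages: list, temperature: float) -> dict:
    # Shape the current AWSBedrock class sends; only openai.* models accept it
    return {'messages': messages, 'temperature': temperature}

def claude_native_body(messages: list, max_tokens: int = 1024) -> dict:
    # Anthropic models on Bedrock require their own envelope instead
    return {
        'anthropic_version': 'bedrock-2023-05-31',
        'max_tokens': max_tokens,
        'messages': messages,
    }

msgs = [{'role': 'user', 'content': 'hi'}]
print(sorted(claude_native_body(msgs)))  # → ['anthropic_version', 'max_tokens', 'messages']
```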


        usage = response_body.get('usage', {})
        if usage:
            llm_metrics.record_tokens(
                total_tokens=usage.get('total_tokens', 0),
                prompt_tokens=usage.get('prompt_tokens', 0),
                completion_tokens=usage.get('completion_tokens', 0),
                model=self.model,
                provider='bedrock',
            )
            tracer = get_tracer()
            if tracer:
                add_span_attributes(
                    trace.get_current_span(),
                    {
                        'llm.tokens.prompt': usage.get('prompt_tokens', 0),
                        'llm.tokens.completion': usage.get('completion_tokens', 0),
                        'llm.tokens.total': usage.get('total_tokens', 0),
                    },
                )

        choices = response_body.get('choices', [])
        if not choices:
            return {'content': '', 'raw_message': response_body}

        message = choices[0].get('message', {})
        if 'content' in message and message['content']:
            message['content'] = self._strip_reasoning(message['content'])
        text_content = message.get('content', '') or ''
        tool_call = None

        tool_calls = message.get('tool_calls', [])
        if tool_calls:
            tc = tool_calls[0]
            tool_call = {
                'name': tc['function']['name'],
                'arguments': tc['function']['arguments'],
                'id': tc['id'],
            }

        if tool_call:
            return {
                'content': text_content,
                'function_call': tool_call,
                'raw_message': message,
            }
        return {'content': text_content, 'raw_message': message}
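The choices/tool-call parsing above can be replayed against a canned response (a local mirror for illustration):

```python
def parse_openai_style(response_body: dict) -> dict:
    # Mirror of generate()'s tail: take the first choice, surface one tool call
    choices = response_body.get('choices', [])
    if not choices:
        return {'content': '', 'raw_message': response_body}
    message = choices[0].get('message', {})
    text = message.get('content', '') or ''
    tool_calls = message.get('tool_calls', [])
    if tool_calls:
        tc = tool_calls[0]
        return {
            'content': text,
            'function_call': {
                'name': tc['function']['name'],
                'arguments': tc['function']['arguments'],
                'id': tc['id'],
            },
            'raw_message': message,
        }
    return {'content': text, 'raw_message': message}

sample = {'choices': [{'message': {
    'content': '',
    'tool_calls': [{'id': 'c1', 'function': {'name': 'add', 'arguments': '{"a": 1}'}}],
}}]}
print(parse_openai_style(sample)['function_call']['name'])  # → add
```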

    @trace_llm_stream(provider='bedrock')
    async def stream(
        self,
        messages: List[Dict[str, Any]],
        functions: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Dict[str, Any]]:
        converted = self._convert_messages(messages)

        request_body: Dict[str, Any] = {
            'model': self.model,
            'messages': converted,
            'temperature': self.temperature,
            'stream': True,
        }
        if 'max_tokens' in self.kwargs:
            request_body['max_completion_tokens'] = self.kwargs['max_tokens']
        if functions:
            request_body['tools'] = functions

        response = await asyncio.to_thread(
            self.boto_client.invoke_model_with_response_stream,
            modelId=self.model,
            body=json.dumps(request_body),
        )

        buffer = ''

        for event in response['body']:
            chunk_bytes = event.get('chunk', {}).get('bytes', b'')
            if not chunk_bytes:
                continue
            text = chunk_bytes.decode('utf-8').strip()
            # Try direct JSON first (some Bedrock models skip SSE envelope)
            try:
                data = json.loads(text)
                content = data.get('choices', [{}])[0].get('delta', {}).get('content')
                if content:
                    buffer += content
                continue
            except json.JSONDecodeError:
Member

Please address this; we shouldn't do a bare `pass` without a log.

                pass
            # Fall back to SSE format: "data: {...}"
            for line in text.split('\n'):
                line = line.strip()
                if line.startswith('data: ') and line != 'data: [DONE]':
                    try:
                        data = json.loads(line[6:])
                        content = (
                            data.get('choices', [{}])[0].get('delta', {}).get('content')
                        )
                        if content:
                            buffer += content
                    except json.JSONDecodeError:
Member

Same here.

                        pass

        clean = self._strip_reasoning(buffer)
        if clean:
            yield {'content': clean}
Comment on lines +163 to +195

@coderabbitai bot Feb 25, 2026

⚠️ Potential issue | 🔴 Critical

stream() accumulates all content and yields once — not actually streaming.

The method collects the entire response into buffer (lines 163–191), then yields a single chunk at the end (lines 193–195). This defeats the purpose of streaming, as callers will block until the full response is generated before receiving anything. Each content delta should be yielded as it arrives:

Additionally, the for event in response['body'] loop (line 165) runs synchronously on the async event loop thread, blocking it for the entire duration of the stream. Only the initial invoke_model_with_response_stream call is offloaded via asyncio.to_thread.

🐛 Proposed fix — yield incremental chunks and offload blocking iteration
-        buffer = ''
-
-        for event in response['body']:
-            chunk_bytes = event.get('chunk', {}).get('bytes', b'')
-            if not chunk_bytes:
-                continue
-            text = chunk_bytes.decode('utf-8').strip()
-            # Try direct JSON first (some Bedrock models skip SSE envelope)
-            try:
-                data = json.loads(text)
-                content = data.get('choices', [{}])[0].get('delta', {}).get('content')
-                if content:
-                    buffer += content
-                continue
-            except json.JSONDecodeError:
-                pass
-            # Fall back to SSE format: "data: {...}"
-            for line in text.split('\n'):
-                line = line.strip()
-                if line.startswith('data: ') and line != 'data: [DONE]':
-                    try:
-                        data = json.loads(line[6:])
-                        content = (
-                            data.get('choices', [{}])[0].get('delta', {}).get('content')
-                        )
-                        if content:
-                            buffer += content
-                    except json.JSONDecodeError:
-                        pass
-
-        clean = self._strip_reasoning(buffer)
-        if clean:
-            yield {'content': clean}
+        queue: asyncio.Queue = asyncio.Queue()
+
+        def _iter_events():
+            for event in response['body']:
+                chunk_bytes = event.get('chunk', {}).get('bytes', b'')
+                if chunk_bytes:
+                    queue.put_nowait(chunk_bytes)
+            queue.put_nowait(None)  # sentinel
+
+        loop = asyncio.get_event_loop()
+        loop.run_in_executor(None, _iter_events)
+
+        while True:
+            chunk_bytes = await queue.get()
+            if chunk_bytes is None:
+                break
+            text = chunk_bytes.decode('utf-8').strip()
+            content = None
+            try:
+                data = json.loads(text)
+                content = data.get('choices', [{}])[0].get('delta', {}).get('content')
+            except json.JSONDecodeError:
+                for line in text.split('\n'):
+                    line = line.strip()
+                    if line.startswith('data: ') and line != 'data: [DONE]':
+                        try:
+                            data = json.loads(line[6:])
+                            content = data.get('choices', [{}])[0].get('delta', {}).get('content')
+                        except json.JSONDecodeError:
+                            logger.debug("Skipping malformed SSE line: %s", line)
+            if content:
+                clean = self._strip_reasoning(content)
+                if clean:
+                    yield {'content': clean}

Note: This sketch uses a queue + executor to avoid blocking the event loop. The exact approach may need refinement depending on the framework's async patterns, but the key point is: yield each chunk as it arrives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/llm/aws_bedrock_llm.py` around lines 163 - 195, The stream()
method currently accumulates all deltas into a local buffer and yields only
once; change it to yield each content delta as it is parsed so consumers receive
streaming updates: offload the blocking iteration over response['body'] (which
comes from invoke_model_with_response_stream) to a thread or executor (e.g.,
asyncio.to_thread/run_in_executor) so the event loop isn't blocked, parse each
event/chunk as it arrives, apply self._strip_reasoning to each incremental piece
(or accumulate minimally and strip per-yield if needed) and yield {'content':
cleaned_chunk} immediately for every non-empty content delta instead of
returning a single final yield.

Member

This too.


@vizsatiz, understood! Would you like me to plan the code changes to implement proper streaming for the stream() method? I can help refactor it to:

  1. Yield content deltas incrementally as they arrive (instead of accumulating everything in a buffer)
  2. Offload the blocking iteration over response['body'] to avoid blocking the async event loop
  3. Apply _strip_reasoning() to each chunk appropriately

Just let me know and I'll use `` to break down the implementation for you! 🚀


💡 For code changes across multiple files, use @coderabbitai plan in an issue comment
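The incremental pattern discussed above can be sketched end to end with a fake event stream (names are local to this example; the chunk shape follows the code under review):

```python
import asyncio
import json
import queue
import threading

def _pump(events, out_q):
    # Worker thread: push raw chunk bytes, then a None sentinel
    for event in events:
        chunk = event.get('chunk', {}).get('bytes', b'')
        if chunk:
            out_q.put(chunk)
    out_q.put(None)

async def iter_deltas(events):
    # Async consumer: yields each content delta as it arrives,
    # without blocking the event loop on the synchronous iterator
    q = queue.Queue()
    threading.Thread(target=_pump, args=(events, q), daemon=True).start()
    loop = asyncio.get_running_loop()
    while True:
        chunk = await loop.run_in_executor(None, q.get)
        if chunk is None:
            break
        data = json.loads(chunk.decode('utf-8'))
        content = data.get('choices', [{}])[0].get('delta', {}).get('content')
        if content:
            yield {'content': content}

async def collect(events):
    return [d['content'] async for d in iter_deltas(events)]

fake = [{'chunk': {'bytes': json.dumps(
    {'choices': [{'delta': {'content': w}}]}).encode()}} for w in ('Hel', 'lo')]
print(asyncio.run(collect(fake)))  # → ['Hel', 'lo']
```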


    def get_message_content(self, response: Dict[str, Any]) -> str:
        content = (
            response.get('content', '') if isinstance(response, dict) else str(response)
        )
        return self._strip_reasoning(content)

    def format_tool_for_llm(self, tool: 'Tool') -> Dict[str, Any]:
        return {
            'type': 'function',
            'function': {
                'name': tool.name,
                'description': tool.description,
                'parameters': {
                    'type': 'object',
                    'properties': {
                        name: {
                            'type': info.get('type', 'string'),
                            'description': info.get('description', ''),
                            **(
                                {'items': info['items']}
                                if info.get('type') == 'array' and 'items' in info
                                else {}
                            ),
                        }
                        for name, info in tool.parameters.items()
                    },
                    'required': [
                        name
                        for name, info in tool.parameters.items()
                        if info.get('required', True)
                    ],
                },
            },
        }

    def format_tools_for_llm(self, tools: List['Tool']) -> List[Dict[str, Any]]:
        return [self.format_tool_for_llm(tool) for tool in tools]
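The schema produced by `format_tool_for_llm` can be illustrated with a standalone builder that takes plain values instead of a flo_ai `Tool` instance (a sketch for illustration):

```python
def format_tool(name, description, parameters):
    # Build an OpenAI-style function-tool spec from plain values
    return {
        'type': 'function',
        'function': {
            'name': name,
            'description': description,
            'parameters': {
                'type': 'object',
                'properties': {
                    pname: {
                        'type': info.get('type', 'string'),
                        'description': info.get('description', ''),
                    }
                    for pname, info in parameters.items()
                },
                'required': [
                    pname for pname, info in parameters.items()
                    if info.get('required', True)
                ],
            },
        },
    }

spec = format_tool('add', 'Add two numbers', {
    'a': {'type': 'number', 'required': True},
    'b': {'type': 'number', 'required': False},
})
print(spec['function']['parameters']['required'])  # → ['a']
```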

    def format_image_in_message(self, image: ImageMessageContent) -> dict:
        if image.base64:
            return {
                'type': 'image_url',
                'image_url': {
                    'url': f'data:{image.mime_type or "image/jpeg"};base64,{image.base64}'
                },
            }
        raise NotImplementedError(
            'AWS Bedrock LLM requires image base64 data to format image content.'
        )

    def get_assistant_message_for_tool_call(
        self, response: Dict[str, Any]
    ) -> Optional[Any]:
        if isinstance(response, dict) and 'raw_message' in response:
            return response['raw_message']
        return None

    def get_tool_use_id(self, function_call: Dict[str, Any]) -> Optional[str]:
        return function_call.get('id')

    def format_function_result_message(
        self, function_name: str, content: str, tool_use_id: Optional[str] = None
    ) -> Dict[str, Any]:
        return {
            'role': 'tool',
            'tool_call_id': tool_use_id or 'unknown',
            'content': content,
            'name': function_name,
        }