feat: enhance Hivemind agent execution with max iterations and improve error handling! #48
Conversation
Walkthrough

Caps AgentExecutor iterations at 3 in do_rag_query. Adds logging and exception handling around Temporal workflow execution in QueryDataSources.query, normalizing failure-shaped results to None. No other logic changes.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as do_rag_query
    participant Agent as AgentExecutor
    Caller->>Agent: initialize(max_iterations=3)
    loop up to 3 iterations
        Agent->>Agent: plan/act/observe
    end
    Agent-->>Caller: final result or early stop
```
```mermaid
sequenceDiagram
    participant Caller as QueryDataSources.query
    participant Temporal as Temporal Client
    Caller->>Temporal: execute_workflow(payload, id, task_queue, ...)
    alt success
        Temporal-->>Caller: result
        alt result indicates failure-shape
            Caller->>Caller: log error
            Caller-->>Caller: return None
        else valid data
            Caller-->>Caller: return result
        end
    else WorkflowFailureError/Exception
        Temporal-->>Caller: throw
        Caller->>Caller: log error
        Caller-->>Caller: return None
    end
```
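Read together, the two diagrams describe a guard around execute_workflow plus a failure-shape check. A rough consolidated sketch of that flow (identifiers taken from the diff hunks reviewed below; the exact log messages and exception split are assumptions, not the PR's literal code):

```python
try:
    result = await client.execute_workflow(
        "HivemindWorkflow",
        payload,
        id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
        task_queue=hivemind_queue,
        retry_policy=RetryPolicy(maximum_attempts=3),
    )
except WorkflowFailureError as e:
    # workflow failed on the Temporal side
    logging.error(f"Workflow failed for workflow_id={self.workflow_id}: {e}", exc_info=True)
    return None
except Exception as e:
    # any other client/transport error
    logging.error(f"Unexpected error for workflow_id={self.workflow_id}: {e}", exc_info=True)
    return None

# failure payloads returned as data are also normalized to None
if isinstance(result, (dict, str)) and "workflowExecutionFailedEventAttributes" in result:
    logging.error(f"Failure-shaped result for workflow {self.workflow_id}: {result}")
    return None
return result
```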
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~8 minutes
Actionable comments posted: 4
🔭 Outside diff range comments (1)
tasks/hivemind/agent.py (1)
186-189: Guard agent invocation with try/except and log context

Given that the tool invokes Temporal and networked services, agent_executor.invoke can raise. Mirror the improved error-handling pattern and avoid flow crashes by catching exceptions here, logging them, and setting a safe last_answer fallback.
Example change (outside this hunk):

```python
try:
    result = agent_executor.invoke({"input": self.state.user_query})
    self.state.last_answer = result["output"]
except Exception as e:
    logging.error(
        f"Agent execution failed for workflow_id={self.workflow_id}: {e}",
        exc_info=True,
    )
    self.state.last_answer = "Sorry, I couldn't complete your request."
finally:
    self.state.retry_count += 1
```
🧹 Nitpick comments (3)
tasks/hivemind/agent.py (1)
179-184: Make max_iterations configurable

Hard-coding 3 may be too strict or too lenient depending on the task. Consider exposing it via a constructor arg to AgenticHivemindFlow and passing it through here.
Apply within these lines:
```diff
- max_iterations=3,
+ max_iterations=self.max_agent_iterations,
```

Outside this hunk, add a parameter and store it:

```python
# In __init__(..., max_agent_iterations: int = 3, ...)
self.max_agent_iterations = max_agent_iterations
```

tasks/hivemind/query_data_sources.py (2)
55-61: Consider handling specific Temporal exceptions and adding timeouts

Catching WorkflowFailureError and a generic Exception is fine. Consider also handling known Temporal exceptions (e.g., workflow already started) separately for clearer operator signals, and adding a suitable workflow timeout to avoid hanging calls.
Would you like me to propose concrete exception types/timeouts based on the tc_temporal_backend and temporalio versions you’re on?
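A minimal sketch of what that could look like, assuming the temporalio Python SDK (the timeout value, and WorkflowAlreadyStartedError being the relevant exception to split out, are assumptions to validate against the installed versions):

```python
from datetime import timedelta

from temporalio.exceptions import WorkflowAlreadyStartedError

try:
    result = await client.execute_workflow(
        "HivemindWorkflow",
        payload,
        id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
        task_queue=hivemind_queue,
        retry_policy=RetryPolicy(maximum_attempts=3),
        execution_timeout=timedelta(minutes=5),  # assumed budget to avoid hanging calls
    )
except WorkflowAlreadyStartedError:
    # clearer operator signal than a generic failure
    logging.error(f"Workflow already started for workflow_id={self.workflow_id}")
    return None
except WorkflowFailureError as e:
    logging.error(f"Workflow failed for workflow_id={self.workflow_id}: {e}", exc_info=True)
    return None
```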
3-3: Use a module-level logger instead of the root logger

To integrate better with the app-wide logging config, prefer a module-specific logger.
Outside this hunk:

```python
logger = logging.getLogger(__name__)
# then replace logging.error/info(...) with logger.error/info(...)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- tasks/hivemind/agent.py (1 hunks)
- tasks/hivemind/query_data_sources.py (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-02-26T15:41:54.631Z
Learnt from: amindadgar
PR: TogetherCrew/agents-workflow#2
File: tasks/hivemind/query_data_sources.py:27-31
Timestamp: 2025-02-26T15:41:54.631Z
Learning: In the agents-workflow project, "HivemindWorkflow" and "AgenticHivemindTemporalWorkflow" are two distinct workflows. The QueryDataSources class in tasks/hivemind/query_data_sources.py is designed to interact with "HivemindWorkflow", which is separate from "AgenticHivemindTemporalWorkflow" defined in tasks/agent.py.
Applied to files:
tasks/hivemind/query_data_sources.py
📚 Learning: 2025-06-17T13:03:41.479Z
Learnt from: amindadgar
PR: TogetherCrew/agents-workflow#37
File: tasks/hivemind/query_data_sources.py:40-43
Timestamp: 2025-06-17T13:03:41.479Z
Learning: The HivemindQueryPayload schema in tc-temporal-backend package version 1.1.3 includes support for the workflow_id field, so assignments like payload.workflow_id = self.workflow_id are valid and won't cause ValidationError.
Applied to files:
tasks/hivemind/query_data_sources.py
🔇 Additional comments (1)
tasks/hivemind/agent.py (1)
179-184: Capping agent iterations at 3 — LGTM

Setting max_iterations=3 is a sensible safeguard against runaway agent loops. No issues spotted with the current usage.
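For reference, a rough sketch of what the capped executor in this hunk likely looks like, assuming a LangChain AgentExecutor (the agent.py diff is not reproduced in this view, so agent, tools, and the other kwargs are placeholders):

```python
from langchain.agents import AgentExecutor

# Sketch only: the agent/tools construction is not shown in this PR view.
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    max_iterations=3,  # the safeguard added by this PR
    handle_parsing_errors=True,  # placeholder; not confirmed by the diff
)
```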
```python
from tc_temporal_backend.client import TemporalClient
from tc_temporal_backend.schema.hivemind import HivemindQueryPayload
from temporalio.common import RetryPolicy
from temporalio.client import WorkflowFailureError
```
Incorrect import for WorkflowFailureError
WorkflowFailureError is exposed from temporalio.exceptions, not temporalio.client. This will raise ImportError at runtime.
```diff
-from temporalio.client import WorkflowFailureError
+from temporalio.exceptions import WorkflowFailureError
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
from temporalio.exceptions import WorkflowFailureError
```
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around line 12, the code imports
WorkflowFailureError from temporalio.client which is incorrect; change the
import to import WorkflowFailureError from temporalio.exceptions (i.e., replace
the temporalio.client import with temporalio.exceptions) so the correct symbol
is imported and ImportError at runtime is avoided.
```python
try:
    result = await client.execute_workflow(
        "HivemindWorkflow",
        payload,
        id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
        task_queue=hivemind_queue,
        retry_policy=RetryPolicy(maximum_attempts=3),
    )
except WorkflowFailureError as e:
```
🛠️ Refactor suggestion
Ensure workflow ID uniqueness when workflow_id is absent
Using id=f"hivemind-query-{community_id}-{workflow_id}" will collide across requests when workflow_id is None, leading to WorkflowAlreadyStarted-like errors. Fallback to a random suffix when no workflow_id is provided.
- id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
+ id=f"hivemind-query-{self.community_id}-{self.workflow_id or os.urandom(8).hex()}",Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around lines 47 to 55, the workflow id
string uses self.workflow_id directly which can be None and cause collisions;
update the code to compute a unique workflow_id_part (use self.workflow_id if
truthy otherwise generate a short random suffix such as a UUID4 or
timestamp+random) and then use
id=f"hivemind-query-{self.community_id}-{workflow_id_part}" when calling
client.execute_workflow so each started workflow without an explicit workflow_id
gets a unique id to avoid WorkflowAlreadyStarted errors.
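One possible shape of that fix, following the prompt above (uuid4 is used here as the random suffix; the workflow_id_part variable name is illustrative):

```python
import uuid

# Fall back to a short random suffix when no explicit workflow_id was provided
workflow_id_part = self.workflow_id or uuid.uuid4().hex[:8]

result = await client.execute_workflow(
    "HivemindWorkflow",
    payload,
    id=f"hivemind-query-{self.community_id}-{workflow_id_part}",
    task_queue=hivemind_queue,
    retry_policy=RetryPolicy(maximum_attempts=3),
)
```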
```python
# Normalize Temporal failure-shaped responses that may be returned as data
if isinstance(result, dict) and (
    "workflowExecutionFailedEventAttributes" in result or "failure" in result
):
    logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
    return None
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
    logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
    return None
```
🛠️ Refactor suggestion
Don’t log exc_info for non-exceptions
These branches handle failure-shaped payloads, not exceptions. exc_info=True prints a spurious “NoneType: None” stack trace. Remove exc_info here.
- logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
+ logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
...
- logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
+ logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")📝 Committable suggestion
```suggestion
# Normalize Temporal failure-shaped responses that may be returned as data
if isinstance(result, dict) and (
    "workflowExecutionFailedEventAttributes" in result or "failure" in result
):
    logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
    return None
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
    logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
    return None
```
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around lines 62 to 70, the logging.error
calls for failure-shaped payloads include exc_info=True which prints a spurious
stack trace for non-exceptions; remove the exc_info=True argument from both
logging.error calls so the message is logged without exception/stack trace
information.
💡 Verification agent
🧩 Analysis chain
Verify return type consistency (may not always be str)
You normalize failure-shaped dict/str responses, but a successful result could also be a dict depending on the workflow. The method signature declares str | None, and the tool wrapper returns response as-is. If success can be non-str, adjust types or coerce to str before returning.
Possible adjustments:

- Change signature to return Any or Union[str, dict, None].
- Or, stringify non-str successes:

```python
if not isinstance(result, str):
    result = json.dumps(result, ensure_ascii=False)
return result
```

If you want, I can scan this repo for consumers that assume a string and propose end-to-end changes.
I’ll inspect the function signature to confirm what types it may return. Running a quick search and file excerpt now.
🏁 Script executed:

```bash
#!/bin/bash
# Locate the function definition and show its signature
rg -n "def " tasks/hivemind/query_data_sources.py
# Show the first 200 lines to capture imports and the method signature
sed -n '1,200p' tasks/hivemind/query_data_sources.py
```

Length of output: 4699
Ensure query always returns a string or update its signature
The async def query(self, …) -> str | None currently returns whatever the workflow yields—often a dict—which violates the declared return type and will break the get_rag_answer tool (it expects a str). You should either serialize non‐string results or widen the type hint.
Locations to update in tasks/hivemind/query_data_sources.py:
- After the failure checks (around line 70), coerce dicts to JSON:

```python
# Normalize Temporal failure-shaped responses …
if isinstance(result, dict) and (
    "workflowExecutionFailedEventAttributes" in result or "failure" in result
):
    logging.error(…)
    return None
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
    logging.error(…)
    return None

# Ensure successful payloads are returned as strings
if not isinstance(result, str):
    import json
    result = json.dumps(result, ensure_ascii=False)
return result
```

- Or, if you intend to surface structured data, change the signature to:

```python
async def query(self, query: str) -> dict | str | None:
```

and update the get_rag_answer wrapper accordingly.
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around lines 62–71, the function
annotated as async def query(...) -> str | None sometimes returns dicts (or
other non-string types), violating the signature and breaking callers; after the
existing failure checks, coerce non-string results to strings (e.g., import json
and if isinstance(result, dict) set result = json.dumps(result), and for other
non-str types convert with str(result)) so the function always returns a str or
None, or alternatively change the signature to async def query(self, query: str)
-> dict | str | None and update get_rag_answer to accept structured results—pick
one approach and apply the corresponding code and type-hint changes.
Summary by CodeRabbit
Bug Fixes
Chores