From 1ba0ddd52a545c240e66318966b9011dc4440a4a Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Wed, 13 Aug 2025 12:51:02 +0330 Subject: [PATCH] feat: enhance Hivemind agent execution with max iterations and improve error handling in query data sources --- tasks/hivemind/agent.py | 6 ++++- tasks/hivemind/query_data_sources.py | 33 ++++++++++++++++++++++------ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/tasks/hivemind/agent.py b/tasks/hivemind/agent.py index 952c1fe..ada6f57 100644 --- a/tasks/hivemind/agent.py +++ b/tasks/hivemind/agent.py @@ -176,7 +176,11 @@ def do_rag_query(self) -> str: # Run the agent agent_executor = AgentExecutor( - agent=agent, tools=tools, verbose=True, return_intermediate_steps=False + agent=agent, + tools=tools, + verbose=True, + return_intermediate_steps=False, + max_iterations=3, ) result = agent_executor.invoke({"input": self.state.user_query}) diff --git a/tasks/hivemind/query_data_sources.py b/tasks/hivemind/query_data_sources.py index 3bd6a1b..5367eb0 100644 --- a/tasks/hivemind/query_data_sources.py +++ b/tasks/hivemind/query_data_sources.py @@ -1,5 +1,6 @@ import asyncio import os +import logging import nest_asyncio from dotenv import load_dotenv @@ -8,6 +9,7 @@ from tc_temporal_backend.client import TemporalClient from tc_temporal_backend.schema.hivemind import HivemindQueryPayload from temporalio.common import RetryPolicy +from temporalio.client import WorkflowFailureError nest_asyncio.apply() @@ -42,13 +44,30 @@ async def query(self, query: str) -> str | None: payload.workflow_id = self.workflow_id hivemind_queue = self.load_hivemind_queue() - result = await client.execute_workflow( - "HivemindWorkflow", - payload, - id=f"hivemind-query-{self.community_id}-{self.workflow_id}", - task_queue=hivemind_queue, - retry_policy=RetryPolicy(maximum_attempts=3), - ) + try: + result = await client.execute_workflow( + "HivemindWorkflow", + payload, + id=f"hivemind-query-{self.community_id}-{self.workflow_id}", + task_queue=hivemind_queue, + retry_policy=RetryPolicy(maximum_attempts=3), + ) + except WorkflowFailureError as e: + logging.error(f"WorkflowFailureError: {e} for workflow {self.workflow_id}", exc_info=True) + return None + except Exception as e: + logging.error(f"Exception: {e} for workflow {self.workflow_id}", exc_info=True) + return None + + # Normalize Temporal failure-shaped responses that may be returned as data + if isinstance(result, dict) and ( + "workflowExecutionFailedEventAttributes" in result or "failure" in result + ): + logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True) + return None + if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result: + logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True) + return None return result