Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tasks/hivemind/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ def do_rag_query(self) -> str:

# Run the agent
agent_executor = AgentExecutor(
agent=agent, tools=tools, verbose=True, return_intermediate_steps=False
agent=agent,
tools=tools,
verbose=True,
return_intermediate_steps=False,
max_iterations=3,
)

result = agent_executor.invoke({"input": self.state.user_query})
Expand Down
33 changes: 26 additions & 7 deletions tasks/hivemind/query_data_sources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import logging

import nest_asyncio
from dotenv import load_dotenv
Expand All @@ -8,6 +9,7 @@
from tc_temporal_backend.client import TemporalClient
from tc_temporal_backend.schema.hivemind import HivemindQueryPayload
from temporalio.common import RetryPolicy
from temporalio.client import WorkflowFailureError
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect import for WorkflowFailureError

WorkflowFailureError is exposed from temporalio.exceptions, not temporalio.client. This will raise ImportError at runtime.

-from temporalio.client import WorkflowFailureError
+from temporalio.exceptions import WorkflowFailureError
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from temporalio.client import WorkflowFailureError
- from temporalio.client import WorkflowFailureError
+ from temporalio.exceptions import WorkflowFailureError
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around line 12, the code imports
WorkflowFailureError from temporalio.client which is incorrect; change the
import to import WorkflowFailureError from temporalio.exceptions (i.e., replace
the temporalio.client import with temporalio.exceptions) so the correct symbol
is imported and ImportError at runtime is avoided.


nest_asyncio.apply()

Expand Down Expand Up @@ -42,13 +44,30 @@ async def query(self, query: str) -> str | None:
payload.workflow_id = self.workflow_id

hivemind_queue = self.load_hivemind_queue()
result = await client.execute_workflow(
"HivemindWorkflow",
payload,
id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
task_queue=hivemind_queue,
retry_policy=RetryPolicy(maximum_attempts=3),
)
try:
result = await client.execute_workflow(
"HivemindWorkflow",
payload,
id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
task_queue=hivemind_queue,
retry_policy=RetryPolicy(maximum_attempts=3),
)
except WorkflowFailureError as e:
Comment on lines +47 to +55
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure workflow ID uniqueness when workflow_id is absent

Using id=f"hivemind-query-{community_id}-{workflow_id}" will collide across requests when workflow_id is None, leading to WorkflowAlreadyStarted-like errors. Fallback to a random suffix when no workflow_id is provided.

-                id=f"hivemind-query-{self.community_id}-{self.workflow_id}",
+                id=f"hivemind-query-{self.community_id}-{self.workflow_id or os.urandom(8).hex()}",

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around lines 47 to 55, the workflow id
string uses self.workflow_id directly which can be None and cause collisions;
update the code to compute a unique workflow_id_part (use self.workflow_id if
truthy otherwise generate a short random suffix such as a UUID4 or
timestamp+random) and then use
id=f"hivemind-query-{self.community_id}-{workflow_id_part}" when calling
client.execute_workflow so each started workflow without an explicit workflow_id
gets a unique id to avoid WorkflowAlreadyStarted errors.

logging.error(f"WorkflowFailureError: {e} for workflow {self.workflow_id}", exc_info=True)
return None
except Exception as e:
logging.error(f"Exception: {e} for workflow {self.workflow_id}", exc_info=True)
return None

# Normalize Temporal failure-shaped responses that may be returned as data
if isinstance(result, dict) and (
"workflowExecutionFailedEventAttributes" in result or "failure" in result
):
logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
return None
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
return None
Comment on lines +62 to +70
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Don’t log exc_info for non-exceptions

These branches handle failure-shaped payloads, not exceptions. exc_info=True prints a spurious “NoneType: None” stack trace. Remove exc_info here.

-            logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
+            logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
...
-            logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
+            logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Normalize Temporal failure-shaped responses that may be returned as data
if isinstance(result, dict) and (
"workflowExecutionFailedEventAttributes" in result or "failure" in result
):
logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
return None
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
logging.error(f"WorkflowFailureError: {result} for workflow {self.workflow_id}", exc_info=True)
return None
# Normalize Temporal failure-shaped responses that may be returned as data
if isinstance(result, dict) and (
"workflowExecutionFailedEventAttributes" in result or "failure" in result
):
logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
return None
if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
logging.error(f"WorkflowFailureError-shaped payload for workflow {self.workflow_id}: {result}")
return None
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around lines 62 to 70, the logging.error
calls for failure-shaped payloads include exc_info=True which prints a spurious
stack trace for non-exceptions; remove the exc_info=True argument from both
logging.error calls so the message is logged without exception/stack trace
information.


Comment on lines +62 to 71
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify return type consistency (may not always be str)

You normalize failure-shaped dict/str responses, but a successful result could also be a dict depending on the workflow. The method signature declares str | None, and the tool wrapper returns response as-is. If success can be non-str, adjust types or coerce to str before returning.

Possible adjustments:

  • Change signature to return Any or Union[str, dict, None].
  • Or, stringify non-str successes:
if not isinstance(result, str):
    result = json.dumps(result, ensure_ascii=False)
return result

If you want, I can scan this repo for consumers that assume a string and propose end-to-end changes.


I’ll inspect the function signature to confirm what types it may return. Running a quick search and file excerpt now.


🏁 Script executed:

#!/bin/bash
# Locate the function definition and show its signature
rg -n "def " tasks/hivemind/query_data_sources.py

# Show the first 200 lines to capture imports and the method signature
sed -n '1,200p' tasks/hivemind/query_data_sources.py

Length of output: 4699


Ensure query always returns a string or update its signature

The async def query(self, …) -> str | None currently returns whatever the workflow yields—often a dict—which violates the declared return type and will break the get_rag_answer tool (it expects a str). You should either serialize non‐string results or widen the type hint.

Locations to update in tasks/hivemind/query_data_sources.py:

  • After the failure checks (around line 70), coerce dicts to JSON:
         # Normalize Temporal failure-shaped responses …
         if isinstance(result, dict) and ("workflowExecutionFailedEventAttributes" in result or "failure" in result):
             logging.error(…)
             return None
         if isinstance(result, str) and "workflowExecutionFailedEventAttributes" in result:
             logging.error(…)
             return None
    
  •  # Ensure successful payloads are returned as strings
    
  •  if not isinstance(result, str):
    
  •      import json
    
  •      result = json.dumps(result, ensure_ascii=False)
    
     return result
    
  • Or, if you intend to surface structured data, change the signature to:
    async def query(self, query: str) -> dict | str | None:
    and update the get_rag_answer wrapper accordingly.
🤖 Prompt for AI Agents
In tasks/hivemind/query_data_sources.py around lines 62–71, the function
annotated as async def query(...) -> str | None sometimes returns dicts (or
other non-string types), violating the signature and breaking callers; after the
existing failure checks, coerce non-string results to strings (e.g., import json
and if isinstance(result, dict) set result = json.dumps(result), and for other
non-str types convert with str(result)) so the function always returns a str or
None, or alternatively change the signature to async def query(self, query: str)
-> dict | str | None and update get_rag_answer to accept structured results—pick
one approach and apply the corresponding code and type-hint changes.

return result

Expand Down