Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions dimos/manipulation/control/coordinator_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,25 @@ def get_joint_positions(self) -> dict[str, float]:
return self._rpc.get_joint_positions() or {}

def get_trajectory_status(self, task_name: str) -> dict[str, Any]:
"""Get status of a trajectory task."""
return self._rpc.get_trajectory_status(task_name) or {}
"""Get status of a trajectory task via task_invoke."""
result = self._rpc.task_invoke(task_name, "get_state", {})
if result is not None:
return {"state": int(result), "task": task_name}
return {}

# =========================================================================
# Trajectory execution (RPC calls)
# Trajectory execution (via task_invoke)
# =========================================================================

def execute_trajectory(self, task_name: str, trajectory: JointTrajectory) -> bool:
"""Execute a trajectory on a task."""
return self._rpc.execute_trajectory(task_name, trajectory) or False
"""Execute a trajectory on a task via task_invoke."""
result = self._rpc.task_invoke(task_name, "execute", {"trajectory": trajectory})
return bool(result)

def cancel_trajectory(self, task_name: str) -> bool:
"""Cancel an active trajectory."""
return self._rpc.cancel_trajectory(task_name) or False
"""Cancel an active trajectory via task_invoke."""
result = self._rpc.task_invoke(task_name, "cancel", {})
return bool(result)

# =========================================================================
# Task selection and setup
Expand Down
52 changes: 43 additions & 9 deletions dimos/manipulation/manipulation_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
Blueprints for manipulation module integration with ControlCoordinator.

Usage:
# Start coordinator first, then planner:
coordinator = xarm7_planner_coordinator.build()
coordinator.loop()

# Plan and execute via RPC client:
from dimos.manipulation.planning.examples.manipulation_client import ManipulationClient
client = ManipulationClient()
client.plan_to_joints([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7])
client.execute()
# Non-agentic (manual RPC):
dimos run coordinator-mock
dimos run xarm-perception

# Agentic (LLM agent with skills):
dimos run coordinator-mock
dimos run xarm-perception-agent
"""

import math
from pathlib import Path

from dimos.agents.agent import llm_agent
from dimos.agents.cli.human import human_input
from dimos.core.blueprints import autoconnect
from dimos.core.transport import LCMTransport
from dimos.hardware.sensors.camera.realsense import realsense_camera
Expand Down Expand Up @@ -232,6 +232,8 @@ def _make_xarm7_config(
coordinator_task_name=coordinator_task,
gripper_hardware_id=gripper_hardware_id,
tf_extra_links=tf_extra_links or [],
# Home configuration: arm extended forward, elbow up (safe observe pose)
home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
)


Expand Down Expand Up @@ -366,11 +368,43 @@ def _make_piper_config(
)


# XArm7 perception + LLM agent for agentic manipulation
# Skills (pick, place, move_to_pose, etc.) auto-register with the agent's SkillCoordinator.
# Usage: dimos run coordinator-mock, then dimos run xarm-perception-agent
_MANIPULATION_AGENT_SYSTEM_PROMPT = """\
You are a robotic manipulation assistant controlling an xArm7 robot arm.

Use ONLY these ManipulationModule skills for manipulation tasks:
- scan_objects: Scan scene and list detected objects with 3D positions. Always call this first.
- pick: Pick up an object by name. Requires scan_objects first.
- place: Place a held object at x, y, z position.
- place_back: Place a held object back at its original pick position.
- pick_and_place: Pick an object and place it at a target location.
- move_to_pose: Move end-effector to x, y, z with optional roll, pitch, yaw.
- move_to_joints: Move to a joint configuration (comma-separated radians).
- open_gripper / close_gripper / set_gripper: Control the gripper.
- go_home: Move to the home/observe position.
- go_init: Return to the startup position.
- get_scene_info: Get full robot state, detected objects, and scene info.

Do NOT use the 'detect' or 'select' skills — use scan_objects instead.
For robot_name parameters, always omit or pass None (single-arm setup).
After pick or place, return to init with go_init unless another action follows immediately.
"""

xarm_perception_agent = autoconnect(
xarm_perception,
llm_agent(system_prompt=_MANIPULATION_AGENT_SYSTEM_PROMPT),
human_input(),
)


__all__ = [
"PIPER_GRIPPER_COLLISION_EXCLUSIONS",
"XARM_GRIPPER_COLLISION_EXCLUSIONS",
"dual_xarm6_planner",
"xarm6_planner_only",
"xarm7_planner_coordinator",
"xarm_perception",
"xarm_perception_agent",
]
Loading