Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dimos/perception/experimental/temporal_memory/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ Notes
- Evidence is extracted in sliding windows, so queries can refer to recent or past entities.
- Distance estimation can run in the background to enrich graph relations.
- If you want a different output directory, set `TemporalMemoryConfig(output_dir=...)`.

To visualize, run
` ` and open in `localhost:8080` in your browser.
55 changes: 55 additions & 0 deletions dimos/perception/experimental/temporal_memory/entity_graph_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,61 @@ def get_entity(self, entity_id: str) -> dict[str, Any] | None:
"metadata": json.loads(row["metadata"]) if row["metadata"] else None,
}

def update_entity(
self,
entity_id: str,
descriptor: str | None = None,
metadata: dict[str, Any] | None = None,
) -> bool:
"""
Update an entity's descriptor and/or metadata.

Args:
entity_id: Entity ID to update
descriptor: New descriptor (optional)
metadata: New metadata dict (optional, will merge with existing)

Returns:
True if entity was updated, False if not found
"""
conn = self._get_connection()
cursor = conn.cursor()

# Get existing entity
cursor.execute("SELECT metadata FROM entities WHERE entity_id = ?", (entity_id,))
row = cursor.fetchone()
if row is None:
return False

# Merge metadata if provided
existing_metadata = json.loads(row["metadata"]) if row["metadata"] else {}
if metadata:
existing_metadata.update(metadata)

# Update descriptor and/or metadata
updates = []
params: list[Any] = []

if descriptor is not None:
updates.append("descriptor = ?")
params.append(descriptor)

if metadata is not None:
updates.append("metadata = ?")
params.append(json.dumps(existing_metadata))

if not updates:
return True # Nothing to update

params.append(entity_id)
cursor.execute(
f"UPDATE entities SET {', '.join(updates)} WHERE entity_id = ?",
params,
)
conn.commit()
logger.debug(f"Updated entity {entity_id}")
return True

def get_all_entities(self, entity_type: str | None = None) -> list[dict[str, Any]]:
"""Get all entities, optionally filtered by type."""
conn = self._get_connection()
Expand Down
Loading
Loading