
Conversation


@leshy leshy commented Jan 21, 2026

Summary

This is a first pass on memory: it cleans up the way we deal with timed events in dimos and creates a TimeSeriesStore[T] abstraction for timestamped data with multiple backend implementations.

The backend implementations are very simple; TBH I expect we'll use sqlite rather than psql, and implemented the others more as examples.

TimeSeriesStore[T] Base Class (dimos/memory/timeseries/base.py)

  • Generic abstract base — backends implement _save, _load, _delete, _iter_items, _find_closest_timestamp, _count, _last_timestamp, _find_before, _find_after

  • API built on top: save(), find_closest(), find_before(), find_after(), prune_old(), slice_by_time(), iterate(), iterate_realtime(), stream(), consume_stream()

  • T bound to Timestamped — timestamps come from .ts attribute
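Since T is bound to Timestamped, anything exposing a .ts attribute goes into a store. A minimal sketch of the contract (the Reading class is hypothetical; real payloads would subclass dimos.types.timestamped.Timestamped):

from dimos.memory.timeseries import InMemoryStore

class Reading:
    """Hypothetical payload; exposes the .ts attribute that save() reads."""

    def __init__(self, ts: float, value: float) -> None:
        self.ts = ts        # timestamp that the store indexes on
        self.value = value  # arbitrary payload

store = InMemoryStore()
store.save(Reading(ts=1.0, value=0.5))
item = store.find_closest(1.2, tolerance=0.5)  # returns the ts=1.0 reading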

Backend Implementations

| Backend | File | Use case |
| --- | --- | --- |
| InMemoryStore | inmemory.py | Live streaming, buffers, testing; O(log n) via SortedKeyList |
| PickleDirStore | pickledir.py | Simple file-based storage, one pickle per timestamp |
| SqliteStore | sqlite.py | Single-file DB with indexed queries |
| PostgresStore | postgres.py | Multi-user; implements the Resource lifecycle |
| LegacyPickleStore | legacy.py | Backward compat with old TimedSensorReplay recordings |

Unified the way we treat storage of timed items

Replaced TimestampedCollection

TimestampedCollection and its subclass TimestampedBufferCollection were used for:

  • Transform buffers in TF service (TBuffer)
  • Message timestamp alignment (align_timestamped)
  • Quality filtering (image sharpness windows)

All now use InMemoryStore directly. TimestampedBufferCollection is a thin wrapper adding auto-prune on insert. TimestampedCollection class deleted entirely.
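The auto-prune idea, sketched (not the actual TimestampedBufferCollection source; assumes prune_old(cutoff) drops items with ts below the cutoff, with last_timestamp() from the base API):

from dimos.memory.timeseries import InMemoryStore

class AutoPruneBuffer(InMemoryStore):  # hypothetical name; the real class is TimestampedBufferCollection
    def __init__(self, window: float = 10.0) -> None:
        super().__init__()
        self.window = window  # seconds of history to keep

    def save(self, *data) -> None:
        super().save(*data)
        # assumption: prune_old(cutoff) removes everything older than cutoff
        self.prune_old(self.last_timestamp() - self.window)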

Replaced Pickle Sensor Replay System with standard time series store interface

Future recording and replay can go via sqlite, for example.
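Recording a lidar stream and replaying it later might look like this (a sketch; lidar_observable and process are placeholders):

from dimos.memory.timeseries import SqliteStore

store = SqliteStore("recordings/lidar")  # resolved via the data dir to recordings/lidar.db
store.consume_stream(lidar_observable)   # record: consume_stream() is part of the base API

for item in store.iterate_realtime(speed=1.0):  # replay at recorded pace
    process(item)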

Replaced Transform service in-memory transform storage

They all depend on the same base; storing transforms in postgres (not that we want to) is a one-line change.
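Concretely, the swap is just the constructor (the table name here is hypothetical):

from dimos.memory.timeseries import InMemoryStore, PostgresStore

buffer = InMemoryStore()                   # what the TF service uses today
# buffer = PostgresStore("tf_transforms")  # the one-line change, should we ever want it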

Other changes

  • Removed generic Embedding type — all embedding models return the same type

Usage

from dimos.memory.timeseries import InMemoryStore, SqliteStore, PostgresStore

# In-memory buffer
store = InMemoryStore()
# or
# SQLite persistent storage
# store = SqliteStore("recordings/lidar")
# or
# Postgres storage
# store = PostgresStore("my_lidar_table")

store.save(data)  # data.ts used as timestamp
latest = store.last()
closest = store.find_closest(target_ts, tolerance=0.5)

for item in store.iterate_realtime(speed=2.0):
    process(item)

# Observable streaming
store.stream(speed=1.0, seek=10.0).subscribe(on_next=process)

leshy added 11 commits January 21, 2026 17:17
- quality_barrier: Callable[[Observable[T]], Observable[T]]
- sharpness_barrier: Callable[[Observable[Image]], Observable[Image]]
- Implement find_closest(), first_timestamp(), iterate(), iterate_ts(),
  iterate_realtime() methods using abstract _iter_items/_find_closest_timestamp
- Add scheduler-based stream() with absolute time reference to prevent timing
  drift during long playback (ported from replay.py)
- Move imports to top of file, add proper typing throughout
- Fix pickledir.py mypy error (pickle.load returns Any)
- Single-file SQLite storage with indexed timestamp queries
- BLOB storage for pickled sensor data
- INSERT OR REPLACE for duplicate timestamp handling
- Supports multiple tables per database (different sensors)
- Added to parametrized tests (15 tests across 3 backends)
- PostgresStore implements SensorStore[T] + Resource for lifecycle management
- Multiple stores can share same database with different tables
- Tables created automatically on first save
- Tests are optional - skip gracefully if PostgreSQL not available
- Added psycopg2-binary and types-psycopg2 dependencies
- Includes reset_db() helper for simple migrations (drop/recreate)
@leshy leshy requested a review from a team January 21, 2026 14:27
@leshy leshy marked this pull request as draft January 21, 2026 14:27

greptile-apps bot commented Jan 21, 2026

Greptile Overview

Greptile Summary

This PR introduces a comprehensive refactoring that unifies timestamped data storage across the codebase through a new TimeSeriesStore[T] abstraction.

Key Changes:

  • Created TimeSeriesStore[T] abstract base class with rich API for time-series data (save, find_closest, iterate, stream, etc.)
  • Implemented 5 backend storage options: InMemoryStore (SortedKeyList), SqliteStore, PostgresStore, PickleDirStore, and LegacyPickleStore
  • Replaced TimestampedCollection with InMemoryStore, deleting ~150 lines of redundant code
  • Migrated TF service transform buffers (TBuffer) to extend InMemoryStore with auto-pruning
  • Replaced sensor replay system with LegacyPickleStore for backward compatibility
  • Enhanced get_data() to support nested paths in archives
  • Simplified embedding models to return unified Embedding type

Architecture Benefits:

  • Single abstraction for all timestamped data reduces code duplication
  • Pluggable backends allow easy switching between in-memory, file, and database storage
  • Consistent API across all use cases (TF transforms, sensor replay, embeddings)
  • Observable/streaming support built into base class
  • Proper SQL injection protection in database backends

SQL Injection Issues Resolved:
Previous review comments about SQL injection in sqlite.py and postgres.py have been addressed with _validate_identifier() validation for all table/database names.
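The validation described in the commits (alphanumeric/underscore, not starting with a digit) amounts to something like this sketch; the actual dimos implementation may differ:

import re

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def _validate_identifier(name: str) -> str:
    # Reject anything that isn't alphanumeric/underscore or that starts with
    # a digit, so table/database names can be safely interpolated into SQL.
    if not _IDENTIFIER.match(name):
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return name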

Testing:
Comprehensive parametrized test suite covering all backends with 468+ lines of tests. PostgreSQL tests run conditionally when database is available.

Confidence Score: 5/5

  • This PR is safe to merge with high confidence
  • Well-architected refactoring with comprehensive tests, proper security measures, and backward compatibility. Previous SQL injection concerns have been addressed. The abstraction is clean, the migration path is safe, and extensive test coverage validates correctness.
  • No files require special attention

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/memory/timeseries/base.py | New unified TimeSeriesStore abstraction with a clean API for timestamped data storage and replay. Well-designed generic base class with comprehensive streaming support. |
| dimos/memory/timeseries/inmemory.py | In-memory backend using SortedKeyList for O(log n) operations. Clean implementation with efficient binary search for time-based queries. |
| dimos/memory/timeseries/sqlite.py | SQLite backend with SQL injection protection via _validate_identifier. Good path resolution supporting LFS and data-directory patterns. |
| dimos/memory/timeseries/postgres.py | PostgreSQL backend implementing the Resource lifecycle. SQL injection protection and connection management with context managers. |
| dimos/memory/timeseries/pickledir.py | Pickle-directory backend with timestamp-named files. Raises an error on duplicate timestamps, preventing silent overwrites. |
| dimos/memory/timeseries/legacy.py | Backward-compatible TimedSensorReplay/Storage wrapper. Enables migration from the old numbered-file format to the new abstraction without breaking existing code. |
| dimos/protocol/tf/tf.py | TF service migrated to InMemoryStore. TBuffer now extends InMemoryStore with auto-pruning. Clean separation of concerns between storage and TF logic. |
| dimos/types/timestamped.py | TimestampedCollection removed entirely; TimestampedBufferCollection is now a thin wrapper around InMemoryStore. Major simplification, ~120 lines deleted. |
| dimos/utils/testing/replay.py | Replaced with a shim importing from LegacyPickleStore. The original ~400-line implementation moved to replay_legacy.py for backward compatibility. |
| dimos/utils/data.py | Enhanced get_data to support nested paths in archives. Made _get_data_dir public as get_data_dir for backend path resolution. |

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant Store as TimeSeriesStore[T]
    participant Backend as Backend Implementation
    participant Storage as Storage Layer
    
    Note over App,Storage: Saving Data
    App->>Store: save(data)
    Store->>Store: Extract data.ts
    Store->>Backend: _save(timestamp, data)
    Backend->>Storage: Persist to storage
    Storage-->>Backend: Success
    Backend-->>Store: Complete
    
    Note over App,Storage: Finding Closest Match
    App->>Store: find_closest(timestamp, tolerance)
    Store->>Backend: _find_closest_timestamp(ts, tol)
    Backend->>Storage: Query with index/binary search
    Storage-->>Backend: closest_timestamp
    Backend-->>Store: Return timestamp
    Store->>Backend: _load(closest_timestamp)
    Backend->>Storage: Retrieve data
    Storage-->>Backend: data
    Backend-->>Store: Return data
    Store-->>App: Return T
    
    Note over App,Storage: Streaming Replay
    App->>Store: stream(speed=1.0, seek=10.0)
    Store->>Store: Create Observable
    Store->>Backend: iterate_items(seek=10.0)
    loop For each item
        Backend->>Storage: Load next item
        Storage-->>Backend: (timestamp, data)
        Backend-->>Store: Yield item
        Store->>Store: Schedule emission with timing
        Store-->>App: on_next(data)
    end
    Store-->>App: on_completed()
    
    Note over App,Storage: Backend Examples
    Note over Backend: InMemoryStore: SortedKeyList
    Note over Backend: SqliteStore: Indexed DB queries
    Note over Backend: PostgresStore: Distributed storage
    Note over Backend: PickleDirStore: File per timestamp
    Note over Backend: LegacyPickleStore: Backward compat

@greptile-apps greptile-apps bot left a comment

25 files reviewed, 7 comments

leshy added 15 commits January 22, 2026 11:33
- Validate table/database names in SqliteStore and PostgresStore
  using regex (alphanumeric/underscore, not starting with digit)
- Fix Transform.to_pose() return type using TYPE_CHECKING import
- Add return type annotation to TF.get_pose()
- Fix ambiguous doclink in transports.md
SqliteStore now accepts a name (e.g. "recordings/lidar") that gets
resolved via get_data_dir to data/recordings/lidar.db. Still supports
absolute paths and :memory: for backward compatibility.
- Add TypeVar bound: T = TypeVar("T", bound=Timestamped)
- Simplify save() to always use data.ts (no more optional timestamp)
- Update tests to use SampleData(Timestamped) instead of strings
- SqliteStore accepts str | Path for backward compatibility
# Conflicts:
#	dimos/models/manipulation/contact_graspnet_pytorch/inference.py
Required when cupy/contact_graspnet are installed locally without type stubs.
Resolve conflicts:
- Accept dev's Image refactor (numpy-only, no CudaImage/AbstractImage)
- Keep spatial_db2's psycopg2-binary dep, add dev's annotation-protocol + toolz
- Keep spatial_db2's print_loss_heatmap in benchmark type
- Keep spatial_db2's typed sharpness_barrier signature
- Merge TYPE_CHECKING imports in Transform.py (rerun + PoseStamped)
- Trivial: accept dev's type-ignore formatting (space after comma)
- Add import-untyped to xacro type: ignore comment in mesh_utils.py
- Remove unused record/replay RPC methods from ModuleBase
Remove the Timestamped bound from SensorStore's TypeVar, enabling storage
of arbitrary data types. Timestamps are now provided explicitly via
save(ts, data), with Timestamped convenience methods (save_ts, pipe_save_ts,
consume_stream_ts) as opt-in helpers. iterate_realtime() and stream() now
use stored timestamps instead of data.ts.
leshy and others added 14 commits February 11, 2026 10:43
Fix import sorting in treid.py and suppress B027 for optional warmup()
in base.py.
Remove dead EmbeddingModel.warmup() method. Update go2.py to use
consume_stream_ts() for the new SensorStore API.
save/pipe_save/consume_stream now work with Timestamped data by default.
save_raw/pipe_save_raw/consume_stream_raw take explicit timestamps for
non-Timestamped data.
Implement _delete for InMemoryStore, SqliteStore, PickleDirStore,
PostgresStore (LegacyPickleStore raises NotImplementedError). Fix
find_closest docstring placement and add get/add/prune_old convenience
methods.
…tedKeyList

Replace InMemoryStore's dict + sorted-cache (O(n log n) rebuild on every write)
with SortedKeyList for O(log n) insert, delete, and range queries. Add collection
methods to TimeSeriesStore base: __len__, __iter__, last/last_timestamp, start_ts/
end_ts, time_range, duration, find_before/find_after, slice_by_time. Implement
backing abstract methods (_count, _last_timestamp, _find_before, _find_after) in
all five backends. Performance benchmarks confirm InMemoryStore matches
TimestampedCollection on 100k items.
…nmemory.py

Rename the module to better reflect its purpose. Extract InMemoryStore from
base.py into its own file (inmemory.py) to keep base.py focused on the abstract
TimeSeriesStore class. Update all internal and external imports.
…, store T directly

- Bound T to Timestamped — no more raw/non-Timestamped data paths
- Removed save_raw, pipe_save_raw, consume_stream_raw
- InMemoryStore stores T directly in SortedKeyList (no _Entry wrapper)
- Removed duplicate-check on insert (same semantics as TimestampedCollection)
- Performance now at parity with TimestampedCollection
- Delete TimestampedCollection class (replaced by InMemoryStore)
- Rewrite TimestampedBufferCollection to inherit InMemoryStore
- Remove TBuffer_old dead code from tf.py
- Fix prune_old mutation-during-iteration bug in base.py
- Break circular import with TYPE_CHECKING guard in base.py
- Update Image.py to use public API instead of _items access
- Update tests to use InMemoryStore directly
@leshy leshy changed the title WIP: Good Sensor Store Unified TimeSeriesStore with pluggable backends Feb 11, 2026
@leshy leshy marked this pull request as ready for review February 11, 2026 13:35
@leshy leshy changed the title Unified TimeSeriesStore with pluggable backends Unified TimeSeriesStore with pluggable backends, global rewrite of timed event storage Feb 11, 2026
@greptile-apps greptile-apps bot left a comment

42 files reviewed, no comments

@paul-nechifor
Contributor

Personally, I don't like the API. It's similar to what we do with LCM() and others where we don't encode the lifecycle into the API. The API is simple, but doesn't give you enough control.

Sessions

store = SqliteStore("recordings/lidar")
store.save(data)

Without a session, API calls race to create self._conn. You could add a lock, but that just complicates the code: every call has to take the lock and check that self._conn exists. If you have this:

store = SqliteStore("recordings/lidar")
session = store.session()
with session:
    session.save(data)

Then SqliteStoreSession has self._conn as non-None and you never have to check it's there.
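A sketch of what such a session could look like (SqliteStoreSession and its schema are hypothetical; plain sqlite3):

import pickle
import sqlite3

class SqliteStoreSession:
    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)  # non-None for the session's whole life

    def save(self, data) -> None:
        # hypothetical schema; note: no None-check on self._conn needed
        self._conn.execute(
            "INSERT OR REPLACE INTO items (ts, data) VALUES (?, ?)",
            (data.ts, pickle.dumps(data)),
        )
        self._conn.commit()

    def close(self) -> None:
        self._conn.close()

    def __enter__(self) -> "SqliteStoreSession":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()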

Lack of owner

Most systems separate the database from the sessions. This makes it easy to close the database, which closes all the sessions.

store = SqliteStore("recordings/lidar")
session = store.session()
store.close()

store knows about all its sessions and can close them. Additionally, without an owner we can't tell whether two SqliteStore objects are using the same sqlite file.
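And the owning side, sketched (building on the hypothetical session above):

class SqliteStore:
    def __init__(self, path: str) -> None:
        self._path = path
        self._sessions: list[SqliteStoreSession] = []

    def session(self) -> SqliteStoreSession:
        s = SqliteStoreSession(self._path)  # the store tracks every session it hands out
        self._sessions.append(s)
        return s

    def close(self) -> None:
        for s in self._sessions:  # closing the store cascades to all sessions
            s.close()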

Low cohesion in TimeSeriesStore

TimeSeriesStore is a database, a session, and a query set.

Most of the methods belong to the query set. (QuerySet is a Django term, others use Manager or Collection for the object which does the querying.) But the code forces people to extend TimeSeriesStore to get what they need instead of TimeSeriesStore using what it needs.

Example: _save is abstract, but save is provided even if you don't want it.

class TimeSeriesStore(Generic[T], ABC):
    @abstractmethod
    def _save(self, timestamp: float, data: T) -> None: ...

    def save(self, *data: T) -> None:
        for item in data:
            self._save(item.ts, item)

class MyStore(TimeSeriesStore[T]):
    def _save(self, timestamp: float, data: T) -> None: ...

I suggest this:

class Session(Protocol[T]):
    def save(self, timestamp: float, data: T) -> None: ...

class MySession(Session[T]):
    def save(self, timestamp: float, data: T) -> None:
        # implementation
        ...

class TimeSeriesStore(Generic[T]):
    def __init__(self, session: Session[T]):
        self._session = session

    def save(self, *data: T) -> None:
        for item in data:
            self._session.save(item.ts, item)

TimeSeriesStore uses a Session; it doesn't force people to inherit from it, and it doesn't mix its namespace with whatever inherits from it.

Verbosity

If the concern is that this is too verbose, you can always add helper methods. But if the API mixes concerns, helpers can't untangle that. Example helper method:

with store_session("sqlite", "/path/to/sensors.db") as store:
    store.save(...)
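One possible shape for that helper, using contextlib (a sketch; store_session and the backend registry are hypothetical):

from contextlib import contextmanager

@contextmanager
def store_session(kind: str, path: str):
    store = {"sqlite": SqliteStore}[kind](path)  # pick a backend by name
    try:
        with store.session() as session:
            yield session
    finally:
        store.close()  # closes any remaining sessions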

@leshy leshy commented Feb 12, 2026

Personally, I don't like the API. It's similar to what we do with LCM() and others where we don't encode the lifecycle into the API. The API is simple, but doesn't give you enough control.

greptile is saying 5/5, paul is saying 0/5; this is a classic good cop bad cop situation

sounds good! will externalize the sessions

