Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/bindings-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ mod sym {
symbol!(unique);
symbol!(update);
symbol!(default);
symbol!(event);

symbol!(u8);
symbol!(i8);
Expand Down
22 changes: 22 additions & 0 deletions crates/bindings-macro/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) struct TableArgs {
scheduled: Option<ScheduledArg>,
name: Ident,
indices: Vec<IndexArg>,
event: Option<Span>,
}

enum TableAccess {
Expand Down Expand Up @@ -71,6 +72,7 @@ impl TableArgs {
let mut scheduled = None;
let mut name = None;
let mut indices = Vec::new();
let mut event = None;
syn::meta::parser(|meta| {
match_meta!(match meta {
sym::public => {
Expand All @@ -91,6 +93,10 @@ impl TableArgs {
check_duplicate(&scheduled, &meta)?;
scheduled = Some(ScheduledArg::parse_meta(meta)?);
}
sym::event => {
check_duplicate(&event, &meta)?;
event = Some(meta.path.span());
}
});
Ok(())
})
Expand All @@ -107,6 +113,7 @@ impl TableArgs {
scheduled,
name,
indices,
event,
})
}
}
Expand Down Expand Up @@ -841,6 +848,18 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
);

let table_access = args.access.iter().map(|acc| acc.to_value());
let is_event = args.event.iter().map(|_| {
quote!(
const IS_EVENT: bool = true;
)
});
let can_be_lookup_impl = if args.event.is_none() {
quote! {
impl spacetimedb::query_builder::CanBeLookupTable for #original_struct_ident {}
}
} else {
quote! {}
};
let unique_col_ids = unique_columns.iter().map(|col| col.index);
let primary_col_id = primary_key_column.clone().into_iter().map(|col| col.index);
let sequence_col_ids = sequenced_columns.iter().map(|col| col.index);
Expand Down Expand Up @@ -966,6 +985,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
const TABLE_NAME: &'static str = #table_name;
// the default value if not specified is Private
#(const TABLE_ACCESS: spacetimedb::table::TableAccess = #table_access;)*
#(#is_event)*
const UNIQUE_COLUMNS: &'static [u16] = &[#(#unique_col_ids),*];
const INDEXES: &'static [spacetimedb::table::IndexDesc<'static>] = &[#(#index_descs),*];
#(const PRIMARY_KEY: Option<u16> = Some(#primary_col_id);)*
Expand Down Expand Up @@ -1077,6 +1097,8 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
}
}

#can_be_lookup_impl

};

let table_query_handle_def = quote! {
Expand Down
3 changes: 2 additions & 1 deletion crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ pub fn register_table<T: Table>() {
.inner
.build_table(T::TABLE_NAME, product_type_ref)
.with_type(TableType::User)
.with_access(T::TABLE_ACCESS);
.with_access(T::TABLE_ACCESS)
.with_event(T::IS_EVENT);

for &col in T::UNIQUE_COLUMNS {
table = table.with_unique_constraint(col);
Expand Down
1 change: 1 addition & 0 deletions crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub trait TableInternal: Sized {
const PRIMARY_KEY: Option<u16> = None;
const SEQUENCES: &'static [u16];
const SCHEDULE: Option<ScheduleDesc<'static>> = None;
const IS_EVENT: bool = false;

/// Returns the ID of this table.
fn table_id() -> TableId;
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ pub(crate) mod tests {
access,
None,
None,
false,
),
)?;
let schema = db.schema_for_table_mut(tx, table_id)?;
Expand Down
1 change: 1 addition & 0 deletions crates/datastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ test = ["spacetimedb-commitlog/test"]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
spacetimedb-sats = { path = "../sats", features = ["proptest"] }
spacetimedb-commitlog = { path = "../commitlog", features = ["test"] }
spacetimedb-schema = { path = "../schema", features = ["test"] }
Copy link
Contributor Author

@cloutiertyler cloutiertyler Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[dev-dependencies]
spacetimedb-schema = { path = "../schema", features = ["test"] }

This enables the test feature of spacetimedb-schema when running datastore tests. The spacetimedb-schema crate is already a regular dependency of the datastore crate — my change just adds it as a dev-dependency
with the test feature flag enabled.

The test feature gates for_test() constructors on types like TableName, Identifier, and ColumnSchema. The datastore tests use these constructors extensively (e.g., TableName::for_test("Foo") in TableRow
conversions), but the feature wasn't being enabled in test builds — so the tests were already broken on master before our changes.

In short: the datastore already depends on schema. I'm just enabling the test feature so the existing test helpers compile.

I am not sure why Claude is insistent on this. Could someone with knowledge verify that this is true and necessary?


# Also as dev-dependencies for use in _this_ crate's tests.
proptest.workspace = true
Expand Down
31 changes: 29 additions & 2 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ use crate::{
use crate::{
locking_tx_datastore::ViewCallInfo,
system_tables::{
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX,
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
},
};
use anyhow::anyhow;
Expand Down Expand Up @@ -471,6 +472,8 @@ impl CommittedState {
self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone());
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());

self.create_table(ST_EVENT_TABLE_ID, schemas[ST_EVENT_TABLE_IDX].clone());

// Insert the sequences into `st_sequences`
let (st_sequences, blob_store, pool) =
self.get_table_and_blob_store_or_create(ST_SEQUENCE_ID, &schemas[ST_SEQUENCE_IDX]);
Expand Down Expand Up @@ -617,6 +620,12 @@ impl CommittedState {
schema: &Arc<TableSchema>,
row: &ProductValue,
) -> Result<()> {
// Event table rows in the commitlog are preserved for future replay features
// but don't rebuild state — event tables have no committed state.
if schema.is_event {
return Ok(());
}

let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema);

let (_, row_ref) = match table.insert(pool, blob_store, row) {
Expand Down Expand Up @@ -1187,6 +1196,24 @@ impl CommittedState {
// and the fullness of the page.

for (table_id, tx_table) in insert_tables {
// Event tables: record in TxData for commitlog persistence and subscription dispatch,
// but do NOT merge into committed state.
// NOTE: There is no need to call `get_table_and_blob_store_or_create` here.
// The logic for collecting inserts is duplicated, but it's cleaner this way.
if tx_table.get_schema().is_event {
let mut inserts = Vec::with_capacity(tx_table.row_count as usize);
for row_ref in tx_table.scan_rows(&tx_blob_store) {
inserts.push(row_ref.to_product_value());
}
if !inserts.is_empty() {
let table_name = &tx_table.get_schema().table_name;
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
}
let (_schema, _indexes, pages) = tx_table.consume_for_merge();
self.page_pool.put_many(pages);
continue;
}

let (commit_table, commit_blob_store, page_pool) =
self.get_table_and_blob_store_or_create(table_id, tx_table.get_schema());

Expand Down
141 changes: 134 additions & 7 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1278,12 +1278,13 @@ mod tests {
use crate::locking_tx_datastore::tx_state::PendingSchemaChange;
use crate::system_tables::{
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields,
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID,
ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME,
ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields,
StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields,
ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID,
ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME,
ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID,
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID,
ST_VIEW_SUB_NAME,
};
Expand Down Expand Up @@ -1614,6 +1615,7 @@ mod tests {
StAccess::Public,
schedule,
pk,
false,
)
}

Expand Down Expand Up @@ -1746,7 +1748,7 @@ mod tests {
TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },

TableRow { id: ST_EVENT_TABLE_ID.into(), name: ST_EVENT_TABLE_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StEventTableFields::TableId.into()) },

]));
#[rustfmt::skip]
Expand Down Expand Up @@ -1832,6 +1834,8 @@ mod tests {

ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },

ColRow { table: ST_EVENT_TABLE_ID.into(), pos: 0, name: "table_id", ty: TableId::get_type() },
]));
#[rustfmt::skip]
assert_eq!(query.scan_st_indexes()?, map_array([
Expand All @@ -1857,6 +1861,7 @@ mod tests {
IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", },
]));
let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1;
#[rustfmt::skip]
Expand Down Expand Up @@ -1895,6 +1900,7 @@ mod tests {
ConstraintRow { constraint_id: 16, table_id: ST_VIEW_COLUMN_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_view_column_view_id_col_pos_key", },
ConstraintRow { constraint_id: 17, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(0), constraint_name: "st_view_arg_id_key", },
ConstraintRow { constraint_id: 18, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(1), constraint_name: "st_view_arg_bytes_key", },
ConstraintRow { constraint_id: 19, table_id: ST_EVENT_TABLE_ID.into(), unique_columns: col(0), constraint_name: "st_event_table_table_id_key", },
]));

// Verify we get back the tables correctly with the proper ids...
Expand Down Expand Up @@ -2320,6 +2326,7 @@ mod tests {
IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", },
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
Expand Down Expand Up @@ -3705,4 +3712,124 @@ mod tests {
assert!(!metrics.committed);
Ok(())
}

/// Create an event table with the basic schema (id: u32, name: String, age: u32).
fn setup_event_table() -> ResultTest<(Locking, MutTxId, TableId)> {
    let datastore = get_datastore()?;
    let mut tx = begin_mut_tx(&datastore);

    // Start from the standard test schema and flip on the event flag.
    let schema = {
        let mut s = basic_table_schema_with_indices(basic_indices(), basic_constraints());
        s.is_event = true;
        s
    };
    let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;

    Ok((datastore, tx, table_id))
}

#[test]
fn test_event_table_insert_delete_noop() -> ResultTest<()> {
    let (datastore, tx, table_id) = setup_event_table()?;
    // The table-creation tx must land before exercising the event path.
    commit(&datastore, tx)?;

    // Insert a row and immediately delete it within a single transaction.
    let mut tx = begin_mut_tx(&datastore);
    let alice = u32_str_u32(1, "Alice", 30);
    insert(&datastore, &mut tx, table_id, &alice)?;
    datastore.delete_by_rel_mut_tx(&mut tx, table_id, [alice]);

    let tx_data = commit(&datastore, tx)?;

    // Insert+delete in same tx should cancel out: no TxData entry for this table.
    assert!(
        tx_data.inserts_for_table(table_id).is_none(),
        "insert+delete should cancel: no inserts in TxData"
    );
    assert!(
        tx_data.deletes_for_table(table_id).is_none(),
        "insert+delete should cancel: no deletes in TxData"
    );

    // Committed state should be empty for event tables.
    let tx = begin_mut_tx(&datastore);
    assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
    Ok(())
}

#[test]
fn test_event_table_update_only_final_row() -> ResultTest<()> {
    let (datastore, tx, table_id) = setup_event_table()?;
    // Land the table-creation tx before testing.
    commit(&datastore, tx)?;

    let mut tx = begin_mut_tx(&datastore);
    let original = u32_str_u32(1, "Alice", 30);
    insert(&datastore, &mut tx, table_id, &original)?;

    // Update via the index on column 0 (id) — replaces the row with same PK.
    let index_id = extract_index_id(&datastore, &tx, &basic_indices()[0])?;
    let updated = u32_str_u32(1, "Alice", 31);
    update(&datastore, &mut tx, table_id, index_id, &updated)?;

    let tx_data = commit(&datastore, tx)?;

    // The update replaces the original insert, so TxData should have only the final row.
    let inserts = tx_data.inserts_for_table(table_id).expect("should have inserts");
    assert_eq!(inserts.len(), 1, "update should leave exactly 1 insert");
    assert_eq!(inserts[0], updated);

    // Committed state should still be empty for event tables.
    let tx = begin_mut_tx(&datastore);
    assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
    Ok(())
}

#[test]
fn test_event_table_insert_records_txdata_not_committed_state() -> ResultTest<()> {
    let (datastore, tx, table_id) = setup_event_table()?;
    // Land the table-creation tx first.
    commit(&datastore, tx)?;

    // One plain insert, committed on its own.
    let mut tx = begin_mut_tx(&datastore);
    let bob = u32_str_u32(1, "Bob", 25);
    insert(&datastore, &mut tx, table_id, &bob)?;
    let tx_data = commit(&datastore, tx)?;

    // TxData should record the insert.
    let recorded = tx_data
        .inserts_for_table(table_id)
        .expect("event table insert should appear in TxData");
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0], bob);

    // But committed state should be empty.
    let tx = begin_mut_tx(&datastore);
    assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
    Ok(())
}

#[test]
fn test_event_table_replay_ignores_inserts() -> ResultTest<()> {
    let (datastore, tx, table_id) = setup_event_table()?;
    // The schema must exist before replay, so commit table creation.
    commit(&datastore, tx)?;

    // Fetch the event table's schema, then discard the read tx.
    let tx = begin_mut_tx(&datastore);
    let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?;
    let _ = datastore.rollback_mut_tx(tx);

    // Drive replay_insert on committed state directly, as commitlog replay would.
    // The write guard is a statement temporary, dropped at the end of the call.
    let carol = u32_str_u32(1, "Carol", 40);
    datastore
        .committed_state
        .write()
        .replay_insert(table_id, &schema, &carol)?;

    // After replay, the event table should still have no committed rows.
    let tx = begin_mut_tx(&datastore);
    assert_eq!(
        all_rows(&datastore, &tx, table_id).len(),
        0,
        "replay_insert should be a no-op for event tables"
    );
    Ok(())
}
}
Loading
Loading