From a1f9217e1af7c7f9c22c1553b0cc42eee5c5a4ff Mon Sep 17 00:00:00 2001 From: = Date: Thu, 5 Feb 2026 19:55:04 -0500 Subject: [PATCH 1/4] Implement event tables (V10 only, no V9 changes) --- crates/bindings-macro/src/lib.rs | 1 + crates/bindings-macro/src/table.rs | 18 ++++++ crates/bindings/src/rt.rs | 3 +- crates/bindings/src/table.rs | 1 + crates/core/src/vm.rs | 1 + .../locking_tx_datastore/committed_state.rs | 24 ++++++++ .../src/locking_tx_datastore/datastore.rs | 1 + .../src/locking_tx_datastore/mut_tx.rs | 13 ++++- .../src/locking_tx_datastore/state_view.rs | 11 +++- crates/datastore/src/system_tables.rs | 58 ++++++++++++++++++- crates/lib/src/db/raw_def/v10.rs | 14 +++++ crates/physical-plan/src/plan.rs | 1 + crates/query-builder/src/join.rs | 10 ++-- crates/query-builder/src/table.rs | 5 ++ crates/schema/src/def.rs | 8 +++ crates/schema/src/def/validate/v10.rs | 2 + crates/schema/src/def/validate/v9.rs | 1 + crates/schema/src/schema.rs | 10 ++++ modules/module-test/src/lib.rs | 11 ++++ 19 files changed, 181 insertions(+), 12 deletions(-) diff --git a/crates/bindings-macro/src/lib.rs b/crates/bindings-macro/src/lib.rs index b06e98ab8f9..428573be84c 100644 --- a/crates/bindings-macro/src/lib.rs +++ b/crates/bindings-macro/src/lib.rs @@ -61,6 +61,7 @@ mod sym { symbol!(unique); symbol!(update); symbol!(default); + symbol!(event); symbol!(u8); symbol!(i8); diff --git a/crates/bindings-macro/src/table.rs b/crates/bindings-macro/src/table.rs index 7bc2940d548..2076ece600d 100644 --- a/crates/bindings-macro/src/table.rs +++ b/crates/bindings-macro/src/table.rs @@ -19,6 +19,7 @@ pub(crate) struct TableArgs { scheduled: Option, name: Ident, indices: Vec, + event: Option, } enum TableAccess { @@ -71,6 +72,7 @@ impl TableArgs { let mut scheduled = None; let mut name = None; let mut indices = Vec::new(); + let mut event = None; syn::meta::parser(|meta| { match_meta!(match meta { sym::public => { @@ -91,6 +93,10 @@ impl TableArgs { check_duplicate(&scheduled, &meta)?; scheduled = Some(ScheduledArg::parse_meta(meta)?); } + sym::event => { + check_duplicate(&event, &meta)?; + event = Some(meta.path.span()); + } }); Ok(()) }) @@ -107,6 +113,7 @@ impl TableArgs { scheduled, name, indices, + event, }) } } @@ -841,6 +848,14 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R ); let table_access = args.access.iter().map(|acc| acc.to_value()); + let is_event = args.event.iter().map(|_| quote!(const IS_EVENT: bool = true;)); + let can_be_lookup_impl = if args.event.is_none() { + quote! { + impl spacetimedb::query_builder::CanBeLookupTable for #original_struct_ident {} + } + } else { + quote! {} + }; let unique_col_ids = unique_columns.iter().map(|col| col.index); let primary_col_id = primary_key_column.clone().into_iter().map(|col| col.index); let sequence_col_ids = sequenced_columns.iter().map(|col| col.index); @@ -966,6 +981,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R const TABLE_NAME: &'static str = #table_name; // the default value if not specified is Private #(const TABLE_ACCESS: spacetimedb::table::TableAccess = #table_access;)* + #(#is_event)* const UNIQUE_COLUMNS: &'static [u16] = &[#(#unique_col_ids),*]; const INDEXES: &'static [spacetimedb::table::IndexDesc<'static>] = &[#(#index_descs),*]; #(const PRIMARY_KEY: Option = Some(#primary_col_id);)* @@ -1077,6 +1093,8 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R } } + #can_be_lookup_impl + }; let table_query_handle_def = quote! { diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index cea7adfbbec..287b2efde74 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -716,7 +716,8 @@ pub fn register_table() { .inner .build_table(T::TABLE_NAME, product_type_ref) .with_type(TableType::User) - .with_access(T::TABLE_ACCESS); + .with_access(T::TABLE_ACCESS) + .with_event(T::IS_EVENT); for &col in T::UNIQUE_COLUMNS { table = table.with_unique_constraint(col); diff --git a/crates/bindings/src/table.rs b/crates/bindings/src/table.rs index d7cbdd9b39a..f64d815b862 100644 --- a/crates/bindings/src/table.rs +++ b/crates/bindings/src/table.rs @@ -128,6 +128,7 @@ pub trait TableInternal: Sized { const PRIMARY_KEY: Option = None; const SEQUENCES: &'static [u16]; const SCHEDULE: Option> = None; + const IS_EVENT: bool = false; /// Returns the ID of this table. fn table_id() -> TableId; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index d8bb27a1caa..e995cf72cb3 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -699,6 +699,7 @@ pub(crate) mod tests { access, None, None, + false, ), )?; let schema = db.schema_for_table_mut(tx, table_id)?; diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index b597f742336..2440de4fbef 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -617,6 +617,12 @@ impl CommittedState { schema: &Arc, row: &ProductValue, ) -> Result<()> { + // Event table rows in the commitlog are preserved for future replay features + // but don't rebuild state — event tables have no committed state. + if schema.is_event { + return Ok(()); + } + let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema); let (_, row_ref) = match table.insert(pool, blob_store, row) { @@ -1187,6 +1193,24 @@ impl CommittedState { // and the fullness of the page. for (table_id, tx_table) in insert_tables { + // Event tables: record in TxData for commitlog persistence and subscription dispatch, + // but do NOT merge into committed state. + // NOTE: There is no need to call `get_table_and_blob_store_or_create` here. + // The logic for collecting inserts is duplicated, but it's cleaner this way. + if tx_table.get_schema().is_event { + let mut inserts = Vec::with_capacity(tx_table.row_count as usize); + for row_ref in tx_table.scan_rows(&tx_blob_store) { + inserts.push(row_ref.to_product_value()); + } + if !inserts.is_empty() { + let table_name = &tx_table.get_schema().table_name; + tx_data.set_inserts_for_table(table_id, table_name, inserts.into()); + } + let (_schema, _indexes, pages) = tx_table.consume_for_merge(); + self.page_pool.put_many(pages); + continue; + } + let (commit_table, commit_blob_store, page_pool) = self.get_table_and_blob_store_or_create(table_id, tx_table.get_schema()); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 48361dbf933..5961fe111e7 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1614,6 +1614,7 @@ mod tests { StAccess::Public, schedule, pk, + false, ) } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 71bf3bda615..0b9c8e5ec38 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -22,9 +22,9 @@ use crate::{ system_tables::{ with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, - StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, - ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, - ST_SEQUENCE_ID, ST_TABLE_ID, + StEventTableRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, + SystemTable, ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ID, + ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }, }; use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow}; @@ -656,6 +656,7 @@ impl MutTxId { self.insert_st_column(table_schema.columns())?; let schedule = table_schema.schedule.clone(); + let is_event = table_schema.is_event; let mut schema_internal = table_schema; // Extract all indexes, constraints, and sequences from the schema. // We will add them back later with correct ids. @@ -685,6 +686,12 @@ impl MutTxId { table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id); } + // Insert into st_event_table if this is an event table. + if is_event { + let row = StEventTableRow { table_id }; + self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?; + } + // Create the indexes for the table. for index in indices { let col_set = ColSet::from(index.index_algorithm.columns()); diff --git a/crates/datastore/src/locking_tx_datastore/state_view.rs b/crates/datastore/src/locking_tx_datastore/state_view.rs index 1c7401c9e70..4ce84e0c602 100644 --- a/crates/datastore/src/locking_tx_datastore/state_view.rs +++ b/crates/datastore/src/locking_tx_datastore/state_view.rs @@ -6,8 +6,8 @@ use crate::system_tables::{ ConnectionIdViaU128, StColumnFields, StColumnRow, StConnectionCredentialsFields, StConnectionCredentialsRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, StViewRow, - SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID, - ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, + StEventTableFields, SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, + ST_EVENT_TABLE_ID, ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, }; use anyhow::anyhow; use core::ops::RangeBounds; @@ -159,6 +159,12 @@ pub trait StateView { .unwrap_or(None) .transpose()?; + // Check if this table is an event table by looking up st_event_table. + let is_event = self + .iter_by_col_eq(ST_EVENT_TABLE_ID, StEventTableFields::TableId, value_eq) + .map(|mut iter| iter.next().is_some()) + .unwrap_or(false); + Ok(TableSchema::new( table_id, table_name, @@ -171,6 +177,7 @@ pub trait StateView { table_access, schedule, table_primary_key, + is_event, )) } diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index 947e8bcb0d0..c345283d14e 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -80,6 +80,8 @@ pub const ST_VIEW_COLUMN_ID: TableId = TableId(14); pub const ST_VIEW_SUB_ID: TableId = TableId(15); /// The static ID of the table that tracks view arguments pub const ST_VIEW_ARG_ID: TableId = TableId(16); +/// The static ID of the table that tracks which tables are event tables +pub const ST_EVENT_TABLE_ID: TableId = TableId(17); pub(crate) const ST_CONNECTION_CREDENTIALS_NAME: &str = "st_connection_credentials"; pub const ST_TABLE_NAME: &str = "st_table"; @@ -97,6 +99,7 @@ pub(crate) const ST_VIEW_PARAM_NAME: &str = "st_view_param"; pub(crate) const ST_VIEW_COLUMN_NAME: &str = "st_view_column"; pub(crate) const ST_VIEW_SUB_NAME: &str = "st_view_sub"; pub(crate) const ST_VIEW_ARG_NAME: &str = "st_view_arg"; +pub(crate) const ST_EVENT_TABLE_NAME: &str = "st_event_table"; /// Reserved range of sequence values used for system tables. /// /// Ids for user-created tables will start at `ST_RESERVED_SEQUENCE_RANGE`. @@ -166,6 +169,10 @@ pub fn is_built_in_meta_row(table_id: TableId, row: &ProductValue) -> Result false, // We don't define any system views, so none of the view-related tables can be system meta-descriptors. ST_VIEW_ID | ST_VIEW_PARAM_ID | ST_VIEW_COLUMN_ID | ST_VIEW_SUB_ID | ST_VIEW_ARG_ID => false, + ST_EVENT_TABLE_ID => { + let row: StEventTableRow = to_typed_row(row)?; + table_id_is_reserved(row.table_id) + } TableId(..ST_RESERVED_SEQUENCE_RANGE) => { log::warn!("Unknown system table {table_id:?}"); false @@ -187,7 +194,7 @@ pub enum SystemTable { st_row_level_security, } -pub fn system_tables() -> [TableSchema; 16] { +pub fn system_tables() -> [TableSchema; 17] { [ // The order should match the `id` of the system table, that start with [ST_TABLE_IDX]. st_table_schema(), @@ -206,6 +213,7 @@ pub fn system_tables() -> [TableSchema; 16] { st_view_column_schema(), st_view_sub_schema(), st_view_arg_schema(), + st_event_table_schema(), ] } @@ -250,6 +258,7 @@ pub(crate) const ST_VIEW_PARAM_IDX: usize = 12; pub(crate) const ST_VIEW_COLUMN_IDX: usize = 13; pub(crate) const ST_VIEW_SUB_IDX: usize = 14; pub(crate) const ST_VIEW_ARG_IDX: usize = 15; +pub(crate) const ST_EVENT_TABLE_IDX: usize = 16; macro_rules! st_fields_enum { ($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => { @@ -404,6 +413,10 @@ st_fields_enum!(enum StScheduledFields { "at_column", AtColumn = 4, }); +st_fields_enum!(enum StEventTableFields { + "table_id", TableId = 0, +}); + /// Helper method to check that a system table has the correct fields. /// Does not check field types since those aren't included in `StFields` types. /// If anything in here is not true, the system is completely broken, so it's fine to assert. @@ -567,6 +580,14 @@ fn system_module_def() -> ModuleDef { .with_index_no_accessor_name(btree(StVarFields::Name)) .with_primary_key(StVarFields::Name); + let st_event_table_type = builder.add_type::(); + builder + .build_table(ST_EVENT_TABLE_NAME, *st_event_table_type.as_ref().expect("should be ref")) + .with_type(TableType::System) + .with_primary_key(StEventTableFields::TableId) + .with_unique_constraint(StEventTableFields::TableId) + .with_index_no_accessor_name(btree(StEventTableFields::TableId)); + let result = builder .finish() .try_into() @@ -588,6 +609,7 @@ fn system_module_def() -> ModuleDef { validate_system_table::(&result, ST_VIEW_COLUMN_NAME); validate_system_table::(&result, ST_VIEW_SUB_NAME); validate_system_table::(&result, ST_VIEW_ARG_NAME); + validate_system_table::(&result, ST_EVENT_TABLE_NAME); result } @@ -629,6 +651,7 @@ lazy_static::lazy_static! { m.insert("st_view_column_view_id_col_pos_key", ConstraintId(16)); m.insert("st_view_arg_id_key", ConstraintId(17)); m.insert("st_view_arg_bytes_key", ConstraintId(18)); + m.insert("st_event_table_table_id_key", ConstraintId(19)); m }; } @@ -660,6 +683,7 @@ lazy_static::lazy_static! { m.insert("st_view_sub_view_id_arg_id_identity_idx_btree", IndexId(20)); m.insert("st_view_arg_id_idx_btree", IndexId(21)); m.insert("st_view_arg_bytes_idx_btree", IndexId(22)); + m.insert("st_event_table_table_id_idx_btree", IndexId(23)); m }; } @@ -796,6 +820,10 @@ pub fn st_view_arg_schema() -> TableSchema { st_schema(ST_VIEW_ARG_NAME, ST_VIEW_ARG_ID) } +fn st_event_table_schema() -> TableSchema { + st_schema(ST_EVENT_TABLE_NAME, ST_EVENT_TABLE_ID) +} + /// If `table_id` refers to a known system table, return its schema. /// /// Used when restoring from a snapshot; system tables are reinstantiated with this schema, @@ -820,6 +848,7 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option { ST_VIEW_COLUMN_ID => Some(st_view_column_schema()), ST_VIEW_SUB_ID => Some(st_view_sub_schema()), ST_VIEW_ARG_ID => Some(st_view_arg_schema()), + ST_EVENT_TABLE_ID => Some(st_event_table_schema()), _ => None, } } @@ -1636,6 +1665,33 @@ impl From for ScheduleSchema { } } +/// System Table [ST_EVENT_TABLE_NAME] +/// +/// Tracks which tables are event tables. +/// Event tables persist to commitlog but are not merged into committed state. +/// +/// | table_id | +/// |----------| +/// | 4097 | +#[derive(Debug, Clone, PartialEq, Eq, SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct StEventTableRow { + pub(crate) table_id: TableId, +} + +impl TryFrom> for StEventTableRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + +impl From for ProductValue { + fn from(x: StEventTableRow) -> Self { + to_product_value(&x) + } +} + thread_local! { static READ_BUF: RefCell> = const { RefCell::new(Vec::new()) }; } diff --git a/crates/lib/src/db/raw_def/v10.rs b/crates/lib/src/db/raw_def/v10.rs index 7474b6bedfe..6c2daf72b97 100644 --- a/crates/lib/src/db/raw_def/v10.rs +++ b/crates/lib/src/db/raw_def/v10.rs @@ -139,6 +139,13 @@ pub struct RawTableDefV10 { /// Default values for columns in this table. pub default_values: Vec, + + /// Whether this is an event table. + /// + /// Event tables are write-only: their rows are persisted to the commitlog + /// but are NOT merged into committed state. They are only visible to V2 + /// subscribers in the transaction that inserted them. + pub is_event: bool, } /// Marks a particular table column as having a particular default value. @@ -687,6 +694,7 @@ impl RawModuleDefV10Builder { table_type: TableType::User, table_access: TableAccess::Public, default_values: vec![], + is_event: false, }, } } @@ -1044,6 +1052,12 @@ impl RawTableDefBuilderV10<'_> { self } + /// Sets whether this table is an event table. + pub fn with_event(mut self, is_event: bool) -> Self { + self.table.is_event = is_event; + self + } + /// Generates a `RawConstraintDefV10` using the supplied `columns`. pub fn with_unique_constraint(mut self, columns: impl Into) -> Self { let columns = columns.into(); diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index b1572040f72..92593e293f8 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -1510,6 +1510,7 @@ mod tests { StAccess::Public, None, primary_key.map(ColId::from), + false, ))) } diff --git a/crates/query-builder/src/join.rs b/crates/query-builder/src/join.rs index 3240d93c32a..82c58ab245f 100644 --- a/crates/query-builder/src/join.rs +++ b/crates/query-builder/src/join.rs @@ -2,7 +2,7 @@ use crate::TableNameStr; use super::{ expr::{format_expr, BoolExpr}, - table::{ColumnRef, HasCols, HasIxCols, Table}, + table::{CanBeLookupTable, ColumnRef, HasCols, HasIxCols, Table}, Query, }; use std::marker::PhantomData; @@ -66,7 +66,7 @@ pub struct RightSemiJoin { } impl Table { - pub fn left_semijoin( + pub fn left_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, @@ -80,7 +80,7 @@ impl Table { } } - pub fn right_semijoin( + pub fn right_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, @@ -97,7 +97,7 @@ impl Table { } impl super::FromWhere { - pub fn left_semijoin( + pub fn left_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, @@ -111,7 +111,7 @@ impl super::FromWhere { } } - pub fn right_semijoin( + pub fn right_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, diff --git a/crates/query-builder/src/table.rs b/crates/query-builder/src/table.rs index b08098a51b3..254510e96b3 100644 --- a/crates/query-builder/src/table.rs +++ b/crates/query-builder/src/table.rs @@ -16,6 +16,11 @@ pub trait HasIxCols { fn ix_cols(name: TableNameStr) -> Self::IxCols; } +/// Marker trait for tables that can appear as the right/inner/lookup +/// table in a semi-join. Event tables do NOT implement this trait, +/// preventing them from being used as the lookup side of a join. +pub trait CanBeLookupTable: HasIxCols {} + pub struct Table { pub(super) table_name: TableNameStr, _marker: PhantomData, diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 5cc0aaa27d6..1a9406eb800 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -540,6 +540,12 @@ pub struct TableDef { /// Whether this table is public or private. pub table_access: TableAccess, + + /// Whether this is an event table. + /// + /// Event tables persist to the commitlog but are not merged into committed state. + /// Their rows are only visible to V2 subscribers in the transaction that inserted them. + pub is_event: bool, } impl TableDef { @@ -566,6 +572,7 @@ impl From for RawTableDefV9 { schedule, table_type, table_access, + is_event: _, // V9 does not support event tables; ignore when converting back } = val; RawTableDefV9 { @@ -603,6 +610,7 @@ impl From for TableDef { schedule: None, table_type: TableType::User, table_access: if is_public { Public } else { Private }, + is_event: false, } } } diff --git a/crates/schema/src/def/validate/v10.rs b/crates/schema/src/def/validate/v10.rs index 1667bd63b1f..d3e487b28b4 100644 --- a/crates/schema/src/def/validate/v10.rs +++ b/crates/schema/src/def/validate/v10.rs @@ -221,6 +221,7 @@ impl<'a> ModuleValidatorV10<'a> { table_type, table_access, default_values, + is_event, } = table; let product_type: &ProductType = self @@ -360,6 +361,7 @@ impl<'a> ModuleValidatorV10<'a> { schedule: None, // V10 handles schedules separately table_type, table_access, + is_event, }) } diff --git a/crates/schema/src/def/validate/v9.rs b/crates/schema/src/def/validate/v9.rs index 459f0cc80d2..03c9e6f8338 100644 --- a/crates/schema/src/def/validate/v9.rs +++ b/crates/schema/src/def/validate/v9.rs @@ -317,6 +317,7 @@ impl ModuleValidatorV9<'_> { schedule, table_type, table_access, + is_event: false, // V9 does not support event tables }) } diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index 4fa2085844f..3e050bff6f1 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -188,6 +188,9 @@ pub struct TableSchema { /// The schedule for the table, if present. pub schedule: Option, + /// Whether this is an event table. + pub is_event: bool, + /// Cache for `row_type_for_table` in the data store. pub row_type: ProductType, } @@ -212,6 +215,7 @@ impl TableSchema { table_access: StAccess, schedule: Option, primary_key: Option, + is_event: bool, ) -> Self { Self { row_type: columns_to_row_type(&columns), @@ -226,6 +230,7 @@ impl TableSchema { table_access, schedule, primary_key, + is_event, } } @@ -261,6 +266,7 @@ impl TableSchema { StAccess::Public, None, None, + false, ) } @@ -753,6 +759,7 @@ impl TableSchema { table_access, None, None, + false, ) } @@ -875,6 +882,7 @@ impl TableSchema { table_access, None, None, + false, ) } } @@ -904,6 +912,7 @@ impl Schema for TableSchema { schedule, table_type, table_access, + is_event, } = def; let columns = column_schemas_from_defs(module_def, columns, table_id); @@ -941,6 +950,7 @@ impl Schema for TableSchema { (*table_access).into(), schedule, *primary_key, + *is_event, ) } diff --git a/modules/module-test/src/lib.rs b/modules/module-test/src/lib.rs index 433796e5b60..d8ef5ff62a0 100644 --- a/modules/module-test/src/lib.rs +++ b/modules/module-test/src/lib.rs @@ -165,6 +165,12 @@ pub struct HasSpecialStuff { connection_id: ConnectionId, } +#[table(name = my_event, public, event)] +pub struct MyEvent { + name: String, + value: u64, +} + /// These two tables defined with the same row type /// verify that we can define multiple tables with the same type. /// @@ -373,6 +379,11 @@ pub fn delete_players_by_name(ctx: &ReducerContext, name: String) -> Result<(), } } +#[spacetimedb::reducer] +pub fn emit_event(ctx: &ReducerContext, name: String, value: u64) { + ctx.db.my_event().insert(MyEvent { name, value }); +} + #[spacetimedb::reducer(client_connected)] fn client_connected(_ctx: &ReducerContext) {} From 9f8d2246bd169c22a1d77c2882520ff217d2422e Mon Sep 17 00:00:00 2001 From: = Date: Thu, 5 Feb 2026 23:17:39 -0500 Subject: [PATCH 2/4] Add From for RawV10 impls to match V9 pattern Each validated schema type (TableDef, IndexDef, ReducerDef, etc.) now has its own From impl for the corresponding V10 raw type, mirroring the existing V9 conversions. The main From for RawModuleDefV10 is simplified to use .into() calls. --- crates/schema/src/def.rs | 232 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 232 insertions(+) diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 1a9406eb800..1751610993b 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -31,6 +31,11 @@ use itertools::Itertools; use spacetimedb_data_structures::error_stream::{CollectAllErrors, CombineErrors, ErrorStream}; use spacetimedb_data_structures::map::{Equivalent, HashMap}; use spacetimedb_lib::db::raw_def; +use spacetimedb_lib::db::raw_def::v10::{ + RawConstraintDefV10, RawIndexDefV10, RawLifeCycleReducerDefV10, RawModuleDefV10, RawModuleDefV10Section, + RawProcedureDefV10, RawReducerDefV10, RawRowLevelSecurityDefV10, RawScheduleDefV10, RawScopedTypeNameV10, + RawSequenceDefV10, RawTableDefV10, RawTypeDefV10, RawViewDefV10, +}; use spacetimedb_lib::db::raw_def::v9::{ Lifecycle, RawColumnDefaultValueV9, RawConstraintDataV9, RawConstraintDefV9, RawIndexAlgorithm, RawIndexDefV9, RawMiscModuleExportV9, RawModuleDefV9, RawProcedureDefV9, RawReducerDefV9, RawRowLevelSecurityDefV9, @@ -468,6 +473,97 @@ impl TryFrom for ModuleDef { } } +impl From for RawModuleDefV10 { + fn from(val: ModuleDef) -> Self { + let ModuleDef { + tables, + views, + reducers, + lifecycle_reducers, + types, + typespace, + stored_in_table_def: _, + typespace_for_generate: _, + refmap: _, + row_level_security_raw, + procedures, + raw_module_def_version: _, + } = val; + + let mut sections = Vec::new(); + + sections.push(RawModuleDefV10Section::Typespace(typespace)); + + // Extract lifecycle reducer names before consuming reducers. + let raw_lifecycle: Vec = lifecycle_reducers + .into_iter() + .filter_map(|(lifecycle, reducer_id)| { + let id = reducer_id?; + let (name, _) = reducers.get_index(id.idx())?; + Some(RawLifeCycleReducerDefV10 { + lifecycle_spec: lifecycle, + function_name: name.clone().into(), + }) + }) + .collect(); + + let raw_types: Vec = types.into_values().map(Into::into).collect(); + if !raw_types.is_empty() { + sections.push(RawModuleDefV10Section::Types(raw_types)); + } + + // Collect schedules from tables (V10 stores them in a separate section). + let mut schedules = Vec::new(); + let raw_tables: Vec = tables + .into_values() + .map(|td| { + if let Some(sched) = td.schedule.clone() { + schedules.push(RawScheduleDefV10 { + source_name: Some(sched.name.into()), + table_name: td.name.clone().into(), + schedule_at_col: sched.at_column, + function_name: sched.function_name.into(), + }); + } + td.into() + }) + .collect(); + if !raw_tables.is_empty() { + sections.push(RawModuleDefV10Section::Tables(raw_tables)); + } + + let raw_reducers: Vec = reducers.into_values().map(Into::into).collect(); + if !raw_reducers.is_empty() { + sections.push(RawModuleDefV10Section::Reducers(raw_reducers)); + } + + let raw_procedures: Vec = procedures.into_values().map(Into::into).collect(); + if !raw_procedures.is_empty() { + sections.push(RawModuleDefV10Section::Procedures(raw_procedures)); + } + + let raw_views: Vec = views.into_values().map(Into::into).collect(); + if !raw_views.is_empty() { + sections.push(RawModuleDefV10Section::Views(raw_views)); + } + + if !schedules.is_empty() { + sections.push(RawModuleDefV10Section::Schedules(schedules)); + } + + if !raw_lifecycle.is_empty() { + sections.push(RawModuleDefV10Section::LifeCycleReducers(raw_lifecycle)); + } + + let raw_rls: Vec = row_level_security_raw.into_values().collect(); + if !raw_rls.is_empty() { + sections.push(RawModuleDefV10Section::RowLevelSecurity(raw_rls)); + } + + RawModuleDefV10 { sections } + } +} + /// Implemented by definitions stored in a `ModuleDef`. /// Allows looking definitions up in a `ModuleDef`, and across /// `ModuleDef`s during migrations. @@ -589,6 +685,37 @@ impl From for RawTableDefV9 { } } +impl From for RawTableDefV10 { + fn from(val: TableDef) -> Self { + let TableDef { + name, + product_type_ref, + primary_key, + columns: _, // will be reconstructed from the product type. + indexes, + constraints, + sequences, + schedule: _, // V10 stores schedules in a separate section; handled in From. + table_type, + table_access, + is_event, + } = val; + + RawTableDefV10 { + source_name: name.into(), + product_type_ref, + primary_key: ColList::from_iter(primary_key), + indexes: indexes.into_values().map(Into::into).collect(), + constraints: constraints.into_values().map(Into::into).collect(), + sequences: sequences.into_values().map(Into::into).collect(), + table_type, + table_access, + default_values: Vec::new(), + is_event, + } + } +} + impl From for TableDef { fn from(def: ViewDef) -> Self { use TableAccess::*; @@ -657,6 +784,19 @@ impl From for RawSequenceDefV9 { } } +impl From for RawSequenceDefV10 { + fn from(val: SequenceDef) -> Self { + RawSequenceDefV10 { + source_name: Some(val.name), + column: val.column, + start: val.start, + min_value: val.min_value, + max_value: val.max_value, + increment: val.increment, + } + } +} + /// A struct representing the validated definition of a database index. /// /// Cannot be created directly. Construct a [`ModuleDef`] by validating a [`RawModuleDef`] instead, @@ -704,6 +844,16 @@ impl From for RawIndexDefV9 { } } +impl From for RawIndexDefV10 { + fn from(val: IndexDef) -> Self { + RawIndexDefV10 { + source_name: Some(val.name), + accessor_name: val.accessor_name.map(Into::into), + algorithm: val.algorithm.into(), + } + } +} + /// Data specifying a supported index algorithm. #[non_exhaustive] #[derive(Debug, Clone, Eq, PartialEq)] @@ -964,6 +1114,15 @@ impl From for RawConstraintDefV9 { } } +impl From for RawConstraintDefV10 { + fn from(val: ConstraintDef) -> Self { + RawConstraintDefV10 { + source_name: Some(val.name), + data: val.data.into(), + } + } +} + /// Data for a constraint attached to a table. #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] @@ -1121,6 +1280,16 @@ impl From for RawTypeDefV9 { } } +impl From for RawTypeDefV10 { + fn from(val: TypeDef) -> Self { + RawTypeDefV10 { + source_name: val.name.into(), + ty: val.ty, + custom_ordering: val.custom_ordering, + } + } +} + /// A scoped type name, in the form `scope0::scope1::...::scopeN::name`. /// /// These are the names that will be used *in client code generation*, NOT the names used for types @@ -1217,6 +1386,15 @@ impl From for RawScopedTypeNameV9 { } } +impl From for RawScopedTypeNameV10 { + fn from(val: ScopedTypeName) -> Self { + RawScopedTypeNameV10 { + scope: val.scope.into_vec().into_iter().map(|id| id.into()).collect(), + source_name: val.name.into(), + } + } +} + /// A view exported by the module. #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] @@ -1314,6 +1492,28 @@ impl From for RawViewDefV9 { } } +impl From for RawViewDefV10 { + fn from(val: ViewDef) -> Self { + let ViewDef { + name, + is_anonymous, + is_public, + params, + return_type, + fn_ptr, + .. + } = val; + RawViewDefV10 { + source_name: name.into(), + index: fn_ptr.into(), + is_public, + is_anonymous, + params, + return_type, + } + } +} + impl From for RawMiscModuleExportV9 { fn from(def: ViewDef) -> Self { Self::View(def.into()) @@ -1349,6 +1549,15 @@ impl From for FunctionVisibility { } } +impl From for RawFunctionVisibility { + fn from(val: FunctionVisibility) -> Self { + match val { + FunctionVisibility::Private => RawFunctionVisibility::Private, + FunctionVisibility::ClientCallable => RawFunctionVisibility::ClientCallable, + } + } +} + /// A reducer exported by the module. #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] @@ -1389,6 +1598,18 @@ impl From for RawReducerDefV9 { } } +impl From for RawReducerDefV10 { + fn from(val: ReducerDef) -> Self { + RawReducerDefV10 { + source_name: val.name.into(), + params: val.params, + visibility: val.visibility.into(), + ok_return_type: val.ok_return_type, + err_return_type: val.err_return_type, + } + } +} + #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] pub struct ProcedureDef { @@ -1433,6 +1654,17 @@ impl From for RawProcedureDefV9 { } } +impl From for RawProcedureDefV10 { + fn from(val: ProcedureDef) -> Self { + RawProcedureDefV10 { + source_name: val.name.into(), + params: val.params, + return_type: val.return_type, + visibility: val.visibility.into(), + } + } +} + impl From for RawMiscModuleExportV9 { fn from(def: ProcedureDef) -> Self { Self::Procedure(def.into()) From 3b26cf86a7996d9a2874f0ed4d26b23ff0778435 Mon Sep 17 00:00:00 2001 From: = Date: Tue, 10 Feb 2026 09:27:40 -0500 Subject: [PATCH 3/4] Add event table unit tests, migration validation, and bootstrap fix - Register st_event_table in bootstrap_system_tables (was missing) - Add 4 datastore unit tests for event table behavior: - insert+delete cancellation produces no TxData - update yields only the final row in TxData - bare insert records in TxData but not committed state - replay_insert is a no-op for event tables - Add ChangeTableEventFlag migration error preventing event flag transitions (event->non-event and vice versa) - Update bootstrap test expectations for new system table - Enable spacetimedb-schema test feature in datastore dev-deps --- crates/datastore/Cargo.toml | 1 + .../locking_tx_datastore/committed_state.rs | 7 +- .../src/locking_tx_datastore/datastore.rs | 140 +++++++++++++++++- crates/schema/src/auto_migrate.rs | 87 ++++++++++- 4 files changed, 225 insertions(+), 10 deletions(-) diff --git a/crates/datastore/Cargo.toml b/crates/datastore/Cargo.toml index af77913e067..cd6bb43a5dd 100644 --- a/crates/datastore/Cargo.toml +++ b/crates/datastore/Cargo.toml @@ -47,6 +47,7 @@ test = ["spacetimedb-commitlog/test"] spacetimedb-lib = { path = "../lib", features = ["proptest"] } spacetimedb-sats = { path = "../sats", features = ["proptest"] } spacetimedb-commitlog = { path = "../commitlog", features = ["test"] } +spacetimedb-schema = { path = "../schema", features = ["test"] } # Also as dev-dependencies for use in _this_ crate's tests. proptest.workspace = true diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 2440de4fbef..826042f7887 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -29,8 +29,9 @@ use crate::{ use crate::{ locking_tx_datastore::ViewCallInfo, system_tables::{ - ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, - ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, + ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, + ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, + ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, }, }; use anyhow::anyhow; @@ -471,6 +472,8 @@ impl CommittedState { self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone()); self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone()); + self.create_table(ST_EVENT_TABLE_ID, schemas[ST_EVENT_TABLE_IDX].clone()); + // Insert the sequences into `st_sequences` let (st_sequences, blob_store, pool) = self.get_table_and_blob_store_or_create(ST_SEQUENCE_ID, &schemas[ST_SEQUENCE_IDX]); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 5961fe111e7..ec63dbcce0e 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1278,12 +1278,13 @@ mod tests { use crate::locking_tx_datastore::tx_state::PendingSchemaChange; use crate::system_tables::{ system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields, - StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields, - StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID, - ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, - ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, - ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, - ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, + StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, + StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, + ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, + ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, + ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID, + ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, + ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID, ST_VIEW_SUB_NAME, }; @@ -1747,7 +1748,7 @@ mod tests { TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) }, - + TableRow { id: ST_EVENT_TABLE_ID.into(), name: ST_EVENT_TABLE_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StEventTableFields::TableId.into()) }, ])); #[rustfmt::skip] @@ -1833,6 +1834,8 @@ mod tests { ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 }, ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() }, + + ColRow { table: ST_EVENT_TABLE_ID.into(), pos: 0, name: "table_id", ty: TableId::get_type() }, ])); #[rustfmt::skip] assert_eq!(query.scan_st_indexes()?, map_array([ @@ -1858,6 +1861,7 @@ mod tests { IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", }, IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, + IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", }, ])); let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1; #[rustfmt::skip] @@ -1896,6 +1900,7 @@ mod tests { ConstraintRow { constraint_id: 16, table_id: ST_VIEW_COLUMN_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_view_column_view_id_col_pos_key", }, ConstraintRow { constraint_id: 17, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(0), constraint_name: "st_view_arg_id_key", }, ConstraintRow { constraint_id: 18, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(1), constraint_name: "st_view_arg_bytes_key", }, + ConstraintRow { constraint_id: 19, table_id: ST_EVENT_TABLE_ID.into(), unique_columns: col(0), constraint_name: "st_event_table_table_id_key", }, ])); // Verify we get back the tables correctly with the proper ids... @@ -2321,6 +2326,7 @@ mod tests { IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", }, IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, + IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", }, IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, @@ -3706,4 +3712,124 @@ mod tests { assert!(!metrics.committed); Ok(()) } + + /// Create an event table with the basic schema (id: u32, name: String, age: u32). + fn setup_event_table() -> ResultTest<(Locking, MutTxId, TableId)> { + let datastore = get_datastore()?; + let mut tx = begin_mut_tx(&datastore); + let mut schema = basic_table_schema_with_indices(basic_indices(), basic_constraints()); + schema.is_event = true; + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + Ok((datastore, tx, table_id)) + } + + #[test] + fn test_event_table_insert_delete_noop() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx first. + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + let row = u32_str_u32(1, "Alice", 30); + insert(&datastore, &mut tx, table_id, &row)?; + datastore.delete_by_rel_mut_tx(&mut tx, table_id, [row]); + + let tx_data = commit(&datastore, tx)?; + + // Insert+delete in same tx should cancel out: no TxData entry for this table. + assert!( + tx_data.inserts_for_table(table_id).is_none(), + "insert+delete should cancel: no inserts in TxData" + ); + assert!( + tx_data.deletes_for_table(table_id).is_none(), + "insert+delete should cancel: no deletes in TxData" + ); + + // Committed state should be empty for event tables. + let tx = begin_mut_tx(&datastore); + assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); + Ok(()) + } + + #[test] + fn test_event_table_update_only_final_row() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx first. + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + let row_v1 = u32_str_u32(1, "Alice", 30); + insert(&datastore, &mut tx, table_id, &row_v1)?; + + // Update via the index on column 0 (id) — replaces the row with same PK. + let idx = extract_index_id(&datastore, &tx, &basic_indices()[0])?; + let row_v2 = u32_str_u32(1, "Alice", 31); + update(&datastore, &mut tx, table_id, idx, &row_v2)?; + + let tx_data = commit(&datastore, tx)?; + + // The update replaces the original insert, so TxData should have only the final row. + let inserts = tx_data.inserts_for_table(table_id).expect("should have inserts"); + assert_eq!(inserts.len(), 1, "update should leave exactly 1 insert"); + assert_eq!(inserts[0], row_v2); + + // Committed state should still be empty for event tables. + let tx = begin_mut_tx(&datastore); + assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); + Ok(()) + } + + #[test] + fn test_event_table_insert_records_txdata_not_committed_state() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx first. + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + let row = u32_str_u32(1, "Bob", 25); + insert(&datastore, &mut tx, table_id, &row)?; + + let tx_data = commit(&datastore, tx)?; + + // TxData should record the insert. + let inserts = tx_data + .inserts_for_table(table_id) + .expect("event table insert should appear in TxData"); + assert_eq!(inserts.len(), 1); + assert_eq!(inserts[0], row); + + // But committed state should be empty. + let tx = begin_mut_tx(&datastore); + assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); + Ok(()) + } + + #[test] + fn test_event_table_replay_ignores_inserts() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx so the schema exists. + commit(&datastore, tx)?; + + // Get the schema for this event table. + let tx = begin_mut_tx(&datastore); + let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?; + let _ = datastore.rollback_mut_tx(tx); + + // Directly call replay_insert on committed state. + let row = u32_str_u32(1, "Carol", 40); + { + let mut committed_state = datastore.committed_state.write(); + committed_state.replay_insert(table_id, &schema, &row)?; + } + + // After replay, the event table should still have no committed rows. + let tx = begin_mut_tx(&datastore); + assert_eq!( + all_rows(&datastore, &tx, table_id).len(), + 0, + "replay_insert should be a no-op for event tables" + ); + Ok(()) + } } diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 446c91638b7..1578615f4e4 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -416,6 +416,9 @@ pub enum AutoMigrateError { type2: TableType, }, + #[error("Changing the event flag of table {table} requires a manual migration")] + ChangeTableEventFlag { table: Identifier }, + #[error( "Changing the accessor name on index {index} from {old_accessor:?} to {new_accessor:?} requires a manual migration" )] @@ -650,6 +653,14 @@ fn auto_migrate_table<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def TableDe } .into()) }; + let event_ok: Result<()> = if old.is_event == new.is_event { + Ok(()) + } else { + Err(AutoMigrateError::ChangeTableEventFlag { + table: old.name.clone(), + } + .into()) + }; if old.table_access != new.table_access { plan.steps.push(AutoMigrateStep::ChangeAccess(key)); } @@ -725,7 +736,8 @@ fn auto_migrate_table<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def TableDe }) .collect_all_errors::>(); - let ((), ProductMonoid(Any(row_type_changed), Any(columns_added))) = (type_ok, columns_ok).combine_errors()?; + let ((), (), ProductMonoid(Any(row_type_changed), Any(columns_added))) = + (type_ok, event_ok, columns_ok).combine_errors()?; // If we're adding a column, we'll rewrite the whole table. // That makes any `ChangeColumns` moot, so we can skip it. @@ -2330,4 +2342,77 @@ mod tests { let plan = ponder_auto_migrate(&old_def, &new_def).expect("auto migration should succeed"); assert!(!plan.disconnects_all_users(), "{plan:#?}"); } + + fn create_v10_module_def(build_module: impl Fn(&mut v10::RawModuleDefV10Builder)) -> ModuleDef { + let mut builder = v10::RawModuleDefV10Builder::new(); + build_module(&mut builder); + builder + .finish() + .try_into() + .expect("should be a valid module definition") + } + + #[test] + fn test_change_event_flag_rejected() { + // non-event → event + let old = create_v10_module_def(|builder| { + builder + .build_table_with_new_type( + "Events", + ProductType::from([("id", AlgebraicType::U64)]), + true, + ) + .finish(); + }); + let new = create_v10_module_def(|builder| { + builder + .build_table_with_new_type( + "Events", + ProductType::from([("id", AlgebraicType::U64)]), + true, + ) + .with_event(true) + .finish(); + }); + + let result = ponder_auto_migrate(&old, &new); + expect_error_matching!( + result, + AutoMigrateError::ChangeTableEventFlag { table } => &table[..] == "Events" + ); + + // event → non-event (reverse direction) + let result = ponder_auto_migrate(&new, &old); + expect_error_matching!( + result, + AutoMigrateError::ChangeTableEventFlag { table } => &table[..] == "Events" + ); + } + + #[test] + fn test_same_event_flag_accepted() { + // Both event → no error + let old = create_v10_module_def(|builder| { + builder + .build_table_with_new_type( + "Events", + ProductType::from([("id", AlgebraicType::U64)]), + true, + ) + .with_event(true) + .finish(); + }); + let new = create_v10_module_def(|builder| { + builder + .build_table_with_new_type( + "Events", + ProductType::from([("id", AlgebraicType::U64)]), + true, + ) + .with_event(true) + .finish(); + }); + + ponder_auto_migrate(&old, &new).expect("same event flag should succeed"); + } } From c8f03f609de8210576790ce47c9e9232829a5a72 Mon Sep 17 00:00:00 2001 From: = Date: Tue, 10 Feb 2026 17:29:57 -0500 Subject: [PATCH 4/4] Run cargo fmt --- crates/bindings-macro/src/table.rs | 6 ++++- .../src/locking_tx_datastore/mut_tx.rs | 6 ++--- .../src/locking_tx_datastore/state_view.rs | 8 +++---- crates/datastore/src/system_tables.rs | 5 +++- crates/schema/src/auto_migrate.rs | 24 ++++--------------- 5 files changed, 20 insertions(+), 29 deletions(-) diff --git a/crates/bindings-macro/src/table.rs b/crates/bindings-macro/src/table.rs index 2076ece600d..11d60e1a686 100644 --- a/crates/bindings-macro/src/table.rs +++ b/crates/bindings-macro/src/table.rs @@ -848,7 +848,11 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R ); let table_access = args.access.iter().map(|acc| acc.to_value()); - let is_event = args.event.iter().map(|_| quote!(const IS_EVENT: bool = true;)); + let is_event = args.event.iter().map(|_| { + quote!( + const IS_EVENT: bool = true; + ) + }); let can_be_lookup_impl = if args.event.is_none() { quote! { impl spacetimedb::query_builder::CanBeLookupTable for #original_struct_ident {} diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 0b9c8e5ec38..101a5feb753 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -21,9 +21,9 @@ use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, - StConstraintRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, - StEventTableRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, - SystemTable, ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ID, + StConstraintRow, StEventTableRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, + StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, + StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }, }; diff --git a/crates/datastore/src/locking_tx_datastore/state_view.rs b/crates/datastore/src/locking_tx_datastore/state_view.rs index 4ce84e0c602..59b3e27fe55 100644 --- a/crates/datastore/src/locking_tx_datastore/state_view.rs +++ b/crates/datastore/src/locking_tx_datastore/state_view.rs @@ -4,10 +4,10 @@ use crate::error::{DatastoreError, TableError}; use crate::locking_tx_datastore::mut_tx::{IndexScanPoint, IndexScanRanged}; use crate::system_tables::{ ConnectionIdViaU128, StColumnFields, StColumnRow, StConnectionCredentialsFields, StConnectionCredentialsRow, - StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, StScheduledRow, - StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, StViewRow, - StEventTableFields, SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, - ST_EVENT_TABLE_ID, ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, + StConstraintFields, StConstraintRow, StEventTableFields, StIndexFields, StIndexRow, StScheduledFields, + StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, + StViewRow, SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, + ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, }; use anyhow::anyhow; use core::ops::RangeBounds; diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index c345283d14e..b007d4142da 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -582,7 +582,10 @@ fn system_module_def() -> ModuleDef { let st_event_table_type = builder.add_type::(); builder - .build_table(ST_EVENT_TABLE_NAME, *st_event_table_type.as_ref().expect("should be ref")) + .build_table( + ST_EVENT_TABLE_NAME, + *st_event_table_type.as_ref().expect("should be ref"), + ) .with_type(TableType::System) .with_primary_key(StEventTableFields::TableId) .with_unique_constraint(StEventTableFields::TableId) diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 1578615f4e4..cd326995b92 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -2357,20 +2357,12 @@ mod tests { // non-event → event let old = create_v10_module_def(|builder| { builder - .build_table_with_new_type( - "Events", - ProductType::from([("id", AlgebraicType::U64)]), - true, - ) + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) .finish(); }); let new = create_v10_module_def(|builder| { builder - .build_table_with_new_type( - "Events", - ProductType::from([("id", AlgebraicType::U64)]), - true, - ) + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) .with_event(true) .finish(); }); @@ -2394,21 +2386,13 @@ mod tests { // Both event → no error let old = create_v10_module_def(|builder| { builder - .build_table_with_new_type( - "Events", - ProductType::from([("id", AlgebraicType::U64)]), - true, - ) + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) .with_event(true) .finish(); }); let new = create_v10_module_def(|builder| { builder - .build_table_with_new_type( - "Events", - ProductType::from([("id", AlgebraicType::U64)]), - true, - ) + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) .with_event(true) .finish(); });