diff --git a/crates/bindings-macro/src/lib.rs b/crates/bindings-macro/src/lib.rs index b06e98ab8f9..428573be84c 100644 --- a/crates/bindings-macro/src/lib.rs +++ b/crates/bindings-macro/src/lib.rs @@ -61,6 +61,7 @@ mod sym { symbol!(unique); symbol!(update); symbol!(default); + symbol!(event); symbol!(u8); symbol!(i8); diff --git a/crates/bindings-macro/src/table.rs b/crates/bindings-macro/src/table.rs index 7bc2940d548..11d60e1a686 100644 --- a/crates/bindings-macro/src/table.rs +++ b/crates/bindings-macro/src/table.rs @@ -19,6 +19,7 @@ pub(crate) struct TableArgs { scheduled: Option, name: Ident, indices: Vec, + event: Option, } enum TableAccess { @@ -71,6 +72,7 @@ impl TableArgs { let mut scheduled = None; let mut name = None; let mut indices = Vec::new(); + let mut event = None; syn::meta::parser(|meta| { match_meta!(match meta { sym::public => { @@ -91,6 +93,10 @@ impl TableArgs { check_duplicate(&scheduled, &meta)?; scheduled = Some(ScheduledArg::parse_meta(meta)?); } + sym::event => { + check_duplicate(&event, &meta)?; + event = Some(meta.path.span()); + } }); Ok(()) }) @@ -107,6 +113,7 @@ impl TableArgs { scheduled, name, indices, + event, }) } } @@ -841,6 +848,18 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R ); let table_access = args.access.iter().map(|acc| acc.to_value()); + let is_event = args.event.iter().map(|_| { + quote!( + const IS_EVENT: bool = true; + ) + }); + let can_be_lookup_impl = if args.event.is_none() { + quote! { + impl spacetimedb::query_builder::CanBeLookupTable for #original_struct_ident {} + } + } else { + quote! {} + }; let unique_col_ids = unique_columns.iter().map(|col| col.index); let primary_col_id = primary_key_column.clone().into_iter().map(|col| col.index); let sequence_col_ids = sequenced_columns.iter().map(|col| col.index); @@ -966,6 +985,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R const TABLE_NAME: &'static str = #table_name; // the default value if not specified is Private #(const TABLE_ACCESS: spacetimedb::table::TableAccess = #table_access;)* + #(#is_event)* const UNIQUE_COLUMNS: &'static [u16] = &[#(#unique_col_ids),*]; const INDEXES: &'static [spacetimedb::table::IndexDesc<'static>] = &[#(#index_descs),*]; #(const PRIMARY_KEY: Option = Some(#primary_col_id);)* @@ -1077,6 +1097,8 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R } } + #can_be_lookup_impl + }; let table_query_handle_def = quote! { diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index cea7adfbbec..287b2efde74 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -716,7 +716,8 @@ pub fn register_table() { .inner .build_table(T::TABLE_NAME, product_type_ref) .with_type(TableType::User) - .with_access(T::TABLE_ACCESS); + .with_access(T::TABLE_ACCESS) + .with_event(T::IS_EVENT); for &col in T::UNIQUE_COLUMNS { table = table.with_unique_constraint(col); diff --git a/crates/bindings/src/table.rs b/crates/bindings/src/table.rs index d7cbdd9b39a..f64d815b862 100644 --- a/crates/bindings/src/table.rs +++ b/crates/bindings/src/table.rs @@ -128,6 +128,7 @@ pub trait TableInternal: Sized { const PRIMARY_KEY: Option = None; const SEQUENCES: &'static [u16]; const SCHEDULE: Option> = None; + const IS_EVENT: bool = false; /// Returns the ID of this table. fn table_id() -> TableId; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index d8bb27a1caa..e995cf72cb3 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -699,6 +699,7 @@ pub(crate) mod tests { access, None, None, + false, ), )?; let schema = db.schema_for_table_mut(tx, table_id)?; diff --git a/crates/datastore/Cargo.toml b/crates/datastore/Cargo.toml index af77913e067..cd6bb43a5dd 100644 --- a/crates/datastore/Cargo.toml +++ b/crates/datastore/Cargo.toml @@ -47,6 +47,7 @@ test = ["spacetimedb-commitlog/test"] spacetimedb-lib = { path = "../lib", features = ["proptest"] } spacetimedb-sats = { path = "../sats", features = ["proptest"] } spacetimedb-commitlog = { path = "../commitlog", features = ["test"] } +spacetimedb-schema = { path = "../schema", features = ["test"] } # Also as dev-dependencies for use in _this_ crate's tests. proptest.workspace = true diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index b597f742336..826042f7887 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -29,8 +29,9 @@ use crate::{ use crate::{ locking_tx_datastore::ViewCallInfo, system_tables::{ - ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, - ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, + ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, + ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, + ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, }, }; use anyhow::anyhow; @@ -471,6 +472,8 @@ impl CommittedState { self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone()); self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone()); + self.create_table(ST_EVENT_TABLE_ID, schemas[ST_EVENT_TABLE_IDX].clone()); + // Insert the sequences into `st_sequences` let (st_sequences, blob_store, pool) = self.get_table_and_blob_store_or_create(ST_SEQUENCE_ID, &schemas[ST_SEQUENCE_IDX]); @@ -617,6 +620,12 @@ impl CommittedState { schema: &Arc, row: &ProductValue, ) -> Result<()> { + // Event table rows in the commitlog are preserved for future replay features + // but don't rebuild state — event tables have no committed state. + if schema.is_event { + return Ok(()); + } + let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema); let (_, row_ref) = match table.insert(pool, blob_store, row) { @@ -1187,6 +1196,24 @@ impl CommittedState { // and the fullness of the page. for (table_id, tx_table) in insert_tables { + // Event tables: record in TxData for commitlog persistence and subscription dispatch, + // but do NOT merge into committed state. + // NOTE: There is no need to call `get_table_and_blob_store_or_create` here. + // The logic for collecting inserts is duplicated, but it's cleaner this way. + if tx_table.get_schema().is_event { + let mut inserts = Vec::with_capacity(tx_table.row_count as usize); + for row_ref in tx_table.scan_rows(&tx_blob_store) { + inserts.push(row_ref.to_product_value()); + } + if !inserts.is_empty() { + let table_name = &tx_table.get_schema().table_name; + tx_data.set_inserts_for_table(table_id, table_name, inserts.into()); + } + let (_schema, _indexes, pages) = tx_table.consume_for_merge(); + self.page_pool.put_many(pages); + continue; + } + let (commit_table, commit_blob_store, page_pool) = self.get_table_and_blob_store_or_create(table_id, tx_table.get_schema()); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 48361dbf933..ec63dbcce0e 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1278,12 +1278,13 @@ mod tests { use crate::locking_tx_datastore::tx_state::PendingSchemaChange; use crate::system_tables::{ system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields, - StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields, - StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID, - ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, - ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, - ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, - ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, + StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, + StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, + ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, + ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, + ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID, + ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, + ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID, ST_VIEW_SUB_NAME, }; @@ -1614,6 +1615,7 @@ mod tests { StAccess::Public, schedule, pk, + false, ) } @@ -1746,7 +1748,7 @@ mod tests { TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) }, - + TableRow { id: ST_EVENT_TABLE_ID.into(), name: ST_EVENT_TABLE_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StEventTableFields::TableId.into()) }, ])); #[rustfmt::skip] @@ -1832,6 +1834,8 @@ mod tests { ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 }, ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() }, + + ColRow { table: ST_EVENT_TABLE_ID.into(), pos: 0, name: "table_id", ty: TableId::get_type() }, ])); #[rustfmt::skip] assert_eq!(query.scan_st_indexes()?, map_array([ @@ -1857,6 +1861,7 @@ mod tests { IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", }, IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, + IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", }, ])); let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1; #[rustfmt::skip] @@ -1895,6 +1900,7 @@ mod tests { ConstraintRow { constraint_id: 16, table_id: ST_VIEW_COLUMN_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_view_column_view_id_col_pos_key", }, ConstraintRow { constraint_id: 17, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(0), constraint_name: "st_view_arg_id_key", }, ConstraintRow { constraint_id: 18, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(1), constraint_name: "st_view_arg_bytes_key", }, + ConstraintRow { constraint_id: 19, table_id: ST_EVENT_TABLE_ID.into(), unique_columns: col(0), constraint_name: "st_event_table_table_id_key", }, ])); // Verify we get back the tables correctly with the proper ids... @@ -2320,6 +2326,7 @@ mod tests { IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", }, IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, + IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", }, IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, @@ -3705,4 +3712,124 @@ mod tests { assert!(!metrics.committed); Ok(()) } + + /// Create an event table with the basic schema (id: u32, name: String, age: u32). + fn setup_event_table() -> ResultTest<(Locking, MutTxId, TableId)> { + let datastore = get_datastore()?; + let mut tx = begin_mut_tx(&datastore); + let mut schema = basic_table_schema_with_indices(basic_indices(), basic_constraints()); + schema.is_event = true; + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + Ok((datastore, tx, table_id)) + } + + #[test] + fn test_event_table_insert_delete_noop() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx first. + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + let row = u32_str_u32(1, "Alice", 30); + insert(&datastore, &mut tx, table_id, &row)?; + datastore.delete_by_rel_mut_tx(&mut tx, table_id, [row]); + + let tx_data = commit(&datastore, tx)?; + + // Insert+delete in same tx should cancel out: no TxData entry for this table. + assert!( + tx_data.inserts_for_table(table_id).is_none(), + "insert+delete should cancel: no inserts in TxData" + ); + assert!( + tx_data.deletes_for_table(table_id).is_none(), + "insert+delete should cancel: no deletes in TxData" + ); + + // Committed state should be empty for event tables. + let tx = begin_mut_tx(&datastore); + assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); + Ok(()) + } + + #[test] + fn test_event_table_update_only_final_row() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx first. + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + let row_v1 = u32_str_u32(1, "Alice", 30); + insert(&datastore, &mut tx, table_id, &row_v1)?; + + // Update via the index on column 0 (id) — replaces the row with same PK. + let idx = extract_index_id(&datastore, &tx, &basic_indices()[0])?; + let row_v2 = u32_str_u32(1, "Alice", 31); + update(&datastore, &mut tx, table_id, idx, &row_v2)?; + + let tx_data = commit(&datastore, tx)?; + + // The update replaces the original insert, so TxData should have only the final row. + let inserts = tx_data.inserts_for_table(table_id).expect("should have inserts"); + assert_eq!(inserts.len(), 1, "update should leave exactly 1 insert"); + assert_eq!(inserts[0], row_v2); + + // Committed state should still be empty for event tables. + let tx = begin_mut_tx(&datastore); + assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); + Ok(()) + } + + #[test] + fn test_event_table_insert_records_txdata_not_committed_state() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx first. + commit(&datastore, tx)?; + + let mut tx = begin_mut_tx(&datastore); + let row = u32_str_u32(1, "Bob", 25); + insert(&datastore, &mut tx, table_id, &row)?; + + let tx_data = commit(&datastore, tx)?; + + // TxData should record the insert. + let inserts = tx_data + .inserts_for_table(table_id) + .expect("event table insert should appear in TxData"); + assert_eq!(inserts.len(), 1); + assert_eq!(inserts[0], row); + + // But committed state should be empty. + let tx = begin_mut_tx(&datastore); + assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); + Ok(()) + } + + #[test] + fn test_event_table_replay_ignores_inserts() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_event_table()?; + // Commit the table-creation tx so the schema exists. + commit(&datastore, tx)?; + + // Get the schema for this event table. + let tx = begin_mut_tx(&datastore); + let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?; + let _ = datastore.rollback_mut_tx(tx); + + // Directly call replay_insert on committed state. + let row = u32_str_u32(1, "Carol", 40); + { + let mut committed_state = datastore.committed_state.write(); + committed_state.replay_insert(table_id, &schema, &row)?; + } + + // After replay, the event table should still have no committed rows. + let tx = begin_mut_tx(&datastore); + assert_eq!( + all_rows(&datastore, &tx, table_id).len(), + 0, + "replay_insert should be a no-op for event tables" + ); + Ok(()) + } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 71bf3bda615..101a5feb753 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -21,10 +21,10 @@ use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, - StConstraintRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, - StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, - ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, - ST_SEQUENCE_ID, ST_TABLE_ID, + StConstraintRow, StEventTableRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, + StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, + StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ID, + ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }, }; use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow}; @@ -656,6 +656,7 @@ impl MutTxId { self.insert_st_column(table_schema.columns())?; let schedule = table_schema.schedule.clone(); + let is_event = table_schema.is_event; let mut schema_internal = table_schema; // Extract all indexes, constraints, and sequences from the schema. // We will add them back later with correct ids. @@ -685,6 +686,12 @@ impl MutTxId { table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id); } + // Insert into st_event_table if this is an event table. + if is_event { + let row = StEventTableRow { table_id }; + self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?; + } + // Create the indexes for the table. for index in indices { let col_set = ColSet::from(index.index_algorithm.columns()); diff --git a/crates/datastore/src/locking_tx_datastore/state_view.rs b/crates/datastore/src/locking_tx_datastore/state_view.rs index 1c7401c9e70..59b3e27fe55 100644 --- a/crates/datastore/src/locking_tx_datastore/state_view.rs +++ b/crates/datastore/src/locking_tx_datastore/state_view.rs @@ -4,10 +4,10 @@ use crate::error::{DatastoreError, TableError}; use crate::locking_tx_datastore::mut_tx::{IndexScanPoint, IndexScanRanged}; use crate::system_tables::{ ConnectionIdViaU128, StColumnFields, StColumnRow, StConnectionCredentialsFields, StConnectionCredentialsRow, - StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, StScheduledRow, - StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, StViewRow, - SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID, - ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, + StConstraintFields, StConstraintRow, StEventTableFields, StIndexFields, StIndexRow, StScheduledFields, + StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, + StViewRow, SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, + ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, }; use anyhow::anyhow; use core::ops::RangeBounds; @@ -159,6 +159,12 @@ pub trait StateView { .unwrap_or(None) .transpose()?; + // Check if this table is an event table by looking up st_event_table. + let is_event = self + .iter_by_col_eq(ST_EVENT_TABLE_ID, StEventTableFields::TableId, value_eq) + .map(|mut iter| iter.next().is_some()) + .unwrap_or(false); + Ok(TableSchema::new( table_id, table_name, @@ -171,6 +177,7 @@ pub trait StateView { table_access, schedule, table_primary_key, + is_event, )) } diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index 947e8bcb0d0..b007d4142da 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -80,6 +80,8 @@ pub const ST_VIEW_COLUMN_ID: TableId = TableId(14); pub const ST_VIEW_SUB_ID: TableId = TableId(15); /// The static ID of the table that tracks view arguments pub const ST_VIEW_ARG_ID: TableId = TableId(16); +/// The static ID of the table that tracks which tables are event tables +pub const ST_EVENT_TABLE_ID: TableId = TableId(17); pub(crate) const ST_CONNECTION_CREDENTIALS_NAME: &str = "st_connection_credentials"; pub const ST_TABLE_NAME: &str = "st_table"; @@ -97,6 +99,7 @@ pub(crate) const ST_VIEW_PARAM_NAME: &str = "st_view_param"; pub(crate) const ST_VIEW_COLUMN_NAME: &str = "st_view_column"; pub(crate) const ST_VIEW_SUB_NAME: &str = "st_view_sub"; pub(crate) const ST_VIEW_ARG_NAME: &str = "st_view_arg"; +pub(crate) const ST_EVENT_TABLE_NAME: &str = "st_event_table"; /// Reserved range of sequence values used for system tables. /// /// Ids for user-created tables will start at `ST_RESERVED_SEQUENCE_RANGE`. @@ -166,6 +169,10 @@ pub fn is_built_in_meta_row(table_id: TableId, row: &ProductValue) -> Result false, // We don't define any system views, so none of the view-related tables can be system meta-descriptors. ST_VIEW_ID | ST_VIEW_PARAM_ID | ST_VIEW_COLUMN_ID | ST_VIEW_SUB_ID | ST_VIEW_ARG_ID => false, + ST_EVENT_TABLE_ID => { + let row: StEventTableRow = to_typed_row(row)?; + table_id_is_reserved(row.table_id) + } TableId(..ST_RESERVED_SEQUENCE_RANGE) => { log::warn!("Unknown system table {table_id:?}"); false @@ -187,7 +194,7 @@ pub enum SystemTable { st_row_level_security, } -pub fn system_tables() -> [TableSchema; 16] { +pub fn system_tables() -> [TableSchema; 17] { [ // The order should match the `id` of the system table, that start with [ST_TABLE_IDX]. st_table_schema(), @@ -206,6 +213,7 @@ pub fn system_tables() -> [TableSchema; 16] { st_view_column_schema(), st_view_sub_schema(), st_view_arg_schema(), + st_event_table_schema(), ] } @@ -250,6 +258,7 @@ pub(crate) const ST_VIEW_PARAM_IDX: usize = 12; pub(crate) const ST_VIEW_COLUMN_IDX: usize = 13; pub(crate) const ST_VIEW_SUB_IDX: usize = 14; pub(crate) const ST_VIEW_ARG_IDX: usize = 15; +pub(crate) const ST_EVENT_TABLE_IDX: usize = 16; macro_rules! st_fields_enum { ($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => { @@ -404,6 +413,10 @@ st_fields_enum!(enum StScheduledFields { "at_column", AtColumn = 4, }); +st_fields_enum!(enum StEventTableFields { + "table_id", TableId = 0, +}); + /// Helper method to check that a system table has the correct fields. /// Does not check field types since those aren't included in `StFields` types. /// If anything in here is not true, the system is completely broken, so it's fine to assert. @@ -567,6 +580,17 @@ fn system_module_def() -> ModuleDef { .with_index_no_accessor_name(btree(StVarFields::Name)) .with_primary_key(StVarFields::Name); + let st_event_table_type = builder.add_type::(); + builder + .build_table( + ST_EVENT_TABLE_NAME, + *st_event_table_type.as_ref().expect("should be ref"), + ) + .with_type(TableType::System) + .with_primary_key(StEventTableFields::TableId) + .with_unique_constraint(StEventTableFields::TableId) + .with_index_no_accessor_name(btree(StEventTableFields::TableId)); + let result = builder .finish() .try_into() @@ -588,6 +612,7 @@ fn system_module_def() -> ModuleDef { validate_system_table::(&result, ST_VIEW_COLUMN_NAME); validate_system_table::(&result, ST_VIEW_SUB_NAME); validate_system_table::(&result, ST_VIEW_ARG_NAME); + validate_system_table::(&result, ST_EVENT_TABLE_NAME); result } @@ -629,6 +654,7 @@ lazy_static::lazy_static! { m.insert("st_view_column_view_id_col_pos_key", ConstraintId(16)); m.insert("st_view_arg_id_key", ConstraintId(17)); m.insert("st_view_arg_bytes_key", ConstraintId(18)); + m.insert("st_event_table_table_id_key", ConstraintId(19)); m }; } @@ -660,6 +686,7 @@ lazy_static::lazy_static! { m.insert("st_view_sub_view_id_arg_id_identity_idx_btree", IndexId(20)); m.insert("st_view_arg_id_idx_btree", IndexId(21)); m.insert("st_view_arg_bytes_idx_btree", IndexId(22)); + m.insert("st_event_table_table_id_idx_btree", IndexId(23)); m }; } @@ -796,6 +823,10 @@ pub fn st_view_arg_schema() -> TableSchema { st_schema(ST_VIEW_ARG_NAME, ST_VIEW_ARG_ID) } +fn st_event_table_schema() -> TableSchema { + st_schema(ST_EVENT_TABLE_NAME, ST_EVENT_TABLE_ID) +} + /// If `table_id` refers to a known system table, return its schema. /// /// Used when restoring from a snapshot; system tables are reinstantiated with this schema, @@ -820,6 +851,7 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option { ST_VIEW_COLUMN_ID => Some(st_view_column_schema()), ST_VIEW_SUB_ID => Some(st_view_sub_schema()), ST_VIEW_ARG_ID => Some(st_view_arg_schema()), + ST_EVENT_TABLE_ID => Some(st_event_table_schema()), _ => None, } } @@ -1636,6 +1668,33 @@ impl From for ScheduleSchema { } } +/// System Table [ST_EVENT_TABLE_NAME] +/// +/// Tracks which tables are event tables. +/// Event tables persist to commitlog but are not merged into committed state. +/// +/// | table_id | +/// |----------| +/// | 4097 | +#[derive(Debug, Clone, PartialEq, Eq, SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct StEventTableRow { + pub(crate) table_id: TableId, +} + +impl TryFrom> for StEventTableRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + +impl From for ProductValue { + fn from(x: StEventTableRow) -> Self { + to_product_value(&x) + } +} + thread_local! { static READ_BUF: RefCell> = const { RefCell::new(Vec::new()) }; } diff --git a/crates/lib/src/db/raw_def/v10.rs b/crates/lib/src/db/raw_def/v10.rs index 7474b6bedfe..6c2daf72b97 100644 --- a/crates/lib/src/db/raw_def/v10.rs +++ b/crates/lib/src/db/raw_def/v10.rs @@ -139,6 +139,13 @@ pub struct RawTableDefV10 { /// Default values for columns in this table. pub default_values: Vec, + + /// Whether this is an event table. + /// + /// Event tables are write-only: their rows are persisted to the commitlog + /// but are NOT merged into committed state. They are only visible to V2 + /// subscribers in the transaction that inserted them. + pub is_event: bool, } /// Marks a particular table column as having a particular default value. @@ -687,6 +694,7 @@ impl RawModuleDefV10Builder { table_type: TableType::User, table_access: TableAccess::Public, default_values: vec![], + is_event: false, }, } } @@ -1044,6 +1052,12 @@ impl RawTableDefBuilderV10<'_> { self } + /// Sets whether this table is an event table. + pub fn with_event(mut self, is_event: bool) -> Self { + self.table.is_event = is_event; + self + } + /// Generates a `RawConstraintDefV10` using the supplied `columns`. pub fn with_unique_constraint(mut self, columns: impl Into) -> Self { let columns = columns.into(); diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index b1572040f72..92593e293f8 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -1510,6 +1510,7 @@ mod tests { StAccess::Public, None, primary_key.map(ColId::from), + false, ))) } diff --git a/crates/query-builder/src/join.rs b/crates/query-builder/src/join.rs index 3240d93c32a..82c58ab245f 100644 --- a/crates/query-builder/src/join.rs +++ b/crates/query-builder/src/join.rs @@ -2,7 +2,7 @@ use crate::TableNameStr; use super::{ expr::{format_expr, BoolExpr}, - table::{ColumnRef, HasCols, HasIxCols, Table}, + table::{CanBeLookupTable, ColumnRef, HasCols, HasIxCols, Table}, Query, }; use std::marker::PhantomData; @@ -66,7 +66,7 @@ pub struct RightSemiJoin { } impl Table { - pub fn left_semijoin( + pub fn left_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, @@ -80,7 +80,7 @@ impl Table { } } - pub fn right_semijoin( + pub fn right_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, @@ -97,7 +97,7 @@ impl Table { } impl super::FromWhere { - pub fn left_semijoin( + pub fn left_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, @@ -111,7 +111,7 @@ impl super::FromWhere { } } - pub fn right_semijoin( + pub fn right_semijoin( self, right: Table, on: impl Fn(&L::IxCols, &R::IxCols) -> IxJoinEq, diff --git a/crates/query-builder/src/table.rs b/crates/query-builder/src/table.rs index b08098a51b3..254510e96b3 100644 --- a/crates/query-builder/src/table.rs +++ b/crates/query-builder/src/table.rs @@ -16,6 +16,11 @@ pub trait HasIxCols { fn ix_cols(name: TableNameStr) -> Self::IxCols; } +/// Marker trait for tables that can appear as the right/inner/lookup +/// table in a semi-join. Event tables do NOT implement this trait, +/// preventing them from being used as the lookup side of a join. +pub trait CanBeLookupTable: HasIxCols {} + pub struct Table { pub(super) table_name: TableNameStr, _marker: PhantomData, diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 446c91638b7..cd326995b92 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -416,6 +416,9 @@ pub enum AutoMigrateError { type2: TableType, }, + #[error("Changing the event flag of table {table} requires a manual migration")] + ChangeTableEventFlag { table: Identifier }, + #[error( "Changing the accessor name on index {index} from {old_accessor:?} to {new_accessor:?} requires a manual migration" )] @@ -650,6 +653,14 @@ fn auto_migrate_table<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def TableDe } .into()) }; + let event_ok: Result<()> = if old.is_event == new.is_event { + Ok(()) + } else { + Err(AutoMigrateError::ChangeTableEventFlag { + table: old.name.clone(), + } + .into()) + }; if old.table_access != new.table_access { plan.steps.push(AutoMigrateStep::ChangeAccess(key)); } @@ -725,7 +736,8 @@ fn auto_migrate_table<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def TableDe }) .collect_all_errors::>(); - let ((), ProductMonoid(Any(row_type_changed), Any(columns_added))) = (type_ok, columns_ok).combine_errors()?; + let ((), (), ProductMonoid(Any(row_type_changed), Any(columns_added))) = + (type_ok, event_ok, columns_ok).combine_errors()?; // If we're adding a column, we'll rewrite the whole table. // That makes any `ChangeColumns` moot, so we can skip it. @@ -2330,4 +2342,61 @@ mod tests { let plan = ponder_auto_migrate(&old_def, &new_def).expect("auto migration should succeed"); assert!(!plan.disconnects_all_users(), "{plan:#?}"); } + + fn create_v10_module_def(build_module: impl Fn(&mut v10::RawModuleDefV10Builder)) -> ModuleDef { + let mut builder = v10::RawModuleDefV10Builder::new(); + build_module(&mut builder); + builder + .finish() + .try_into() + .expect("should be a valid module definition") + } + + #[test] + fn test_change_event_flag_rejected() { + // non-event → event + let old = create_v10_module_def(|builder| { + builder + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) + .finish(); + }); + let new = create_v10_module_def(|builder| { + builder + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) + .with_event(true) + .finish(); + }); + + let result = ponder_auto_migrate(&old, &new); + expect_error_matching!( + result, + AutoMigrateError::ChangeTableEventFlag { table } => &table[..] == "Events" + ); + + // event → non-event (reverse direction) + let result = ponder_auto_migrate(&new, &old); + expect_error_matching!( + result, + AutoMigrateError::ChangeTableEventFlag { table } => &table[..] == "Events" + ); + } + + #[test] + fn test_same_event_flag_accepted() { + // Both event → no error + let old = create_v10_module_def(|builder| { + builder + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) + .with_event(true) + .finish(); + }); + let new = create_v10_module_def(|builder| { + builder + .build_table_with_new_type("Events", ProductType::from([("id", AlgebraicType::U64)]), true) + .with_event(true) + .finish(); + }); + + ponder_auto_migrate(&old, &new).expect("same event flag should succeed"); + } } diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 5cc0aaa27d6..1751610993b 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -31,6 +31,11 @@ use itertools::Itertools; use spacetimedb_data_structures::error_stream::{CollectAllErrors, CombineErrors, ErrorStream}; use spacetimedb_data_structures::map::{Equivalent, HashMap}; use spacetimedb_lib::db::raw_def; +use spacetimedb_lib::db::raw_def::v10::{ + RawConstraintDefV10, RawIndexDefV10, RawLifeCycleReducerDefV10, RawModuleDefV10, RawModuleDefV10Section, + RawProcedureDefV10, RawReducerDefV10, RawRowLevelSecurityDefV10, RawScheduleDefV10, RawScopedTypeNameV10, + RawSequenceDefV10, RawTableDefV10, RawTypeDefV10, RawViewDefV10, +}; use spacetimedb_lib::db::raw_def::v9::{ Lifecycle, RawColumnDefaultValueV9, RawConstraintDataV9, RawConstraintDefV9, RawIndexAlgorithm, RawIndexDefV9, RawMiscModuleExportV9, RawModuleDefV9, RawProcedureDefV9, RawReducerDefV9, RawRowLevelSecurityDefV9, @@ -468,6 +473,97 @@ impl TryFrom for ModuleDef { } } +impl From for RawModuleDefV10 { + fn from(val: ModuleDef) -> Self { + let ModuleDef { + tables, + views, + reducers, + lifecycle_reducers, + types, + typespace, + stored_in_table_def: _, + typespace_for_generate: _, + refmap: _, + row_level_security_raw, + procedures, + raw_module_def_version: _, + } = val; + + let mut sections = Vec::new(); + + sections.push(RawModuleDefV10Section::Typespace(typespace)); + + // Extract lifecycle reducer names before consuming reducers. + let raw_lifecycle: Vec = lifecycle_reducers + .into_iter() + .filter_map(|(lifecycle, reducer_id)| { + let id = reducer_id?; + let (name, _) = reducers.get_index(id.idx())?; + Some(RawLifeCycleReducerDefV10 { + lifecycle_spec: lifecycle, + function_name: name.clone().into(), + }) + }) + .collect(); + + let raw_types: Vec = types.into_values().map(Into::into).collect(); + if !raw_types.is_empty() { + sections.push(RawModuleDefV10Section::Types(raw_types)); + } + + // Collect schedules from tables (V10 stores them in a separate section). + let mut schedules = Vec::new(); + let raw_tables: Vec = tables + .into_values() + .map(|td| { + if let Some(sched) = td.schedule.clone() { + schedules.push(RawScheduleDefV10 { + source_name: Some(sched.name.into()), + table_name: td.name.clone().into(), + schedule_at_col: sched.at_column, + function_name: sched.function_name.into(), + }); + } + td.into() + }) + .collect(); + if !raw_tables.is_empty() { + sections.push(RawModuleDefV10Section::Tables(raw_tables)); + } + + let raw_reducers: Vec = reducers.into_values().map(Into::into).collect(); + if !raw_reducers.is_empty() { + sections.push(RawModuleDefV10Section::Reducers(raw_reducers)); + } + + let raw_procedures: Vec = procedures.into_values().map(Into::into).collect(); + if !raw_procedures.is_empty() { + sections.push(RawModuleDefV10Section::Procedures(raw_procedures)); + } + + let raw_views: Vec = views.into_values().map(Into::into).collect(); + if !raw_views.is_empty() { + sections.push(RawModuleDefV10Section::Views(raw_views)); + } + + if !schedules.is_empty() { + sections.push(RawModuleDefV10Section::Schedules(schedules)); + } + + if !raw_lifecycle.is_empty() { + sections.push(RawModuleDefV10Section::LifeCycleReducers(raw_lifecycle)); + } + + let raw_rls: Vec = row_level_security_raw.into_values().collect(); + if !raw_rls.is_empty() { + sections.push(RawModuleDefV10Section::RowLevelSecurity(raw_rls)); + } + + RawModuleDefV10 { sections } + } +} + /// Implemented by definitions stored in a `ModuleDef`. /// Allows looking definitions up in a `ModuleDef`, and across /// `ModuleDef`s during migrations. @@ -540,6 +636,12 @@ pub struct TableDef { /// Whether this table is public or private. pub table_access: TableAccess, + + /// Whether this is an event table. + /// + /// Event tables persist to the commitlog but are not merged into committed state. + /// Their rows are only visible to V2 subscribers in the transaction that inserted them. + pub is_event: bool, } impl TableDef { @@ -566,6 +668,7 @@ impl From for RawTableDefV9 { schedule, table_type, table_access, + is_event: _, // V9 does not support event tables; ignore when converting back } = val; RawTableDefV9 { @@ -582,6 +685,37 @@ impl From for RawTableDefV9 { } } +impl From for RawTableDefV10 { + fn from(val: TableDef) -> Self { + let TableDef { + name, + product_type_ref, + primary_key, + columns: _, // will be reconstructed from the product type. + indexes, + constraints, + sequences, + schedule: _, // V10 stores schedules in a separate section; handled in From. + table_type, + table_access, + is_event, + } = val; + + RawTableDefV10 { + source_name: name.into(), + product_type_ref, + primary_key: ColList::from_iter(primary_key), + indexes: indexes.into_values().map(Into::into).collect(), + constraints: constraints.into_values().map(Into::into).collect(), + sequences: sequences.into_values().map(Into::into).collect(), + table_type, + table_access, + default_values: Vec::new(), + is_event, + } + } +} + impl From for TableDef { fn from(def: ViewDef) -> Self { use TableAccess::*; @@ -603,6 +737,7 @@ impl From for TableDef { schedule: None, table_type: TableType::User, table_access: if is_public { Public } else { Private }, + is_event: false, } } } @@ -649,6 +784,19 @@ impl From for RawSequenceDefV9 { } } +impl From for RawSequenceDefV10 { + fn from(val: SequenceDef) -> Self { + RawSequenceDefV10 { + source_name: Some(val.name), + column: val.column, + start: val.start, + min_value: val.min_value, + max_value: val.max_value, + increment: val.increment, + } + } +} + /// A struct representing the validated definition of a database index. /// /// Cannot be created directly. Construct a [`ModuleDef`] by validating a [`RawModuleDef`] instead, @@ -696,6 +844,16 @@ impl From for RawIndexDefV9 { } } +impl From for RawIndexDefV10 { + fn from(val: IndexDef) -> Self { + RawIndexDefV10 { + source_name: Some(val.name), + accessor_name: val.accessor_name.map(Into::into), + algorithm: val.algorithm.into(), + } + } +} + /// Data specifying a supported index algorithm. #[non_exhaustive] #[derive(Debug, Clone, Eq, PartialEq)] @@ -956,6 +1114,15 @@ impl From for RawConstraintDefV9 { } } +impl From for RawConstraintDefV10 { + fn from(val: ConstraintDef) -> Self { + RawConstraintDefV10 { + source_name: Some(val.name), + data: val.data.into(), + } + } +} + /// Data for a constraint attached to a table. #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] @@ -1113,6 +1280,16 @@ impl From for RawTypeDefV9 { } } +impl From for RawTypeDefV10 { + fn from(val: TypeDef) -> Self { + RawTypeDefV10 { + source_name: val.name.into(), + ty: val.ty, + custom_ordering: val.custom_ordering, + } + } +} + /// A scoped type name, in the form `scope0::scope1::...::scopeN::name`. /// /// These are the names that will be used *in client code generation*, NOT the names used for types @@ -1209,6 +1386,15 @@ impl From for RawScopedTypeNameV9 { } } +impl From for RawScopedTypeNameV10 { + fn from(val: ScopedTypeName) -> Self { + RawScopedTypeNameV10 { + scope: val.scope.into_vec().into_iter().map(|id| id.into()).collect(), + source_name: val.name.into(), + } + } +} + /// A view exported by the module. #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] @@ -1306,6 +1492,28 @@ impl From for RawViewDefV9 { } } +impl From for RawViewDefV10 { + fn from(val: ViewDef) -> Self { + let ViewDef { + name, + is_anonymous, + is_public, + params, + return_type, + fn_ptr, + .. + } = val; + RawViewDefV10 { + source_name: name.into(), + index: fn_ptr.into(), + is_public, + is_anonymous, + params, + return_type, + } + } +} + impl From for RawMiscModuleExportV9 { fn from(def: ViewDef) -> Self { Self::View(def.into()) @@ -1341,6 +1549,15 @@ impl From for FunctionVisibility { } } +impl From for RawFunctionVisibility { + fn from(val: FunctionVisibility) -> Self { + match val { + FunctionVisibility::Private => RawFunctionVisibility::Private, + FunctionVisibility::ClientCallable => RawFunctionVisibility::ClientCallable, + } + } +} + /// A reducer exported by the module. #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] @@ -1381,6 +1598,18 @@ impl From for RawReducerDefV9 { } } +impl From for RawReducerDefV10 { + fn from(val: ReducerDef) -> Self { + RawReducerDefV10 { + source_name: val.name.into(), + params: val.params, + visibility: val.visibility.into(), + ok_return_type: val.ok_return_type, + err_return_type: val.err_return_type, + } + } +} + #[derive(Debug, Clone, Eq, PartialEq)] #[non_exhaustive] pub struct ProcedureDef { @@ -1425,6 +1654,17 @@ impl From for RawProcedureDefV9 { } } +impl From for RawProcedureDefV10 { + fn from(val: ProcedureDef) -> Self { + RawProcedureDefV10 { + source_name: val.name.into(), + params: val.params, + return_type: val.return_type, + visibility: val.visibility.into(), + } + } +} + impl From for RawMiscModuleExportV9 { fn from(def: ProcedureDef) -> Self { Self::Procedure(def.into()) diff --git a/crates/schema/src/def/validate/v10.rs b/crates/schema/src/def/validate/v10.rs index 1667bd63b1f..d3e487b28b4 100644 --- a/crates/schema/src/def/validate/v10.rs +++ b/crates/schema/src/def/validate/v10.rs @@ -221,6 +221,7 @@ impl<'a> ModuleValidatorV10<'a> { table_type, table_access, default_values, + is_event, } = table; let product_type: &ProductType = self @@ -360,6 +361,7 @@ impl<'a> ModuleValidatorV10<'a> { schedule: None, // V10 handles schedules separately table_type, table_access, + is_event, }) } diff --git a/crates/schema/src/def/validate/v9.rs b/crates/schema/src/def/validate/v9.rs index 459f0cc80d2..03c9e6f8338 100644 --- a/crates/schema/src/def/validate/v9.rs +++ b/crates/schema/src/def/validate/v9.rs @@ -317,6 +317,7 @@ impl ModuleValidatorV9<'_> { schedule, table_type, table_access, + is_event: false, // V9 does not support event tables }) } diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index 4fa2085844f..3e050bff6f1 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -188,6 +188,9 @@ pub struct TableSchema { /// The schedule for the table, if present. pub schedule: Option, + /// Whether this is an event table. + pub is_event: bool, + /// Cache for `row_type_for_table` in the data store. pub row_type: ProductType, } @@ -212,6 +215,7 @@ impl TableSchema { table_access: StAccess, schedule: Option, primary_key: Option, + is_event: bool, ) -> Self { Self { row_type: columns_to_row_type(&columns), @@ -226,6 +230,7 @@ impl TableSchema { table_access, schedule, primary_key, + is_event, } } @@ -261,6 +266,7 @@ impl TableSchema { StAccess::Public, None, None, + false, ) } @@ -753,6 +759,7 @@ impl TableSchema { table_access, None, None, + false, ) } @@ -875,6 +882,7 @@ impl TableSchema { table_access, None, None, + false, ) } } @@ -904,6 +912,7 @@ impl Schema for TableSchema { schedule, table_type, table_access, + is_event, } = def; let columns = column_schemas_from_defs(module_def, columns, table_id); @@ -941,6 +950,7 @@ impl Schema for TableSchema { (*table_access).into(), schedule, *primary_key, + *is_event, ) } diff --git a/modules/module-test/src/lib.rs b/modules/module-test/src/lib.rs index 433796e5b60..d8ef5ff62a0 100644 --- a/modules/module-test/src/lib.rs +++ b/modules/module-test/src/lib.rs @@ -165,6 +165,12 @@ pub struct HasSpecialStuff { connection_id: ConnectionId, } +#[table(name = my_event, public, event)] +pub struct MyEvent { + name: String, + value: u64, +} + /// These two tables defined with the same row type /// verify that we can define multiple tables with the same type. /// @@ -373,6 +379,11 @@ pub fn delete_players_by_name(ctx: &ReducerContext, name: String) -> Result<(), } } +#[spacetimedb::reducer] +pub fn emit_event(ctx: &ReducerContext, name: String, value: u64) { + ctx.db.my_event().insert(MyEvent { name, value }); +} + #[spacetimedb::reducer(client_connected)] fn client_connected(_ctx: &ReducerContext) {}