diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 3b9da7d..9c52d2d 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -19,7 +19,7 @@ use crate::compression::ArrowCompressionInfo; use crate::error::Error::{IllegalArgument, InvalidTableError}; use crate::error::{Error, Result}; use crate::metadata::DataLakeFormat; -use crate::metadata::datatype::{DataField, DataType, RowType}; +use crate::metadata::datatype::{DataField, DataType, DataTypes, RowType}; use crate::{BucketId, PartitionId, TableId}; use core::fmt; use serde::{Deserialize, Serialize}; @@ -784,6 +784,75 @@ impl TableInfo { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AutoPartitionStrategy { + auto_partition_enabled: bool, + auto_partition_key: Option, + auto_partition_time_unit: String, + auto_partition_num_precreate: i32, + auto_partition_num_retention: i32, + auto_partition_timezone: String, +} + +impl AutoPartitionStrategy { + pub fn from(properties: &HashMap) -> Self { + Self { + auto_partition_enabled: properties + .get("table.auto-partition.enabled") + .and_then(|s| s.parse().ok()) + .unwrap_or(false), + auto_partition_key: properties + .get("table.auto-partition.key") + .map(|s| s.to_string()), + auto_partition_time_unit: properties + .get("table.auto-partition.time-unit") + .map(|s| s.to_string()) + .unwrap_or_else(|| "DAY".to_string()), + auto_partition_num_precreate: properties + .get("table.auto-partition.num-precreate") + .and_then(|s| s.parse().ok()) + .unwrap_or(2), + auto_partition_num_retention: properties + .get("table.auto-partition.num-retention") + .and_then(|s| s.parse().ok()) + .unwrap_or(7), + auto_partition_timezone: properties + .get("table.auto-partition.time-zone") + .map(|s| s.to_string()) + .unwrap_or_else(|| { + jiff::tz::TimeZone::system() + .iana_name() + .unwrap_or("UTC") + .to_string() + }), + } + } + + pub fn is_auto_partition_enabled(&self) -> bool { + self.auto_partition_enabled + } + + pub fn key(&self) -> Option<&str> { + self.auto_partition_key.as_deref() + } + + pub fn time_unit(&self) -> &str { + &self.auto_partition_time_unit + } + + pub fn num_precreate(&self) -> i32 { + self.auto_partition_num_precreate + } + + pub fn num_retention(&self) -> i32 { + self.auto_partition_num_retention + } + + pub fn timezone(&self) -> &str { + &self.auto_partition_timezone + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TableConfig { pub properties: HashMap, @@ -816,6 +885,10 @@ impl TableConfig { .unwrap_or(DEFAULT_KV_FORMAT); kv_format.parse().map_err(Into::into) } + + pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy { + AutoPartitionStrategy::from(&self.properties) + } } impl TableInfo { @@ -953,7 +1026,11 @@ impl TableInfo { } pub fn is_auto_partitioned(&self) -> bool { - self.is_partitioned() && todo!() + self.is_partitioned() + && self + .table_config + .get_auto_partition_strategy() + .is_auto_partition_enabled() } pub fn get_partition_keys(&self) -> &[String] { @@ -1106,3 +1183,101 @@ impl LakeSnapshot { &self.table_buckets_offset } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_auto_partitioned() { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .unwrap(); + + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + + // 1. Not partitioned, auto partition disabled + let mut properties = HashMap::new(); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec![], // No partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(!table_info.is_auto_partitioned()); + + // 2. Not partitioned, auto partition enabled + properties.insert( + "table.auto-partition.enabled".to_string(), + "true".to_string(), + ); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec![], // No partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(!table_info.is_auto_partitioned()); + + // 3. Partitioned, auto partition disabled + properties.insert( + "table.auto-partition.enabled".to_string(), + "false".to_string(), + ); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec!["name".to_string()], // Partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(!table_info.is_auto_partitioned()); + + // 4. Partitioned, auto partition enabled + properties.insert( + "table.auto-partition.enabled".to_string(), + "true".to_string(), + ); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec!["name".to_string()], // Partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(table_info.is_auto_partitioned()); + } +}