README.md (12 changes: 6 additions & 6 deletions)
@@ -93,13 +93,13 @@ Run `make tpcc-dual` to mirror every TPCC statement to an in-memory SQLite database
All cases have been fully optimized.
```shell
<90th Percentile RT (MaxRT)>
-New-Order : 0.002 (0.006)
-Payment : 0.001 (0.019)
-Order-Status : 0.001 (0.003)
-Delivery : 0.022 (0.038)
-Stock-Level : 0.002 (0.005)
+New-Order : 0.002 (0.005)
+Payment : 0.001 (0.013)
+Order-Status : 0.002 (0.006)
+Delivery : 0.010 (0.023)
+Stock-Level : 0.002 (0.017)
<TpmC>
-18432 Tpmc
+27226 Tpmc
```
#### 👉[check more](tpcc/README.md)

src/binder/aggregate.rs (2 changes: 1 addition & 1 deletion)
@@ -98,7 +98,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
return_orderby.push(SortField::new(
expr,
asc.is_none_or(|asc| asc),
-nulls_first.unwrap_or(true),
+nulls_first.unwrap_or(false),
));
}
Some(return_orderby)
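Context for the `nulls_first.unwrap_or(false)` change above: when an ORDER BY clause does not say NULLS FIRST or NULLS LAST, the binder now defaults to NULLS LAST instead of NULLS FIRST (the `true` to `false` flips in the test files below mirror the same default). A minimal standalone sketch of what the flag means for comparison order; the `compare_with_nulls` helper is made up for illustration and is not this crate's `SortField` implementation:

```rust
use std::cmp::Ordering;

/// Hypothetical helper: order two optional keys the way a sort field would,
/// honoring an explicit `nulls_first` flag (which now defaults to `false`).
fn compare_with_nulls<T: Ord>(a: &Option<T>, b: &Option<T>, nulls_first: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => {
            if nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => x.cmp(y),
    }
}

fn main() {
    let mut rows = vec![Some(3), None, Some(1)];
    // Old default (nulls_first = true) would yield: [None, Some(1), Some(3)]
    // New default (nulls_first = false) puts NULLs last:
    rows.sort_by(|a, b| compare_with_nulls(a, b, false));
    assert_eq!(rows, vec![Some(1), Some(3), None]);
}
```

For comparison, PostgreSQL also treats NULLS LAST as the default for ascending sorts.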
src/db.rs (25 changes: 18 additions & 7 deletions)
@@ -188,6 +188,14 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
NormalizationRuleImpl::PushLimitIntoTableScan,
],
)
+.before_batch(
+"TopK".to_string(),
+HepBatchStrategy::once_topdown(),
+vec![
+NormalizationRuleImpl::MinMaxToTopK,
+NormalizationRuleImpl::TopK,
+],
+)
.before_batch(
"Combine Operators".to_string(),
HepBatchStrategy::fix_point_topdown(10),
@@ -197,11 +205,6 @@
NormalizationRuleImpl::CombineFilter,
],
)
-.before_batch(
-"TopK".to_string(),
-HepBatchStrategy::once_topdown(),
-vec![NormalizationRuleImpl::TopK],
-)
.after_batch(
"Eliminate Aggregate".to_string(),
HepBatchStrategy::once_topdown(),
@@ -379,8 +382,16 @@ impl<S: Storage> Database<S> {
};
let transaction = Box::into_raw(Box::new(self.storage.transaction()?));
let (schema, executor) =
-self.state
-.execute(unsafe { &mut (*transaction) }, statement, params)?;
+match self
+.state
+.execute(unsafe { &mut (*transaction) }, statement, params)
+{
+Ok(result) => result,
+Err(err) => {
+unsafe { drop(Box::from_raw(transaction)) };
+return Err(err);
+}
+};
let inner = Box::into_raw(Box::new(TransactionIter::new(schema, executor)));
Ok(DatabaseIter { transaction, inner })
}
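Two things change in db.rs: the "TopK" optimizer batch (now also carrying `MinMaxToTopK`) moves ahead of the "Combine Operators" batch, and `execute` stops using `?` on statement execution. The second change matters because the transaction has already been handed out as a raw pointer via `Box::into_raw`; with `?`, an execution error returned early without reclaiming that allocation, so the boxed transaction (and its destructor) leaked. The new `match` frees it with `Box::from_raw` before propagating the error. A standalone sketch of the same pattern using only the standard library; the types and function names here are placeholders, not this crate's API:

```rust
struct Transaction; // placeholder for the real storage transaction type

impl Drop for Transaction {
    fn drop(&mut self) {
        println!("transaction dropped"); // the destructor should run even on error paths
    }
}

fn execute_step(_tx: &mut Transaction, fail: bool) -> Result<u32, String> {
    if fail { Err("execution failed".into()) } else { Ok(42) }
}

fn run(fail: bool) -> Result<u32, String> {
    // Hand the transaction out as a raw pointer, as the surrounding code does.
    let tx = Box::into_raw(Box::new(Transaction));
    let result = match execute_step(unsafe { &mut *tx }, fail) {
        Ok(v) => v,
        Err(err) => {
            // Reclaim the allocation before returning, otherwise it leaks.
            unsafe { drop(Box::from_raw(tx)) };
            return Err(err);
        }
    };
    // Simplified for the sketch: reclaim here on success as well
    // (the real code keeps the pointer alive for the result iterator).
    unsafe { drop(Box::from_raw(tx)) };
    Ok(result)
}

fn main() {
    assert_eq!(run(false), Ok(42));
    assert!(run(true).is_err());
}
```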
src/optimizer/core/memo.rs (2 changes: 1 addition & 1 deletion)
@@ -191,7 +191,7 @@ mod tests {
let sort_fields = vec![SortField::new(
ScalarExpression::column_expr(c1_column.clone()),
true,
-true,
+false,
)];
let scala_functions = Default::default();
let table_functions = Default::default();
src/optimizer/rule/normalization/agg_elimination.rs (53 changes: 45 additions & 8 deletions)
@@ -18,13 +18,14 @@ use crate::expression::ScalarExpression;
use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate};
use crate::optimizer::core::rule::{MatchPattern, NormalizationRule};
use crate::optimizer::plan_utils::{only_child_mut, replace_with_only_child};
+use crate::planner::operator::limit::LimitOperator;
use crate::planner::operator::sort::SortField;
use crate::planner::operator::{Operator, PhysicalOption, PlanImpl, SortOption};
use crate::planner::{Childrens, LogicalPlan};
use std::sync::LazyLock;

static REDUNDANT_SORT_PATTERN: LazyLock<Pattern> = LazyLock::new(|| Pattern {
-predicate: |op| matches!(op, Operator::Sort(_)),
+predicate: |op| matches!(op, Operator::Sort(_) | Operator::TopK(_)),
children: PatternChildrenPredicate::None,
});

@@ -38,8 +39,12 @@ impl MatchPattern for EliminateRedundantSort {

impl NormalizationRule for EliminateRedundantSort {
fn apply(&self, plan: &mut LogicalPlan) -> Result<bool, DatabaseError> {
-let sort_fields = match &plan.operator {
-Operator::Sort(sort_op) => sort_op.sort_fields.clone(),
+let (sort_fields, topk_limit) = match &plan.operator {
+Operator::Sort(sort_op) => (sort_op.sort_fields.clone(), None),
+Operator::TopK(topk_op) => (
+topk_op.sort_fields.clone(),
+Some((topk_op.limit, topk_op.offset)),
+),
_ => return Ok(false),
};

@@ -54,6 +59,15 @@
return Ok(false);
}

+if let Some((limit, offset)) = topk_limit {
+plan.operator = Operator::Limit(LimitOperator {
+offset,
+limit: Some(limit),
+});
+plan.physical_option = Some(PhysicalOption::new(PlanImpl::Limit, SortOption::Follow));
+return Ok(true);
+}

Ok(replace_with_only_child(plan))
}
}
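With the pattern extended to `Operator::TopK`, the rule now also fires when a TopK node's ordering is already satisfied by its child (for example an index scan that returns rows in the requested order). In that case only the limit/offset part of TopK still does any work, so the node is rewritten into a plain Limit operator, as the new branch above shows. A standalone sketch of why that rewrite is safe, in plain Rust over iterators rather than the engine's operators:

```rust
/// Top-k of `rows` under ascending order, done the "sort then cut" way.
fn top_k_by_sort(mut rows: Vec<i32>, offset: usize, limit: usize) -> Vec<i32> {
    rows.sort();
    rows.into_iter().skip(offset).take(limit).collect()
}

/// If the input is already sorted the same way, the sort is a no-op,
/// so top-k degenerates to offset + limit, i.e. a plain Limit.
fn limit_only(rows: Vec<i32>, offset: usize, limit: usize) -> Vec<i32> {
    rows.into_iter().skip(offset).take(limit).collect()
}

fn main() {
    let sorted_input = vec![1, 2, 3, 4, 5, 6]; // e.g. produced by an ordered index scan
    assert_eq!(
        top_k_by_sort(sorted_input.clone(), 1, 3),
        limit_only(sorted_input, 1, 3)
    );
}
```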
@@ -165,7 +179,7 @@ fn distinct_sort_fields(groupby_exprs: &[ScalarExpression]) -> Vec<SortField> {
groupby_exprs
.iter()
.cloned()
-.map(|expr| SortField::new(expr, true, true))
+.map(|expr| SortField::new(expr, true, false))
.collect()
}

@@ -322,6 +336,7 @@ mod tests {
use crate::planner::operator::filter::FilterOperator;
use crate::planner::operator::sort::{SortField, SortOperator};
use crate::planner::operator::table_scan::TableScanOperator;
+use crate::planner::operator::top_k::TopKOperator;
use crate::planner::operator::{Operator, PhysicalOption, PlanImpl, SortOption};
use crate::planner::{Childrens, LogicalPlan};
use crate::types::index::{IndexInfo, IndexMeta, IndexType};
@@ -334,7 +349,7 @@

fn make_sort_field(name: &str) -> SortField {
let column = ColumnRef::from(ColumnCatalog::new_dummy(name.to_string()));
-SortField::new(ScalarExpression::column_expr(column), true, true)
+SortField::new(ScalarExpression::column_expr(column), true, false)
}

fn build_plan(
@@ -412,7 +427,7 @@
let sort_fields = vec![SortField::new(
ScalarExpression::column_expr(c1.clone()),
true,
-true,
+false,
)];
let sort_option = SortOption::OrderBy {
fields: sort_fields.clone(),
@@ -471,6 +486,28 @@
Ok(())
}

+#[test]
+fn remove_topk_when_index_matches_order() -> Result<(), DatabaseError> {
+let sort_field = make_sort_field("c1");
+let mut plan = build_plan(vec![sort_field.clone()], vec![sort_field.clone()], 0);
+plan.operator = Operator::TopK(TopKOperator {
+sort_fields: vec![sort_field],
+limit: 10,
+offset: Some(5),
+});
+let rule = EliminateRedundantSort;
+
+assert!(rule.apply(&mut plan)?);
+match plan.operator {
+Operator::Limit(limit_op) => {
+assert_eq!(limit_op.limit, Some(10));
+assert_eq!(limit_op.offset, Some(5));
+}
+_ => unreachable!("expected limit operator after removing topk"),
+}
+Ok(())
+}
+
#[test]
fn remove_sort_when_prefix_can_be_ignored() -> Result<(), DatabaseError> {
let c1 = make_sort_field("c1");
@@ -486,7 +523,7 @@
#[test]
fn annotate_sets_sort_hint_on_table_scan() -> Result<(), DatabaseError> {
let column = ColumnRef::from(ColumnCatalog::new_dummy("c1".to_string()));
-let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, true);
+let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, false);
let (index_info, _) = build_index_info(vec![sort_field.clone()], 0);

let mut columns = BTreeMap::new();
@@ -588,7 +625,7 @@
#[test]
fn promote_index_to_remove_sort() -> Result<(), DatabaseError> {
let column = ColumnRef::from(ColumnCatalog::new_dummy("c_first".to_string()));
-let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, true);
+let sort_field = SortField::new(ScalarExpression::column_expr(column.clone()), true, false);
let (mut index_info, _) = build_index_info(vec![sort_field.clone()], 0);
index_info.range = Some(Range::Scope {
min: Bound::Unbounded,