diff --git a/opqueue.go b/opqueue.go index 1d6dd66..46b66ae 100644 --- a/opqueue.go +++ b/opqueue.go @@ -36,6 +36,8 @@ type OpQueue struct { q *list.List entries map[ID]*OpSet closed bool + + reducer func(ops []*Op, op *Op) []*Op } // NewOpQueue create a new OpQueue. @@ -45,11 +47,17 @@ func NewOpQueue(depth, width int) *OpQueue { width: width, q: list.New(), entries: map[ID]*OpSet{}, + + reducer: appendOp, } q.cond.L = &q.mu return &q } +func (q *OpQueue) SetReducer(fn func(ops []*Op, op *Op) []*Op) { + q.reducer = fn +} + // Close releases resources associated with this callgroup, by canceling the context. // The owner of this OpQueue should either call Close or cancel the context, both are // equivalent. @@ -112,7 +120,7 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error { return ErrQueueSaturatedWidth } - set.append(op) + set.mergeWith(q.reducer, op) return nil } diff --git a/opset.go b/opset.go index 4a6d957..10a1f8c 100644 --- a/opset.go +++ b/opset.go @@ -12,8 +12,11 @@ func newOpSet(op *Op) *OpSet { } } -func (os *OpSet) append(op *Op) { - os.set = append(os.set, op) +func appendOp(ops []*Op, op *Op) []*Op { + return append(ops, op) +} +func (os *OpSet) mergeWith(fn func([]*Op, *Op) []*Op, op *Op) { + os.set = fn(os.set, op) } // Ops get the list of ops in this set. diff --git a/opwindow.go b/opwindow.go index 6e9ff79..8612a58 100644 --- a/opwindow.go +++ b/opwindow.go @@ -27,6 +27,8 @@ type OpWindow struct { depth int width int windowedBy time.Duration + + reducer func(ops []*Op, op *Op) []*Op } // NewOpWindow creates a new OpWindow. @@ -43,11 +45,16 @@ func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow { width: width, windowedBy: windowedBy, m: make(map[ID]*queueItem), + reducer: appendOp, } q.q.Init() return q } +func (q *OpWindow) SetReducer(fn func(ops []*Op, op *Op) []*Op) { + q.reducer = fn +} + // Close provides graceful shutdown: no new ops will be enqueued. func (q *OpWindow) Close() { q.once.Do(func() { @@ -74,7 +81,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { q.mu.Unlock() return ErrQueueSaturatedWidth } - item.OpSet.append(op) + item.OpSet.mergeWith(q.reducer, op) q.mu.Unlock() return nil }