Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion opqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type OpQueue struct {
q *list.List
entries map[ID]*OpSet
closed bool

reducer func(ops []*Op, op *Op) []*Op
}

// NewOpQueue create a new OpQueue.
Expand All @@ -45,11 +47,17 @@ func NewOpQueue(depth, width int) *OpQueue {
width: width,
q: list.New(),
entries: map[ID]*OpSet{},

reducer: appendOp,
}
q.cond.L = &q.mu
return &q
}

func (q *OpQueue) SetReducer(fn func(ops []*Op, op *Op) []*Op) {
q.reducer = fn
}

// Close releases resources associated with this callgroup, by canceling the context.
// The owner of this OpQueue should either call Close or cancel the context, both are
// equivalent.
Expand Down Expand Up @@ -112,7 +120,7 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error {
return ErrQueueSaturatedWidth
}

set.append(op)
set.mergeWith(q.reducer, op)
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions opset.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ func newOpSet(op *Op) *OpSet {
}
}

func (os *OpSet) append(op *Op) {
os.set = append(os.set, op)
func appendOp(ops []*Op, op *Op) []*Op {
return append(ops, op)
}
func (os *OpSet) mergeWith(fn func([]*Op, *Op) []*Op, op *Op) {
os.set = fn(os.set, op)
}

// Ops get the list of ops in this set.
Expand Down
9 changes: 8 additions & 1 deletion opwindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type OpWindow struct {
depth int
width int
windowedBy time.Duration

reducer func(ops []*Op, op *Op) []*Op
}

// NewOpWindow creates a new OpWindow.
Expand All @@ -43,11 +45,16 @@ func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
width: width,
windowedBy: windowedBy,
m: make(map[ID]*queueItem),
reducer: appendOp,
}
q.q.Init()
return q
}

func (q *OpWindow) SetReducer(fn func(ops []*Op, op *Op) []*Op) {
q.reducer = fn
}

// Close provides graceful shutdown: no new ops will be enqueued.
func (q *OpWindow) Close() {
q.once.Do(func() {
Expand All @@ -74,7 +81,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
q.mu.Unlock()
return ErrQueueSaturatedWidth
}
item.OpSet.append(op)
item.OpSet.mergeWith(q.reducer, op)
q.mu.Unlock()
return nil
}
Expand Down