From d175ffff7265b35668ea404a03011dea2d1d9472 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Wed, 16 Oct 2024 15:02:59 -0700 Subject: [PATCH 1/3] Add reducer to inflight queues --- opqueue.go | 12 +++++++++++- opwindow.go | 11 ++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/opqueue.go b/opqueue.go index 1d6dd66..5f64f47 100644 --- a/opqueue.go +++ b/opqueue.go @@ -36,6 +36,8 @@ type OpQueue struct { q *list.List entries map[ID]*OpSet closed bool + + reducer func(opset *OpSet, op *Op) } // NewOpQueue create a new OpQueue. @@ -45,11 +47,19 @@ func NewOpQueue(depth, width int) *OpQueue { width: width, q: list.New(), entries: map[ID]*OpSet{}, + + reducer: func(opset *OpSet, op *Op) { + opset.append(op) + }, } q.cond.L = &q.mu return &q } +func (q *OpQueue) SetReducer(fn func(opset *OpSet, op *Op)) { + q.reducer = fn +} + // Close releases resources associated with this callgroup, by canceling the context. // The owner of this OpQueue should either call Close or cancel the context, both are // equivalent. @@ -112,7 +122,7 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error { return ErrQueueSaturatedWidth } - set.append(op) + q.reducer(set, op) return nil } diff --git a/opwindow.go b/opwindow.go index 6e9ff79..19f5c16 100644 --- a/opwindow.go +++ b/opwindow.go @@ -27,6 +27,8 @@ type OpWindow struct { depth int width int windowedBy time.Duration + + reducer func(opset *OpSet, op *Op) } // NewOpWindow creates a new OpWindow. @@ -43,11 +45,18 @@ func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow { width: width, windowedBy: windowedBy, m: make(map[ID]*queueItem), + reducer: func(opset *OpSet, op *Op) { + opset.append(op) + }, } q.q.Init() return q } +func (q *OpWindow) SetReducer(fn func(opset *OpSet, op *Op)) { + q.reducer = fn +} + // Close provides graceful shutdown: no new ops will be enqueued. func (q *OpWindow) Close() { q.once.Do(func() { @@ -74,7 +83,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { q.mu.Unlock() return ErrQueueSaturatedWidth } - item.OpSet.append(op) + q.reducer(item.OpSet, op) q.mu.Unlock() return nil } From 03fb736e41c2a4fcd0c51c94be6f55297e021b34 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Wed, 16 Oct 2024 15:32:20 -0700 Subject: [PATCH 2/3] Add ability to pass in a mergewith to opset --- opqueue.go | 2 +- opset.go | 7 +++++-- opwindow.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/opqueue.go b/opqueue.go index 5f64f47..0495ee1 100644 --- a/opqueue.go +++ b/opqueue.go @@ -49,7 +49,7 @@ func NewOpQueue(depth, width int) *OpQueue { entries: map[ID]*OpSet{}, reducer: func(opset *OpSet, op *Op) { - opset.append(op) + opset.MergeWith(append, op) }, } q.cond.L = &q.mu diff --git a/opset.go b/opset.go index 4a6d957..83af8e6 100644 --- a/opset.go +++ b/opset.go @@ -12,8 +12,11 @@ func newOpSet(op *Op) *OpSet { } } -func (os *OpSet) append(op *Op) { - os.set = append(os.set, op) +func appendOp(ops []*Op, op *Op) []*Op { + return append(ops, op) +} +func (os *OpSet) MergeWith(fn func([]*Op, *Op) []*Op, op *Op) { + os.set = fn(os.set, op) } // Ops get the list of ops in this set. diff --git a/opwindow.go b/opwindow.go index 19f5c16..37bf5e9 100644 --- a/opwindow.go +++ b/opwindow.go @@ -46,7 +46,7 @@ func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow { windowedBy: windowedBy, m: make(map[ID]*queueItem), reducer: func(opset *OpSet, op *Op) { - opset.append(op) + opset.MergeWith(appendOp, op) }, } q.q.Init() From f7c599de054a595b412961ed772699c996ee2e66 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Thu, 17 Oct 2024 13:06:30 -0700 Subject: [PATCH 3/3] Simplify reducer passthrough --- opqueue.go | 10 ++++------ opset.go | 2 +- opwindow.go | 10 ++++------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/opqueue.go b/opqueue.go index 0495ee1..46b66ae 100644 --- a/opqueue.go +++ b/opqueue.go @@ -37,7 +37,7 @@ type OpQueue struct { entries map[ID]*OpSet closed bool - reducer func(opset *OpSet, op *Op) + reducer func(ops []*Op, op *Op) []*Op } // NewOpQueue create a new OpQueue. @@ -48,15 +48,13 @@ func NewOpQueue(depth, width int) *OpQueue { q: list.New(), entries: map[ID]*OpSet{}, - reducer: func(opset *OpSet, op *Op) { - opset.MergeWith(append, op) - }, + reducer: appendOp, } q.cond.L = &q.mu return &q } -func (q *OpQueue) SetReducer(fn func(opset *OpSet, op *Op)) { +func (q *OpQueue) SetReducer(fn func(ops []*Op, op *Op) []*Op) { q.reducer = fn } @@ -122,7 +120,7 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error { return ErrQueueSaturatedWidth } - q.reducer(set, op) + set.mergeWith(q.reducer, op) return nil } diff --git a/opset.go b/opset.go index 83af8e6..10a1f8c 100644 --- a/opset.go +++ b/opset.go @@ -15,7 +15,7 @@ func newOpSet(op *Op) *OpSet { func appendOp(ops []*Op, op *Op) []*Op { return append(ops, op) } -func (os *OpSet) MergeWith(fn func([]*Op, *Op) []*Op, op *Op) { +func (os *OpSet) mergeWith(fn func([]*Op, *Op) []*Op, op *Op) { os.set = fn(os.set, op) } diff --git a/opwindow.go b/opwindow.go index 37bf5e9..8612a58 100644 --- a/opwindow.go +++ b/opwindow.go @@ -28,7 +28,7 @@ type OpWindow struct { width int windowedBy time.Duration - reducer func(opset *OpSet, op *Op) + reducer func(ops []*Op, op *Op) []*Op } // NewOpWindow creates a new OpWindow. @@ -45,15 +45,13 @@ func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow { width: width, windowedBy: windowedBy, m: make(map[ID]*queueItem), - reducer: func(opset *OpSet, op *Op) { - opset.MergeWith(appendOp, op) - }, + reducer: appendOp, } q.q.Init() return q } -func (q *OpWindow) SetReducer(fn func(opset *OpSet, op *Op)) { +func (q *OpWindow) SetReducer(fn func(ops []*Op, op *Op) []*Op) { q.reducer = fn } @@ -83,7 +81,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { q.mu.Unlock() return ErrQueueSaturatedWidth } - q.reducer(item.OpSet, op) + item.OpSet.mergeWith(q.reducer, op) q.mu.Unlock() return nil }