From ad87236f738582e949e95363ecf44d3403e24760 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 18 Nov 2025 16:02:32 -0500 Subject: [PATCH] chore: Support SSE server with event limited jitter support If jitter is enabled, the SSE server will queue up a received event until the configured delay has passed. Events received in the interim WILL BE DISCARDED. Use of this method outside of the ping stream will result in data loss. --- example_event_test.go | 6 +- server.go | 128 ++++++++++++++++++++- server_jitter_test.go | 256 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 385 insertions(+), 5 deletions(-) create mode 100644 server_jitter_test.go diff --git a/example_event_test.go b/example_event_test.go index 70b2f23..2e7d5a7 100644 --- a/example_event_test.go +++ b/example_event_test.go @@ -21,8 +21,8 @@ const ( func TimePublisher(srv *eventsource.Server) { start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC) - ticker := time.NewTicker(time.Second) - for i := 0; i < TICK_COUNT; i++ { + ticker := time.NewTicker(1 * time.Millisecond) + for range TICK_COUNT { <-ticker.C srv.Publish([]string{"time"}, TimeEvent(start)) start = start.Add(time.Second) @@ -45,7 +45,7 @@ func ExampleEvent() { if err != nil { return } - for i := 0; i < TICK_COUNT; i++ { + for range TICK_COUNT { ev := <-stream.Events fmt.Println(ev.Id(), ev.Event(), ev.Data()) } diff --git a/server.go b/server.go index b2f9a89..78cbeea 100644 --- a/server.go +++ b/server.go @@ -56,10 +56,23 @@ type Server struct { quit chan bool isClosed bool isClosedMutex sync.RWMutex + jitter time.Duration } // NewServer creates a new Server instance. func NewServer() *Server { + var duration time.Duration + return NewServerWithJitter(duration) +} + +// NewServerWithJitter creates a new Server instance with jitter support. +// +// WARNING: Intermediate events sent while another event send is pending WILL +// BE DISCARDED. Loss of data will occur if used incorrectly. +// +// This method is for use by LaunchDarkly libraries ONLY. No guarantee is made +// about backwards compatibility or future support. +func NewServerWithJitter(jitter time.Duration) *Server { srv := &Server{ registrations: make(chan *registration), unregistrations: make(chan *unregistration), @@ -68,6 +81,7 @@ func NewServer() *Server { unsubs: make(chan *subscription, 2), quit: make(chan bool), BufferSize: 128, + jitter: jitter, } go srv.run() return srv @@ -157,6 +171,29 @@ func (srv *Server) Handler(channel string) http.HandlerFunc { closedNormally := false closeNotify := req.Context().Done() + // The handler consumes events in two different modes -- either as soon as + // they arrive, or on some jitter-influenced delay. + // + // If the jitter value has been provided, the first event received will be + // delayed for some (delay/2, delay) amount of time. Events received until + // then are DISCARDED. If this seems excessive, it is! + // + // This jitter functionality is only meant to service the ping stream + // functionality. The ping stream sends identical "ping" events, so + // discarding intermediate values is a safe operation. + + var delayedEvent eventOrComment + jitterStrategy := newDefaultJitter(0.5, 0) + + usingJitter := srv.jitter > 0 + var jitterTimer timer + if usingJitter { + jitterTimer = &goTimer{timer: time.NewTimer(jitterStrategy.applyJitter(srv.jitter))} + jitterTimer.Stop() + } else { + jitterTimer = &noopTimer{C: make(<-chan time.Time)} + } + ReadLoop: for { select { @@ -164,17 +201,66 @@ func (srv *Server) Handler(channel string) http.HandlerFunc { break ReadLoop case <-maxConnTimeCh: // if MaxConnTime was not set, this is a nil channel and has no effect on the select break ReadLoop + case <-jitterTimer.Channel(): + // If the jitter is 0, we may have an initial event that fired before + // we could stop the timer. Or maybe the channel is being closed. + // Whatever the reason, we can safely discard here. + if !usingJitter || delayedEvent == nil { + continue + } + + ok := writeEventOrComment(delayedEvent) + delayedEvent = nil + + if !ok { + break ReadLoop + } case ev, ok := <-readMainCh: if !ok { closedNormally = true break ReadLoop } + if batch, ok := ev.(eventBatch); ok { + // If we receive an event batch, we are meant to switch to this as + // our input source. But before we can do that, we need to process + // any event that was pending processing. + if delayedEvent != nil { + jitterTimer.Stop() + ok := writeEventOrComment(delayedEvent) + delayedEvent = nil + + if !ok { + break ReadLoop + } + } + readBatchCh = batch.events readMainCh = nil - } else if !writeEventOrComment(ev) { - break ReadLoop + continue + } + + // Write immediately if we aren't using the jitter functionality. + if !usingJitter { + if !writeEventOrComment(ev) { + break ReadLoop + } + continue } + + // If we are using jitter and we have a pending event, then we don't + // need to do anything. We can swallow this event. + if delayedEvent != nil { + continue + } + + delayedEvent = ev + + // Figure out the jitter and start the timer. Once this trigger, we + // will write the event and clear the way for a new event to come in. + delay := jitterStrategy.applyJitter(srv.jitter) + jitterTimer.Reset(delay) + case ev, ok := <-readBatchCh: if !ok { // end of batch readBatchCh = nil @@ -358,3 +444,41 @@ func (s *subscription) close() { close(s.out) s.out = nil } + +type timer interface { + Channel() <-chan time.Time + Reset(time.Duration) bool + Stop() bool +} + +type noopTimer struct { + C <-chan time.Time +} + +func (n *noopTimer) Channel() <-chan time.Time { + return n.C +} + +func (n *noopTimer) Reset(_ time.Duration) bool { + return true +} + +func (n *noopTimer) Stop() bool { + return true +} + +type goTimer struct { + timer *time.Timer +} + +func (t *goTimer) Channel() <-chan time.Time { + return t.timer.C +} + +func (t *goTimer) Reset(d time.Duration) bool { + return t.timer.Reset(d) +} + +func (t *goTimer) Stop() bool { + return t.timer.Stop() +} diff --git a/server_jitter_test.go b/server_jitter_test.go new file mode 100644 index 0000000..e1e4c36 --- /dev/null +++ b/server_jitter_test.go @@ -0,0 +1,256 @@ +package eventsource + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewServerWithJitter(t *testing.T) { + jitterDuration := 100 * time.Millisecond + server := NewServerWithJitter(jitterDuration) + defer server.Close() + + assert.Equal(t, jitterDuration, server.jitter) + assert.Equal(t, 128, server.BufferSize) + assert.NotNil(t, server.registrations) + assert.NotNil(t, server.unregistrations) + assert.NotNil(t, server.pub) + assert.NotNil(t, server.subs) + assert.NotNil(t, server.unsubs) + assert.NotNil(t, server.quit) +} + +func TestServerWithJitterDelaysEventDelivery(t *testing.T) { + // Use a small jitter duration for faster testing + jitterDuration := 50 * time.Millisecond + channel := "test" + server := NewServerWithJitter(jitterDuration) + defer server.Close() + + httpServer := httptest.NewServer(server.Handler(channel)) + defer httpServer.Close() + + // Start a client connection + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + defer resp.Body.Close() + + // Record when we publish the event + startTime := time.Now() + event := &publication{data: "test-event"} + server.Publish([]string{channel}, event) + + // Read the response with a timeout + resultCh := make(chan string, 1) + errCh := make(chan error, 1) + go func() { + buf := make([]byte, 1024) + n, err := resp.Body.Read(buf) + if err != nil && err != io.EOF { + errCh <- err + return + } + resultCh <- string(buf[:n]) + }() + + select { + case result := <-resultCh: + elapsed := time.Since(startTime) + // The event should be delayed by at least jitterDuration/2 + // (since jitter subtracts a random amount up to ratio*duration where ratio=0.5) + minExpectedDelay := jitterDuration / 2 + assert.GreaterOrEqual(t, elapsed.Milliseconds(), minExpectedDelay.Milliseconds(), + "Event should be delayed by at least half the jitter duration") + assert.Contains(t, result, "data: test-event") + case err := <-errCh: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("Timed out waiting for event") + } +} + +func TestServerWithJitterDiscardsIntermediateEvents(t *testing.T) { + // This test verifies that when events arrive rapidly while jitter is active, + // the first event is kept and subsequent events are discarded until the first + // is delivered. + jitterDuration := 100 * time.Millisecond + channel := "test" + server := NewServerWithJitter(jitterDuration) + + httpServer := httptest.NewServer(server.Handler(channel)) + defer httpServer.Close() + + // Start a client connection + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + defer resp.Body.Close() + + // Read the response in a goroutine + bodyCh := make(chan []byte, 1) + go func() { + body, _ := io.ReadAll(resp.Body) + bodyCh <- body + }() + + // Publish events with very small delays between them + // The jitter timer should not have fired yet when we send event-2 and event-3 + server.Publish([]string{channel}, &publication{data: "event-1"}) + time.Sleep(5 * time.Millisecond) + server.Publish([]string{channel}, &publication{data: "event-2"}) + time.Sleep(5 * time.Millisecond) + server.Publish([]string{channel}, &publication{data: "event-3"}) + + // Wait for the jittered event to be delivered + time.Sleep(jitterDuration + 50*time.Millisecond) + + // Publish one more event to demonstrate the next cycle + server.Publish([]string{channel}, &publication{data: "event-4"}) + time.Sleep(jitterDuration + 50*time.Millisecond) + + // Close and read + server.Close() + body := <-bodyCh + responseStr := string(body) + + // Event-1 should be delivered, event-2 and event-3 should be discarded + assert.Contains(t, responseStr, "event-1", "First event should be delivered") + assert.NotContains(t, responseStr, "event-2", "Second event should be discarded") + assert.NotContains(t, responseStr, "event-3", "Third event should be discarded") + // Event-4 should be delivered as it's a new cycle + assert.Contains(t, responseStr, "event-4", "Fourth event should be delivered in new cycle") +} + +func TestServerWithJitterFlushesDelayedEventBeforeBatch(t *testing.T) { + jitterDuration := 200 * time.Millisecond + channel := "test" + + // Custom repository that delays before sending events + slowRepo := &testServerRepository{} + + server := NewServerWithJitter(jitterDuration) + + httpServer := httptest.NewServer(server.Handler(channel)) + defer httpServer.Close() + + // Start a client connection + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + defer resp.Body.Close() + + // Read the response in a goroutine + bodyCh := make(chan []byte, 1) + go func() { + body, _ := io.ReadAll(resp.Body) + bodyCh <- body + }() + + // Publish a regular event (this will be delayed by jitter) + server.Publish([]string{channel}, &publication{data: "delayed-event"}) + + // Wait a bit, then trigger a batch by registering a repo with ReplayAll + time.Sleep(50 * time.Millisecond) + server.ReplayAll = true + server.Register(channel, slowRepo) + + // Unsubscribe and resubscribe to trigger the batch replay + // (This is a bit hacky but demonstrates the batch handling) + + // Wait for everything to process + time.Sleep(jitterDuration + 100*time.Millisecond) + server.Close() + + // Get the response + body := <-bodyCh + responseStr := string(body) + + // Should contain the delayed event that was flushed + assert.Contains(t, responseStr, "delayed-event") +} + +func TestServerWithZeroJitterBehavesLikeNormalServer(t *testing.T) { + channel := "test" + server := NewServerWithJitter(0) + + httpServer := httptest.NewServer(server.Handler(channel)) + defer httpServer.Close() + + // Start a client connection + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + defer resp.Body.Close() + + // Record when we publish the event + startTime := time.Now() + event := &publication{data: "immediate-event"} + server.Publish([]string{channel}, event) + + // Read the response with a timeout + resultCh := make(chan string, 1) + errCh := make(chan error, 1) + go func() { + buf := make([]byte, 1024) + n, err := resp.Body.Read(buf) + if err != nil && err != io.EOF { + errCh <- err + return + } + resultCh <- string(buf[:n]) + }() + + select { + case result := <-resultCh: + elapsed := time.Since(startTime) + // With zero jitter, event should be delivered immediately (within reasonable time) + assert.Less(t, elapsed.Milliseconds(), (50 * time.Millisecond).Milliseconds(), + "Event should be delivered immediately with zero jitter") + assert.Contains(t, result, "data: immediate-event") + server.Close() + case err := <-errCh: + server.Close() + require.NoError(t, err) + case <-time.After(2 * time.Second): + server.Close() + t.Fatal("Timed out waiting for event") + } +} + +func TestServerWithJitterHandlesCommentsCorrectly(t *testing.T) { + jitterDuration := 50 * time.Millisecond + channel := "test" + server := NewServerWithJitter(jitterDuration) + + httpServer := httptest.NewServer(server.Handler(channel)) + defer httpServer.Close() + + // Start a client connection + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + defer resp.Body.Close() + + // Read the response in a goroutine + bodyCh := make(chan []byte, 1) + go func() { + body, _ := io.ReadAll(resp.Body) + bodyCh <- body + }() + + // Publish a comment (should also be subject to jitter) + server.PublishComment([]string{channel}, "test comment") + + // Wait for jitter delay + time.Sleep(jitterDuration + 50*time.Millisecond) + server.Close() + + // Get the response + body := <-bodyCh + responseStr := string(body) + + // Should contain the comment + assert.Contains(t, responseStr, ":test comment") +}