Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions example_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ const (

func TimePublisher(srv *eventsource.Server) {
start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
ticker := time.NewTicker(time.Second)
for i := 0; i < TICK_COUNT; i++ {
ticker := time.NewTicker(1 * time.Millisecond)
for range TICK_COUNT {
<-ticker.C
srv.Publish([]string{"time"}, TimeEvent(start))
start = start.Add(time.Second)
Expand All @@ -45,7 +45,7 @@ func ExampleEvent() {
if err != nil {
return
}
for i := 0; i < TICK_COUNT; i++ {
for range TICK_COUNT {
ev := <-stream.Events
fmt.Println(ev.Id(), ev.Event(), ev.Data())
}
Expand Down
128 changes: 126 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,23 @@ type Server struct {
quit chan bool
isClosed bool
isClosedMutex sync.RWMutex
jitter time.Duration
}

// NewServer creates a new Server instance.
func NewServer() *Server {
var duration time.Duration
return NewServerWithJitter(duration)
}

// NewServerWithJitter creates a new Server instance with jitter support.
//
// WARNING: Intermediate events sent while another event send is pending WILL
// BE DISCARDED. Loss of data will occur if used incorrectly.
//
// This method is for use by LaunchDarkly libraries ONLY. No guarantee is made
// about backwards compatibility or future support.
func NewServerWithJitter(jitter time.Duration) *Server {
srv := &Server{
registrations: make(chan *registration),
unregistrations: make(chan *unregistration),
Expand All @@ -68,6 +81,7 @@ func NewServer() *Server {
unsubs: make(chan *subscription, 2),
quit: make(chan bool),
BufferSize: 128,
jitter: jitter,
}
go srv.run()
return srv
Expand Down Expand Up @@ -157,24 +171,96 @@ func (srv *Server) Handler(channel string) http.HandlerFunc {
closedNormally := false
closeNotify := req.Context().Done()

// The handler consumes events in two different modes -- either as soon as
// they arrive, or on some jitter-influenced delay.
//
// If the jitter value has been provided, the first event received will be
// delayed for some (delay/2, delay) amount of time. Events received until
// then are DISCARDED. If this seems excessive, it is!
//
// This jitter functionality is only meant to service the ping stream
// functionality. The ping stream sends identical "ping" events, so
// discarding intermediate values is a safe operation.

var delayedEvent eventOrComment
jitterStrategy := newDefaultJitter(0.5, 0)

usingJitter := srv.jitter > 0
var jitterTimer timer
if usingJitter {
jitterTimer = &goTimer{timer: time.NewTimer(jitterStrategy.applyJitter(srv.jitter))}
jitterTimer.Stop()
} else {
jitterTimer = &noopTimer{C: make(<-chan time.Time)}
}

ReadLoop:
for {
select {
case <-closeNotify:
break ReadLoop
case <-maxConnTimeCh: // if MaxConnTime was not set, this is a nil channel and has no effect on the select
break ReadLoop
case <-jitterTimer.Channel():
// If the jitter is 0, we may have an initial event that fired before
// we could stop the timer. Or maybe the channel is being closed.
// Whatever the reason, we can safely discard here.
if !usingJitter || delayedEvent == nil {
continue
}

ok := writeEventOrComment(delayedEvent)
delayedEvent = nil

if !ok {
break ReadLoop
}
case ev, ok := <-readMainCh:
if !ok {
closedNormally = true
break ReadLoop
}

if batch, ok := ev.(eventBatch); ok {
// If we receive an event batch, we are meant to switch to this as
// our input source. But before we can do that, we need to process
// any event that was pending processing.
if delayedEvent != nil {
jitterTimer.Stop()
ok := writeEventOrComment(delayedEvent)
delayedEvent = nil

if !ok {
break ReadLoop
}
}

readBatchCh = batch.events
readMainCh = nil
} else if !writeEventOrComment(ev) {
break ReadLoop
continue
}

// Write immediately if we aren't using the jitter functionality.
if !usingJitter {
if !writeEventOrComment(ev) {
break ReadLoop
}
continue
}

// If we are using jitter and we have a pending event, then we don't
// need to do anything. We can swallow this event.
if delayedEvent != nil {
continue
}

delayedEvent = ev

// Figure out the jitter and start the timer. Once this trigger, we
// will write the event and clear the way for a new event to come in.
delay := jitterStrategy.applyJitter(srv.jitter)
jitterTimer.Reset(delay)

case ev, ok := <-readBatchCh:
if !ok { // end of batch
readBatchCh = nil
Expand Down Expand Up @@ -358,3 +444,41 @@ func (s *subscription) close() {
close(s.out)
s.out = nil
}

type timer interface {
Channel() <-chan time.Time
Reset(time.Duration) bool
Stop() bool
}

type noopTimer struct {
C <-chan time.Time
}

func (n *noopTimer) Channel() <-chan time.Time {
return n.C
}

func (n *noopTimer) Reset(_ time.Duration) bool {
return true
}

func (n *noopTimer) Stop() bool {
return true
}

type goTimer struct {
timer *time.Timer
}

func (t *goTimer) Channel() <-chan time.Time {
return t.timer.C
}

func (t *goTimer) Reset(d time.Duration) bool {
return t.timer.Reset(d)
}

func (t *goTimer) Stop() bool {
return t.timer.Stop()
}
Loading
Loading