GSignal is a thread-safe Event Hub for Go, designed to decouple components through efficient synchronous and asynchronous messaging.
It provides a robust Hub interface that handles event dispatching with advanced features like worker pools, smart buffering, and context-aware subscriptions.
- Dual Dispatch Mode:
  - Synchronous: Direct blocking calls for immediate consistency.
  - Asynchronous: Non-blocking dispatch supported by a dedicated worker pool.
- Smart Buffering: Drop-on-full policy for async consumers to prevent slow subscribers from blocking the entire system.
- Flexible Subscriptions:
  - Type-based: Subscribe to specific event types (e.g., USER.CREATED).
  - Global (Firehose): Listen to all events flowing through the hub.
- Thread-Safety: Fully protected by sync.RWMutex for concurrent access.
- Graceful Shutdown: Ensures all workers and subscriptions are closed cleanly.
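
The drop-on-full policy mentioned above is commonly built on a non-blocking channel send. The following is a minimal, standalone sketch of that idea and is not taken from GSignal's source: the `Event` struct and the `tryDeliver` helper are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for an event value; the field names
// mirror the ones used in this README but the type is not GSignal's.
type Event struct {
	Type    string
	Payload any
}

// tryDeliver attempts a non-blocking send. It reports false when the
// subscriber's buffer is full, in which case the event is dropped
// instead of stalling the publisher.
func tryDeliver(ch chan<- Event, evt Event) bool {
	select {
	case ch <- evt:
		return true
	default:
		return false
	}
}

func main() {
	sub := make(chan Event, 2) // small buffer, and nobody is reading yet

	delivered, dropped := 0, 0
	for i := 0; i < 5; i++ {
		if tryDeliver(sub, Event{Type: "USER.CREATED", Payload: i}) {
			delivered++
		} else {
			dropped++
		}
	}
	fmt.Printf("delivered=%d dropped=%d\n", delivered, dropped) // delivered=2 dropped=3
}
```

The trade-off is that a slow consumer loses events rather than slowing everyone else down, so this approach suits notifications that are safe to miss.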
While Go channels are powerful, managing multiple fan-out patterns and safe shutdowns can become complex.
GSignal solves this by providing:
- Managed Worker Pool: The AsyncDispatcher uses a configurable pool of goroutines (default 4) to handle heavy event loads without spawning a goroutine per event.
- Safe Resource Management: Subscriptions are automatically closed when the context is cancelled or Unsubscribe is called, preventing goroutine leaks.
- Low-Allocation Dispatching: Optimized hot path with minimal allocations.
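
To make the worker-pool idea concrete, here is a rough sketch of a fixed set of goroutines draining one buffered queue. It illustrates the general pattern only and is not GSignal's AsyncDispatcher: the `pool` type and its `newPool`, `submit`, and `shutdown` methods are hypothetical, and the queue size of 64 is an arbitrary choice.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical fixed-size worker pool fed by a buffered queue.
type pool struct {
	queue chan func()
	wg    sync.WaitGroup
}

// newPool starts exactly `workers` goroutines, however many jobs arrive later.
func newPool(workers, queueSize int) *pool {
	p := &pool{queue: make(chan func(), queueSize)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.queue { // exits once the queue is closed and drained
				job()
			}
		}()
	}
	return p
}

// submit enqueues a job without blocking and reports false if the queue is full.
// It must not be called after shutdown, because sending on a closed channel panics.
func (p *pool) submit(job func()) bool {
	select {
	case p.queue <- job:
		return true
	default:
		return false
	}
}

// shutdown stops accepting jobs, lets the workers drain the queue, and waits for them.
func (p *pool) shutdown() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newPool(4, 64)

	var handled atomic.Int64
	accepted := 0
	for i := 0; i < 50; i++ {
		if p.submit(func() { handled.Add(1) }) {
			accepted++
		}
	}
	p.shutdown()

	fmt.Println(accepted, handled.Load()) // 50 50
}
```

Because the number of goroutines is fixed up front, a burst of events grows the queue rather than the goroutine count, which keeps memory use predictable under load.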

    go get github.com/Merluz/GSignal

Create a new Hub instance. This spins up the async worker pool immediately.

    package main

    import (
        // Explicit alias so the package is referenced as gsignal below
        gsignal "github.com/Merluz/GSignal"
    )

    func main() {
        // Create the hub
        hub := gsignal.New()

        // Ensure clean shutdown
        defer hub.Shutdown()
    }

You can subscribe to specific events or listen to everything.

    // Create a context for the subscription
    ctx := context.Background()

    // Subscribe to specific events
    sub, err := hub.Subscribe(ctx, "ORDER_PLACED", "PAYMENT_RECEIVED")
    if err != nil {
        log.Fatal(err)
    }

    // Consume events in a separate goroutine
    go func() {
        for evt := range sub.Events() {
            log.Printf("Received event: %s | Payload: %v", evt.Type, evt.Payload)
        }
    }()

Use synchronous dispatch (Publish) when you need to ensure all subscribers have received the event before continuing.

    evt, _ := gsignal.NewEvent("ORDER_PLACED", order)
    err := hub.Publish(evt) // Blocks until all subscribers receive it

Use asynchronous dispatch (PublishAsync) for fire-and-forget scenarios. The event is queued and processed by the worker pool.

    evt, _ := gsignal.NewEvent("LOG_ENTRY", "System started")
    err := hub.PublishAsync(evt) // Returns immediately

- hub.go: Main entry point and Hub interface implementation.
- dispatcher.go: Manages synchronous dispatch logic and subscription lists.
- async_dispatcher.go: Implements the worker pool and buffered queue for async events.
- subscription.go: Thread-safe channel wrapper for subscribers.
- event.go: ULID-based immutable event model.
- Options Pattern: Configure workers, queue size, and ID generation.
- Middleware Support: Interceptors for logging, metrics, or validation.
- Generics Support: Typed payloads in Go 1.18+.
- Persistent Store: Pluggable storage (Redis/SQL) for event durability.
- Distributed Adapter: Bridge multiple GSignal instances via NATS or Kafka.
This project is licensed under the MIT License. See the LICENSE file for details.