Skip to content

nilorg/eventbus

Repository files navigation

EventBus

Go License Go Report Card

EventBus 是一个高性能、可扩展的 Go 语言事件总线库,支持多种消息中间件后端。

特性

多种后端支持

  • ✅ RabbitMQ - 企业级消息队列
  • ✅ Redis Streams - 高性能流式处理
  • ✅ Redis Queue (List) - 轻量级队列实现
  • ✅ NATS Core - 高性能云原生消息系统
  • ✅ NATS JetStream - 持久化消息流平台

🚀 核心功能

  • 同步/异步消息发布订阅
  • 消费组支持
  • 消息头自定义
  • 可插拔序列化器
  • 多种日志器支持
  • 优雅的错误处理
  • 智能重试机制
  • 统一死信队列支持
  • 消息级别重试
  • 连接自动恢复
  • 队列长度限制
  • 消息TTL管理

🛠 易于使用

  • 统一的 API 接口
  • 丰富的配置选项
  • 完整的示例代码
  • 详细的文档说明

快速开始

安装

go get github.com/nilorg/eventbus

基本用法

Redis Streams

package main

import (
    "context"
    "log"
    
    "github.com/go-redis/redis/v8"
    "github.com/nilorg/eventbus"
)

func main() {
    // 连接Redis
    client := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer client.Close()

    // 创建事件总线
    bus, err := eventbus.NewRedis(client)
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    topic := "user_events"

    // 订阅消息
    go func() {
        bus.Subscribe(ctx, topic, func(ctx context.Context, msg *eventbus.Message) error {
            log.Printf("收到消息: %+v", msg.Value)
            return nil
        })
    }()

    // 发布消息
    err = bus.Publish(ctx, topic, map[string]interface{}{
        "user_id": 123,
        "action":  "login",
    })
    if err != nil {
        log.Fatal(err)
    }
}

Redis Queue

// 创建 Redis Queue 事件总线
bus, err := eventbus.NewRedisQueue(client)
if err != nil {
    log.Fatal(err)
}

// 使用方式与 Redis Streams 相同

NATS Core

package main

import (
    "context"
    "log"
    
    "github.com/nats-io/nats.go"
    "github.com/nilorg/eventbus"
)

func main() {
    // 连接NATS
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // 创建NATS事件总线
    bus, err := eventbus.NewNATS(nc)
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    topic := "user.events"

    // 订阅消息
    go func() {
        bus.Subscribe(ctx, topic, func(ctx context.Context, msg *eventbus.Message) error {
            log.Printf("NATS收到消息: %+v", msg.Value)
            return nil
        })
    }()

    // 发布消息
    err = bus.Publish(ctx, topic, map[string]interface{}{
        "user_id": 123,
        "action":  "login",
    })
    if err != nil {
        log.Fatal(err)
    }
}

NATS JetStream

// 创建NATS JetStream事件总线,支持持久化
options := &eventbus.NATSJetStreamOptions{
    StreamName:          "MY_APP",
    MaxMsgs:             1000000,
    MaxAge:              time.Hour * 24 * 7,  // 7天
    Replicas:            1,
    DefaultDeliveryMode: eventbus.NATSJetStreamDeliveryModeWorkQueue, // 默认投递模式
}

bus, err := eventbus.NewNATSJetStream(nc, options)
if err != nil {
    log.Fatal(err)
}

ctx := context.Background()
topic := "user.events"

// 发布消息(使用默认的 WorkQueue 模式)
err = bus.Publish(ctx, topic, map[string]interface{}{
    "user_id": 123,
    "action":  "login",
})

// 订阅消息
bus.Subscribe(ctx, topic, func(ctx context.Context, msg *eventbus.Message) error {
    log.Printf("收到消息: %+v", msg.Value)
    return nil
})

RabbitMQ

import "github.com/streadway/amqp"

// 连接RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

// 创建事件总线
bus, err := eventbus.NewRabbitMQ(conn)
if err != nil {
    log.Fatal(err)
}

高级配置

死信队列配置

EventBus 支持统一的死信队列处理,当消息处理失败时自动发送到死信队列:

// Redis Streams 死信配置
redisOptions := &eventbus.RedisOptions{
    DeadLetterTopic:   "user_events_dlq",    // 死信队列主题
    DeadLetterMaxLen:  1000,                 // 死信队列最大长度
    DeadLetterTTL:     time.Hour * 24,       // 死信消息TTL
    MessageMaxRetries: 3,                    // 消息最大重试次数
    SkipBadMessages:   true,                 // 跳过无法处理的消息
}

bus, err := eventbus.NewRedis(client, redisOptions)

// Redis Queue 死信配置
queueOptions := &eventbus.RedisQueueOptions{
    DeadLetterTopic:   "user_queue_dlq",
    DeadLetterMaxLen:  1000,
    DeadLetterTTL:     time.Hour * 24,
    MessageMaxRetries: 3,
    SkipBadMessages:   true,
}

bus, err := eventbus.NewRedisQueue(client, queueOptions)

// NATS 死信配置
natsOptions := &eventbus.NATSOptions{
    DeadLetterSubject: "user.events.dlq",
    MessageMaxRetries: 3,
    SkipBadMessages:   true,
}

bus, err := eventbus.NewNATS(nc, natsOptions)

处理死信消息

// 监听死信队列
err := bus.Subscribe(ctx, "user_events_dlq", func(ctx context.Context, msg *eventbus.Message) error {
    // 死信消息使用统一的 dlq_ 前缀格式
    if data, ok := msg.Value.(map[string]interface{}); ok {
        originalTopic := data["dlq_original_topic"]
        originalID := data["dlq_original_id"]
        errorReason := data["dlq_error_reason"]
        failedAt := data["dlq_failed_at"]
        originalValues := data["dlq_original_values"]
        
        log.Printf("处理死信: topic=%v, id=%v, error=%v, time=%v", 
                   originalTopic, originalID, errorReason, failedAt)
        
        // 进行告警、重新处理或记录
        return handleDeadLetterMessage(originalValues)
    }
    return nil
})

高级用法

消费组

// 创建带消费组的上下文
groupCtx := eventbus.NewGroupIDContext(ctx, "payment_service")

// 订阅消息
bus.Subscribe(groupCtx, "order_events", handler)

// 发布到特定消费组
bus.Publish(groupCtx, "order_events", orderData)

NATS JetStream 投递模式

NATS JetStream 支持三种投递模式,适用于不同的业务场景。

重要说明:投递模式通过 Context 指定,不同模式的消息存储在不同的 Stream 中,彼此完全隔离。

例如,业务 Topic user.events 在不同模式下的实际 Subject:

  • WorkQueue: EVENTBUS.user.events
  • Broadcast: EVENTBUS_BROADCAST.user.events
  • Limits: EVENTBUS_LIMITS.user.events

发布者和订阅者必须使用相同的模式才能通信。

WorkQueue 模式(工作队列)

消息只被一个消费者处理,适合任务分发场景。所有订阅者共享同一个 Consumer,实现负载均衡。

// 配置默认使用 WorkQueue 模式
options := &eventbus.NATSJetStreamOptions{
    StreamName:          "TASKS",
    DefaultDeliveryMode: eventbus.NATSJetStreamDeliveryModeWorkQueue,
}

bus, err := eventbus.NewNATSJetStream(nc, options)

// 多个消费者订阅同一个主题,消息会被负载均衡分发
// 消费者 A
bus.Subscribe(ctx, "tasks.process", handlerA)
// 消费者 B
bus.Subscribe(ctx, "tasks.process", handlerB)

// 发布任务,只会被 A 或 B 中的一个处理
bus.Publish(ctx, "tasks.process", taskData)

Broadcast 模式(广播)

所有 Group 都会收到消息,同一 Group 内负载均衡。适合事件通知、配置更新等场景。

// 使用上下文指定 Broadcast 模式
broadcastCtx := eventbus.WithNATSJetStreamDeliveryMode(ctx, eventbus.NATSJetStreamDeliveryModeBroadcast)

// 服务 A 的实例 1 和 2(使用相同 Group,组内负载均衡)
groupCtxA := eventbus.NewGroupIDContext(broadcastCtx, "service-a")
bus.Subscribe(groupCtxA, "config.update", handlerA1) // 实例 1
bus.Subscribe(groupCtxA, "config.update", handlerA2) // 实例 2

// 服务 B 的实例(不同 Group,独立消费)
groupCtxB := eventbus.NewGroupIDContext(broadcastCtx, "service-b")
bus.Subscribe(groupCtxB, "config.update", handlerB)

// 发布配置更新,服务 A 和服务 B 都会收到(各 Group 内负载均衡)
bus.Publish(broadcastCtx, "config.update", configData)

Limits 模式(历史回溯)

支持消息持久化和历史回放,适合事件溯源、审计日志等场景。

// 使用上下文指定 Limits 模式
limitsCtx := eventbus.WithNATSJetStreamDeliveryMode(ctx, eventbus.NATSJetStreamDeliveryModeLimits)

// 有 Group 时组内负载均衡
groupCtx := eventbus.NewGroupIDContext(limitsCtx, "audit-processor")
bus.Subscribe(groupCtx, "audit.events", auditHandler)

// 无 Group 时每个实例独立消费所有消息
bus.Subscribe(limitsCtx, "audit.events", independentHandler)

// 发布审计事件
bus.Publish(limitsCtx, "audit.events", auditEvent)

投递模式对比

模式 消息分发 Group 语义 适用场景
WorkQueue 每条消息只被一个消费者处理 忽略 Group 任务分发、订单处理
Broadcast 每个 Group 都收到消息 组内负载均衡 配置更新、事件通知
Limits 支持历史回放 有 Group 则负载均衡 事件溯源、审计日志

消息头

// 设置消息头
headerCtx := eventbus.NewSetMessageHeaderContext(ctx, func(ctx context.Context) eventbus.MessageHeader {
    return eventbus.MessageHeader{
        "source":    "user_service",
        "timestamp": time.Now().Format(time.RFC3339),
        "version":   "v1.0",
    }
})

bus.Publish(headerCtx, "events", data)

错误处理和重试

EventBus 提供了强大的错误处理和重试机制:

// 配置重试选项
options := &eventbus.RedisOptions{
    MaxRetries:        3,                   // 连接失败最大重试次数
    RetryInterval:     time.Second * 2,     // 重试间隔
    BackoffMultiplier: 2.0,                 // 指数退避倍数
    MaxBackoff:        time.Minute,         // 最大退避时间
    MessageMaxRetries: 2,                   // 单条消息最大重试次数
    SkipBadMessages:   true,                // 跳过无法处理的消息
}

// 错误处理示例
bus.Subscribe(ctx, "events", func(ctx context.Context, msg *eventbus.Message) error {
    // 处理消息
    if err := processMessage(msg); err != nil {
        // 错误会自动重试,达到最大次数后进入死信队列
        return fmt.Errorf("处理失败: %w", err)
    }
    return nil
})

死信队列

当消息处理失败达到最大重试次数时,会自动发送到死信队列:

// 启用死信队列
options := &eventbus.RedisOptions{
    DeadLetterTopic: "my_app.dlq",  // 设置死信队列主题
    // ... 其他配置
}

// 订阅死信队列处理失败消息
bus.Subscribe(ctx, "my_app.dlq", func(ctx context.Context, msg *eventbus.Message) error {
    // 死信消息使用统一的 dlq_ 前缀格式
    if data, ok := msg.Value.(map[string]interface{}); ok {
        log.Printf("失败消息: ID=%v, 主题=%v, 错误=%v", 
            data["dlq_original_id"], 
            data["dlq_original_topic"], 
            data["dlq_error_reason"])
        
        // 可以实现告警、重新处理等逻辑
        return handleFailedMessage(data)
    }
    return nil
})

自定义配置

Redis Streams 配置

options := &eventbus.RedisOptions{
    ReadCount: 10,                          // 每次读取消息数量
    ReadBlock: time.Second * 5,            // 阻塞读取超时
    Serialize: &eventbus.JSONSerialize{},   // 序列化器
    Logger:    &eventbus.StdLogger{},       // 日志器
    
    // 重试机制配置
    MaxRetries:        3,                   // 连接最大重试次数
    RetryInterval:     time.Second * 5,     // 重试间隔
    BackoffMultiplier: 2.0,                 // 指数退避倍数
    MaxBackoff:        time.Minute * 5,     // 最大退避时间
    MessageMaxRetries: 3,                   // 消息最大重试次数
    SkipBadMessages:   true,                // 跳过无法处理的消息
    DeadLetterTopic:   "my_app.dlq",        // 死信队列主题(可选)
}

bus, err := eventbus.NewRedis(client, options)

Redis Queue 配置

options := &eventbus.RedisQueueOptions{
    PollInterval: time.Second * 2,          // 轮询间隔
    Serialize:    &eventbus.JSONSerialize{}, // 序列化器
    Logger:       &eventbus.StdLogger{},     // 日志器
    
    // 重试机制配置
    MaxRetries:        3,                   // 连接最大重试次数
    RetryInterval:     time.Second * 5,     // 重试间隔
    BackoffMultiplier: 2.0,                 // 指数退避倍数
    MaxBackoff:        time.Minute * 5,     // 最大退避时间
    MessageMaxRetries: 3,                   // 消息最大重试次数
    SkipBadMessages:   true,                // 跳过无法处理的消息
    DeadLetterTopic:   "my_app.dlq",        // 死信队列主题(可选)
}

bus, err := eventbus.NewRedisQueue(client, options)

NATS Core 配置

options := &eventbus.NATSOptions{
    Serialize:         &eventbus.JSONSerialize{}, // 序列化器
    Logger:            &eventbus.StdLogger{},     // 日志器
    
    // 重试机制配置
    MaxRetries:        3,                   // 连接最大重试次数
    RetryInterval:     time.Second * 5,     // 重试间隔
    BackoffMultiplier: 2.0,                 // 指数退避倍数
    MaxBackoff:        time.Minute * 5,     // 最大退避时间
    MessageMaxRetries: 3,                   // 消息最大重试次数
    SkipBadMessages:   true,                // 跳过无法处理的消息
    DeadLetterSubject: "my_app.dlq",        // 死信主题(可选)
}

bus, err := eventbus.NewNATS(nc, options)

NATS JetStream 配置

options := &eventbus.NATSJetStreamOptions{
    // 流配置
    StreamName:        "MY_STREAM",         // 流名称前缀(会根据模式自动添加后缀)
    MaxMsgs:           1000000,             // 最大消息数
    MaxAge:            time.Hour * 24 * 7,  // 消息最大保留时间
    DuplicateWindow:   time.Minute * 2,     // 重复消息检测窗口
    Replicas:          1,                   // 副本数
    
    // 消费者配置
    AckWait:           time.Second * 30,    // 消息确认等待时间
    MaxDeliver:        -1,                  // 最大投递次数(-1 表示无限)
    MaxWaiting:        512,                 // Pull 消费者最大等待请求数
    
    // 投递模式配置
    DefaultDeliveryMode: eventbus.NATSJetStreamDeliveryModeWorkQueue, // 默认投递模式
    InactiveThreshold:   time.Hour * 24,    // 消费者不活跃自动删除时间(仅 Broadcast 模式)
    
    // 异步发布配置
    PublishAsyncMaxPending: 4000,           // 异步发布最大待处理数
    
    Serialize:         &eventbus.JSONSerialize{}, // 序列化器
    Logger:            &eventbus.StdLogger{},     // 日志器
    
    // 重试机制配置
    MaxRetries:        3,                   // 连接最大重试次数
    RetryInterval:     time.Second * 5,     // 重试间隔
    BackoffMultiplier: 2.0,                 // 指数退避倍数
    MaxBackoff:        time.Minute * 5,     // 最大退避时间
    MessageMaxRetries: 3,                   // 消息最大重试次数
    SkipBadMessages:   true,                // 跳过无法处理的消息
    DeadLetterSubject: "my_app.dlq",        // 死信主题(可选)
}

bus, err := eventbus.NewNATSJetStream(nc, options)

RabbitMQ 配置

options := &eventbus.RabbitMQOptions{
    ExchangeName:        "my_exchange",     // 交换机名称
    ExchangeType:        "topic",           // 交换机类型
    QueueMessageExpires: 864000000,         // 消息过期时间
    Serialize:           &eventbus.JSONSerialize{}, // 序列化器
    Logger:              &eventbus.StdLogger{},     // 日志器
    PoolMinOpen:         1,                 // 最小连接池大小
    PoolMaxOpen:         10,                // 最大连接池大小
    
    // 重试机制配置
    MaxRetries:         3,                  // 连接最大重试次数
    RetryInterval:      time.Second * 2,    // 重试间隔
    BackoffMultiplier:  2.0,                // 指数退避倍数
    MaxBackoff:         time.Minute,        // 最大退避时间
    MessageMaxRetries:  2,                  // 消息最大重试次数
    SkipBadMessages:    true,               // 跳过无法处理的消息
    DeadLetterExchange: "my_app.dlx",       // 死信交换机(可选)
}

bus, err := eventbus.NewRabbitMQ(conn, options)

后端对比

特性 Redis Streams Redis Queue RabbitMQ NATS Core NATS JetStream
性能 很高 中等 很高
功能丰富度 中等 简单 很高 中等 很高
消息持久化
消息确认
消费组
历史回放
多投递模式
连接重试
消息重试
死信队列
错误隔离
云原生
水平扩展 中等 中等 中等 很高 很高
适用场景 流式处理 简单队列 企业应用 微服务 数据流平台

选择建议

  • Redis Streams: 适合需要消息持久化和历史回放的场景,具备完整的重试机制和死信队列支持
  • Redis Queue: 适合对性能要求高、消息处理简单的场景,现已支持重试和死信队列
  • RabbitMQ: 适合企业级应用,需要复杂路由和可靠性保证,具备全面的错误处理能力
  • NATS Core: 适合云原生微服务架构,提供超高性能的消息传递
  • NATS JetStream: 适合需要持久化的云原生数据流平台,支持三种投递模式(WorkQueue/Broadcast/Limits)满足不同业务场景

🚀 新特性亮点

  • 统一死信格式: 所有后端使用统一的 dlq_ 前缀死信消息格式,便于监控和处理
  • 智能重试: 连接失败时使用指数退避策略,避免雪崩效应
  • 消息级重试: 单条消息处理失败时精确重试,不影响其他消息
  • 死信队列管理: 支持死信队列长度限制和TTL设置
  • 错误隔离: 单个消息处理错误不会中断整个订阅循环
  • 云原生支持: 新增NATS Core和JetStream支持,适应现代微服务架构
  • 统一体验: 所有后端实现提供一致的重试和错误处理机制
  • JetStream 三模式: 支持 WorkQueue(任务分发)、Broadcast(广播)、Limits(历史回溯)三种投递模式

项目结构

eventbus/
├── eventbus.go          # 核心接口定义
├── message.go           # 消息结构和死信处理
├── context.go           # 上下文处理
├── delivery_mode.go     # NATS JetStream 投递模式定义
├── serializer.go        # 序列化器
├── logger.go            # 日志器
├── redis.go             # Redis Streams 实现
├── redisqueue.go        # Redis Queue 实现
├── nats.go              # NATS Core 实现
├── nats_jetstream.go    # NATS JetStream 实现(支持三种投递模式)
├── rabbitmq.go          # RabbitMQ 实现
├── examples/            # 示例代码
│   ├── redis/
│   ├── redisqueue/
│   └── rabbitmq/
└── *_test.go            # 测试文件

示例程序

基础示例

# Redis Streams 基础示例
cd examples/redis
go run main.go

# Redis Queue 基础示例
cd examples/redisqueue  
go run main.go

# RabbitMQ 基础示例
cd examples/rabbitmq
go run main.go

# NATS Core 示例
cd examples/nats
go run main.go

# NATS JetStream 示例
cd examples/nats_jetstream
go run main.go

重试机制示例

# Redis Streams 重试示例
cd examples/redis_retry
go run main.go

# Redis Queue 重试示例
cd examples/redis_queue_retry
go run main.go

# RabbitMQ 重试示例
cd examples/rabbitmq_retry
go run main.go

# 自定义消息头和死信队列示例
cd examples/custom_header
go run main.go

# Redis 死信处理优化示例
cd examples/redis_deadletter
go run main.go

依赖

require (
    github.com/go-redis/redis/v8 v8.11.5
    github.com/streadway/amqp v1.0.0
    github.com/nats-io/nats.go v1.31.0
    github.com/rs/xid v1.5.0
    github.com/sirupsen/logrus v1.8.1
    go.uber.org/zap v1.24.0
    github.com/nilorg/pkg v0.0.0-20221209071251-2bf5826d6883
)

文档

测试

# 运行所有测试
go test ./...

# 运行特定测试
go test -v ./redisqueue_test.go

# 基准测试
go test -bench=.

日志器支持

支持多种日志库:

序列化器

内置 JSON 序列化器,可扩展支持:

  • JSON (默认)
  • Protobuf
  • MessagePack
  • 自定义序列化器

贡献

欢迎提交 Pull Request 和 Issue!

  1. Fork 项目
  2. 创建特性分支 (git checkout -b feature/AmazingFeature)
  3. 提交更改 (git commit -m 'Add some AmazingFeature')
  4. 推送到分支 (git push origin feature/AmazingFeature)
  5. 打开 Pull Request

许可证

本项目基于 MIT 许可证 - 查看 LICENSE 文件了解详情。

相关项目


如果这个项目对你有帮助,请给个 ⭐ Star 支持一下!

Releases

No releases published

Packages

No packages published

Languages