标签:KafkaGolang定时任务事件驱动架构设计
Go Kafka 库:github.com/segmentio/kafka-go
为什么要替换定时任务?
定时任务在小规模系统中运行良好,但随着业务增长,问题逐渐暴露:
| 问题 | 描述 |
|---|---|
| 时间耦合 | 必须等到固定时间点才触发,实时性差 |
| 重复执行 | 多实例部署需要额外分布式锁,稍有不慎就重复执行 |
| 全量扫描 | 通常需要扫全表找待处理数据,数据量大时拖垮数据库 |
| 失败难追踪 | 没有自然的重试机制,失败了难以回溯 |
| 扩展性差 | 任务量增大时无法动态扩容消费能力 |
方案一:事件驱动替代状态轮询
适用场景
- 订单支付后触发发货
- 用户注册后发送欢迎邮件
- 状态流转触发下游处理
流程对比
❌ 旧方案(定时轮询)
┌─────────────────────────────────────────────┐
│ Cron Job(每 5 分钟) │
│ ↓ │
│ SELECT * FROM orders WHERE status='paid' │ ← 全表扫描,数据量大时极慢
│ ↓ │
│ for each order → 触发发货逻辑 │
│ ↓ │
│ UPDATE orders SET status='shipping' │
└─────────────────────────────────────────────┘
问题:最多延迟 5 分钟响应,高峰期扫表慢
✅ 新方案(事件驱动)
支付服务 Kafka 发货服务
│ │ │
│ 支付成功 │ │
│──发布 order.paid ────→│ │
│ │──推送消息 ────────────→│
│ │ │ 毫秒级实时触发发货
│ │ │ 处理完提交 offset
│ │ │Go 实现
// ========== Producer:支付完成发事件 ==========
package payment
import (
"context"
"encoding/json"
"time"
"github.com/segmentio/kafka-go"
)
// OrderPaidEvent 是订单支付成功的事件结构体
// 这个结构体会被序列化为 JSON 作为 Kafka 消息的 Value 发送
type OrderPaidEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount float64 `json:"amount"`
PaidAt time.Time `json:"paid_at"`
// MessageID 是消费端幂等去重的唯一标识
// 消费者收到消息后先用这个 ID 检查是否已处理过,防止重复消费
MessageID string `json:"message_id"`
}
// writer 是全局共享的 Kafka 生产者,kafka-go 的 Writer 是协程安全的,可以复用
var writer = &kafka.Writer{
// Addr:Kafka Broker 地址列表
// 填多个地址是为了高可用:其中一个 Broker 宕机时,Producer 会自动切换到其他地址
// 这里只需要填部分 Broker 地址,Producer 连上后会自动发现集群其他节点(Bootstrap)
Addr: kafka.TCP("kafka1:9092", "kafka2:9092", "kafka3:9092"),
// Topic:消息写入的目标 Topic 名称
// 命名规范:{业务域}.{实体}.{事件类型},便于管理和监控
Topic: "order.order.paid",
// Balancer:决定消息路由到哪个 Partition 的策略
// kafka.Hash{} 表示对消息的 Key 做哈希取模,相同 Key 的消息永远路由到同一 Partition
// 这样可以保证同一个 OrderID 的消息是有序的(Kafka 只保证单 Partition 内有序)
// 其他可选值:
// &kafka.RoundRobin{} → 轮询,适合不需要顺序保证的场景,吞吐更均匀
// &kafka.LeastBytes{} → 优先发给堆积最少的 Partition
Balancer: &kafka.Hash{},
// RequiredAcks:发送消息后需要等待多少副本确认才算成功
// kafka.RequireAll = acks=-1,等待所有 ISR 副本写入确认,最安全,不丢消息(推荐生产使用)
// kafka.RequireOne = acks=1, 只等 Leader 确认,Leader 宕机可能丢消息
// kafka.RequireNone = acks=0, 不等任何确认,性能最高但可能丢消息
RequiredAcks: kafka.RequireAll,
// Async:是否异步发送
// false(默认):同步发送,WriteMessages 会阻塞直到 Broker 确认,适合对可靠性要求高的场景
// true:异步发送,WriteMessages 立即返回,错误通过 Completion 回调处理,吞吐更高但需额外处理错误
Async: false,
}
// OnPaymentSuccess 在支付成功时调用,将事件发布到 Kafka
func OnPaymentSuccess(ctx context.Context, orderID, userID string, amount float64) error {
event := OrderPaidEvent{
OrderID: orderID,
UserID: userID,
Amount: amount,
PaidAt: time.Now(),
// 用 orderID + 事件类型 拼接成幂等 ID
// 保证同一笔订单的支付成功事件即使重试发送,消费端也只处理一次
MessageID: orderID + "_paid",
}
payload, _ := json.Marshal(event)
return writer.WriteMessages(ctx, kafka.Message{
// Key:消息路由键,决定这条消息去哪个 Partition
// 这里用 OrderID 作为 Key,保证同一个订单的所有事件都在同一个 Partition,保序
Key: []byte(orderID),
Value: payload,
})
}// ========== Consumer:发货服务实时消费 ==========
package shipping
import (
"context"
"encoding/json"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func StartConsumer(ctx context.Context) {
r := kafka.NewReader(kafka.ReaderConfig{
// Brokers:Kafka 集群地址,填多个保证高可用,逻辑同 Producer
Brokers: []string{"kafka1:9092", "kafka2:9092"},
// Topic:要消费的 Topic 名称,需与 Producer 写入的 Topic 一致
Topic: "order.order.paid",
// GroupID:消费者组 ID
// 同一个 GroupID 的多个消费者实例共同消费这个 Topic
// Kafka 会自动将 Partition 分配给组内不同的消费者,实现负载均衡
// 不同 GroupID 的消费者相互独立,都能收到全量消息(广播语义)
// 例如:发货服务和通知服务可以用不同的 GroupID 同时消费同一个 Topic
GroupID: "shipping-service-group",
// MinBytes:每次拉取请求等待的最小数据量(字节)
// 设为 1 表示有消息就立即返回,延迟最低,适合实时性要求高的场景
// 设大一点(如 1MB)可以减少请求次数,提升吞吐,但会增加延迟
MinBytes: 1,
// MaxBytes:每次拉取请求返回的最大数据量(字节)
// 10e6 = 10MB,防止一次拉取过多消息导致内存压力
// 需要结合单条消息大小和处理能力来设置
MaxBytes: 10e6,
// CommitInterval:自动提交 offset 的时间间隔
// 0 表示关闭自动提交,改为手动调用 CommitMessages 控制
// 手动提交的好处:只有业务处理成功后才提交,避免消息丢失
// 如果设为自动提交(如 time.Second),消息拉下来还没处理完就提交了,
// 一旦进程崩溃,这批消息就再也不会被处理(消息丢失)
CommitInterval: 0,
// StartOffset:当 GroupID 第一次消费这个 Topic 时,从哪里开始
// kafka.FirstOffset = 从最早的消息开始消费(不遗漏历史消息)
// kafka.LastOffset = 从最新的消息开始消费(忽略历史消息)
// 注意:这个配置只对新的 GroupID 生效,已有 offset 记录的 Group 会从上次位置继续
StartOffset: kafka.FirstOffset,
// MaxWait:当消息不足 MinBytes 时,最多等待多久再返回
// 设为 3s 表示即使消息量不够,等 3 秒后也强制返回,避免消费者长时间阻塞
MaxWait: 3 * time.Second,
})
defer r.Close()
for {
// FetchMessage:拉取一条消息,但不提交 offset
// 与 ReadMessage 的区别:ReadMessage 会自动提交,FetchMessage 需要手动 CommitMessages
// 这里用 FetchMessage 配合手动提交,保证"处理成功才提交"
msg, err := r.FetchMessage(ctx)
if err != nil {
log.Printf("拉取消息失败: %v", err)
continue
}
var event OrderPaidEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("消息解析失败,跳过: %v", err)
// 解析失败说明消息格式有问题,重试也没用,直接提交跳过
// 否则这条消息会一直阻塞,后续消息无法消费
r.CommitMessages(ctx, msg)
continue
}
// 幂等检查 + 业务处理
if err := processWithIdempotent(ctx, event); err != nil {
log.Printf("处理失败,发往 DLQ: %v", err)
sendToDLQ(ctx, msg) // 失败消息发到死信队列,不阻塞主流程
}
// 只有业务处理完成(无论成功还是进入 DLQ)后,才提交 offset
// 这保证了 At Least Once 语义:消息至少被处理一次
if err := r.CommitMessages(ctx, msg); err != nil {
log.Printf("offset 提交失败: %v", err)
}
}
}
func processWithIdempotent(ctx context.Context, event OrderPaidEvent) error {
// 用 Redis SetNX(Set if Not eXists)实现幂等去重
// SetNX 是原子操作:Key 不存在时设置并返回 true,已存在时返回 false
// TTL 设为 24 小时:超过这个时间窗口的重复消息概率极低,可以接受
ok, _ := rdb.SetNX(ctx,
"msg:processed:"+event.MessageID, // Key:用消息唯一 ID 标识
1, // Value:随意,只用 Key 来判断是否存在
24*time.Hour, // TTL:24 小时后 Key 自动删除,释放内存
).Result()
if !ok {
// ok=false 说明 Key 已存在,这是一条重复消息,直接跳过
log.Printf("重复消息,跳过: %s", event.MessageID)
return nil
}
// 确认是新消息,执行业务逻辑
return shippingService.Trigger(event.OrderID)
}方案二:延迟消息替代延时定时任务
适用场景
- 下单 30 分钟未支付自动取消
- 发货 7 天后自动确认收货
- 优惠券到期提醒
时间轮原理
时间轮(Timing Wheel) 是一种高效处理大量定时任务的数据结构。
想象一个时钟表盘,表盘被均匀分成 N 个槽(Slot),每个槽对应一个时间刻度(如 1 秒)。
指针每隔一个刻度向前走一格,走到哪个槽就触发该槽中所有到期的任务。0s ┌──┐ 59s──┤ ├──1s ← 指针当前在 0s,每秒走一格 │ │ ... │ │ ... 每个槽里挂着"在这个时刻到期"的任务列表 │ │ 30s──┴──┘ 到期的任务从槽中取出并执行 优点:插入任务 O(1),触发任务 O(1),远优于最小堆的 O(log n) 适合:海量定时任务(如百万级订单超时取消)对比最小堆(Go
time.AfterFunc内部实现):
- 最小堆:任务量大时,每次插入/删除都是 O(log n),任务多了性能下降
- 时间轮:无论多少任务,插入和触发都是 O(1),更适合高并发延迟场景
流程图
┌─────────────────────────────────────────────┐
│ 延迟调度服务 │
│ │
业务服务 │ delay-queue Topic │ 真实 Topic
│ │ │ │ │
│ 下单成功 │ │ │ │
│─写入 ────────→│────────→ consume 拉取消息 │ │
│ execute_at │ │ │ │
│ = now+30min │ ↓ │ │
│ │ 时间轮(每秒 tick) │ │
│ │ 槽内挂载到期任务 │ │
│ │ │ 30分钟后到期 │ │
│ │ └──────────────────────────────→│───────→ 取消订单
│ │ 转发到 order.cancel Topic │ │ Consumer
│ │ │ │
└───────────────┴──────────────────────────────────────────────┴───────┘Go 实现
// ========== 发送延迟消息 ==========
package delay
import (
"context"
"encoding/json"
"time"
"github.com/segmentio/kafka-go"
)
// DelayMessage 是延迟消息的信封结构
// 它不是真正的业务消息,而是一个"包裹",里面装着真正要投递的内容和投递时间
type DelayMessage struct {
// RealTopic:到期后消息需要被转发到的真实 Topic
RealTopic string `json:"real_topic"`
// Key:转发时使用的消息 Key,保证路由到正确的 Partition
Key string `json:"key"`
// Value:真正的业务消息内容(原样转发,不解析)
Value json.RawMessage `json:"value"`
// ExecuteAt:消息应该被触发的时间点(Unix 毫秒时间戳)
// 调度器会不断检查当前时间是否 >= ExecuteAt,到期则转发
ExecuteAt int64 `json:"execute_at"`
}
// delay-queue 是所有延迟消息的"暂存区"
// 所有需要延迟执行的消息都先写到这里,由调度器统一管理投递时机
var writer = &kafka.Writer{
Addr: kafka.TCP("kafka1:9092"),
Topic: "delay-queue",
RequiredAcks: kafka.RequireAll,
}
// SendDelay 将一条业务消息包装成延迟消息发送到暂存 Topic
// realTopic:最终要投递的 Topic
// key: 消息的路由 Key
// value: 业务消息内容(会被原样透传)
// delay: 延迟时长,如 30*time.Minute
func SendDelay(ctx context.Context, realTopic, key string, value any, delay time.Duration) error {
payload, _ := json.Marshal(value)
msg := DelayMessage{
RealTopic: realTopic,
Key: key,
Value: payload,
// 当前时间 + 延迟时长 = 应该被触发的绝对时间点
ExecuteAt: time.Now().Add(delay).UnixMilli(),
}
body, _ := json.Marshal(msg)
return writer.WriteMessages(ctx, kafka.Message{
Key: []byte(key),
Value: body,
})
}
// OnOrderCreated 下单时调用,发送 30 分钟后自动取消的延迟消息
func OnOrderCreated(ctx context.Context, orderID string) {
SendDelay(ctx,
"order.order.cancel", // 到期后投递到这个 Topic
orderID, // 路由 Key
map[string]string{"order_id": orderID}, // 业务内容
30*time.Minute, // 延迟 30 分钟
)
}// ========== 延迟调度服务(时间轮转发)==========
package delay
import (
"context"
"encoding/json"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// timerTask 是内存时间轮中的一个待触发任务
type timerTask struct {
msg DelayMessage // 解析后的延迟消息,包含目标 Topic 和业务内容
commitMsg kafka.Message // 原始 Kafka 消息,转发成功后用于提交 delay-queue 的 offset
}
// Scheduler 是延迟调度器的主体
// 它扮演"中间人"角色:从 delay-queue 拉取消息 → 放入时间轮等待 → 到期后转发到真实 Topic
type Scheduler struct {
reader *kafka.Reader // 从 delay-queue 拉取消息的消费者
writers map[string]*kafka.Writer // 按目标 Topic 缓存 Writer,避免重复创建连接
timerCh chan timerTask // 时间轮到期后,将任务投入这个 channel,由 dispatch 协程处理
}
func NewScheduler() *Scheduler {
return &Scheduler{
reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka1:9092"},
Topic: "delay-queue",
// 调度器是单独的服务,用独立的 GroupID
// 保证 delay-queue 的每条消息只被调度器处理一次
GroupID: "delay-scheduler-group",
CommitInterval: 0, // 手动提交:转发成功后才提交,防止消息丢失
MinBytes: 1,
MaxBytes: 10e6,
}),
writers: make(map[string]*kafka.Writer),
// timerCh 缓冲区设为 10000:
// 允许最多 10000 个任务同时到期等待被 dispatch 消费
// 缓冲区满时,时间轮的 goroutine 会阻塞,起到背压作用
timerCh: make(chan timerTask, 10000),
}
}
func (s *Scheduler) Start(ctx context.Context) {
go s.consume(ctx) // 协程1:持续从 delay-queue 拉取消息,放入时间轮
go s.dispatch(ctx) // 协程2:持续从 timerCh 取到期任务,转发到真实 Topic
}
// consume 从 delay-queue 拉取消息,计算剩余等待时间,用 time.Sleep 模拟时间轮槽
// 注意:每条消息会启动一个独立 goroutine 等待到期,适合任务量适中的场景
// 若任务量极大(百万级),建议替换为 github.com/RussellLuo/timingwheel 等成熟时间轮库
func (s *Scheduler) consume(ctx context.Context) {
for {
// FetchMessage 拉取消息,不自动提交 offset
m, err := s.reader.FetchMessage(ctx)
if err != nil {
log.Printf("delay-queue 拉取失败: %v", err)
continue
}
var dm DelayMessage
if err := json.Unmarshal(m.Value, &dm); err != nil {
log.Printf("延迟消息解析失败,跳过: %v", err)
s.reader.CommitMessages(ctx, m) // 格式错误,跳过这条消息
continue
}
// 计算距离到期还有多久
wait := time.Until(time.UnixMilli(dm.ExecuteAt))
if wait < 0 {
wait = 0 // 已经过期的消息立即投递(如服务重启后的补偿)
}
// 为每条消息启动一个 goroutine,sleep 到期后投入 dispatch channel
// 这就是简化版"时间轮":每个 goroutine 相当于时间轮的一个槽
go func(task timerTask, wait time.Duration) {
log.Printf("延迟任务入轮: key=%s, 等待=%v", task.msg.Key, wait)
// select 监听两个事件:
// 1. wait 到期:正常触发投递
// 2. ctx 取消:服务关闭时优雅退出,不再等待
select {
case <-time.After(wait):
s.timerCh <- task // 到期,投入 dispatch 队列
case <-ctx.Done():
log.Printf("调度器关闭,任务丢弃: key=%s", task.msg.Key)
// 生产环境应将未到期任务持久化到 DB,重启后恢复
}
}(timerTask{msg: dm, commitMsg: m}, wait)
}
}
// dispatch 从 timerCh 取出到期任务,转发到真实 Topic,然后提交 delay-queue 的 offset
// 这个顺序非常重要:必须先确认转发成功,再提交 offset
// 如果先提交 offset 再转发失败,delay-queue 里的这条消息就永久丢失了
func (s *Scheduler) dispatch(ctx context.Context) {
for {
select {
case task := <-s.timerCh:
w := s.getWriter(task.msg.RealTopic) // 获取目标 Topic 的 Writer
err := w.WriteMessages(ctx, kafka.Message{
Key: []byte(task.msg.Key),
Value: task.msg.Value, // 原样转发业务内容
})
if err != nil {
log.Printf("转发到 %s 失败: %v,消息将重新投递", task.msg.RealTopic, err)
// 转发失败:不提交 offset,消息会在下次服务重启后重新调度
// 注意:这可能导致轻微的重复投递,消费端需要做幂等处理
continue
}
// 转发成功后才提交 delay-queue 的 offset,防止重复调度
if err := s.reader.CommitMessages(ctx, task.commitMsg); err != nil {
log.Printf("offset 提交失败: %v", err)
}
log.Printf("延迟消息转发成功: key=%s → topic=%s", task.msg.Key, task.msg.RealTopic)
case <-ctx.Done():
return
}
}
}
// getWriter 按 Topic 获取或创建 Writer,避免重复建立连接
func (s *Scheduler) getWriter(topic string) *kafka.Writer {
if w, ok := s.writers[topic]; ok {
return w
}
w := &kafka.Writer{
Addr: kafka.TCP("kafka1:9092"),
Topic: topic,
RequiredAcks: kafka.RequireAll,
}
s.writers[topic] = w
return w
}方案三:流式窗口替代定时批处理
适用场景
- 每小时统计订单金额报表
- 每天对账汇总
- 实时监控指标聚合
流程图
❌ 旧方案(定时批处理)
Cron 每小时触发
↓
SELECT SUM(amount) FROM orders ← 重复全量计算,数据库压力大
WHERE created_at BETWEEN ? AND ?
↓
写入报表表
✅ 新方案(滚动窗口聚合)
订单服务 Kafka 聚合消费者 Doris
│ │ │ │
│ 每笔订单 ──────→│ │ │
│ │── 实时推送 ────────→│ │
│ │ │ 内存累加 │
│ │ │ 整点窗口关闭 │
│ │ │──── 批量写入 ────────→│
│ │ │ │ 报表 / BI 查询Go 实现
package aggregator
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"github.com/segmentio/kafka-go"
)
type OrderEvent struct {
OrderID string `json:"order_id"`
MerchantID string `json:"merchant_id"`
Amount float64 `json:"amount"`
CreatedAt time.Time `json:"created_at"`
}
// WindowResult 是一个时间窗口内的聚合结果
// 例如:某商户在 14:00~15:00 的订单总金额和笔数
type WindowResult struct {
MerchantID string
WindowStart time.Time // 窗口开始时间(整点)
WindowEnd time.Time // 窗口结束时间(下一个整点)
TotalAmount float64
Count int64
}
// WindowAggregator 是滚动窗口聚合器
type WindowAggregator struct {
mu sync.Mutex // 保护 windows map 的并发安全
windows map[string]*WindowResult // key = merchantID|windowStart,value = 该窗口的聚合数据
ticker *time.Ticker // 定时触发窗口关闭和刷新
}
func NewWindowAggregator() *WindowAggregator {
wa := &WindowAggregator{
windows: make(map[string]*WindowResult),
// 每小时触发一次窗口刷新,将上一个小时的聚合结果写入 Doris
ticker: time.NewTicker(time.Hour),
}
go wa.flushLoop()
return wa
}
func (wa *WindowAggregator) Start(ctx context.Context) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka1:9092"},
Topic: "order.order.created",
GroupID: "hourly-aggregator-group",
// 聚合场景可以适当增大 MinBytes,减少请求次数,提升吞吐
MinBytes: 1e3, // 1KB
MaxBytes: 10e6, // 10MB
// 最多等 500ms,即使消息量不够 MinBytes 也返回,保证聚合的实时性
MaxWait: 500 * time.Millisecond,
})
defer r.Close()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
continue
}
var event OrderEvent
if err := json.Unmarshal(m.Value, &event); err != nil {
r.CommitMessages(ctx, m)
continue
}
// 将消息累加到对应的时间窗口
wa.accumulate(event)
r.CommitMessages(ctx, m)
}
}
// accumulate 将一条订单消息的金额累加到对应的时间窗口中
func (wa *WindowAggregator) accumulate(event OrderEvent) {
// Truncate(time.Hour) 将时间对齐到整点
// 例如:14:37:22 → 14:00:00,这样同一小时内的消息都落入同一个窗口
windowStart := event.CreatedAt.Truncate(time.Hour)
// key = merchantID|windowStart,唯一标识一个窗口
key := event.MerchantID + "|" + windowStart.String()
wa.mu.Lock()
defer wa.mu.Unlock()
// 第一次看到这个窗口,初始化
if _, ok := wa.windows[key]; !ok {
wa.windows[key] = &WindowResult{
MerchantID: event.MerchantID,
WindowStart: windowStart,
WindowEnd: windowStart.Add(time.Hour),
}
}
wa.windows[key].TotalAmount += event.Amount
wa.windows[key].Count++
}
// flushLoop 每小时将当前所有窗口的聚合结果写入 Doris,然后清空内存
// 注意:这里有个简化:整点时写入上一个小时的数据
// 生产环境建议用水位线(Watermark)机制处理迟到消息
func (wa *WindowAggregator) flushLoop() {
for range wa.ticker.C {
// 原子性地取出所有窗口数据并重置,减少锁持有时间
wa.mu.Lock()
toFlush := wa.windows
wa.windows = make(map[string]*WindowResult)
wa.mu.Unlock()
for _, result := range toFlush {
if err := doris.Insert(result); err != nil {
log.Printf("写入 Doris 失败: %v", err)
// 生产环境应有重试或补偿机制,如写入本地文件后告警人工处理
}
}
log.Printf("窗口刷新完成,写入 %d 条聚合结果", len(toFlush))
}
}方案四:心跳超时检测替代健康检查定时任务
适用场景
- IoT 设备在线检测
- 微服务实例存活检测
- 长连接客户端超时判断
流程图
设备 / 服务 Kafka 心跳消费者 Redis
│ │ │ │
│ 每 30s 发心跳 │ │ │
│──→ device.heartbeat ────→│ │ │
│ │──── 实时推送 ───────────→│ │
│ │ │ SET device:online │
│ │ │ :deviceID EX 90 ──→│ TTL 刷新
│ │ │ │
│ (超过 90s 无心跳) │ │ │
│ │ │ Key 过期 ──→ 触发过期事件
│ │ │ │
│ │ │ 告警服务 ←─ 订阅过期事件
│ │ │ 钉钉 / PagerDuty 通知Go 实现
// ========== 设备心跳 Producer ==========
package device
import (
"context"
"encoding/json"
"time"
"github.com/segmentio/kafka-go"
)
type HeartbeatEvent struct {
DeviceID string `json:"device_id"`
IP string `json:"ip"`
Timestamp time.Time `json:"timestamp"`
}
var writer = &kafka.Writer{
Addr: kafka.TCP("kafka1:9092"),
Topic: "device.heartbeat",
// kafka.Hash{} 保证同一设备的心跳路由到同一 Partition,便于顺序消费
Balancer: &kafka.Hash{},
// 心跳消息对可靠性要求略低,允许偶尔丢失(网络抖动导致的)
// 使用 RequireOne 而非 RequireAll,降低发送延迟
// 因为即使少发一两条心跳,只要下一条在 TTL 内到达,设备仍被认为在线
RequiredAcks: kafka.RequireOne,
// Async=true:异步发送,心跳不阻塞主业务流程
// 发送失败由 Completion 回调处理,或直接忽略(下次心跳会补偿)
Async: true,
}
// StartHeartbeat 启动设备心跳,每 30 秒发送一次
// 心跳间隔(30s)必须小于 Redis TTL(90s)的 1/2,留足余量应对网络延迟
func StartHeartbeat(ctx context.Context, deviceID, ip string) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return // 收到停止信号,优雅退出
case <-ticker.C:
event := HeartbeatEvent{
DeviceID: deviceID,
IP: ip,
Timestamp: time.Now(),
}
payload, _ := json.Marshal(event)
writer.WriteMessages(ctx, kafka.Message{
Key: []byte(deviceID), // 用 DeviceID 作为 Key,保证同设备路由一致
Value: payload,
})
}
}
}// ========== 心跳消费者:刷新 Redis TTL ==========
package monitor
import (
"context"
"encoding/json"
"log"
"time"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)
const (
// HeartbeatTTL:Redis Key 的存活时长
// 设计原则:TTL = 心跳间隔 × 3
// 心跳间隔 30s,TTL = 90s,允许连续丢失 2 次心跳后才判定为离线
// 这样可以容忍短暂的网络抖动,避免误报
HeartbeatTTL = 90 * time.Second
)
func StartHeartbeatConsumer(ctx context.Context, rdb *redis.Client) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka1:9092"},
Topic: "device.heartbeat",
GroupID: "device-monitor-group",
// 心跳场景要求低延迟,MinBytes=1 有消息立即消费
MinBytes: 1,
MaxBytes: 10e6,
MaxWait: 1 * time.Second,
})
defer r.Close()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
continue
}
var event HeartbeatEvent
if err := json.Unmarshal(m.Value, &event); err != nil {
r.CommitMessages(ctx, m)
continue
}
// 每次收到心跳就刷新 Redis 中这台设备的 Key,并重置 TTL
// Set 会覆盖旧值并重置过期时间(比 Expire 命令更原子)
// Key 格式:device:online:{deviceID},Value 存 IP 便于排查
rdb.Set(ctx,
"device:online:"+event.DeviceID,
event.IP,
HeartbeatTTL,
)
r.CommitMessages(ctx, m)
log.Printf("心跳更新: device=%s ip=%s ttl=%v", event.DeviceID, event.IP, HeartbeatTTL)
}
}// ========== Redis Key 过期监听:触发离线告警 ==========
package monitor
import (
"context"
"log"
"strings"
"github.com/redis/go-redis/v9"
)
// StartOfflineListener 监听 Redis Key 过期事件,设备 Key 过期即代表设备离线
//
// 前置条件:需要在 Redis 配置中开启 Keyspace 通知
// redis.conf 中设置:notify-keyspace-events "KEx"
// K = Keyspace 事件(以 __keyspace@<db>__ 为前缀)
// E = Keyevent 事件(以 __keyevent@<db>__ 为前缀)
// x = 过期事件(expired)
//
// 或者运行时执行:CONFIG SET notify-keyspace-events KEx
func StartOfflineListener(ctx context.Context, rdb *redis.Client) {
// 订阅 db0 中所有 Key 的过期事件
// 格式:__keyevent@{dbIndex}__:expired
pubsub := rdb.PSubscribe(ctx, "__keyevent@0__:expired")
defer pubsub.Close()
log.Println("开始监听设备离线事件...")
for msg := range pubsub.Channel() {
// msg.Payload 是过期的 Key 名称,如 "device:online:device-001"
key := msg.Payload
// 只处理设备心跳相关的 Key,过滤其他业务的 Key
if !strings.HasPrefix(key, "device:online:") {
continue
}
// 从 Key 中提取 DeviceID
deviceID := strings.TrimPrefix(key, "device:online:")
log.Printf("⚠️ 设备离线: %s", deviceID)
// 异步发送告警,不阻塞监听循环
go alertService.SendOfflineAlert(ctx, deviceID)
}
}方案五:死信队列(DLQ)兜底
无论哪种方案,消费失败的消息都需要有去处,不能阻塞主流程。
流程图
主 Topic
│
↓
Consumer 处理
│
├── 成功 ──→ CommitOffset,结束
│
└── 失败
│
├── 可重试(网络抖动等)──→ 指数退避重试(1s → 2s → 4s,最多 3 次)
│ │
│ └── 仍失败 ──→ 写入 DLQ Topic
│
└── 不可重试(数据格式错误等)──────→ 写入 DLQ Topic
│
┌───────┴────────┐
│ DLQ Consumer │
│ 人工审查 / 修复 │
│ 修复后重新投递 │
└────────────────┘Go 实现
package consumer
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// dlqWriter 专门用于写入死信队列
// 死信队列本身要求高可靠,必须用 RequireAll
var dlqWriter = &kafka.Writer{
Addr: kafka.TCP("kafka1:9092"),
RequiredAcks: kafka.RequireAll,
}
// NonRetryableError 表示不可重试的错误(如消息格式错误、业务逻辑校验失败)
// 遇到这类错误继续重试没有意义,应该直接进 DLQ 等人工处理
type NonRetryableError struct{ msg string }
func (e NonRetryableError) Error() string { return e.msg }
// processWithRetry 带重试和 DLQ 的消息处理入口
// handler:真正的业务处理函数,由调用方传入
func processWithRetry(ctx context.Context, msg kafka.Message, handler func(kafka.Message) error) error {
maxRetry := 3
for i := 0; i < maxRetry; i++ {
err := handler(msg)
if err == nil {
return nil // 处理成功,退出
}
// 判断是否为不可重试错误
if _, ok := err.(NonRetryableError); ok {
log.Printf("不可重试错误,直接发往 DLQ: %v", err)
return sendToDLQ(ctx, msg, err)
}
if i < maxRetry-1 {
// 指数退避:第 1 次失败等 1s,第 2 次等 2s,第 3 次等 4s
// 避免瞬间大量重试打垮下游服务
wait := time.Duration(1<<i) * time.Second
log.Printf("第 %d 次重试,%v 后重试: %v", i+1, wait, err)
select {
case <-time.After(wait):
case <-ctx.Done():
return ctx.Err()
}
}
}
// 超出最大重试次数,发往 DLQ
log.Printf("超出最大重试次数(%d),发往 DLQ", maxRetry)
return sendToDLQ(ctx, msg, fmt.Errorf("超出最大重试次数"))
}
// sendToDLQ 将失败消息发送到死信队列 Topic
// DLQ Topic 命名规范:原 Topic 名 + ".DLQ"
// 例如:order.order.paid → order.order.paid.DLQ
func sendToDLQ(ctx context.Context, msg kafka.Message, reason error) error {
dlqTopic := msg.Topic + ".DLQ"
// 将原始消息的 Header 带过去,并追加失败信息,便于排查
headers := append(msg.Headers,
// 记录消息来自哪个 Topic,方便 DLQ 消费者知道从哪里来的
kafka.Header{Key: "original-topic", Value: []byte(msg.Topic)},
// 记录失败原因,人工查看时可以快速定位问题
kafka.Header{Key: "fail-reason", Value: []byte(reason.Error())},
// 记录失败时间,便于判断消息已经在队列里积压了多久
kafka.Header{Key: "fail-time", Value: []byte(time.Now().Format(time.RFC3339))},
// 记录原始 Partition 和 Offset,便于追溯
kafka.Header{Key: "original-partition", Value: []byte(fmt.Sprintf("%d", msg.Partition))},
kafka.Header{Key: "original-offset", Value: []byte(fmt.Sprintf("%d", msg.Offset))},
)
return dlqWriter.WriteMessages(ctx, kafka.Message{
Topic: dlqTopic,
Key: msg.Key, // 保留原始 Key,便于路由和排查
Value: msg.Value, // 保留原始消息体,便于人工重新投递
Headers: headers,
})
}方案对比总结
| 定时任务场景 | Kafka 替换方案 | 实时性 | 复杂度 |
|---|---|---|---|
| 状态变更触发下游 | 事件驱动(方案一) | 毫秒级 | 低 |
| N 分钟后执行某操作 | 延迟消息 + 时间轮(方案二) | 秒级 | 中 |
| 定时批量统计报表 | 滚动窗口聚合(方案三) | 分钟级 | 中 |
| 心跳 / 超时检测 | Kafka + Redis TTL(方案四) | 秒级 | 低 |
| 失败补偿 / 兜底 | 死信队列 DLQ(方案五) | 人工介入 | 低 |
不适合替换的场景
核心判断标准:
触发条件是"某件事发生了"→ 用 Kafka 事件驱动
触发条件是"到了某个时间点"→ Cron 定时任务仍是最简选择,不要过度设计