在落地 Pulsar 的过程中,开发者最容易踩的坑集中在两个问题上:
问题一:"我有多个消费者,消息是怎么分发的?顺序能保证吗?"
问题二:"消费者处理业务逻辑很慢,会不会把整个消费流程阻塞住?"
这两个问题的答案,分别对应 Pulsar 的订阅模式(Subscription Type) 和消费模型(Consumption Pattern)。
本文基于 Go 语言,通过完整可运行的代码,逐一拆解四种订阅模式,并给出生产环境中解决阻塞问题的异步消费方案。
环境准备
# 1. 启动本地 Pulsar(Docker)
docker run -d --name pulsar \
-p 6650:6650 \
apachepulsar/pulsar:latest \
bin/pulsar standalone
# 2. 初始化 Go 项目
go mod init pulsar-demo
go get github.com/apache/pulsar-client-go/pulsar
# 3. 运行 Demo
go run pulsar_subscription_demo.go一、基础概念:订阅模式是什么?
Pulsar 的订阅(Subscription) 是消费者与 Topic 之间的绑定关系,由三个要素确定:
Topic ──────────────────→ Subscription
│
├─ SubscriptionName(订阅名,唯一标识)
├─ Type(订阅模式,决定分发规则)
└─ Cursor(消费进度,记录哪些消息已 Ack)订阅模式在 Consumer 订阅时声明,一旦该订阅名创建,模式不可更改。
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://public/default/my-topic",
SubscriptionName: "my-sub",
Type: pulsar.KeyShared, // ← 在这里声明,只此一次
})Pulsar 提供四种订阅模式,覆盖从"严格有序"到"最大吞吐"的所有需求:
严格有序 ◄─────────────────────────────────► 最大吞吐
│ │
Exclusive Failover Key_Shared Shared
(独占) (主备) (Key路由) (共享)二、Topic 命名规则
在看代码之前,先理解 Pulsar Topic 的完整命名格式:
persistent:// public / default / my-topic
───────────── ────── ─────── ────────
存储类型 租户 命名空间 Topic名称
persistent = 消息持久化到磁盘(BookKeeper)
non-persistent = 消息不落盘,Broker 重启即丢失
public = 默认租户(内置)
default = 默认命名空间(内置)三层命名空间(租户 / 命名空间 / Topic)正是 Pulsar 原生多租户的基础,每一层都可以独立配置权限、配额和存储策略。
三、创建客户端连接
客户端是整个应用的单例,一个进程通常只需要一个 Client。
func newClient() pulsar.Client {
client, err := pulsar.NewClient(pulsar.ClientOptions{
// Broker 地址,格式 pulsar://host:6650
// 生产环境通常是 pulsar+ssl://host:6651(开启 TLS)
URL: "pulsar://localhost:6650",
// 单次操作超时(Send / Receive 超时后返回 error,而非永久阻塞)
OperationTimeout: 30 * time.Second,
// TCP 连接建立超时(网络不通时快速失败,而非等待系统默认超时)
ConnectionTimeout: 30 * time.Second,
// 生产环境建议补充:
// Authentication: pulsar.NewAuthenticationToken("eyJ..."), // Token 认证
// TLSOptions: &pulsar.TLSOptions{InsecureSkipVerify: false}, // TLS 加密
})
if err != nil {
log.Fatalf("创建 Pulsar 客户端失败: %v", err)
}
return client
}四、Producer:发送消息
4.1 普通消息(无 Key)
func produceMessages(client pulsar.Client, topic string, messages []string) {
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
// Pulsar 默认开启批量发送(Batching):
// 将 10ms 内的多条消息合并成一批发给 Broker,显著提升吞吐量。
// 若需要严格逐条确认(如金融场景),可关闭:
// EnableBatching: false,
// 生产环境建议开启压缩,降低网络带宽:
// CompressionType: pulsar.LZ4, // 压缩率低,速度快
// CompressionType: pulsar.ZSTD, // 压缩率高,CPU 消耗略高
})
if err != nil {
log.Fatalf("创建 Producer 失败: %v", err)
}
defer producer.Close() // 关闭时会刷新缓冲区,确保所有消息发出
for _, msg := range messages {
// Send() 同步发送:阻塞直到 Broker 确认写入 BookKeeper 多数派副本
// 返回的 MsgID 是消息的唯一标识,格式:(LedgerID, EntryID, BatchIndex)
// 可用于日志追踪、消息回溯、审计
msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(msg), // 消息体是原始字节,序列化格式由业务自定(JSON/Protobuf等)
})
if err != nil {
log.Printf("发送失败: %v", err)
} else {
fmt.Printf("[Producer] 发送: %-20s → MsgID: %v\n", msg, msgID)
}
}
}4.2 带 Key 的消息(Key_Shared 模式专用)
func produceMessagesWithKey(client pulsar.Client, topic string, messages []struct{ key, value string }) {
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
})
if err != nil {
log.Fatalf("创建 Producer 失败: %v", err)
}
defer producer.Close()
for _, m := range messages {
_, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
// Key 决定消息路由到哪个 Consumer
// 路由规则:ConsumerIndex = hash(Key) % ActiveConsumerCount
// 同一个 Key → 始终路由到同一个 Consumer → 该 Key 下消息严格有序
//
// 典型 Key 设计:
// 用户ID → 同一用户的操作按序处理
// 订单ID → 同一订单的状态变更按序处理
// 设备ID → 同一 IoT 设备的数据按序入库
Key: m.key,
Payload: []byte(m.value),
})
if err != nil {
log.Printf("发送失败: %v", err)
} else {
fmt.Printf("[Producer] Key=%-10s Value=%s\n", m.key, m.value)
}
}
}五、四种订阅模式详解
5.1 Exclusive —— 独占订阅
核心特点: 同一订阅名在同一时刻只允许 1 个 Consumer 连接,第二个会被 Broker 直接拒绝。
Broker
msg-1 ──────────────→ Consumer-1(唯一的 Active Consumer)
msg-2 ──────────────→ Consumer-1
msg-3 ──────────────→ Consumer-1
↑
Consumer-2 尝试连接 → 被 Broker 拒绝(ConsumerBusy)消息顺序:✅ 严格全局有序
func demoExclusive() {
client := newClient()
defer client.Close()
topic := "persistent://public/default/demo-exclusive"
// 第一个 Consumer:订阅成功,成为唯一的 Active Consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "exclusive-sub",
Type: pulsar.Exclusive, // 独占模式
// ReceiverQueueSize:Consumer 本地预拉取队列大小(默认 1000)
// Pulsar 提前从 Broker 拉取消息缓存到本地内存,减少每次 Receive() 的网络延迟。
// 设太大:内存占用高;设太小:网络往返频繁。
// 业务处理快(<10ms)可设大(5000+);业务处理慢可设小(100~500)。
ReceiverQueueSize: 100,
})
if err != nil {
log.Fatalf("订阅失败: %v", err)
}
defer consumer.Close()
// 第二个 Consumer:尝试以相同订阅名连接,会被 Broker 拒绝
_, err2 := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "exclusive-sub", // 同一订阅名 → 必然冲突
Type: pulsar.Exclusive,
})
if err2 != nil {
// 这是预期行为,说明 Exclusive 语义生效
fmt.Printf("[预期行为] 第二个 Consumer 被拒绝: %v\n", err2)
}
go produceMessages(client, topic, []string{"msg-1", "msg-2", "msg-3"})
time.Sleep(500 * time.Millisecond)
for i := 0; i < 3; i++ {
// WithTimeout 防止 Receive() 在没有消息时永久阻塞
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
msg, err := consumer.Receive(ctx) // 阻塞,直到有消息到达或超时
cancel()
if err != nil {
break
}
fmt.Printf("[Consumer] 收到: %s\n", string(msg.Payload()))
// ⚠️ 黄金法则:先执行业务逻辑,成功后再 Ack
// 顺序不能颠倒!先 Ack 再处理,业务失败后消息永久丢失
time.Sleep(100 * time.Millisecond) // 模拟业务处理(写DB、调接口等)
// Ack:通知 Broker 该消息已成功处理,Cursor 可以推进
// Ack 后 Broker 不会再投递这条消息
consumer.Ack(msg)
fmt.Printf("[Consumer] Ack: %s\n", string(msg.Payload()))
}
}适用场景:
| ✅ 适合 | ❌ 不适合 |
|---|---|
| 开发测试环境 | 生产高可用(宕机无法切换) |
| 严格单消费者 + 全局有序 | 需要水平扩展消费能力 |
| 业务逻辑简单,处理速度快 | 处理慢,容易积压 |
5.2 Failover —— 主备订阅
核心特点: 多个 Consumer 可以连接同一订阅,但同一时刻只有主(Active)Consumer 接收消息。主 Consumer 断开后,备(Standby)Consumer 自动升级为 Active,无缝接管。
正常状态:
msg-1 ──→ Consumer-1(Active)
msg-2 ──→ Consumer-1
Consumer-2(Standby,保持连接,等待接管)
Consumer-1 宕机:
Broker 检测到心跳超时(约 30s,可配置)
↓
Consumer-2 自动升为 Active
msg-3 ──→ Consumer-2(从上次 Cursor 位置继续,不丢消息)主备选举规则: Pulsar 按 Consumer 连接顺序决定优先级,先连接的优先级高。
消息顺序:✅ 严格全局有序
func demoFailover() {
client := newClient()
defer client.Close()
topic := "persistent://public/default/demo-failover"
// 主消费者:第一个连接,自动成为 Active
primaryConsumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "failover-sub",
Type: pulsar.Failover,
})
if err != nil {
log.Fatalf("主Consumer订阅失败: %v", err)
}
defer primaryConsumer.Close()
fmt.Println("[主Consumer] Active,正在接收消息")
// 备用消费者:第二个连接,自动成为 Standby
// 与 Broker 保持心跳连接,随时准备接管
// 注意:必须使用相同的 SubscriptionName
standbyConsumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "failover-sub", // 同一订阅名
Type: pulsar.Failover,
})
if err != nil {
log.Fatalf("备Consumer订阅失败: %v", err)
}
defer standbyConsumer.Close()
fmt.Println("[备Consumer] Standby,等待主Consumer故障")
fmt.Println("[说明] 主Consumer宕机 → 备Consumer自动接管 → 从Cursor断点继续 → 不丢消息")
go produceMessages(client, topic, []string{"msg-1", "msg-2", "msg-3"})
time.Sleep(500 * time.Millisecond)
for i := 0; i < 3; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
msg, err := primaryConsumer.Receive(ctx)
cancel()
if err != nil {
break
}
fmt.Printf("[主Consumer] 处理: %s\n", string(msg.Payload()))
// 若主Consumer在 Ack 前宕机:
// 备Consumer接管后,Broker 会重新投递这条未 Ack 的消息(At-Least-Once)
primaryConsumer.Ack(msg)
}
}与 Exclusive 的区别:
| 维度 | Exclusive | Failover |
|---|---|---|
| 第二个 Consumer | ❌ 被拒绝 | ✅ 可连接(Standby) |
| 故障切换 | ❌ 无法切换 | ✅ 自动切换(毫秒级) |
| 消费顺序 | ✅ 严格有序 | ✅ 严格有序 |
| 适合场景 | 简单单消费者 | 高可用 + 严格有序 |
5.3 Shared —— 共享订阅
核心特点: 多个 Consumer 同时连接,消息以 Round-Robin(轮询)方式分发,每条消息只投递给一个 Consumer。吞吐量最高,但不保证顺序。
Broker(9条消息,3个Consumer轮询):
msg-1 ──→ Consumer-1(50ms 处理)
msg-2 ──→ Consumer-2(100ms 处理)
msg-3 ──→ Consumer-3(150ms 处理)
msg-4 ──→ Consumer-1
msg-5 ──→ Consumer-2
...
完成顺序(按处理速度):
msg-1 ✅ (50ms)
msg-4 ✅ (100ms)
msg-2 ✅ (150ms) ← 乱序!
...消息顺序:❌ 不保证
Individual Ack(Pulsar 核心优势):
Pulsar Cursor(位图):
msg-1 ✅ msg-2 ❌ msg-3 ✅ msg-4 ✅ msg-5 ❌
↑ ↑
只重投这两条,其余不受影响
Kafka Offset(线性):
offset-1 ✅ offset-2 ❌ offset-3 ✅ offset-4 ✅
↑
必须等这里提交,否则重启从 offset-2 开始重放
导致 offset-3、offset-4 被重复消费!func demoShared() {
client := newClient()
defer client.Close()
topic := "persistent://public/default/demo-shared"
var wg sync.WaitGroup
// 启动 3 个并行 Consumer,模拟水平扩展
// 水平扩展只需启动更多 Consumer 实例,Broker 自动均衡分发,无需重启或 Rebalance
for i := 1; i <= 3; i++ {
wg.Add(1)
consumerID := i
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "shared-sub", // 同一订阅名,Shared 模式允许多 Consumer 并存
Type: pulsar.Shared,
// Broker 根据 Consumer 本地队列的剩余容量推送消息(背压机制)
// 队列满 → Broker 暂停推送 → Consumer 处理完腾出空间 → 继续推送
// 这是 Pulsar 的流控机制(Flow Control),防止 Consumer OOM
ReceiverQueueSize: 100,
})
if err != nil {
log.Printf("Consumer-%d 订阅失败: %v", consumerID, err)
wg.Done()
continue
}
go func(c pulsar.Consumer, id int) {
defer wg.Done()
defer c.Close()
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
msg, err := c.Receive(ctx)
cancel()
if err != nil {
return // 超时,没有更多消息,退出循环
}
// 模拟不同 Consumer 处理速度不同(演示乱序)
processingTime := time.Duration(id*50) * time.Millisecond
time.Sleep(processingTime)
fmt.Printf("[Consumer-%d] 收到: %-12s 耗时: %dms\n",
id, string(msg.Payload()), processingTime.Milliseconds())
// Shared 模式下每条消息独立 Ack(Individual Ack)
// msg-2 未 Ack 不会影响 msg-1、msg-3 的 Ack 和 Cursor 推进
c.Ack(msg)
}
}(consumer, consumerID)
}
time.Sleep(300 * time.Millisecond)
// 生产 9 条消息,预期被 3 个 Consumer 轮询分摊(约各 3 条)
produceMessages(client, topic, []string{
"task-1", "task-2", "task-3",
"task-4", "task-5", "task-6",
"task-7", "task-8", "task-9",
})
wg.Wait()
// 观察输出:完成顺序不等于发送顺序(乱序是正常的)
}适用场景:
- 日志收集、监控告警(量大、顺序无关)
- 任务队列:发邮件、发短信、图片压缩等幂等任务
- 需要弹性水平扩展的通用队列
⚠️ 注意: Shared 模式下任务必须幂等设计,因为 Consumer 宕机时未 Ack 的消息会重新投递,可能导致重复处理。
5.4 Key_Shared —— Key 路由订阅 ⭐ 生产最推荐
核心特点: 融合 Shared(高并发)和 Failover(有序)的优点。同一个 Key 的消息固定路由到同一个 Consumer(该 Key 下严格有序),不同 Key 的消息分发到不同 Consumer(并行处理)。
路由规则:ConsumerIndex = hash(MessageKey) % ActiveConsumerCount
Broker(3个Consumer,3个用户的订单事件):
user-A/下单 ──→ Consumer-1 ┐
user-A/支付 ──→ Consumer-1 ├── user-A 严格有序:下单→支付→发货
user-A/发货 ──→ Consumer-1 ┘
user-B/下单 ──→ Consumer-2 ┐
user-B/支付 ──→ Consumer-2 ├── user-B 严格有序:下单→支付→发货
user-B/发货 ──→ Consumer-2 ┘
user-C/下单 ──→ Consumer-3 ┐
user-C/支付 ──→ Consumer-3 ├── user-C 严格有序:下单→支付→发货
user-C/发货 ──→ Consumer-3 ┘
三个用户并行处理,互不等待 → 高吞吐 + 有序func demoKeyShared() {
client := newClient()
defer client.Close()
topic := "persistent://public/default/demo-key-shared"
var wg sync.WaitGroup
// 启动 3 个 Consumer(Key_Shared 模式)
// Broker 将不同的 Key 哈希分配到这 3 个 Consumer
// 当 Consumer 数量变化时,Pulsar 自动重新分配 Key → Consumer 的映射(无需重启)
for i := 0; i < 3; i++ {
wg.Add(1)
consumerID := i + 1
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "key-shared-sub",
Type: pulsar.KeyShared, // Key 路由模式
// ⚠️ Key_Shared 关键配置:
// 若某个 Consumer 有消息长时间未 Ack,该 Consumer 负责的所有 Key 后续消息都会阻塞!
// 原因:为保证 Key 内有序,下一条同 Key 消息必须等上一条 Ack 后才能投递。
// 解决方案:配置 AckTimeout,超时自动重投,避免永久阻塞整个 Key 的消费链路。
// AckTimeout: 30 * time.Second,
// AckTimeoutTimer: 1 * time.Second,
})
if err != nil {
log.Printf("Consumer-%d 订阅失败: %v", consumerID, err)
wg.Done()
continue
}
go func(c pulsar.Consumer, id int) {
defer wg.Done()
defer c.Close()
for {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
msg, err := c.Receive(ctx)
cancel()
if err != nil {
return
}
// msg.Key() 返回消息的路由 Key
// 关键观察:同一个 Key(如 user-A)始终由同一个 Consumer 处理
fmt.Printf("[Consumer-%d] Key=%-10s Value=%-6s ← 同Key消息在此Consumer内严格有序\n",
id, msg.Key(), string(msg.Payload()))
time.Sleep(50 * time.Millisecond) // 模拟业务处理
// Ack 之后,Broker 才会将同 Key 的下一条消息投递过来
c.Ack(msg)
}
}(consumer, consumerID)
}
time.Sleep(300 * time.Millisecond)
// 生产三个用户的订单事件(模拟乱序到达的现实场景)
// 预期:每个用户的事件在其对应 Consumer 内严格按序处理
produceMessagesWithKey(client, topic, []struct{ key, value string }{
{"user-A", "下单"},
{"user-B", "下单"},
{"user-A", "支付"}, // user-A:必须在 下单 之后处理
{"user-C", "下单"},
{"user-B", "支付"},
{"user-A", "发货"}, // user-A:必须在 支付 之后处理
{"user-C", "支付"},
{"user-B", "发货"},
{"user-C", "发货"},
})
wg.Wait()
}与 Kafka 分区方案的对比:
| 维度 | Kafka 分区 | Pulsar Key_Shared |
|---|---|---|
| 同 Key 有序 | ✅ | ✅ |
| 并行消费 | ✅(受分区数限制) | ✅(不受限) |
| 扩容方式 | 增加分区 → Rebalance(有停顿) | 增加 Consumer → 自动重路由(无停顿) |
| Consumer 数 > 分区数 | ❌ 有 Consumer 空闲 | ✅ 所有 Consumer 均可工作 |
六、Ack 机制深度解析
6.1 三种 Ack 操作
// ✅ 正常 Ack:业务成功,消息处理完毕,Cursor 推进
consumer.Ack(msg)
// ❌ Negative Ack:业务失败,主动要求重新投递
// Broker 在 negativeAckRedeliveryDelay 后重新投递(默认 1 分钟)
consumer.NegativeAcknowledge(msg)
// ⏱️ Ack 超时兜底:若超过 ackTimeout 未 Ack,Broker 自动重新投递
// 在 ConsumerOptions 中配置:
// AckTimeout: 30 * time.Second,6.2 Ack 的黄金法则
❌ 错误顺序(先 Ack 再处理):
Receive → Ack → 业务逻辑失败 → 消息永久丢失(无法找回)
✅ 正确顺序(先处理再 Ack):
Receive → 业务逻辑 → 成功 → Ack
→ 失败 → NegativeAcknowledge(触发重投)6.3 Pulsar Cursor vs Kafka Offset
Kafka(线性 Offset):
[✅1][✅2][❌3][✅4][✅5]
↑
Offset 卡在 3,重启后从 3 开始重放
4 和 5 被重复消费 → At-Least-Once 在这里很痛
Pulsar(位图 Cursor):
[✅1][✅2][❌3][✅4][✅5]
↑
只记录 3 未确认,重启后只重投 3
4 和 5 不受影响 → 更精准的 At-Least-Once七、异步消费:解决业务阻塞问题
7.1 问题:同步消费的吞吐量瓶颈
同步消费(单线程):
Consumer 线程
├─ Receive msg-1
├─ 业务逻辑(写 DB,耗时 2s)← 整个消费循环阻塞 2s
├─ Ack msg-1
├─ Receive msg-2
├─ 业务逻辑(2s)
├─ Ack msg-2
...
5 条消息串行处理:总耗时 = 5 × 2s = 10s7.2 解决方案:Goroutine 池异步消费
func demoAsyncConsume() {
client := newClient()
defer client.Close()
consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://public/default/demo-async",
SubscriptionName: "async-sub",
Type: pulsar.Shared,
// ReceiverQueueSize 设大:
// 主循环 Receive() 从本地内存队列取消息(无网络 I/O,极快)
// Goroutine 池满时主循环短暂阻塞,本地队列依然在接收 Broker 推送的消息
// 避免 Broker 端因 Consumer 无响应而减少推送速率
ReceiverQueueSize: 1000,
})
defer consumer.Close()
// Semaphore Channel:Go 惯用的"有界并发"实现
// 容量 = 最大并发 Goroutine 数(根据业务特性调整)
// CPU 密集型任务:goroutine 数 ≈ CPU 核心数
// IO 密集型任务:goroutine 数可设较大(如 50~200),让 CPU 在等待 IO 时处理其他任务
sem := make(chan struct{}, 10)
go produceMessages(client, consumer.Topic(), []string{
"task-1", "task-2", "task-3", "task-4", "task-5",
})
time.Sleep(300 * time.Millisecond)
startTime := time.Now()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
msg, err := consumer.Receive(ctx) // 主循环:只取消息,极快
cancel()
if err != nil {
break
}
wg.Add(1)
sem <- struct{}{} // 占槽;若 10 个 Goroutine 都在运行,这里短暂阻塞(自然背压)
// 立刻启动 Goroutine,主循环不等待,马上 Receive() 下一条
go func(m pulsar.Message) {
defer wg.Done()
defer func() { <-sem }() // 释放槽,让主循环可以继续
fmt.Printf("[Goroutine] 开始处理: %s (t=%s)\n",
string(m.Payload()), time.Since(startTime).Round(time.Millisecond))
// 耗时业务逻辑(调外部接口、写 DB 等)
// 这 2 秒内,主循环已经在取下一条消息并启动新 Goroutine 了
time.Sleep(2 * time.Second)
// ✅ 业务成功 → Ack
consumer.Ack(m)
// ❌ 业务失败时改为:
// consumer.NegativeAcknowledge(m)
fmt.Printf("[Goroutine] Ack 完成: %s (t=%s)\n",
string(m.Payload()), time.Since(startTime).Round(time.Millisecond))
}(msg)
}
wg.Wait()
fmt.Printf("全部完成,总耗时: %s(同步需 ~10s)\n",
time.Since(startTime).Round(time.Millisecond))
}效果对比(5条消息,每条业务耗时 2 秒):
同步消费:msg-1(2s) → msg-2(2s) → msg-3(2s) → msg-4(2s) → msg-5(2s) = 10s
异步消费:
t=0ms → 取 msg-1,启动 Goroutine-1
t=1ms → 取 msg-2,启动 Goroutine-2
t=2ms → 取 msg-3,启动 Goroutine-3
t=3ms → 取 msg-4,启动 Goroutine-4
t=4ms → 取 msg-5,启动 Goroutine-5
t=2004ms → 所有 Goroutine 完成 ≈ 2s ✅八、各模式横向对比
| 维度 | Exclusive | Failover | Shared | Key_Shared |
|---|---|---|---|---|
| 并发 Consumer 数 | 1 | 多(主+备) | 多 | 多 |
| 全局消息顺序 | ✅ | ✅ | ❌ | ❌ |
| Key 内消息顺序 | ✅ | ✅ | ❌ | ✅ |
| 自动故障转移 | ❌ | ✅ | ✅ | ✅ |
| 水平扩展 | ❌ | ❌ | ✅ | ✅ |
| 吞吐量 | 低 | 低 | 最高 | 高 |
| 典型场景 | 测试/简单消费 | 金融账务 | 日志/任务队列 | 订单/用户流 |
九、选型决策树
需要消费者高可用(故障自动切换)?
├─ 否 → Exclusive(简单场景/测试)
└─ 是
└─ 需要严格全局顺序?
├─ 是 → Failover(金融账务、配置变更)
└─ 否
└─ 需要 per-Key 顺序(同一实体的消息有序)?
├─ 是 → Key_Shared ⭐(订单、用户行为、IoT 设备)
└─ 否 → Shared(日志、告警、幂等任务队列)十、生产环境配置建议
Consumer 关键参数
consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://tenant/ns/topic",
SubscriptionName: "prod-sub",
Type: pulsar.KeyShared,
// 预拉取队列:根据业务处理速度调整
// 处理快(<5ms):设 5000+;处理慢(>100ms):设 100~500
ReceiverQueueSize: 1000,
// Ack 超时:防止 Consumer 卡死导致消息积压
// Key_Shared 模式下尤其重要
AckTimeout: 30 * time.Second,
AckTimeoutTimer: 1 * time.Second,
// Negative Ack 重投延迟:失败后多久重试(默认 1 分钟)
// 防止频繁重试打爆下游服务
NackRedeliveryDelay: 5 * time.Second,
// 死信队列:消息重试超过次数后发送到 DLQ,避免阻塞正常消费
DLQ: &pulsar.DLQPolicy{
MaxDeliveries: 3, // 最多重试 3 次
DeadLetterTopic: "my-topic-DLQ", // 死信 Topic
},
})Producer 关键参数
producer, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: "persistent://tenant/ns/topic",
CompressionType: pulsar.LZ4, // 开启压缩
BatchingMaxPublishDelay: 10 * time.Millisecond, // 批量发送窗口
// 开启幂等发送(防止网络重试导致重复写入)
// 结合事务 API 可实现端到端 Exactly-Once
})总结
Pulsar 订阅模式的设计哲学是:把"顺序"和"吞吐"的权衡暴露给开发者,而不是在系统内部做妥协。
- 需要有序:Exclusive 或 Failover,系统保证全局顺序
- 需要吞吐:Shared,牺牲顺序换取最大并发
- 两者都要:Key_Shared,在 Key 粒度上有序,跨 Key 并行
消费模型上,同步消费适合入门,异步消费(Goroutine 池)才是生产标准。Ack 必须在业务逻辑成功后发出,这是 At-Least-Once 语义的基石。