写在前面:我是个后端开发,用过 Redis 的 Pub/Sub,听说过 Kafka,但从来没有认真用过消息队列。这篇文章记录我第一次接触 Apache Pulsar 的全过程,踩了哪些坑,怎么理解那些概念,希望对同样是新手的你有用。
为什么会接触 Pulsar?
项目里有个需求:多个服务之间要传消息,而且要可靠——发出去的消息不能丢,消费失败了要能重试,还要支持多个服务同时消费同一条消息。
Redis Pub/Sub 太轻量,消费者不在线时消息就丢了。Kafka 听起来很重,配置复杂。同事推荐了 Pulsar,说"功能全,概念清晰,适合现在的规模"。
于是就开始学了。
第一步:先搞清楚它是干什么的
在看文档之前,我先问自己:消息队列到底解决什么问题?
想象一个外卖平台:用户下单之后,要通知厨房备餐、通知骑手接单、通知财务记账……如果下单服务直接调用这三个服务的接口,任何一个挂掉都会导致下单失败。
消息队列的思路是:下单服务只管把"有人下单了"这条消息丢进队列,其他服务各自来取,互不影响。
这就是解耦。Pulsar 就是这样一个"消息中转站"。
第二步:启动一个 Pulsar 玩玩
文档里有很多部署方式,新手直接用 Docker,一行命令搞定:
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--name pulsar \
apachepulsar/pulsar:3.3.0 \
bin/pulsar standalone看到这行日志说明成功了:
INFO - messaging service is ready两个端口的作用:
6650:客户端连接 Pulsar 用这个(发消息、收消息)8080:管理后台,可以用浏览器或 curl 查状态
验证一下是否跑起来了:
curl http://localhost:8080/admin/v2/brokers/healthcheck
# 返回 "ok" 就没问题第三步:搞懂几个概念(用人话解释)
刚开始看文档,一堆术语:Tenant、Namespace、Topic、Subscription、Broker、Bookie……我直接懵了。
后来我用一个比喻理解清楚了:
把 Pulsar 想象成一个大型邮局
| Pulsar 概念 | 邮局比喻 |
|---|---|
| Tenant(租户) | 邮局里不同的企业客户(比如京东、淘宝分别租了一块地方) |
| Namespace(命名空间) | 每个企业客户内部划分的业务区域(京东的"订单组"、"物流组") |
| Topic | 具体的一个邮箱/信箱("订单创建"信箱) |
| Producer | 往信箱里投信的人(寄件方) |
| Consumer | 从信箱里取信的人(收件方) |
| Subscription | 取信的方式/协议(谁可以取、怎么取) |
| Broker | 邮局前台,负责收发调度,自己不存信 |
| Bookie | 后台仓库,真正存放信件的地方 |
Topic 的完整格式
Pulsar 的 Topic 名字有固定格式,刚开始总是搞错:
persistent://public/default/my-topic
│ │ │ │
│ │ │ └── Topic 名称(你自己起的)
│ │ └────────── Namespace(命名空间)
│ └───────────────── Tenant(租户)
└────────────────────────────── 消息是否持久化persistent 意味着消息写到磁盘,服务重启消息不丢失。还有 non-persistent,消息只在内存里,速度快但不可靠。
新手建议:一开始就用 persistent://public/default/你的topic名 这个格式,public/default 是 Pulsar 默认自带的,不用额外创建。
第四步:用命令行发第一条消息
Pulsar 自带命令行工具,可以不写代码直接发消息:
# 发消息(进入 Docker 容器里执行)
docker exec -it pulsar \
bin/pulsar-client produce persistent://public/default/hello-pulsar \
--messages "我的第一条消息"然后开另一个终端接收:
docker exec -it pulsar \
bin/pulsar-client consume persistent://public/default/hello-pulsar \
--subscription-name my-first-sub \
--num-messages 0--num-messages 0 的意思是一直监听,不自动退出。
看到下面这样的输出就说明消息收到了:
----- got message -----
value : 我的第一条消息成就感满满! 这时候我大概知道 Pulsar 是怎么工作的了。
第五步:用 Go 代码实现生产者和消费者
光靠命令行不够,我们要在代码里用。我用的是 Go,先安装客户端:
go get github.com/apache/pulsar-client-go/pulsar生产者:发消息
package main
import (
"context"
"fmt"
"log"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
// 第一步:建立连接
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal("连接失败:", err)
}
defer client.Close()
// 第二步:创建 Producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "persistent://public/default/hello-pulsar",
})
if err != nil {
log.Fatal("创建 Producer 失败:", err)
}
defer producer.Close()
// 第三步:发消息
msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("Hello from Go!"),
})
if err != nil {
log.Fatal("发送失败:", err)
}
fmt.Printf("消息发送成功,消息 ID: %v\n", msgID)
}消费者:收消息
package main
import (
"context"
"fmt"
"log"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 创建 Consumer,需要指定订阅名
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://public/default/hello-pulsar",
SubscriptionName: "my-go-sub", // 订阅名,同名的 Consumer 共享消费进度
Type: pulsar.Shared, // 订阅类型,先用 Shared
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
fmt.Println("开始监听消息...")
for {
// Receive 是阻塞的,没消息时会一直等
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Printf("接收出错: %v", err)
continue
}
fmt.Printf("收到消息: %s\n", string(msg.Payload()))
// 处理完一定要 Ack!否则消息会被反复投递
consumer.Ack(msg)
}
}⚠️ 新手最容易忘的一件事:Ack
Pulsar 默认不会自动确认消息。你处理完之后必须调用
consumer.Ack(msg),告诉 Pulsar "这条我处理好了"。如果不 Ack,Pulsar 认为你没处理成功,下次还会再发给你,就出现重复消费了。
第六步:搞懂订阅类型(这里卡了我挺久的)
第一次看文档说有四种订阅类型,我没搞清楚有什么区别,全部实验了一遍才明白。
Exclusive(独占)
消息 A B C D E
↓
Consumer 1(只有它一个在消费)同一个订阅名下只允许一个 Consumer 连接。第二个 Consumer 试图连接时会报错。
适合:需要保证顺序,并且单个消费者就够用的场景。
Shared(共享)
消息 A B C D E F
↙ ↓ ↘
Consumer1 Consumer2 Consumer3消息轮流分发给多个 Consumer,可以水平扩展。
缺点:不保证顺序,因为消息被分给不同的人处理。
适合:对顺序没要求,需要多个实例并行处理提高吞吐的场景。
Failover(灾备)
正常情况:
消息 A B C D E → Consumer 1(主)
Consumer 1 挂了:
消息 F G H I → Consumer 2(自动接管)平时只有一个主 Consumer 消费,主挂了之后自动切换到备用 Consumer,保证顺序。
适合:需要顺序 + 高可用的场景。
Key_Shared(按 Key 分组共享)
消息(带 Key):
order-1: A C E → Consumer 1(专门处理 order-1)
order-2: B D → Consumer 2(专门处理 order-2)
order-3: F → Consumer 3(专门处理 order-3)相同 Key 的消息永远路由到同一个 Consumer,不同 Key 并行处理。
适合:按业务 ID 保证顺序 + 需要水平扩展(比如交易系统按订单 ID 路由)。
第七步:消息处理失败了怎么办?
这是我第一次没想到的问题:如果处理消息时报错了,该怎么办?
答案是用 Nack(Negative Acknowledge,否定确认):
msg, _ := consumer.Receive(context.Background())
err := processMessage(msg)
if err != nil {
// 处理失败,告诉 Pulsar 重新投递
consumer.Nack(msg)
continue
}
// 处理成功
consumer.Ack(msg)调用 Nack 之后,这条消息会在一段时间后重新投递给 Consumer。
但如果一条消息一直处理失败,会无限重试吗?不会。Pulsar 有死信队列(Dead Letter Topic)机制:
consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://public/default/orders",
SubscriptionName: "order-processor",
Type: pulsar.Shared,
DLQ: &pulsar.DLQPolicy{
MaxDeliveries: 3, // 最多重试 3 次
// 超过 3 次后,消息自动转移到死信 Topic
DeadLetterTopic: "persistent://public/default/orders-DLQ",
},
})重试超过 3 次后,消息会被扔进死信队列,你可以单独处理这些"问题消息",不影响正常流程。
我踩过的坑
坑 1:忘记 Ack,消息一直重复
现象:同一条消息被消费了好多次。
原因:consumer.Receive() 之后忘记调用 consumer.Ack(msg)。
教训:Receive 和 Ack 要成对出现,处理完立刻 Ack。
坑 2:Topic 名字格式写错
写成了 hello-pulsar,忘记加前缀 persistent://public/default/。
Pulsar 不会直接报错,而是自动帮你补全成 persistent://public/default/hello-pulsar,但有时候行为不符合预期。
教训:始终写完整的 Topic 路径,不要依赖自动补全。
坑 3:消费者启动比生产者晚,消息丢了
现象:生产者发了消息,消费者后来才启动,什么都没收到。
原因:我用的是 non-persistent Topic,消息不落盘,消费者不在线时消息就丢了。
教训:开发测试阶段统一用 persistent Topic,消息持久化到磁盘,消费者随时启动都能拿到历史消息。
坑 4:以为 Shared 模式会保证顺序
发了 1、2、3 三条消息,消费者收到的顺序是 1、3、2。
原因:Shared 模式消息是轮询分发的,多个 Consumer 处理速度不同,顺序无法保证。
教训:需要顺序就用 Exclusive 或 Key_Shared,不要用 Shared。