Kafka使用msgpack序列化
在之前的文章 使用Kafka(附Golang代码)
中已经体验了Kafka的Golang客户端,不过在文中的例子中传输的消息是一个字节数组,类型很单一。但是在实际的工作中一个消息有复杂的有效载荷(Payload),在Golang中通常是一个结构体变量,我们就用之前电影条目结构体:
type Subject struct { ID int Title string Genres []string } subject := Subject{ ID: 2, Title: "千与千寻", Genres: []string{"剧情", "喜剧", "爱情", "战争"}, }
那怎么把subject作为消息发给Kafka进而被消费呢?这就涉及序列化和反序列化。
序列化和反序列化
维基百科对它的描述是:
序列化(serialization)将数据结构或对象状态转换成可取用格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或另一台计算机环境中,能恢复原先状态的过程。依照序列化格式重新获取字节的结果时,可以利用它来产生与原始对象相同语义的副本。序列化的过程也称为对象编组(marshalling)。从一系列字节提取数据结构的反向操作,是反序列化(也称为解编组、deserialization、unmarshalling)。
序列化库在网络传输、RPC、数据库访问等环境中经常用到,也都非常成熟。目前据我了解Golang序列化的方式主要有以下4种:
encoding/gob
Gob(Go binary)是Golang内置的以二进制形式序列化和反序列化程序数据的格式,可以通过 encoding/gob
操作数据。不过Gob的协议决定了它的场景都得用纯Go编写,所以不适合多语言生产环境。
encoding/json
JSON(JavaScript Object Notation)由于其结构简单、可读性高、数据体积小等优点,现在是最主流的数据交换格式,几乎所有编程语言都有解析JSON的库,Golang标准库提供 encoding/json
包操作JSON数据。
Protobuf/Thrift等第三方库
在之前在Golang中实现RPC已经体验了gRPC,它就是基于ProtoBuf序列化协议的。另外还有很多公司选择使用Thrift等第三方库实现序列化和反序列化。不过这类方案适合跨语言RPC服务,强调(反)序列化的效率、协议的保密性、可扩展性、更小的空间等,而对于一般的序列化来说有点重了。
PS: 延伸阅读链接1是一个对各种Golang世界的库的(反)序列化基准测试的项目。里面有非常多的库和详细数据供参考。
msgpack
Msgpack是一个支持跨语言通信的,基于二进制高效的对象序列化库。它可以像JSON那样,在许多种语言之间交换结构对象;但是它比JSON更快速也更轻巧。
我这里选择了用Golang实现的编解码库 vmihailenco/msgpack
,首先安装它:
❯ go get -u github.com/vmihailenco/msgpack
使用msgpack
对于Kafka发布者和消费者的例子来说,就不序列化Key,只序列化Value就可以了。先看发布者部分:
import "github.com/vmihailenco/msgpack" subject := Subject{ ID: 2, Title: "千与千寻", Genres: []string{"剧情", "喜剧", "爱情", "战争"}, } b, err := msgpack.Marshal(&subject) if err != nil { panic(err) } msg := &sarama.ProducerMessage{ Topic: topic, Key: sarama.StringEncoder(strconv.Itoa(subject.ID)), Value: sarama.StringEncoder(b), }
Key是转化成字符串的subject.ID,Value就是把subject序列化后的结果:
❯ gore gore version 0.4.1 :help for help gore> type Subject struct { ..... ID int ..... Title string ..... Genres []string ..... } gore> subject := Subject{ ..... ID: 2, ..... Title: "千与千寻", ..... Genres: []string{"剧情", "喜剧", "爱情", "战争"}, ..... } main.Subject{ ID: 2, Title: "千与千寻", Genres: []string{ "剧情", "喜剧", "爱情", "战争", }, } gore> :import github.com/vmihailenco/msgpack gore> b, err := msgpack.Marshal(&subject) []uint8{ 0x83, 0xa2, 0x49, 0x44, 0xd3, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xa5, 0x54, 0x69, 0x74, 0x6c, 0x65, 0xac, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x8e, 0xe5, 0x8d, 0x83, 0xe5, 0xaf, 0xbb, 0xa6, 0x47, 0x65, 0x6e, 0x72, 0x65, 0x73, 0x94, 0xa6, 0xe5, 0x89, 0xa7, 0xe6, 0x83, 0x85, 0xa6, 0xe5, 0x96, 0x9c, 0xe5, 0x89, 0xa7, 0xa6, 0xe7, 0x88, 0xb1, 0xe6, 0x83, 0x85, 0xa6, 0xe6, 0x88, 0x98, 0xe4, 0xba, 0x89, } nil gore> string(b) "\x83\xa2ID\xd3\x00\x00\x00\x00\x00\x00\x00\x02\xa5Title\xac千与千寻\xa6Genres\x94\xa6剧情\xa6喜剧\xa6爱情\xa6战争"
接着是消费者端,消费者拿到的消息的Value部分也是字节数组,可以反序列化成为subject:
var subject Subject func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { err := msgpack.Unmarshal(message.Value, &subject) if err != nil { panic(err) } log.Printf("Message claimed: key = %s, Subject(id=%d, title=%s, genres=%v)", string(message.Key), subject.ID, subject.Title, subject.Genres) session.MarkMessage(message, "") } return nil }
这样就能正确显示条目信息了。感受一下反序列化的过程:
gore> var movie Subject gore> err = msgpack.Unmarshal(b, &movie) nil gore> movie main.Subject{ ID: 2, Title: "千与千寻", Genres: []string{ "剧情", "喜剧", "爱情", "战争", }, } gore> s := string(b) "\x83\xa2ID\xd3\x00\x00\x00\x00\x00\x00\x00\x02\xa5Title\xac千与千寻\xa6Genres\x94\xa6剧情\xa6喜剧\xa6爱情\xa6战争" gore> var movie2 Subject gore> err = msgpack.Unmarshal([]byte(s), &movie2) nil gore> :import fmt gore> fmt.Println(movie.ID, movie2.Title) 2 千与千寻 15 nil
序列化和反序列化的过程需要花时间理解一下。我们看看实际的Kafka消费者和发布者的效果:
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic msgpack ❯ go run producer.go localhost:9092 msgpack Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]} Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]} Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]} Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]} Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]} ^C^C2019/08/01 12:32:39 Enqueued: 5; errors: 0 ❯ go run consumer.go localhost:9092 1 msgpack 2019/08/01 12:32:34 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争]) 2019/08/01 12:32:35 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争]) 2019/08/01 12:32:36 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争]) 2019/08/01 12:32:37 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争]) 2019/08/01 12:32:38 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争])
代码地址
完整代码可以在 这个地址
找到。