Kafka使用msgpack序列化

在之前的文章 使用Kafka(附Golang代码)
中已经体验了Kafka的Golang客户端,不过在文中的例子中传输的消息是一个字节数组,类型很单一。但是在实际的工作中一个消息有复杂的有效载荷(Payload),在Golang中通常是一个结构体变量,我们就用之前电影条目结构体:

type Subject struct {
    ID     int
    Title  string
    Genres []string
}
subject := Subject{
    ID:     2,
    Title:  "千与千寻",
    Genres: []string{"剧情", "喜剧", "爱情", "战争"},
}

那怎么把subject作为消息发给Kafka进而被消费呢?这就涉及序列化和反序列化。

序列化和反序列化

维基百科对它的描述是:
序列化(serialization)将数据结构或对象状态转换成可取用格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或另一台计算机环境中,能恢复原先状态的过程。依照序列化格式重新获取字节的结果时,可以利用它来产生与原始对象相同语义的副本。序列化的过程也称为对象编组(marshalling)。从一系列字节提取数据结构的反向操作,是反序列化(也称为解编组、deserialization、unmarshalling)。
序列化库在网络传输、RPC、数据库访问等环境中经常用到,也都非常成熟。目前据我了解Golang序列化的方式主要有以下4种:

encoding/gob

Gob(Go binary)是Golang内置的以二进制形式序列化和反序列化程序数据的格式,可以通过 encoding/gob
操作数据。不过Gob的协议决定了它的场景都得用纯Go编写,所以不适合多语言生产环境。

encoding/json

JSON(JavaScript Object Notation)由于其结构简单、可读性高、数据体积小等优点,现在是最主流的数据交换格式,几乎所有编程语言都有解析JSON的库,Golang标准库提供 encoding/json
包操作JSON数据。

Protobuf/Thrift等第三方库

在之前在Golang中实现RPC已经体验了gRPC,它就是基于ProtoBuf序列化协议的。另外还有很多公司选择使用Thrift等第三方库实现序列化和反序列化。不过这类方案适合跨语言RPC服务,强调(反)序列化的效率、协议的保密性、可扩展性、更小的空间等,而对于一般的序列化来说有点重了。
PS: 延伸阅读链接1是一个对各种Golang世界的库的(反)序列化基准测试的项目。里面有非常多的库和详细数据供参考。

msgpack

Msgpack是一个支持跨语言通信的,基于二进制高效的对象序列化库。它可以像JSON那样,在许多种语言之间交换结构对象;但是它比JSON更快速也更轻巧。

我这里选择了用Golang实现的编解码库 vmihailenco/msgpack
,首先安装它:

❯ go get -u github.com/vmihailenco/msgpack

使用msgpack

对于Kafka发布者和消费者的例子来说,就不序列化Key,只序列化Value就可以了。先看发布者部分:

import "github.com/vmihailenco/msgpack"

subject := Subject{
    ID:     2,
    Title:  "千与千寻",
    Genres: []string{"剧情", "喜剧", "爱情", "战争"},
}

b, err := msgpack.Marshal(&subject)
if err != nil {
    panic(err)
}

msg := &sarama.ProducerMessage{
    Topic: topic,
    Key:   sarama.StringEncoder(strconv.Itoa(subject.ID)),
    Value: sarama.StringEncoder(b),
}

Key是转化成字符串的subject.ID,Value就是把subject序列化后的结果:

❯ gore
gore version 0.4.1  :help for help
gore> type Subject struct {
.....             ID     int
.....             Title  string
.....             Genres []string
..... }
gore> subject := Subject{
.....         ID:     2,
.....             Title:  "千与千寻",
.....             Genres: []string{"剧情", "喜剧", "爱情", "战争"},
..... }
main.Subject{
  ID:     2,
  Title:  "千与千寻",
  Genres: []string{
    "剧情",
    "喜剧",
    "爱情",
    "战争",
  },
}
gore> :import github.com/vmihailenco/msgpack
gore> b, err := msgpack.Marshal(&subject)
[]uint8{
  0x83, 0xa2, 0x49, 0x44, 0xd3, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xa5, 0x54, 0x69,
  0x74, 0x6c, 0x65, 0xac, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x8e, 0xe5, 0x8d, 0x83, 0xe5, 0xaf, 0xbb,
  0xa6, 0x47, 0x65, 0x6e, 0x72, 0x65, 0x73, 0x94, 0xa6, 0xe5, 0x89, 0xa7, 0xe6, 0x83, 0x85, 0xa6,
  0xe5, 0x96, 0x9c, 0xe5, 0x89, 0xa7, 0xa6, 0xe7, 0x88, 0xb1, 0xe6, 0x83, 0x85, 0xa6, 0xe6, 0x88,
  0x98, 0xe4, 0xba, 0x89,
}
nil
gore> string(b)
"\x83\xa2ID\xd3\x00\x00\x00\x00\x00\x00\x00\x02\xa5Title\xac千与千寻\xa6Genres\x94\xa6剧情\xa6喜剧\xa6爱情\xa6战争"

接着是消费者端,消费者拿到的消息的Value部分也是字节数组,可以反序列化成为subject:

var subject Subject

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        err := msgpack.Unmarshal(message.Value, &subject)
        if err != nil {
            panic(err)
        }
        log.Printf("Message claimed: key = %s, Subject(id=%d, title=%s, genres=%v)", string(message.Key),
            subject.ID, subject.Title, subject.Genres)
        session.MarkMessage(message, "")
    }

    return nil
}

这样就能正确显示条目信息了。感受一下反序列化的过程:

gore> var movie Subject
gore> err = msgpack.Unmarshal(b, &movie)
nil
gore> movie
main.Subject{
  ID:     2,
  Title:  "千与千寻",
  Genres: []string{
    "剧情",
    "喜剧",
    "爱情",
    "战争",
  },
}
gore> s := string(b)
"\x83\xa2ID\xd3\x00\x00\x00\x00\x00\x00\x00\x02\xa5Title\xac千与千寻\xa6Genres\x94\xa6剧情\xa6喜剧\xa6爱情\xa6战争"
gore> var movie2 Subject
gore> err = msgpack.Unmarshal([]byte(s), &movie2)
nil
gore> :import fmt
gore> fmt.Println(movie.ID, movie2.Title)
2 千与千寻
15
nil

序列化和反序列化的过程需要花时间理解一下。我们看看实际的Kafka消费者和发布者的效果:

❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic msgpack
❯ go run producer.go localhost:9092 msgpack
Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]}
Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]}
Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]}
Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]}
Produce message: {2 千与千寻 [剧情 喜剧 爱情 战争]}
^C^C2019/08/01 12:32:39 Enqueued: 5; errors: 0
❯ go run consumer.go localhost:9092 1 msgpack
2019/08/01 12:32:34 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争])
2019/08/01 12:32:35 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争])
2019/08/01 12:32:36 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争])
2019/08/01 12:32:37 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争])
2019/08/01 12:32:38 Message claimed: key = 2, Subject(id=2, title=千与千寻, genres=[剧情 喜剧 爱情 战争])

代码地址

完整代码可以在 这个地址
找到。

延伸阅读

  1. https://github.com/alecthomas/go_serialization_benchmarks