读写 Redis RESP3 协议以及Redis 6.0客户端缓存

在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。

客户端缓存是未来Redis最重要的特性。如果我们需要快速存储和快速缓存,那么我们就需要在客户端存储数据的子集。这是为了提供小延迟、大规模数据的想法的自然延伸。很多公司都采用了在客户端缓存数据以避免每次都请求redis,但是本地缓存和redis服务器数据之间有延迟,很难保证数据的一致性。Ben Malec在Redis Conf 2018上做了一个关于客户端缓存的演讲,给了Salvatore Sanfilippo以灵感,Salvatore Sanfilippo决定在Redis 6.0中支持客户端缓存的功能。但是为了支持这个功能,使用当前的redis协议很难实现,所以他设计了下一代的Redis协议: RESP3

所有的代码都放在了 github
上。

RESP3协议

RESP3 协议中增加了很多新的数据类型。

和 RESP version 2 等价的类型

  • Array: 一个 有序
    集合,包含N个其它类型
  • Blob string: 二进制安全字符串
  • Simple string: 一个节省空间的非二进制安全字符串
  • Simple error: 一个节省空间的非二进制安全错误码和错误信息
  • Number: 有符号64位整数

RESP3中新加的类型

  • Null: 单一的空值,代替原先的 RESP v2 的 *-1
    $-1
    空值。
  • Double: 浮点数
  • Boolean: 布尔类型 true / false
  • Blob error: 二进制安全的错误码和错误信息
  • Verbatim string: 一个二进制安全字符串,带文本格式, 如命令 LATENCY DOCTOR
    的输出
  • Map: 一个 有序
    的键值对
  • Set: 一个 无序
    的不重复的集合
  • Attribute: 类似Map类型
  • Push: 带外数据,格式类似数组,但是客户端需要检查第一个数据,第一个数据指示了带外数据的类型。注意带外数据并不是一个reply,它是redis主动推送的数据,所以客户端收到带外数据,交给对应的处理方法去处理后,你还需要继续读取你的reply数据
  • Hello: hello命令的返回结果,类似Map类型,仅仅在客户端和服务器建立连接的时候发送
  • Big number: 大数字类型

还有一种新加的stream类型,可以用来传送不确定具体长度的数据。在数据的开头有固定的标识符,在数据传输完毕后在加上这个40字节的标识符,40字节的标志符基本上不会和传输的数据有重复:

$EOF:<40 bytes marker>
... any number of bytes of data here not containing the marker ...
<40 bytes marker>

它以 $EOF:
开始接着是40字节标识符,回车换行,接着就是数据,数据结束后是40字节的标识符。因为这种数据类型开始接收时数据长度不确定,所以我们对这种数据类型处理比较特殊,我们会解析出第一行的40字节标志符,后续的读取以及结束标志符需要调用者自己去读取和验证。
可以看到出了保持和原先的RESP version 2的数据类型一致外, RESP3的数据增加了很多数据类型。因为客户端发送的命令的格式比较简单,基本上是字符串的数组形式,所以这么多类型主要用在对服务器返回数据的发送和解析上。

每种数据类型有一个特殊的字符作为标识 flag
,我们为每个类型定义了一个常量:

const (
    // simple types

    TypeBlobString     = '$' // $\r\n\r\n
    TypeSimpleString   = '+' // +\r\n
    TypeSimpleError    = '-' // -\r\n
    TypeNumber         = ':' // :\r\n
    TypeNull           = '_' // _\r\n
    TypeDouble         = ',' // ,\r\n
    TypeBoolean        = '#' // #t\r\n or #f\r\n
    TypeBlobError      = '!' // !\r\n\r\n
    TypeVerbatimString = '=' // =\r\n\r\n
    TypeBigNumber      = '(' // (\n

    // Aggregate data types

    TypeArray     = '*' // *\r\n... numelements other types ...
    TypeMap       = '%' // %\r\n... numelements key/value pair of other types ...
    TypeSet       = '~' // ~\r\n... numelements other types ...
    TypeAttribute = '|' // |~\r\n... numelements map type ...
    TypePush      = '>' // >\r\n\r\n... numelements-1 other types ...

    //special type
    TypeStream = "$EOF:" // $EOF:<40 bytes marker>... any number of bytes of data here not containing the marker ...<40 bytes marker>
)

然后我们为所有的数据定义一个统一的模型,这样做的好处是我们可以使用统一的数据处理代码,避免复杂繁琐的数据类型:

type Valuestruct{
    Type         byte
    Str          string
    StrFmt       string
    Err          string
    Integer      int64
    Boolean      bool
    Double       float64
    BigInt       *big.Int
    Elems        []*Value           // for array & set
    KV           *linkedhashmap.Map //TODO sorted map, for map & attr
    Attrs        *linkedhashmap.Map
    StreamMarker string
}

Value
代表一个数据类型的值,它既可以是客户端发送的请求,也可以是服务器端的返回或者PUSH数据。

Type
是数据的类型标识,也就是那个特殊的字符。

对于不同的数据类型,只有部分的字段有意义,比如字符串类型, Str
字段有意义, Err
Elems
KV
等就没有意义了。 Verbatim string
类型则 Str
StrFmt
有意义。 错误类型则 Err
有意义。数组和Set类型 Elems
有意义,Map和属性则 KV
有意义。
NULL类型是没有值的,光靠Type就足够了。

因为RESP3中规定数据前面可以有多个属性,所以每个数据还都包含一个 Attrs
字段,它表示这个数据的属性。

Stream类型只读取第一行, StreamMarker
字段表示它的40字节的标志符。

注意Map类型我们并没有使用Go标准库的Map,而是使用一个第三方的 linkedhashmap.Map
库,原因在于RESP3规范中约定Map类型是有序的,而标准库的Map是基于Hash的无序的Map,所以不合适。
我们可以把这种数据表示成Redis传输的字符串。 首先我们要读取属性,看看这个值前面是否有属性,如果有的话先把属性编码。属性是一个Map类型,按照Map类型的方式编码就可以了。注意数量是键值对的数量。接下来按照不同的类型进行编码。对于复杂类型,我们对于它包含的值递归调用编码方法即可。

func (r *Value) ToRESP3String() string {
    buf := new(strings.Builder)

    //check attributes
    if r.Attrs != nil && r.Attrs.Size() >0 {
        buf.WriteByte(TypeAttribute)
        buf.WriteString(strconv.Itoa(r.Attrs.Size()))
        buf.Write(CRLFByte)

        r.Attrs.Each(func(key, val interface{}) {
            k := key.(*Value)
            v := val.(*Value)
            buf.WriteByte(k.Type)
            k.toRESP3String(buf)
            buf.WriteByte(v.Type)
            v.toRESP3String(buf)
        })
    }

    buf.WriteByte(r.Type)
    r.toRESP3String(buf)
    return buf.String()
}

func (r *Value) toRESP3String(buf *strings.Builder) {
    switch r.Type {
    case TypeSimpleString:
        buf.WriteString(r.Str)
    case TypeBlobString:
    ......
    case TypeArray, TypeSet, TypePush:
        buf.WriteString(strconv.Itoa(len(r.Elems)))
        buf.Write(CRLFByte)

        for _, v := range r.Elems {
            buf.WriteByte(v.Type)
            v.toRESP3String(buf)
        }
        return
    ......
    }
}

有时候我们需要把数据返回成GO特性的类型,比如字符串、error、数组和Map等,我们也提供了一个 SmartResult
的方法:

func (r *Value) SmartResult() interface{} {
    switch r.Type {
    case TypeSimpleString:
        return r.Str
    case TypeBlobString:
        return r.Str
    ......
    }
}

Reader

上面定义了一个统一的数据类型 Value
,那么如何从一个连接中读取这个 Value
呢? 我们需要一个 Reader
类型。

type Reader struct {
    *bufio.Reader
}

// NewReader returns a RESP3 reader.
func NewReader(reader io.Reader) *Reader {
    return NewReaderSize(reader,32*1024)
}

// NewReaderSize returns a new Reader whose buffer has at least the specified size.
func NewReaderSize(reader io.Reader, size int) *Reader {
    return &Reader{
        Reader: bufio.NewReaderSize(reader, size),
    }
}

我们定义了一个 Reader
,你可以指定它的buffer大小,它的 ReadValue
方法可以从reader中读取 Value
对象,我们使用 io.Reader
作为源而不是 net.Conn
,是因为我们使用更通用的接口可以方便测试。
首先我们要检查是否有属性,如果有属性,先把属性读取,属性是Map的数据类型,所以按照Map的方式处理就可以了。接下来读取真正的数据。
简单一行的数据比较好处理。比如简单字符串,简单error、数字、布尔值、空值等等,我们为每种类型都提供了一个独立的解析方法,便于管理。

func (r *Reader) ReadValue() (*Value, []byte, error) {
    line, err := r.readLine()
    if err != nil {
        return nil, nil, err
    }
    if len(line) <3 {
        return nil, nil, ErrInvalidSyntax
    }

    var attrs *linkedhashmap.Map
    if line[0] == TypeAttribute {
        attrs, err = r.readAttr(line)
        if err != nil {
            return nil, nil, err
        }
        line, err = r.readLine()
    }

    // check stream. if it is stream, return the stream marker
    if line[0] == TypeBlobString && len(line) ==45 && bytes.Compare(line[:5], StreamMarkerPrefix) ==0 {
        return nil, line[5:], nil
    }

    v := &Value{
        Type:  line[0],
        Attrs: attrs,
    }

    switch v.Type {
    case TypeSimpleString:
        v.Str = string(line[1 : len(line)-2])
        ......
    }
}

readLine
是读取一行的方法,Redis很多数据都是以回车换行符隔开的; getCount
从一行中读取数量值,比如数组的元素的数量等:

func (r *Reader) readLine() (line []byte, err error) {
    line, err = r.ReadBytes('\n')
    if err != nil {
        return nil, err
    }
    if len(line) >1 && line[len(line)-2] == '\r' {
        return line, nil
    }
    return nil, ErrInvalidSyntax
}

func (r *Reader) getCount(line []byte) (int, error) {
    end := bytes.IndexByte(line, '\r')
    return strconv.Atoi(string(line[1:end]))
}

复杂类型的读取采用递归的方式解析,比如Map类型:

func (r *Reader) readMap(line []byte) (*linkedhashmap.Map, error) {
    count, err := r.getCount(line)
    if err != nil {
        return nil, err
    }

    rt := linkedhashmap.New()
    for i :=0; i < count; i++ {
        k, streamMarkerPrefix, err := r.ReadValue()
        if err = isError(err, streamMarkerPrefix); err != nil {
            return nil, err
        }
        v, streamMarkerPrefix, err := r.ReadValue()
        if err = isError(err, streamMarkerPrefix); err != nil {
            return nil, err
        }
        rt.Put(k, v)
    }
    return rt, nil
}

这样,我们一个底层的RESP3 Reader就实行了,你可以连接Redis 6.0 服务器, 然后从TCP连接中读取 Value
返回值,根据不同的Type进行不同的处理,或者调用 SmartResult
得到一个确定的值。

Writer

客户端的发送命令比较简单,因为发送的数据是一个字符串数组,所以编码成一个数组,数据的元素类型是字符串就可以。

type Writer struct {
    *bufio.Writer
}

// NewWriter returns a redis client writer.
func NewWriter(writer io.Writer) *Writer {
    return &Writer{
        Writer: bufio.NewWriter(writer),
    }
}

// WriteCommand write a redis command.
func (w *Writer) WriteCommand(args ...string) (err error) {
    // write the array flag
    w.WriteByte(TypeArray)
    w.WriteString(strconv.Itoa(len(args)))
    w.Write(CRLFByte)
    // write blobstring
    for _, arg := range args {
        w.WriteByte(TypeBlobString)
        w.WriteString(strconv.Itoa(len(arg)))
        w.Write(CRLFByte)
        w.WriteString(arg)
        w.Write(CRLFByte)
    }
    return w.Flush()
}

这样,一个完整的RESP3的读写器就完成了。 有什么用?
你可以使用它和redis进行通讯,它支持目前还没有发送的redis 6.0。 你也可以基于它实现一个redis的Go client库,支持最新的redis 6.0的client库,可以支持接收PUSH消息,实现pipeline的机制,接收流数据等等。
你也可以使用它实现一个类似Codis的proxy,其中协议的解析就不用写了,直接使用它就可以。

客户端缓存

为了帮助客户端实现缓存,尽可能和redis的数据保持一致,redis需要一些额外的改进,这些额外的改进称之为 tracking

key空间整体被分为很多的哈希槽,redis 6.0使用24比特位作为CRC64的输出,所以会有1600多万个不同的哈希槽。哈希槽的多少是一个tradeoff, 多了占用太多的内存空间,太少又容易引起惊群的现象。如果你有1亿个key,而在客户端缓存中,收到一个失效消息不应该影响过多的key。Redis用于存储invalidation表的内存开销为130MiB,一个1600万个条目,每个条目8字节的数组。这对我来说没问题,如果你想要这个功能,你就要充分利用客户端所有的内存,所以使用130MB服务器端内存作为代价;你所赢得的是更细粒度的失效。
客户端连接到redis服务器之后,要想打开这个特性,需要发送:

CLIENT TRACKING on

服务端回复 +OK
表示正常,同时从那时开始,每一个只读命令,除了返回key对应的数据给调用者以外,还有一个副作用,就是记录所有客户端请求的key的哈希槽(但仅仅是对只读命令的key)。Redis存储这些信息的方法很简单。每个Redis客户端都有一个惟一的ID,因此,如果ID为123的客户端执行一个MGET 命令,它的keys对应的哈希槽是1、2和5,我们将得到一个包含以下条目的无效表:

1 ->[123]
2 ->[123]
5 ->[123]

稍后,ID为888的客户端来请求哈希槽5中的key,所以这个表变为:

5 ->[123,888]

现在,其他一些客户端在哈希槽5中更改了一些key。发生的情况是,Redis将检查Invalidation表,发现客户端123和888可能都在该槽上缓存了key。我们会向客户发送一个失效消息,他们可以有多种处理方式:要么记住哈希槽最新的失效时间戳,然后在使用的时候才检查缓存对象的失效时间戳,如果超出这个时间戳,则删除这个key,这称为lazy的方式。或者,客户端可以获取关于这个哈希槽的所有缓存内容,然后直接删除哈希槽即可。这种使用24位散列函数的方法不是问题,即使缓存了数千万个key,我们也不会有很长的哈希槽记录。在发送失效消息之后,服务端会从Invalidation表中删除条目,这样服务端将不再向这些客户端发送失效消息,直到它们再次读取该哈希槽内的key。
客户端也也可以有一些自己的设计,比如使用20比特位记录哈希槽,做好服务器的24比特位和20比特币之间的对应就好。
Redis是通过Push消息将失效消息发送给客户端的。你也可以使用另外一个客户端(1234)负责接收失效消息:

CLIENT TRACKING on REDIRECT 1234

客户端ID可以通过 CLIENT ID
请求获取。
接下来我们使用前面实现的读写器来验证TRACKING的功能。

   // 首先连接一个redis 6.0服务器,只有redis 6.0的服务器才开始支持TRACKING
conn, err := net.DialTimeout("tcp", "127.0.0.1:6379",5*time.Second)
if err != nil {
    t.Logf("can't found one of redis 6.0 server")
    return
}
defer conn.Close()

w := NewWriter(conn)
r := NewReader(conn)

   // 告诉redis采用RESP3的协议
w.WriteCommand("HELLO", "3")
helloResp, _, err := r.ReadValue()
if err != nil {
    t.Fatalf("failed to send a HELLO 3")
}
if helloResp.KV.Size() ==0 {
    t.Fatalf("expect some info but got %+v", helloResp)
}
t.Logf("hello response: %c, %v", helloResp.Type, helloResp.SmartResult())

   // 通知服务器开始追踪
w.WriteCommand("CLIENT", "TRACKING", "on")
resp, _, err := r.ReadValue()
if err != nil {
    t.Fatalf("failed to TRACKING: %v", err)
}
t.Logf("TRACKING result: %c, %+v", resp.Type, resp.SmartResult())

   // 请求一次,服务器应该计算出哈希槽,并且关联这个哈希槽和这个连接
w.WriteCommand("GET", "a")
resp, _, err = r.ReadValue()
if err != nil {
    t.Fatalf("failed to GET: %v", err)
}
t.Logf("GET result: %c, %+v", resp.Type, resp.SmartResult())

   // 启动另外一个连接,模拟更新数据
go func() {
    conn, err := net.DialTimeout("tcp", "127.0.0.1:9999",5*time.Second)
    if err != nil {
        t.Logf("can't found one of redis 6.0 server")
        return
    }
    defer conn.Close()
    w := NewWriter(conn)
    r := NewReader(conn)

    // 根据key计算出的哈希槽的哈希,服务器的PUSH消息应该推送这个槽的hash
    hash := crc64([]byte("a")) & (TRACKING_TABLE_SIZE -1)
    t.Logf("calculated hash: %d", hash)

    for i :=0; i <10; i++ {
        // 模拟更新数据
        w.WriteCommand("set", "a", strconv.Itoa(i))
        resp, _, err = r.ReadValue()
        if err != nil {
            t.Fatalf("failed to set: %v", err)
        }
        t.Logf("set result: %c, %+v", resp.Type, resp.SmartResult())
        time.Sleep(200 * time.Millisecond)
    }

}()

for i :=0; i <10; i++ {
       // 读取一个PUSH数据
    resp, _, err = r.ReadValue()
    if err != nil {
        t.Fatalf("failed to receive a message: %v", err)
    }
    if resp.Type == TypePush && len(resp.Elems) >=2 && resp.Elems[0].SmartResult().(string) == "invalidate" {
        t.Logf("received TRACKING result: %c, %+v", resp.Type, resp.SmartResult())

           // 推送消息后,服务器就不再关联这个哈希槽和这个连接了,所以我们需要在拉取一次数据,以便继续跟踪
        w.WriteCommand("GET", "a")
        resp, _, err = r.ReadValue()
    }
}

参考文档

  1. https://github.com/antirez/redis/blob/unstable/src/tracking.c
  2. http://antirez.com/news/130
  3. https://github.com/antirez/RESP3/blob/master/spec.md
  4. https://github.com/smallnest/resp3
  5. https://www.redisgreen.net/blog/reading-and-writing-redis-protocol/