读写 Redis RESP3 协议以及Redis 6.0客户端缓存
客户端缓存是未来Redis最重要的特性。如果我们需要快速存储和快速缓存,那么我们就需要在客户端存储数据的子集。这是为了提供小延迟、大规模数据的想法的自然延伸。很多公司都采用了在客户端缓存数据以避免每次都请求redis,但是本地缓存和redis服务器数据之间有延迟,很难保证数据的一致性。Ben Malec在Redis Conf 2018上做了一个关于客户端缓存的演讲,给了Salvatore Sanfilippo以灵感,Salvatore Sanfilippo决定在Redis 6.0中支持客户端缓存的功能。但是为了支持这个功能,使用当前的redis协议很难实现,所以他设计了下一代的Redis协议: RESP3
。
所有的代码都放在了 github
上。
RESP3协议
RESP3 协议中增加了很多新的数据类型。
和 RESP version 2 等价的类型
-
Array: 一个 有序
集合,包含N个其它类型 - Blob string: 二进制安全字符串
- Simple string: 一个节省空间的非二进制安全字符串
- Simple error: 一个节省空间的非二进制安全错误码和错误信息
- Number: 有符号64位整数
RESP3中新加的类型
-
Null: 单一的空值,代替原先的 RESP v2 的
*-1
和$-1
空值。 - Double: 浮点数
- Boolean: 布尔类型 true / false
- Blob error: 二进制安全的错误码和错误信息
-
Verbatim string: 一个二进制安全字符串,带文本格式, 如命令
LATENCY DOCTOR
的输出 -
Map: 一个 有序
的键值对 -
Set: 一个 无序
的不重复的集合 - Attribute: 类似Map类型
- Push: 带外数据,格式类似数组,但是客户端需要检查第一个数据,第一个数据指示了带外数据的类型。注意带外数据并不是一个reply,它是redis主动推送的数据,所以客户端收到带外数据,交给对应的处理方法去处理后,你还需要继续读取你的reply数据
- Hello: hello命令的返回结果,类似Map类型,仅仅在客户端和服务器建立连接的时候发送
- Big number: 大数字类型
还有一种新加的stream类型,可以用来传送不确定具体长度的数据。在数据的开头有固定的标识符,在数据传输完毕后在加上这个40字节的标识符,40字节的标志符基本上不会和传输的数据有重复:
$EOF:<40 bytes marker>... any number of bytes of data here not containing the marker ... <40 bytes marker>
它以 $EOF:
开始接着是40字节标识符,回车换行,接着就是数据,数据结束后是40字节的标识符。因为这种数据类型开始接收时数据长度不确定,所以我们对这种数据类型处理比较特殊,我们会解析出第一行的40字节标志符,后续的读取以及结束标志符需要调用者自己去读取和验证。
可以看到出了保持和原先的RESP version 2的数据类型一致外, RESP3的数据增加了很多数据类型。因为客户端发送的命令的格式比较简单,基本上是字符串的数组形式,所以这么多类型主要用在对服务器返回数据的发送和解析上。
每种数据类型有一个特殊的字符作为标识 flag
,我们为每个类型定义了一个常量:
const ( // simple types TypeBlobString = '$' // $\r\n \r\n TypeSimpleString = '+' // + \r\n TypeSimpleError = '-' // - \r\n TypeNumber = ':' // : \r\n TypeNull = '_' // _\r\n TypeDouble = ',' // , \r\n TypeBoolean = '#' // #t\r\n or #f\r\n TypeBlobError = '!' // ! \r\n \r\n TypeVerbatimString = '=' // = \r\n \r\n TypeBigNumber = '(' // (\n // Aggregate data types TypeArray = '*' // * \r\n... numelements other types ... TypeMap = '%' // % \r\n... numelements key/value pair of other types ... TypeSet = '~' // ~ \r\n... numelements other types ... TypeAttribute = '|' // |~ \r\n... numelements map type ... TypePush = '>' // > \r\n \r\n... numelements-1 other types ... //special type TypeStream = "$EOF:" // $EOF:<40 bytes marker> ... any number of bytes of data here not containing the marker ...<40 bytes marker> )
然后我们为所有的数据定义一个统一的模型,这样做的好处是我们可以使用统一的数据处理代码,避免复杂繁琐的数据类型:
type Valuestruct{ Type byte Str string StrFmt string Err string Integer int64 Boolean bool Double float64 BigInt *big.Int Elems []*Value // for array & set KV *linkedhashmap.Map //TODO sorted map, for map & attr Attrs *linkedhashmap.Map StreamMarker string }
Value
代表一个数据类型的值,它既可以是客户端发送的请求,也可以是服务器端的返回或者PUSH数据。
Type
是数据的类型标识,也就是那个特殊的字符。
对于不同的数据类型,只有部分的字段有意义,比如字符串类型, Str
字段有意义, Err
、 Elems
、 KV
等就没有意义了。 Verbatim string
类型则 Str
和 StrFmt
有意义。 错误类型则 Err
有意义。数组和Set类型 Elems
有意义,Map和属性则 KV
有意义。
NULL类型是没有值的,光靠Type就足够了。
因为RESP3中规定数据前面可以有多个属性,所以每个数据还都包含一个 Attrs
字段,它表示这个数据的属性。
Stream类型只读取第一行, StreamMarker
字段表示它的40字节的标志符。
注意Map类型我们并没有使用Go标准库的Map,而是使用一个第三方的 linkedhashmap.Map
库,原因在于RESP3规范中约定Map类型是有序的,而标准库的Map是基于Hash的无序的Map,所以不合适。
我们可以把这种数据表示成Redis传输的字符串。 首先我们要读取属性,看看这个值前面是否有属性,如果有的话先把属性编码。属性是一个Map类型,按照Map类型的方式编码就可以了。注意数量是键值对的数量。接下来按照不同的类型进行编码。对于复杂类型,我们对于它包含的值递归调用编码方法即可。
func (r *Value) ToRESP3String() string { buf := new(strings.Builder) //check attributes if r.Attrs != nil && r.Attrs.Size() >0 { buf.WriteByte(TypeAttribute) buf.WriteString(strconv.Itoa(r.Attrs.Size())) buf.Write(CRLFByte) r.Attrs.Each(func(key, val interface{}) { k := key.(*Value) v := val.(*Value) buf.WriteByte(k.Type) k.toRESP3String(buf) buf.WriteByte(v.Type) v.toRESP3String(buf) }) } buf.WriteByte(r.Type) r.toRESP3String(buf) return buf.String() } func (r *Value) toRESP3String(buf *strings.Builder) { switch r.Type { case TypeSimpleString: buf.WriteString(r.Str) case TypeBlobString: ...... case TypeArray, TypeSet, TypePush: buf.WriteString(strconv.Itoa(len(r.Elems))) buf.Write(CRLFByte) for _, v := range r.Elems { buf.WriteByte(v.Type) v.toRESP3String(buf) } return ...... } }
有时候我们需要把数据返回成GO特性的类型,比如字符串、error、数组和Map等,我们也提供了一个 SmartResult
的方法:
func (r *Value) SmartResult() interface{} { switch r.Type { case TypeSimpleString: return r.Str case TypeBlobString: return r.Str ...... } }
Reader
上面定义了一个统一的数据类型 Value
,那么如何从一个连接中读取这个 Value
呢? 我们需要一个 Reader
类型。
type Reader struct { *bufio.Reader } // NewReader returns a RESP3 reader. func NewReader(reader io.Reader) *Reader { return NewReaderSize(reader,32*1024) } // NewReaderSize returns a new Reader whose buffer has at least the specified size. func NewReaderSize(reader io.Reader, size int) *Reader { return &Reader{ Reader: bufio.NewReaderSize(reader, size), } }
我们定义了一个 Reader
,你可以指定它的buffer大小,它的 ReadValue
方法可以从reader中读取 Value
对象,我们使用 io.Reader
作为源而不是 net.Conn
,是因为我们使用更通用的接口可以方便测试。
首先我们要检查是否有属性,如果有属性,先把属性读取,属性是Map的数据类型,所以按照Map的方式处理就可以了。接下来读取真正的数据。
简单一行的数据比较好处理。比如简单字符串,简单error、数字、布尔值、空值等等,我们为每种类型都提供了一个独立的解析方法,便于管理。
func (r *Reader) ReadValue() (*Value, []byte, error) { line, err := r.readLine() if err != nil { return nil, nil, err } if len(line) <3 { return nil, nil, ErrInvalidSyntax } var attrs *linkedhashmap.Map if line[0] == TypeAttribute { attrs, err = r.readAttr(line) if err != nil { return nil, nil, err } line, err = r.readLine() } // check stream. if it is stream, return the stream marker if line[0] == TypeBlobString && len(line) ==45 && bytes.Compare(line[:5], StreamMarkerPrefix) ==0 { return nil, line[5:], nil } v := &Value{ Type: line[0], Attrs: attrs, } switch v.Type { case TypeSimpleString: v.Str = string(line[1 : len(line)-2]) ...... } }
readLine
是读取一行的方法,Redis很多数据都是以回车换行符隔开的; getCount
从一行中读取数量值,比如数组的元素的数量等:
func (r *Reader) readLine() (line []byte, err error) { line, err = r.ReadBytes('\n') if err != nil { return nil, err } if len(line) >1 && line[len(line)-2] == '\r' { return line, nil } return nil, ErrInvalidSyntax } func (r *Reader) getCount(line []byte) (int, error) { end := bytes.IndexByte(line, '\r') return strconv.Atoi(string(line[1:end])) }
复杂类型的读取采用递归的方式解析,比如Map类型:
func (r *Reader) readMap(line []byte) (*linkedhashmap.Map, error) { count, err := r.getCount(line) if err != nil { return nil, err } rt := linkedhashmap.New() for i :=0; i < count; i++ { k, streamMarkerPrefix, err := r.ReadValue() if err = isError(err, streamMarkerPrefix); err != nil { return nil, err } v, streamMarkerPrefix, err := r.ReadValue() if err = isError(err, streamMarkerPrefix); err != nil { return nil, err } rt.Put(k, v) } return rt, nil }
这样,我们一个底层的RESP3 Reader就实行了,你可以连接Redis 6.0 服务器, 然后从TCP连接中读取 Value
返回值,根据不同的Type进行不同的处理,或者调用 SmartResult
得到一个确定的值。
Writer
客户端的发送命令比较简单,因为发送的数据是一个字符串数组,所以编码成一个数组,数据的元素类型是字符串就可以。
type Writer struct { *bufio.Writer } // NewWriter returns a redis client writer. func NewWriter(writer io.Writer) *Writer { return &Writer{ Writer: bufio.NewWriter(writer), } } // WriteCommand write a redis command. func (w *Writer) WriteCommand(args ...string) (err error) { // write the array flag w.WriteByte(TypeArray) w.WriteString(strconv.Itoa(len(args))) w.Write(CRLFByte) // write blobstring for _, arg := range args { w.WriteByte(TypeBlobString) w.WriteString(strconv.Itoa(len(arg))) w.Write(CRLFByte) w.WriteString(arg) w.Write(CRLFByte) } return w.Flush() }
这样,一个完整的RESP3的读写器就完成了。 有什么用?
你可以使用它和redis进行通讯,它支持目前还没有发送的redis 6.0。 你也可以基于它实现一个redis的Go client库,支持最新的redis 6.0的client库,可以支持接收PUSH消息,实现pipeline的机制,接收流数据等等。
你也可以使用它实现一个类似Codis的proxy,其中协议的解析就不用写了,直接使用它就可以。
客户端缓存
为了帮助客户端实现缓存,尽可能和redis的数据保持一致,redis需要一些额外的改进,这些额外的改进称之为 tracking
。
key空间整体被分为很多的哈希槽,redis 6.0使用24比特位作为CRC64的输出,所以会有1600多万个不同的哈希槽。哈希槽的多少是一个tradeoff, 多了占用太多的内存空间,太少又容易引起惊群的现象。如果你有1亿个key,而在客户端缓存中,收到一个失效消息不应该影响过多的key。Redis用于存储invalidation表的内存开销为130MiB,一个1600万个条目,每个条目8字节的数组。这对我来说没问题,如果你想要这个功能,你就要充分利用客户端所有的内存,所以使用130MB服务器端内存作为代价;你所赢得的是更细粒度的失效。
客户端连接到redis服务器之后,要想打开这个特性,需要发送:
CLIENT TRACKING on
服务端回复 +OK
表示正常,同时从那时开始,每一个只读命令,除了返回key对应的数据给调用者以外,还有一个副作用,就是记录所有客户端请求的key的哈希槽(但仅仅是对只读命令的key)。Redis存储这些信息的方法很简单。每个Redis客户端都有一个惟一的ID,因此,如果ID为123的客户端执行一个MGET 命令,它的keys对应的哈希槽是1、2和5,我们将得到一个包含以下条目的无效表:
1 ->[123] 2 ->[123] 5 ->[123]
稍后,ID为888的客户端来请求哈希槽5中的key,所以这个表变为:
5 ->[123,888]
现在,其他一些客户端在哈希槽5中更改了一些key。发生的情况是,Redis将检查Invalidation表,发现客户端123和888可能都在该槽上缓存了key。我们会向客户发送一个失效消息,他们可以有多种处理方式:要么记住哈希槽最新的失效时间戳,然后在使用的时候才检查缓存对象的失效时间戳,如果超出这个时间戳,则删除这个key,这称为lazy的方式。或者,客户端可以获取关于这个哈希槽的所有缓存内容,然后直接删除哈希槽即可。这种使用24位散列函数的方法不是问题,即使缓存了数千万个key,我们也不会有很长的哈希槽记录。在发送失效消息之后,服务端会从Invalidation表中删除条目,这样服务端将不再向这些客户端发送失效消息,直到它们再次读取该哈希槽内的key。
客户端也也可以有一些自己的设计,比如使用20比特位记录哈希槽,做好服务器的24比特位和20比特币之间的对应就好。
Redis是通过Push消息将失效消息发送给客户端的。你也可以使用另外一个客户端(1234)负责接收失效消息:
CLIENT TRACKING on REDIRECT 1234
客户端ID可以通过 CLIENT ID
请求获取。
接下来我们使用前面实现的读写器来验证TRACKING的功能。
// 首先连接一个redis 6.0服务器,只有redis 6.0的服务器才开始支持TRACKING conn, err := net.DialTimeout("tcp", "127.0.0.1:6379",5*time.Second) if err != nil { t.Logf("can't found one of redis 6.0 server") return } defer conn.Close() w := NewWriter(conn) r := NewReader(conn) // 告诉redis采用RESP3的协议 w.WriteCommand("HELLO", "3") helloResp, _, err := r.ReadValue() if err != nil { t.Fatalf("failed to send a HELLO 3") } if helloResp.KV.Size() ==0 { t.Fatalf("expect some info but got %+v", helloResp) } t.Logf("hello response: %c, %v", helloResp.Type, helloResp.SmartResult()) // 通知服务器开始追踪 w.WriteCommand("CLIENT", "TRACKING", "on") resp, _, err := r.ReadValue() if err != nil { t.Fatalf("failed to TRACKING: %v", err) } t.Logf("TRACKING result: %c, %+v", resp.Type, resp.SmartResult()) // 请求一次,服务器应该计算出哈希槽,并且关联这个哈希槽和这个连接 w.WriteCommand("GET", "a") resp, _, err = r.ReadValue() if err != nil { t.Fatalf("failed to GET: %v", err) } t.Logf("GET result: %c, %+v", resp.Type, resp.SmartResult()) // 启动另外一个连接,模拟更新数据 go func() { conn, err := net.DialTimeout("tcp", "127.0.0.1:9999",5*time.Second) if err != nil { t.Logf("can't found one of redis 6.0 server") return } defer conn.Close() w := NewWriter(conn) r := NewReader(conn) // 根据key计算出的哈希槽的哈希,服务器的PUSH消息应该推送这个槽的hash hash := crc64([]byte("a")) & (TRACKING_TABLE_SIZE -1) t.Logf("calculated hash: %d", hash) for i :=0; i <10; i++ { // 模拟更新数据 w.WriteCommand("set", "a", strconv.Itoa(i)) resp, _, err = r.ReadValue() if err != nil { t.Fatalf("failed to set: %v", err) } t.Logf("set result: %c, %+v", resp.Type, resp.SmartResult()) time.Sleep(200 * time.Millisecond) } }() for i :=0; i <10; i++ { // 读取一个PUSH数据 resp, _, err = r.ReadValue() if err != nil { t.Fatalf("failed to receive a message: %v", err) } if resp.Type == TypePush && len(resp.Elems) >=2 && resp.Elems[0].SmartResult().(string) == "invalidate" { t.Logf("received TRACKING result: %c, %+v", resp.Type, resp.SmartResult()) // 推送消息后,服务器就不再关联这个哈希槽和这个连接了,所以我们需要在拉取一次数据,以便继续跟踪 w.WriteCommand("GET", "a") resp, _, err = r.ReadValue() } }