类似csv的数据日志组件设计

原文链接: https://blog.thinkeridea.com/…

我们业务每天需要记录大量的日志数据,且这些数据十分重要,它们是公司收入结算的主要依据,也是数据分析部门主要得数据源,针对这么重要的日志,且高频率的日志,我们需要一个高性能且安全的日志组件,能保证每行日志格式完整性,我们设计了一个类 csv 的日志拼接组件,它的代码在这里 datalog

它是一个可以保证日志各列完整性且高效拼接字段的组件,支持任意列和行分隔符,而且还支持数组字段,可是实现一对多的日志需求,不用记录多个日志,也不用记录多行。它响应一个 []byte
数据,方便结合其它主键写入数据到日志文件或者网络中。

使用说明

API 列表

NewRecord(len int) Record
NewRecordPool(len int) *sync.Pool
ToBytes(sep, newline string) []byte
ArrayJoin(sep string) string
ArrayFieldJoin(fieldSep, arraySep string) string
Clean()
UnsafeToBytes(sep, newline string) []byte
UnsafeArrayFieldJoin(fieldSep, arraySep string) string

底层使用 type Record []string
字符串切片作为一行或者一个数组字段,在使用时它应该是定长的,因为数据日志往往是格式化的,每列都有自己含义,使用 NewRecord(len int) Record
或者 NewRecordPool(len int) *sync.Pool
创建组件,我建议每个日志使用 NewRecordPool
在程序初始化时创建一个缓存池,程序运行时从缓存次获取 Record
将会更加高效,但是每次放回 Pool
时需要调用 Clean
清空 Record
避免引用字符串无法被回收,而导致内存泄漏。

实践

我们需要保证日志每列数据的含义一至,我们创建了定长的 Record
,但是如何保证每列数据一致性,利用go 的常量枚举可以很好的保证,例如我们定义日志列常量:

const (
    LogVersion = "v1.0.0"
)
const (
    LogVer = iota
    LogTime
    LogUid
    LogUserName
    LogFriends

    LogFieldNumber
)

LogFieldNumber
就是日志的列数量,也就是 Record
的长度,之后使用 NewRecordPool
创建缓存池,然后使用常量名称作为下标记录日志,这样就不用担心因为检查或者疏乎导致日志列错乱的问题了。

var w bytes.Buffer // 一个日志写组件
var pool = datalog.NewRecordPool(LogFieldNumber) // 创建一个缓存池

func main() {
  r := pool.Get().(datalog.Record)
  r[LogVer] = LogVersion
  r[LogTime] = time.Now().Format("2006-01-02 15:04:05")
  // 检查用户数据是否存在
  //if user !=nil{
    r[LogUid] = "Uid"
    r[LogUserName] = "UserNmae"
  //}

  // 拼接一行日志数据
  data := r.Join(datalog.FieldSep, datalog.NewLine)
  r.Clean() // 清空 Record
  pool.Put(r) // 放回到缓存池

  // 写入到日志中
  if _, err := w.Write(data); err != nil {
    panic(err)
  }

  // 打印出日志数据
  fmt.Println("'" + w.String() + "'")
}

以上程序运行会输出:
因为分隔符是不可见字符,下面使用,代替字段分隔符,使用;n代替换行符, 使用/代替数组字段分隔符,是-代替数组分隔符。

'v1.0.0,2019-07-18,11:39:09,Uid,UserNmae,;\n'

即使我们没有记录 LogFriends
列的数据,但是在日志中它仍然有一个占位符,如果 user
nil
LogUid
LogUserName
不需要特殊处理,也不需要写入数据,它依然占据自己的位置,不用担心日志因此而错乱。

使用 pool 可以很好的利用内存,不会带来过多的内存分配,而且 Record 的每个字段值都是字符串,简单的赋值并不会带来太大的开销,它会指向字符串本身的数据,不会有额外的内存分配,详细参见 string 优化误区及建议

使用 Record.Join
可以高效的连接一行日志记录,便于我们快速的写入的日志文件中,后面部分会详细介绍 Join
的设计。

包含数组的日志

有时候也并非都是记录一些单一的值,比如上面 LogFriends 会记录当前记录相关的朋友信息,这可能是一组数据, datalog
也提供了一些简单的辅助函数,可以结合下面的实例实现:

// 定义 LogFriends 数组各列的数据
const (
    LogFriendUid = iota
    LogFriendUserName

    LogFriendFieldNumber
)

var w bytes.Buffer // 一个日志写组件
var pool = datalog.NewRecordPool(LogFieldNumber) // 每行日志的 pool
var frPool = datalog.NewRecordPool(LogFriendFieldNumber) // LogFriends 数组字段的 pool

func main(){
  // 程序运行时
  r := pool.Get().(datalog.Record)
  r[LogVer] = LogVersion
  r[LogTime] = time.Now().Format("2006-01-02 15:04:05")
  // 检查用户数据是否存在
  //if user !=nil{
    r[LogUid] = "Uid"
    r[LogUserName] = "UserNmae"
  //}

  // 拼接一个数组字段,其长度是不固定的
  r[LogFriends] = GetLogFriends(rand.Intn(3))
  // 拼接一行日志数据
  data := r.Join(datalog.FieldSep, datalog.NewLine)
  r.Clean() // 清空 Record
  pool.Put(r) // 放回到缓存池

  // 写入到日志中
  if _, err := w.Write(data); err != nil {
    panic(err)
  }

  // 打印出日志数据
  fmt.Println("'" + w.String() + "'")
}

// 定义一个函数来拼接 LogFriends 
func GetLogFriends(friendNum int) string {
  // 根据数组长度创建一个 Record,数组的个数往往是不固定的,它整体作为一行日志的一个字段,所以并不会破坏数据
    fs := datalog.NewRecord(friendNum) 
     // 这里只需要中 pool 中获取一个实例,它可以反复复用
    fr := frPool.Get().(datalog.Record)
    for i := 0; i < friendNum; i++ {
    // fr.Clean() 如果不是每个字段都赋值,应该在使用前或者使用后清空它们便于后面复用
        fr[LogFriendUid] = "FUid"
        fr[LogFriendUserName] = "FUserName"
    
     // 连接一个数组中各个字段,作为一个数组单元
        fs[i] = fr.ArrayFieldJoin(datalog.ArrayFieldSep, datalog.ArraySep)
    }
    fr.Clean() // 清空 Record
    frPool.Put(fr)  // 放回到缓存池

  // 连接数组的各个单元,返回一个字符串作为一行日志的一列
    return fs.ArrayJoin(datalog.ArraySep)
}

以上程序运行会输出:
因为分隔符是不可见字符,下面使用,代替字段分隔符,使用;n代替换行符, 使用/代替数组字段分隔符,是-代替数组分隔符。

'v1.0.0,2019-07-18,11:39:09,Uid,UserNmae,FUid/FUserName-FUid/FUserName;\n'

这样在解析时可以把某一字段当做数组解析,这极大的极大的提高了数据日志的灵活性,
但是并不建议使用过多的层级,数据日志应当清晰简洁,但是有些特殊场景可以使用一层嵌套。

最佳实践

使用 ToBytes
ArrayFieldJoin
时会把数据字段中的连接字符串替换一个空字符串,所以在 datalog
里面定义了4个分隔符,它们都是不可见字符,极少会出现在数据中,但是我们还需要替换数据中的这些连接字符,避免破坏日志结构。

虽然组件支持各种连接符,但是为了避免数据被破坏,我们应该选择一些不可见且少见的单字节字符作为分隔符。换行符比较特殊,因为大多数日志读取组件都是用 \n
作为行分隔符,如果数据中极少出现 \n
那就可以使用 \n
datalog
中定义 \x03\n
作为换行符,它兼容一般的日志读取组件,只需要我们做少量的工作就可以正确的解析日志了。

UnsafeToBytes
UnsafeArrayFieldJoin
性能会更好,和它们的名字一样,他们并不安全,因为它们使用 exbytes.Replace
做原地替换分隔符,这会破坏数据所指向的原始字符串。除非我们日志数据中会出现极多的分隔符需要替换,否者并不建议使用它们,因为它们只在替换时提升性能。

我在服务中大量使用 UnsafeToBytes
UnsafeArrayFieldJoin
,我总是在一个请求结束时记录日志,我确保所有相关的数据不会再使用,所以不用担心原地替换导致其它数据被无感知改变的问题,这也许是一个很好的实践,但是我仍然不推荐使用它们。

设计讲解

datalog
并没有提供太多的约束很功能,它仅仅包含一种实践和一组辅助工具,在使用它之前,我们需要了解这些实践。

它帮我们创建一个定长的日志行或者一个 sync.Pool
,我们需要结合常量枚举记录数据,它帮我们把各列数据连接成记录日志需要的数据格式。
它所提供的辅助方法都经过实际项目的考验,考量诸多细节,以高性能为核心目标所设计,使用它可以极大的降低相关组件的开发成本,接下来这节将分析它的各个部分。

我认为值得说道的是它提供的一个 Join
方法,相对于 strings.Join
可以节省两次的内存分配,现从它开始分析。

// Join 使用 sep 连接 Record, 并在末尾追加 suffix
// 这个类似 strings.Join 方法,但是避免了连接后追加后缀(往往是换行符)导致的内存分配
// 这个方法直接返回需要的 []byte 类型, 可以减少类型转换,降低内存分配导致的性能问题
func (l Record) Join(sep, suffix string) []byte {
    if len(l) == 0 {
        return []byte(suffix)
    }

    n := len(sep) * (len(l) - 1)
    for i := 0; i < len(l); i++ {
        n += len(l[i])
    }

    n += len(suffix)
    b := make([]byte, n)
    bp := copy(b, l[0])
    for i := 1; i < len(l); i++ {
        bp += copy(b[bp:], sep)
        bp += copy(b[bp:], l[i])
    }
    copy(b[bp:], suffix)
    return b
}

日志组件往往输入的参数是 []byte
类型,所以它直接返回一个 []byte
,而不像 strings.Join
响应一个字符串,在末尾是需要对内部的 buf
进行类型转换,导致额外的内存开销。我们每行日志不仅需要使用分隔符连接各列,还需要一个行分隔符作为结尾,它提供一个后缀 suffix
,不用我们之后在 Join
结果后再次拼接行分隔符,这样也能减少一个额外的内存分配。

这恰恰是 datalog
设计的精髓,它并没有大量使用标准库的方法,而是设计更符合该场景的方法,以此来获得更高的性能和更好的使用体验。

// ToBytes 使用 sep 连接 Record,并在末尾添加 newline 换行符
// 注意:这个方法会替换 sep 与 newline 为空字符串
func (l Record) ToBytes(sep, newline string) []byte {
   for i := len(l) - 1; i >= 0; i-- {
      // 提前检查是否包含特殊字符,以便跳过字符串替换
      if strings.Index(l[i], sep) < 0 && strings.Index(l[i], newline) < 0 {
         continue
      }

      b := []byte(l[i]) // 这会重新分配内存,避免原地替换导致引用字符串被修改
      b = exbytes.Replace(b, exstrings.UnsafeToBytes(sep), []byte{' '}, -1)
      b = exbytes.Replace(b, exstrings.UnsafeToBytes(newline), []byte{' '}, -1)
      l[i] = exbytes.ToString(b)
   }

   return l.Join(sep, newline)
}

ToBytes
作为很重要的交互函数,也是该组件使用频率最高的函数,它在连接各个字段之前替换每个字段中的字段和行分隔符,这里提前做了一个检查字段中是否包含分隔符,如果包含使用 []byte(l[i])
拷贝该列的数据,然后使用 exbytes.Replace
提供高性能的原地替换,因为输入数据是拷贝重新分配的,所以不用担心原地替换会影响其它数据。

之后使用之前介绍的 Join
方法连接各列数据,如果使用 strings.Join
将会是 []byte(strings.Join([]string(l), sep) + newline)
这其中会增加很多次内存分配,该组件通过巧妙的设计规避这些额外的开销,以提升性能。

// UnsafeToBytes 使用 sep 连接 Record,并在末尾添加 newline 换行符
// 注意:这个方法会替换 sep 与 newline 为空字符串,替换采用原地替换,这会导致所有引用字符串被修改
// 必须明白其作用,否者将会导致意想不到的结果。但是这会大幅度减少内存分配,提升程序性能
// 我在项目中大量使用,我总是在请求最后记录日志,这样我不会再访问引用的字符串
func (l Record) UnsafeToBytes(sep, newline string) []byte {
   for i := len(l) - 1; i >= 0; i-- {
      b := exstrings.UnsafeToBytes(l[i])
      b = exbytes.Replace(b, exstrings.UnsafeToBytes(sep), []byte{' '}, -1)
      b = exbytes.Replace(b, exstrings.UnsafeToBytes(newline), []byte{' '}, -1)
      l[i] = exbytes.ToString(b)
   }

   return l.Join(sep, newline)
}

UnsafeToBytes
ToBytes
相似只是没有分割符检查,因为 exbytes.Replace
中已经包含了检查,而且直接使用 exstrings.UnsafeToBytes
把字符串转成 []byte
这不会发生数据拷贝,非常的高效,但是它不支持字面量字符串,不过我相信日志中的数据均来自运行时分配,如果不幸包含字面量字符串,也不用太过担心,只要使用一个特殊的字符作为分隔符,往往我们编程字面量字符串并不会包含这些字符,执行 exbytes.Replace
没有发生替换也是安全的。

// Clean 清空 Record 中的所有元素,如果使用 sync.Pool 在放回 Pool 之前应该清空 Record,避免内存泄漏
// 该方法没有太多的开销,可以放心的使用,只是为 Record 中的字段赋值为空字符串,空字符串会在编译时处理,没有额外的内存分配
func (l Record) Clean() {
   for i := len(l) - 1; i >= 0; i-- {
      l[i] = ""
   }
}

Clean
方法更简单,它只是把各个列的数据替换为空字符串,空字符串做为一个特殊的字符,会在编译时处理,并不会有额外的开销,它们都指向同一块内存。

// ArrayJoin 使用 sep 连接 Record,其结果作为数组字段的值
func (l Record) ArrayJoin(sep string) string {
   return exstrings.Join(l, sep)
}

// ArrayFieldJoin 使用 fieldSep 连接 Record,其结果作为一个数组的单元
// 注意:这个方法会替换 fieldSep 与 arraySep 为空字符串,替换采用原地替换
func (l Record) ArrayFieldJoin(fieldSep, arraySep string) string {
   for i := len(l) - 1; i >= 0; i-- {
      // 提前检查是否包含特殊字符,以便跳过字符串替换
      if strings.Index(l[i], fieldSep) < 0 && strings.Index(l[i], arraySep) < 0 {
         continue
      }

      b := []byte(l[i]) // 这会重新分配内存,避免原地替换导致引用字符串被修改
      b = exbytes.Replace(b, exstrings.UnsafeToBytes(fieldSep), []byte{' '}, -1)
      b = exbytes.Replace(b, exstrings.UnsafeToBytes(arraySep), []byte{' '}, -1)
      l[i] = exbytes.ToString(b)
   }

   return exstrings.Join(l, fieldSep)
}

ArrayFieldJoin
在连接各个字符串时会直接替换数组单元分隔符,之后直接使用 exstrings.Join
进行连接字符串,) 相对 strings.Join
的一个改进函数,因为它只有一次内存分配,较 strings.Join
节省一次,有兴趣可以去看它的源码实现。

总结

datalog
提供了一种实践以及一些辅助工具,可以帮助我们快速的记录数据日志,更关心数据本身。具体程序性能可以交给 datalog
来实现,它保证程序的性能。
后期我会计划提供一个高效的日志读取组件,以便于读取解析数据日志,它较与一般文件读取会更加高效且便捷,有针对性的优化日志解析效率,敬请关注吧。

转载:

本文作者: 戚银( thinkeridea

本文链接: https://blog.thinkeridea.com/201907/go/csv_like_data_logs.html

版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 CN协议
许可协议。转载请注明出处!