An Analysis of conntrack Communication Principles

In the article "Netfilter框架研究" (a study of the Netfilter framework), I mentioned that the connection tracking entries of the current system can be viewed with cat /proc/net/nf_conntrack or with conntrack -L -o extended. For example:

ipv4 2 tcp 6 431916 ESTABLISHED src=172.22.44.167 dst=172.22.44.196 sport=44972 dport=18012 src=172.22.44.196 dst=172.22.44.167 sport=18012 dport=44972 [ASSURED] mark=0 zone=0 use=2

However, most resources online obtain this information by registering a HOOK function at a point such as NF_INET_LOCAL_IN. Since conntrack information can be read directly from userspace, there is no need to register an NF_INET_LOCAL_IN hook at all.

This article explores how connection tracking information can be obtained from userspace.

Code Analysis

We take conntrack-go as an example to see how conntrack retrieval is implemented, based on the commit titled "first comit". Let's start with the program's entry file, conntrack-agent.go:

func main() {
    h, err := lib.NewHandle(unix.NETLINK_NETFILTER)
    if err != nil {
        log.Fatalln("failed to create Handle..ERROR:", err)
    }
    err = h.ConntrackTableFlush(lib.ConntrackTable)
    if err != nil {
        log.Fatalln("failed to flush conntrack table..ERROR:", err)
    }
    for {
        flows, err := h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))
        if err == nil {
            if len(flows) != 0 {
                for _, flow := range flows {
                    fmt.Println(flow)
                }
            }
        }
        <-time.After(time.Millisecond * 50)
    }
}

The main flow consists of three steps:

h, err := lib.NewHandle(unix.NETLINK_NETFILTER)
h.ConntrackTableFlush(lib.ConntrackTable)
h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))

NewHandle(unix.NETLINK_NETFILTER)

Stepping into the NewHandle(...) function:

func NewHandle(nlFamilies ...int) (*Handle, error) {
    return newHandle(None(), None(), nlFamilies...)
}
func newHandle(newNs, curNs NsHandle, nlFamilies ...int) (*Handle, error) {
    h := &Handle{sockets: map[int]*SocketHandle{}}
    fams := SupportedNlFamilies
    if len(nlFamilies) != 0 {
        fams = nlFamilies
    }
    for _, f := range fams {
        s, err := GetNetlinkSocketAt(newNs, curNs, f)
        if err != nil {
            return nil, err
        }
        h.sockets[f] = &SocketHandle{Socket: s}
    }
    return h, nil
}

The newNs and curNs parameters exist so the socket can be opened inside a different network namespace (newNs) and the thread restored to curNs afterwards. That is unnecessary for our purposes, so both are the closed handle None(). The program goes straight to GetNetlinkSocketAt.

// GetNetlinkSocketAt opens a netlink socket in the network namespace newNs
// and positions the thread back into the network namespace specified by curNs,
// when done. If curNs is closed, the function derives the current namespace and
// moves back into it when done. If newNs is closed, the socket will be opened
// in the current network namespace.
func GetNetlinkSocketAt(newNs, curNs NsHandle, protocol int) (*NetlinkSocket, error) {
    c, err := executeInNetns(newNs, curNs)
    if err != nil {
        return nil, err
    }
    defer c()
    return getNetlinkSocket(protocol)
}

// Namespace handling is not our focus here; what matters is how a socket
// is created for the given protocol.
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
    fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, protocol)
    if err != nil {
        return nil, err
    }
    s := &NetlinkSocket{
        fd: int32(fd),
    }
    s.lsa.Family = unix.AF_NETLINK
    if err := unix.Bind(fd, &s.lsa); err != nil {
        unix.Close(fd)
        return nil, err
    }

    return s, nil
}

getNetlinkSocket follows the usual socket creation pattern:

fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, protocol)
unix.Bind(fd, &s.lsa)
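
Binding with a zero Pid lets the kernel assign this socket a unique netlink port id. The following minimal, standalone sketch (using only golang.org/x/sys/unix, which the library itself uses) reads that id back with unix.Getsockname, mirroring what the library's s.GetPid() provides for the pid check in Execute later:

// Bind a netfilter netlink socket with Pid 0 and read back the port id
// the kernel assigned to it.
package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, unix.NETLINK_NETFILTER)
    if err != nil {
        panic(err)
    }
    defer unix.Close(fd)

    // Pid 0 asks the kernel to pick a unique port id for this socket.
    if err := unix.Bind(fd, &unix.SockaddrNetlink{Family: unix.AF_NETLINK}); err != nil {
        panic(err)
    }

    lsa, err := unix.Getsockname(fd)
    if err != nil {
        panic(err)
    }
    fmt.Println("assigned netlink pid:", lsa.(*unix.SockaddrNetlink).Pid)
}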

Finally, back in NewHandle:

// obtain the netfilter netlink socket
s, err := GetNetlinkSocketAt(newNs, curNs, f)

// create a SocketHandle whose Socket field holds that socket
// SocketHandle contains the netlink socket and the associated
// sequence counter for a specific netlink family
type SocketHandle struct {
    Seq    uint32
    Socket *NetlinkSocket
}

type NetlinkSocket struct {
    fd  int32
    lsa unix.SockaddrNetlink
    sync.Mutex
}
h.sockets[f] = &SocketHandle{Socket: s}

ConntrackTableFlush(lib.ConntrackTable)

First, determine the value of lib.ConntrackTable:

type ConntrackTableType uint8
const (
    ConntrackTable = ConntrackTableType(1)
    ConntrackExpectTable = ConntrackTableType(2)
)

Next, ConntrackTableFlush. As its name suggests, it flushes the connection tracking table.

func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
    req := h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
    _, err := req.Execute(unix.NETLINK_NETFILTER, 0)
    return err
}
func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *NetlinkRequest {
    // Create the Netlink request object
    req := h.newNetlinkRequest((int(table)<<8)|operation, flags)
    // Add the netfilter header
    msg := &Nfgenmsg{
        NfgenFamily: uint8(family),
        Version:     NFNETLINK_V0,
        ResId:       0,
    }
    req.AddData(msg)
    return req
}

req := h.newNetlinkRequest((int(table)<<8)|operation, flags) creates a NetlinkRequest. The parameter values at this point are:

  1. table: 1 (ConntrackTable)
  2. family: unix.AF_INET (value 2)
  3. operation: 2 (IPCTNL_MSG_CT_DELETE)
  4. flags: unix.NLM_F_ACK (value 4)
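
To make (int(table)<<8)|operation concrete: in nfnetlink the high byte of nlmsg_type carries the subsystem id and the low byte the message type within that subsystem. ConntrackTable (1) equals NFNL_SUBSYS_CTNETLINK, so the flush request's type works out as follows (a minimal sketch with the constants inlined):

// Sketch of the nfnetlink message-type composition used by newConntrackRequest:
// subsystem id in the high byte, operation in the low byte.
package main

import "fmt"

const (
    ConntrackTable       = 1 // == NFNL_SUBSYS_CTNETLINK
    IPCTNL_MSG_CT_DELETE = 2
)

func main() {
    msgType := (ConntrackTable << 8) | IPCTNL_MSG_CT_DELETE
    fmt.Printf("nlmsg_type = 0x%04x\n", msgType) // nlmsg_type = 0x0102
}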

Now look at newNetlinkRequest:

type NetlinkRequest struct {
    unix.NlMsghdr
    Data    []NetlinkRequestData
    RawData []byte
    Sockets map[int]*SocketHandle
}

func (h *Handle) newNetlinkRequest(proto, flags int) *NetlinkRequest {
    // Do this so that package API still use nl package variable nextSeqNr
    if h.sockets == nil {
        return NewNetlinkRequest(proto, flags)
    }
    return &NetlinkRequest{
        // A standard netlink message header; see:
        // https://blog.spoock.com/2019/11/25/lkm/#%E7%A4%BA%E4%BE%8B%E7%A8%8B%E5%BA%8F
        /*
          nlh = (struct nlmsghdr *) malloc(NLMSG_SPACE(MAX_PAYLOAD));
          memset(nlh, 0, NLMSG_SPACE(MAX_PAYLOAD));
          nlh->nlmsg_len = NLMSG_SPACE(MAX_PAYLOAD);
          nlh->nlmsg_pid = getpid();
          nlh->nlmsg_flags = 0;
        */  
        NlMsghdr: unix.NlMsghdr{
            Len:   uint32(unix.SizeofNlMsghdr),
            Type:  uint16(proto),
            Flags: unix.NLM_F_REQUEST | uint16(flags),
        },
        Sockets: h.sockets,
    }
}

According to NETLINK(7):

nlmsg_type can be one of the standard message types: NLMSG_NOOP message is to be ignored, NLMSG_ERROR message signals an error and the payload contains an nlmsgerr structure, NLMSG_DONE message terminates a multipart message.

struct nlmsgerr {
    int error;            /* Negative errno or 0 for acknowledgements */
    struct nlmsghdr msg;  /* Message header that caused the error */
};

For nlmsg_type there are four standard types, with the following integer values:

// reference: https://elixir.bootlin.com/linux/v4.7/source/include/uapi/linux/netlink.h#L95
#define NLMSG_NOOP     0x1 /* Nothing.     */
#define NLMSG_ERROR        0x2 /* Error        */
#define NLMSG_DONE     0x3 /* End of a dump    */
#define NLMSG_OVERRUN  0x4 /* Data lost        */

In addition, include/uapi/linux/netfilter/nfnetlink_conntrack.h defines the conntrack message control types:

enum cntl_msg_types {
    IPCTNL_MSG_CT_NEW,
    IPCTNL_MSG_CT_GET,
    IPCTNL_MSG_CT_DELETE,
    IPCTNL_MSG_CT_GET_CTRZERO,
    IPCTNL_MSG_CT_GET_STATS_CPU,
    IPCTNL_MSG_CT_GET_STATS,
    IPCTNL_MSG_CT_GET_DYING,
    IPCTNL_MSG_CT_GET_UNCONFIRMED,

    IPCTNL_MSG_MAX
};

In this example the operation value is 2, which corresponds to IPCTNL_MSG_CT_DELETE in cntl_msg_types.
As for nlmsg_flags:

NLM_F_REQUEST Must be set on all request messages. NLM_F_MULTI The message is part of a multipart message terminated by NLMSG_DONE. NLM_F_ACK Request for an acknowledgment on success. NLM_F_ECHO Echo this request.

The corresponding values are:

/* Flags values */

#define NLM_F_REQUEST      1   /* It is request message.   */
#define NLM_F_MULTI        2   /* Multipart message, terminated by NLMSG_DONE */
#define NLM_F_ACK      4   /* Reply with ack, with zero or error code */
#define NLM_F_ECHO     8   /* Echo this request        */
#define NLM_F_DUMP_INTR        16  /* Dump was inconsistent due to sequence change */

So this example uses 1 and 4, i.e. NLM_F_REQUEST | NLM_F_ACK: this is a request message, and an acknowledgement is expected.

Finally, newNetlinkRequest completes and returns a NetlinkRequest struct with the following content:

&NetlinkRequest{
        NlMsghdr: unix.NlMsghdr{
            Len:   uint32(unix.SizeofNlMsghdr),
            Type:  uint16(proto),
            Flags: unix.NLM_F_REQUEST | uint16(flags),
        },
        // the netlink sockets obtained earlier via GetNetlinkSocketAt()
        Sockets: h.sockets,
    }

Back in newConntrackRequest:

func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *NetlinkRequest {
    // Create the Netlink request object
    req := h.newNetlinkRequest((int(table)<<8)|operation, flags)
    // Add the netfilter header
    msg := &Nfgenmsg{
        NfgenFamily: uint8(family),
        Version:     NFNETLINK_V0,
        ResId:       0,
    }
    req.AddData(msg)
    return req
}

func (req *NetlinkRequest) AddData(data NetlinkRequestData) {
    req.Data = append(req.Data, data)
}

type NetlinkRequest struct {
    unix.NlMsghdr
    Data    []NetlinkRequestData
    RawData []byte
    Sockets map[int]*SocketHandle
}
type NetlinkRequestData interface {
    Len() int
    Serialize() []byte
}

After obtaining the NetlinkRequest struct, the AddData() method appends the payload to its Data field; Data is a slice of the NetlinkRequestData interface.

Once newConntrackRequest has returned, control is back in ConntrackTableFlush:

func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
    req := h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
    _, err := req.Execute(unix.NETLINK_NETFILTER, 0)
    return err
}

This calls the NetlinkRequest's Execute() method:

// Execute the request against a the given sockType.
// Returns a list of netlink messages in serialized format, optionally filtered
// by resType.
func (req *NetlinkRequest) Execute(sockType int, resType uint16) ([][]byte, error) {
    var (
        s   *NetlinkSocket
        err error
    )

    if req.Sockets != nil {
        // look up the socket for this netlink family
        if sh, ok := req.Sockets[sockType]; ok {
            s = sh.Socket
            // bump the per-socket sequence number (1 for the first request)
            req.Seq = atomic.AddUint32(&sh.Seq, 1)
        }
    }
    sharedSocket := s != nil

    if s == nil {
        s, err = getNetlinkSocket(sockType)
        if err != nil {
            return nil, err
        }
        defer s.Close()
    } else {
        s.Lock()
        defer s.Unlock()
    }
    if err := s.Send(req); err != nil {
        return nil, err
    }

    pid, err := s.GetPid()
    if err != nil {
        return nil, err
    }

    var res [][]byte

done:
    for {
        msgs, err := s.Receive()
        if err != nil {
            return nil, err
        }
        for _, m := range msgs {
            if m.Header.Seq != req.Seq {
                if sharedSocket {
                    continue
                }
                return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
            }
            if m.Header.Pid != pid {
                return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
            }
            if m.Header.Type == unix.NLMSG_DONE {
                break done
            }
            if m.Header.Type == unix.NLMSG_ERROR {
                native := NativeEndian()
                error := int32(native.Uint32(m.Data[0:4]))
                if error == 0 {
                    break done
                }
                return nil, syscall.Errno(-error)
            }
            if resType != 0 && m.Header.Type != resType {
                continue
            }
            res = append(res, m.Data)
            if m.Header.Flags&unix.NLM_F_MULTI == 0 {
                break done
            }
        }
    }
    return res, nil
}

The key call here is s.Send(req), where s is the NetlinkSocket and req is the object carrying the request data.

type NetlinkSocket struct {
    fd  int32
    lsa unix.SockaddrNetlink
    sync.Mutex
}

type NetlinkRequest struct {
    unix.NlMsghdr
    Data    []NetlinkRequestData
    RawData []byte
    Sockets map[int]*SocketHandle
}

func (s *NetlinkSocket) Send(request *NetlinkRequest) error {
    fd := int(atomic.LoadInt32(&s.fd))
    if fd < 0 {
        return fmt.Errorf("Send called on a closed socket")
    }
    if err := unix.Sendto(fd, request.Serialize(), 0, &s.lsa); err != nil {
        return err
    }
    return nil
}


NetlinkSocket's Send() method is ultimately just a sendto(2) call, unix.Sendto(fd, request.Serialize(), 0, &s.lsa); the request data first has to be serialized, which is done by the NetlinkRequest's Serialize() method.

// Serialize the Netlink Request into a byte array
func (req *NetlinkRequest) Serialize() []byte {
    length := unix.SizeofNlMsghdr
    dataBytes := make([][]byte, len(req.Data))
    for i, data := range req.Data {
        // each element of req.Data is the Nfgenmsg added via AddData()
        dataBytes[i] = data.Serialize()
        length = length + len(dataBytes[i])
    }
    length += len(req.RawData)

    req.Len = uint32(length)
    b := make([]byte, length)
    hdr := (*(*[unix.SizeofNlMsghdr]byte)(unsafe.Pointer(req)))[:]
    next := unix.SizeofNlMsghdr
    copy(b[0:next], hdr)
    for _, data := range dataBytes {
        for _, dataByte := range data {
            b[next] = dataByte
            next = next + 1
        }
    }
    // Add the raw data if any
    if len(req.RawData) > 0 {
        copy(b[next:length], req.RawData)
    }
    return b
}

So data.Serialize() here boils down to Nfgenmsg's Serialize() method:

type Nfgenmsg struct {
    NfgenFamily uint8
    Version     uint8
    ResId       uint16 // big endian
}

func (msg *Nfgenmsg) Len() int {
    return SizeofNfgenmsg
}

func DeserializeNfgenmsg(b []byte) *Nfgenmsg {
    return (*Nfgenmsg)(unsafe.Pointer(&b[0:SizeofNfgenmsg][0]))
}

// Convert the Nfgenmsg into a byte slice.
func (msg *Nfgenmsg) Serialize() []byte {
    return (*(*[SizeofNfgenmsg]byte)(unsafe.Pointer(msg)))[:]
}

Back in Serialize:

b := make([]byte, length)
hdr := (*(*[unix.SizeofNlMsghdr]byte)(unsafe.Pointer(req)))[:]
next := unix.SizeofNlMsghdr
copy(b[0:next], hdr)
for _, data := range dataBytes {
    for _, dataByte := range data {
        b[next] = dataByte
        next = next + 1
    }
}

The req header is reinterpreted as a byte slice of length SizeofNlMsghdr and copied into b (b := make([]byte, length)), followed by all of req.Data. Finally, unix.Sendto(fd, request.Serialize(), 0, &s.lsa) sends the data.
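
For reference, here is the resulting 20-byte flush request built by hand: 16 bytes of nlmsghdr followed by the 4-byte nfgenmsg. This is a sketch assuming a little-endian host (what NativeEndian() resolves to on x86) and the field values derived above:

// Hand-build the serialized IPCTNL_MSG_CT_DELETE request.
package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    buf := make([]byte, 20)
    // struct nlmsghdr (16 bytes)
    binary.LittleEndian.PutUint32(buf[0:4], 20)     // nlmsg_len: header + payload
    binary.LittleEndian.PutUint16(buf[4:6], 0x0102) // nlmsg_type: (NFNL_SUBSYS_CTNETLINK<<8)|IPCTNL_MSG_CT_DELETE
    binary.LittleEndian.PutUint16(buf[6:8], 1|4)    // nlmsg_flags: NLM_F_REQUEST|NLM_F_ACK
    binary.LittleEndian.PutUint32(buf[8:12], 1)     // nlmsg_seq: first request on this socket
    binary.LittleEndian.PutUint32(buf[12:16], 0)    // nlmsg_pid: 0 in requests
    // struct nfgenmsg (4 bytes)
    buf[16] = 2 // nfgen_family: AF_INET
    buf[17] = 0 // version: NFNETLINK_V0
    // buf[18:20]: res_id = 0 (big endian)
    fmt.Printf("% x\n", buf)
}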

Execution then returns to Execute. After Sendto() sends the request, the next step is receiving the reply:

done:
    for {
        // receive response messages
        msgs, err := s.Receive()
        if err != nil {
            return nil, err
        }
        for _, m := range msgs {
            // check that the sequence number matches the request
            if m.Header.Seq != req.Seq {
                if sharedSocket {
                    continue
                }
                return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
            }
            // check that the pid matches
            if m.Header.Pid != pid {
                return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
            }
            // determine what kind of data this is from the netlink header type
            if m.Header.Type == unix.NLMSG_DONE { // end of a multipart message
                break done
            }
            if m.Header.Type == unix.NLMSG_ERROR { // error message (or acknowledgement)
                native := NativeEndian()
                error := int32(native.Uint32(m.Data[0:4]))
                if error == 0 {
                    break done
                }
                return nil, syscall.Errno(-error)
            }
            if resType != 0 && m.Header.Type != resType {
                continue
            }
            // collect the payload
            res = append(res, m.Data)
            // if NLM_F_MULTI is not set, this is not a multipart message, so stop iterating
            if m.Header.Flags&unix.NLM_F_MULTI == 0 {
                break done
            }
        }
    }
    return res, nil

For unix.NLMSG_ERROR, if the first 4 bytes of the payload (the error code) are 0, the request likewise finished successfully:

if m.Header.Type == unix.NLMSG_ERROR { // error message (or acknowledgement)
    native := NativeEndian()
    error := int32(native.Uint32(m.Data[0:4]))
    if error == 0 {
        break done
    }
    return nil, syscall.Errno(-error)
}
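
To make that check concrete, here is a sketch decoding the first 4 bytes of a hypothetical NLMSG_ERROR payload (assuming a little-endian host, as NativeEndian() would return on x86):

// Decode the error field at the start of an NLMSG_ERROR payload.
package main

import (
    "encoding/binary"
    "fmt"
    "syscall"
)

func main() {
    // Hypothetical payload: the int32 -1, i.e. -EPERM.
    data := []byte{0xff, 0xff, 0xff, 0xff}
    errno := int32(binary.LittleEndian.Uint32(data[0:4]))
    if errno == 0 {
        fmt.Println("ack: request executed successfully")
    } else {
        fmt.Println("error:", syscall.Errno(-errno)) // error: operation not permitted
    }
}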

So although the reply is an NLMSG_ERROR packet, as long as its first 4 bytes (the error field) are 0 there was no error: this is exactly the acknowledgement requested via NLM_F_ACK, and the request executed successfully.

At this point h.ConntrackTableFlush(lib.ConntrackTable) has completed. In essence, it sends an IPCTNL_MSG_CT_DELETE request that flushes the connection tracking table.

ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))

After ConntrackTableFlush(lib.ConntrackTable) completes, the program calls ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET)) to fetch the connection tracking data.

func (h *Handle) ConntrackTableList(table ConntrackTableType, family InetFamily) ([]*ConntrackFlow, error) {
    res, err := h.dumpConntrackTable(table, family)
    if err != nil {
        return nil, err
    }

    // Deserialize all the flows
    var result []*ConntrackFlow
    for _, dataRaw := range res {
        result = append(result, parseRawData(dataRaw))
    }

    return result, nil
}

func (h *Handle) dumpConntrackTable(table ConntrackTableType, family InetFamily) ([][]byte, error) {
    req := h.newConntrackRequest(table, family, IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP)
    return req.Execute(unix.NETLINK_NETFILTER, 0)
}

As you can see, ConntrackTableList follows essentially the same flow as the ConntrackTableFlush analyzed above. The differences are:

  1. ConntrackTableList calls newConntrackRequest(table, family, IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP) to fetch entries (the NLM_F_DUMP flag is sketched below), whereas ConntrackTableFlush calls h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK) to clear the table.
  2. In ConntrackTableList, res, err := h.dumpConntrackTable(table, family) keeps the reply, since that payload is the connection tracking data. ConntrackTableFlush instead discards it with _, err := req.Execute(unix.NETLINK_NETFILTER, 0): it only cares whether the flush succeeded, and the flush returns no data anyway.
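
For reference, the NLM_F_DUMP flag used by the list request combines the GET-request modifier flags from include/uapi/linux/netlink.h:

// GET-request modifier flags from linux/netlink.h.
package main

import "fmt"

const (
    NLM_F_ROOT  = 0x100 // specify tree root: return the whole table
    NLM_F_MATCH = 0x200 // return all matching entries
    NLM_F_DUMP  = NLM_F_ROOT | NLM_F_MATCH
)

func main() {
    fmt.Printf("NLM_F_DUMP = 0x%x\n", NLM_F_DUMP) // NLM_F_DUMP = 0x300
}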

Next comes the main part: how ConntrackTableList parses the returned data.

parseRawData(dataRaw)

func parseRawData(data []byte) *ConntrackFlow {
    s := &ConntrackFlow{}
    var proto uint8
    // First there is the Nfgenmsg header
    // consume only the family field
    reader := bytes.NewReader(data)
    binary.Read(reader, NativeEndian(), &s.FamilyType)

    // skip rest of the Netfilter header
    reader.Seek(3, seekCurrent)
    // The message structure is the following:
    // <len, NLA_F_NESTED|CTA_TUPLE_ORIG> 4 bytes
    // <len, CTA_TUPLE_IP> 4 bytes
    // flow information of the forward flow
    // <len, NLA_F_NESTED|CTA_TUPLE_REPLY> 4 bytes
    // <len, CTA_TUPLE_IP> 4 bytes
    // flow information of the reverse flow
    for reader.Len() > 0 {
        if nested, t, l := parseNfAttrTL(reader); nested {
            switch t {
            case CTA_TUPLE_ORIG:
                if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
                    proto = parseIpTuple(reader, &s.Forward)
                }
            case CTA_TUPLE_REPLY:
                if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
                    parseIpTuple(reader, &s.Reverse)
                } else {
                    // Header not recognized skip it
                    reader.Seek(int64(l), seekCurrent)
                }
            case CTA_COUNTERS_ORIG:
                s.Forward.Bytes, s.Forward.Packets = parseByteAndPacketCounters(reader)
            case CTA_COUNTERS_REPLY:
                s.Reverse.Bytes, s.Reverse.Packets = parseByteAndPacketCounters(reader)
            }
        }
    }
    if proto == TCP_PROTO {
        reader.Seek(64, seekCurrent)
        _, t, _, v := parseNfAttrTLV(reader)
        if t == CTA_MARK {
            s.Mark = uint32(v[3])
        }
    } else if proto == UDP_PROTO {
        reader.Seek(16, seekCurrent)
        _, t, _, v := parseNfAttrTLV(reader)
        if t == CTA_MARK {
            s.Mark = uint32(v[3])
        }
    }
    return s
}

skip header

reader := bytes.NewReader(data)
binary.Read(reader, NativeEndian(), &s.FamilyType)

// skip rest of the Netfilter header
reader.Seek(3, seekCurrent)

After wrapping the data in a Reader, the first 4 bytes are consumed: one byte is read into FamilyType and the remaining 3 are skipped. The comment describes this as skipping the rest of the Netfilter header, because the first 4 bytes of the netlink payload carry the nfgenmsg:

/* General form of address family dependent message.
 */
struct nfgenmsg {
    __u8  nfgen_family;     /* AF_xxx */  // here NFPROTO_IPV4 (2): we query IPv4 conntrack entries
    __u8  version;      /* nfnetlink version */ // normally NFNETLINK_V0, i.e. 0
    __be16    res_id;       /* resource id */  // normally 0
};

enum {
    NFPROTO_UNSPEC =  0,
    NFPROTO_INET   =  1,
    NFPROTO_IPV4   =  2,
    NFPROTO_ARP    =  3,
    NFPROTO_NETDEV =  5,
    NFPROTO_BRIDGE =  7,
    NFPROTO_IPV6   = 10,
    NFPROTO_DECNET = 12,
    NFPROTO_NUMPROTO,
};
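
As a concrete illustration, here is a minimal standalone sketch of consuming that 4-byte nfgenmsg header from a reply payload, mirroring the reader logic above (io.SeekCurrent plays the role of the library's seekCurrent constant; reading a single byte is endianness-independent):

// Consume the nfgenmsg header that prefixes each conntrack reply payload.
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

func main() {
    // Hypothetical payload: nfgen_family=2 (NFPROTO_IPV4), version=0, res_id=0.
    payload := []byte{2, 0, 0, 0 /* netlink attributes would follow */}
    reader := bytes.NewReader(payload)

    var family uint8
    binary.Read(reader, binary.LittleEndian, &family) // 1 byte: nfgen_family
    reader.Seek(3, io.SeekCurrent)                    // skip version + res_id

    fmt.Println("family =", family) // family = 2
}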

parseNfAttrTL(reader)

The program then parses netlink attributes from the reader:

func parseNfAttrTL(r *bytes.Reader) (isNested bool, attrType, len uint16) {
    binary.Read(r, NativeEndian(), &len)
    len -= SizeofNfattr

    binary.Read(r, NativeEndian(), &attrType)
    isNested = (attrType & NLA_F_NESTED) == NLA_F_NESTED
    attrType = attrType & (NLA_F_NESTED - 1)

    return isNested, attrType, len
}
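
Each netlink attribute is a TLV: a 2-byte length, a 2-byte type whose top bit (NLA_F_NESTED, 0x8000) marks a nested attribute, then the value. A sketch with a hypothetical attribute header, using the same masking as parseNfAttrTL:

// Decode a netlink attribute header the way parseNfAttrTL does.
package main

import "fmt"

const (
    NLA_F_NESTED   = 0x8000
    SizeofNfattr   = 4 // attribute header: 2-byte length + 2-byte type
    CTA_TUPLE_ORIG = 1
)

func main() {
    // Hypothetical header of a nested CTA_TUPLE_ORIG attribute:
    // total length 0x0034 (52 bytes including the header), type 0x8001.
    rawLen, rawType := uint16(0x0034), uint16(0x8001)

    isNested := rawType&NLA_F_NESTED == NLA_F_NESTED
    attrType := rawType & (NLA_F_NESTED - 1) // strip the nested flag
    payloadLen := rawLen - SizeofNfattr

    fmt.Println(isNested, attrType == CTA_TUPLE_ORIG, payloadLen) // true true 48
}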

The attrType values are defined in enum ctattr_type:

enum ctattr_type {
    CTA_UNSPEC,
    CTA_TUPLE_ORIG,
    CTA_TUPLE_REPLY,
    CTA_STATUS,
    CTA_PROTOINFO,
    CTA_HELP,
    CTA_NAT_SRC,
#define CTA_NAT    CTA_NAT_SRC /* backwards compatibility */
    CTA_TIMEOUT,
    CTA_MARK,
    CTA_COUNTERS_ORIG,
    CTA_COUNTERS_REPLY,
    CTA_USE,
    CTA_ID,
    CTA_NAT_DST,
    CTA_TUPLE_MASTER,
    CTA_SEQ_ADJ_ORIG,
    CTA_NAT_SEQ_ADJ_ORIG    = CTA_SEQ_ADJ_ORIG,
    CTA_SEQ_ADJ_REPLY,
    CTA_NAT_SEQ_ADJ_REPLY   = CTA_SEQ_ADJ_REPLY,
    CTA_SECMARK,        /* obsolete */
    CTA_ZONE,
    CTA_SECCTX,
    CTA_TIMESTAMP,
    CTA_MARK_MASK,
    CTA_LABELS,
    CTA_LABELS_MASK,
    __CTA_MAX
};

Here the first attribute's type is 1, which corresponds to CTA_TUPLE_ORIG. The handling code is:

case CTA_TUPLE_ORIG:
    if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
        proto = parseIpTuple(reader, &s.Forward)
    }

parseNfAttrTL(reader) is then called once more, one level deeper; once it reports a nested CTA_TUPLE_IP attribute, the program enters parseIpTuple(reader, &s.Forward) to continue parsing:

// This method parses the ip tuple structure
// The message structure is the following:
// <len, [CTA_IP_V4_SRC|CTA_IP_V6_SRC], 16 bytes for the address>
// <len, [CTA_IP_V4_DST|CTA_IP_V6_DST], 16 bytes for the address>
// <len, NLA_F_NESTED|nl.CTA_TUPLE_PROTO, 4 bytes>
// <len, CTA_PROTO_NUM, 1 byte for the protocol, 3 bytes of padding>
// <len, CTA_PROTO_SRC_PORT, 2 bytes for the source port, 2 bytes of padding>
// <len, CTA_PROTO_DST_PORT, 2 bytes for the destination port, 2 bytes of padding>
func parseIpTuple(reader *bytes.Reader, tpl *ipTuple) uint8 {
    for i := 0; i < 2; i++ {
        _, t, _, v := parseNfAttrTLV(reader)
      /*
        reference: include/uapi/linux/netfilter/nfnetlink_conntrack.h
        enum ctattr_ip {
            CTA_IP_UNSPEC,
            CTA_IP_V4_SRC,
            CTA_IP_V4_DST,
            CTA_IP_V6_SRC,
            CTA_IP_V6_DST,
            __CTA_IP_MAX
      };
      */
        // parse source and destination addresses
        switch t {
        case CTA_IP_V4_SRC, CTA_IP_V6_SRC:
            tpl.SrcIP = v
        case CTA_IP_V4_DST, CTA_IP_V6_DST:
            tpl.DstIP = v
        }
    }
    // Skip the next 4 bytes  nl.NLA_F_NESTED|nl.CTA_TUPLE_PROTO
    reader.Seek(4, seekCurrent)
    _, t, _, v := parseNfAttrTLV(reader)
    // parse the layer-4 protocol attributes
    /*
    reference: include/uapi/linux/netfilter/nfnetlink_conntrack.h
    enum ctattr_l4proto {
    CTA_PROTO_UNSPEC,
    CTA_PROTO_NUM,
    CTA_PROTO_SRC_PORT,
    CTA_PROTO_DST_PORT,
    CTA_PROTO_ICMP_ID,
    CTA_PROTO_ICMP_TYPE,
    CTA_PROTO_ICMP_CODE,
    CTA_PROTO_ICMPV6_ID,
    CTA_PROTO_ICMPV6_TYPE,
    CTA_PROTO_ICMPV6_CODE,
    __CTA_PROTO_MAX
};
    */
    if t == CTA_PROTO_NUM {
        // the protocol number; in this example 6, i.e. TCP
        tpl.Protocol = uint8(v[0])
    }
    // Skip some padding 3 bytes
    reader.Seek(3, seekCurrent)
    for i := 0; i < 2; i++ {
        // parse source and destination ports the same way
        _, t, _ := parseNfAttrTL(reader)
        switch t {
        case CTA_PROTO_SRC_PORT:
            parseBERaw16(reader, &tpl.SrcPort)
        case CTA_PROTO_DST_PORT:
            parseBERaw16(reader, &tpl.DstPort)
        }
        // Skip some padding 2 byte
        reader.Seek(2, seekCurrent)
    }
    return tpl.Protocol
}

// read an attribute's type, length, and value
func parseNfAttrTLV(r *bytes.Reader) (isNested bool, attrType, len uint16, value []byte) {
    isNested, attrType, len = parseNfAttrTL(r)

    value = make([]byte, len)
    binary.Read(r, binary.BigEndian, &value)
    return isNested, attrType, len, value
}
func parseNfAttrTL(r *bytes.Reader) (isNested bool, attrType, len uint16) {
    binary.Read(r, NativeEndian(), &len)
    len -= SizeofNfattr

    binary.Read(r, NativeEndian(), &attrType)
    isNested = (attrType & NLA_F_NESTED) == NLA_F_NESTED
    attrType = attrType & (NLA_F_NESTED - 1)

    return isNested, attrType, len
}
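
parseBERaw16 (its body is not quoted here) reads the value in network byte order, since ports travel big-endian on the wire. A sketch decoding a hypothetical CTA_PROTO_SRC_PORT attribute carrying the sport=44972 seen in the sample entry at the top:

// Decode the port value of a CTA_PROTO_SRC_PORT attribute.
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

func main() {
    // Hypothetical attribute: len=6, type=CTA_PROTO_SRC_PORT(2), value 0xAFAC, 2 bytes padding.
    attr := []byte{0x06, 0x00, 0x02, 0x00, 0xaf, 0xac, 0x00, 0x00}

    r := bytes.NewReader(attr[4:]) // skip the 4-byte attribute header
    var port uint16
    binary.Read(r, binary.BigEndian, &port)

    fmt.Println("sport =", port) // sport = 44972
}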

Once the data has been parsed in this way, the information is returned in the following structures:

type ipTuple struct {
    Bytes    uint64
    DstIP    net.IP
    DstPort  uint16
    Packets  uint64
    Protocol uint8
    SrcIP    net.IP
    SrcPort  uint16
}

type ConntrackFlow struct {
    FamilyType uint8
    Forward    ipTuple
    Reverse    ipTuple
    Mark       uint32
}
s := &ConntrackFlow{}
if proto == TCP_PROTO {
    reader.Seek(64, seekCurrent)
    _, t, _, v := parseNfAttrTLV(reader)
    if t == CTA_MARK {
        s.Mark = uint32(v[3])
    }
} else if proto == UDP_PROTO {
    reader.Seek(16, seekCurrent)
    _, t, _, v := parseNfAttrTLV(reader)
    if t == CTA_MARK {
        s.Mark = uint32(v[3])
    }
}

The mark value is parsed and stored in the ConntrackFlow struct. (Note that s.Mark = uint32(v[3]) keeps only the least-significant byte of the 4-byte big-endian mark.)

output

Having parsed the data, all that remains is printing the results:

for {
    flows, err := h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))
    if err == nil {
        if len(flows) != 0 {
            for _, flow := range flows {
                fmt.Println(flow)
            }
        }
    }
}

The final output looks like this:

tcp 6 src=IP1 dst=IP2 sport=33508 dport=17250 packets=175 bytes=23397   src=IP2 dst=IP1 sport=17250 dport=33508 packets=214 bytes=28663 mark=0
udp 17 src=IP3 dst=IP4 sport=5353 dport=5353 packets=5 bytes=469    src=IP4 dst=IP3 sport=5353 dport=5353 packets=0 bytes=0 mark=0

Summary

I spent a lot of time analyzing the whole process of obtaining connection tracking information, and learned a great deal from it. Due to time and energy constraints, the final parsing of the data returned by netfilter has not been covered in full detail; I will elaborate on it when time permits.