One million concurrent connections on a single machine with 50 lines of Go
This article first walks through how the single-machine million-connection test was set up and what the results were, then analyzes the secret behind the 50-line Go network server that sustains those connections.
Test topology
Clients: six cloud VMs, each with 2 cores and 8 GB of RAM
Server: one cloud VM with 4 cores and 16 GB of RAM

Network diagram (figure omitted)
Client-side settings
Raise the maximum number of open files to 200,000:
ulimit -n 200000
Widen the usable local port range to 1024–65535:
echo 1024 65535 > /proc/sys/net/ipv4/ip_local_port_range
Each client VM opens 180,000 connections
Configure multiple IPs on a single NIC (three IPs per NIC) and start three client processes. Each process binds a different local IP and opens 60,000 connections, for a total of 180,000 per VM. A single (local IP, server IP, server port) tuple only offers roughly 64,000 source ports, which is why each process needs its own local IP; a command sketch follows.
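As a sketch of that setup (the NIC name eth0 and the 192.168.1.x addresses are placeholders, not the actual test values; the -local-ip and -concurrent-num flags belong to the client program shown later):
ip addr add 192.168.1.101/24 dev eth0
ip addr add 192.168.1.102/24 dev eth0
ip addr add 192.168.1.103/24 dev eth0
Then start one client process per local IP, e.g. ./client -remote-ip <server-ip> -local-ip 192.168.1.101 -concurrent-num 60000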

Server-side settings
Raise the maximum number of open files to 1,000,000:
ulimit -n 1000000
Tune the SYN (half-open) queue and the accept (fully established) queue
During testing the clients had opened 30,000 connections while the server only showed 28,570.
Investigation turned up two causes:
1. The accept queue was full: the "listen queue of a socket overflowed" counter kept increasing (it can be checked with the commands shown below).

2. tcp_abort_on_overflow was 0, meaning that when the accept queue is full at the third step of the three-way handshake, the server simply drops the client's ACK (from the server's point of view the connection was never established).
With tcp_abort_on_overflow set to 1, the server instead sends a RST to the client when the accept queue is full at that step, aborting the handshake and the connection (which, again, was never established on the server side).
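To see which queue is overflowing, two standard commands are enough (exact output wording varies by kernel and distro):
netstat -s | grep -i listen
ss -lnt
netstat -s reports counters such as "times the listen queue of a socket overflowed"; for a listening socket, ss -lnt shows the current accept-queue length in Recv-Q and the backlog limit in Send-Q.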

Fix:
Set the SYN queue length to 10,000:
echo 10000 >/proc/sys/net/ipv4/tcp_max_syn_backlog
Set the accept queue length to 10,000:
echo 10000 >/proc/sys/net/core/somaxconn
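The new values can be double-checked with sysctl. Go's standard library reads net.core.somaxconn to size its listen backlog (the listenerBacklog seen in the source excerpts later), so restart the server process after changing it:
sysctl net.ipv4.tcp_max_syn_backlog net.core.somaxconn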
Reference: 【转】关于TCP 半连接队列和全连接队列 – sidesky – 博客园
Reference: linux内核调优tcp_max_syn_backlog和somaxconn的区别-10931853-51CTO博客
Raise the conntrack table size
net.nf_conntrack_max defaults to 262144; raise it to 1,000,000:
sysctl -w net.nf_conntrack_max=1000000
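If the nf_conntrack module is loaded, the number of entries currently tracked can be watched as an optional sanity check (assuming the standard sysctl key):
sysctl net.netfilter.nf_conntrack_count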
For more tuning of the maximum number of TCP connections, see Linux 内核优化-调大TCP最大连接数 – 简书
Final test results
The server ended up holding 960,000 established connections.
The most common form of the ss command is ss -anp, but note that with this many connections the -p option makes the command almost unusably slow, so only -an was used here.
ss performs much better than netstat; see https://blog.csdn.net/hustsselbj/article/details/47438781
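For example, a rough count of established connections (it also includes non-test connections such as the SSH session) can be taken with:
ss -an | grep ESTAB | wc -l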

CPU and memory usage
The server used roughly 2 CPU cores and about 3 GB of memory.

Checking the CPU hardware info shows a clock speed of 2.4 GHz.
For ways to inspect CPU hardware info, see linux(centos)查看cpu硬件信息命令图解教程 电脑维修技术网
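For instance, the CPU model and frequency can be read straight from /proc:
grep "model name" /proc/cpuinfo | uniq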

Client and server implementation
Client
```go
package main

import (
	"flag"
	"fmt"
	"net"
	"os"
	"time"
)

var RemoteAddr *string
var ConcurNum *int
var LocalAddr *string

func init() {
	RemoteAddr = flag.String("remote-ip", "127.0.0.1", "ip addr of remote server")
	ConcurNum = flag.Int("concurrent-num", 100, "concurrent number of client")
	LocalAddr = flag.String("local-ip", "0.0.0.0", "local ip addr to bind")
}

// consume opens one TCP connection bound to the given local IP and keeps
// reading from it until the connection fails.
func consume() {
	laddr := &net.TCPAddr{IP: net.ParseIP(*LocalAddr)}
	var dialer net.Dialer
	dialer.LocalAddr = laddr
	conn, err := dialer.Dial("tcp", *RemoteAddr+":8888")
	if err != nil {
		fmt.Println("dial failed:", err)
		os.Exit(1)
	}
	defer conn.Close()
	buffer := make([]byte, 512)
	for {
		_, err2 := conn.Read(buffer)
		if err2 != nil {
			fmt.Println("Read failed:", err2)
			return
		}
		// fmt.Println("count:", n, "msg:", string(buffer))
	}
}

func main() {
	flag.Parse()
	// Start one goroutine per connection.
	for i := 0; i < *ConcurNum; i++ {
		go consume()
	}
	time.Sleep(3600 * time.Second)
}
```
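A usage sketch for the client, assuming the file is saved as client.go and 10.0.0.10 stands in for the server address (both names are placeholders):
go build -o client client.go
ulimit -n 200000
./client -remote-ip 10.0.0.10 -local-ip 192.168.1.101 -concurrent-num 60000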
Server
```go
package main

import (
	"fmt"
	"net"
	"os"
	"time"
)

var array []byte = make([]byte, 10)

func checkError(err error, info string) (res bool) {
	if err != nil {
		fmt.Println(info + " " + err.Error())
		return false
	}
	return true
}

// Handler pushes 10 bytes to the client every 10 seconds, keeping the
// connection alive with minimal work.
func Handler(conn net.Conn) {
	for {
		_, err := conn.Write(array)
		if err != nil {
			return
		}
		time.Sleep(10 * time.Second)
	}
}

func main() {
	for i := 0; i < 10; i += 1 {
		array[i] = 'a'
	}
	service := ":8888"
	tcpAddr, _ := net.ResolveTCPAddr("tcp4", service)
	l, _ := net.ListenTCP("tcp", tcpAddr)
	for {
		conn, err := l.Accept()
		if err != nil {
			fmt.Printf("accept error, err=%s\n", err.Error())
			os.Exit(1)
		}
		// One goroutine per connection (GPC).
		go Handler(conn)
	}
}
```
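And the matching sketch for the server (the file name server.go is a placeholder):
ulimit -n 1000000
go build -o server server.go
./server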
Threading models for high-performance network programming
TPC
TPC stands for Thread Per Connection: every new connection gets a newly created thread dedicated to handling it.

Characteristics:
- Uses blocking I/O to read input data
- Each connection needs its own thread to carry out the full cycle of reading data, processing the business logic, and writing the response
Problems:
- At high concurrency a large number of threads must be created to handle the connections, which consumes a lot of system resources
Reactor
The core of the reactor pattern is a reactor plus a thread pool: the reactor watches the connections to see whether their I/O is readable or writable, and the thread pool handles the actual business logic. Under high concurrency an epoll-based reactor is very efficient; a minimal sketch follows the list below.

Characteristics:
- Uses non-blocking I/O with I/O multiplexing
- Uses a thread pool to process business logic
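To make the pattern concrete, here is a minimal single-threaded reactor sketch built directly on epoll through Go's syscall package (Linux-only, error handling mostly omitted, port 8888 chosen to match the demo server; it illustrates the pattern and is not the code used in the test):
```go
package main

import (
	"fmt"
	"syscall"
)

func main() {
	// Non-blocking listening socket on :8888.
	lfd, _ := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	syscall.SetNonblock(lfd, true)
	syscall.Bind(lfd, &syscall.SockaddrInet4{Port: 8888})
	syscall.Listen(lfd, 128)

	// The reactor: a single epoll instance watches every fd.
	epfd, _ := syscall.EpollCreate1(0)
	syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, lfd,
		&syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(lfd)})

	events := make([]syscall.EpollEvent, 128)
	buf := make([]byte, 512)
	for {
		n, _ := syscall.EpollWait(epfd, events, -1)
		for i := 0; i < n; i++ {
			fd := int(events[i].Fd)
			if fd == lfd {
				// The listening socket is readable: accept the new connection
				// and register it with epoll as well.
				cfd, _, err := syscall.Accept(lfd)
				if err != nil {
					continue
				}
				syscall.SetNonblock(cfd, true)
				syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, cfd,
					&syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(cfd)})
			} else {
				// A connection is readable: a full reactor would dispatch this
				// to a worker pool instead of handling it inline.
				nr, err := syscall.Read(fd, buf)
				if err != nil || nr == 0 {
					syscall.Close(fd)
					continue
				}
				fmt.Printf("read %d bytes from fd %d\n", nr, fd)
			}
		}
	}
}
```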
Go's GPC model
GPC stands for Goroutine Per Connection: every new connection gets a newly started goroutine dedicated to handling it.

Characteristics:
- Code can be written in a blocking-I/O style
- Each connection gets its own goroutine to carry out the full cycle of reading data, processing the business logic, and writing the response
Why GPC can sustain a million connections on a single machine
The GPC model looks very similar to TPC, so why can GPC support a million concurrent connections on one machine?
GPC vs. TPC
- Stack size: in GPC a goroutine starts with a tiny stack (2 KB in this Go version) that grows and shrinks on demand, whereas a thread in TPC typically defaults to a stack of 1 MB or more. A rough measurement sketch follows this list.
- I/O model: both GPC and TPC are programmed in a blocking style, but under GPC the underlying I/O is non-blocking. Go wraps non-blocking I/O into blocking I/O at the language level: when the non-blocking I/O is not ready, the read returns EAGAIN, and the Go runtime sets the goroutine to the waiting state and switches to another goroutine.
- Context switching: switching between goroutines is much cheaper than switching between threads; see linux操作系统笔记(进程)
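A rough way to see the stack-size difference in practice: the sketch below (an illustration, not part of the benchmark, and the numbers vary by Go version) starts 100,000 idle goroutines and prints how much stack memory the runtime reserved for them.
```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	const n = 100000
	for i := 0; i < n; i++ {
		// Each idle goroutine costs little more than its small, growable stack.
		go func() { time.Sleep(time.Hour) }()
	}
	time.Sleep(time.Second) // give the goroutines time to start

	runtime.ReadMemStats(&after)
	fmt.Println("goroutines:", runtime.NumGoroutine())
	fmt.Printf("stack memory: %.1f MB\n",
		float64(after.StackSys-before.StackSys)/1024/1024)
}
```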
The secret behind the GPC model
Under the hood the GPC model is really a reactor: Go wraps this model up at the language level so that application code can be written in a blocking style.

GPC model source walkthrough
The Go source version analyzed here is 1.9.4.

The I/O (sysmon) thread
The runtime starts a dedicated thread to run the sysmon function.
runtime/proc.go
```go
// The main goroutine.
func main() {
	g := getg()

	// Racectx of m0->g0 is used only as the parent of the main goroutine.
	// It must not be used for anything else.
	g.m.g0.racectx = 0

	// Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
	// Using decimal instead of binary GB and MB because
	// they look nicer in the stack overflow failure message.
	if sys.PtrSize == 8 {
		maxstacksize = 1000000000
	} else {
		maxstacksize = 250000000
	}

	// Allow newproc to start new Ms.
	mainStarted = true

	systemstack(func() {
		// Start a new thread (M) that runs the sysmon function.
		newm(sysmon, nil)
	})
	// ...
```
The sysmon function
sysmon calls netpoll to get the fds that have become readable or writable, and marks the goroutines associated with those fds as runnable.
runtime/proc.go
```go
func sysmon() {
	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		// ...
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		if lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			// netpoll calls epollwait, which returns the fds that are ready for I/O;
			// netpoll returns the goroutines associated with those fds.
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				// Mark the goroutines waiting on the ready fds as runnable.
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// ...
	}
}
```
The netpoll function
netpoll calls epollwait to get the fds that are ready for reading or writing, and returns the goroutines associated with those fds.
runtime/netpoll_epoll.go
```go
// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
	if epfd == -1 {
		return nil
	}
	waitms := int32(-1)
	if !block {
		waitms = 0
	}
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	// print("epoll wait\n")
	if n < 0 {
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("runtime: netpoll failed")
		}
		goto retry
	}
	var gp guintptr
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			// Add the goroutine associated with pd to the gp list.
			netpollready(&gp, pd, mode)
		}
	}
	if block && gp == 0 {
		goto retry
	}
	return gp.ptr()
}
```
The injectglist function
injectglist flips the goroutines from the waiting state to runnable and puts them on the global run queue.
runtime/proc.go
```go
// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
func injectglist(glist *g) {
	if glist == nil {
		return
	}
	if trace.enabled {
		for gp := glist; gp != nil; gp = gp.schedlink.ptr() {
			traceGoUnpark(gp, 0)
		}
	}
	lock(&sched.lock)
	var n int
	for n = 0; glist != nil; n++ {
		gp := glist
		glist = gp.schedlink.ptr()
		// Flip the goroutine from waiting to runnable.
		casgstatus(gp, _Gwaiting, _Grunnable)
		globrunqput(gp)
	}
	unlock(&sched.lock)
	for ; n != 0 && sched.npidle != 0; n-- {
		startm(nil, false)
	}
}
```
Server-side socket implementation
net.ListenTCP
ListenTCP calls the socket function, which uses system calls to create the socket, set it to non-blocking mode, bind it, and listen on it.
net/sock_posix.go
```go
// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
	// sysSocket creates the socket via a system call and also sets it to
	// non-blocking mode via a system call.
	s, err := sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}
	// Allocate a file descriptor wrapper (netFD) for the socket.
	if fd, err = newFD(s, family, sotype, net); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}

	// This function makes a network file descriptor for the
	// following applications:
	//
	// - An endpoint holder that opens a passive stream
	//   connection, known as a stream listener
	//
	// - An endpoint holder that opens a destination-unspecific
	//   datagram connection, known as a datagram listener
	//
	// - An endpoint holder that opens an active stream or a
	//   destination-specific datagram connection, known as a
	//   dialer
	//
	// - An endpoint holder that opens the other connection, such
	//   as talking to the protocol stack inside the kernel
	//
	// For stream and datagram listeners, they will only require
	// named sockets, so we can assume that it's just a request
	// from stream or datagram listeners when laddr is not nil but
	// raddr is nil. Otherwise we assume it's just for dialers or
	// the other connection holders.

	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			// listenStream binds the socket address with the bind system call,
			// starts listening with the listen system call, and adds the fd to
			// epoll via fd.init().
			if err := fd.listenStream(laddr, listenerBacklog); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			if err := fd.listenDatagram(laddr); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr); err != nil {
		fd.Close()
		return nil, err
	}
	return fd, nil
}
```
Accept
net/fd_unix.go
```go
func (fd *netFD) accept() (netfd *netFD, err error) {
	// pfd.Accept performs the accept system call, returns the new socket,
	// and sets the new socket to non-blocking mode.
	d, rsa, errcall, err := fd.pfd.Accept()
	if err != nil {
		if errcall != "" {
			err = wrapSyscallError(errcall, err)
		}
		return nil, err
	}

	// Allocate a file descriptor wrapper for the new connection.
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	// netfd.init() adds the newly accepted socket fd to epoll.
	if err = netfd.init(); err != nil {
		fd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}
```
internal/poll/fd_unix.go
```go
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	if err := fd.readLock(); err != nil {
		return -1, nil, "", err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return -1, nil, "", err
	}
	for {
		// accept performs the accept system call and sets the returned
		// socket fd to non-blocking mode.
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
		// The socket's accept queue is empty.
		case syscall.EAGAIN:
			if fd.pd.pollable() {
				// Park the goroutine (set its state to waiting).
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		case syscall.ECONNABORTED:
			// This means that a socket on the listen
			// queue was closed before we Accept()ed it;
			// it's a silly error, so try again.
			continue
		}
		return -1, nil, errcall, err
	}
}
```
Read
internal/poll/fd_unix.go
```go
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	if len(p) == 0 {
		// If the caller wanted a zero byte read, return immediately
		// without trying (but after acquiring the readLock).
		// Otherwise syscall.Read returns 0, nil which looks like
		// io.EOF.
		// TODO(bradfitz): make it wait for readability? (Issue 15735)
		return 0, nil
	}
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
		// Perform the read system call.
		n, err := syscall.Read(fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				// No data is available on the socket fd: park the goroutine
				// (set its state to waiting).
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}
```
Write
internal/poll/fd_unix.go
```go
// Write implements io.Writer.
func (fd *FD) Write(p []byte) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()
	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}
	var nn int
	for {
		max := len(p)
		if fd.IsStream && max-nn > maxRW {
			max = nn + maxRW
		}
		// Perform the write system call.
		n, err := syscall.Write(fd.Sysfd, p[nn:max])
		if n > 0 {
			nn += n
		}
		if nn == len(p) {
			return nn, err
		}
		if err == syscall.EAGAIN && fd.pd.pollable() {
			// The socket fd is not writable: park the goroutine
			// (set its state to waiting).
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		if err != nil {
			return nn, err
		}
		if n == 0 {
			return nn, io.ErrUnexpectedEOF
		}
	}
}
```
GPC model summary
- Newly created sockets and accepted sockets are all set to non-blocking mode
- The fds of newly created and accepted sockets are all registered with epoll
- Read and Write retry in a loop; when a call returns EAGAIN, the goroutine is set to the waiting state
- The sysmon thread periodically polls the network via epollwait to find readable/writable fds, and marks the goroutines associated with those fds as runnable