python异步socket-server

同步示例代码

import socket

HOST = 'localhost'
PORT = 8888

# Blocking echo server: serves one client at a time and echoes every chunk
# back until that client closes its end of the connection.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((HOST, PORT))
    s.listen(128)
    while True:
        conn, addr = s.accept()
        print('connected by', addr)
        with conn:
            try:
                while True:
                    msg = conn.recv(1024)
                    if not msg:  # b'' means the peer closed its side
                        break
                    conn.sendall(msg)
            except ConnectionError as exc:
                # A client that resets or vanishes mid-echo must end only its
                # own session, not crash the whole server.
                print('connection error from', addr, exc)

使用 I/O 复用的方式进行改造

这里使用 Python 3 提供的 selectors 模块来改造它。这个模块封装了操作系统底层提供的 I/O 复用机制（同步非阻塞），比如在 Linux 上使用 epoll。借助 I/O 复用，我们可以同时监听多个文件描述符的读写事件，并在注册时为每个文件描述符附带一个回调函数（保存在 SelectorKey.data 中），从而获得更好的并发性能。

import selectors
import socket

# The most efficient selector available on this platform (e.g. epoll on Linux).
sel = selectors.DefaultSelector()


def accept(sock, mask):
    """Accept a ready client on the listening socket and watch it for reads.

    ``mask`` (the bitmask of ready events) belongs to the callback signature
    but is not needed here.
    """
    client, peer = sock.accept()  # the listener was reported readable
    print('accepted', client, 'from', peer)
    client.setblocking(False)
    sel.register(client, selectors.EVENT_READ, read)

def read(conn, mask):
    """Echo whatever is readable on client ``conn``; clean up once the client is gone.

    ``mask`` (the bitmask of ready events) belongs to the callback signature
    but is unused. An empty ``recv`` (orderly close) or a ConnectionError
    (reset / broken pipe) unregisters and closes the connection instead of
    propagating out of the event loop.
    """
    try:
        data = conn.recv(1000)  # Should be ready
        if data:
            print('echoing', repr(data), 'to', conn)
            # NOTE: send() on a non-blocking socket may write only part of
            # ``data``; any remainder is dropped. Acceptable for the short
            # messages used in this demo.
            conn.send(data)  # Hope it won't block
            return
    except ConnectionError as exc:
        print('connection error on', conn, exc)
    print('closing', conn)
    sel.unregister(conn)
    conn.close()

sock = socket.socket()
# Same address-reuse option, port and backlog as the other servers in this
# document, so the benchmark client (which connects to port 8888) can target
# this server as well.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 8888))
sock.listen(128)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:  # EventLoop
    events = sel.select()
    # key is a SelectorKey; mask is the bitmask of ready events
    # (EVENT_READ and/or EVENT_WRITE).
    for key, mask in events:
        # key.data is the callback given at registration time:
        # accept() for the listener, read() for client sockets.
        callback = key.data
        # key.fileobj is the socket that became ready.
        callback(key.fileobj, mask)

实现EventLoop类的改造方案

import selectors
import socket


class EventLoop:
    """Minimal selector-driven event loop for the callback-style echo server.

    ``key.data`` of each registration is a bare callable for read
    registrations, and a ``(callable, msg)`` pair for write registrations.
    """

    def __init__(self, selector=None):
        self.selector = selectors.DefaultSelector() if selector is None else selector

    def run_forever(self):
        """Wait for ready sockets and dispatch them to their callbacks, forever."""
        while True:
            for key, mask in self.selector.select():
                if mask != selectors.EVENT_READ:
                    # Write readiness: data carries the callback and the
                    # message waiting to be sent (_on_write).
                    handler, pending = key.data
                    handler(key.fileobj, pending)
                    continue
                # Read readiness: data is the callback itself (_accept / _on_read).
                key.data(key.fileobj)


class TCPEchoServer:
    """Callback-style echo server driven by an external event loop.

    Each client socket alternates between two registrations on
    ``loop.selector``: EVENT_READ with ``self._on_read``, and, once a message
    has been received, EVENT_WRITE with ``(self._on_write, msg)``.
    """

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        """Bind, listen, register the listener and hand control to the loop."""
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)
        self._loop.selector.register(self.s, selectors.EVENT_READ, self._accept)
        self._loop.run_forever()

    def _accept(self, sock):
        """Accept a ready client and start watching it for reads."""
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        self._loop.selector.register(conn, selectors.EVENT_READ, self._on_read)

    def _on_read(self, conn):
        """Read one chunk; switch to write mode to echo it, or close on EOF/error."""
        try:
            msg = conn.recv(1024)
        except ConnectionError as exc:
            # A reset peer is treated like an orderly close instead of
            # propagating out of run_forever() and stopping the server.
            print('connection error on', conn, exc)
            msg = b''
        if msg:
            print('echoing', repr(msg), 'to', conn)
            self._loop.selector.modify(conn, selectors.EVENT_WRITE, (self._on_write, msg))
        else:
            self._close(conn)

    def _on_write(self, conn, msg):
        """Echo ``msg`` back, then go back to waiting for reads."""
        try:
            # msg is at most 1024 bytes and the socket was reported writable,
            # so sendall() is not expected to hit a full send buffer here.
            conn.sendall(msg)
        except ConnectionError as exc:
            print('connection error on', conn, exc)
            self._close(conn)
            return
        self._loop.selector.modify(conn, selectors.EVENT_READ, self._on_read)

    def _close(self, conn):
        """Stop watching ``conn`` and close it."""
        print('closing', conn)
        self._loop.selector.unregister(conn)
        conn.close()


event_loop = EventLoop()
echo_server = TCPEchoServer(host='localhost', port=8888, loop=event_loop)
echo_server.run()  # binds, listens, then blocks inside event_loop.run_forever()

基于future对象的异步模式

上述 EventLoop 模型都是通过监听事件、执行回调的方式来获取异步调用的结果。另一种思路是使用 Future 对象：它在异步调用完成时保存其结果（Python 的 asyncio 中提供了 Future，这里按照同样的思路自己实现了一个简化版）。Future 对象的 result 属性用来保存将来的执行结果，set_result 方法用来设置 result，并运行添加到该 Future 上的回调。
这里使用的是基于生成器的协程（yield from），而不是 async/await 形式的原生协程。

import selectors
import socket


class Future:  # minimal hand-rolled Future
    """Placeholder for the result of an asynchronous operation.

    ``result`` holds the value once ``set_result`` has been called. Callbacks
    registered with ``add_done_callback`` are invoked with the Future itself
    when the result is set. Iterating the Future (``yield from future``)
    yields the Future once and then evaluates to its result. This lets a
    driving Task suspend the coroutine until the result is available.
    """

    def __init__(self):
        self.result = None
        self._callbacks = []
        self._done = False  # distinguishes "resolved with None" from "pending"

    def add_done_callback(self, fn):
        """Arrange for ``fn(future)`` to run when the result is set.

        If the result has already been set, ``fn`` runs immediately.
        Otherwise a late registration would never be called.
        """
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        """Store ``result`` and run each pending callback once, in registration order."""
        self.result = result
        self._done = True
        # Detach the list first so a repeated set_result cannot re-run old callbacks.
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)

    def __iter__(self):
        yield self  # hand this Future to the Task driving the coroutine
        return self.result  # becomes the value of the ``yield from`` expression


class Task:
    """Drive a generator-based coroutine by chaining Future callbacks.

    Every Future the coroutine yields receives ``self.step`` as its done
    callback. The coroutine therefore resumes, with that Future's result,
    as soon as the Future is resolved.
    """

    def __init__(self, coro):
        self.coro = coro
        # Start the coroutine by feeding it an already-resolved Future (result None).
        primer = Future()
        primer.set_result(None)
        self.step(primer)

    def step(self, future):
        """Send ``future.result`` into the coroutine and wait on the next Future it yields."""
        try:
            pending = self.coro.send(future.result)
        except StopIteration:
            pass  # the coroutine has finished
        else:
            pending.add_done_callback(self.step)


class TCPEchoServer:
    """Coroutine-style echo server built on Future and Task.

    ``run()`` is a generator meant to be driven by ``Task``. It serves one
    client at a time: accept, read one message, echo it back, close, repeat.
    A client that connects but never sends therefore delays every other
    client.
    """

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        """Coroutine: set up the listener, then accept/read/echo clients forever."""
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)

        while True:
            conn, addr = yield from self.accept()
            msg = yield from self.read(conn)
            if msg:
                yield from self.sendall(conn, msg)  # also unregisters and closes conn
            else:
                # read() leaves conn registered. It must be unregistered before
                # closing; otherwise the selector keeps a stale entry for this
                # fd, and registering the next client that reuses the fd
                # raises KeyError.
                self._loop.selector.unregister(conn)
                conn.close()

    def accept(self):
        """Coroutine: wait for a pending connection, accept it, return ``(conn, addr)``.

        The listener is registered only while waiting, and is unregistered
        again once a client has been accepted.
        """
        f = Future()

        def on_accept():
            conn, addr = self.s.accept()
            print('accepted', conn, 'from', addr)
            conn.setblocking(False)
            f.set_result((conn, addr))  # the Future resolves to the new connection

        self._loop.selector.register(self.s, selectors.EVENT_READ, on_accept)
        # Suspend until on_accept() has accepted a client and resolved f.
        conn, addr = yield from f
        self._loop.selector.unregister(self.s)
        return conn, addr

    def read(self, conn):
        """Coroutine: wait until ``conn`` is readable and return up to 1024 bytes.

        Returns ``b''`` when the peer closed the connection or reset it.
        ``conn`` is left registered for EVENT_READ; the caller must modify
        or unregister it.
        """
        f = Future()

        def on_read():
            try:
                msg = conn.recv(1024)
            except ConnectionError:
                msg = b''  # treat a reset peer like an orderly close
            f.set_result(msg)

        self._loop.selector.register(conn, selectors.EVENT_READ, on_read)
        msg = yield from f
        return msg

    def sendall(self, conn, msg):
        """Coroutine: once ``conn`` is writable, send ``msg``, then unregister and close ``conn``."""
        f = Future()

        def on_write():
            try:
                conn.sendall(msg)
            except ConnectionError as exc:
                print('connection error on', conn, exc)
            # Release the connection before resuming the coroutine, so the
            # next iteration of run() starts from a clean selector state.
            self._loop.selector.unregister(conn)
            conn.close()
            f.set_result(None)

        self._loop.selector.modify(conn, selectors.EVENT_WRITE, on_write)
        yield from f


class EventLoop:
    """Event loop for the Future/Task server.

    Every registration's ``key.data`` is a zero-argument callback.
    """

    def __init__(self, selector=None):
        self.selector = selector if selector is not None else selectors.DefaultSelector()

    def create_task(self, coro):
        """Wrap ``coro`` in a Task, which starts running it immediately."""
        return Task(coro)

    def run_forever(self):
        """Block on the selector and invoke the callback of each ready registration, forever."""
        while True:
            ready = self.selector.select()
            for key, _ in ready:
                key.data()


event_loop = EventLoop()
echo_server = TCPEchoServer(host='localhost', port=8888, loop=event_loop)
# create_task() wraps the run() generator in a Task, which primes it at once
# (binding and listening happen here). run_forever() then dispatches events.
task = event_loop.create_task(echo_server.run())
event_loop.run_forever()

性能测试

建立 10000 个连接的 TCP 客户端（128 个工作线程并发执行，每个连接发送一条消息并等待回显）

# Echo client program
import socket
import concurrent.futures
import time

# Server address: the remote host and the same port the server listens on.
HOST, PORT = '127.0.0.1', 8888
# Per-request durations in seconds, appended by the run_time decorator.
time_list = []


def run_time(func):
    """Decorator: append the duration of each successful call of ``func`` to ``time_list``.

    Durations are measured with ``time.perf_counter()`` and recorded in
    seconds. Positional and keyword arguments are forwarded to ``func``, and
    its return value is passed through. If ``func`` raises, the exception
    propagates and no duration is recorded.
    """
    import functools

    @functools.wraps(func)
    def wrapper(i, *args, **kwargs):
        start_time = time.perf_counter()
        result = func(i, *args, **kwargs)
        time_list.append(time.perf_counter() - start_time)
        return result

    return wrapper


@run_time
def send_message(i):
    """Open one connection, send a numbered greeting and wait for the echo."""
    payload = 'hello world {}\n'.format(i).encode('utf-8')
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((HOST, PORT))
        client.sendall(payload)
        client.recv(1024)  # reply content is ignored; only the round trip is timed
        # print('Received', repr(data))


# Fire 10000 requests through 128 worker threads. Submitting explicitly and
# inspecting every future, instead of leaving an unconsumed executors.map()
# iterator, ensures that worker exceptions (e.g. ConnectionRefusedError) are
# counted rather than silently discarded.
with concurrent.futures.ThreadPoolExecutor(max_workers=128) as executors:
    futures = [executors.submit(send_message, i) for i in range(10000)]

failed = sum(1 for fut in futures if fut.exception() is not None)

# Sum of per-request latencies in seconds. This is not wall-clock time,
# because requests overlap across the worker threads.
all_time = sum(time_list)
print(all_time)
if failed:
    print('failed requests:', failed)

时间结果（10000 个请求各自耗时之和，单位：秒；由于请求在多个线程中并发执行，这并不是总的墙钟时间）

同步测试时间: 368.0034764000006
select IO复用:154.46833750000008
异步+IO复用:69.28810920000004