python异步socket-server
2014 年 9 月 18 日
同步示例代码
import socket

HOST = 'localhost'
PORT = 8888

# Blocking echo server: clients are served strictly one after another,
# and each client is echoed until it closes its side of the connection.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
    # Let the server rebind the port right away after a restart.
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((HOST, PORT))
    listener.listen(128)
    while True:
        client, peer = listener.accept()
        print('connected by', peer)
        with client:
            # recv() yields b'' once the peer has closed, which ends the loop.
            for chunk in iter(lambda: client.recv(1024), b''):
                client.sendall(chunk)
IO复用的方式进行改造
这里使用 Python 3 标准库提供的 selectors 模块来改造它。这个模块封装了操作系统底层提供的 I/O 复用机制(同步非阻塞),比如在 Linux 上使用的是 epoll。通过 I/O 复用机制,我们可以在一个线程里同时监听多个文件描述符的可读/可写事件,并为每个注册的文件描述符关联一个回调函数,从而获得更好的并发性能。
import selectors
import socket

# DefaultSelector picks the most efficient multiplexing backend available.
sel = selectors.DefaultSelector()


def accept(sock, mask):
    """Accept a pending connection and watch it for incoming data."""
    client, peer = sock.accept()  # the selector reported sock as readable
    print('accepted', client, 'from', peer)
    client.setblocking(False)
    sel.register(client, selectors.EVENT_READ, read)


def read(conn, mask):
    """Echo one chunk back to conn, or stop watching and close it on EOF."""
    chunk = conn.recv(1000)  # the selector reported conn as readable
    if not chunk:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
        return
    print('echoing', repr(chunk), 'to', conn)
    # NOTE: the return value of send() is ignored, so this simple version
    # assumes each chunk fits in the socket's send buffer in one call.
    conn.send(chunk)


sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

# Event loop: wait until registered sockets are ready, then call the
# callback stored as each registration's data with (fileobj, mask).
while True:
    for key, mask in sel.select():
        key.data(key.fileobj, mask)
实现EventLoop类的改造方案
import selectors
import socket


class EventLoop:
    """Minimal event loop that dispatches selector events to callbacks.

    ``EVENT_READ`` registrations carry a bare callback as their data and are
    dispatched as ``callback(fileobj)``; ``EVENT_WRITE`` registrations carry a
    ``(callback, msg)`` pair and are dispatched as ``callback(fileobj, msg)``.
    """

    def __init__(self, selector=None):
        if selector is None:
            selector = selectors.DefaultSelector()
        self.selector = selector

    def run_forever(self):
        """Wait for I/O readiness in a loop and invoke the matching callbacks."""
        while True:
            for key, mask in self.selector.select():
                if mask == selectors.EVENT_READ:
                    # Read registration: data is the callback itself
                    # (_accept or _on_read).
                    callback = key.data
                    callback(key.fileobj)
                else:
                    # Write registration: data is (_on_write, pending bytes).
                    callback, msg = key.data
                    callback(key.fileobj, msg)


class TCPEchoServer:
    """Non-blocking echo server driven by an EventLoop.

    Each connection alternates between waiting to read a chunk and waiting
    to write that chunk back.
    """

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        """Bind and listen, then block inside the event loop."""
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)
        self._loop.selector.register(self.s, selectors.EVENT_READ, self._accept)
        self._loop.run_forever()

    def _accept(self, sock):
        """Accept a new client and start watching it for data."""
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        self._loop.selector.register(conn, selectors.EVENT_READ, self._on_read)

    def _on_read(self, conn):
        """Read one chunk and switch conn to write mode, or close it on EOF."""
        try:
            msg = conn.recv(1024)
        except ConnectionError:
            # A reset by the peer is handled like an orderly close instead of
            # propagating out of run_forever and stopping the whole server.
            msg = b''
        if msg:
            print('echoing', repr(msg), 'to', conn)
            self._loop.selector.modify(conn, selectors.EVENT_WRITE, (self._on_write, msg))
        else:
            self._close(conn)

    def _on_write(self, conn, msg):
        """Send as much of msg as the socket accepts; keep the rest pending."""
        try:
            # send() instead of sendall(): on a non-blocking socket sendall()
            # raises BlockingIOError once the buffer fills, losing track of
            # how much was already written.
            sent = conn.send(msg)
        except BlockingIOError:
            sent = 0
        except ConnectionError:
            self._close(conn)
            return
        remaining = msg[sent:]
        if remaining:
            # Partial write: stay in write mode with only the unsent tail.
            self._loop.selector.modify(conn, selectors.EVENT_WRITE, (self._on_write, remaining))
        else:
            self._loop.selector.modify(conn, selectors.EVENT_READ, self._on_read)

    def _close(self, conn):
        """Stop watching conn and close it."""
        print('closing', conn)
        self._loop.selector.unregister(conn)
        conn.close()


if __name__ == '__main__':
    event_loop = EventLoop()
    echo_server = TCPEchoServer('localhost', 8888, event_loop)
    echo_server.run()
基于future对象的异步模式
上述的 EventLoop 模型都是通过监听事件、注册回调的方式来获取异步调用的结果。有没有更直观的写法呢?Python(asyncio 和 concurrent.futures)提供了一种叫做 Future 的对象,用来在异步调用执行完成时保存它的结果。下面仿照它实现了一个简化版的 Future:它的 result 属性用来保存未来的执行结果,set_result 方法用来设置 result,并依次运行通过 add_done_callback 添加到该 future 上的回调。
这里使用的是基于生成器的协程(通过 yield from 实现),而不是 Python 3.5 引入的 async/await 原生协程。
import selectors
import socket


class Future:
    """Minimal stand-in for asyncio.Future: holds a result delivered later."""

    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        """Register ``fn(future)`` to run when ``set_result`` is called."""
        self._callbacks.append(fn)

    def set_result(self, result):
        """Store ``result`` and synchronously run every registered callback."""
        self.result = result
        for callback in self._callbacks:
            callback(self)

    def __iter__(self):
        # ``yield from future`` hands this future up to the Task driving the
        # coroutine; once resumed, the stored result becomes the value of the
        # ``yield from`` expression.
        yield self
        return self.result


class Task:
    """Drive a generator-based coroutine from one Future to the next."""

    def __init__(self, coro):
        self.coro = coro
        # Prime the coroutine with an already-completed future so the first
        # step simply starts it.
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        """Resume the coroutine with ``future.result``; then wait on the next future."""
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


class TCPEchoServer:
    """Echo server written as a coroutine on top of Future/Task.

    Connections are served strictly one at a time: the next ``accept`` only
    starts after the current connection has been read, echoed and closed.
    Each helper coroutine registers its socket, waits, then unregisters it.
    """

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        """Coroutine: set up the listening socket, then serve connections forever."""
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)
        while True:
            conn, addr = yield from self.accept()
            msg = yield from self.read(conn)
            if msg:
                yield from self.sendall(conn, msg)  # sendall closes conn
            else:
                conn.close()

    def accept(self):
        """Coroutine: wait for and return one ``(conn, addr)`` pair."""
        f = Future()

        def on_accept():
            conn, addr = self.s.accept()
            print('accepted', conn, 'from', addr)
            conn.setblocking(False)
            f.set_result((conn, addr))

        self._loop.selector.register(self.s, selectors.EVENT_READ, on_accept)
        conn, addr = yield from f
        self._loop.selector.unregister(self.s)
        return conn, addr

    def read(self, conn):
        """Coroutine: return up to 1024 bytes from conn (b'' on EOF or reset)."""
        f = Future()

        def on_read():
            try:
                msg = conn.recv(1024)
            except ConnectionError:
                msg = b''
            # Resolve outside the try so errors raised by the resumed
            # coroutine are not mistaken for recv() errors.
            f.set_result(msg)

        self._loop.selector.register(conn, selectors.EVENT_READ, on_read)
        msg = yield from f
        # Always drop the registration: a closed socket left in the selector
        # makes a later register() of a socket reusing the same fd fail.
        self._loop.selector.unregister(conn)
        return msg

    def sendall(self, conn, msg):
        """Coroutine: write all of msg to conn, then close conn."""
        pending = msg
        while pending:
            f = Future()

            def on_write(f=f, pending=pending):
                try:
                    sent = conn.send(pending)
                except BlockingIOError:
                    sent = 0  # not actually writable yet; wait again
                except ConnectionError:
                    sent = None  # peer is gone; give up on the rest
                f.set_result(sent)

            self._loop.selector.register(conn, selectors.EVENT_WRITE, on_write)
            sent = yield from f
            self._loop.selector.unregister(conn)
            if sent is None:
                break
            pending = pending[sent:]
        conn.close()


class EventLoop:
    """Event loop whose selector registrations carry zero-argument callbacks."""

    def __init__(self, selector=None):
        if selector is None:
            selector = selectors.DefaultSelector()
        self.selector = selector

    def create_task(self, coro):
        """Wrap coro in a Task, which starts running it immediately."""
        return Task(coro)

    def run_forever(self):
        """Wait for readiness events in a loop and run each registration's callback."""
        while True:
            for key, _mask in self.selector.select():
                key.data()


if __name__ == '__main__':
    event_loop = EventLoop()
    echo_server = TCPEchoServer('localhost', 8888, event_loop)
    task = event_loop.create_task(echo_server.run())
    event_loop.run_forever()
性能测试
建立10000个连接的tcp客户端
# Echo client program
import concurrent.futures
import functools
import socket
import time

HOST = '127.0.0.1'  # The remote host
PORT = 8888  # The same port as used by the server
time_list = []  # per-call durations in seconds, appended by run_time


def run_time(func):
    """Decorate func so the duration of each successful call is appended to time_list."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        time_list.append(time.perf_counter() - start_time)
        return result
    return wrapper


@run_time
def send_message(i):
    """Open a connection, send one numbered line and return the reply bytes."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        s.sendall('hello world {}\n'.format(i).encode('utf-8'))
        data = s.recv(1024)
    return data


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=128) as executor:
        # Iterating the results re-raises any worker exception; leaving them
        # unconsumed would silently drop failed requests from the timing.
        for _ in executor.map(send_message, range(10000)):
            pass
    # Sum of per-request latencies, not overall wall-clock time.
    print(sum(time_list))
时间结果
同步测试时间:368.0034764000006  select IO复用:154.46833750000008  异步+IO复用:69.28810920000004(单位为秒,是客户端 10000 个请求各自耗时之和,并非整体的墙钟时间)