一个email下载器:多线程思路
相对于多进程解决方案,多线程最大的好处就是imap连接可以复用,不用每次fetch的时候都要新建一个连接。
emaildownloader工具多线程解决方案(具体代码参考commit daaffb01 e_threading.py)没有使用threading类,而使用concurrent.futures 类,concurrent.futures对threading和multiprocessing 做了进一步封装,并提供一致性的接口。
concurrent.futures主要实现了线程池或进程池,这一点特别符合emaildownloader工具的应用场景,本质上也很简单,基本上涉及不到锁的控制。
基本的程序如下:
ex = futures.ThreadPoolExecutor(max_workers=5) wait_for = [ ex.submit(task, i) for i in range(5, 0, -1) ] try: for f in futures.as_completed(wait_for): print('main: result: {}'.format(f.result())) except concurrent.futures.TimeoutError as err: print(err, "TimeoutError")
submit()方法用于提交任务,task函数用户具体的fetch工作。
1:开始我设想一个imap连接断开后,可以终止线程运行,虽然线程也可以取消,但这由操作系统控制,也就是说不一定取作用,所以不能依赖它,而且如果你在as_completed()函数后面再调用Future对象的cancel()方法,基本上不会成功,因为这个时候任务已经完成。
2:as_completed()方法和map()方法的区别
map()方法会迭代每一个Future对象,然后阻塞等待结果,由于fetch指令是网络I/O操作,响应可能会很慢,所以map()基本上还是顺序运行,没有用到多线程的并发能力。
而如果子线程执行结果没有顺序的要求,那么as_completed()方法是最合适的,只要子线程任务完成,它就能随机响应,内部实现估计用到了中断,类似于异步,threading类没有这些机制。
3:as_completed()方法和result()
当Future对象finished或cancelled,则as_completed()返回一个迭代器,这个迭代器有个方法就是result(),用于响应子线程的结果,按照我的理解这二个方法应该是同时产生的,但他们居然都有timeout参数,文档中也没有详细解释差异。
设置timeout的参数是让不等待Future对象的状态,让程序可以继续做其他的事情,相当于不阻塞,但必须了解的是即使timeout了,子线程仍然会继续运行。
4:设置timeout的目的是让程序能够快速响应,避免因为某些任务一直阻塞下去,但在 emaildownloader 工具中,即使设置 timeout,有的时候整个程序完全阻塞,也不触发concurrent.futures.TimeoutError异常。
通过strace才发现有FUTEX_WAIT_PRIVATE错误,主要原因是imaplib库没有设置timeout的机制(默认是无限等待)。
也就是说,如果要避免长时间阻塞,还是需要子线程函数中处理,由于imaplib没有timeout参数设置,所以可以如下处理 socket.setdefaulttimeout(10)
。
5:imap连接复用
imap fetch指令可能会因为多种原因超时或断开,一旦触发这个异常,可以在这个子线程中重新建立连接,这样其他子线程同样也能复用该连接,不要忘记global关键字。
但发现的一个问题,在子线程中重新建立连接后,后面一些子线程仍然会报连接断开,可能是其他处于调度的线程没有获取到这个连接。
由于concurrent.futures做了很多的封装,在排查问题的时候更复杂,所以后续可能考虑使用原生的threading类来实现。
如果本篇文章看不太懂,可以查看https://github.com/ywdblog/emaildownloader/中的e_threading.py,相信会有更多的启发。
相关文章:
-
emaildownloader工具:
https://www.github.com/ywdblog/emaildownloader