Python多进程相关概念及解释

Python多进程相关概念及解释

介绍

由于GIL的存在,为了利用多核的优势,Python程序员不得不使用多进程。(多线程在PYthon中仅在IO密集型任务中有优势)。
Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
感觉使用方法和threading差不多。

原始方法fork

fork函数被调用一次,但返回两次。对父进程返回子进程ID, 对子进程返回0.
子进程获得父进程数据空间、堆和栈的副本。父子进程并不共享这些存储空间部分,父、子进程共享正文段。

由于在fork之后经常跟随着exec,所以现在的很多实现并不执行一个父进程数据段、堆和栈的完全复制。作为替代,使用了写时复制技术。
这些区域由父、子进程共享,而且内核将它们的访问权限改变为只读的。
如果父、子进程中的任一个试图修改这些区域,则内核只为修改区域的那块内存制作一个副本,通常是虚拟存储器系统中的一“页”。

一般来说,在fork之后是父进程还是子进程执行是不确定的。

在类unix系统中, python的os 模块内置了fork 函数用以创建子进程。

示例代码:

import os

print("Process %s start …" %(os.getpid()))
pid = os.fork()
if pid == 0:
    print("This is child process and my pid is %d, my father process is %d" %(os.getpid(), os.getppid()))
else:
    print("This is Fater process, And Its child pid is %d" %(pid))

输出结果:

Process 872 start …
This is Fater process, And Its child pid is 873
This is child process and my pid is 873, my father process is 872

subprocess

subprocess包主要功能是执行外部的命令和程序。比如说,我需要使用wget下载文件。我在Python中调用wget程序。从这个意义上来说,subprocess的功能与shell类似。

Popen

subprocess模块中只定义了一个类: Popen。
可以使用Popen来创建进程,并与进程进行复杂的交互。它的构造函数如下:

subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)

参数解释:

  • args 可以是字符串或者序列类型(如:list,元组),用于指定进程的可执行文件及其参数。如果是序列类型,第一个元素通常是可执行文件的路径。我们也可以显式的使用executeable参数来指定可执行文件的路径。在windows操作系统上,Popen通过调用CreateProcess()来创建子进程,CreateProcess接收一个字符串参数,如果args是序列类型,系统将会通过list2cmdline()函数将序列类型转换为字符串。
  • bufsize:指定打开stdin/stdout/stderr文件的缓冲类型。0 无缓冲;1 行缓冲; 其它正数 选择合适的缓冲; 负数(默认值),采用系统默认缓冲大小,即io.DEFAULT_BUFFER_SIZE。
  • executable用于指定可执行程序。一般情况下我们通过args参数来设置所要运行的程序。如果将参数shell设为True,executable将指定程序使用的shell。在windows平台下,默认的shell由COMSPEC环境变量来指定。
  • stdin, stdout, stderr分别表示程序的标准输入、输出、错误句柄。他们可以是PIPE,文件描述符或文件对象,也可以设置为None,表示从父进程继承。
  • preexec_fn 只在Unix平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。
  • Close_sfs:在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出、错误管道。我们不能将close_fds设置为True同时重定向子进程的标准输入、输出与错误(stdin, stdout, stderr)。
  • 如果参数shell设为true,程序将通过shell来执行。
  • cwd用于设置子进程的当前目录。
  • env是字典类型,用于指定子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。
  • Universal_newlines:不同操作系统下,文本的换行符是不一样的。如:windows下用’/r/n’表示换,而Linux下用’/n’。如果将此参数设置为True,Python统一把这些换行符当作’/n’来处理。
  • startupinfo与createionflags只在windows下用效,它们将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如:主窗口的外观,进程的优先级等等。

方法解释:

  • Popen.poll() 用于检查子进程是否已经结束。设置并返回returncode属性。
  • Popen.wait() 等待子进程结束。设置并返回returncode属性。
  • Popen.communicate(input=None) 与子进程进行交互。向stdin发送数据,或从stdout和stderr中读取数据。可选参数input指定发送到子进程的参数。Communicate()返回一个元组:(stdoutdata, stderrdata)。注意:如果希望通过进程的stdin向其发送数据,在创建Popen对象的时候,参数stdin必须被设置为PIPE。同样,如果希望从stdout和stderr获取数据,必须将stdout和stderr设置为PIPE。
  • Popen.send_signal(signal) 向子进程发送信号。
  • Popen.terminate() 停止(stop)子进程。在windows平台下,该方法将调用Windows API TerminateProcess()来结束子进程。
  • Popen.kill() 杀死子进程。

简单示例如下:

import subprocess
child = subprocess.Popen(["ping","-c","5","www.google.com"])
print("parent process")
child.wait()

输出结果:

parent process
PING www.google.com (93.46.8.89): 56 data bytes
Request timeout for icmp_seq 0
Request timeout for icmp_seq 1
Request timeout for icmp_seq 2
Request timeout for icmp_seq 3

www.google.com ping statistics —
5 packets transmitted, 0 packets received, 100.0% packet loss

可以见到子进程的输出直接打印在了屏幕上,因为子进程默认继承了父进程的stdin/stdout/stderr。

Popen的再封装

subprocess.call()
父进程等待子进程完成
返回退出信息(returncode)

subprocess.check_call()
父进程等待子进程完成。
正常执行完返回0,否者抛出subprocess.CalledProcessError,其中包含returncode

subprocess.check_output()

父进程等待子进程完成。
返回子进程向标准输出的输出结果。

事实上这个三个都是利用subprocess.Popen实现的。

简单实现的check_output示例如下:

import subprocess

def my_check_output(args,
                    stdin=None,
                    stderr=None,
                    shell=False,
                    encoding=None,
                    errors=None,
                    universal_newlines=False,
                    timeout=None):

    child = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE,stderr=stderr,shell=shell,universal_newlines=universal_newlines)
    stdout,stderr = child.communicate(input=stdin, timeout=timeout)
    returncode = child.returncode
    if returncode != 0:
        raise subprocess.CalledProcessError(returncode=returncode)
    else:
        return stdout

try:
    ret = my_check_output("pwd",shell=True)
    print(ret)
except subprocess.CalledProcessError as e:
    print(e)

输出结果:

b’/Users/maynard/data/workbase/python/study/Blog_mini/tests\n’

标准模块multiprocessing

fork 方式是仅在linux 下才有的接口, 在windows下并没有, 那么在windows下如何实现多进程呢, 这就用到了multiprocessing

Process创建进程

直接调用

multiprocessing 模块的Process 对象表示的是一个进程对象, 可以创建子进程并执行指定的函数。
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典参数。name为别名。group实质上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

属性:authkey、daemon、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。
其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

from multiprocessing import Process
import os

def pro_do(name, func):
    print("This is child process %d from parent process %d, and name is  %s which is used for %s"
          %(os.getpid(), os.getppid(), name, func))

if __name__ == "__main__":
    print("Parent process id %d" %(os.getpid()))
    #process 对象指定子进程将要执行的操作方法(pro_do), 以及该函数的对象列表args(必须是tuple格式, 且元素与pro_do的参数一一对应)
    pro = Process(target=pro_do, args=("test", "dev"))
    print("start child process")
    #启动子进程
    pro.start()
    #是否阻塞方式执行, 如果有, 则阻塞方式, 否则非阻塞
    pro.join() #if has this, it’s synchronous operation or asynchronous operation
    print("Process end")

输出结果:

Parent process id 921
start child process
This is child process 922 from parent process 921, and name is test which is used for dev
Process end

子类化Process

自定义类,继承自Process, 实现run方法。

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

输出结果:

the time is Wed Feb 8 16:40:55 2017
the time is Wed Feb 8 16:40:58 2017
the time is Wed Feb 8 16:41:01 2017
the time is Wed Feb 8 16:41:04 2017
the time is Wed Feb 8 16:41:07 2017

Pool进程池

multiprocessing.Pool提供进程池功能。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
  • close() 关闭pool,使其不在接受新的任务。
  • terminate() 结束工作进程,不再处理未完成的任务。
  • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
from multiprocessing import Pool
import os, time

def pro_do(process_num):
    print("child process id is %d" %(os.getpid()))
    time.sleep(6-process_num)
    print("this is process %d" %(process_num))

if __name__ == "__main__":
    print("Current process is %d" %(os.getpid()))
    p = Pool()#默认启动CPU个数的进程,可以传参指定。
    for i in range(5):
        p.apply_async(pro_do, (i,))  #增加新的进程
    p.close() # 禁止在增加新的进程
    p.join()#等待所有子进程结束
    print("pool process done")

输出结果:

Current process is 960
child process id is 961
child process id is 963
child process id is 962
child process id is 964
this is process 3
child process id is 964
this is process 2
this is process 1
this is process 4
this is process 0
pool process done

进程间的通信

父进程可以指定子进程执行的方法及其参数, 达到父进程向子进程传递消息的单向通信的目的, 那子进程之间或子进程怎么向父进程通信呢

Queue

Queue是通过Pipe和锁、信号量实现,在进程间共享的一个通信机制。
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

示例如下:

from multiprocessing import Process, Queue
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony" ,"San"]:
        print("put name %s to queue" %(name))
        q.put(name)
        time.sleep(2)
    print ("write data finished")

def read_queue(q):
    print( "begin to read data")
    while True:
        name = q.get()
        print ("get name %s from queue" %(name))

if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write_queue, args=(q,))
    pr = Process(target=read_queue,args=(q,))

    pw.start()
    pr.start()
    pw.join() #这个表示是否阻塞方式启动进程, 如果要立即读取的话, 两个进程的启动就应该是非阻塞式的, 所以pw在start后不能立即使用pw.join(), 要等pr start后方可
    pr.terminate() #服务进程,强制停止

输出结果:

put name Yi_Zhi_Yu to queue
begin to read data
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished

Manager

Queue不能用在父进程和Pool产生的子进程中。这个时候需要用到Manager。
其它如Lock等,也不能用multiprocessing.Lock,而是得用Manager中的Lock。

代码示例如下:

from multiprocessing import Process, Queue,Pool,Manager
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony" ,"San"]:
        print("put name %s to queue" %(name))
        q.put(name)
        time.sleep(2)
    print ("write data finished")

def read_queue(q):
    print( "begin to read data")
    while True:
        name = q.get()
        print ("get name %s from queue" %(name))

if __name__ == "__main__":
    manager = Manager()
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write_queue,args=(q,))
    pr = p.apply_async(read_queue,args=(q,))
    p.close()
    p.join()
    print("main end")

输出结果:

put name Yi_Zhi_Yu to queue
begin to read data
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished

Pipe

管道是UNIX系统IPC的最古老的形式,所有UNIX系统都提供此种通信机制。管道有以下两种局限性。

  1. 历史上,它们是半双工的(即数据只能在一个方向上流动)。现在,某些系统提供全双工管道,但是为了最佳的可移植性,我们决不应预先假定系统支持全双工管道。
  2. 管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

代码示例:

#!/usr/bin/env python
#encoding=utf-8

from multiprocessing import Process,Pipe
import os,time,sys

def send_pipe(p):
    names = ["Yi_Zhi_Yu", "Tony", "San"]
    for name in names:
        print("put name %s to Pipe" %(name))
        p.send(name)
        time.sleep(1)
def recv_pipe(p):
    print("Try to read data in pipe")
    while True:
            name = p.recv()
            print("get name %s from pipe" %(name))

if __name__ == "__main__":
   #pipe, one for send, one for read
   ps_pipe, pr_pipe = Pipe()# return  (conn1, conn2). 有个参数来决定是否双向管理.True为双向管道,False为单向,conn1发送,conn2接收.默认True
   #process
   ps = Process(target=send_pipe, args=(ps_pipe,))
   pr = Process(target=recv_pipe, args=(pr_pipe,))
   pr.start()
   ps.start()
   ps.join()
   pr.terminate()

输出结果:

Try to read data in pipe
put name Yi_Zhi_Yu to Pipe
get name Yi_Zhi_Yu from pipe
put name Tony to Pipe
get name Tony from pipe
put name San to Pipe
get name San from pipe

Event

Event用来实现进程间同步通信。

示例代码:

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

输出结果:

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

解决共享资源

Lock

有with用法和明确的acquire和release用法。见代码示例:

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

file.txt文件内容:

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

Semaphore

信号量(Semaphore)用来控制对共享资源的访问数量,例如池的最大连接数。

代码示例:

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

输出结果:

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-5acquire
Process-4release

Process-5release

发表评论

电子邮件地址不会被公开。 必填项已用*标注