Python:多进程

/ Python / 没有评论 / 1949浏览

Python中使用多进程主要方式有两种:一种方法是使用os模块中的fork方法,另外一种是multiprocessing模块。区别在于前者仅仅适用于Unix/Linux操作系统,后者跨平台方式。

  1. 使用os模块中的fork方式 普通的方法是调用一次,返回一次,而fork方法是调用一次,返回两次,原因在于操作系统将当前进程(父进程)复制出一份进程(子进程),这个两个进程几乎是相同的,于是fork分别从父进程和子进程中返回。子进程中永远返回0,fujinchn父进程返回的是子进程的ID。
import os
if __name__=='__main__':
    print('Current Process (%s) start ...' % os.getpid())
    pid = os.fork()
    if pid< 0:
        print('error in fork')
    elif pid == 0:
        print('I am child process (%s) and my parent process is (%s)', os.getpid(), os.getppid())
    else:
        print('I(%s) created a child process (%s)', os.getpid(),pid)

###windows下无法运行该代码。

  1. 使用multiprocessing模块 使用Process类描述一个进程对象。创建子进程的时候传一个执行函数和相应的参数,用start()启动一个进程,用join()方法实现进程间的同步。
  import os
  from multiprocessing import Process

  def run_proc(name):
      print('Child process %s (%s) Running ' % (name,   os.getpid()))

  if __name__=='__main__':
      print('Parent process %s.' % os.getpid())
      for i in range(5):
          p = Process(target=run_proc, args=(str(i),))
          print('Process wills start')
          p.start()
      p.join()
      print('Process end')

multiprocessing模块提供了一个Pool类来代表进程池对象

  from multiprocessing import Pool
  import os, time, random

  def run_task(name):
    print ('Task %s (pid = %s) is running...' % (name, os.getpid()))
    time.sleep(random.random() * 3)
    print ('Task %s end.' % name)

  if __name__ == '__main__':
    print ('Current process %s.' % os.getpid())
    p = Pool(processes=3)
    for i in range(5):
        p.apply_async(run_task, args=(i,))
    print ('Waiting for all subprocess done...')
    p.close()
    p.join()
    print ('All subprocess done')
    D:\SoftWare\Python3.6\python.exeE:/Python/test20.py
    Current process 6608.
    Waiting for all subprocess done...
    Task 0 (pid = 6868) is running...
    Task 1 (pid = 8216) is running...
    Task 2 (pid = 1496) is running...
    Task 0 end.
    Task 3 (pid = 6868) is running...
    Task 2 end.
    Task 4 (pid = 1496) is running...
    Task 3 end.
    Task 1 end.
    Task 4 end.
    All subprocess done
创建了容器为3的进程池,添加了5个任务,但是一开始只运行了三个,而且每次只能运行三个。新的任务依次添加进来,使用的进程还是原来的进程,通过pid看出来。

###Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process。

##多进程之间的通信 这里用到的是Queue和Pipe两种方式进行通信。两者的区别就是Pipe经常用来两个进程间的通信,Queue用来在多个进程间实现通信。 1.Queue通信:用get和put进行操作Queue. Put方法用插入数据到队列中,有两个可选参数:blocked和timeout,如果blocked为True(默认值),并且timeout为正值,该方法为阻塞timeout指定的时间,直到队列有剩余时间。如果超时,会抛出Queue.Full的异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 Get方法可以从队列读取并且删除一个元素。同样,Get方法有两个可选参数:blocked和timeout。如果blocked为True,并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,分两种情况:如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。

Queue:

from multiprocessing import Process, Queue
import os, time, random

#写数据进程执行的代码:
def proc_write(q, urls):
    print('Process(%s) is writing...' % os.getpid())
    for url in urls:
        q.put(url)
        print('Put %s to queue...' % url)
        time.sleep(random.random())

#读数据进程执行的代码:
def proc_read(q):
    print('Process(%s) is reading...' % os.getpid())
    while True:
        url = q.get(True)
        print('Get %s from queue' % url)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q, ['url_1', 'url_2', 'url_3']))
    proc_writer2 = Process(target=proc_write, args=(q, ['url_4', 'url_5', 'url_6']))

    proc_reader = Process(target=proc_read, args=(q,))
    #启动子进程proc_writer,写入
    proc_writer1.start()
    proc_writer2.start()
    #启动子进程proc_reader,读取
    proc_reader.start()
    #等待proc_writer结束
    proc_writer1.join()
    proc_writer2.join()
    #proc_reader是死循环,只能强行停止程序运行
    proc_reader.terminate()

Pipe: Pipe有一个参数,为True(默认值),这个管道就是全双工,都能收和发,若为False,一头发,一头收。

import multiprocessing
import os, time, random

def proc_send(pipe, urls):
    for url in urls:
      print('Process(%s) send: %s' % (os.getpid(), url))
      pipe.send(url)
      time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print('Process(%s) rev:%s' % (os.getpid(), pipe.recv()))
        time.sleep(random.random())

if __name__=='__main__':
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc_send, args=(pipe[0], ['url_'+str(i) for i in range(10)]))
    p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()