您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

Python自定义进程池实例分析【生产者、消费者模型问题】

2026/1/2 16:06:20发布22次查看
本文实例分析了python自定义进程池。分享给大家供大家参考,具体如下:
代码说明一切:
#encoding=utf-8 #author: walker #date: 2014-05-21 #function: 自定义进程池遍历目录下文件 from multiprocessing import process, queue, lock import time, os #消费者 class consumer(process): def __init__(self, queue, iolock): super(consumer, self).__init__() self.queue = queue self.iolock = iolock def run(self): while true: task = self.queue.get() #队列中无任务时,会阻塞进程 if isinstance(task, str) and task == 'quit': break; time.sleep(1) #假定任务处理需要1秒钟 self.iolock.acquire() print( str(os.getpid()) + ' ' + task) self.iolock.release() self.iolock.acquire() print 'bye-bye' self.iolock.release() #生产者 def producer(): queue = queue() #这个队列是进程/线程安全的 iolock = lock() subnum = 4 #子进程数量 workers = build_worker_pool(queue, iolock, subnum) start_time = time.time() for parent, dirnames, filenames in os.walk(r'd:\test'): for filename in filenames: queue.put(filename) iolock.acquire() print('qsize:' + str(queue.qsize())) iolock.release() while queue.qsize() > subnum * 10: #控制队列中任务数量 time.sleep(1) for worker in workers: queue.put('quit') for worker in workers: worker.join() iolock.acquire() print('done! time taken: {}'.format(time.time() - start_time)) iolock.release() #创建进程池 def build_worker_pool(queue, iolock, size): workers = [] for _ in range(size): worker = consumer(queue, iolock) worker.start() workers.append(worker) return workers if __name__ == '__main__': producer()
ps:
self.iolock.acquire() ... self.iolock.release()
可用:
with self.iolock: ...
替代。
再来一个好玩的例子:
#encoding=utf-8 #author: walker #date: 2016-01-06 #function: 一个多进程的好玩例子 import os, sys, time from multiprocessing import pool cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__)) g_list = ['a'] #修改全局变量g_list def modifydict_1(): global g_list g_list.append('b') #修改全局变量g_list def modifydict_2(): global g_list g_list.append('c') #处理一个 def procone(num): print('procone ' + str(num) + ', g_list:' + repr(g_list)) #处理所有 def procall(): pool = pool(processes = 4) for i in range(1, 20): #procone(i) #pool.apply(procone, (i,)) pool.apply_async(procone, (i,)) pool.close() pool.join() modifydict_1() #修改全局变量g_list if __name__ == '__main__': modifydict_2() #修改全局变量g_list print('in main g_list :' + repr(g_list)) procall()
windows7 下运行的结果:
λ python3 demo.py in main g_list :['a', 'b', 'c'] procone 1, g_list:['a', 'b'] procone 2, g_list:['a', 'b'] procone 3, g_list:['a', 'b'] procone 4, g_list:['a', 'b'] procone 5, g_list:['a', 'b'] procone 6, g_list:['a', 'b'] procone 7, g_list:['a', 'b'] procone 8, g_list:['a', 'b'] procone 9, g_list:['a', 'b'] procone 10, g_list:['a', 'b'] procone 11, g_list:['a', 'b'] procone 12, g_list:['a', 'b'] procone 13, g_list:['a', 'b'] procone 14, g_list:['a', 'b'] procone 15, g_list:['a', 'b'] procone 16, g_list:['a', 'b'] procone 17, g_list:['a', 'b'] procone 18, g_list:['a', 'b'] procone 19, g_list:['a', 'b']
ubuntu 14.04下运行的结果:
in main g_list :['a', 'b', 'c'] procone 1, g_list:['a', 'b', 'c'] procone 2, g_list:['a', 'b', 'c'] procone 3, g_list:['a', 'b', 'c'] procone 5, g_list:['a', 'b', 'c'] procone 4, g_list:['a', 'b', 'c'] procone 8, g_list:['a', 'b', 'c'] procone 9, g_list:['a', 'b', 'c'] procone 7, g_list:['a', 'b', 'c'] procone 11, g_list:['a', 'b', 'c'] procone 6, g_list:['a', 'b', 'c'] procone 12, g_list:['a', 'b', 'c'] procone 13, g_list:['a', 'b', 'c'] procone 10, g_list:['a', 'b', 'c'] procone 14, g_list:['a', 'b', 'c'] procone 15, g_list:['a', 'b', 'c'] procone 16, g_list:['a', 'b', 'c'] procone 17, g_list:['a', 'b', 'c'] procone 18, g_list:['a', 'b', 'c'] procone 19, g_list:['a', 'b', 'c']
可以看见windows7下第二次修改没有成功,而ubuntu下修改成功了。据uliweb作者limodou讲,原因是windows下是充重启实现的子进程;linux下是fork实现的。
更多python自定义进程池实例分析【生产者、消费者模型问题】。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product