基础概念
这一节也要搞懂相应的知识点,对于后面的运用很关键,这一节尤其难啃的,但是如果这节没有啃下来,后面的就更不用说了,会更难的,所以慢慢来,搞懂,概念梳理清楚,着急也是没有用的。
现在的操作系统都是支持“多任务”的操作系统
多任务:概念清楚就行(操作系统可以同时运行多个任务)
多核和单核CPU: 都可以执行多任务
单核CPU如何执行多任务:操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。(后面编写实际的代码的时候才会慢慢感觉到)
一个比较恰当的比喻:
- 一个工厂,至少有一个车间,一个车间中至少有一个工人,最终是工人在工作
- 一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作
前面的编写的python程序,都是执行单任务的进程,也就只有一个线程,如果要同时执行多个任务的解决方案如下: - 启动多个进程,虽然每个进程只有一个线程,但是多个进程可以一块执行多个任务(多进程模式)
- 启动一个进程,在一个进程内启动多个线程,这样多个线程可以 一块执行多个任务(多线程模式)
- 启动多个进程,每个进程启动多个线程,执行的任务更多,模型更加的复杂(多进程+多线程模式)[要想提高产能,可以提高增加车间的数量或者增加工人的数量,开发程序也是这样的一个思路。]
小结:
- 线程是最小的执行单元,进程又至少一个线程组成,如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。
- 通过进程比通过线程资源消耗要多。
- 多进程和多线程都涉及同步、数据共享的问题,编写起来相对更复杂。
多进程(multiprocessing)
把上面的例子做一个修改:
- 一个工厂,创建三个车间,每个车间一个工人(共三人),并行处理任务
- 一个程序,创建三个进程,每个进程一个线程(共三个线程),并行处理任务
Unix/Linux操作系统提供了一个fork()系统调用,普通函数的调用,调用一次,返回一次,fork()调用一次,返回两次,操作系统自动把当前进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回
子进程永远返回0,父进程返回子进程的ID(一个父进程可以frok出很多子进程,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID)
Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid ==0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
不过在windows上无法运行,有了fork调用,一个进程在接到新的任务的时候就可以复制出一个子进程来出来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求的时候,就fork出子进程来处理新的http请求。
为了让Python也能在windows上运行,用multiprocessing模块就是跨平台版本的多进程模块
multiprocessing模块提供了一个process类来代表一个进程对象,下面的是演示启动一个启动子进程并等待其结束的过程。
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
# 多进程的编写需要写在main函数里面
# 原因:在python的底层,对于不同的操作系统创建进程的时候,内部机制不一样
# window是用spawn的模式创建进程就会报错
# 对于Linux是基于fork就不需要放在if__name__=="__main__"里面
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
# 创建一个Process实例(在进程中还会创建一个线程,线程是工作的最小单元)
p=Process(target = run_proc,args=('test',))
print('Child process will start.')
# 用start()方法启动
p.start()
# join()方法可以等待进程结束后再继续往下运行,通常用于进程间的同步
p.join()
print('Child process end.')
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
print('Run task %s (%s)...' % (name,os.getpid()))
start = time.time()
time.sleep(random.random() *3)
end= time.time()
print('Task %s runs %0.2f seconds.' % (name,(end-start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p=Pool(4)
for i in range(5):
p.apply_async(long_time_task,args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
# 执行结果如下:
Parent process 2508.
Waiting for all subprocesses done...
Run task 0 (2548)...
Run task 1 (3788)...
Run task 2 (6204)...
Run task 3 (3704)...
Task 3 runs 0.50 seconds.
Run task 4 (3704)...
Task 2 runs 0.92 seconds.
Task 1 runs 0.98 seconds.
Task 4 runs 0.54 seconds.
Task 0 runs 1.94 seconds.
All subprocesses done.
子进程
很多时候,子进程并不是自身,而是一个外部进程,我们创建了子进程后,还需要控制子进程的输入和输出,subprocess模块可以让我们非常方便启动一个子进程,然后控制其输入于输出。
演示如**何在python中运行命令nslookup www.python.org **(感觉知识点又卡在这里了,没有理解其中的内涵)
import subprocess
print('$ nslookup www.python.org')
r=subprocess.call(['nslookup','www.python.org'])
print('Exit code:',r)
# 运行结果如下:
$ nslookup www.python.org
Server: public1.alidns.com
Address: 223.5.5.5
Non-authoritative answer:
Name: dualstack.python.map.fastly.net
Addresses: 2a04:4e42:8c::223
151.101.76.223
Aliases: www.python.org
多线程
基于多线程对上述串行示例就行优化:(这种类比的解释让概念十分的清晰)
- 一个工厂,创建一个车间,这个车间中创建3个工人,并进行处理任务
- 一个程序,创建一个进程,这个进程中创建3个线程,并进行处理任务
线程是操作系统直接支持的执行单元,因此高级语言都内置多线程的支持,python的线程是真正的Posix Thread ,不是模拟出来的线程,Python的标准库提供了两个模块:**_thread和threading**(对_thread进行了封装),分别是低级模块和高级模块,大部分情况用高级模块就行。
启动一个线程就是把一个函数传入并创建Thread实例,然后调用strat()开始执行(后面看代码的时候就发现这只要理解清楚了也就不难理解了)
# 代码来自廖雪峰的官方网站
# 导入threading模块
import time,threading
# 新线程执行的代码
def loop():
print('thread %s is running...' % threading.current_thread().name)
n=0
while n<5:
n=n+1
print('thread %s >>>%s' % (threading.current_thread().name, n))
time.sleep(1)
# 子线程结束
print('thread %s ended.' % threading.current_thread().name)
# 此处会默认启动一个主线程,用current_thread()返回当前线程的实例:MainThread
print('thread %s is running...' % threading.current_thread().name)
# 调用threading模块下的Thread类,创建一个Thread对象(实例化:线程),并封装线程被CPU调度时应该执行的任务和相关参数
# 此处用LoopThread命名子线程(人为的一个命名,打印显示,没有其他的意义)
t=threading.Thread(target=loop,name='LoopThread')
# 线程准备就绪(等待CPU调度,具体的时间由CPU来决定),代码继续向下执行
t.start()
# 等待当前线程的任务执行完毕后再向下继续执行其他线程或主线程
t.join()
# 主线程结束
print('thread %s ended.' % threading.current_thread().name)
# 执行结果如下:
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>>1
thread LoopThread >>>2
thread LoopThread >>>3
thread LoopThread >>>4
thread LoopThread >>>5
thread LoopThread ended.
thread MainThread ended.
需要特别梳理的地方:
任何进程默认会启动一个线程,该线程是主线程,主线程又可以启动新的线程,Python的threading模块有一个current_thread()函数,永远返回当前线程的实例,主线程实例的名字叫MainThread,子线程的名字在创建的时候指定,上面代码中用的LoopThread命名的子线程
由主线程从上到下去执行代码,当发现要创建一个线程的时候,线程在当前进程中会创建一个子线程去执行。主线程执行完所有代码,此时不结束,等待子线程,当子线程结束后,主线程运行完毕,整个程序才会运行结束。(通过运行上面的代码自己也能够弄清楚上面的一些过程,文字+代码能更好的理解)
守护线程
t.setDaemon(布尔值)
- t.setDaemon(True):设置为守护线程,主线程执行完毕,子线程也自动关闭
- t.setDaemon(False):设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束(程序如果不修改,默认的就是非守护线程)
因为默认的就是非守护线程。这里为了学习更深入,用守护线程的例子来理解,这里用setDaemon(True)把所有的子线程都变成了主线程的守护线程,主线程结束后,整个程序就退出了(子线程随着主线程的结束而结束)
import time,threading
def run(n):
print("task",n)
time.sleep(1)
print('3')
time.sleep(1)
print('2')
time.sleep(1)
print('1')
for i in range(3):
t=threading.Thread(target=run,args=("t-%s" % i,))
# 把子进程设置为守护线程,必须在start()之前设置
t.setDaemon(True)
t.start()
time.sleep(0.5) # 主线程停0.5s
print(threading.active_count()) # 输出活跃的线程数
# 运行结果
task t-0
task t-1
task t-2
4
线程名称的设置和获取
待整理
自定义线程类,直接将线程需要做的事写到run方法中
待整理
GIL锁(Global Interpreter Lock):
如果是非Python环境,在单核情况下,同时只能有一个任务执行,多核的时候可以支持多个线程同时执行。但在python中,无论有多少核都只能执行一个线程。(和下面的解释是一样的)
以自己的电脑为例子
电脑的CPU是6核的,现在写一个死循环,如果是其他的语言,会将所有的CPU进程占据满,但是通过自己发现也验证了,cpu的占用率大约为16%(也就是只有一个线程被调用)
# 查看电脑是几核的(用代码的语言来查看)
import threading, multiprocessing
multiprocessing.cpu_count()
# 运行结果
12(双核CPU:用两个CPU去处理同一个数据)
# 用python写了一个死循环
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
# 通过观察任务管理器的占用率16%
也即全局解释器,是Cpython(通过调用c语言的原生线程来实现)解释器特有的一个玩意,让进程中同一时刻只能有一个线程可以被CPU 调用如果想利用计算机的多核优势,让CPU同时处理一些任务,适合多进程开发(即使资源开销大),GIL锁就锁不住计算机的并发能力。GIL就像一个”通行证”,在一个python进程中只有一个GIL,拿不到通行证的线程就不允许进入CPU执行
梳理多线程的工作过程
- 拿到公共数据
- 申请gil
- python解释器调用os原生线程
- os操作CPU执行运算
- 当该线程执行时间到后,无论运算是否已经执行完毕,gil都被要求释放
- 由其他进程重复上面的过程
- 等其他进程执行完后,又会切换之前的进程(从他记录的上下文继续执行),整个过程是每个线程执行自己的运算,当执行时间到就进行相应的切换(context switch)
程序不利用计算机的多核优势,适合多线程开发
所以什么时候利用多核优势进行多进程开发,什么时候不利用多核优势,进行多线程开发,两者都可以进行并发(终于理解了这样的一个过过程) - 计算密集型,用多进程,例如大量的数据计算[累加]
- IO密集型,用多线程,例如文件的读写,用网路数据传输[下载抖音视频示例]
GIL在不同版本之间的差异
只罗列最新的版本:GI不使用ticks,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),对CPU密集型程序比较的友好(提升了一定的计算效率),但是无法解决GIL导致的同一时间只能执行一个线程的问题(最原始的问题)。
多线程开发
线程锁(LOCK)
有关其他锁的知识可以参考:https://www.cnblogs.com/whatisfantasy/p/6440585.html
多线程和多进程最大的不同:多进程中,同一变量,各自有一份拷贝存在每个进程中,互不影响,多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改,存在多个线程同时改一个变量的风险(通过看视频还有当前的例子,自己对这个概念是已经理解了的),也就是存在线程安全的问题
如何避免这样的一种情况了?通过给线程上锁,锁的好处就是:确保了某段关键代码只能由一个线程从头到尾完整的执行阻止了多线程并发执行,只能以单线程模式执行,效率大大的降低了(一个线程执行完毕再让另外的一个线程去执行)。
下面的这个例子就很好的阐释了如果不加锁的话会存在线程的安全问题,将 run_thread()里面加入锁机制,当该线程获得锁,其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。
import time, threading
# 假定这是你的银行存款(此处是一个共享变量)
balance = 0
# 创建一个锁就是通过threading.Lock()来实现
lock = threading.Lock()
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
# def run_thread(n):
# for i in range(2000000):
# change_it(n)
def run_thread(n):
for i in range(2000000):
# 为了确保balance的计算正确,给change_it()上一把锁
# 先要获取锁
lock.acquire()
try:
# 当t1线程开始执行的时候,其他线程不能同时执行change_it()
change_it(n)
finally:
# 改完一定要释放锁,不然会成为死线程
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
# 先后启动了两个线程
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
# 运行结果
0
结合廖神的资料作最后的总结:
- GIL是python解释器的遗留问题,要正真利用多核,除非重写一个不带GIL的解释器,
- 可以使用多线程,却不要指望有效利用多核,如果一定要用的话,只能通过C扩展来实现;
- 虽然不能利用多线程实现多核任务,但是可以通过多进程实现多核任务,多个进程有各自独立的GIL锁,互不影响。
- 多线程编程模型复杂,容易发生冲突,必须用锁加以隔离,同时要小心死锁发生
ThreadLocal
这一节的内容还要反复的看,不断地找别的资料完善自己地理解和相应地知识结构体系。(不是那么好懂的,结合前后知识点和相关代码辅助理解。)
引出的原因:每个线程都有自己的数据,一个线程使用自己的局部变量比使用全局变量好(局部变量只有线程自己能看到,不会影响其他线程,全局变量的修改必须加锁[这个也是上一节的时候重点讨论过的],局部变量也存在问题:函数调用的时候,传递起来很麻烦)
用一个具体的例子来说明存在的问题:每个函数一层一层调用传参会十分的麻烦,用全局变量也不行(每个线程处理不同的Student对象,不能共享)[这里的话反复的理解,没有一定的积累是很难理解的]
def process_student(name):
std=student(name)
# std是局部变量,但是每个函数都要用它,需要传进去
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
想办法解决上述的问题:用一个全局的dict存放所有的Student对象,然后以thread自身作为key获得线程对应的Student对象(跟着廖神的思路慢慢走)
# 创建一个全局的字典
global_dict={}
def std_thread(name):
std=Student(name)
# 把std放到全局变量global_dict中:
gloabal_dict[threading.current_thread()] = std
do_task_1(std)
do_task_2(std)
def do_task_1(std):
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2(std):
# 任何函数都可以查找出当前线程的std变量
std = global_dict[threading.current_thread()]
...
解决了:std对象在每层函数中的传递问题
不足之处:每个函数获取std的代码有点丑
….
跟着廖神的思路继续走,查看有没有更加简单的方式?
这里引出了ThreadLocal,不用查找dict,用ThreadLocal帮你自动做这件事
…思路是符合层层推进的过程的,所以自己一时没有理解也是没有关系的,要注意看懂,也是这三段中唯一可以运行的代码
# 导入threading模块
import threading
# 创建全局ThreadLocal对象:
# 每个Thread对它都可以读写student属性
# local_school是全局变量,local_school.student都是线程的局部变量
# 任意读写并且可以互不干扰,不用管理锁的问题,ThreadLocal内部会处理
local_school = threading.local()
def process_stuent():
# 获取当前线程关联的student:(也也即线程的局部变量)
std=local_school.student
print("Hello, %s (in %s)" % (std,threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
loacl_student.student = name
process_student()
t1= threading.Thread(target = process_thread,args=('Alice', ),name='Thread-A')
t2= threading.Thread(target = process_thread,args=('Bob', ) , name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
# 运行结果
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
小结:(说实话,有点不好理解确实虽然是层层推进的这样的一个过程)
- 全局变量local_school是一个dict,不但可以用loacl_school.student,还可以绑定其他变量(local_school.teacher)另类的解释:threadlocal让一个变量在自己的线程内相当于是全局变量(所有函数都能访问),在不同的线程间相当于是局部变量(其他线程无法访问)
- ThreadLocal最常用的可以为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,一个线程的所有调用到的处理函数都可以方便的访问这些资源。(这点暂时无法理解)
- 一个ThreadLocal变量虽然是全局变量,但是每个线程都只能读写自己线程的独立副本,互不干扰,ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
- 附加一个参考链接:https://www.cnblogs.com/whatisfantasy/p/6440585.html
进程 vs 线程
主要是了讨论两种方式的优缺点:
要实现多任务,通常会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,在多任务的环境下,一个是Master,一个是Worker
- 用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker
- 用多线程事项Master-Worker,主线程就是Master,其他线程就是Worker
多进程模式:
优点:稳定性高,一个子进程崩溃,不会影响主进程和其他的子进程(主进程Master只负责分配任务,挂掉的概率很低)
缺点:创建进程代价大,比如在Windows,操作系统同时运行的进程数是有限的,如果同时有几千个进程同时运行,在内存和CPU的限制下,操作系统的调度会成问题。
多线程模式:
优点:相对快一点
缺点:任何一个线程挂掉都有可能直接造成整个进程崩溃(所有线程共享进程的内存):结合自己平和的一些经验来理解就可以了
线程切换
操作系统在切换进程或者线程时也是一样的,它需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。
导致的结果:一但多任务到达一个限度:会消耗系统所有的资源,效率急剧下降,所有的任务都无法做好
计算密集型 vs IO密集型
是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。(结合这里的理解对两种任务类型有了更加深刻的认识与理解)
计算密集型任务:要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU(多线程切换需要消耗资源),计算密集型任务同时进行的数量应当等于CPU的核心数。计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
第二种任务的类型是IO密集型:涉及到文件处理、网络爬虫、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度,开启多个线程,自动进行切换可以不浪费CPU的资源,提高程序的执行效率)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
分步式进程
在Thread(进程)和Process(线程)中,应当优先选Process,Posses更稳定,Prossess可以分布到多台机器上,但是Thread最多只能分布到同一台机器的多个CPU上
multiprocessing模块不但支持多进程,其中的mangers子模块支持把多进程分步到多台机器上,一个服务器进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信,managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序
…理解廖神的教程真的需要很大的精力去理解(但是还是需要努力去啃)
有一个通过Queue通信的多进程程序在同一机器上运行,现在想改进:因为处理进程的任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,如何用分布式来实现?
补充知识:为什么这里会提到queue?在python中多个线程的数据是共享的,多个线程进行数据交换的时候,不能保证数据的安全性和一致性。当多个线程需要进行数据交换的时候,队列可以解决线程间的数据交换(可以保证线程间数据的安全性和一致性)
解决方法:原有的Queue继续用,通过mangers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了,先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。
参考资料:https://blog.csdn.net/u011318077/article/details/88094583
# 创建一个分布式进程,用来完成10次乘法任务
# 服务器在win系统和在Linux上有所不同
# 创建一个分布式进程:包括服务器进程和任务进程
# 多个进程之间的通信使用queue
# 该代码为服务进程
# 运行时先运行服务进程,再运行任务进程
# 执行任务循序:
# 服务进程和任务进程都创建了相同的两个队列,一个用来放任务,一个用来放结果
# 第一步:服务器进程运行,例:将数字2放进任务队列,任务进程从任务队列中取出数字2
# 第二步:取出数字,执行任务,即2*2=4,任务执行完后,放入结果队列中
# 第三步:服务进程从结果队列中,取出结果
# 第四步:所有任务执行完毕,所有结果都已经取出,最终任务队列和结果队列都是空的了
import random,queue # 导入随机 时间 队列模块
from multiprocessing.managers import BaseManager # 导入多进程管理
from multiprocessing import freeze_support # window防出错
# 第一步:定义两个Queue队列,一个用于发送任务,一个接收结果
# 发送任务的队列(实例化队列为任务队列)
task_queue = queue.Queue()
# 接收结果的队列(实例化队列为结果队列)
result_queue = queue.Queue()
# 从BaseManger继承的QueueManger:(定义QueueManger继承BaseManager,用于后面创建管理器)
class QueueManager(BaseManager):
pass
# 定义两个函数,返回结果就是Queue队列
# win下queuemanger注册到网络关联队列不能用lambda,自定义一个函数用于关联
def return_task_queue():
global task_queue # 定义成全局变量
return task_queue # 饭返回发送任务的队列
# win下queuemanger注册到网络关联队列不能用lambda,自定义一个函数用于关联
def return_result_queue():
global result_queue
return result_queue # 返回接收结果的队列
# 第二步:把上面创建的两个队列注册在网络上,利用register方法
# callable参数关联了Queue对象,将Queue对象在网络中暴露出来
# 第一个参数是注册在网络上队列的名称
def test():
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
# 第三步:绑定端口8001,设置验证口令,这个相当于对象的初始化
# 绑定端口并填写验证口令,window下需要填写IP地址,Linux下默认为本地,地址为空
manager= QueueManager(address=('127.0.0.1', 8001), authkey=b'abc') # 口令必须斜撑类似b'abc'形式
# 第四步:启动管理器,启动Queue队列,监听信息通道
manager.start()
# 第五步:通过管理实例的方法获访问网络中的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 第六步:添加任务,获得返回的结果
# 将任务放到Queue队列中
for i in range(10):
n=random.randint(0,10) # 返回0~10之间的随机数
print('put task %d...' % n)
task.put(n) # 将n放入到任务队列中
# 从结果队列中取出结果
print('Try get results...')
for i in range(11):
# 总共循环10次,上面放了10个数字作为任务
# 加载一个异常捕获
try:
r=result.get(timeout=5) # 每次等待5s,取结果队列中的值
print('Result:%s' % r)
except queue.Empty:
print('result queue is empty.')
# 最后一定要关闭服务,不然会报错
manager.shutdown()
print('master exit.')
if __name__ == '__main__':
freeze_support()
print("Start!")
test()
然后是任务进程