流体 Python (XVI) - 并发程序
最编程
2024-03-02 16:34:32
...
一、核心要义
1. 生成器作为协程使用时的行为状态
2.使用装饰器自动预激协程
3.调用方使用close和throw方法控制协程
4.协程终止时如何返回值
5.yield from新句法的用途和语义
6.案例:出租车队运营仿真
二、代码示例
1、协程入门案例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 9:07
# @Author : Maple
# @File : 01-协程入门案例.py
# @Software: PyCharm
import inspect
"""
协程有4种状态
1.'GEN_CREATED': 等待开始执行
2.'GEN_RUNNINF': 解释器正在执行
3.'GEN_SUSPENDED': 在yield表达式处暂停
4.'GEN_CLOSED': 执行结束
注意:只有当协程处于状态3的时候,才可以send值.而刚创建的协程,没有预激活(即没有执行next方法)
,此时处于状态1,如果给协程send值,会报错
"""
def simple_coro(a):
print('-->start: a = ',a)
b = yield a
print('-->Received:b = ',b)
c = yield a + b
print('-->Received:c = ', c)
if __name__ == '__main__':
# 1.实例化生成器函数
s = simple_coro(2)
## 查看此时协程的状态: 创建状态
print(inspect.getgeneratorstate(s)) # GEN_CREATED
# 2.调用next,预激活协程,此时协程会停止于第一个yield a处(等待外部调用函数发送send,往下推进执行),并且将a值返回给外部
a = next(s)
print(a)
## 查看此时协程的状态: 阻塞状态(等待外部通过send发送值)
print(inspect.getgeneratorstate(s)) # GEN_SUSPENDED
# 3.通过send给s发送值,重新启动协程,并且外部的值会赋给b,最终再次阻塞在下一个yield处(yield a + b)
#并将外部 a + b的值返回给调用方【此时协程又会处于阻塞状态,等待外部发送send值】
c = s.send(10)
print(c) # 12(备注:2 + 10)
## ## 查看此时协程的状态: 阻塞状态(等待外部通过send发送值)
print(inspect.getgeneratorstate(s)) # GEN_SUSPENDED
# 4.再次通过send给s发送值,重新启动协程,并且外部的值会赋值给c,输出print(-->Received:c = xx)
# 由于协程后面没有yield了,所以最后协程终止,导致[生成器]抛出StopIteration异常
"""
Traceback (most recent call last):
File "D:/01-study/python/fluent_python/16-协程/01-协程入门案例.py", line 39, in <module>
s.send(100)
StopIteration
"""
s.send(100)
2、使用协程计算平均值
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 9:28
# @Author : Maple
# @File : 02-使用协程计算平均值.py
# @Software: PyCharm
def avg():
sum = 0
count = 0
average = None
while True:
a = yield average
sum += a
count +=1
average = sum/count
if __name__ == '__main__':
a = avg()
# 预激活协程
next(a)
# 计算初始平均值
avg1 = a.send(10)
print(avg1) # 10.0
# 发送新数据,计算前两个数据的平均值
avg2 = a.send(20)
print(avg2) # 15.0
# 再次发送新数据,计算前三个数据的平均值
avg3 = a.send(30)
print(avg3) # 20.0
3、预激协程的装饰器
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 9:36
# @Author : Maple
# @File : 03-预激协程的装饰器.py
# @Software: PyCharm
from inspect import getgeneratorstate
def coroutine(func):
"""装饰器:让被装饰的协程向前执行一步,阻带到第一个yield处"""
def primer(*args,**kwargs):
gen = func(*args,**kwargs)
next(gen)
return gen
return primer
# 使用装饰器,装饰协程
@coroutine
def avg():
sum = 0
count = 0
average = None
while True:
a = yield average
sum += a
count +=1
average = sum/count
if __name__ == '__main__':
a = avg()
# 1.初始状态的协程就已经处于阻塞状态了(而不是刚创建)
print(getgeneratorstate(a)) # GEN_SUSPENDED,即无需再通过next去预激活协程
# 计算初始平均值
avg1 = a.send(10)
print(avg1) # 10.0
# 发送新数据,计算前两个数据的平均值
avg2 = a.send(20)
print(avg2) # 15.0
# 再次发送新数据,计算前三个数据的平均值
avg3 = a.send(30)
print(avg3) # 20.0
4、终止协程和异常处理
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 9:54
# @Author : Maple
# @File : 04-终止协程和异常处理.py
# @Software: PyCharm
# 自定义异常
class DemoException(Exception):
"""演示用异常"""
def demo_exc_handing():
print('-->coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('**DemoException handling,Continuning****')
else:
print('-->coroutine received: {!r}'.format(x))
raise RuntimeError('This line should never run')
# 该条语句永远也不会执行:如果while循环中遇到DemoException,协程处理后,会继续运行
# 如果遇到其它异常,生成器无法运行,则协程立刻终止(此时RuntimeError仍然没有机会执行)
finally:
"""如果不管协程是否结束,都有一些清理工作需要协程处理,可以使用finally"""
print('-->coroutine ending')
if __name__ == '__main__':
# 1.协程测试1
print('*****1.协程测试1***********')
d = demo_exc_handing()
# 1-1.预激活协程
next(d) # ->coroutine started
# 1-2.给协程发送数据
d.send(10) # -->coroutine received: 10
d.send(20) # -->coroutine received: 20
# 1-3.通过throw给协程发送异常
# 异常被处理,协程可以继续运行
d.throw(DemoException) # **DemoException handling,Continuning****
d.send(30) # -->coroutine received: 30
# 1-4.通过close方法,关闭协程
d.close()
from inspect import getgeneratorstate
print(getgeneratorstate(d)) # GEN_CLOSED
# 2.协程测试2
print('*****2.协程测试2***********')
d2 = demo_exc_handing()
next(d2) #-->coroutine started
d2.send(10) # -->coroutine received: 10
# 通过throw给协程发送一个其无法处理的异常,协程会停止,状态变成'GEN_CLOSED'
d2.throw(ZeroDivisionError)
print(getgeneratorstate(d2)) # GEN_CLOSED
5、协程返回值
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 10:22
# @Author : Maple
# @File : 05-协程返回值.py
# @Software: PyCharm
from collections import namedtuple
Result = namedtuple('Result','count average')
"""
接收协程返回值
"""
def avg():
sum = 0
count = 0
average = None
while True:
a = yield
# 由于协程必须正常终止,外部才能接收到返回值,所以要提供一种机制,保证协程可以正常终止
if a is None:
break
sum += a
count +=1
average = sum/count
# 协程正常终止,抛出StopIteration异常,此返回值会绑定在异常的一个属性中
return Result(count,average)
if __name__ == '__main__':
a = avg()
next(a)
a.send(10)
a.send(20)
a.send(30)
try:
a.send(None)
except StopIteration as exc:
# 协程返回值绑定在异常中的属性中
result = exc.value
print(result) # Result(count=3, average=20.0)
6、委派生成器
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 10:31
# @Author : Maple
# @File : 06-委派生成器.py
# @Software: PyCharm
from collections import namedtuple
Result = namedtuple('Result','count average')
# 子生成器
def avg():
sum = 0
count = 0
average = None
while True:
a = yield
# 由于协程必须正常终止,外部才能接收到返回值,所以要提供一种机制,保证协程可以正常终止
if a is None:
break
sum += a
count +=1
average = sum/count
# 协程正常终止,抛出StopIteration异常,此返回值会绑定在异常的一个属性中
return Result(count,average)
# 委派生成器
def grouper(results,key):
while True:
# results:接收子生成器的返回值
results[key] = yield from avg()
# 调用方
def main(data):
result = {}
for key,value in data.items():
# 每次遍历都生成一个新的委派生成器
group = grouper(result,key)
# 预激活委派生成器
next(group)
# 利用委派生成器,给子生成器avg发送数据,
for data in value:
group.send(data)
# 给子生成器发送一个None,终止子生成器(抛出StopIteration异常,yield from能够处理该异常)
# 最终result会接收到 子生成器返回的值(注意:子生成器产出的值 是直接反馈给调用方,而不是委派生成器)
group.send(None)
report(result)
def report(results):
for key,result in sorted(results.items()):
group,unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(result.count,group,result.average,unit))
if __name__ == '__main__':
data = {
'girls;kg' : [40.5,50.2,51.2,40.8],
'girls;m': [1.53, 1.61, 1.46, 1.8],
'boys;kg': [50.5, 62.4, 71.4, 60.4],
'boys;m': [1.63, 1.71, 1.76, 1.8]
}
"""
4 boys averaging 61.18kg
4 boys averaging 1.72m
4 girls averaging 45.67kg
4 girls averaging 1.60m
"""
main(data)
7、出租车运营仿真案例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/2 12:58
# @Author : Maple
# @File : 07-出租车运营仿真案例.py
# @Software: PyCharm
import queue
import random
from collections import namedtuple
Event = namedtuple('Event','time ident action')
DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5
def taxi_process(ident,trips,start_time =0):
time = yield Event(start_time,ident,'leave garage')
for i in range(trips):
time = yield Event(time,ident,'pick up passenger')
time = yield Event(time,ident,'drop off passenger')
yield Event(time,ident,'go home')
class Simulator:
def __init__(self,procs_map):
self.events = queue.PriorityQueue()
self.procs = dict(procs_map)
def run(self,end_time):
for _,proc in self.procs.items():
# 放入每位司机的初始事件(从停车场出发)
self.events.put(next(proc))
# 仿真的主循环
sim_time = 0
while sim_time < end_time:
if self.events.empty():
print('end simulate')
break
# 取出start_time最早的事件
current_event = self.events.get()
sim_time, ident, previous_action = current_event
# 输出当前事件
print('taxi:',ident,ident *' ',current_event)
# 当前事件,是归属于哪一位司机
active_proc = self.procs[ident]
# 获取该司机下一个事件,并传递下一个事件开始时间
next_time = sim_time + Caculate_nexttime(previous_action)
try:
next_event = active_proc.send(next_time)
except StopIteration:
# 如果某位司机的事件已经遍历完毕,就删掉其对应记录
del self.procs[ident]
else:
# 将下一次事件放入事件库
self.events.put(next_event)
else:
msg = '** end stimulation time: {} events pending ***'
msg.format(self.events.qsize())
def Caculate_nexttime(previous_action):
if previous_action in ['leave garage','drop off passenger']:
# 新状态是四处徘徊
interval = SEARCH_DURATION
elif previous_action == 'pick up passenger':
# 新状态是行程开始
interval = TRIP_DURATION
elif previous_action == 'go home':
interval = 1
else:
raise ValueError('Unknown previous_action:%s' % previous_action)
return int(random.expovariate(1/interval)) + 1
def main(end_time = DEFAULT_END_TIME,num_taxis = DEFAULT_NUMBER_OF_TAXIS,seed = None):
if seed is not None:
random.seed(seed)
# 默认创建的司机事件集合
"""
proc_map = {
'0':taxi_process(0,2,0),
'1':taxi_process(1,4,5),
'2':taxi_process(2,6,10)
}
"""
proc_map = {i:taxi_process(i,(i+1) *2 ,i * DEPARTURE_INTERVAL ) for i in range(num_taxis)}
sim = Simulator(proc_map)
sim.run(end_time)
if __name__ == '__main__':
main(seed=3)
上一篇: 蓝桥杯配制刷子(自用)