欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Python 中的并发编程:Asyncio 和并发性

最编程 2024-06-09 10:02:43
...

协程应该是属于python里面一个独有的概念。源自它设计的一个特性:同一时刻,Python 主程序只允许有一个线程执行
但是对于一些IO操作频繁的操作,如网络请求,如果单线程同步执行的话,那么很多时间都会浪费在等待请求返回中。假设下载一个网页的数据需要2秒,现在需要下载10个网页,按同步一个一个执行的话,则需要花费2*10=20秒。但如果我开始下载1个之后,然后又立即去下载第2个,如果第1个的结果返回了之后,我们再去处理第一个的下载结果。这样我们就不需要每个都等待2秒了。
协程就是用来在单线程中实现并发编程的一种操作。协程由用户决定,在哪些地方交出控制权,切换到下一个任务。

协程就是异步编程。可以把协程理解成一个异步函数,而直接调用这个异步函数返回一个协程对象。协程使用asyncawait语法糖,通过async def声明。

#定义一个协程
async def main():
    print("hello")
main()
#输出:返回一个协程对象
<coroutine object main at 0x7fb64949ef40>

上面看到,直接调用协程并不会真正调用执行,要真正运行一个协程,需要用到asyncio库(已经废弃的生成器调用方法就不说了)

asyncio

使用协程,我们需要用到asyncio库。

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio 提供一组 高层级API 用于并发地 运行 Python 协程并对其执行过程实现完全控制

asyncio 提供了三种主要机制运行协程:

  • asyncio.run() 函数用来运行最高层级的入口点 "main()"。
#此代码在paycharm中运行
async def main():
    print("hello")
asyncio.run(main())
#输出:
hello

asyncio.run(coro, *, *debug=False*)会运行传入的协程coro,并返回结果。它会创建一个事件循环(event loop),负责管理asyncio事件的循环调度。而且在一个线程中,只能运行一个事件循环。比如你想在jupyter notebook中执行asyncio.run(),就会失败,你会收到一个错误提示:RuntimeError: asyncio.run() cannot be called from a running event loop。因为jupter notebook本身已经运行了一个event loop了。

一般用asyncio.run(main()) 作为主程序的入口函数,在程序运行周期内,只调用一次 asyncio.run。

  • 使用await等待一个协程

await后面接一个可等待对象。可等待对象有三种主要类型: 协程, 任务 和 Future

await,字面意思,就是等待程序执行。我们用await等待一个协程,那么程序就会阻塞在这里,进入被调用的协程函数,执行完毕后再返回继续,这个和正常python流程是一样的。使用await,在jupter notebook中,我们就可以用它来运行协程了。如下:


#此代码在jupyter notebook中执行。
import requests
import asyncio
async def crawl_page(url):
    
    print("crawl url:{}".format(url))
    await asyncio.sleep(2) #用休眠代替网络请求操作,排除网络的原因
    print("ok url:{}".format(url))
     

async def main(): 
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    for url in urls:
        await crawl_page(url)
start_time = time.perf_counter()
await main()
end_time = time.perf_counter()

print("总共耗时:{}".format(end_time-start_time))
#### 输出
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
总共耗时:6.007925541001896

上面模拟网络3个网络请求,每个耗时2秒,总共耗时约6秒。整个过程与同步过程是一样。相当于下面的同步流程:

import requests
import time

def crawl_page(url):
    
    print("crawl url:{}".format(url))
    time.sleep(2) #简单粗暴用休眠代替网络请求操作,也不受其他因素的影响
    print("ok url:{}".format(url))
     

def main():    
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    for url in urls:
        crawl_page(url)
        
start_time = time.perf_counter()
main()
end_time = time.perf_counter()

print("总共耗时:{}".format(end_time-start_time))

#### 输出
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
总共耗时:6.009312429000602

看到这,是不是觉得协程没什么特别嘛,并没有实现并发呀。并没有实现我想要的耗时约2秒。那时因为真正的主角还没上场。上面只是用异步方法实现了同步功能,如果要实现并发,那就需要任务。也即协程的第三种调用方式:

  • 用任务“并行地”调度协程
    asyncio.create_task()将一个协程封装成任务,该协程就可以被自动调度执行了。创建一个任务:task = asyncio.create_task(coro),然后用await调度任务。任务可以很快地被执行而不被阻塞。

asyncio.create_task(coro,name=None):
coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。
该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

上面的例子用task实现如下:

import asyncio

async def crawl_page(url):
    
    print("crawl url:{}".format(url))
    await asyncio.sleep(2) #asyncio的sleep()总是会挂起当前任务,以允许其他任务运行
    print("ok url:{}".format(url))
     

async def main():    
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls] #创建任务列表
    for task in tasks:
        await task #用await调度任务
        
start_time = time.perf_counter()
await main()
end_time = time.perf_counter()

print("总共耗时:{}".format(end_time-start_time))

#####输出#######
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
总共耗时:2.004724346999865

看,使用任务Task,终于实现了想要的约2秒的效果了。那么task是如何实现并发的呢?看下面两个例子:

import asyncio
import time
async def work1():
    print("work1 start")
    await asyncio.sleep(2) #特意设置成2秒
    print("work1 done")
    
async def work2():
    print("work2 start")
    await asyncio.sleep(1)
    print("work2 done")
    
async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    
async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))

#### 输出
start at 20:26:39
ended at 20:26:39
work1 start
work2 start
work3 start
import time
async def work1():
    print("work1 start")
    await asyncio.sleep(2) #特意设置成2秒
    print("work1 done")
    
async def work2():
    print("work2 start")
    await asyncio.sleep(1)
    print("work2 done")
    
async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    
async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())
    print("before await")
    await task1
    print("awaited task1")
    await task2
    print("awaited task2")
    await task3
    print("awaited task3")

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))

###输出
start at 20:28:31
before await
work1 start
work2 start
work3 start
work2 done
work1 done
awaited task1
awaited task2
work3 done
awaited task3
ended at 20:28:34

从两个例子中我们可以看出:
1、任务被创建后,就会被加入事件循环,等待事件调度器调度。事件调度器会自动地去调度。
2、遇到await后,当前任务就会挂起,将控制权交出,事件调度器去调度其它任务,其它任务获得控制权。当任务执行完成之后,它又会重新获得控制权,继续执行后续的代码。
3、上面我特意写了work1的休眠时长比work2的休眠时长长。work2会优先于work1完成,work2执行完成之后,主程序获得控制权后,不会打印“awaited work2”,因为主程序还在await work1呢。

这样,通过事件循环调度,异步调用,我们就实现了python的“并发”。充分利用了等待时间。
上面,我们看到的都是没有返回值的,协程也是可以返回值的。我们可以在结束之后,取到返回值。另外,协程也是有可能产生异常的,任务有可能被取消。

import asyncio
import time
async def work1():
    print("work1 start")
    await asyncio.sleep(1) 
    print("work1 done")
    return 1
    
async def work2():
    print("work2 start")
    await asyncio.sleep(2)
    print("work2 done")
    return 2/0
    
async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    return 3
async def work4():
    print("work4 start")
    await asyncio.sleep(4)
    print("work4 done")
    return 4
    
async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())
    task4 = asyncio.create_task(work4())
    
    #2秒之后,取消任务4
    await asyncio.sleep(2)
    task4.cancel()

    res = await asyncio.gather(task1, task2, task3, task4,return_exceptions=True) 
    print(res)

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))

###输出
start at 21:10:18
work1 start
work2 start
work3 start
work4 start
work1 done
work2 done
work3 done
[1, ZeroDivisionError('division by zero'), 3, CancelledError()]
ended at 21:10:21

taskasyncio.gatherasyncio.gather(aws,return_exceptions=True)
1、并发运行 aws序列中的可等待对象
2、如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
3、如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
4、如果 return_exceptionsFalse (默认),所引发的首个异常会5、立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。
如果 return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表。
如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消

最后,看一个真正的网络请求吧

爬取豆瓣上即将上映的电影,获取电影名称,上映时间,时长,简介等信息。(仅供学习交流,勿频繁调用)

import aiohttp
import asyncio
from bs4 import BeautifulSoup
global sleep = 0 

#解析拿到的所有即将上映的电影
def get_movies_info(text):
    print("开始解析拿到所有即将上映的电影数据")
    soup = BeautifulSoup(text,"html.parser")
    all_movies = soup.find('div',id = 'showing-soon')
    movies_info = []
    for each_movie in all_movies.find_all('div',class_="item"):
        #print("=========")
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all("li")
        movie_name = all_a_tag[1].text
        movie_url= all_a_tag[1]["href"]
    
        movie_date = all_li_tag[0].text
        movie_type = all_li_tag[1].text
        movies_info.append([movie_url,(movie_name,movie_type,movie_date)])
    return movies_info

def get_each_movie_details(text):
    soup = BeautifulSoup(text,"html.parser")
    spans = soup.find("div",id="info").find_all("span")
    #时长
    duration = spans[-2].text
    #简介
    description = soup.find("div",{"class":"indent","id":"link-report"}).find("span").text.strip()
    #海报url
    img_tag = soup.find("img")
    url = img_tag['src']
    return (duration,description,url)

async def get_url(url)->str:
    global sleep
    asyncio.sleep(sleep) #每个请求都会被等待,等待的sleep是一个递增变量,防止频繁数据请求,我们的目的是协程。
    print("start get url:{}".format(url))
    header = {"user-agent":"Chrome/10.0"}
    async with aiohttp.ClientSession(headers=header) as session:
        async with session.get(url,headers=header) as resp:
            result =  await resp.text()
            print("getted result")
            return result
        
    sleep += 0.5
                
async def main():
    url = "https://movie.douban.com/cinema/later/shenzhen/"
    try:
        task = asyncio.create_task(get_url(url))
        res = await asyncio.gather(task)
        movies_info = get_movies_info(res[0]) 
        
        tasks = [asyncio.create_task(get_url(info[0])) for info in movies_info]
        print("start fetch each movie info")
        htmls = await asyncio.gather(*tasks,return_exceptions=True)
        print("get html :{}".format(len(htmls)))
        all_info = []
        i = 0
        for html in htmls:
            details = get_each_movie_details(html)
            all_info.append(movies_info[i][1]+details)
            i+=1
            
            
        print(" ".join(all_info))
            
    except Exception as e:
        print("Exception:{}".format(e))

await main()
    
    

参考文档:https://docs.python.org/zh-cn/3/library/asyncio.html
https://time.geekbang.org/column/article/103358

推荐阅读