Python异步编程
异步编程是一种编程范式,允许程序在等待某些操作完成时继续执行其他任务。在Python中,我们可以使用asyncio库来实现异步编程,构建高性能的网络应用和IO密集型任务。
异步编程基础
同步 vs 异步
| 特性 | 同步编程 | 异步编程 |
|---|
| 执行方式 | 顺序执行,阻塞等待 | 并发执行,非阻塞 |
| CPU利用率 | 低,等待IO时CPU空闲 | 高,IO等待时执行其他任务 |
| 编程模型 | 简单,易于理解 | 复杂,需要处理回调或协程 |
| 适用场景 | CPU密集型任务 | IO密集型任务 |
协程
协程(Coroutine)是一种轻量级的线程,由Python解释器管理,而非操作系统。协程可以在执行过程中暂停,让出控制权,然后在适当的时候恢复执行。
asyncio基础
安装
asyncio是Python 3.4+的标准库,无需额外安装。
简单示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # 非阻塞等待1秒
print("World")
# 创建事件循环
loop = asyncio.get_event_loop()
# 执行协程
loop.run_until_complete(hello())
# 关闭事件循环
loop.close()
|
Python 3.7+ 简化语法
1
2
3
4
5
6
7
8
9
| import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+ 简化执行方式
asyncio.run(hello())
|
协程和任务
定义协程
使用async def定义协程函数:
1
2
3
| async def my_coroutine():
# 协程代码
return "结果"
|
创建和执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| import asyncio
async def task_func():
await asyncio.sleep(1)
return "任务完成"
async def main():
# 创建任务
task = asyncio.create_task(task_func())
# 执行其他操作
print("等待任务完成...")
# 等待任务完成并获取结果
result = await task
print(f"任务结果: {result}")
asyncio.run(main())
|
并发执行多个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| import asyncio
async def task_func(name, delay):
await asyncio.sleep(delay)
return f"{name} 完成,延迟 {delay} 秒"
async def main():
# 并发执行多个任务
results = await asyncio.gather(
task_func("任务1", 1),
task_func("任务2", 2),
task_func("任务3", 3)
)
for result in results:
print(result)
asyncio.run(main())
|
事件循环
事件循环是asyncio的核心,负责管理协程的执行。它的主要职责包括:
- 注册和执行协程
- 处理IO事件
- 调度任务
- 管理信号和子进程
事件循环的工作原理
- 事件循环运行,等待事件发生
- 当事件发生时,事件循环调用对应的回调函数
- 回调函数可能会创建新的协程或任务
- 事件循环继续运行,直到所有任务完成
异步IO操作
异步文件IO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| import asyncio
import aiofiles
async def read_file(filename):
async with aiofiles.open(filename, 'r') as f:
content = await f.read()
return content
async def write_file(filename, content):
async with aiofiles.open(filename, 'w') as f:
await f.write(content)
async def main():
# 读取文件
content = await read_file('example.txt')
print(f"文件内容: {content}")
# 写入文件
await write_file('output.txt', 'Hello, Async IO!')
print("文件写入完成")
asyncio.run(main())
|
异步网络请求
使用aiohttp库进行异步网络请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://www.example.com',
'https://www.python.org',
'https://www.github.com'
]
async with aiohttp.ClientSession() as session:
# 并发获取多个URL
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"URL {i+1} 响应长度: {len(result)} 字符")
asyncio.run(main())
|
异步上下文管理器
异步上下文管理器允许在async with语句中使用上下文管理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import asyncio
class AsyncContextManager:
async def __aenter__(self):
print("进入异步上下文")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("退出异步上下文")
async def main():
async with AsyncContextManager() as cm:
print("在异步上下文中")
asyncio.run(main())
|
异步迭代器
异步迭代器允许在async for语句中使用迭代器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import asyncio
class AsyncIterator:
def __init__(self, stop):
self.stop = stop
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟IO操作
self.current += 1
return self.current
async def main():
async for number in AsyncIterator(5):
print(number)
asyncio.run(main())
|
错误处理
捕获异常
1
2
3
4
5
6
7
8
9
10
11
12
13
| import asyncio
async def task_with_error():
await asyncio.sleep(1)
raise ValueError("任务执行出错")
async def main():
try:
await task_with_error()
except ValueError as e:
print(f"捕获到异常: {e}")
asyncio.run(main())
|
多个任务的错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import asyncio
async def task1():
await asyncio.sleep(1)
raise ValueError("任务1出错")
async def task2():
await asyncio.sleep(2)
return "任务2完成"
async def main():
# 方式1:使用gather,一个任务出错会导致所有任务失败
try:
results = await asyncio.gather(task1(), task2())
print(results)
except ValueError as e:
print(f"捕获到异常: {e}")
# 方式2:使用gather的return_exceptions参数
results = await asyncio.gather(task1(), task2(), return_exceptions=True)
print(results) # [ValueError("任务1出错"), "任务2完成"]
asyncio.run(main())
|
异步编程最佳实践
- 使用asyncio.run():在程序入口点使用
asyncio.run(),避免手动管理事件循环 - 避免阻塞调用:在协程中避免使用阻塞的同步调用,如
time.sleep()、requests.get()等 - 使用异步库:选择支持异步的库,如
aiohttp替代requests,aiofiles替代open() - 合理使用并发:根据系统资源和网络状况,合理控制并发任务数量
- 使用asyncio.create_task():使用
create_task()创建任务,而不是直接等待协程 - 使用async with和async for:充分利用异步上下文管理器和异步迭代器
- 注意异常处理:使用try-except块捕获和处理协程中的异常
- 避免长时间运行的同步代码:如果必须执行同步代码,考虑使用
loop.run_in_executor()
异步编程模式
生产者-消费者模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| import asyncio
import random
async def producer(queue):
for i in range(10):
item = random.randint(1, 100)
await queue.put(item)
print(f"生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# 发送终止信号
await queue.put(None)
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
# 收到终止信号,退出
queue.task_done()
break
print(f"消费: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
# 创建队列
queue = asyncio.Queue(maxsize=5)
# 创建任务
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
# 等待生产者完成
await producer_task
# 等待队列处理完成
await queue.join()
# 取消消费者任务
consumer_task.cancel()
asyncio.run(main())
|
超时处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import asyncio
async def long_running_task():
await asyncio.sleep(5)
return "任务完成"
async def main():
try:
# 设置3秒超时
result = await asyncio.wait_for(long_running_task(), timeout=3)
print(f"结果: {result}")
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())
|
Python异步编程总结
- 异步编程:非阻塞、高CPU利用率,适用于IO密集型任务
- 协程:轻量级线程,由Python解释器管理
- asyncio:Python标准库,用于实现异步编程
- 核心概念:事件循环、协程、任务、Future
- 异步IO:文件IO、网络请求等非阻塞操作
- 异步上下文管理器和迭代器
- 错误处理:捕获和处理协程中的异常
- 最佳实践:使用异步库、合理控制并发、注意异常处理
- 常见模式:生产者-消费者、超时处理
结语
Python的异步编程为我们提供了一种高效处理IO密集型任务的方式。通过学习和掌握asyncio库,我们可以构建出高性能的网络应用、爬虫、API服务等。
在实际项目中,我们应该根据任务类型选择合适的编程模型:对于IO密集型任务,使用异步编程可以显著提高性能;对于CPU密集型任务,同步编程或多进程可能更合适。
随着Python异步生态的不断发展,越来越多的库开始支持异步编程,这为我们构建高性能应用提供了更多的选择和便利。