Python异步编程

异步编程是一种编程范式,允许程序在等待某些操作完成时继续执行其他任务。在Python中,我们可以使用asyncio库来实现异步编程,构建高性能的网络应用和IO密集型任务。

异步编程基础

同步 vs 异步

特性同步编程异步编程
执行方式顺序执行,阻塞等待并发执行,非阻塞
CPU利用率低,等待IO时CPU空闲高,IO等待时执行其他任务
编程模型简单,易于理解复杂,需要处理回调或协程
适用场景CPU密集型任务IO密集型任务

协程

协程(Coroutine)是一种轻量级的线程,由Python解释器管理,而非操作系统。协程可以在执行过程中暂停,让出控制权,然后在适当的时候恢复执行。

asyncio基础

安装

asyncio是Python 3.4+的标准库,无需额外安装。

简单示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 非阻塞等待1秒
    print("World")

# 创建事件循环
loop = asyncio.get_event_loop()

# 执行协程
loop.run_until_complete(hello())

# 关闭事件循环
loop.close()

Python 3.7+ 简化语法

1
2
3
4
5
6
7
8
9
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+ 简化执行方式
asyncio.run(hello())

协程和任务

定义协程

使用async def定义协程函数:

1
2
3
async def my_coroutine():
    # 协程代码
    return "结果"

创建和执行任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio

async def task_func():
    await asyncio.sleep(1)
    return "任务完成"

async def main():
    # 创建任务
    task = asyncio.create_task(task_func())
    
    # 执行其他操作
    print("等待任务完成...")
    
    # 等待任务完成并获取结果
    result = await task
    print(f"任务结果: {result}")

asyncio.run(main())

并发执行多个任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio

async def task_func(name, delay):
    await asyncio.sleep(delay)
    return f"{name} 完成,延迟 {delay} 秒"

async def main():
    # 并发执行多个任务
    results = await asyncio.gather(
        task_func("任务1", 1),
        task_func("任务2", 2),
        task_func("任务3", 3)
    )
    
    for result in results:
        print(result)

asyncio.run(main())

事件循环

事件循环是asyncio的核心,负责管理协程的执行。它的主要职责包括:

  1. 注册和执行协程
  2. 处理IO事件
  3. 调度任务
  4. 管理信号和子进程

事件循环的工作原理

  1. 事件循环运行,等待事件发生
  2. 当事件发生时,事件循环调用对应的回调函数
  3. 回调函数可能会创建新的协程或任务
  4. 事件循环继续运行,直到所有任务完成

异步IO操作

异步文件IO

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import aiofiles

async def read_file(filename):
    async with aiofiles.open(filename, 'r') as f:
        content = await f.read()
        return content

async def write_file(filename, content):
    async with aiofiles.open(filename, 'w') as f:
        await f.write(content)

async def main():
    # 读取文件
    content = await read_file('example.txt')
    print(f"文件内容: {content}")
    
    # 写入文件
    await write_file('output.txt', 'Hello, Async IO!')
    print("文件写入完成")

asyncio.run(main())

异步网络请求

使用aiohttp库进行异步网络请求:

1
pip install aiohttp
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://www.example.com',
        'https://www.python.org',
        'https://www.github.com'
    ]
    
    async with aiohttp.ClientSession() as session:
        # 并发获取多个URL
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for i, result in enumerate(results):
            print(f"URL {i+1} 响应长度: {len(result)} 字符")

asyncio.run(main())

异步上下文管理器

异步上下文管理器允许在async with语句中使用上下文管理器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

class AsyncContextManager:
    async def __aenter__(self):
        print("进入异步上下文")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("退出异步上下文")

async def main():
    async with AsyncContextManager() as cm:
        print("在异步上下文中")

asyncio.run(main())

异步迭代器

异步迭代器允许在async for语句中使用迭代器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

class AsyncIterator:
    def __init__(self, stop):
        self.stop = stop
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.5)  # 模拟IO操作
        self.current += 1
        return self.current

async def main():
    async for number in AsyncIterator(5):
        print(number)

asyncio.run(main())

错误处理

捕获异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio

async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("任务执行出错")

async def main():
    try:
        await task_with_error()
    except ValueError as e:
        print(f"捕获到异常: {e}")

asyncio.run(main())

多个任务的错误处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def task1():
    await asyncio.sleep(1)
    raise ValueError("任务1出错")

async def task2():
    await asyncio.sleep(2)
    return "任务2完成"

async def main():
    # 方式1:使用gather,一个任务出错会导致所有任务失败
    try:
        results = await asyncio.gather(task1(), task2())
        print(results)
    except ValueError as e:
        print(f"捕获到异常: {e}")
    
    # 方式2:使用gather的return_exceptions参数
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    print(results)  # [ValueError("任务1出错"), "任务2完成"]

asyncio.run(main())

异步编程最佳实践

  1. 使用asyncio.run():在程序入口点使用asyncio.run(),避免手动管理事件循环
  2. 避免阻塞调用:在协程中避免使用阻塞的同步调用,如time.sleep()requests.get()
  3. 使用异步库:选择支持异步的库,如aiohttp替代requestsaiofiles替代open()
  4. 合理使用并发:根据系统资源和网络状况,合理控制并发任务数量
  5. 使用asyncio.create_task():使用create_task()创建任务,而不是直接等待协程
  6. 使用async with和async for:充分利用异步上下文管理器和异步迭代器
  7. 注意异常处理:使用try-except块捕获和处理协程中的异常
  8. 避免长时间运行的同步代码:如果必须执行同步代码,考虑使用loop.run_in_executor()

异步编程模式

生产者-消费者模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import random

async def producer(queue):
    for i in range(10):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"生产: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    
    # 发送终止信号
    await queue.put(None)

async def consumer(queue):
    while True:
        item = await queue.get()
        
        if item is None:
            # 收到终止信号,退出
            queue.task_done()
            break
        
        print(f"消费: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    # 创建队列
    queue = asyncio.Queue(maxsize=5)
    
    # 创建任务
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    
    # 等待生产者完成
    await producer_task
    
    # 等待队列处理完成
    await queue.join()
    
    # 取消消费者任务
    consumer_task.cancel()

asyncio.run(main())

超时处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def long_running_task():
    await asyncio.sleep(5)
    return "任务完成"

async def main():
    try:
        # 设置3秒超时
        result = await asyncio.wait_for(long_running_task(), timeout=3)
        print(f"结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时")

asyncio.run(main())
Python异步编程总结
  • 异步编程:非阻塞、高CPU利用率,适用于IO密集型任务
  • 协程:轻量级线程,由Python解释器管理
  • asyncio:Python标准库,用于实现异步编程
  • 核心概念:事件循环、协程、任务、Future
  • 异步IO:文件IO、网络请求等非阻塞操作
  • 异步上下文管理器和迭代器
  • 错误处理:捕获和处理协程中的异常
  • 最佳实践:使用异步库、合理控制并发、注意异常处理
  • 常见模式:生产者-消费者、超时处理

结语

Python的异步编程为我们提供了一种高效处理IO密集型任务的方式。通过学习和掌握asyncio库,我们可以构建出高性能的网络应用、爬虫、API服务等。

在实际项目中,我们应该根据任务类型选择合适的编程模型:对于IO密集型任务,使用异步编程可以显著提高性能;对于CPU密集型任务,同步编程或多进程可能更合适。

随着Python异步生态的不断发展,越来越多的库开始支持异步编程,这为我们构建高性能应用提供了更多的选择和便利。