一文了解:Python 并發(fā)、并行、同步、異步、阻塞、非阻塞
前言
在 Python 中,理解并發(fā)(Concurrency)、并行(Parallelism)、同步(Synchronization)、異步(Asynchronous)、阻塞(Blocking)和非阻塞(Non-blocking)是非常重要的,因?yàn)樗鼈兪菢?gòu)建高性能應(yīng)用程序的關(guān)鍵概念。
1. 并發(fā)(Concurrency)
并發(fā)是指程序在同一時(shí)間段內(nèi)可以處理多個(gè)任務(wù)的能力。具體來說,程序看起來像是同時(shí)執(zhí)行多個(gè)任務(wù),但實(shí)際上它們是在交替執(zhí)行。
1.1 示例:多線程
import threading
import time
def worker():
print(f"Thread {threading.current_thread().name} started")
time.sleep(2)
print(f"Thread {threading.current_thread().name} finished")
# 創(chuàng)建多個(gè)線程
threads = []
for i in range(5):
thread = threading.Thread(target=worker, name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
print("All threads finished")
2. 并行(Parallelism)
并行是指程序在同一時(shí)間可以真正同時(shí)執(zhí)行多個(gè)任務(wù)的能力。通常需要硬件支持,例如多核處理器。
2.1 示例:多進(jìn)程
import multiprocessing
def worker():
print(f"Process {multiprocessing.current_process().name} started")
time.sleep(2)
print(f"Process {multiprocessing.current_process().name} finished")
# 創(chuàng)建多個(gè)進(jìn)程
processes = []
for i in range(5):
process = multiprocessing.Process(target=worker, name=f"Process-{i}")
processes.append(process)
process.start()
# 等待所有進(jìn)程完成
for process in processes:
process.join()
print("All processes finished")
3. 同步(Synchronization)
同步是指在多線程或多進(jìn)程環(huán)境中,通過鎖或其他機(jī)制確保資源的安全訪問。
3.1 示例:鎖(Lock)
import threading
def worker(lock):
with lock:
print(f"Thread {threading.current_thread().name} started")
time.sleep(2)
print(f"Thread {threading.current_thread().name} finished")
lock = threading.Lock()
# 創(chuàng)建多個(gè)線程
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(lock,), name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
print("All threads finished")
4. 異步(Asynchronous)
異步是指程序可以在等待某個(gè)操作完成的同時(shí)繼續(xù)執(zhí)行其他任務(wù)。異步編程通常使用回調(diào)函數(shù)或協(xié)程。
4.1 示例:異步 I/O(使用 asyncio)
import asyncio
async def worker():
print(f"Worker {asyncio.current_task().get_name()} started")
await asyncio.sleep(2)
print(f"Worker {asyncio.current_task().get_name()} finished")
async def main():
tasks = []
for i in range(5):
task = asyncio.create_task(worker(), name=f"Worker-{i}")
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(main())
5. 阻塞(Blocking)
阻塞是指程序在執(zhí)行某個(gè)操作時(shí)會(huì)暫停執(zhí)行,直到該操作完成。例如,當(dāng)執(zhí)行一個(gè)阻塞的 I/O 操作時(shí),程序會(huì)等待直到 I/O 操作完成。
5.1 示例:阻塞 I/O
import time
def blocking_io():
print("Starting blocking IO")
time.sleep(5)
print("Finished blocking IO")
blocking_io()
6. 非阻塞(Non-blocking)
非阻塞是指程序在執(zhí)行某個(gè)操作時(shí)不會(huì)暫停執(zhí)行,而是繼續(xù)執(zhí)行其他任務(wù)。通常用于網(wǎng)絡(luò) I/O 或文件 I/O。
6.1 示例:非阻塞 I/O(使用 select)
import select
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 8000))
server_socket.listen(5)
sockets_list = [server_socket]
def handle_client(client_socket):
request = client_socket.recv(1024)
print(f"Received: {request.decode()}")
response = "Hello, World!\n"
client_socket.send(response.encode())
client_socket.close()
while True:
read_sockets, _, exception_sockets = select.select(sockets_list, [], sockets_list)
for notified_socket in read_sockets:
if notified_socket == server_socket:
client_socket, client_address = server_socket.accept()
sockets_list.append(client_socket)
else:
handle_client(notified_socket)
for notified_socket in exception_sockets:
sockets_list.remove(notified_socket)
notified_socket.close()
1. 并發(fā)(Concurrency)
1.1 示例:多線程發(fā)送 HTTP 請(qǐng)求
import threading
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, jsnotallow=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 測(cè)試數(shù)據(jù)
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 創(chuàng)建線程列表
threads = []
# 創(chuàng)建并啟動(dòng)線程
for test_case in test_cases:
thread = threading.Thread(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
threads.append(thread)
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
print("All requests finished")
2. 并行(Parallelism)
2.1 示例:多進(jìn)程發(fā)送 HTTP 請(qǐng)求
import multiprocessing
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, jsnotallow=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 測(cè)試數(shù)據(jù)
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 創(chuàng)建進(jìn)程列表
processes = []
# 創(chuàng)建并啟動(dòng)進(jìn)程
for test_case in test_cases:
process = multiprocessing.Process(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
processes.append(process)
process.start()
# 等待所有進(jìn)程完成
for process in processes:
process.join()
print("All requests finished")
3. 同步(Synchronization)
3.1 示例:使用鎖同步多線程
import threading
import requests
import time
def send_request(lock, url, headers, payload):
with lock:
response = requests.post(url, headers=headers, jsnotallow=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 測(cè)試數(shù)據(jù)
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 創(chuàng)建鎖
lock = threading.Lock()
# 創(chuàng)建線程列表
threads = []
# 創(chuàng)建并啟動(dòng)線程
for test_case in test_cases:
thread = threading.Thread(target=send_request, args=(lock, test_case["url"], test_case["headers"], test_case["payload"]))
threads.append(thread)
thread.start()
# 等待所有線程完成
for thread in threads:
thread.join()
print("All requests finished")
4. 異步(Asynchronous)
4.1 示例:使用 asyncio 發(fā)送異步 HTTP 請(qǐng)求
import asyncio
import aiohttp
async def send_request(url, headers, payload):
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, jsnotallow=payload) as response:
print(f"Response status code: {response.status}")
response_text = await response.text()
print(f"Response content: {response_text}")
# 測(cè)試數(shù)據(jù)
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
async def main():
tasks = []
for test_case in test_cases:
task = asyncio.create_task(send_request(test_case["url"], test_case["headers"], test_case["payload"]))
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(main())
5. 阻塞(Blocking)
5.1 示例:阻塞式發(fā)送 HTTP 請(qǐng)求
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, jsnotallow=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 測(cè)試數(shù)據(jù)
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 依次發(fā)送請(qǐng)求
for test_case in test_cases:
send_request(test_case["url"], test_case["headers"], test_case["payload"])
print("All requests finished")
6. 非阻塞(Non-blocking)
6.1 示例:使用 select 發(fā)送非阻塞 HTTP 請(qǐng)求
import select
import socket
import requests
import time
def send_request(url, headers, payload):
response = requests.post(url, headers=headers, jsnotallow=payload)
print(f"Response status code: {response.status_code}")
print(f"Response content: {response.text}")
# 測(cè)試數(shù)據(jù)
test_cases = [
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token1"},
"payload": {"key": "value1"}
},
{
"url": "https://api.example.com/v1/resource",
"headers": {"Authorization": "Bearer token2"},
"payload": {"key": "value2"}
}
]
# 創(chuàng)建套接字列表
sockets_list = []
# 創(chuàng)建并啟動(dòng)套接字
for test_case in test_cases:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 8000))
sockets_list.append(sock)
# 監(jiān)聽套接字
while sockets_list:
ready_to_read, _, _ = select.select(sockets_list, [], [])
for sock in ready_to_read:
send_request(test_cases[sockets_list.index(sock)]["url"], test_cases[sockets_list.index(sock)]["headers"], test_cases[sockets_list.index(sock)]["payload"])
sockets_list.remove(sock)
print("All requests finished")
7. 總結(jié)
通過以上示例,我們?cè)敿?xì)介紹了 Python 中的幾個(gè)關(guān)鍵概念:
并發(fā)(Concurrency):在同一時(shí)間段內(nèi)處理多個(gè)任務(wù)。
并行(Parallelism):在同一時(shí)間真正同時(shí)執(zhí)行多個(gè)任務(wù)。
同步(Synchronization):確保多線程或多進(jìn)程環(huán)境下的資源安全訪問。
異步(Asynchronous):在等待某個(gè)操作完成的同時(shí)繼續(xù)執(zhí)行其他任務(wù)。
阻塞(Blocking):在執(zhí)行某個(gè)操作時(shí)會(huì)暫停執(zhí)行,直到該操作完成。
非阻塞(Non-blocking):在執(zhí)行某個(gè)操作時(shí)不會(huì)暫停執(zhí)行,而是繼續(xù)執(zhí)行其他任務(wù)。