2025. 10. 26. 07:53ㆍProgramming/Python
- 목차
예제 1: 커피숍 비유로 이해하기
상황: 5명의 손님이 커피를 주문했습니다
커피 만드는 시간: 각 3초
방법 1: 동기 방식 (Synchronous) - 비효율적인 직원
import asyncio
import time
from datetime import datetime
print("=" \* 60)
print("☕ 커피숍 비유로 AsyncIO 이해하기")
print("=" \* 60)
def make\_coffee\_sync(customer):
"""커피를 만듭니다 (동기)"""
print(f" \[{datetime.now().strftime('%H:%M:%S')}\] {customer}님 커피 제조 시작...")
time.sleep(3) # 커피 만드는 시간 (3초 대기)
print(f" \[{datetime.now().strftime('%H:%M:%S')}\] {customer}님 커피 완성! ☕")
return f"{customer}의 커피"
print("\\n📌 방법 1: 동기 방식 (한 명씩 순서대로)")
print("직원이 한 손님의 커피가 완전히 끝날 때까지 기다림\\n")
start = time.time()
customers = \["철수", "영희", "민수", "지영", "동현"\]
for customer in customers:
make\_coffee\_sync(customer)
sync\_time = time.time() - start
print(f"\\n⏱️ 총 소요 시간: {sync\_time:.1f}초 (5명 × 3초 = 15초)") 방법 2: 비동기 방식 (Asynchronous) - 효율적인 직원
우선 async io에 대해서 이해해야 합니다.
Python 3.4 이하에서는 yield from을 사용한 generator 함수로서 coroutine을 정의합니다. 이는 제네레이터와 코루틴의 경계가 모호해져서 혼란을 줄 수 있습니다.
Python 3.5이상에서는 async def을 사용해서 코루틴을 정의하면 함수가 자동으로 코루틴 객체를 반환합니다. (함수의 호출 == 코루틴 객체의 반환)
이는 await 키워드와 함께 비동기 프로그래밍(asyncio)를 보다 직관적으로 만듭니다.
이전 generator 기반 coroutine
def old_coroutine():
yield from some_async_operation() # yield from으로 대기async def
import asyncio
async def modern_coroutine():
await some_async_operation() # await으로 명확히 대기
async def main():
result = await modern_coroutine()
asyncio.run(main()) # run()은 asyncio event loop을 생성하고 main을 실행합니다. asyncio는 async def, await, asyncio.run() 등을 사용하기 위해서 import 합니다.
async def 키워드로 함수를 정의하면 이는 자동으로 코루틴 객체를 반환하는 native coroutine 이 됩니다. def과 달리 이 함수를 호출하면 즉시 실행되지 않고 코루틴 객체가 생성되어 이벤트 루프에 의해 스케줄링됩니다.
await
await 키워드는 현재 코루틴이 다른 비동기 작업을 기다리는 지점을 명시합니다.
async def으로 정의된 함수를 호출하면 코루틴 객체가 반환됩니다. 그러나 이 객체는 실제로 실행되지 않습니다. await를 사용해야 event loop가 코루틴을 스케줄링하고 실행합니다.
만약 await 없이 async def으로 정의된 함수를 호출하면, 코루틴 객체가 반환되나 함수 본문이 실행되지 않습니다.
비동기 코드는 이벤트 루프에 의해서만 실행될 수 있기 때문에 await 키워드를 사용해야 이 async def으로 정의된 비동기 동작의 함수를 실행하고 대기해서 결과를 받게 됩니다.
여기서 await는 블로킹 대기를 하지 않습니다. await 지점에서 코루틴이 자동으로 양보를 하게 됩니다. 이렇게 양보를 하면 이벤트 루프가 다른 코루틴을 실행할 수 있어서 동시성(concurrency)를 달성합니다.
await에 도달한 코루틴은 일시중단 상태가 됩니다. 이에 이벤트 루프는 즉시 다른 대기 중인 코루틴을 선택해 실행합니다.
await의 동작을 정리하면,
await job()
을 하게되면 job 코루틴 객체를 생성하고 이를 스케줄러어 넣고, await은 여기서 yield를 하게 되니
이제 스케줄러에 들어간 여러 작업 중 무엇을 먼저 실행할지 결정하고 job이 실행될 차례이면 이를 실행합니다.
만약 스케줄러에서 다른 job2를 먼저 실행한다면, job은 대기 상태에 들어가고 이 await는 계속 기다리다가 job이 실행되고 완료되면 그때 이 await는 종료됩니다.
이러한 동작은 parallel 하게 동작하는 것이 아니라 concurrent하게 동작합니다. 왜냐하면 Python의 GIL 제약 때문에 그러합니다. 즉, 하나의 thread execution context를 await 키워드를 통해서 여러 suspendable 작업들이 서로 나눠서 실행되는 방식으로 동작하기 때문입니다.
asyncio.run(main())은 event loop를 획득하고 이 event loop에서 main의 수행을 완료할때까지 기다렸다가 반환되는 함수입니다. main()을 호출을 하는데, async def으로 정의된 main의 호출코드 main()은 사실 호출이 아니라 코루틴 객체 인스턴스를 반환해서 asyncio.run에 인자로 넘깁니다.
이 과정에서 await 지점에서 다른 코루틴으로 전환(양보)되며, 전체 작업이 끝날 때까지 블록킹 상태로 대기합니다. 코루틴의 결과값을 반환하고 이벤트 루프를 자동으로 종료(cleanup) 합니다.
asyncio.run() 대신 다음과 같이 사용할 수도 있습니다.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())asyncio.gather()
여러 코루틴을 병렬로 실행하려면 asyncio.gather()를 사용
asyncio.gather()는 여러 코루틴 객체를 동시에 실행하고 모든 작업이 완료될 때까지 기다린 후 결과를 한 번에 반환하는 함수입니다.
import asyncio
import time
async def task1():
await asyncio.sleep(2) # 2초 대기 시뮬레이션
return "Task 1 done"
async def task2():
await asyncio.sleep(1) # 1초 대기 시뮬레이션
return "Task 2 done"
async def main():
start = time.time()
results = await asyncio.gather(task1(), task2()) # 병렬 실행
end = time.time()
print(f"Results: {results}")
print(f"Total time: {end - start:.2f} seconds")
asyncio.run(main())Results: ['Task 1 done', 'Task 2 done']
Total time: 2.00 seconds만약 다음과 같이 gather를 사용하지 않을 시 더 오래 기다리게 됩니다.
await task1(); await task2()사실 await task1(); await task2()를 사용하게 되면 이는 병렬 처리가 아니가 그냥 순차적으로 함수 호출한거랑 별 차이 없습니다.
여기서 잠시 정리하자면, async def은 코루틴 객체를 생성하는 것이며, await은 코루틴 객체를 스케줄링하고 실행 완료를 대기하는 것 입니다.
이외에 여러 코루틴 객체의 병렬 실행을 위해 gather()를 사용할 수도 있으며, 다음의 것들도 사용 가능합니다.
asyncio.create_task(coro)
asyncio.wait(awables, ...)
asyncio.as_completed(awables)
완료 순서대로 결과를 이터레이터로 반환
asyncio APIs
create_task(task)
await task1()(은 직접대기로서 간단하고 직관적입니다. 단일 코루틴에 적합합니다.
반면
task1 = asyncio.create_task(task1())
await task1
create_task는 코루틴을 태스크 객체로 감싸 즉시 스케쥴링합니다.
이후 await task1에서 태스크의 완료를 대기합니다.
wait vs. gather
asyncio.wait([task1(), task2()]
asyncio.gather(task1(), task2())
이 둘은 여러 코루틴을 동시에 실행하나 저수준과 고수준의 차이가 있습니다. 둘 다 concurrency를 활용해 총 시간 = 가장 긴 작업 시간
wait은 저수준으로 유연하지만 사용이 복잡합니다.
gather는 고수준으로 쉽게 사용이 가능합니다.
asyncio.as_completed([task1(), taask2()]
이는 완료 순서대로 결과를 yield 하는 이터레이터 입니다.
gather, wait과 달리 실행 시간 순서로 결과를 처리하여 가장 빠른 결과 먼저 필요한 경우에 유효합니다.
다음과 같이
async def 함수명
async def make_coffee_async(customer):
"""커피를 만듭니다 (비동기)"""
print(f" \[{datetime.now().strftime('%H:%M:%S')}\] {customer}님 커피 제조 시작...")
await asyncio.sleep(3) # 커피 만드는 동안 다른 일 가능!
print(f" \[{datetime.now().strftime('%H:%M:%S')}\] {customer}님 커피 완성! ☕")
return f"{customer}의 커피"
print("\\n\\n📌 방법 2: 비동기 방식 (동시에 여러 개)")
print("직원이 커피 머신 돌려놓고 다른 손님 주문 받음\\n")
async def serve\_all\_customers():
start = time.time()
# 모든 손님의 주문을 동시에 처리
tasks = \[make\_coffee\_async(customer) for customer in customers\]
results = await asyncio.gather(\*tasks)
async\_time = time.time() - start
print(f"\\n⏱️ 총 소요 시간: {async\_time:.1f}초 (동시에 처리!)")
print(f"✨ {sync\_time / async\_time:.1f}배 빠름!")
asyncio.run(serve\_all\_customers()) 활용 예
예제 1: 기본 비동기 요청-응답
await asyncio.gather(코루틴 인스턴스들)을 사용하여 여러 요청을 동시에 처리
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("=" * 60)
print("예제 1: 기본 비동기 요청-응답")
print("=" * 60)
async def fetch_user_data(user_id: int) -> Dict[str, Any]:
"""사용자 데이터를 비동기로 가져오기"""
print(f" 🔄 사용자 {user_id} 데이터 요청 중...")
# 네트워크 요청 시뮬레이션 (1초 대기)
await asyncio.sleep(1)
# 결과 반환
result = {
'user_id': user_id,
'name': f'User{user_id}',
'email': f'user{user_id}@example.com'
}
print(f" ✅ 사용자 {user_id} 데이터 수신 완료")
return result
async def main_example1():
"""여러 사용자 데이터를 동시에 가져오기"""
user_ids = [1, 2, 3, 4, 5]
print("\n순차 처리 (느림):")
start = time.time()
results_sequential = []
for user_id in user_ids:
result = await fetch_user_data(user_id)
results_sequential.append(result)
print(f"⏱️ 소요 시간: {time.time() - start:.2f}초\n")
print("비동기 처리 (빠름):")
start = time.time()
# 모든 요청을 동시에 시작
tasks = [fetch_user_data(user_id) for user_id in user_ids]
results_async = await asyncio.gather(*tasks)
print(f"⏱️ 소요 시간: {time.time() - start:.2f}초")
print(f"\n📊 결과: {len(results_async)}명의 데이터 수신")
for result in results_async:
print(f" - {result['name']}: {result['email']}")
asyncio.run(main_example1())예제 2: 실시간 결과 스트리밍 (as_completed)
asyncio.as_completed를 사용해서 여러 코루틴 인스턴스들(task)을 동시에 수행
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("\n\n" + "=" * 60)
print("예제 2: 완료되는 순서대로 결과 받기")
print("=" * 60)
async def process_video(video_id: int, duration: float) -> Dict[str, Any]:
"""비디오를 처리 (처리 시간이 다름)"""
print(f" 🎬 비디오 {video_id} 처리 시작 (예상 {duration}초)")
await asyncio.sleep(duration)
result = {
'video_id': video_id,
'status': 'completed',
'processed_at': datetime.now().strftime('%H:%M:%S')
}
print(f" ✅ 비디오 {video_id} 처리 완료!")
return result
async def main_example2():
"""완료되는 대로 결과 받기 (실시간 처리)"""
videos = [
(1, 2.0), # 비디오 1: 2초 걸림
(2, 0.5), # 비디오 2: 0.5초 걸림
(3, 1.5), # 비디오 3: 1.5초 걸림
(4, 1.0), # 비디오 4: 1초 걸림
]
print("\n처리 시작:")
tasks = [process_video(vid, dur) for vid, dur in videos]
# 완료되는 순서대로 결과 받기
completed_count = 0
for coro in asyncio.as_completed(tasks):
result = await coro
completed_count += 1
print(f" 📦 [{completed_count}/{len(videos)}] 결과 수신: "
f"비디오 {result['video_id']} - {result['processed_at']}")
print("\n✅ 모든 비디오 처리 완료!")
asyncio.run(main_example2())예제 3: 큐를 사용한 Producer-Consumer 패턴
producer async와 consumer async들이 queue로 통신하면서 동시에 동작하는 예
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("\n\n" + "=" * 60)
print("예제 3: 큐를 사용한 비동기 워크플로우")
print("=" * 60)
async def producer(queue: asyncio.Queue, num_items: int):
"""작업을 생성하여 큐에 넣기"""
print("📤 Producer 시작")
for i in range(num_items):
item = {'task_id': i, 'data': f'Task-{i}'}
await queue.put(item)
print(f" ➕ 작업 추가: {item['data']}")
await asyncio.sleep(0.3) # 작업 생성 간격
# 종료 신호
await queue.put(None)
print("📤 Producer 완료")
async def consumer(queue: asyncio.Queue, consumer_id: int):
"""큐에서 작업을 가져와서 처리"""
print(f"📥 Consumer {consumer_id} 시작")
while True:
item = await queue.get()
if item is None:
# 종료 신호 수신
await queue.put(None) # 다른 consumer를 위해 다시 넣기
break
# 작업 처리
print(f" ⚙️ Consumer {consumer_id}: {item['data']} 처리 중...")
await asyncio.sleep(1) # 처리 시간
print(f" ✅ Consumer {consumer_id}: {item['data']} 완료")
queue.task_done()
print(f"📥 Consumer {consumer_id} 종료")
async def main_example3():
"""Producer-Consumer 패턴 실행"""
queue = asyncio.Queue(maxsize=5)
# Producer 1개, Consumer 3개 동시 실행
await asyncio.gather(
producer(queue, num_items=10),
consumer(queue, consumer_id=1),
consumer(queue, consumer_id=2),
consumer(queue, consumer_id=3)
)
print("\n✅ 모든 작업 완료!")
asyncio.run(main_example3())예제 4: 실전 API 요청 (aiohttp)
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("\n\n" + "=" * 60)
print("예제 4: 실전 HTTP API 요청")
print("=" * 60)
async def fetch_stock_price(session: aiohttp.ClientSession, ticker: str) -> Dict[str, Any]:
"""주식 가격 API 요청 (실제 HTTP 요청)"""
# 예시 API (실제로는 yfinance나 다른 API 사용)
url = f"https://api.example.com/stock/{ticker}"
try:
print(f" 🔄 {ticker} 데이터 요청 중...")
# 실제 HTTP 요청 (예시이므로 주석 처리)
# async with session.get(url) as response:
# data = await response.json()
# return {'ticker': ticker, 'price': data['price']}
# 시뮬레이션
await asyncio.sleep(0.5)
return {
'ticker': ticker,
'price': 100.0 + hash(ticker) % 50,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
print(f" ❌ {ticker} 요청 실패: {e}")
return {'ticker': ticker, 'price': None, 'error': str(e)}
async def main_example4():
"""여러 주식 가격을 동시에 조회"""
tickers = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'NVDA']
# aiohttp 세션 생성
async with aiohttp.ClientSession() as session:
print("📊 주식 가격 조회 시작\n")
# 모든 요청을 동시에 실행
tasks = [fetch_stock_price(session, ticker) for ticker in tickers]
results = await asyncio.gather(*tasks)
print("\n📈 조회 결과:")
for result in results:
if result['price']:
print(f" {result['ticker']}: ${result['price']:.2f}")
else:
print(f" {result['ticker']}: 조회 실패")
asyncio.run(main_example4())예제 5: 에러 처리와 재시도
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("\n\n" + "=" * 60)
print("예제 5: 에러 처리와 재시도 로직")
print("=" * 60)
async def fetch_with_retry(url: str, max_retries: int = 3) -> Dict[str, Any]:
"""재시도 로직이 있는 비동기 요청"""
for attempt in range(max_retries):
try:
print(f" 🔄 시도 {attempt + 1}/{max_retries}: {url}")
# 시뮬레이션: 30% 확률로 실패
await asyncio.sleep(0.5)
import random
if random.random() < 0.3:
raise Exception("Network error")
print(f" ✅ 성공: {url}")
return {'url': url, 'status': 'success', 'data': 'Some data'}
except Exception as e:
print(f" ⚠️ 실패: {url} - {e}")
if attempt == max_retries - 1:
print(f" ❌ 최종 실패: {url}")
return {'url': url, 'status': 'failed', 'error': str(e)}
# 재시도 전 대기 (exponential backoff)
wait_time = 2 ** attempt
print(f" ⏳ {wait_time}초 후 재시도...")
await asyncio.sleep(wait_time)
async def main_example5():
"""여러 URL을 재시도 로직과 함께 요청"""
urls = [
'https://api.example.com/data1',
'https://api.example.com/data2',
'https://api.example.com/data3',
]
print("🌐 API 요청 시작 (재시도 로직 포함)\n")
tasks = [fetch_with_retry(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
print("\n📊 최종 결과:")
success = sum(1 for r in results if isinstance(r, dict) and r['status'] == 'success')
failed = len(results) - success
print(f" 성공: {success}, 실패: {failed}")
asyncio.run(main_example5())예제 6: 타임아웃 처리
wait_for를 통해 timeout 처리
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("\n\n" + "=" * 60)
print("예제 6: 타임아웃 처리")
print("=" * 60)
async def slow_operation(task_id: int, duration: float) -> str:
"""느린 작업 시뮬레이션"""
print(f" 🐌 작업 {task_id} 시작 ({duration}초 소요 예정)")
await asyncio.sleep(duration)
print(f" ✅ 작업 {task_id} 완료")
return f"Result-{task_id}"
async def main_example6():
"""타임아웃을 설정하여 작업 실행"""
tasks = [
slow_operation(1, 1.0), # 1초 (성공)
slow_operation(2, 3.0), # 3초 (타임아웃)
slow_operation(3, 0.5), # 0.5초 (성공)
]
print("⏱️ 2초 타임아웃으로 작업 실행\n")
try:
# 2초 타임아웃 설정
results = await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=2.0
)
print(f"\n✅ 모든 작업 완료: {results}")
except asyncio.TimeoutError:
print("\n⏰ 타임아웃 발생! 일부 작업이 완료되지 않음")
# 개별 작업에 타임아웃
print("\n\n개별 타임아웃:")
for i, task_coro in enumerate([slow_operation(i+10, d) for d in [1.0, 3.0, 0.5]], 1):
try:
result = await asyncio.wait_for(task_coro, timeout=2.0)
print(f" 작업 {i}: 성공 - {result}")
except asyncio.TimeoutError:
print(f" 작업 {i}: 타임아웃!")
asyncio.run(main_example6())예제 7: 실전 MLOps - 배치 추론
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
print("\n\n" + "=" * 60)
print("예제 7: MLOps - 비동기 배치 추론")
print("=" * 60)
@dataclass
class InferenceRequest:
request_id: str
video_path: str
model_name: str
@dataclass
class InferenceResult:
request_id: str
predictions: List[str]
confidence: float
processing_time: float
async def preprocess_video(request: InferenceRequest) -> Dict[str, Any]:
"""비디오 전처리 (비동기)"""
print(f" 🎬 [{request.request_id}] 전처리 시작: {request.video_path}")
await asyncio.sleep(0.5) # 전처리 시간
return {
'request_id': request.request_id,
'frames': list(range(10)), # 10개 프레임
'preprocessed': True
}
async def run_inference(preprocessed_data: Dict[str, Any], model_name: str) -> InferenceResult:
"""모델 추론 실행 (비동기)"""
request_id = preprocessed_data['request_id']
print(f" 🤖 [{request_id}] 추론 시작: {model_name}")
start_time = time.time()
await asyncio.sleep(1.0) # 추론 시간
processing_time = time.time() - start_time
return InferenceResult(
request_id=request_id,
predictions=['class_A', 'class_B'],
confidence=0.95,
processing_time=processing_time
)
async def process_inference_request(request: InferenceRequest) -> InferenceResult:
"""전체 추론 파이프라인"""
print(f"📥 [{request.request_id}] 요청 수신")
# 1. 전처리
preprocessed = await preprocess_video(request)
# 2. 추론
result = await run_inference(preprocessed, request.model_name)
print(f"✅ [{request.request_id}] 완료 - "
f"예측: {result.predictions}, 신뢰도: {result.confidence:.2f}")
return result
async def main_example7():
"""여러 추론 요청을 동시에 처리"""
requests = [
InferenceRequest('REQ-001', 'video1.mp4', 'ResNet50'),
InferenceRequest('REQ-002', 'video2.mp4', 'ResNet50'),
InferenceRequest('REQ-003', 'video3.mp4', 'EfficientNet'),
InferenceRequest('REQ-004', 'video4.mp4', 'ResNet50'),
]
print("🚀 배치 추론 시작\n")
start = time.time()
# 모든 요청을 동시에 처리
tasks = [process_inference_request(req) for req in requests]
# 완료되는 대로 결과 수집
results = []
for i, coro in enumerate(asyncio.as_completed(tasks), 1):
result = await coro
results.append(result)
print(f"\n📊 진행: {i}/{len(requests)} 완료")
total_time = time.time() - start
print(f"\n{'='*60}")
print(f"✅ 배치 추론 완료!")
print(f" 총 요청: {len(requests)}개")
print(f" 총 시간: {total_time:.2f}초")
print(f" 평균 처리 시간: {total_time/len(requests):.2f}초/요청")
print(f"{'='*60}")
asyncio.run(main_example7())asyncio.as_completed(tasks)는 tasks의 순서대로 대기하지 않습니다. 대신 태스크들이 먼저 완료되는 순서대로 코루틴을 반환(yield) 합니다.
위 코드에서 enumerate(asyncio.as_complated(tasks), 1)에서 i는 루푸의 반복 횟수를 나타냅니다. 즉, 1, 2, 3, 4 .. 늘 순차적이고 고정입니다. 하지만 각 i에서 처리되는 coro(즉, result)는 완료 순서대로 매핑됩니다.
예를들어 첫 번째 완료된 태스트는 i = 1로 처리하고 두 번째 완료된 태스크는 i = 2로 처리합니다.
단, 처리되는 요청의 순서는 원래 순서와 다를 수 있습니다.
📊 진행: 1/4 완료 # REQ-004 처리
📊 진행: 2/4 완료 # REQ-001 처리
📊 진행: 3/4 완료 # REQ-003 처리
📊 진행: 4/4 완료 # REQ-002 처리
'Programming > Python' 카테고리의 다른 글
| Go로 서버를 구성하는 것과 Python FastAPI와 asyncio로 서버를 구성하는 것의 성능 차이 (0) | 2025.10.26 |
|---|---|
| asyncio를 사용한 비동기 요청 처리 (0) | 2025.10.26 |
| Python set 사용 주의점 (0) | 2024.02.21 |
| numpy.power (0) | 2023.09.05 |
| numpy array (0) | 2023.09.05 |