Master Python asyncio with async/await, coroutines, event loops, tasks, and concurrent programming for I/O-bound applications.
📌 Python asyncio, async await, coroutines, event loop, asynchronous programming, concurrent Python
Asynchronous programming offers a different approach to concurrency. Instead of waiting for I/O operations to complete, asyncio allows your program to start an operation and switch to other tasks while waiting—maximizing CPU efficiency for I/O-bound workloads.
asyncio enables single-threaded concurrent code using coroutines, with lower overhead than multithreading. The key is cooperative multitasking: each task explicitly yields control at an await point, and the event loop decides which ready task runs next while the others wait on their pending operations.
Two keywords unlock async programming: async def defines a coroutine function (an asynchronous function whose execution can be paused), and await suspends the current coroutine until another awaitable completes. Calling an async function does not run its body—it only returns a coroutine object, which you must await or schedule on the event loop (for example with asyncio.create_task()).
asyncio.run() is the high-level entry point for running async code (Python 3.7+). It creates the event loop, runs the coroutine, and handles cleanup. For concurrent execution, asyncio.create_task() wraps coroutines in Task objects that run independently.
The crucial distinction: async/await won't speed up CPU-bound tasks—they're specifically designed for I/O-bound operations like network requests, file operations, or database queries where waiting time dominates execution time.
import asyncio
async def greet(name, delay=1):
    """Print a greeting, pause without blocking the event loop, then say bye.

    Args:
        name: Name interpolated into both printed messages.
        delay: Seconds to sleep between the two messages. Defaults to 1,
            the pause this example previously hard-coded.
    """
    print(f"Hello, {name}!")
    # asyncio.sleep suspends only this coroutine; other tasks on the loop
    # keep running while it waits.
    await asyncio.sleep(delay)  # Non-blocking sleep
    print(f"Bye, {name}!")
async def main():
    """Greet Alice and then Bob, one after the other."""
    # Each await completes before the next line runs, so the two
    # greetings run sequentially (2 seconds total, 1 second each).
    await greet("Alice")
    await greet("Bob")
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())


# The next example starts here; it defines its own main().
import asyncio
import time
async def worker(name, delay):
    """Simulate a unit of I/O-bound work by sleeping.

    Args:
        name: Label used in the printed messages and the returned string.
        delay: Seconds to wait, passed to asyncio.sleep.

    Returns:
        The string "Result from <name>".
    """
    print(f"Task {name}: starting, will wait {delay} sec.")
    await asyncio.sleep(delay)
    print(f"Task {name}: finished.")
    return f"Result from {name}"
async def main():
    """Run three workers concurrently and report the total elapsed time."""
    # perf_counter() is the clock intended for timing short intervals.
    start = time.perf_counter()
    print("Starting concurrent tasks")
    # create_task() schedules each coroutine on the running loop right
    # away, so all three workers wait at the same time.
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))
    task3 = asyncio.create_task(worker("C", 3))
    # Wait for all tasks to complete; gather() returns the results in the
    # order the awaitables were passed, not in completion order.
    results = await asyncio.gather(task1, task2, task3)
    elapsed = time.perf_counter() - start
    print(f"All tasks finished in {elapsed:.2f} sec.")
    print(f"Results: {results}")
# Run the example once when this file is executed as a script. A second,
# duplicate asyncio.run(main()) call would repeat the whole demo and
# contradict the single-run output documented below.
if __name__ == "__main__":
    asyncio.run(main())
    # Output: All tasks finished in ~3 sec (not 6!)


# The next example starts here; it defines its own main().
import asyncio
async def slow_operation(delay=5):
    """Simulate a long-running operation.

    Args:
        delay: Seconds to sleep before finishing. Defaults to 5, the
            duration this example previously hard-coded.

    Returns:
        The string "Operation completed".
    """
    await asyncio.sleep(delay)
    return "Operation completed"
async def main():
    """Run slow_operation under a 2-second timeout and report the outcome."""
    try:
        # Timeout after 2 seconds. Only the awaited call sits in the try
        # body, so the handler can only be triggered by the timeout itself.
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")
    else:
        print(result)
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())


# The next example starts here; it defines its own main().
import asyncio
class AsyncConnection:
    """Simulated connection usable with ``async with``.

    No real connection is opened; every step just prints a message and
    sleeps briefly to stand in for network latency.
    """

    async def __aenter__(self):
        """Simulate connecting and return this object as the ``as`` target."""
        print("Connecting...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Simulate closing the connection.

        Returns None, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        print("Closing connection...")
        await asyncio.sleep(0.1)

    async def query(self, sql):
        """Simulate running *sql* and return the string "Results for: <sql>"."""
        print(f"Executing: {sql}")
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"
async def main():
    """Open a simulated connection, run one query, and print its result."""
    # async with awaits __aenter__ on entry and __aexit__ on exit, so the
    # connection is closed even if the query raises.
    async with AsyncConnection() as conn:
        result = await conn.query("SELECT * FROM users")
        print(result)
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())


# The next example starts here; it defines its own main().
import asyncio
import time
async def fetch_url(session, url, delay=1):
    """Simulate fetching a URL.

    Args:
        session: Unused by this simulation; kept so the signature has a
            slot for a client session object.
        url: URL named in the printed message and the returned string.
        delay: Seconds of simulated network latency. Defaults to 1, the
            value this example previously hard-coded.

    Returns:
        The string "Data from <url>".
    """
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Simulate network delay
    return f"Data from {url}"
async def main():
    """Fetch three example URLs concurrently and print each result."""
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
    ]
    # perf_counter() is the clock intended for timing short intervals.
    start = time.perf_counter()
    # Build one coroutine per URL. These are plain coroutine objects, not
    # Tasks; gather() schedules them so they run concurrently. The
    # simulation has no real session, so None is passed in its place.
    pending = [fetch_url(None, url) for url in urls]
    # Wait for all to complete; results come back in the order of urls.
    results = await asyncio.gather(*pending)
    elapsed = time.perf_counter() - start
    print(f"Fetched {len(results)} URLs in {elapsed:.2f} sec")
    for result in results:
        print(f" {result}")
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())


# The next example starts here; it defines its own main().
import asyncio
async def might_fail(should_fail):
    """Sleep briefly, then either fail or succeed on request.

    Args:
        should_fail: When truthy, raise instead of returning.

    Returns:
        The string "Success!" when should_fail is falsy.

    Raises:
        ValueError: When should_fail is truthy.
    """
    await asyncio.sleep(0.1)
    if should_fail:
        raise ValueError("Something went wrong!")
    return "Success!"
async def main():
    """Show two ways to handle exceptions raised by coroutines."""
    # Handle exceptions in async functions: an awaited coroutine raises at
    # the await expression, so an ordinary try/except works. The call is
    # known to raise, so its return value is not bound to a name.
    try:
        await might_fail(True)
    except ValueError as e:
        print(f"Caught error: {e}")
    # gather with return_exceptions=True: failures are returned in the
    # results list in place of a value rather than being raised.
    results = await asyncio.gather(
        might_fail(False),
        might_fail(True),
        might_fail(False),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        # BaseException also covers asyncio.CancelledError, which stopped
        # deriving from Exception in Python 3.8.
        if isinstance(result, BaseException):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())