
Python Asyncio – Complete Asynchronous Programming Guide

Master Python asyncio with async/await, coroutines, event loops, tasks, and concurrent programming for I/O-bound applications.

📌 Python asyncio, async await, coroutines, event loop, asynchronous programming, concurrent Python

Asynchronous programming offers a different approach to concurrency. Instead of blocking while an I/O operation completes, asyncio lets your program start the operation and switch to other tasks while it waits, so a single thread stays busy during I/O-bound workloads.

asyncio enables single-threaded concurrent code using coroutines, typically with lower overhead than multithreading. The key is cooperative multitasking: a task yields control explicitly at each await, and the event loop decides which ready task runs next while tracking pending I/O.
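To make the cooperative hand-off concrete, here is a minimal sketch. The names step and log are illustrative only. Each coroutine records a label and then yields with await asyncio.sleep(0), so the event loop alternates between the two.

```python
import asyncio

async def step(name, log):
    for i in range(2):
        log.append(f"{name}{i}")
        # await is the explicit yield point: control goes back to the event loop
        await asyncio.sleep(0)

async def main():
    log = []
    # gather() schedules both coroutines on the same event loop
    await asyncio.gather(step("a", log), step("b", log))
    return log

log = asyncio.run(main())
print(log)  # ['a0', 'b0', 'a1', 'b1']
```

Neither coroutine is interrupted mid-statement; the switch happens only at the await.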

Two keywords unlock async programming: async def defines a coroutine function (a function whose execution can be paused), and await suspends the current coroutine until another awaitable completes. Calling an async function does not run its body; it only returns a coroutine object, which you must await or schedule on the event loop.
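A small sketch of this behavior, using a hypothetical compute() coroutine: calling it only produces a coroutine object, and the body runs once that object is awaited.

```python
import asyncio
import inspect

async def compute():
    return 42

async def main():
    coro = compute()                     # nothing has run yet: this is a coroutine object
    is_coro = inspect.iscoroutine(coro)
    value = await coro                   # the body of compute() executes here
    return is_coro, value

is_coro, value = asyncio.run(main())
print(is_coro, value)  # True 42
```

If a coroutine object is created and never awaited, Python emits a "coroutine ... was never awaited" RuntimeWarning when the object is garbage-collected.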

asyncio.run() is the high-level entry point for running async code (Python 3.7+). It creates the event loop, runs the coroutine to completion, and handles cleanup. For concurrent execution, asyncio.create_task() schedules a coroutine as a Task object that runs concurrently with other tasks on the same event loop.
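As a rough illustration of how scheduling differs from awaiting, the sketch below (background and log are made-up names) creates a task and keeps working before awaiting it. With the default event loop settings, the task starts only once main() reaches an await.

```python
import asyncio

async def background(log):
    log.append("background started")
    await asyncio.sleep(0.01)
    log.append("background finished")
    return "done"

async def main():
    log = []
    task = asyncio.create_task(background(log))  # scheduled, but not running yet
    log.append("main continues")                 # main keeps control until it awaits
    result = await task                          # yield to the loop; the task runs now
    log.append(f"main got {result}")
    return log

log = asyncio.run(main())
for entry in log:
    print(entry)
```

asyncio.run() also returns whatever the top-level coroutine returns, which is how log reaches module level here.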

The crucial distinction: async/await won't speed up CPU-bound tasks—they're specifically designed for I/O-bound operations like network requests, file operations, or database queries where waiting time dominates execution time.
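The sketch below gives a feel for the difference. It assumes time.sleep() is an acceptable stand-in for CPU-bound work, because it also holds the thread without yielding. The names blocking_job, cooperative_job, and timed are illustrative.

```python
import asyncio
import time

async def blocking_job():
    time.sleep(0.2)           # stand-in for CPU-bound work: never yields to the loop

async def cooperative_job():
    await asyncio.sleep(0.2)  # yields to the loop while waiting

async def timed(job):
    start = time.perf_counter()
    await asyncio.gather(job(), job())
    return time.perf_counter() - start

blocking_elapsed = asyncio.run(timed(blocking_job))
cooperative_elapsed = asyncio.run(timed(cooperative_job))
print(f"blocking: {blocking_elapsed:.2f}s, cooperative: {cooperative_elapsed:.2f}s")
```

The two blocking jobs run back to back (roughly 0.4 seconds), while the two cooperative jobs overlap (roughly 0.2 seconds). Exact timings vary by machine.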

Code Examples

Basic Async/Await Syntax

import asyncio

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # Non-blocking sleep
    print(f"Bye, {name}!")

async def main():
    # Runs sequentially (about 2 seconds total: 1 second per call)
    await greet("Alice")
    await greet("Bob")

if __name__ == "__main__":
    asyncio.run(main())

Concurrent Execution with Tasks

import asyncio
import time

async def worker(name, delay):
    print(f"Task {name}: starting, will wait {delay} sec.")
    await asyncio.sleep(delay)
    print(f"Task {name}: finished.")
    return f"Result from {name}"

async def main():
    start = time.time()
    print("Starting concurrent tasks")

    # Create tasks for concurrent execution
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))
    task3 = asyncio.create_task(worker("C", 3))

    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)

    print(f"All tasks finished in {time.time() - start:.2f} sec.")
    print(f"Results: {results}")

asyncio.run(main())
# Output: All tasks finished in ~3 sec (not 6!)

Handling Timeouts

import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "Operation completed"

async def main():
    try:
        # Timeout after 2 seconds
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())

Async Context Managers

import asyncio

class AsyncConnection:
    async def __aenter__(self):
        print("Connecting...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.1)

    async def query(self, sql):
        print(f"Executing: {sql}")
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"

async def main():
    async with AsyncConnection() as conn:
        result = await conn.query("SELECT * FROM users")
        print(result)

asyncio.run(main())

Fetching Multiple URLs Concurrently

import asyncio
import time

async def fetch_url(session, url):
    """Simulate fetching a URL"""
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # Simulate network delay
    return f"Data from {url}"

async def main():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
    ]

    start = time.time()

    # Build one coroutine per URL; no real session is needed for this simulation
    coros = [fetch_url(None, url) for url in urls]

    # gather() wraps each coroutine in a Task and runs them concurrently
    results = await asyncio.gather(*coros)

    print(f"Fetched {len(results)} URLs in {time.time() - start:.2f} sec")
    for result in results:
        print(f"  {result}")

asyncio.run(main())

Error Handling in Async Code

import asyncio

async def might_fail(should_fail):
    await asyncio.sleep(0.1)
    if should_fail:
        raise ValueError("Something went wrong!")
    return "Success!"

async def main():
    # Handle exceptions in async functions
    try:
        result = await might_fail(True)
    except ValueError as e:
        print(f"Caught error: {e}")

    # gather with return_exceptions=True
    results = await asyncio.gather(
        might_fail(False),
        might_fail(True),
        might_fail(False),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(main())
