# Rust Async and Tokio Reference

Table of Contents

  1. tokio Runtime
  2. Spawn Tasks
  3. Select
  4. Channels
  5. Async Streams
  6. Timeouts and Sleep
  7. Async Traits
  8. Mutex Choice
  9. Graceful Shutdown
  10. Connection Pooling
  11. Test Async Code
  12. Common Async Mistakes

1. tokio Runtime

#[tokio::main]

// Multi-threaded (default): uses all CPU cores
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run().await
}

// Single-threaded: useful for tests or embedded contexts
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    run().await
}

// With worker count
#[tokio::main(worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
    run().await
}

runtime::Builder

use tokio::runtime::Builder;

fn main() -> anyhow::Result<()> {
    let rt = Builder::new_multi_thread()
        .worker_threads(4)
        .thread_name("my-worker")
        .thread_stack_size(3 * 1024 * 1024)
        .enable_all()                 // enables both io and time drivers
        .build()?;

    rt.block_on(async {
        run().await
    })
}

// current_thread runtime for single-threaded executors
let rt = Builder::new_current_thread()
    .enable_all()
    .build()?;

runtime::Handle

use tokio::runtime::Handle;

// Obtain a handle from within an async context
let handle = Handle::current();

// Spawn onto the runtime from a sync context
std::thread::spawn(move || {
    handle.spawn(async { do_work().await });
    handle.block_on(async { do_sync_work().await });
});

// try_current(): returns None outside of a runtime
if let Ok(handle) = Handle::try_current() {
    handle.spawn(async { background_task().await });
}

2. Spawn Tasks

tokio::spawn and JoinHandle

use tokio::task::JoinHandle;

async fn run() {
    // spawn returns JoinHandle<T>
    let handle: JoinHandle<u32> = tokio::spawn(async {
        compute().await
    });

    // await the result — JoinHandle<T> returns Result<T, JoinError>
    match handle.await {
        Ok(value) => println!("got {value}"),
        Err(e) if e.is_panic() => eprintln!("task panicked"),
        Err(e) => eprintln!("task cancelled: {e}"),
    }
}

JoinSet for Multiple Tasks

use tokio::task::JoinSet;

async fn fetch_all(urls: Vec<String>) -> Vec<Result<String, reqwest::Error>> {
    let mut set = JoinSet::new();

    for url in urls {
        set.spawn(async move { reqwest::get(&url).await?.text().await });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        results.push(res.expect("task panicked"));
    }
    results
}

// Abort all remaining tasks when JoinSet drops (or explicitly)
set.abort_all();

abort

let handle = tokio::spawn(async {
    loop {
        do_work().await;
    }
});

// Cancel the task
handle.abort();

// abort() is best-effort: cancellation takes effect at the task's NEXT
// .await point — a task busy in a loop with no awaits cannot be aborted
// Check completion after abort
match handle.await {
    Err(e) if e.is_cancelled() => println!("task was cancelled"),
    _ => {}
}

spawn_blocking for CPU Work

// Never block the async executor — offload CPU work to a thread pool
async fn hash_password(password: String) -> String {
    tokio::task::spawn_blocking(move || {
        bcrypt::hash(&password, 12).unwrap()
    })
    .await
    .expect("blocking task panicked")
}

// spawn_blocking has a default limit of 512 threads
// For unbounded work, consider a dedicated rayon thread pool
async fn process_image(data: Vec<u8>) -> Vec<u8> {
    tokio::task::spawn_blocking(move || {
        rayon_heavy_transform(data)
    })
    .await
    .unwrap()
}

3. Select

tokio::select! Basics

use tokio::select;

async fn race() -> &'static str {
    select! {
        result = task_a() => {
            println!("A won: {result:?}");
            "a"
        }
        result = task_b() => {
            println!("B won: {result:?}");
            "b"
        }
    }
    // Unselected branch future is dropped immediately
}

biased for Deterministic Priority

select! {
    biased;  // arms checked top-to-bottom, not randomly

    // shutdown takes priority over incoming work
    _ = shutdown_signal() => {
        cleanup().await;
    }
    msg = receiver.recv() => {
        process(msg).await;
    }
}

Cancellation Safety

// Only use futures that are cancellation-safe in select!
// Safe: recv(), accept(), sleep(), Lines::next_line()
// NOT safe: read_line(), read_to_end(), write_all() (partial progress is lost)

// For non-cancellation-safe futures, pin them and reuse
let mut read_future = Box::pin(file.read_to_end(&mut buf));

loop {
    select! {
        result = &mut read_future => {
            // Only enters here when done, previous progress preserved
            break result;
        }
        _ = shutdown.recv() => {
            break Err(io::Error::new(io::ErrorKind::Interrupted, "shutdown"));
        }
    }
}

loop + select Pattern

async fn event_loop(
    mut rx: mpsc::Receiver<Message>,
    mut shutdown: broadcast::Receiver<()>,
) {
    loop {
        select! {
            Some(msg) = rx.recv() => {
                handle_message(msg).await;
            }
            _ = shutdown.recv() => {
                tracing::info!("shutting down event loop");
                break;
            }
            else => {
                // All branches are disabled (channels closed)
                break;
            }
        }
    }
}

4. Channels

mpsc — Multi-Producer, Single-Consumer

use tokio::sync::mpsc;

// Bounded channel (backpressure built-in)
let (tx, mut rx) = mpsc::channel::<String>(32);

// Clone the sender for multiple producers
let tx2 = tx.clone();

tokio::spawn(async move {
    tx.send("hello".to_string()).await.unwrap();
});
tokio::spawn(async move {
    tx2.send("world".to_string()).await.unwrap();
});

while let Some(msg) = rx.recv().await {
    println!("received: {msg}");
}
// recv() returns None when all senders are dropped

// Unbounded channel (no backpressure — use carefully)
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
tx.send("hello".to_string()).unwrap();  // non-async send

oneshot — Single Value

use tokio::sync::oneshot;

// One sender, one receiver, one value
let (tx, rx) = oneshot::channel::<u64>();

tokio::spawn(async move {
    let result = compute().await;
    tx.send(result).ok();  // ok() because receiver might have dropped
});

match rx.await {
    Ok(value) => println!("computed: {value}"),
    Err(_) => println!("sender dropped before sending"),
}

// Common pattern: request-response over a channel
struct Request {
    data: Vec<u8>,
    reply: oneshot::Sender<Result<Vec<u8>, Error>>,
}

broadcast — Single-Producer, Multi-Consumer

use tokio::sync::broadcast;

// All active receivers get every message
let (tx, mut rx1) = broadcast::channel::<String>(16);
let mut rx2 = tx.subscribe();

tx.send("announcement".to_string()).unwrap();

// Each receiver gets its own copy
let msg1 = rx1.recv().await.unwrap();
let msg2 = rx2.recv().await.unwrap();

// Lagged receiver: if receiver falls behind capacity, it gets Err(Lagged(n))
match rx1.recv().await {
    Ok(msg) => handle(msg),
    Err(broadcast::error::RecvError::Lagged(n)) => {
        tracing::warn!("missed {n} messages, resyncing");
    }
    Err(broadcast::error::RecvError::Closed) => break,
}

watch — Single Writer, Multi-Reader (Latest Value)

use tokio::sync::watch;

// Only the most recent value is retained
let (tx, rx) = watch::channel(Config::default());

// Writer updates the shared value
tokio::spawn(async move {
    loop {
        let new_config = reload_config().await;
        tx.send(new_config).unwrap();
        tokio::time::sleep(Duration::from_secs(30)).await;
    }
});

// Readers clone a receiver and watch for changes
tokio::spawn(async move {
    let mut rx = rx;
    loop {
        rx.changed().await.unwrap();  // waits for a new value
        let config = rx.borrow_and_update().clone();
        apply_config(config).await;
    }
});

5. Async Streams

Stream Trait and StreamExt

use futures::StreamExt;  // or tokio_stream::StreamExt

// Streams are async iterators
async fn process_stream<S>(mut stream: S)
where
    S: futures::Stream<Item = Event> + Unpin,
{
    while let Some(event) = stream.next().await {
        handle(event).await;
    }

    // StreamExt combinators — an alternative to the manual loop above
    // (illustrative only: at this point the loop has already drained the stream)
    stream.map(|e| transform(e))
          .filter(|e| futures::future::ready(e.important))
          .take(100)
          .for_each(|e| async move { handle(e).await })
          .await;
}

tokio_stream

use tokio_stream::{self as stream, StreamExt};

// Stream from iterator
let s = stream::iter(vec![1, 2, 3]);

// Stream with delay between items
let s = stream::iter(vec![1, 2, 3])
    .throttle(Duration::from_millis(100));

// Merge streams
let s = stream::select(stream_a, stream_b);

// Wrap a channel receiver as a stream
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let stream = tokio_stream::wrappers::BroadcastStream::new(rx);
let stream = tokio_stream::wrappers::WatchStream::new(rx);

Create Streams with async_stream

use async_stream::stream;

fn paginate(client: Client, query: Query) -> impl futures::Stream<Item = Record> {
    stream! {
        let mut cursor = None;
        loop {
            let page = client.fetch(query.clone(), cursor).await.unwrap();
            for record in page.records {
                yield record;
            }
            match page.next_cursor {
                Some(c) => cursor = Some(c),
                None => break,
            }
        }
    }
}

6. Timeouts and Sleep

tokio::time::sleep

use tokio::time::{sleep, Duration};

// Non-blocking sleep
sleep(Duration::from_secs(1)).await;

// Sleep until a specific instant
use tokio::time::{sleep_until, Instant};
sleep_until(Instant::now() + Duration::from_millis(500)).await;

timeout

use tokio::time::timeout;

match timeout(Duration::from_secs(5), fetch_data()).await {
    Ok(Ok(data)) => process(data),
    Ok(Err(e)) => handle_error(e),
    Err(_elapsed) => eprintln!("request timed out"),
}

// timeout returns Err(Elapsed) on timeout
// The inner future is cancelled when timeout fires

interval

use tokio::time::{interval, MissedTickBehavior};

async fn heartbeat() {
    let mut ticker = interval(Duration::from_secs(10));
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        ticker.tick().await;
        send_heartbeat().await;
    }
}

// MissedTickBehavior options:
// Burst (default): catch up all missed ticks immediately
// Skip: skip missed ticks, tick at next aligned interval
// Delay: delay next tick by full interval from now

Instant

use tokio::time::Instant;

let start = Instant::now();
do_work().await;
let elapsed = start.elapsed();
tracing::info!(?elapsed, "work completed");

7. Async Traits

RPITIT (Rust 1.75+, Preferred)

// Async functions in traits work natively since Rust 1.75
pub trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String) -> Result<(), Error>;
}

impl DataStore for RedisStore {
    async fn get(&self, key: &str) -> Option<String> {
        self.client.get(key).await.ok()
    }

    async fn set(&self, key: &str, value: String) -> Result<(), Error> {
        self.client.set(key, value).await?;
        Ok(())
    }
}

trait_variant for Send Futures

// Native async trait methods do not guarantee Send futures, so generic
// code cannot tokio::spawn them on a multi-threaded runtime.
// trait_variant generates a parallel trait whose async methods return
// Send futures. (It does NOT make the trait dyn-compatible — for
// Box<dyn ...> trait objects, use the async-trait crate below.)
use trait_variant::make;

#[make(SendDataStore: Send)]
pub trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
}

// Generic code can now require the Send variant and spawn safely
fn spawn_fetch<S: SendDataStore + Send + Sync + 'static>(store: S) {
    tokio::spawn(async move {
        let _ = store.get("key").await;
    });
}

async-trait Crate (Pre-1.75 or dyn-safe)

use async_trait::async_trait;

#[async_trait]
pub trait Handler: Send + Sync {
    async fn handle(&self, req: Request) -> Response;
}

#[async_trait]
impl Handler for MyHandler {
    async fn handle(&self, req: Request) -> Response {
        process(req).await
    }
}

// async_trait boxes the returned future automatically: one heap
// allocation plus dynamic dispatch per call — usually negligible,
// but not zero-cost

Manual Approach (Maximum Control)

use std::future::Future;
use std::pin::Pin;

pub trait Handler: Send + Sync {
    fn handle<'a>(
        &'a self,
        req: Request,
    ) -> Pin<Box<dyn Future<Output = Response> + Send + 'a>>;
}

impl Handler for MyHandler {
    fn handle<'a>(&'a self, req: Request) -> Pin<Box<dyn Future<Output = Response> + Send + 'a>> {
        Box::pin(async move { process(&self.state, req).await })
    }
}

8. Mutex Choice

tokio::sync::Mutex vs std::sync::Mutex

// Use std::sync::Mutex when:
// - Lock is held only for synchronous operations (no .await inside lock)
// - Lock contention is low
// - You want lower overhead

use std::sync::Mutex;

struct Cache {
    inner: Mutex<HashMap<String, String>>,
}

impl Cache {
    fn get(&self, key: &str) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }

    async fn get_or_fetch(&self, key: &str) -> String {
        if let Some(v) = self.get(key) {
            return v;
        }
        let value = fetch(key).await;  // lock NOT held during await
        self.inner.lock().unwrap().insert(key.to_string(), value.clone());
        value
    }
}
// Use tokio::sync::Mutex when:
// - You need to hold the lock across .await points
// - Multiple async tasks contend and fairness matters

use tokio::sync::Mutex;

struct Connection {
    inner: Mutex<TcpStream>,
}

impl Connection {
    async fn send_and_receive(&self, data: &[u8]) -> Vec<u8> {
        let mut conn = self.inner.lock().await;  // lock held across awaits
        conn.write_all(data).await.unwrap();
        let mut buf = vec![0u8; 1024];
        conn.read(&mut buf).await.unwrap();
        buf
    }
}

RwLock

use tokio::sync::RwLock;

struct Config {
    data: RwLock<ConfigData>,
}

impl Config {
    async fn get(&self) -> ConfigData {
        self.data.read().await.clone()  // multiple concurrent readers
    }

    async fn update(&self, new: ConfigData) {
        *self.data.write().await = new;  // exclusive writer
    }
}

// RwLock can starve writers if readers are constant — monitor in practice

9. Graceful Shutdown

Signal Handling

use tokio::signal;

async fn wait_for_shutdown() {
    let ctrl_c = async {
        signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let sigterm = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = sigterm => {},
    }
    tracing::info!("shutdown signal received");
}

CancellationToken

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();

    // Give child tasks a clone
    let worker_token = token.child_token();
    tokio::spawn(async move {
        select! {
            _ = worker_token.cancelled() => {
                tracing::info!("worker shutting down");
            }
            _ = do_work() => {}
        }
    });

    // Trigger shutdown on signal
    wait_for_shutdown().await;
    token.cancel();

    // Give tasks time to drain
    tokio::time::sleep(Duration::from_secs(5)).await;
}

Draining Connections and Shutdown Sequence

async fn shutdown(
    server: Server,
    mut rx: mpsc::Receiver<()>,
    token: CancellationToken,
) {
    // 1. Stop accepting new connections
    server.stop_accepting();

    // 2. Signal all workers
    token.cancel();

    // 3. Wait for in-flight requests (with timeout)
    let drain = timeout(Duration::from_secs(30), server.drain());
    match drain.await {
        Ok(_) => tracing::info!("clean shutdown"),
        Err(_) => tracing::warn!("shutdown timeout: forcing exit"),
    }

    // 4. Flush telemetry, close DB pools, etc.
    flush_telemetry().await;
}

10. Connection Pooling

sqlx Pool

use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .max_connections(20)
    .min_connections(2)
    .acquire_timeout(Duration::from_secs(5))
    .idle_timeout(Duration::from_secs(600))
    .connect("postgres://user:pass@localhost/db")
    .await?;

// Clone the pool cheaply — it's Arc internally
async fn get_user(pool: &sqlx::PgPool, id: i64) -> sqlx::Result<User> {
    sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
        .fetch_one(pool)
        .await
}

reqwest Client Reuse

use reqwest::Client;

// Build once, clone cheaply (Arc internally)
let client = Client::builder()
    .timeout(Duration::from_secs(30))
    .pool_max_idle_per_host(10)
    .connection_verbose(false)
    .build()?;

// Share via state, not by creating new clients per request
#[derive(Clone)]
struct AppState {
    http: Client,
    db: sqlx::PgPool,
}

bb8 Generic Pool

use bb8::Pool;
use bb8_redis::RedisConnectionManager;

let manager = RedisConnectionManager::new("redis://localhost")?;
let pool = Pool::builder()
    .max_size(15)
    .min_idle(Some(2))
    .connection_timeout(Duration::from_secs(3))
    .build(manager)
    .await?;

let mut conn = pool.get().await?;
redis::cmd("SET").arg("key").arg("value").query_async(&mut *conn).await?;

11. Test Async Code

#[tokio::test]

#[tokio::test]
async fn test_fetch_user() {
    let db = setup_test_db().await;
    let user = db.get_user(1).await.unwrap();
    assert_eq!(user.name, "Alice");
}

// Multi-thread flavor for concurrency tests
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_writes() {
    let state = Arc::new(SharedState::new());
    let handles: Vec<_> = (0..10)
        .map(|i| {
            let s = state.clone();
            tokio::spawn(async move { s.write(i).await })
        })
        .collect();
    futures::future::join_all(handles).await;
    assert_eq!(state.count().await, 10);
}

Mock Time with tokio::time::pause

#[tokio::test]
async fn test_timeout_behavior() {
    tokio::time::pause();  // freeze time (requires tokio's "test-util" feature)

    let result = tokio::spawn(async {
        timeout(Duration::from_secs(5), slow_operation()).await
    });

    // Advance time without actually waiting
    tokio::time::advance(Duration::from_secs(6)).await;

    let outcome = result.await.unwrap();
    assert!(outcome.is_err(), "should have timed out");
}

Testing Channels

#[tokio::test]
async fn test_message_processing() {
    let (tx, rx) = mpsc::channel(8);
    let processor = Processor::new(rx);

    let handle = tokio::spawn(processor.run());

    tx.send(Message::Ping).await.unwrap();
    tx.send(Message::Stop).await.unwrap();
    drop(tx);

    handle.await.unwrap();
}

// Test that a task completes within expected time
#[tokio::test]
async fn test_completes_quickly() {
    let result = timeout(Duration::from_millis(100), fast_task()).await;
    assert!(result.is_ok(), "task took too long");
}

12. Common Async Mistakes

Block in Async Context

// BAD: blocks the executor thread, starves other tasks
async fn hash(password: &str) -> String {
    bcrypt::hash(password, 12).unwrap()  // CPU-intensive, blocks
}

// GOOD: offload to blocking thread pool
async fn hash(password: String) -> String {
    tokio::task::spawn_blocking(move || {
        bcrypt::hash(&password, 12).unwrap()
    })
    .await
    .unwrap()
}

// BAD: blocking sleep
async fn wait() {
    std::thread::sleep(Duration::from_secs(1));  // blocks executor
}

// GOOD:
async fn wait() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

Hold std::Mutex Across .await

// BAD: MutexGuard (which is not Send) held across await point
// This will fail to compile if the future must be Send
async fn bad_update(state: Arc<Mutex<State>>) {
    let mut guard = state.lock().unwrap();
    guard.count += 1;
    do_async_work().await;  // guard still held — not Send!
    guard.finalize();
}

// GOOD: release lock before await
async fn good_update(state: Arc<Mutex<State>>) {
    {
        let mut guard = state.lock().unwrap();
        guard.count += 1;
    }  // guard dropped here
    do_async_work().await;
    {
        let mut guard = state.lock().unwrap();
        guard.finalize();
    }
}

// ALTERNATIVE: use tokio::sync::Mutex if you need to hold across awaits

Missing Send Bounds

// Task spawned with tokio::spawn must be Send + 'static
// This fails if you capture non-Send types (like Rc, RefCell)
let rc = Rc::new(5);
tokio::spawn(async move {
    println!("{}", rc);  // ERROR: Rc is not Send
});

// GOOD: use Arc instead of Rc
let arc = Arc::new(5);
tokio::spawn(async move {
    println!("{}", arc);  // OK
});

Forget to Drive Futures

// BAD: creating a future without awaiting it — nothing happens
async fn fire_and_maybe_forget() {
    let future = send_email("user@example.com");  // not awaited, not spawned
    // future is dropped, email never sent
}

// GOOD: either await or spawn
async fn fire_and_actually_do_it() {
    // Option 1: await (sequential)
    send_email("user@example.com").await.unwrap();

    // Option 2: spawn (concurrent, detached)
    tokio::spawn(async { send_email("user@example.com").await.ok() });
}

Unbounded Spawning

// BAD: spawning one task per item with no limit — OOM on large input
async fn process_all(items: Vec<Item>) {
    for item in items {
        tokio::spawn(async move { process(item).await });
    }
}

// GOOD: use JoinSet with capacity limit or a semaphore
use tokio::sync::Semaphore;

async fn process_all(items: Vec<Item>) {
    let sem = Arc::new(Semaphore::new(50));  // max 50 concurrent tasks
    let mut set = JoinSet::new();

    for item in items {
        let permit = sem.clone().acquire_owned().await.unwrap();
        set.spawn(async move {
            let _permit = permit;  // released when task ends
            process(item).await
        });
    }

    while set.join_next().await.is_some() {}
}