Tokio Rust: Guia Completo Async Runtime | Rust Brasil

Guia completo do Tokio em Rust: tasks, channels, I/O async, timers, select! e runtime multi-threaded. Referência prática.

Introdução

Tokio é o runtime assíncrono mais popular e amplamente utilizado do ecossistema Rust. Ele fornece a infraestrutura necessária para escrever aplicações assíncronas confiáveis e de alta performance: um scheduler multi-thread, I/O assíncrono, timers, canais de comunicação e primitivas de sincronização.

Criado e mantido pela equipe Tokio (que também mantém Axum, Hyper, Tonic e Tower), o Tokio é a base sobre a qual a maioria dos frameworks e bibliotecas async de Rust são construídos — incluindo Axum, Reqwest, SQLx, Tonic e muitos outros.

Neste guia completo, vamos explorar os principais componentes do Tokio com exemplos práticos.

Dependências no Cargo.toml

[dependencies]
tokio = { version = "1", features = ["full"] }

Para produção, é recomendado habilitar apenas as features necessárias:

[dependencies]
tokio = { version = "1", features = [
    "rt-multi-thread",  # Runtime multi-thread
    "macros",           # #[tokio::main] e #[tokio::test]
    "net",              # TCP, UDP, Unix sockets
    "io-util",          # AsyncRead, AsyncWrite, BufReader
    "time",             # Sleep, timeout, interval
    "sync",             # Mutex, RwLock, Semaphore, channels
    "fs",               # Operações de arquivo async
    "signal",           # Captura de sinais (SIGINT, etc)
    "process",          # Processos async
] }

O Macro #[tokio::main]

Todo programa Tokio começa com o macro #[tokio::main], que inicializa o runtime:

#[tokio::main]
async fn main() {
    println!("Rodando no Tokio!");
}

Isso é equivalente a:

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("Rodando no Tokio!");
    });
}

Para um runtime single-thread (útil em testes ou aplicações leves):

#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Runtime single-thread!");
}

Spawning de Tasks

O tokio::spawn cria tasks leves que rodam concorrentemente no scheduler:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Spawn retorna um JoinHandle
    let handle1 = tokio::spawn(async {
        sleep(Duration::from_millis(200)).await;
        println!("Task 1 concluída");
        10
    });

    let handle2 = tokio::spawn(async {
        sleep(Duration::from_millis(100)).await;
        println!("Task 2 concluída");
        20
    });

    // Aguardar resultados
    let r1 = handle1.await.unwrap();
    let r2 = handle2.await.unwrap();
    println!("Resultados: {r1} + {r2} = {}", r1 + r2);
}

Spawn com Dados Compartilhados

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let contador = Arc::new(Mutex::new(0u64));
    let mut handles = vec![];

    for i in 0..10 {
        let contador = Arc::clone(&contador);
        let handle = tokio::spawn(async move {
            let mut lock = contador.lock().await;
            *lock += 1;
            println!("Task {i}: contador = {}", *lock);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let total = *contador.lock().await;
    println!("Total final: {total}");
}

JoinSet para Múltiplas Tasks

use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    for i in 0..5 {
        set.spawn(async move {
            sleep(Duration::from_millis(100 * (5 - i))).await;
            format!("resultado de task {i}")
        });
    }

    // Receber resultados na ordem de conclusão
    while let Some(resultado) = set.join_next().await {
        match resultado {
            Ok(valor) => println!("Concluído: {valor}"),
            Err(e) => eprintln!("Erro: {e}"),
        }
    }
}

select!: Concorrência com Múltiplos Futures

O macro select! permite aguardar múltiplos futures simultaneamente e reagir ao primeiro que completar:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Produtor que envia mensagens
    tokio::spawn(async move {
        sleep(Duration::from_secs(2)).await;
        tx.send("mensagem recebida!".to_string()).await.unwrap();
    });

    // select! entre múltiplas operações
    loop {
        tokio::select! {
            Some(msg) = rx.recv() => {
                println!("Canal: {msg}");
                break;
            }
            _ = sleep(Duration::from_secs(1)) => {
                println!("Ainda aguardando...");
            }
        }
    }
}

Select com Cancelamento Gracioso

use tokio::signal;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    println!("Servidor iniciado. Pressione Ctrl+C para parar.");

    let servidor = async {
        loop {
            // Simular processamento
            sleep(Duration::from_secs(1)).await;
            println!("Processando...");
        }
    };

    tokio::select! {
        _ = servidor => {
            println!("Servidor finalizou");
        }
        _ = signal::ctrl_c() => {
            println!("\nRecebido Ctrl+C, encerrando graciosamente...");
        }
    }
}

Channels: Comunicação entre Tasks

O Tokio oferece quatro tipos de canais:

mpsc (Multiple Producer, Single Consumer)

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // Múltiplos produtores
    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for j in 0..3 {
                let msg = format!("produtor {i}, mensagem {j}");
                tx.send(msg).await.unwrap();
            }
        });
    }

    // Dropar o tx original para que o canal feche quando os clones forem dropados
    drop(tx);

    // Consumidor
    while let Some(msg) = rx.recv().await {
        println!("Recebido: {msg}");
    }
    println!("Canal fechado");
}

broadcast (Multiple Producer, Multiple Consumer)

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<String>(16);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("[Assinante 1] {msg}");
        }
    });

    tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("[Assinante 2] {msg}");
        }
    });

    tx.send("evento global!".to_string()).unwrap();
    tx.send("outro evento!".to_string()).unwrap();

    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

watch (Single Producer, Multiple Consumer — último valor)

use tokio::sync::watch;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("inicial".to_string());

    // Observador
    tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            println!("Novo estado: {}", *rx.borrow());
        }
    });

    // Atualizador
    for estado in ["carregando", "pronto", "processando", "concluído"] {
        sleep(Duration::from_millis(500)).await;
        tx.send(estado.to_string()).unwrap();
    }

    sleep(Duration::from_millis(100)).await;
}

oneshot (Single use)

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        // Simular trabalho
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        tx.send("resultado pronto!".to_string()).unwrap();
    });

    match rx.await {
        Ok(resultado) => println!("Recebido: {resultado}"),
        Err(_) => println!("O produtor foi dropado"),
    }
}

I/O Assíncrono

TCP

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn tratar_conexao(mut stream: TcpStream) {
    let mut buf = [0u8; 1024];

    loop {
        let n = match stream.read(&mut buf).await {
            Ok(0) => return, // Conexão fechada
            Ok(n) => n,
            Err(e) => {
                eprintln!("Erro ao ler: {e}");
                return;
            }
        };

        // Echo: devolver os bytes recebidos
        if let Err(e) = stream.write_all(&buf[..n]).await {
            eprintln!("Erro ao escrever: {e}");
            return;
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Servidor escutando na porta 8080");

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("Nova conexão: {addr}");
        tokio::spawn(tratar_conexao(stream));
    }
}

Arquivos

use tokio::fs;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Escrever arquivo
    fs::write("exemplo.txt", "Conteúdo do arquivo").await?;

    // Ler arquivo
    let conteudo = fs::read_to_string("exemplo.txt").await?;
    println!("Conteúdo: {conteudo}");

    // Append com file handle
    let mut arquivo = fs::OpenOptions::new()
        .append(true)
        .open("exemplo.txt")
        .await?;
    arquivo.write_all(b"\nNova linha!").await?;

    // Listar diretório
    let mut entries = fs::read_dir(".").await?;
    while let Some(entry) = entries.next_entry().await? {
        println!("{}", entry.file_name().to_string_lossy());
    }

    Ok(())
}

Timers

use tokio::time::{sleep, timeout, interval, Instant, Duration};

#[tokio::main]
async fn main() {
    // Sleep
    println!("Dormindo 1 segundo...");
    sleep(Duration::from_secs(1)).await;

    // Timeout
    let resultado = timeout(Duration::from_millis(500), async {
        sleep(Duration::from_millis(200)).await;
        42
    }).await;
    println!("Timeout resultado: {resultado:?}"); // Ok(42)

    // Interval
    let mut intervalo = interval(Duration::from_millis(500));
    let inicio = Instant::now();

    for _ in 0..5 {
        intervalo.tick().await;
        println!("Tick em {:?}", inicio.elapsed());
    }

    // Medir duração
    let inicio = Instant::now();
    sleep(Duration::from_millis(100)).await;
    println!("Operação levou: {:?}", inicio.elapsed());
}

Primitivas de Sincronização

Mutex e RwLock

use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};

#[tokio::main]
async fn main() {
    // Mutex: acesso exclusivo
    let dados = Arc::new(Mutex::new(vec![1, 2, 3]));

    let dados_clone = dados.clone();
    tokio::spawn(async move {
        let mut lock = dados_clone.lock().await;
        lock.push(4);
    }).await.unwrap();

    println!("Dados: {:?}", dados.lock().await);

    // RwLock: múltiplos leitores, um escritor
    let config = Arc::new(RwLock::new(String::from("valor_inicial")));

    // Leitura (múltiplas simultâneas permitidas)
    let config_r = config.clone();
    let leitura = tokio::spawn(async move {
        let guard = config_r.read().await;
        println!("Lendo: {guard}");
    });

    // Escrita (exclusiva)
    let config_w = config.clone();
    let escrita = tokio::spawn(async move {
        let mut guard = config_w.write().await;
        *guard = String::from("novo_valor");
    });

    leitura.await.unwrap();
    escrita.await.unwrap();
}

Semaphore

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Limitar a 3 operações concorrentes
    let semaforo = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..10 {
        let sem = semaforo.clone();
        let handle = tokio::spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            println!("Task {i} iniciou");
            sleep(Duration::from_millis(500)).await;
            println!("Task {i} finalizou");
        });
        handles.push(handle);
    }

    for h in handles {
        h.await.unwrap();
    }
}

Integração com Tracing

O Tokio funciona perfeitamente com o crate tracing para observabilidade:

[dependencies]
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
use tracing::{info, warn, instrument};
use tracing_subscriber::EnvFilter;

#[instrument]
async fn processar_pedido(pedido_id: u64) -> String {
    info!("Iniciando processamento");
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    if pedido_id % 2 == 0 {
        warn!("Pedido par detectado");
    }

    info!("Processamento concluído");
    format!("resultado-{pedido_id}")
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    info!("Aplicação iniciada");

    let resultado = processar_pedido(42).await;
    info!(resultado = %resultado, "Pedido processado");
}

Para mais sobre observabilidade, veja Tracing vs Log em Rust.

Testando Código Async

O Tokio fornece o macro #[tokio::test]:

#[cfg(test)]
mod tests {
    use tokio::sync::mpsc;
    use tokio::time::{sleep, Duration};

    #[tokio::test]
    async fn test_basico() {
        let resultado = async { 2 + 2 }.await;
        assert_eq!(resultado, 4);
    }

    #[tokio::test]
    async fn test_canal() {
        let (tx, mut rx) = mpsc::channel::<i32>(10);

        tokio::spawn(async move {
            tx.send(42).await.unwrap();
        });

        let valor = rx.recv().await.unwrap();
        assert_eq!(valor, 42);
    }

    #[tokio::test]
    async fn test_timeout() {
        let resultado = tokio::time::timeout(
            Duration::from_millis(100),
            async {
                sleep(Duration::from_millis(50)).await;
                "ok"
            }
        ).await;

        assert!(resultado.is_ok());
    }

    // Teste com runtime single-thread
    #[tokio::test(flavor = "current_thread")]
    async fn test_single_thread() {
        assert!(true);
    }
}

Padrões de Produção

Graceful Shutdown

use tokio::signal;
use tokio::sync::watch;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);

    // Worker
    let mut worker_rx = shutdown_tx.subscribe();
    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = sleep(Duration::from_secs(1)) => {
                    println!("Trabalhando...");
                }
                _ = worker_rx.changed() => {
                    println!("Worker recebeu sinal de shutdown");
                    break;
                }
            }
        }
        println!("Worker finalizou limpeza");
    });

    // Aguardar Ctrl+C
    signal::ctrl_c().await.unwrap();
    println!("Enviando sinal de shutdown...");
    shutdown_tx.send(true).unwrap();

    // Aguardar worker finalizar
    worker.await.unwrap();
    println!("Shutdown completo!");
}

Rate Limiting com Semaphore

use std::sync::Arc;
use tokio::sync::Semaphore;

struct Cliente {
    semaforo: Arc<Semaphore>,
    client: reqwest::Client,
}

impl Cliente {
    fn new(max_concorrente: usize) -> Self {
        Self {
            semaforo: Arc::new(Semaphore::new(max_concorrente)),
            client: reqwest::Client::new(),
        }
    }

    async fn get(&self, url: &str) -> Result<String, reqwest::Error> {
        let _permit = self.semaforo.acquire().await.unwrap();
        self.client.get(url).send().await?.text().await
    }
}

Conclusão

O Tokio é a espinha dorsal da programação assíncrona em Rust. Dominar seus componentes — tasks, channels, I/O, timers e primitivas de sincronização — é essencial para construir aplicações de alta performance. Com o ecossistema Tower/Hyper/Axum, o Tokio oferece tudo o que você precisa para desenvolver servidores web, microserviços e sistemas distribuídos robustos.

Veja Também