Tokio Rust: Runtime Assíncrono Guia Completo | Rust Brasil

Guia do Tokio em Rust: tasks, channels, I/O assíncrono, select!, timers e servidores TCP de alta performance.

O Tokio é o runtime assíncrono mais utilizado no ecossistema Rust, servindo como base para frameworks web como Axum e Actix Web, clientes HTTP como Reqwest, drivers de banco de dados como SQLx e centenas de outras crates. Ele fornece a infraestrutura necessária para executar código assíncrono com async/await, incluindo um scheduler de tasks, I/O assíncrono, timers, channels de comunicação e muito mais.

O modelo assíncrono do Rust é baseado em Futures (similar a Promises em JavaScript), mas com uma diferença fundamental: Futures em Rust são “lazy” — não fazem nada até serem polled por um runtime. O Tokio é esse runtime, responsável por agendar, executar e gerenciar milhares de tasks concorrentes com overhead mínimo.

Se você está construindo qualquer aplicação que faz I/O (rede, arquivos, bancos de dados), o Tokio provavelmente será parte do seu stack. Neste guia, vamos do básico ao avançado, construindo exemplos práticos de servidor TCP.

Instalação

Adicione o Tokio ao Cargo.toml:

[dependencies]
# Com todas as features (recomendado para aplicações)
tokio = { version = "1", features = ["full"] }

# Ou selecione features específicas (melhor para bibliotecas)
tokio = { version = "1", features = [
    "rt-multi-thread",  # Runtime multi-thread
    "macros",           # Macro #[tokio::main]
    "net",              # TCP, UDP, Unix sockets
    "io-util",          # AsyncReadExt, AsyncWriteExt
    "time",             # sleep, interval, timeout
    "sync",             # Mutex, RwLock, channels
    "fs",               # Operações de arquivo assíncronas
    "signal",           # Handlers de sinais do OS
    "process",          # Processos filhos assíncronos
] }

# Utilitários complementares
tokio-util = "0.7"       # Codecs, compat, extras
tokio-stream = "0.1"     # Streams assíncronos

Uso Básico

O Macro #[tokio::main]

use tokio;

// Configura o runtime multi-thread automaticamente
#[tokio::main]
async fn main() {
    println!("Olá do Tokio!");

    // Agora podemos usar .await
    let resultado = fazer_algo_async().await;
    println!("Resultado: {}", resultado);
}

async fn fazer_algo_async() -> String {
    // Simular operação assíncrona
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    "Pronto!".to_string()
}

// Para testes assíncronos
#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn teste_async() {
        let resultado = super::fazer_algo_async().await;
        assert_eq!(resultado, "Pronto!");
    }
}

Configuração Manual do Runtime

use tokio::runtime::Runtime;

fn main() {
    // Criar runtime manualmente (útil quando não pode usar #[tokio::main])
    let rt = Runtime::new().unwrap();

    // Executar uma future
    rt.block_on(async {
        println!("Dentro do runtime!");
        fazer_algo_async().await;
    });

    // Runtime com configurações customizadas
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)             // Número de threads
        .thread_name("meu-worker")     // Nome das threads
        .thread_stack_size(3 * 1024 * 1024)  // Stack de 3MB
        .enable_all()                  // Habilitar todos os drivers
        .build()
        .unwrap();

    // Runtime single-thread (para aplicações simples)
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
}

async fn fazer_algo_async() {
    println!("Operação assíncrona completa!");
}

Spawning Tasks

use tokio::task;

#[tokio::main]
async fn main() {
    // Spawn uma task que executa concorrentemente
    let handle = tokio::spawn(async {
        // Esta task roda em uma thread do pool do Tokio
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        42 // Retorna um valor
    });

    // Fazer outras coisas enquanto a task executa...
    println!("Fazendo outras coisas...");

    // Aguardar o resultado da task
    let resultado = handle.await.unwrap();
    println!("Task retornou: {}", resultado);

    // Spawn múltiplas tasks
    let mut handles = vec![];
    for i in 0..10 {
        let handle = tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(100 * i)).await;
            println!("Task {} completou", i);
            i * 2
        });
        handles.push(handle);
    }

    // Aguardar todas as tasks
    let mut resultados = vec![];
    for handle in handles {
        resultados.push(handle.await.unwrap());
    }
    println!("Resultados: {:?}", resultados);

    // Spawn blocking - para código CPU-bound ou bloqueante
    let resultado = task::spawn_blocking(|| {
        // Código síncrono pesado que não deve bloquear o runtime async
        let mut soma: u64 = 0;
        for i in 0..1_000_000 {
            soma += i;
        }
        soma
    }).await.unwrap();
    println!("Soma: {}", resultado);
}

JoinSet para Gerenciar Múltiplas Tasks

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // Adicionar tasks ao JoinSet
    for i in 0..5 {
        set.spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(100 * i)).await;
            format!("Resultado da task {}", i)
        });
    }

    // Aguardar tasks conforme completam (em qualquer ordem)
    while let Some(resultado) = set.join_next().await {
        match resultado {
            Ok(valor) => println!("Task completou: {}", valor),
            Err(e) => eprintln!("Task falhou: {}", e),
        }
    }

    println!("Todas as tasks completaram!");
}

Recursos Avançados

Channels de Comunicação

use tokio::sync::{mpsc, broadcast, watch, oneshot};

#[tokio::main]
async fn main() {
    // === mpsc: Multiple Producer, Single Consumer ===
    demonstrar_mpsc().await;

    // === broadcast: Multiple Producer, Multiple Consumer ===
    demonstrar_broadcast().await;

    // === watch: Single Producer, Multiple Consumer (último valor) ===
    demonstrar_watch().await;

    // === oneshot: Single Producer, Single Consumer (um único valor) ===
    demonstrar_oneshot().await;
}

async fn demonstrar_mpsc() {
    println!("=== MPSC Channel ===");
    // Buffer de 32 mensagens
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Múltiplos produtores
    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for j in 0..3 {
                let msg = format!("Produtor {}: mensagem {}", i, j);
                tx.send(msg).await.unwrap();
            }
        });
    }

    // Dropar o tx original para que o canal feche quando produtores terminarem
    drop(tx);

    // Consumidor único
    while let Some(msg) = rx.recv().await {
        println!("  Recebido: {}", msg);
    }
}

async fn demonstrar_broadcast() {
    println!("\n=== Broadcast Channel ===");
    let (tx, _) = broadcast::channel::<String>(16);

    // Múltiplos consumidores
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        tx.send("Mensagem para todos!".to_string()).unwrap();
        tx.send("Segunda mensagem!".to_string()).unwrap();
    });

    // Cada receptor recebe todas as mensagens
    let h1 = tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("  Receptor 1: {}", msg);
        }
    });

    let h2 = tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("  Receptor 2: {}", msg);
        }
    });

    let _ = tokio::join!(h1, h2);
}

async fn demonstrar_watch() {
    println!("\n=== Watch Channel ===");
    let (tx, mut rx) = watch::channel("inicial".to_string());

    // Observador
    let handle = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            println!("  Valor atualizado: {}", *rx.borrow());
        }
    });

    // Atualizar valores
    tx.send("primeiro".to_string()).unwrap();
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    tx.send("segundo".to_string()).unwrap();
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    tx.send("terceiro".to_string()).unwrap();

    drop(tx);
    let _ = handle.await;
}

async fn demonstrar_oneshot() {
    println!("\n=== Oneshot Channel ===");
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        // Simular processamento
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        tx.send("Resultado do processamento".to_string()).unwrap();
    });

    // Aguardar o único resultado
    let resultado = rx.await.unwrap();
    println!("  Recebido: {}", resultado);
}

tokio::select! - Aguardar Múltiplas Futures

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Produtor que envia mensagens periodicamente
    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(300)).await;
            tx.send(format!("Mensagem {}", i)).await.unwrap();
        }
    });

    let mut contador = 0;

    loop {
        tokio::select! {
            // Receber mensagem do canal
            Some(msg) = rx.recv() => {
                println!("Recebida: {}", msg);
                contador += 1;
            }
            // Timeout de 1 segundo
            _ = sleep(Duration::from_secs(1)) => {
                println!("Timeout! Nenhuma mensagem por 1 segundo.");
                break;
            }
            // Sinal de interrupção (Ctrl+C)
            _ = tokio::signal::ctrl_c() => {
                println!("Ctrl+C recebido, encerrando...");
                break;
            }
        }
    }

    println!("Total de mensagens recebidas: {}", contador);
}

Timeouts e Intervals

use tokio::time::{self, Duration, Instant};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // === Sleep ===
    println!("Dormindo por 500ms...");
    time::sleep(Duration::from_millis(500)).await;
    println!("Acordei!");

    // === Timeout ===
    let resultado = time::timeout(
        Duration::from_secs(2),
        operacao_lenta()
    ).await;

    match resultado {
        Ok(valor) => println!("Completou: {}", valor),
        Err(_) => println!("Timeout! Operação demorou demais."),
    }

    // === Interval ===
    let mut intervalo = time::interval(Duration::from_secs(1));
    let inicio = Instant::now();

    for i in 0..5 {
        intervalo.tick().await;
        println!("Tick {} em {:?}", i, inicio.elapsed());
    }

    // === Interval com política de missed ticks ===
    let mut intervalo = time::interval(Duration::from_millis(100));
    // Burst: envia ticks perdidos imediatamente
    intervalo.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
    // Delay: atrasa o próximo tick
    // intervalo.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
    // Skip: pula ticks perdidos
    // intervalo.set_missed_tick_behavior(time::MissedTickBehavior::Skip);

    Ok(())
}

async fn operacao_lenta() -> String {
    time::sleep(Duration::from_secs(1)).await;
    "Resultado da operação lenta".to_string()
}

I/O Assíncrono: TCP

use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};

// Servidor TCP simples
async fn iniciar_servidor() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Servidor ouvindo em 127.0.0.1:8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("Nova conexão de: {}", addr);

        // Spawn uma task para cada conexão
        tokio::spawn(async move {
            if let Err(e) = tratar_conexao(socket).await {
                eprintln!("Erro na conexão {}: {}", addr, e);
            }
        });
    }
}

async fn tratar_conexao(mut socket: TcpStream) -> io::Result<()> {
    let (reader, mut writer) = socket.split();
    let mut reader = BufReader::new(reader);
    let mut linha = String::new();

    // Echo server: ler linha, enviar de volta
    loop {
        linha.clear();
        let bytes_lidos = reader.read_line(&mut linha).await?;

        if bytes_lidos == 0 {
            println!("Conexão fechada pelo cliente");
            break;
        }

        let resposta = format!("Echo: {}", linha);
        writer.write_all(resposta.as_bytes()).await?;
        writer.flush().await?;
    }

    Ok(())
}

// Cliente TCP
async fn conectar_ao_servidor() -> io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("Conectado ao servidor!");

    // Enviar mensagem
    stream.write_all(b"Ola, servidor!\n").await?;

    // Ler resposta
    let mut buffer = vec![0u8; 1024];
    let n = stream.read(&mut buffer).await?;
    let resposta = String::from_utf8_lossy(&buffer[..n]);
    println!("Resposta: {}", resposta);

    Ok(())
}

Sincronização Assíncrona

use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore, Notify};

#[tokio::main]
async fn main() {
    // === Mutex Assíncrono ===
    let contador = Arc::new(Mutex::new(0));

    let mut handles = vec![];
    for _ in 0..10 {
        let contador = Arc::clone(&contador);
        handles.push(tokio::spawn(async move {
            let mut lock = contador.lock().await;
            *lock += 1;
            // lock é dropado aqui, liberando o mutex
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
    println!("Contador: {}", *contador.lock().await);

    // === RwLock ===
    let dados = Arc::new(RwLock::new(vec![1, 2, 3]));

    // Múltiplos leitores simultâneos
    let dados_ref = Arc::clone(&dados);
    let leitor = tokio::spawn(async move {
        let read = dados_ref.read().await;
        println!("Lendo: {:?}", *read);
    });

    // Escritor exclusivo
    let dados_ref = Arc::clone(&dados);
    let escritor = tokio::spawn(async move {
        let mut write = dados_ref.write().await;
        write.push(4);
        println!("Escrevendo: {:?}", *write);
    });

    let _ = tokio::join!(leitor, escritor);

    // === Semaphore ===
    let semaforo = Arc::new(Semaphore::new(3)); // Máximo 3 acessos simultâneos

    let mut handles = vec![];
    for i in 0..10 {
        let sem = Arc::clone(&semaforo);
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            println!("Task {} acessando recurso", i);
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            println!("Task {} liberando recurso", i);
            // _permit dropado aqui, liberando um slot
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    // === Notify ===
    let notify = Arc::new(Notify::new());
    let notify2 = Arc::clone(&notify);

    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        println!("Notificando...");
        notify2.notify_one();
    });

    println!("Aguardando notificação...");
    notify.notified().await;
    println!("Notificação recebida!");
}

Boas Práticas

1. Não Bloqueie o Runtime Async

// ERRADO: bloqueia a thread do runtime
async fn ruim() {
    std::thread::sleep(std::time::Duration::from_secs(1)); // Bloqueia!
    let dados = std::fs::read_to_string("arquivo.txt").unwrap(); // Bloqueia!
}

// CERTO: use versões assíncronas
async fn bom() {
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    let dados = tokio::fs::read_to_string("arquivo.txt").await.unwrap();
}

// Para código necessariamente bloqueante, use spawn_blocking
async fn computacao_pesada() -> u64 {
    tokio::task::spawn_blocking(|| {
        // Código CPU-bound roda em thread pool dedicado
        (0..10_000_000u64).sum()
    }).await.unwrap()
}

2. Prefira Channels a Mutex Quando Possível

use tokio::sync::mpsc;

// Padrão ator: comunicação via mensagens
struct Contador {
    rx: mpsc::Receiver<ContadorMsg>,
    valor: i64,
}

enum ContadorMsg {
    Incrementar,
    Decrementar,
    Obter(tokio::sync::oneshot::Sender<i64>),
}

impl Contador {
    fn new() -> (Self, mpsc::Sender<ContadorMsg>) {
        let (tx, rx) = mpsc::channel(32);
        (Contador { rx, valor: 0 }, tx)
    }

    async fn executar(mut self) {
        while let Some(msg) = self.rx.recv().await {
            match msg {
                ContadorMsg::Incrementar => self.valor += 1,
                ContadorMsg::Decrementar => self.valor -= 1,
                ContadorMsg::Obter(resposta) => {
                    let _ = resposta.send(self.valor);
                }
            }
        }
    }
}

3. Use tokio::select! com Cuidado

use tokio::sync::mpsc;

async fn loop_principal(mut rx: mpsc::Receiver<String>) {
    let mut intervalo = tokio::time::interval(tokio::time::Duration::from_secs(30));

    loop {
        tokio::select! {
            // biased; // Descomente para priorizar os branches na ordem
            Some(msg) = rx.recv() => {
                println!("Mensagem: {}", msg);
            }
            _ = intervalo.tick() => {
                println!("Heartbeat");
            }
            _ = tokio::signal::ctrl_c() => {
                println!("Encerrando gracefully...");
                break;
            }
        }
    }
}

4. Trate Erros de Tasks

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Se esta task der panic, o JoinError captura
        panic!("Algo deu errado!");
    });

    match handle.await {
        Ok(()) => println!("Task completou com sucesso"),
        Err(e) if e.is_panic() => {
            eprintln!("Task deu panic: {:?}", e.into_panic());
        }
        Err(e) if e.is_cancelled() => {
            eprintln!("Task foi cancelada");
        }
        Err(e) => eprintln!("Erro inesperado: {}", e),
    }
}

5. Configure o Runtime Adequadamente

// Para servidores de alta performance
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() {
    // Runtime multi-thread com 8 workers
}

// Para CLIs ou testes simples
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Runtime single-thread, mais leve
}

Exemplos Práticos

Exemplo: Servidor de Chat TCP

use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, Mutex};

type Clientes = Arc<Mutex<HashMap<String, tokio::sync::mpsc::Sender<String>>>>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    let (broadcast_tx, _) = broadcast::channel::<String>(100);
    let clientes: Clientes = Arc::new(Mutex::new(HashMap::new()));

    println!("Servidor de chat ouvindo em 127.0.0.1:8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        let broadcast_tx = broadcast_tx.clone();
        let mut broadcast_rx = broadcast_tx.subscribe();
        let clientes = Arc::clone(&clientes);

        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split();
            let mut reader = BufReader::new(reader);

            // Pedir nome do usuário
            writer.write_all(b"Digite seu nome: ").await.unwrap();
            let mut nome = String::new();
            reader.read_line(&mut nome).await.unwrap();
            let nome = nome.trim().to_string();

            // Anunciar entrada
            let msg_entrada = format!("[{}] {} entrou no chat\n", addr, nome);
            println!("{}", msg_entrada.trim());
            let _ = broadcast_tx.send(msg_entrada);

            writer.write_all(format!("Bem-vindo, {}! Digite mensagens:\n", nome).as_bytes())
                .await.unwrap();

            let nome_clone = nome.clone();

            // Task para enviar mensagens broadcast ao cliente
            let write_handle = tokio::spawn(async move {
                while let Ok(msg) = broadcast_rx.recv().await {
                    if writer.write_all(msg.as_bytes()).await.is_err() {
                        break;
                    }
                }
            });

            // Ler mensagens do cliente
            let mut linha = String::new();
            loop {
                linha.clear();
                match reader.read_line(&mut linha).await {
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        let msg = format!("[{}]: {}", nome_clone, linha);
                        let _ = broadcast_tx.send(msg);
                    }
                }
            }

            // Anunciar saída
            let msg_saida = format!("[{}] {} saiu do chat\n", addr, nome_clone);
            println!("{}", msg_saida.trim());
            let _ = broadcast_tx.send(msg_saida);

            write_handle.abort();
        });
    }
}

Exemplo: Rate Limiter com Semáforo

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};

/// Rate limiter que permite N requisições simultâneas
struct RateLimiter {
    semaforo: Arc<Semaphore>,
    max_concurrent: usize,
}

impl RateLimiter {
    fn new(max_concurrent: usize) -> Self {
        RateLimiter {
            semaforo: Arc::new(Semaphore::new(max_concurrent)),
            max_concurrent,
        }
    }

    async fn executar<F, T>(&self, tarefa: F) -> T
    where
        F: std::future::Future<Output = T>,
    {
        let _permit = self.semaforo.acquire().await.unwrap();
        tarefa.await
    }
}

#[tokio::main]
async fn main() {
    let limiter = Arc::new(RateLimiter::new(3)); // Máximo 3 simultâneas

    let mut handles = vec![];
    for i in 0..10 {
        let limiter = Arc::clone(&limiter);
        handles.push(tokio::spawn(async move {
            limiter.executar(async move {
                println!("Iniciando requisição {}", i);
                sleep(Duration::from_millis(500)).await;
                println!("Completando requisição {}", i);
                i
            }).await
        }));
    }

    for handle in handles {
        let resultado = handle.await.unwrap();
        println!("Resultado: {}", resultado);
    }
}

Comparação com Alternativas

CaracterísticaTokioasync-stdsmolmonoio
PopularidadeMuito altaMédiaBaixaNicho
ModeloMulti-thread work-stealingMulti-threadMulti-thread leveio_uring single-thread
PerformanceExcelenteBoaBoaExcelente (Linux)
EcossistemaEnorme (tower, axum, tonic)ModeradoPequenoPequeno
APIRica e completaSimilar à stdMinimalistaEspecializada
Channelsmpsc, broadcast, watch, oneshotSimVia async-channelBásico
TimerIntegradoIntegradoVia async-ioIntegrado
I/Oepoll/kqueue/iocpepoll/kqueueepoll/kqueueio_uring
Uso de memóriaModeradoModeradoBaixoBaixo
MaturidadeMuito maduraMaduraMaduraJovem

O Tokio se destaca por:

  • Ecossistema massivo: praticamente toda crate async é compatível
  • Performance comprovada: usado em produção por Discord, Cloudflare, AWS, Linkerd
  • Features completas: tudo que precisa para aplicações de produção
  • Documentação excelente: guias, exemplos e API docs de alta qualidade
  • Tower/Hyper/Axum: stack web completo e coeso

Conclusão

O Tokio é a fundação da programação assíncrona em Rust, fornecendo tudo que você precisa para construir aplicações de rede, servidores web e sistemas distribuídos de alta performance. Dominar o Tokio abre as portas para todo o ecossistema async do Rust.

Pontos-chave para lembrar:

  • #[tokio::main] configura o runtime automaticamente
  • tokio::spawn para tarefas concorrentes, spawn_blocking para código bloqueante
  • Channels (mpsc, broadcast, watch, oneshot) para comunicação entre tasks
  • tokio::select! para aguardar múltiplas futures simultaneamente
  • Nunca bloqueie o runtime async com operações síncronas
  • Semaphore e Mutex para controle de concorrência

Para aprofundar, consulte o Tokio Tutorial e a API reference.

No próximo passo, aprenda sobre o Rayon para paralelismo de dados em Rust.