Async/Await Rust em Profundidade | Rust Brasil

Async/await em Rust: futures, pinning, wakers, executors e como o runtime funciona. Guia avançado em português.

A programação assíncrona em Rust é poderosa, mas funciona de forma fundamentalmente diferente de outras linguagens. Enquanto JavaScript e Python usam um runtime integrado à linguagem, o Rust define apenas os primitivos (Future, async, await) e delega a execução a runtimes externos como Tokio. Neste artigo, vamos abrir o capô e entender como tudo funciona por dentro — de futures até o mecanismo de polling.

O Que É uma Future?

No centro do async Rust está o trait Future:

use std::pin::Pin;
use std::task::{Context, Poll};

// Definição simplificada
trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

// Poll é um enum simples
enum Poll<T> {
    Ready(T),    // Resultado pronto
    Pending,     // Ainda não terminou
}

Uma future é uma computação que pode não ter terminado ainda. Cada chamada a poll() tenta avançar a computação:

Ciclo de vida de uma Future:
                    ┌─────────┐
                    │ Criada   │
                    └────┬─────┘
                         │
          ┌──────────────▼──────────────┐
          │    poll() chamado            │
          └──────────────┬──────────────┘
                         │
              ┌──────────┴──────────┐
              │                     │
        ┌─────▼─────┐        ┌─────▼─────┐
        │  Pending   │        │  Ready(T)  │
        │(registra   │        │ (concluído)│
        │ Waker)     │        └────────────┘
        └─────┬──────┘
              │
              │ Waker notifica
              │
              ▼
        poll() novamente...

Como async/await Se Transforma em Futures

Quando você escreve uma função async, o compilador a transforma em uma máquina de estados:

// O que você escreve:
async fn buscar_dados(url: &str) -> String {
    let resposta = fazer_requisicao(url).await;
    let texto = processar(resposta).await;
    texto.to_uppercase()
}

// O que o compilador gera (conceitualmente):
// enum BuscarDadosFuture {
//     Estado0 { url: String },                        // antes do primeiro await
//     Estado1 { url: String, fut: RequisicaoFuture },  // esperando requisição
//     Estado2 { fut: ProcessarFuture },                 // esperando processamento
//     Completo,
// }
//
// impl Future for BuscarDadosFuture {
//     type Output = String;
//     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<String> {
//         match self.estado {
//             Estado0 => { /* inicia requisição, vai para Estado1 */ }
//             Estado1 => { /* poll requisição, se Ready vai para Estado2 */ }
//             Estado2 => { /* poll processamento, se Ready retorna resultado */ }
//         }
//     }
// }

Cada .await é um ponto de suspensão onde a future pode retornar Pending e ser retomada depois.

Implementando uma Future Manual

Vamos criar uma future simples para entender o mecanismo:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Temporizador {
    quando: Instant,
}

impl Temporizador {
    fn novo(duracao: Duration) -> Self {
        Temporizador {
            quando: Instant::now() + duracao,
        }
    }
}

impl Future for Temporizador {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.quando {
            Poll::Ready("Temporizador concluído!".to_string())
        } else {
            // Registrar waker para ser notificado depois
            let waker = cx.waker().clone();
            let quando = self.quando;

            // Em um runtime real, registraríamos com o reactor
            // Aqui, simulamos com uma thread
            std::thread::spawn(move || {
                let agora = Instant::now();
                if quando > agora {
                    std::thread::sleep(quando - agora);
                }
                waker.wake(); // Notifica o runtime para poll() novamente
            });

            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    println!("Iniciando temporizador...");
    let resultado = Temporizador::novo(Duration::from_secs(1)).await;
    println!("{}", resultado);
}

Pin: Por Que Futures Precisam Ser Fixadas

Pin<&mut Self> na assinatura de poll() garante que a future não seja movida na memória. Isso é necessário porque futures podem conter auto-referências:

// Esta async fn gera uma future com auto-referência:
async fn auto_referencia() {
    let dados = vec![1, 2, 3];
    let referencia = &dados; // referência a dados — auto-referência na future!
    println!("{:?}", referencia);
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    // Após o await, 'referencia' ainda aponta para 'dados'
    // Se a future fosse movida, o ponteiro ficaria inválido!
    println!("{:?}", referencia);
}

Layout de Memória e o Problema do Move

Antes do move (endereço 0x1000):
┌──────────────────────────────┐
│ dados: [1, 2, 3]  @ 0x1000  │
│ referencia: ptr → 0x1000    │  ← aponta para si mesmo
└──────────────────────────────┘

Após move (endereço 0x2000):
┌──────────────────────────────┐
│ dados: [1, 2, 3]  @ 0x2000  │
│ referencia: ptr → 0x1000    │  ← ponteiro INVÁLIDO!
└──────────────────────────────┘

Com Pin: a future fica fixada e não pode ser movida.

Usando Pin na Prática

use std::pin::Pin;
use std::future::Future;

// Ao armazenar futures em structs, use Pin<Box<dyn Future>>
struct TarefasPendentes {
    tarefas: Vec<Pin<Box<dyn Future<Output = String> + Send>>>,
}

impl TarefasPendentes {
    fn nova() -> Self {
        TarefasPendentes { tarefas: Vec::new() }
    }

    fn adicionar(&mut self, tarefa: impl Future<Output = String> + Send + 'static) {
        self.tarefas.push(Box::pin(tarefa));
    }
}

async fn tarefa_exemplo(id: u32) -> String {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    format!("Tarefa {} concluída", id)
}

#[tokio::main]
async fn main() {
    let mut pendentes = TarefasPendentes::nova();
    pendentes.adicionar(tarefa_exemplo(1));
    pendentes.adicionar(tarefa_exemplo(2));

    println!("{} tarefas pendentes", pendentes.tarefas.len());

    // Na prática, use tokio::join! ou FuturesUnordered
}

O Waker: Como o Runtime Sabe Quando Fazer Poll

O Waker é o mecanismo de notificação: quando uma operação de I/O completa, o waker notifica o runtime para chamar poll() novamente.

Fluxo de uma operação assíncrona:
──────────────────────────────────
1. Runtime chama poll() na future
2. Future tenta ler do socket → não tem dados
3. Future registra o Waker com o reactor (epoll/kqueue/IOCP)
4. Future retorna Poll::Pending
5. Runtime suspende esta future e executa outras

... tempo passa, dados chegam no socket ...

6. Reactor detecta dados disponíveis
7. Reactor chama waker.wake()
8. Runtime coloca a future na fila de prontos
9. Runtime chama poll() novamente
10. Future lê os dados → retorna Poll::Ready(dados)

Tokio: O Runtime na Prática

Configuração Básica

// Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }

// Runtime single-threaded
#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Single-thread!");
}

// Runtime multi-threaded (padrão)
#[tokio::main]
async fn main_multi() {
    println!("Multi-thread!");
}

// Criação manual do runtime
fn main_manual() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        println!("Runtime manual!");
    });
}

join! — Executar Futures em Paralelo

use tokio::time::{sleep, Duration};

async fn buscar_usuario(id: u32) -> String {
    sleep(Duration::from_millis(200)).await;
    format!("Usuário #{}", id)
}

async fn buscar_pedidos(usuario_id: u32) -> Vec<String> {
    sleep(Duration::from_millis(300)).await;
    vec![format!("Pedido A de #{}", usuario_id), format!("Pedido B de #{}", usuario_id)]
}

async fn buscar_saldo(usuario_id: u32) -> f64 {
    sleep(Duration::from_millis(150)).await;
    1500.50 + usuario_id as f64
}

#[tokio::main]
async fn main() {
    let inicio = std::time::Instant::now();

    // Executando sequencialmente: ~650ms
    // let usuario = buscar_usuario(1).await;
    // let pedidos = buscar_pedidos(1).await;
    // let saldo = buscar_saldo(1).await;

    // Executando em paralelo com join!: ~300ms (o mais lento)
    let (usuario, pedidos, saldo) = tokio::join!(
        buscar_usuario(1),
        buscar_pedidos(1),
        buscar_saldo(1),
    );

    println!("Usuário: {}", usuario);
    println!("Pedidos: {:?}", pedidos);
    println!("Saldo: R${:.2}", saldo);
    println!("Tempo: {:?}", inicio.elapsed());
}

select! — Corrida entre Futures

select! executa múltiplas futures e retorna a que completar primeiro:

use tokio::time::{sleep, Duration};

async fn servidor_principal() -> String {
    sleep(Duration::from_millis(500)).await;
    "Resposta do servidor principal".to_string()
}

async fn servidor_backup() -> String {
    sleep(Duration::from_millis(300)).await;
    "Resposta do servidor backup".to_string()
}

#[tokio::main]
async fn main() {
    // Corrida: quem responder primeiro ganha
    let resultado = tokio::select! {
        resp = servidor_principal() => {
            println!("Principal respondeu primeiro");
            resp
        }
        resp = servidor_backup() => {
            println!("Backup respondeu primeiro");
            resp
        }
    };

    println!("Resultado: {}", resultado);

    // Timeout pattern
    let resultado_com_timeout = tokio::select! {
        resp = servidor_principal() => Ok(resp),
        _ = sleep(Duration::from_millis(200)) => Err("Timeout!"),
    };

    match resultado_com_timeout {
        Ok(resp) => println!("Resposta: {}", resp),
        Err(e) => println!("Erro: {}", e),
    }
}

Spawning de Tasks

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Spawn de múltiplas tasks
    for i in 0..5 {
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100 * i)).await;
            tx_clone
                .send(format!("Mensagem da task {}", i))
                .await
                .unwrap();
        });
    }

    // Drop o sender original para que o canal feche quando todas as tasks terminarem
    drop(tx);

    // Receber resultados conforme ficam prontos
    while let Some(msg) = rx.recv().await {
        println!("Recebido: {}", msg);
    }

    println!("Todas as tasks concluíram");
}

Concorrência Estruturada com JoinSet

use tokio::task::JoinSet;

async fn processar_item(id: u32) -> Result<String, String> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    if id == 3 {
        Err(format!("Erro no item {}", id))
    } else {
        Ok(format!("Item {} processado", id))
    }
}

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // Spawn de múltiplas tasks
    for id in 0..5 {
        set.spawn(processar_item(id));
    }

    // Coletar resultados conforme completam
    let mut sucessos = 0;
    let mut erros = 0;

    while let Some(resultado) = set.join_next().await {
        match resultado {
            Ok(Ok(msg)) => {
                println!("Sucesso: {}", msg);
                sucessos += 1;
            }
            Ok(Err(msg)) => {
                println!("Falha: {}", msg);
                erros += 1;
            }
            Err(e) => println!("Task panic: {}", e),
        }
    }

    println!("\nResumo: {} sucessos, {} erros", sucessos, erros);
}

Streams: Iteradores Assíncronos

use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<i32>(10);

    // Produtor
    tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    });

    // Consumidor usando Stream
    let mut stream = ReceiverStream::new(rx);

    // Combinadores de stream (similar a iteradores)
    while let Some(valor) = stream.next().await {
        println!("Recebido: {}", valor);
    }
}

Erros Comuns com Async

1. await fora de contexto async

// NÃO COMPILA
// fn main() {
//     let resultado = alguma_future().await; // E0728!
// }

// CORRETO: use #[tokio::main] ou block_on
#[tokio::main]
async fn main() {
    let resultado = async { 42 }.await;
    println!("{}", resultado);
}

Veja E0728: Await Fora de Async para detalhes.

2. Future não é Send

use std::rc::Rc;

async fn tarefa_com_rc() {
    let dados = Rc::new(42); // Rc não é Send!
    // Se cruzar um .await com Rc no escopo, a future não será Send
    println!("{}", dados);
    // tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    // drop(dados);  ← se dropar antes do await, funciona
}

// SOLUÇÃO: use Arc em vez de Rc
use std::sync::Arc;

async fn tarefa_com_arc() {
    let dados = Arc::new(42); // Arc é Send!
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    println!("{}", dados);
}

#[tokio::main]
async fn main() {
    // tokio::spawn requer Send
    tokio::spawn(tarefa_com_arc()).await.unwrap();
}

3. Bloquear o runtime com operações síncronas

#[tokio::main]
async fn main() {
    // ERRADO: std::thread::sleep bloqueia a thread do runtime!
    // std::thread::sleep(std::time::Duration::from_secs(5));

    // CORRETO: use a versão async
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // Para operações bloqueantes necessárias, use spawn_blocking
    let resultado = tokio::task::spawn_blocking(|| {
        // Código bloqueante executa em thread separada
        std::thread::sleep(std::time::Duration::from_millis(100));
        "Operação bloqueante concluída"
    })
    .await
    .unwrap();

    println!("{}", resultado);
}

4. Deadlock com Mutex síncrono

use std::sync::Arc;

#[tokio::main]
async fn main() {
    // PERIGO: std::sync::Mutex pode causar deadlock em async
    // porque a lock é mantida durante o .await
    //
    // let mutex = Arc::new(std::sync::Mutex::new(0));
    // let guard = mutex.lock().unwrap();
    // some_future.await; // Thread bloqueada segurando a lock!

    // SOLUÇÃO: use tokio::sync::Mutex para locks assíncronas
    let mutex = Arc::new(tokio::sync::Mutex::new(0));

    let mutex_clone = Arc::clone(&mutex);
    tokio::spawn(async move {
        let mut guard = mutex_clone.lock().await;
        *guard += 1;
        // safe: soltar antes ou depois de await
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        println!("Valor: {}", *guard);
    })
    .await
    .unwrap();

    // Alternativa: minimizar o escopo da lock síncrona
    let mutex_sync = Arc::new(std::sync::Mutex::new(0));
    {
        let mut guard = mutex_sync.lock().unwrap();
        *guard += 1;
    } // lock liberada antes de qualquer .await
    println!("Valor final: {}", *mutex_sync.lock().unwrap());
}

Aplicação no Mundo Real: Web Scraper Assíncrono

use std::time::Instant;

async fn buscar_pagina(url: &str) -> Result<usize, String> {
    // Simulando requisição HTTP
    let delay = if url.contains("lento") { 500 } else { 100 };
    tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

    // Simulando tamanho da resposta
    Ok(url.len() * 100)
}

async fn verificar_urls(urls: Vec<String>) {
    let inicio = Instant::now();

    let mut set = tokio::task::JoinSet::new();

    for url in urls {
        set.spawn(async move {
            match buscar_pagina(&url).await {
                Ok(tamanho) => {
                    println!("[OK] {} ({} bytes)", url, tamanho);
                    Ok((url, tamanho))
                }
                Err(e) => {
                    println!("[ERRO] {}: {}", url, e);
                    Err(e)
                }
            }
        });
    }

    let mut total_bytes = 0usize;
    let mut total_ok = 0u32;

    while let Some(resultado) = set.join_next().await {
        if let Ok(Ok((_, tamanho))) = resultado {
            total_bytes += tamanho;
            total_ok += 1;
        }
    }

    println!("\n--- Resumo ---");
    println!("URLs verificadas: {}", total_ok);
    println!("Total bytes: {}", total_bytes);
    println!("Tempo total: {:?}", inicio.elapsed());
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "https://exemplo.com/pagina1".to_string(),
        "https://exemplo.com/pagina2".to_string(),
        "https://exemplo.com/lento/pagina3".to_string(),
        "https://exemplo.com/pagina4".to_string(),
        "https://exemplo.com/lento/pagina5".to_string(),
    ];

    verificar_urls(urls).await;
}

Veja Também