O Tokio é o runtime assíncrono mais utilizado no ecossistema Rust, servindo como base para frameworks web como Axum e Actix Web, clientes HTTP como Reqwest, drivers de banco de dados como SQLx e centenas de outras crates. Ele fornece a infraestrutura necessária para executar código assíncrono com async/await, incluindo um scheduler de tasks, I/O assíncrono, timers, channels de comunicação e muito mais.
O modelo assíncrono do Rust é baseado em Futures (similar a Promises em JavaScript), mas com uma diferença fundamental: Futures em Rust são “lazy” — não fazem nada até serem polled por um runtime. O Tokio é esse runtime, responsável por agendar, executar e gerenciar milhares de tasks concorrentes com overhead mínimo.
Se você está construindo qualquer aplicação que faz I/O (rede, arquivos, bancos de dados), o Tokio provavelmente será parte do seu stack. Neste guia, vamos do básico ao avançado, construindo exemplos práticos de servidor TCP.
Instalação
Adicione o Tokio ao Cargo.toml:
[dependencies]
# Com todas as features (recomendado para aplicações)
tokio = { version = "1", features = ["full"] }
# Ou selecione features específicas (melhor para bibliotecas)
tokio = { version = "1", features = [
"rt-multi-thread", # Runtime multi-thread
"macros", # Macro #[tokio::main]
"net", # TCP, UDP, Unix sockets
"io-util", # AsyncReadExt, AsyncWriteExt
"time", # sleep, interval, timeout
"sync", # Mutex, RwLock, channels
"fs", # Operações de arquivo assíncronas
"signal", # Handlers de sinais do OS
"process", # Processos filhos assíncronos
] }
# Utilitários complementares
tokio-util = "0.7" # Codecs, compat, extras
tokio-stream = "0.1" # Streams assíncronos
Uso Básico
O Macro #[tokio::main]
use tokio;
// Configura o runtime multi-thread automaticamente
#[tokio::main]
async fn main() {
println!("Olá do Tokio!");
// Agora podemos usar .await
let resultado = fazer_algo_async().await;
println!("Resultado: {}", resultado);
}
async fn fazer_algo_async() -> String {
// Simular operação assíncrona
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
"Pronto!".to_string()
}
// Para testes assíncronos
#[cfg(test)]
mod tests {
#[tokio::test]
async fn teste_async() {
let resultado = super::fazer_algo_async().await;
assert_eq!(resultado, "Pronto!");
}
}
Configuração Manual do Runtime
use tokio::runtime::Runtime;
fn main() {
// Criar runtime manualmente (útil quando não pode usar #[tokio::main])
let rt = Runtime::new().unwrap();
// Executar uma future
rt.block_on(async {
println!("Dentro do runtime!");
fazer_algo_async().await;
});
// Runtime com configurações customizadas
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // Número de threads
.thread_name("meu-worker") // Nome das threads
.thread_stack_size(3 * 1024 * 1024) // Stack de 3MB
.enable_all() // Habilitar todos os drivers
.build()
.unwrap();
// Runtime single-thread (para aplicações simples)
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
}
async fn fazer_algo_async() {
println!("Operação assíncrona completa!");
}
Spawning Tasks
use tokio::task;
#[tokio::main]
async fn main() {
// Spawn uma task que executa concorrentemente
let handle = tokio::spawn(async {
// Esta task roda em uma thread do pool do Tokio
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
42 // Retorna um valor
});
// Fazer outras coisas enquanto a task executa...
println!("Fazendo outras coisas...");
// Aguardar o resultado da task
let resultado = handle.await.unwrap();
println!("Task retornou: {}", resultado);
// Spawn múltiplas tasks
let mut handles = vec![];
for i in 0..10 {
let handle = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100 * i)).await;
println!("Task {} completou", i);
i * 2
});
handles.push(handle);
}
// Aguardar todas as tasks
let mut resultados = vec![];
for handle in handles {
resultados.push(handle.await.unwrap());
}
println!("Resultados: {:?}", resultados);
// Spawn blocking - para código CPU-bound ou bloqueante
let resultado = task::spawn_blocking(|| {
// Código síncrono pesado que não deve bloquear o runtime async
let mut soma: u64 = 0;
for i in 0..1_000_000 {
soma += i;
}
soma
}).await.unwrap();
println!("Soma: {}", resultado);
}
JoinSet para Gerenciar Múltiplas Tasks
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
// Adicionar tasks ao JoinSet
for i in 0..5 {
set.spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100 * i)).await;
format!("Resultado da task {}", i)
});
}
// Aguardar tasks conforme completam (em qualquer ordem)
while let Some(resultado) = set.join_next().await {
match resultado {
Ok(valor) => println!("Task completou: {}", valor),
Err(e) => eprintln!("Task falhou: {}", e),
}
}
println!("Todas as tasks completaram!");
}
Recursos Avançados
Channels de Comunicação
use tokio::sync::{mpsc, broadcast, watch, oneshot};
#[tokio::main]
async fn main() {
// === mpsc: Multiple Producer, Single Consumer ===
demonstrar_mpsc().await;
// === broadcast: Multiple Producer, Multiple Consumer ===
demonstrar_broadcast().await;
// === watch: Single Producer, Multiple Consumer (último valor) ===
demonstrar_watch().await;
// === oneshot: Single Producer, Single Consumer (um único valor) ===
demonstrar_oneshot().await;
}
async fn demonstrar_mpsc() {
println!("=== MPSC Channel ===");
// Buffer de 32 mensagens
let (tx, mut rx) = mpsc::channel::<String>(32);
// Múltiplos produtores
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
for j in 0..3 {
let msg = format!("Produtor {}: mensagem {}", i, j);
tx.send(msg).await.unwrap();
}
});
}
// Dropar o tx original para que o canal feche quando produtores terminarem
drop(tx);
// Consumidor único
while let Some(msg) = rx.recv().await {
println!(" Recebido: {}", msg);
}
}
async fn demonstrar_broadcast() {
println!("\n=== Broadcast Channel ===");
let (tx, _) = broadcast::channel::<String>(16);
// Múltiplos consumidores
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
tx.send("Mensagem para todos!".to_string()).unwrap();
tx.send("Segunda mensagem!".to_string()).unwrap();
});
// Cada receptor recebe todas as mensagens
let h1 = tokio::spawn(async move {
while let Ok(msg) = rx1.recv().await {
println!(" Receptor 1: {}", msg);
}
});
let h2 = tokio::spawn(async move {
while let Ok(msg) = rx2.recv().await {
println!(" Receptor 2: {}", msg);
}
});
let _ = tokio::join!(h1, h2);
}
async fn demonstrar_watch() {
println!("\n=== Watch Channel ===");
let (tx, mut rx) = watch::channel("inicial".to_string());
// Observador
let handle = tokio::spawn(async move {
while rx.changed().await.is_ok() {
println!(" Valor atualizado: {}", *rx.borrow());
}
});
// Atualizar valores
tx.send("primeiro".to_string()).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
tx.send("segundo".to_string()).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
tx.send("terceiro".to_string()).unwrap();
drop(tx);
let _ = handle.await;
}
async fn demonstrar_oneshot() {
println!("\n=== Oneshot Channel ===");
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
// Simular processamento
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
tx.send("Resultado do processamento".to_string()).unwrap();
});
// Aguardar o único resultado
let resultado = rx.await.unwrap();
println!(" Recebido: {}", resultado);
}
tokio::select! - Aguardar Múltiplas Futures
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(32);
// Produtor que envia mensagens periodicamente
tokio::spawn(async move {
for i in 0..5 {
sleep(Duration::from_millis(300)).await;
tx.send(format!("Mensagem {}", i)).await.unwrap();
}
});
let mut contador = 0;
loop {
tokio::select! {
// Receber mensagem do canal
Some(msg) = rx.recv() => {
println!("Recebida: {}", msg);
contador += 1;
}
// Timeout de 1 segundo
_ = sleep(Duration::from_secs(1)) => {
println!("Timeout! Nenhuma mensagem por 1 segundo.");
break;
}
// Sinal de interrupção (Ctrl+C)
_ = tokio::signal::ctrl_c() => {
println!("Ctrl+C recebido, encerrando...");
break;
}
}
}
println!("Total de mensagens recebidas: {}", contador);
}
Timeouts e Intervals
use tokio::time::{self, Duration, Instant};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// === Sleep ===
println!("Dormindo por 500ms...");
time::sleep(Duration::from_millis(500)).await;
println!("Acordei!");
// === Timeout ===
let resultado = time::timeout(
Duration::from_secs(2),
operacao_lenta()
).await;
match resultado {
Ok(valor) => println!("Completou: {}", valor),
Err(_) => println!("Timeout! Operação demorou demais."),
}
// === Interval ===
let mut intervalo = time::interval(Duration::from_secs(1));
let inicio = Instant::now();
for i in 0..5 {
intervalo.tick().await;
println!("Tick {} em {:?}", i, inicio.elapsed());
}
// === Interval com política de missed ticks ===
let mut intervalo = time::interval(Duration::from_millis(100));
// Burst: envia ticks perdidos imediatamente
intervalo.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
// Delay: atrasa o próximo tick
// intervalo.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
// Skip: pula ticks perdidos
// intervalo.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
Ok(())
}
async fn operacao_lenta() -> String {
time::sleep(Duration::from_secs(1)).await;
"Resultado da operação lenta".to_string()
}
I/O Assíncrono: TCP
use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
// Servidor TCP simples
async fn iniciar_servidor() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Servidor ouvindo em 127.0.0.1:8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("Nova conexão de: {}", addr);
// Spawn uma task para cada conexão
tokio::spawn(async move {
if let Err(e) = tratar_conexao(socket).await {
eprintln!("Erro na conexão {}: {}", addr, e);
}
});
}
}
async fn tratar_conexao(mut socket: TcpStream) -> io::Result<()> {
let (reader, mut writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut linha = String::new();
// Echo server: ler linha, enviar de volta
loop {
linha.clear();
let bytes_lidos = reader.read_line(&mut linha).await?;
if bytes_lidos == 0 {
println!("Conexão fechada pelo cliente");
break;
}
let resposta = format!("Echo: {}", linha);
writer.write_all(resposta.as_bytes()).await?;
writer.flush().await?;
}
Ok(())
}
// Cliente TCP
async fn conectar_ao_servidor() -> io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
println!("Conectado ao servidor!");
// Enviar mensagem
stream.write_all(b"Ola, servidor!\n").await?;
// Ler resposta
let mut buffer = vec![0u8; 1024];
let n = stream.read(&mut buffer).await?;
let resposta = String::from_utf8_lossy(&buffer[..n]);
println!("Resposta: {}", resposta);
Ok(())
}
Sincronização Assíncrona
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore, Notify};
#[tokio::main]
async fn main() {
// === Mutex Assíncrono ===
let contador = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let contador = Arc::clone(&contador);
handles.push(tokio::spawn(async move {
let mut lock = contador.lock().await;
*lock += 1;
// lock é dropado aqui, liberando o mutex
}));
}
for handle in handles {
handle.await.unwrap();
}
println!("Contador: {}", *contador.lock().await);
// === RwLock ===
let dados = Arc::new(RwLock::new(vec![1, 2, 3]));
// Múltiplos leitores simultâneos
let dados_ref = Arc::clone(&dados);
let leitor = tokio::spawn(async move {
let read = dados_ref.read().await;
println!("Lendo: {:?}", *read);
});
// Escritor exclusivo
let dados_ref = Arc::clone(&dados);
let escritor = tokio::spawn(async move {
let mut write = dados_ref.write().await;
write.push(4);
println!("Escrevendo: {:?}", *write);
});
let _ = tokio::join!(leitor, escritor);
// === Semaphore ===
let semaforo = Arc::new(Semaphore::new(3)); // Máximo 3 acessos simultâneos
let mut handles = vec![];
for i in 0..10 {
let sem = Arc::clone(&semaforo);
handles.push(tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
println!("Task {} acessando recurso", i);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
println!("Task {} liberando recurso", i);
// _permit dropado aqui, liberando um slot
}));
}
for handle in handles {
handle.await.unwrap();
}
// === Notify ===
let notify = Arc::new(Notify::new());
let notify2 = Arc::clone(¬ify);
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("Notificando...");
notify2.notify_one();
});
println!("Aguardando notificação...");
notify.notified().await;
println!("Notificação recebida!");
}
Boas Práticas
1. Não Bloqueie o Runtime Async
// ERRADO: bloqueia a thread do runtime
async fn ruim() {
std::thread::sleep(std::time::Duration::from_secs(1)); // Bloqueia!
let dados = std::fs::read_to_string("arquivo.txt").unwrap(); // Bloqueia!
}
// CERTO: use versões assíncronas
async fn bom() {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let dados = tokio::fs::read_to_string("arquivo.txt").await.unwrap();
}
// Para código necessariamente bloqueante, use spawn_blocking
async fn computacao_pesada() -> u64 {
tokio::task::spawn_blocking(|| {
// Código CPU-bound roda em thread pool dedicado
(0..10_000_000u64).sum()
}).await.unwrap()
}
2. Prefira Channels a Mutex Quando Possível
use tokio::sync::mpsc;
// Padrão ator: comunicação via mensagens
struct Contador {
rx: mpsc::Receiver<ContadorMsg>,
valor: i64,
}
enum ContadorMsg {
Incrementar,
Decrementar,
Obter(tokio::sync::oneshot::Sender<i64>),
}
impl Contador {
fn new() -> (Self, mpsc::Sender<ContadorMsg>) {
let (tx, rx) = mpsc::channel(32);
(Contador { rx, valor: 0 }, tx)
}
async fn executar(mut self) {
while let Some(msg) = self.rx.recv().await {
match msg {
ContadorMsg::Incrementar => self.valor += 1,
ContadorMsg::Decrementar => self.valor -= 1,
ContadorMsg::Obter(resposta) => {
let _ = resposta.send(self.valor);
}
}
}
}
}
3. Use tokio::select! com Cuidado
use tokio::sync::mpsc;
async fn loop_principal(mut rx: mpsc::Receiver<String>) {
let mut intervalo = tokio::time::interval(tokio::time::Duration::from_secs(30));
loop {
tokio::select! {
// biased; // Descomente para priorizar os branches na ordem
Some(msg) = rx.recv() => {
println!("Mensagem: {}", msg);
}
_ = intervalo.tick() => {
println!("Heartbeat");
}
_ = tokio::signal::ctrl_c() => {
println!("Encerrando gracefully...");
break;
}
}
}
}
4. Trate Erros de Tasks
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Se esta task der panic, o JoinError captura
panic!("Algo deu errado!");
});
match handle.await {
Ok(()) => println!("Task completou com sucesso"),
Err(e) if e.is_panic() => {
eprintln!("Task deu panic: {:?}", e.into_panic());
}
Err(e) if e.is_cancelled() => {
eprintln!("Task foi cancelada");
}
Err(e) => eprintln!("Erro inesperado: {}", e),
}
}
5. Configure o Runtime Adequadamente
// Para servidores de alta performance
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() {
// Runtime multi-thread com 8 workers
}
// Para CLIs ou testes simples
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Runtime single-thread, mais leve
}
Exemplos Práticos
Exemplo: Servidor de Chat TCP
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, Mutex};
type Clientes = Arc<Mutex<HashMap<String, tokio::sync::mpsc::Sender<String>>>>;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
let (broadcast_tx, _) = broadcast::channel::<String>(100);
let clientes: Clientes = Arc::new(Mutex::new(HashMap::new()));
println!("Servidor de chat ouvindo em 127.0.0.1:8080");
loop {
let (socket, addr) = listener.accept().await?;
let broadcast_tx = broadcast_tx.clone();
let mut broadcast_rx = broadcast_tx.subscribe();
let clientes = Arc::clone(&clientes);
tokio::spawn(async move {
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
// Pedir nome do usuário
writer.write_all(b"Digite seu nome: ").await.unwrap();
let mut nome = String::new();
reader.read_line(&mut nome).await.unwrap();
let nome = nome.trim().to_string();
// Anunciar entrada
let msg_entrada = format!("[{}] {} entrou no chat\n", addr, nome);
println!("{}", msg_entrada.trim());
let _ = broadcast_tx.send(msg_entrada);
writer.write_all(format!("Bem-vindo, {}! Digite mensagens:\n", nome).as_bytes())
.await.unwrap();
let nome_clone = nome.clone();
// Task para enviar mensagens broadcast ao cliente
let write_handle = tokio::spawn(async move {
while let Ok(msg) = broadcast_rx.recv().await {
if writer.write_all(msg.as_bytes()).await.is_err() {
break;
}
}
});
// Ler mensagens do cliente
let mut linha = String::new();
loop {
linha.clear();
match reader.read_line(&mut linha).await {
Ok(0) | Err(_) => break,
Ok(_) => {
let msg = format!("[{}]: {}", nome_clone, linha);
let _ = broadcast_tx.send(msg);
}
}
}
// Anunciar saída
let msg_saida = format!("[{}] {} saiu do chat\n", addr, nome_clone);
println!("{}", msg_saida.trim());
let _ = broadcast_tx.send(msg_saida);
write_handle.abort();
});
}
}
Exemplo: Rate Limiter com Semáforo
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};
/// Rate limiter que permite N requisições simultâneas
struct RateLimiter {
semaforo: Arc<Semaphore>,
max_concurrent: usize,
}
impl RateLimiter {
fn new(max_concurrent: usize) -> Self {
RateLimiter {
semaforo: Arc::new(Semaphore::new(max_concurrent)),
max_concurrent,
}
}
async fn executar<F, T>(&self, tarefa: F) -> T
where
F: std::future::Future<Output = T>,
{
let _permit = self.semaforo.acquire().await.unwrap();
tarefa.await
}
}
#[tokio::main]
async fn main() {
let limiter = Arc::new(RateLimiter::new(3)); // Máximo 3 simultâneas
let mut handles = vec![];
for i in 0..10 {
let limiter = Arc::clone(&limiter);
handles.push(tokio::spawn(async move {
limiter.executar(async move {
println!("Iniciando requisição {}", i);
sleep(Duration::from_millis(500)).await;
println!("Completando requisição {}", i);
i
}).await
}));
}
for handle in handles {
let resultado = handle.await.unwrap();
println!("Resultado: {}", resultado);
}
}
Comparação com Alternativas
| Característica | Tokio | async-std | smol | monoio |
|---|---|---|---|---|
| Popularidade | Muito alta | Média | Baixa | Nicho |
| Modelo | Multi-thread work-stealing | Multi-thread | Multi-thread leve | io_uring single-thread |
| Performance | Excelente | Boa | Boa | Excelente (Linux) |
| Ecossistema | Enorme (tower, axum, tonic) | Moderado | Pequeno | Pequeno |
| API | Rica e completa | Similar à std | Minimalista | Especializada |
| Channels | mpsc, broadcast, watch, oneshot | Sim | Via async-channel | Básico |
| Timer | Integrado | Integrado | Via async-io | Integrado |
| I/O | epoll/kqueue/iocp | epoll/kqueue | epoll/kqueue | io_uring |
| Uso de memória | Moderado | Moderado | Baixo | Baixo |
| Maturidade | Muito madura | Madura | Madura | Jovem |
O Tokio se destaca por:
- Ecossistema massivo: praticamente toda crate async é compatível
- Performance comprovada: usado em produção por Discord, Cloudflare, AWS, Linkerd
- Features completas: tudo que precisa para aplicações de produção
- Documentação excelente: guias, exemplos e API docs de alta qualidade
- Tower/Hyper/Axum: stack web completo e coeso
Conclusão
O Tokio é a fundação da programação assíncrona em Rust, fornecendo tudo que você precisa para construir aplicações de rede, servidores web e sistemas distribuídos de alta performance. Dominar o Tokio abre as portas para todo o ecossistema async do Rust.
Pontos-chave para lembrar:
#[tokio::main]configura o runtime automaticamentetokio::spawnpara tarefas concorrentes,spawn_blockingpara código bloqueante- Channels (mpsc, broadcast, watch, oneshot) para comunicação entre tasks
tokio::select!para aguardar múltiplas futures simultaneamente- Nunca bloqueie o runtime async com operações síncronas
- Semaphore e Mutex para controle de concorrência
Para aprofundar, consulte o Tokio Tutorial e a API reference.
No próximo passo, aprenda sobre o Rayon para paralelismo de dados em Rust.