Introdução
Tokio é o runtime assíncrono mais popular e amplamente utilizado do ecossistema Rust. Ele fornece a infraestrutura necessária para escrever aplicações assíncronas confiáveis e de alta performance: um scheduler multi-thread, I/O assíncrono, timers, canais de comunicação e primitivas de sincronização.
Criado e mantido pela equipe Tokio (que também mantém Axum, Hyper, Tonic e Tower), o Tokio é a base sobre a qual a maioria dos frameworks e bibliotecas async de Rust são construídos — incluindo Axum, Reqwest, SQLx, Tonic e muitos outros.
Neste guia completo, vamos explorar os principais componentes do Tokio com exemplos práticos.
Dependências no Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
Para produção, é recomendado habilitar apenas as features necessárias:
[dependencies]
tokio = { version = "1", features = [
"rt-multi-thread", # Runtime multi-thread
"macros", # #[tokio::main] e #[tokio::test]
"net", # TCP, UDP, Unix sockets
"io-util", # AsyncRead, AsyncWrite, BufReader
"time", # Sleep, timeout, interval
"sync", # Mutex, RwLock, Semaphore, channels
"fs", # Operações de arquivo async
"signal", # Captura de sinais (SIGINT, etc)
"process", # Processos async
] }
O Macro #[tokio::main]
Todo programa Tokio começa com o macro #[tokio::main], que inicializa o runtime:
#[tokio::main]
async fn main() {
println!("Rodando no Tokio!");
}
Isso é equivalente a:
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("Rodando no Tokio!");
});
}
Para um runtime single-thread (útil em testes ou aplicações leves):
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("Runtime single-thread!");
}
Spawning de Tasks
O tokio::spawn cria tasks leves que rodam concorrentemente no scheduler:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// Spawn retorna um JoinHandle
let handle1 = tokio::spawn(async {
sleep(Duration::from_millis(200)).await;
println!("Task 1 concluída");
10
});
let handle2 = tokio::spawn(async {
sleep(Duration::from_millis(100)).await;
println!("Task 2 concluída");
20
});
// Aguardar resultados
let r1 = handle1.await.unwrap();
let r2 = handle2.await.unwrap();
println!("Resultados: {r1} + {r2} = {}", r1 + r2);
}
Spawn com Dados Compartilhados
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let contador = Arc::new(Mutex::new(0u64));
let mut handles = vec![];
for i in 0..10 {
let contador = Arc::clone(&contador);
let handle = tokio::spawn(async move {
let mut lock = contador.lock().await;
*lock += 1;
println!("Task {i}: contador = {}", *lock);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let total = *contador.lock().await;
println!("Total final: {total}");
}
JoinSet para Múltiplas Tasks
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
for i in 0..5 {
set.spawn(async move {
sleep(Duration::from_millis(100 * (5 - i))).await;
format!("resultado de task {i}")
});
}
// Receber resultados na ordem de conclusão
while let Some(resultado) = set.join_next().await {
match resultado {
Ok(valor) => println!("Concluído: {valor}"),
Err(e) => eprintln!("Erro: {e}"),
}
}
}
select!: Concorrência com Múltiplos Futures
O macro select! permite aguardar múltiplos futures simultaneamente e reagir ao primeiro que completar:
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(32);
// Produtor que envia mensagens
tokio::spawn(async move {
sleep(Duration::from_secs(2)).await;
tx.send("mensagem recebida!".to_string()).await.unwrap();
});
// select! entre múltiplas operações
loop {
tokio::select! {
Some(msg) = rx.recv() => {
println!("Canal: {msg}");
break;
}
_ = sleep(Duration::from_secs(1)) => {
println!("Ainda aguardando...");
}
}
}
}
Select com Cancelamento Gracioso
use tokio::signal;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
println!("Servidor iniciado. Pressione Ctrl+C para parar.");
let servidor = async {
loop {
// Simular processamento
sleep(Duration::from_secs(1)).await;
println!("Processando...");
}
};
tokio::select! {
_ = servidor => {
println!("Servidor finalizou");
}
_ = signal::ctrl_c() => {
println!("\nRecebido Ctrl+C, encerrando graciosamente...");
}
}
}
Channels: Comunicação entre Tasks
O Tokio oferece quatro tipos de canais:
mpsc (Multiple Producer, Single Consumer)
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(100);
// Múltiplos produtores
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
for j in 0..3 {
let msg = format!("produtor {i}, mensagem {j}");
tx.send(msg).await.unwrap();
}
});
}
// Dropar o tx original para que o canal feche quando os clones forem dropados
drop(tx);
// Consumidor
while let Some(msg) = rx.recv().await {
println!("Recebido: {msg}");
}
println!("Canal fechado");
}
broadcast (Multiple Producer, Multiple Consumer)
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, _) = broadcast::channel::<String>(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
while let Ok(msg) = rx1.recv().await {
println!("[Assinante 1] {msg}");
}
});
tokio::spawn(async move {
while let Ok(msg) = rx2.recv().await {
println!("[Assinante 2] {msg}");
}
});
tx.send("evento global!".to_string()).unwrap();
tx.send("outro evento!".to_string()).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
watch (Single Producer, Multiple Consumer — último valor)
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = watch::channel("inicial".to_string());
// Observador
tokio::spawn(async move {
while rx.changed().await.is_ok() {
println!("Novo estado: {}", *rx.borrow());
}
});
// Atualizador
for estado in ["carregando", "pronto", "processando", "concluído"] {
sleep(Duration::from_millis(500)).await;
tx.send(estado.to_string()).unwrap();
}
sleep(Duration::from_millis(100)).await;
}
oneshot (Single use)
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
// Simular trabalho
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tx.send("resultado pronto!".to_string()).unwrap();
});
match rx.await {
Ok(resultado) => println!("Recebido: {resultado}"),
Err(_) => println!("O produtor foi dropado"),
}
}
I/O Assíncrono
TCP
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn tratar_conexao(mut stream: TcpStream) {
let mut buf = [0u8; 1024];
loop {
let n = match stream.read(&mut buf).await {
Ok(0) => return, // Conexão fechada
Ok(n) => n,
Err(e) => {
eprintln!("Erro ao ler: {e}");
return;
}
};
// Echo: devolver os bytes recebidos
if let Err(e) = stream.write_all(&buf[..n]).await {
eprintln!("Erro ao escrever: {e}");
return;
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Servidor escutando na porta 8080");
loop {
let (stream, addr) = listener.accept().await?;
println!("Nova conexão: {addr}");
tokio::spawn(tratar_conexao(stream));
}
}
Arquivos
use tokio::fs;
use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Escrever arquivo
fs::write("exemplo.txt", "Conteúdo do arquivo").await?;
// Ler arquivo
let conteudo = fs::read_to_string("exemplo.txt").await?;
println!("Conteúdo: {conteudo}");
// Append com file handle
let mut arquivo = fs::OpenOptions::new()
.append(true)
.open("exemplo.txt")
.await?;
arquivo.write_all(b"\nNova linha!").await?;
// Listar diretório
let mut entries = fs::read_dir(".").await?;
while let Some(entry) = entries.next_entry().await? {
println!("{}", entry.file_name().to_string_lossy());
}
Ok(())
}
Timers
use tokio::time::{sleep, timeout, interval, Instant, Duration};
#[tokio::main]
async fn main() {
// Sleep
println!("Dormindo 1 segundo...");
sleep(Duration::from_secs(1)).await;
// Timeout
let resultado = timeout(Duration::from_millis(500), async {
sleep(Duration::from_millis(200)).await;
42
}).await;
println!("Timeout resultado: {resultado:?}"); // Ok(42)
// Interval
let mut intervalo = interval(Duration::from_millis(500));
let inicio = Instant::now();
for _ in 0..5 {
intervalo.tick().await;
println!("Tick em {:?}", inicio.elapsed());
}
// Medir duração
let inicio = Instant::now();
sleep(Duration::from_millis(100)).await;
println!("Operação levou: {:?}", inicio.elapsed());
}
Primitivas de Sincronização
Mutex e RwLock
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
#[tokio::main]
async fn main() {
// Mutex: acesso exclusivo
let dados = Arc::new(Mutex::new(vec![1, 2, 3]));
let dados_clone = dados.clone();
tokio::spawn(async move {
let mut lock = dados_clone.lock().await;
lock.push(4);
}).await.unwrap();
println!("Dados: {:?}", dados.lock().await);
// RwLock: múltiplos leitores, um escritor
let config = Arc::new(RwLock::new(String::from("valor_inicial")));
// Leitura (múltiplas simultâneas permitidas)
let config_r = config.clone();
let leitura = tokio::spawn(async move {
let guard = config_r.read().await;
println!("Lendo: {guard}");
});
// Escrita (exclusiva)
let config_w = config.clone();
let escrita = tokio::spawn(async move {
let mut guard = config_w.write().await;
*guard = String::from("novo_valor");
});
leitura.await.unwrap();
escrita.await.unwrap();
}
Semaphore
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// Limitar a 3 operações concorrentes
let semaforo = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..10 {
let sem = semaforo.clone();
let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
println!("Task {i} iniciou");
sleep(Duration::from_millis(500)).await;
println!("Task {i} finalizou");
});
handles.push(handle);
}
for h in handles {
h.await.unwrap();
}
}
Integração com Tracing
O Tokio funciona perfeitamente com o crate tracing para observabilidade:
[dependencies]
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
use tracing::{info, warn, instrument};
use tracing_subscriber::EnvFilter;
#[instrument]
async fn processar_pedido(pedido_id: u64) -> String {
info!("Iniciando processamento");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if pedido_id % 2 == 0 {
warn!("Pedido par detectado");
}
info!("Processamento concluído");
format!("resultado-{pedido_id}")
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
info!("Aplicação iniciada");
let resultado = processar_pedido(42).await;
info!(resultado = %resultado, "Pedido processado");
}
Para mais sobre observabilidade, veja Tracing vs Log em Rust.
Testando Código Async
O Tokio fornece o macro #[tokio::test]:
#[cfg(test)]
mod tests {
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_basico() {
let resultado = async { 2 + 2 }.await;
assert_eq!(resultado, 4);
}
#[tokio::test]
async fn test_canal() {
let (tx, mut rx) = mpsc::channel::<i32>(10);
tokio::spawn(async move {
tx.send(42).await.unwrap();
});
let valor = rx.recv().await.unwrap();
assert_eq!(valor, 42);
}
#[tokio::test]
async fn test_timeout() {
let resultado = tokio::time::timeout(
Duration::from_millis(100),
async {
sleep(Duration::from_millis(50)).await;
"ok"
}
).await;
assert!(resultado.is_ok());
}
// Teste com runtime single-thread
#[tokio::test(flavor = "current_thread")]
async fn test_single_thread() {
assert!(true);
}
}
Padrões de Produção
Graceful Shutdown
use tokio::signal;
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
// Worker
let mut worker_rx = shutdown_tx.subscribe();
let worker = tokio::spawn(async move {
loop {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
println!("Trabalhando...");
}
_ = worker_rx.changed() => {
println!("Worker recebeu sinal de shutdown");
break;
}
}
}
println!("Worker finalizou limpeza");
});
// Aguardar Ctrl+C
signal::ctrl_c().await.unwrap();
println!("Enviando sinal de shutdown...");
shutdown_tx.send(true).unwrap();
// Aguardar worker finalizar
worker.await.unwrap();
println!("Shutdown completo!");
}
Rate Limiting com Semaphore
use std::sync::Arc;
use tokio::sync::Semaphore;
struct Cliente {
semaforo: Arc<Semaphore>,
client: reqwest::Client,
}
impl Cliente {
fn new(max_concorrente: usize) -> Self {
Self {
semaforo: Arc::new(Semaphore::new(max_concorrente)),
client: reqwest::Client::new(),
}
}
async fn get(&self, url: &str) -> Result<String, reqwest::Error> {
let _permit = self.semaforo.acquire().await.unwrap();
self.client.get(url).send().await?.text().await
}
}
Conclusão
O Tokio é a espinha dorsal da programação assíncrona em Rust. Dominar seus componentes — tasks, channels, I/O, timers e primitivas de sincronização — é essencial para construir aplicações de alta performance. Com o ecossistema Tower/Hyper/Axum, o Tokio oferece tudo o que você precisa para desenvolver servidores web, microserviços e sistemas distribuídos robustos.