O que faz e quando usar
O módulo std::sync::mpsc implementa canais de comunicação entre threads baseados no modelo de passagem de mensagens. “mpsc” significa Multiple Producer, Single Consumer — múltiplos produtores podem enviar mensagens para um único consumidor. Canais são a forma idiomática em Rust de transferir dados entre threads sem compartilhar memória.
Produtor A ──send()──┐
│
Produtor B ──send()──┤──> [ canal/buffer ] ──recv()──> Consumidor
│
Produtor C ──send()──┘
Use channels quando:
- Você quer comunicação unidirecional entre threads (produtor -> consumidor).
- Prefere o modelo de passagem de mensagens ao invés de estado compartilhado (Mutex).
- Precisa implementar padrões como worker pool, pipeline ou fan-out/fan-in.
- Quer transferir ownership de dados entre threads de forma natural.
Channels são uma alternativa a Arc<Mutex<T>>: em vez de proteger dados compartilhados, você envia os dados de uma thread para outra.
Tipos e Funções Principais
| Item | Descrição |
|---|---|
mpsc::channel() | Cria canal assíncrono (buffer ilimitado) |
mpsc::sync_channel(n) | Cria canal síncrono com buffer de tamanho n |
Sender<T> | Extremidade de envio (clonável para múltiplos produtores) |
SyncSender<T> | Extremidade de envio síncrono (bloqueia quando buffer cheio) |
Receiver<T> | Extremidade de recebimento (não clonável) |
sender.send(val) | Envia valor, retorna Result<(), SendError<T>> |
receiver.recv() | Recebe bloqueante, retorna Result<T, RecvError> |
receiver.try_recv() | Recebe não-bloqueante, retorna Result<T, TryRecvError> |
receiver.recv_timeout(dur) | Recebe com timeout |
receiver.iter() | Iterador bloqueante até o canal fechar |
Exemplos de Código
Canal assíncrono básico
use std::sync::mpsc;
use std::thread;
fn main() {
// channel() retorna (Sender, Receiver)
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mensagens = vec!["olá", "do", "outro", "lado"];
for msg in mensagens {
tx.send(msg.to_string()).unwrap();
println!("[produtor] enviou: {}", msg);
}
// tx é dropado aqui -> canal fecha
});
// recv() bloqueia até receber uma mensagem
// Retorna Err quando o canal fecha (todos os Senders dropados)
while let Ok(msg) = rx.recv() {
println!("[consumidor] recebeu: {}", msg);
}
println!("Canal fechado, fim.");
}
Iterando sobre o Receiver
O Receiver implementa IntoIterator, permitindo um loop for elegante:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
for i in 1..=5 {
tx.send(i * i).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// O loop termina quando todos os Senders são dropados
for valor in rx {
println!("Recebido: {}", valor);
}
println!("Todos os valores processados.");
}
Múltiplos produtores (o “mp” do mpsc)
Clone o Sender para criar múltiplos produtores:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// Criar 3 produtores
for id in 0..3 {
let tx = tx.clone(); // clone do Sender
thread::spawn(move || {
for i in 0..3 {
let msg = format!("produtor-{}: msg-{}", id, i);
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
}
// IMPORTANTE: dropar o tx original!
// Senão o receptor nunca sabe que todos os produtores terminaram
drop(tx);
for msg in rx {
println!("{}", msg);
}
println!("Todos os produtores finalizaram.");
}
Erro comum — esquecer de dropar o Sender original:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx_clone = tx.clone();
thread::spawn(move || {
tx_clone.send("olá").unwrap();
});
// BUG: se não dropar `tx`, o loop abaixo nunca termina!
// O Receiver espera para sempre porque ainda existe um Sender vivo.
drop(tx); // Sem esta linha, o programa trava
for msg in rx {
println!("{}", msg);
}
}
Canal síncrono (sync_channel) — com backpressure
sync_channel cria um canal com buffer limitado. send() bloqueia quando o buffer está cheio:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Buffer de 2 mensagens
let (tx, rx) = mpsc::sync_channel(2);
thread::spawn(move || {
for i in 1..=5 {
println!("[produtor] enviando {}...", i);
tx.send(i).unwrap(); // bloqueia se buffer cheio
println!("[produtor] {} enviado!", i);
}
});
// Consumir lentamente para demonstrar o bloqueio
for _ in 0..5 {
thread::sleep(Duration::from_millis(500));
let val = rx.recv().unwrap();
println!("[consumidor] processou: {}", val);
}
}
Com buffer 0, o canal funciona como um rendezvous — o produtor bloqueia até o consumidor estar pronto:
use std::sync::mpsc;
use std::thread;
fn main() {
// Buffer 0 = rendezvous: send() bloqueia até recv() ser chamado
let (tx, rx) = mpsc::sync_channel(0);
thread::spawn(move || {
println!("Antes de send (vai bloquear até recv)");
tx.send(42).unwrap();
println!("Depois de send (recv aconteceu)");
});
std::thread::sleep(std::time::Duration::from_millis(500));
println!("Chamando recv...");
let val = rx.recv().unwrap();
println!("Recebido: {}", val);
}
try_recv — recebimento não-bloqueante
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
tx.send("mensagem atrasada").unwrap();
});
// Polling com try_recv
loop {
match rx.try_recv() {
Ok(msg) => {
println!("Recebido: {}", msg);
break;
}
Err(TryRecvError::Empty) => {
println!("Nada ainda, fazendo outro trabalho...");
thread::sleep(Duration::from_millis(50));
}
Err(TryRecvError::Disconnected) => {
println!("Canal desconectado!");
break;
}
}
}
}
recv_timeout — recebimento com limite de tempo
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs(2));
let _ = tx.send("resposta demorada");
});
// Espera no máximo 1 segundo
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(msg) => println!("Recebido: {}", msg),
Err(mpsc::RecvTimeoutError::Timeout) => {
println!("Timeout! A resposta demorou demais.");
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
println!("Canal desconectado.");
}
}
}
Padrão: mensagens tipadas com enum
Use um enum para enviar diferentes tipos de mensagem pelo mesmo canal:
use std::sync::mpsc;
use std::thread;
enum Comando {
Processar(String),
Calcular { a: f64, b: f64 },
Relatar,
Encerrar,
}
fn main() {
let (tx, rx) = mpsc::channel();
// Worker thread
let worker = thread::spawn(move || {
let mut contador = 0u32;
for cmd in rx {
match cmd {
Comando::Processar(texto) => {
println!("Processando: {}", texto.to_uppercase());
contador += 1;
}
Comando::Calcular { a, b } => {
println!("Resultado: {} + {} = {}", a, b, a + b);
contador += 1;
}
Comando::Relatar => {
println!("Comandos processados até agora: {}", contador);
}
Comando::Encerrar => {
println!("Encerrando worker. Total: {}", contador);
break;
}
}
}
});
tx.send(Comando::Processar("rust brasil".into())).unwrap();
tx.send(Comando::Calcular { a: 3.14, b: 2.71 }).unwrap();
tx.send(Comando::Relatar).unwrap();
tx.send(Comando::Processar("concorrência".into())).unwrap();
tx.send(Comando::Encerrar).unwrap();
worker.join().unwrap();
}
Padrão: pipeline de processamento
[Gerador] --canal1--> [Transformador] --canal2--> [Consumidor]
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx1, rx1) = mpsc::channel(); // gerador -> transformador
let (tx2, rx2) = mpsc::channel(); // transformador -> consumidor
// Estágio 1: gerar dados
thread::spawn(move || {
for i in 1..=5 {
tx1.send(i).unwrap();
}
});
// Estágio 2: transformar (dobrar o valor)
thread::spawn(move || {
for valor in rx1 {
tx2.send(valor * 2).unwrap();
}
});
// Estágio 3: consumir
for resultado in rx2 {
println!("Resultado final: {}", resultado);
}
}
Padrões Comuns e Anti-padrões
Padrão: fan-out / fan-in com canais
+--- Worker 0 ---+
[Distribuidor] ---|--- Worker 1 ---|--- [Coletor]
+--- Worker 2 ---+
use std::sync::mpsc;
use std::thread;
fn main() {
let (resultado_tx, resultado_rx) = mpsc::channel();
let num_workers = 3;
let mut senders = Vec::new();
for id in 0..num_workers {
let (tarefa_tx, tarefa_rx) = mpsc::channel::<i32>();
let resultado_tx = resultado_tx.clone();
thread::spawn(move || {
for tarefa in tarefa_rx {
let resultado = tarefa * tarefa; // processa
resultado_tx
.send(format!("Worker {}: {}^2 = {}", id, tarefa, resultado))
.unwrap();
}
});
senders.push(tarefa_tx);
}
drop(resultado_tx);
// Distribuir tarefas round-robin
for (i, tarefa) in (1..=9).enumerate() {
senders[i % num_workers].send(tarefa).unwrap();
}
drop(senders); // fechar canais dos workers
for resultado in resultado_rx {
println!("{}", resultado);
}
}
Anti-padrão: send após canal fechado
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel::<i32>();
// Dropar o receptor fecha o canal
drop(rx);
// send() retorna Err quando o receptor não existe mais
match tx.send(42) {
Ok(()) => println!("Enviado com sucesso"),
Err(e) => println!("Erro ao enviar: {} (receptor dropado)", e),
}
}
Garantias de Thread Safety
Sender<T>éSend(pode ser movido entre threads) eClone(múltiplos produtores).Receiver<T>éSendmas não éClone— apenas um consumidor.Tdeve serSendpara ser enviado pelo canal.- O canal assíncrono (
channel()) nunca bloqueia nosend()— ele aloca memória conforme necessário. - O canal síncrono (
sync_channel(n)) bloqueia osend()quando hánmensagens não consumidas. - Quando todos os
Senders são dropados,recv()retornaErr(RecvError)e o iterador termina.
Veja Também
- std::thread em Rust — criando threads para usar com canais
- Mutex e RwLock em Rust — alternativa: estado compartilhado
- Barrier e Condvar — sincronização avançada
- Padrões de Thread Safety — message passing vs shared state
- Concorrência em Rust — tutorial completo
- Documentação oficial:
std::sync::mpsc