Channels em Rust: mpsc e Comunicação

Guia completo de channels em Rust: mpsc::channel, sync_channel, Sender, Receiver, múltiplos produtores e padrões de mensagens em português.

O que faz e quando usar

O módulo std::sync::mpsc implementa canais de comunicação entre threads baseados no modelo de passagem de mensagens. “mpsc” significa Multiple Producer, Single Consumer — múltiplos produtores podem enviar mensagens para um único consumidor. Canais são a forma idiomática em Rust de transferir dados entre threads sem compartilhar memória.

  Produtor A ──send()──┐
                        │
  Produtor B ──send()──┤──> [ canal/buffer ] ──recv()──> Consumidor
                        │
  Produtor C ──send()──┘

Use channels quando:

  • Você quer comunicação unidirecional entre threads (produtor -> consumidor).
  • Prefere o modelo de passagem de mensagens ao invés de estado compartilhado (Mutex).
  • Precisa implementar padrões como worker pool, pipeline ou fan-out/fan-in.
  • Quer transferir ownership de dados entre threads de forma natural.

Channels são uma alternativa a Arc<Mutex<T>>: em vez de proteger dados compartilhados, você envia os dados de uma thread para outra.


Tipos e Funções Principais

ItemDescrição
mpsc::channel()Cria canal assíncrono (buffer ilimitado)
mpsc::sync_channel(n)Cria canal síncrono com buffer de tamanho n
Sender<T>Extremidade de envio (clonável para múltiplos produtores)
SyncSender<T>Extremidade de envio síncrono (bloqueia quando buffer cheio)
Receiver<T>Extremidade de recebimento (não clonável)
sender.send(val)Envia valor, retorna Result<(), SendError<T>>
receiver.recv()Recebe bloqueante, retorna Result<T, RecvError>
receiver.try_recv()Recebe não-bloqueante, retorna Result<T, TryRecvError>
receiver.recv_timeout(dur)Recebe com timeout
receiver.iter()Iterador bloqueante até o canal fechar

Exemplos de Código

Canal assíncrono básico

use std::sync::mpsc;
use std::thread;

fn main() {
    // channel() retorna (Sender, Receiver)
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let mensagens = vec!["olá", "do", "outro", "lado"];
        for msg in mensagens {
            tx.send(msg.to_string()).unwrap();
            println!("[produtor] enviou: {}", msg);
        }
        // tx é dropado aqui -> canal fecha
    });

    // recv() bloqueia até receber uma mensagem
    // Retorna Err quando o canal fecha (todos os Senders dropados)
    while let Ok(msg) = rx.recv() {
        println!("[consumidor] recebeu: {}", msg);
    }

    println!("Canal fechado, fim.");
}

Iterando sobre o Receiver

O Receiver implementa IntoIterator, permitindo um loop for elegante:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 1..=5 {
            tx.send(i * i).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });

    // O loop termina quando todos os Senders são dropados
    for valor in rx {
        println!("Recebido: {}", valor);
    }

    println!("Todos os valores processados.");
}

Múltiplos produtores (o “mp” do mpsc)

Clone o Sender para criar múltiplos produtores:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Criar 3 produtores
    for id in 0..3 {
        let tx = tx.clone(); // clone do Sender
        thread::spawn(move || {
            for i in 0..3 {
                let msg = format!("produtor-{}: msg-{}", id, i);
                tx.send(msg).unwrap();
                thread::sleep(Duration::from_millis(50));
            }
        });
    }

    // IMPORTANTE: dropar o tx original!
    // Senão o receptor nunca sabe que todos os produtores terminaram
    drop(tx);

    for msg in rx {
        println!("{}", msg);
    }

    println!("Todos os produtores finalizaram.");
}

Erro comum — esquecer de dropar o Sender original:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx_clone = tx.clone();
    thread::spawn(move || {
        tx_clone.send("olá").unwrap();
    });

    // BUG: se não dropar `tx`, o loop abaixo nunca termina!
    // O Receiver espera para sempre porque ainda existe um Sender vivo.
    drop(tx); // Sem esta linha, o programa trava

    for msg in rx {
        println!("{}", msg);
    }
}

Canal síncrono (sync_channel) — com backpressure

sync_channel cria um canal com buffer limitado. send() bloqueia quando o buffer está cheio:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Buffer de 2 mensagens
    let (tx, rx) = mpsc::sync_channel(2);

    thread::spawn(move || {
        for i in 1..=5 {
            println!("[produtor] enviando {}...", i);
            tx.send(i).unwrap(); // bloqueia se buffer cheio
            println!("[produtor] {} enviado!", i);
        }
    });

    // Consumir lentamente para demonstrar o bloqueio
    for _ in 0..5 {
        thread::sleep(Duration::from_millis(500));
        let val = rx.recv().unwrap();
        println!("[consumidor] processou: {}", val);
    }
}

Com buffer 0, o canal funciona como um rendezvous — o produtor bloqueia até o consumidor estar pronto:

use std::sync::mpsc;
use std::thread;

fn main() {
    // Buffer 0 = rendezvous: send() bloqueia até recv() ser chamado
    let (tx, rx) = mpsc::sync_channel(0);

    thread::spawn(move || {
        println!("Antes de send (vai bloquear até recv)");
        tx.send(42).unwrap();
        println!("Depois de send (recv aconteceu)");
    });

    std::thread::sleep(std::time::Duration::from_millis(500));
    println!("Chamando recv...");
    let val = rx.recv().unwrap();
    println!("Recebido: {}", val);
}

try_recv — recebimento não-bloqueante

use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        tx.send("mensagem atrasada").unwrap();
    });

    // Polling com try_recv
    loop {
        match rx.try_recv() {
            Ok(msg) => {
                println!("Recebido: {}", msg);
                break;
            }
            Err(TryRecvError::Empty) => {
                println!("Nada ainda, fazendo outro trabalho...");
                thread::sleep(Duration::from_millis(50));
            }
            Err(TryRecvError::Disconnected) => {
                println!("Canal desconectado!");
                break;
            }
        }
    }
}

recv_timeout — recebimento com limite de tempo

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        let _ = tx.send("resposta demorada");
    });

    // Espera no máximo 1 segundo
    match rx.recv_timeout(Duration::from_secs(1)) {
        Ok(msg) => println!("Recebido: {}", msg),
        Err(mpsc::RecvTimeoutError::Timeout) => {
            println!("Timeout! A resposta demorou demais.");
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            println!("Canal desconectado.");
        }
    }
}

Padrão: mensagens tipadas com enum

Use um enum para enviar diferentes tipos de mensagem pelo mesmo canal:

use std::sync::mpsc;
use std::thread;

enum Comando {
    Processar(String),
    Calcular { a: f64, b: f64 },
    Relatar,
    Encerrar,
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Worker thread
    let worker = thread::spawn(move || {
        let mut contador = 0u32;
        for cmd in rx {
            match cmd {
                Comando::Processar(texto) => {
                    println!("Processando: {}", texto.to_uppercase());
                    contador += 1;
                }
                Comando::Calcular { a, b } => {
                    println!("Resultado: {} + {} = {}", a, b, a + b);
                    contador += 1;
                }
                Comando::Relatar => {
                    println!("Comandos processados até agora: {}", contador);
                }
                Comando::Encerrar => {
                    println!("Encerrando worker. Total: {}", contador);
                    break;
                }
            }
        }
    });

    tx.send(Comando::Processar("rust brasil".into())).unwrap();
    tx.send(Comando::Calcular { a: 3.14, b: 2.71 }).unwrap();
    tx.send(Comando::Relatar).unwrap();
    tx.send(Comando::Processar("concorrência".into())).unwrap();
    tx.send(Comando::Encerrar).unwrap();

    worker.join().unwrap();
}

Padrão: pipeline de processamento

  [Gerador] --canal1--> [Transformador] --canal2--> [Consumidor]
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx1, rx1) = mpsc::channel(); // gerador -> transformador
    let (tx2, rx2) = mpsc::channel(); // transformador -> consumidor

    // Estágio 1: gerar dados
    thread::spawn(move || {
        for i in 1..=5 {
            tx1.send(i).unwrap();
        }
    });

    // Estágio 2: transformar (dobrar o valor)
    thread::spawn(move || {
        for valor in rx1 {
            tx2.send(valor * 2).unwrap();
        }
    });

    // Estágio 3: consumir
    for resultado in rx2 {
        println!("Resultado final: {}", resultado);
    }
}

Padrões Comuns e Anti-padrões

Padrão: fan-out / fan-in com canais

                    +--- Worker 0 ---+
  [Distribuidor] ---|--- Worker 1 ---|--- [Coletor]
                    +--- Worker 2 ---+
use std::sync::mpsc;
use std::thread;

fn main() {
    let (resultado_tx, resultado_rx) = mpsc::channel();
    let num_workers = 3;

    let mut senders = Vec::new();
    for id in 0..num_workers {
        let (tarefa_tx, tarefa_rx) = mpsc::channel::<i32>();
        let resultado_tx = resultado_tx.clone();

        thread::spawn(move || {
            for tarefa in tarefa_rx {
                let resultado = tarefa * tarefa; // processa
                resultado_tx
                    .send(format!("Worker {}: {}^2 = {}", id, tarefa, resultado))
                    .unwrap();
            }
        });

        senders.push(tarefa_tx);
    }
    drop(resultado_tx);

    // Distribuir tarefas round-robin
    for (i, tarefa) in (1..=9).enumerate() {
        senders[i % num_workers].send(tarefa).unwrap();
    }
    drop(senders); // fechar canais dos workers

    for resultado in resultado_rx {
        println!("{}", resultado);
    }
}

Anti-padrão: send após canal fechado

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Dropar o receptor fecha o canal
    drop(rx);

    // send() retorna Err quando o receptor não existe mais
    match tx.send(42) {
        Ok(()) => println!("Enviado com sucesso"),
        Err(e) => println!("Erro ao enviar: {} (receptor dropado)", e),
    }
}

Garantias de Thread Safety

  • Sender<T> é Send (pode ser movido entre threads) e Clone (múltiplos produtores).
  • Receiver<T> é Send mas não é Clone — apenas um consumidor.
  • T deve ser Send para ser enviado pelo canal.
  • O canal assíncrono (channel()) nunca bloqueia no send() — ele aloca memória conforme necessário.
  • O canal síncrono (sync_channel(n)) bloqueia o send() quando há n mensagens não consumidas.
  • Quando todos os Senders são dropados, recv() retorna Err(RecvError) e o iterador termina.

Veja Também