O Crossbeam é uma coleção de ferramentas de concorrência de alta performance para Rust que complementa a biblioteca padrão. Enquanto std::sync oferece primitivas básicas como Mutex, RwLock e canais MPSC, o Crossbeam vai além com channels mais rápidos e flexíveis, scoped threads que permitem emprestar referências locais, estruturas de dados lock-free e um sistema de gerenciamento de memória baseado em epochs.
O Crossbeam nasceu de pesquisa acadêmica sobre concorrência segura e tem influenciado significativamente o design da própria biblioteca padrão do Rust. Várias funcionalidades que começaram no Crossbeam foram eventualmente incorporadas à stdlib, como std::thread::scope (estabilizado no Rust 1.63).
Se você precisa de concorrência de baixo nível com máxima performance — filas lock-free, channels multi-produtor multi-consumidor, ou comunicação eficiente entre threads — o Crossbeam é a ferramenta certa.
Instalação
Adicione o Crossbeam ao Cargo.toml:
[dependencies]
# Crate completa
crossbeam = "0.8"
# Ou apenas os módulos que precisa
crossbeam-channel = "0.5" # Channels MPMC
crossbeam-utils = "0.8" # Utilitários (scope, CachePadded, etc.)
crossbeam-deque = "0.8" # Deques work-stealing
crossbeam-queue = "0.3" # Filas lock-free (ArrayQueue, SegQueue)
crossbeam-epoch = "0.9" # Epoch-based memory reclamation
Uso Básico
Crossbeam Channels
Os channels do Crossbeam são significativamente mais rápidos que std::sync::mpsc e suportam múltiplos produtores E múltiplos consumidores (MPMC):
use crossbeam_channel::{bounded, unbounded, select, Receiver, Sender};
use std::thread;
use std::time::Duration;
fn main() {
// === Channel ilimitado (unbounded) ===
let (tx, rx) = unbounded::<String>();
thread::spawn(move || {
tx.send("Olá do Crossbeam!".to_string()).unwrap();
tx.send("Segunda mensagem".to_string()).unwrap();
});
// Receber mensagens
println!("{}", rx.recv().unwrap());
println!("{}", rx.recv().unwrap());
// === Channel limitado (bounded) ===
let (tx, rx) = bounded::<i32>(5); // Buffer de 5 itens
// send() bloqueia quando o buffer está cheio
thread::spawn(move || {
for i in 0..10 {
println!("Enviando: {}", i);
tx.send(i).unwrap(); // Bloqueia se buffer cheio
}
});
// recv() bloqueia quando o buffer está vazio
for _ in 0..10 {
thread::sleep(Duration::from_millis(100));
let valor = rx.recv().unwrap();
println!("Recebido: {}", valor);
}
// === Channel zero-sized (rendezvous) ===
let (tx, rx) = bounded::<String>(0);
// send() e recv() devem acontecer ao mesmo tempo (sincronização)
thread::spawn(move || {
tx.send("Sincronizado!".to_string()).unwrap();
println!("Enviado (após receptor estar pronto)");
});
thread::sleep(Duration::from_millis(100));
println!("Recebido: {}", rx.recv().unwrap());
}
Múltiplos Produtores e Consumidores
use crossbeam_channel::unbounded;
use std::thread;
fn main() {
let (tx, rx) = unbounded::<String>();
// Múltiplos produtores (clone o sender)
let mut produtores = vec![];
for id in 0..4 {
let tx = tx.clone();
produtores.push(thread::spawn(move || {
for i in 0..5 {
let msg = format!("Produtor {} - Msg {}", id, i);
tx.send(msg).unwrap();
}
}));
}
drop(tx); // Dropar o sender original
// Múltiplos consumidores (clone o receiver!)
// Diferente de std::sync::mpsc, o receiver pode ser clonado
let mut consumidores = vec![];
for id in 0..2 {
let rx = rx.clone();
consumidores.push(thread::spawn(move || {
let mut contagem = 0;
// Cada mensagem é recebida por APENAS um consumidor
while let Ok(msg) = rx.recv() {
println!("Consumidor {}: {}", id, msg);
contagem += 1;
}
println!("Consumidor {} processou {} mensagens", id, contagem);
}));
}
drop(rx); // Dropar o receiver original
// Aguardar todos
for p in produtores { p.join().unwrap(); }
for c in consumidores { c.join().unwrap(); }
}
Select - Esperar em Múltiplos Channels
use crossbeam_channel::{bounded, select, tick, after, never};
use std::time::Duration;
fn main() {
let (tx_dados, rx_dados) = bounded::<String>(10);
let (tx_controle, rx_controle) = bounded::<String>(1);
// Produtor de dados
std::thread::spawn(move || {
for i in 0..20 {
std::thread::sleep(Duration::from_millis(200));
tx_dados.send(format!("Dado {}", i)).unwrap();
}
});
// Sinal de parada após 2 segundos
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
tx_controle.send("PARAR".to_string()).unwrap();
});
// Timer periódico
let ticker = tick(Duration::from_millis(500));
// Timeout global
let timeout = after(Duration::from_secs(5));
let mut total = 0;
loop {
select! {
recv(rx_dados) -> msg => {
match msg {
Ok(dado) => {
println!("Recebido: {}", dado);
total += 1;
}
Err(_) => {
println!("Canal de dados fechado");
break;
}
}
}
recv(rx_controle) -> msg => {
if let Ok(cmd) = msg {
println!("Comando: {}", cmd);
if cmd == "PARAR" {
println!("Parando por comando...");
break;
}
}
}
recv(ticker) -> _ => {
println!("--- Heartbeat: {} mensagens processadas ---", total);
}
recv(timeout) -> _ => {
println!("Timeout global atingido!");
break;
}
}
}
println!("Total processado: {}", total);
}
Operações Não-Bloqueantes
use crossbeam_channel::{bounded, TryRecvError, TrySendError};
fn main() {
let (tx, rx) = bounded::<i32>(2);
// try_send: não bloqueia
tx.send(1).unwrap();
tx.send(2).unwrap();
match tx.try_send(3) {
Ok(()) => println!("Enviado"),
Err(TrySendError::Full(valor)) => println!("Canal cheio, {} não enviado", valor),
Err(TrySendError::Disconnected(_)) => println!("Canal desconectado"),
}
// try_recv: não bloqueia
match rx.try_recv() {
Ok(valor) => println!("Recebido: {}", valor),
Err(TryRecvError::Empty) => println!("Canal vazio"),
Err(TryRecvError::Disconnected) => println!("Canal desconectado"),
}
// recv_timeout: bloqueia com limite de tempo
use std::time::Duration;
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(valor) => println!("Recebido: {}", valor),
Err(_) => println!("Timeout!"),
}
// Iterar sobre todas as mensagens disponíveis
let (tx, rx) = bounded(10);
for i in 0..5 {
tx.send(i).unwrap();
}
drop(tx);
// Usar como iterador (bloqueia até fechar)
for msg in rx {
println!("Iterado: {}", msg);
}
}
Recursos Avançados
Scoped Threads
Scoped threads permitem emprestar referências locais para threads, algo impossível com std::thread::spawn:
use crossbeam::scope;
fn main() {
let mut dados = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut resultados = vec![];
// scope garante que todas as threads terminam antes de retornar
scope(|s| {
// Dividir dados em duas metades
let (esquerda, direita) = dados.split_at_mut(4);
// Thread que empresta 'esquerda'
let handle_a = s.spawn(|_| {
esquerda.iter_mut().for_each(|x| *x *= 2);
esquerda.iter().sum::<i32>()
});
// Thread que empresta 'direita'
let handle_b = s.spawn(|_| {
direita.iter_mut().for_each(|x| *x *= 3);
direita.iter().sum::<i32>()
});
// Coletar resultados
resultados.push(handle_a.join().unwrap());
resultados.push(handle_b.join().unwrap());
}).unwrap();
// Ambas as threads terminaram, 'dados' foi modificado
println!("Dados modificados: {:?}", dados);
println!("Somas: {:?}", resultados);
// Padrão: processar chunks em paralelo
let dados: Vec<i32> = (0..100).collect();
let mut somas_parciais = Vec::new();
scope(|s| {
let mut handles = Vec::new();
for chunk in dados.chunks(25) {
handles.push(s.spawn(move |_| {
chunk.iter().sum::<i32>()
}));
}
for handle in handles {
somas_parciais.push(handle.join().unwrap());
}
}).unwrap();
let soma_total: i32 = somas_parciais.iter().sum();
println!("Soma total: {}", soma_total);
}
Estruturas de Dados Lock-Free
ArrayQueue - Fila de Tamanho Fixo
use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;
fn main() {
// Fila lock-free com capacidade fixa
let fila = Arc::new(ArrayQueue::new(100));
// Múltiplos produtores
let mut produtores = vec![];
for id in 0..4 {
let fila = Arc::clone(&fila);
produtores.push(thread::spawn(move || {
for i in 0..25 {
let valor = id * 100 + i;
// push retorna Err se a fila estiver cheia
while fila.push(valor).is_err() {
std::hint::spin_loop();
}
}
}));
}
// Múltiplos consumidores
let mut consumidores = vec![];
for id in 0..2 {
let fila = Arc::clone(&fila);
consumidores.push(thread::spawn(move || {
let mut total = 0;
let mut contagem = 0;
loop {
match fila.pop() {
Some(valor) => {
total += valor;
contagem += 1;
}
None => {
// Verificar se produtores terminaram
thread::yield_now();
if fila.is_empty() {
break;
}
}
}
}
println!("Consumidor {}: {} itens, soma = {}", id, contagem, total);
}));
}
for p in produtores { p.join().unwrap(); }
// Dar tempo para consumidores processarem
thread::sleep(std::time::Duration::from_millis(100));
for c in consumidores { c.join().unwrap(); }
}
SegQueue - Fila de Tamanho Dinâmico
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;
fn main() {
// Fila lock-free sem limite de tamanho
let fila: Arc<SegQueue<String>> = Arc::new(SegQueue::new());
// Produtor
let fila_prod = Arc::clone(&fila);
let produtor = thread::spawn(move || {
for i in 0..1000 {
fila_prod.push(format!("item-{}", i));
}
});
// Consumidor
let fila_cons = Arc::clone(&fila);
let consumidor = thread::spawn(move || {
let mut contagem = 0;
loop {
match fila_cons.pop() {
Some(_item) => contagem += 1,
None => {
if contagem >= 1000 {
break;
}
thread::yield_now();
}
}
}
contagem
});
produtor.join().unwrap();
let total = consumidor.join().unwrap();
println!("Processados: {}", total);
}
Epoch-Based Memory Reclamation
O epoch-based garbage collection permite reclaimação segura de memória em estruturas de dados lock-free:
use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
use std::sync::atomic::Ordering;
/// Stack lock-free simples usando epoch-based reclamation
struct LockFreeStack<T> {
head: Atomic<Node<T>>,
}
struct Node<T> {
data: T,
next: Atomic<Node<T>>,
}
impl<T> LockFreeStack<T> {
fn new() -> Self {
LockFreeStack {
head: Atomic::null(),
}
}
fn push(&self, valor: T) {
let mut node = Owned::new(Node {
data: valor,
next: Atomic::null(),
});
let guard = epoch::pin();
loop {
let head = self.head.load(Ordering::Relaxed, &guard);
node.next.store(head, Ordering::Relaxed);
match self.head.compare_exchange(
head,
node,
Ordering::Release,
Ordering::Relaxed,
&guard,
) {
Ok(_) => break,
Err(err) => node = err.new,
}
}
}
fn pop(&self) -> Option<T>
where
T: Clone,
{
let guard = epoch::pin();
loop {
let head = self.head.load(Ordering::Acquire, &guard);
match unsafe { head.as_ref() } {
None => return None,
Some(node) => {
let next = node.next.load(Ordering::Relaxed, &guard);
if self.head
.compare_exchange(
head,
next,
Ordering::Release,
Ordering::Relaxed,
&guard,
)
.is_ok()
{
let data = node.data.clone();
// Agendar dealocação segura
unsafe { guard.defer_destroy(head); }
return Some(data);
}
}
}
}
}
}
// Uso seguro sem preocupação com memory leaks
fn main() {
use std::sync::Arc;
use std::thread;
let stack = Arc::new(LockFreeStack::new());
let mut handles = vec![];
// Pushes paralelos
for i in 0..8 {
let stack = Arc::clone(&stack);
handles.push(thread::spawn(move || {
for j in 0..1000 {
stack.push(i * 1000 + j);
}
}));
}
for h in handles { h.join().unwrap(); }
// Pops paralelos
let mut total = 0;
while stack.pop().is_some() {
total += 1;
}
println!("Total de itens: {}", total);
}
CachePadded - Evitar False Sharing
use crossbeam_utils::CachePadded;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
// Sem CachePadded: contadores podem compartilhar cache line (false sharing)
struct ContadoresMau {
a: AtomicU64,
b: AtomicU64,
}
// Com CachePadded: cada contador em sua própria cache line
struct ContadoresBom {
a: CachePadded<AtomicU64>,
b: CachePadded<AtomicU64>,
}
fn main() {
let contadores = Arc::new(ContadoresBom {
a: CachePadded::new(AtomicU64::new(0)),
b: CachePadded::new(AtomicU64::new(0)),
});
let c1 = Arc::clone(&contadores);
let t1 = thread::spawn(move || {
for _ in 0..10_000_000 {
c1.a.fetch_add(1, Ordering::Relaxed);
}
});
let c2 = Arc::clone(&contadores);
let t2 = thread::spawn(move || {
for _ in 0..10_000_000 {
c2.b.fetch_add(1, Ordering::Relaxed);
}
});
t1.join().unwrap();
t2.join().unwrap();
println!("Contador A: {}", contadores.a.load(Ordering::Relaxed));
println!("Contador B: {}", contadores.b.load(Ordering::Relaxed));
}
Boas Práticas
1. Prefira Channels Bounded
use crossbeam_channel::bounded;
// BOM: bounded previne uso descontrolado de memória
let (tx, rx) = bounded::<Vec<u8>>(100);
// CUIDADO: unbounded pode consumir memória indefinidamente
// Use apenas quando souber que o consumidor acompanha o produtor
2. Use select! para Múltiplos Channels
use crossbeam_channel::{bounded, select, never, tick};
use std::time::Duration;
fn loop_de_eventos(
rx_dados: crossbeam_channel::Receiver<String>,
rx_shutdown: crossbeam_channel::Receiver<()>,
) {
let heartbeat = tick(Duration::from_secs(30));
loop {
select! {
recv(rx_dados) -> msg => {
if let Ok(dados) = msg {
processar(dados);
}
}
recv(rx_shutdown) -> _ => {
println!("Shutdown recebido");
break;
}
recv(heartbeat) -> _ => {
println!("Heartbeat");
}
}
}
}
fn processar(dados: String) {
println!("Processando: {}", dados);
}
3. Use Scoped Threads em Vez de Arc
use crossbeam::scope;
fn processar_dados(dados: &[Vec<f64>]) -> Vec<f64> {
let mut resultados = vec![0.0; dados.len()];
// Sem necessidade de Arc! Scope garante lifetime
scope(|s| {
for (i, (chunk, resultado)) in dados.iter().zip(resultados.iter_mut()).enumerate() {
s.spawn(move |_| {
*resultado = chunk.iter().sum();
});
}
}).unwrap();
resultados
}
4. Dimensione Buffers Adequadamente
use crossbeam_channel::bounded;
// Regra geral para buffer:
// - 0: sincronização ponto-a-ponto (rendezvous)
// - 1-10: baixa latência, backpressure agressivo
// - 100-1000: throughput, absorve picos
// - Nunca use unbounded em produção sem monitoramento
let (tx, rx) = bounded::<Job>(num_workers * 2); // 2x workers é um bom ponto de partida
5. Trate Channel Disconnection
use crossbeam_channel::{bounded, RecvError, SendError};
fn produtor(tx: crossbeam_channel::Sender<i32>) {
for i in 0..100 {
match tx.send(i) {
Ok(()) => {},
Err(SendError(_)) => {
println!("Consumidor desconectou, parando produtor");
break;
}
}
}
}
fn consumidor(rx: crossbeam_channel::Receiver<i32>) -> Vec<i32> {
let mut resultados = Vec::new();
loop {
match rx.recv() {
Ok(valor) => resultados.push(valor),
Err(RecvError) => {
println!("Produtor desconectou, parando consumidor");
break;
}
}
}
resultados
}
Exemplos Práticos
Exemplo: Pipeline Multi-Estágio Multi-Consumidor
use crossbeam_channel::{bounded, Receiver, Sender};
use std::thread;
use std::time::{Duration, Instant};
/// Dado bruto para processamento
#[derive(Debug, Clone)]
struct DadoBruto {
id: u64,
conteudo: String,
}
/// Dado processado
#[derive(Debug)]
struct DadoProcessado {
id: u64,
conteudo_original: String,
resultado: f64,
tempo_processamento: Duration,
}
/// Cria um estágio do pipeline com múltiplos workers
fn estagio<T, U, F>(
nome: &str,
rx: Receiver<T>,
tx: Sender<U>,
num_workers: usize,
processador: F,
) -> Vec<thread::JoinHandle<()>>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> U + Send + Clone + 'static,
{
let mut handles = vec![];
for i in 0..num_workers {
let rx = rx.clone();
let tx = tx.clone();
let processador = processador.clone();
let nome = nome.to_string();
handles.push(thread::spawn(move || {
while let Ok(item) = rx.recv() {
let resultado = processador(item);
if tx.send(resultado).is_err() {
break;
}
}
println!("[{}] Worker {} encerrou", nome, i);
}));
}
drop(tx); // Dropar o sender do estágio principal
handles
}
fn main() {
let inicio = Instant::now();
// Canais entre estágios
let (tx_entrada, rx_entrada) = bounded::<DadoBruto>(50);
let (tx_validado, rx_validado) = bounded::<DadoBruto>(50);
let (tx_processado, rx_processado) = bounded::<DadoProcessado>(50);
// Estágio 1: Validação (2 workers)
let validadores = estagio("validação", rx_entrada, tx_validado, 2, |dado: DadoBruto| {
// Simular validação
thread::sleep(Duration::from_millis(5));
DadoBruto {
id: dado.id,
conteudo: dado.conteudo.trim().to_uppercase(),
}
});
// Estágio 2: Processamento pesado (4 workers)
let processadores = estagio("processamento", rx_validado, tx_processado, 4, |dado: DadoBruto| {
let inicio_proc = Instant::now();
// Simular processamento CPU-bound
thread::sleep(Duration::from_millis(20));
let resultado: f64 = dado.conteudo.len() as f64 * 3.14;
DadoProcessado {
id: dado.id,
conteudo_original: dado.conteudo,
resultado,
tempo_processamento: inicio_proc.elapsed(),
}
});
// Estágio 3: Coleta de resultados (1 thread)
let coletor = thread::spawn(move || {
let mut resultados = Vec::new();
while let Ok(processado) = rx_processado.recv() {
resultados.push(processado);
}
resultados
});
// Injetar dados no pipeline
for i in 0..100 {
let dado = DadoBruto {
id: i,
conteudo: format!("Dado de entrada número {}", i),
};
tx_entrada.send(dado).unwrap();
}
drop(tx_entrada); // Sinalizar fim da entrada
// Aguardar todos os estágios
for h in validadores { h.join().unwrap(); }
drop(tx_validado); // Fechar canal para próximo estágio
for h in processadores { h.join().unwrap(); }
drop(tx_processado);
let resultados = coletor.join().unwrap();
let tempo_total = inicio.elapsed();
println!("\n=== Resultados do Pipeline ===");
println!("Total processado: {}", resultados.len());
println!("Tempo total: {:?}", tempo_total);
if let Some(primeiro) = resultados.first() {
println!("Exemplo: ID={}, resultado={:.2}, tempo={:?}",
primeiro.id, primeiro.resultado, primeiro.tempo_processamento);
}
let tempo_medio: f64 = resultados.iter()
.map(|r| r.tempo_processamento.as_secs_f64())
.sum::<f64>() / resultados.len() as f64;
println!("Tempo médio por item: {:.4}s", tempo_medio);
println!("Throughput: {:.1} itens/s",
resultados.len() as f64 / tempo_total.as_secs_f64());
}
Exemplo 2: Worker Pool com Graceful Shutdown
use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::thread;
use std::time::Duration;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct WorkerPool {
workers: Vec<thread::JoinHandle<()>>,
tx_jobs: Sender<Job>,
tx_shutdown: Sender<()>,
}
impl WorkerPool {
fn new(num_workers: usize) -> Self {
let (tx_jobs, rx_jobs) = bounded::<Job>(num_workers * 2);
let (tx_shutdown, rx_shutdown) = bounded::<()>(num_workers);
let mut workers = vec![];
for id in 0..num_workers {
let rx_jobs = rx_jobs.clone();
let rx_shutdown = rx_shutdown.clone();
workers.push(thread::spawn(move || {
loop {
select! {
recv(rx_jobs) -> job => {
match job {
Ok(job) => job(),
Err(_) => break,
}
}
recv(rx_shutdown) -> _ => {
println!("Worker {} recebeu sinal de shutdown", id);
break;
}
}
}
println!("Worker {} encerrou", id);
}));
}
WorkerPool {
workers,
tx_jobs,
tx_shutdown,
}
}
fn enviar<F>(&self, job: F) -> Result<(), String>
where
F: FnOnce() + Send + 'static,
{
self.tx_jobs
.send(Box::new(job))
.map_err(|_| "Pool já foi encerrado".to_string())
}
fn shutdown(self) {
// Sinalizar shutdown para todos os workers
for _ in &self.workers {
let _ = self.tx_shutdown.send(());
}
// Aguardar todos terminarem
for worker in self.workers {
worker.join().unwrap();
}
println!("Pool encerrado gracefully");
}
}
fn main() {
let pool = WorkerPool::new(4);
// Enviar trabalho
for i in 0..20 {
pool.enviar(move || {
println!("Executando job {}", i);
thread::sleep(Duration::from_millis(50));
}).unwrap();
}
// Dar tempo para jobs executarem
thread::sleep(Duration::from_secs(1));
// Shutdown graceful
pool.shutdown();
}
Comparação com Alternativas
| Característica | Crossbeam | std::sync | Tokio sync | Rayon |
|---|---|---|---|---|
| Channels | MPMC, select!, bounded/unbounded | MPSC apenas | mpsc, broadcast, watch | Não |
| Scoped threads | Sim (+ std::thread::scope) | Sim (1.63+) | Não (async) | scope() |
| Lock-free queues | ArrayQueue, SegQueue | Não | Não | Deques internos |
| Epoch GC | Sim | Não | Não | Não |
| CachePadded | Sim | Não | Não | Não |
| Performance channels | Muito alta | Moderada | Alta (async) | N/A |
| MPMC | Sim | Não | Broadcast sim | N/A |
| select! | Sim | Não | Sim (async) | Não |
| Complexidade | Moderada | Simples | Moderada | Simples |
| Caso de uso | Concorrência de baixo nível | Básico | I/O assíncrono | Paralelismo dados |
O Crossbeam se destaca por:
- Channels MPMC: múltiplos produtores E consumidores, impossível com std::sync::mpsc
- Performance: channels até 10x mais rápidos que std::sync::mpsc
- select!: aguardar em múltiplos channels simultaneamente
- Estruturas lock-free: filas sem locks para máxima performance
- CachePadded: otimização de cache para evitar false sharing
Conclusão
O Crossbeam fornece as primitivas de concorrência que faltam na biblioteca padrão do Rust, sendo essencial para sistemas de alta performance que precisam de comunicação eficiente entre threads, estruturas lock-free ou pipelines multi-estágio.
Pontos-chave para lembrar:
- Channels MPMC permitem múltiplos consumidores (impossível com std)
select!para esperar em múltiplos channels simultaneamente- Scoped threads emprestam referências locais com segurança
- ArrayQueue/SegQueue para filas lock-free de altíssima performance
- CachePadded evita false sharing em estruturas compartilhadas
- Bounded channels previnem consumo descontrolado de memória
Para aprofundar, consulte a documentação do Crossbeam e o repositório no GitHub.
No próximo passo, explore o Axum para construir APIs web de alta performance em Rust.