Crossbeam: Primitivas de Concorrência Avançadas para Rust

Guia completo do Crossbeam para concorrência em Rust. Aprenda sobre channels de alta performance, scoped threads, epoch-based memory reclamation, estruturas lock-free e pipelines multi-produtor multi-consumidor.

O Crossbeam é uma coleção de ferramentas de concorrência de alta performance para Rust que complementa a biblioteca padrão. Enquanto std::sync oferece primitivas básicas como Mutex, RwLock e canais MPSC, o Crossbeam vai além com channels mais rápidos e flexíveis, scoped threads que permitem emprestar referências locais, estruturas de dados lock-free e um sistema de gerenciamento de memória baseado em epochs.

O Crossbeam nasceu de pesquisa acadêmica sobre concorrência segura e tem influenciado significativamente o design da própria biblioteca padrão do Rust. Várias funcionalidades que começaram no Crossbeam foram eventualmente incorporadas à stdlib, como std::thread::scope (estabilizado no Rust 1.63).

Se você precisa de concorrência de baixo nível com máxima performance — filas lock-free, channels multi-produtor multi-consumidor, ou comunicação eficiente entre threads — o Crossbeam é a ferramenta certa.

Instalação

Adicione o Crossbeam ao Cargo.toml:

[dependencies]
# Crate completa
crossbeam = "0.8"

# Ou apenas os módulos que precisa
crossbeam-channel = "0.5"    # Channels MPMC
crossbeam-utils = "0.8"      # Utilitários (scope, CachePadded, etc.)
crossbeam-deque = "0.8"      # Deques work-stealing
crossbeam-queue = "0.3"      # Filas lock-free (ArrayQueue, SegQueue)
crossbeam-epoch = "0.9"      # Epoch-based memory reclamation

Uso Básico

Crossbeam Channels

Os channels do Crossbeam são significativamente mais rápidos que std::sync::mpsc e suportam múltiplos produtores E múltiplos consumidores (MPMC):

use crossbeam_channel::{bounded, unbounded, select, Receiver, Sender};
use std::thread;
use std::time::Duration;

fn main() {
    // === Channel ilimitado (unbounded) ===
    let (tx, rx) = unbounded::<String>();

    thread::spawn(move || {
        tx.send("Olá do Crossbeam!".to_string()).unwrap();
        tx.send("Segunda mensagem".to_string()).unwrap();
    });

    // Receber mensagens
    println!("{}", rx.recv().unwrap());
    println!("{}", rx.recv().unwrap());

    // === Channel limitado (bounded) ===
    let (tx, rx) = bounded::<i32>(5); // Buffer de 5 itens

    // send() bloqueia quando o buffer está cheio
    thread::spawn(move || {
        for i in 0..10 {
            println!("Enviando: {}", i);
            tx.send(i).unwrap(); // Bloqueia se buffer cheio
        }
    });

    // recv() bloqueia quando o buffer está vazio
    for _ in 0..10 {
        thread::sleep(Duration::from_millis(100));
        let valor = rx.recv().unwrap();
        println!("Recebido: {}", valor);
    }

    // === Channel zero-sized (rendezvous) ===
    let (tx, rx) = bounded::<String>(0);
    // send() e recv() devem acontecer ao mesmo tempo (sincronização)

    thread::spawn(move || {
        tx.send("Sincronizado!".to_string()).unwrap();
        println!("Enviado (após receptor estar pronto)");
    });

    thread::sleep(Duration::from_millis(100));
    println!("Recebido: {}", rx.recv().unwrap());
}

Múltiplos Produtores e Consumidores

use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (tx, rx) = unbounded::<String>();

    // Múltiplos produtores (clone o sender)
    let mut produtores = vec![];
    for id in 0..4 {
        let tx = tx.clone();
        produtores.push(thread::spawn(move || {
            for i in 0..5 {
                let msg = format!("Produtor {} - Msg {}", id, i);
                tx.send(msg).unwrap();
            }
        }));
    }
    drop(tx); // Dropar o sender original

    // Múltiplos consumidores (clone o receiver!)
    // Diferente de std::sync::mpsc, o receiver pode ser clonado
    let mut consumidores = vec![];
    for id in 0..2 {
        let rx = rx.clone();
        consumidores.push(thread::spawn(move || {
            let mut contagem = 0;
            // Cada mensagem é recebida por APENAS um consumidor
            while let Ok(msg) = rx.recv() {
                println!("Consumidor {}: {}", id, msg);
                contagem += 1;
            }
            println!("Consumidor {} processou {} mensagens", id, contagem);
        }));
    }
    drop(rx); // Dropar o receiver original

    // Aguardar todos
    for p in produtores { p.join().unwrap(); }
    for c in consumidores { c.join().unwrap(); }
}

Select - Esperar em Múltiplos Channels

use crossbeam_channel::{bounded, select, tick, after, never};
use std::time::Duration;

fn main() {
    let (tx_dados, rx_dados) = bounded::<String>(10);
    let (tx_controle, rx_controle) = bounded::<String>(1);

    // Produtor de dados
    std::thread::spawn(move || {
        for i in 0..20 {
            std::thread::sleep(Duration::from_millis(200));
            tx_dados.send(format!("Dado {}", i)).unwrap();
        }
    });

    // Sinal de parada após 2 segundos
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(2));
        tx_controle.send("PARAR".to_string()).unwrap();
    });

    // Timer periódico
    let ticker = tick(Duration::from_millis(500));

    // Timeout global
    let timeout = after(Duration::from_secs(5));

    let mut total = 0;

    loop {
        select! {
            recv(rx_dados) -> msg => {
                match msg {
                    Ok(dado) => {
                        println!("Recebido: {}", dado);
                        total += 1;
                    }
                    Err(_) => {
                        println!("Canal de dados fechado");
                        break;
                    }
                }
            }
            recv(rx_controle) -> msg => {
                if let Ok(cmd) = msg {
                    println!("Comando: {}", cmd);
                    if cmd == "PARAR" {
                        println!("Parando por comando...");
                        break;
                    }
                }
            }
            recv(ticker) -> _ => {
                println!("--- Heartbeat: {} mensagens processadas ---", total);
            }
            recv(timeout) -> _ => {
                println!("Timeout global atingido!");
                break;
            }
        }
    }

    println!("Total processado: {}", total);
}

Operações Não-Bloqueantes

use crossbeam_channel::{bounded, TryRecvError, TrySendError};

fn main() {
    let (tx, rx) = bounded::<i32>(2);

    // try_send: não bloqueia
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    match tx.try_send(3) {
        Ok(()) => println!("Enviado"),
        Err(TrySendError::Full(valor)) => println!("Canal cheio, {} não enviado", valor),
        Err(TrySendError::Disconnected(_)) => println!("Canal desconectado"),
    }

    // try_recv: não bloqueia
    match rx.try_recv() {
        Ok(valor) => println!("Recebido: {}", valor),
        Err(TryRecvError::Empty) => println!("Canal vazio"),
        Err(TryRecvError::Disconnected) => println!("Canal desconectado"),
    }

    // recv_timeout: bloqueia com limite de tempo
    use std::time::Duration;
    match rx.recv_timeout(Duration::from_millis(100)) {
        Ok(valor) => println!("Recebido: {}", valor),
        Err(_) => println!("Timeout!"),
    }

    // Iterar sobre todas as mensagens disponíveis
    let (tx, rx) = bounded(10);
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx);

    // Usar como iterador (bloqueia até fechar)
    for msg in rx {
        println!("Iterado: {}", msg);
    }
}

Recursos Avançados

Scoped Threads

Scoped threads permitem emprestar referências locais para threads, algo impossível com std::thread::spawn:

use crossbeam::scope;

fn main() {
    let mut dados = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let mut resultados = vec![];

    // scope garante que todas as threads terminam antes de retornar
    scope(|s| {
        // Dividir dados em duas metades
        let (esquerda, direita) = dados.split_at_mut(4);

        // Thread que empresta 'esquerda'
        let handle_a = s.spawn(|_| {
            esquerda.iter_mut().for_each(|x| *x *= 2);
            esquerda.iter().sum::<i32>()
        });

        // Thread que empresta 'direita'
        let handle_b = s.spawn(|_| {
            direita.iter_mut().for_each(|x| *x *= 3);
            direita.iter().sum::<i32>()
        });

        // Coletar resultados
        resultados.push(handle_a.join().unwrap());
        resultados.push(handle_b.join().unwrap());
    }).unwrap();

    // Ambas as threads terminaram, 'dados' foi modificado
    println!("Dados modificados: {:?}", dados);
    println!("Somas: {:?}", resultados);

    // Padrão: processar chunks em paralelo
    let dados: Vec<i32> = (0..100).collect();
    let mut somas_parciais = Vec::new();

    scope(|s| {
        let mut handles = Vec::new();

        for chunk in dados.chunks(25) {
            handles.push(s.spawn(move |_| {
                chunk.iter().sum::<i32>()
            }));
        }

        for handle in handles {
            somas_parciais.push(handle.join().unwrap());
        }
    }).unwrap();

    let soma_total: i32 = somas_parciais.iter().sum();
    println!("Soma total: {}", soma_total);
}

Estruturas de Dados Lock-Free

ArrayQueue - Fila de Tamanho Fixo

use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    // Fila lock-free com capacidade fixa
    let fila = Arc::new(ArrayQueue::new(100));

    // Múltiplos produtores
    let mut produtores = vec![];
    for id in 0..4 {
        let fila = Arc::clone(&fila);
        produtores.push(thread::spawn(move || {
            for i in 0..25 {
                let valor = id * 100 + i;
                // push retorna Err se a fila estiver cheia
                while fila.push(valor).is_err() {
                    std::hint::spin_loop();
                }
            }
        }));
    }

    // Múltiplos consumidores
    let mut consumidores = vec![];
    for id in 0..2 {
        let fila = Arc::clone(&fila);
        consumidores.push(thread::spawn(move || {
            let mut total = 0;
            let mut contagem = 0;
            loop {
                match fila.pop() {
                    Some(valor) => {
                        total += valor;
                        contagem += 1;
                    }
                    None => {
                        // Verificar se produtores terminaram
                        thread::yield_now();
                        if fila.is_empty() {
                            break;
                        }
                    }
                }
            }
            println!("Consumidor {}: {} itens, soma = {}", id, contagem, total);
        }));
    }

    for p in produtores { p.join().unwrap(); }
    // Dar tempo para consumidores processarem
    thread::sleep(std::time::Duration::from_millis(100));
    for c in consumidores { c.join().unwrap(); }
}

SegQueue - Fila de Tamanho Dinâmico

use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    // Fila lock-free sem limite de tamanho
    let fila: Arc<SegQueue<String>> = Arc::new(SegQueue::new());

    // Produtor
    let fila_prod = Arc::clone(&fila);
    let produtor = thread::spawn(move || {
        for i in 0..1000 {
            fila_prod.push(format!("item-{}", i));
        }
    });

    // Consumidor
    let fila_cons = Arc::clone(&fila);
    let consumidor = thread::spawn(move || {
        let mut contagem = 0;
        loop {
            match fila_cons.pop() {
                Some(_item) => contagem += 1,
                None => {
                    if contagem >= 1000 {
                        break;
                    }
                    thread::yield_now();
                }
            }
        }
        contagem
    });

    produtor.join().unwrap();
    let total = consumidor.join().unwrap();
    println!("Processados: {}", total);
}

Epoch-Based Memory Reclamation

O epoch-based garbage collection permite reclaimação segura de memória em estruturas de dados lock-free:

use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
use std::sync::atomic::Ordering;

/// Stack lock-free simples usando epoch-based reclamation
struct LockFreeStack<T> {
    head: Atomic<Node<T>>,
}

struct Node<T> {
    data: T,
    next: Atomic<Node<T>>,
}

impl<T> LockFreeStack<T> {
    fn new() -> Self {
        LockFreeStack {
            head: Atomic::null(),
        }
    }

    fn push(&self, valor: T) {
        let mut node = Owned::new(Node {
            data: valor,
            next: Atomic::null(),
        });

        let guard = epoch::pin();

        loop {
            let head = self.head.load(Ordering::Relaxed, &guard);
            node.next.store(head, Ordering::Relaxed);

            match self.head.compare_exchange(
                head,
                node,
                Ordering::Release,
                Ordering::Relaxed,
                &guard,
            ) {
                Ok(_) => break,
                Err(err) => node = err.new,
            }
        }
    }

    fn pop(&self) -> Option<T>
    where
        T: Clone,
    {
        let guard = epoch::pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);

            match unsafe { head.as_ref() } {
                None => return None,
                Some(node) => {
                    let next = node.next.load(Ordering::Relaxed, &guard);

                    if self.head
                        .compare_exchange(
                            head,
                            next,
                            Ordering::Release,
                            Ordering::Relaxed,
                            &guard,
                        )
                        .is_ok()
                    {
                        let data = node.data.clone();
                        // Agendar dealocação segura
                        unsafe { guard.defer_destroy(head); }
                        return Some(data);
                    }
                }
            }
        }
    }
}

// Uso seguro sem preocupação com memory leaks
fn main() {
    use std::sync::Arc;
    use std::thread;

    let stack = Arc::new(LockFreeStack::new());

    let mut handles = vec![];

    // Pushes paralelos
    for i in 0..8 {
        let stack = Arc::clone(&stack);
        handles.push(thread::spawn(move || {
            for j in 0..1000 {
                stack.push(i * 1000 + j);
            }
        }));
    }

    for h in handles { h.join().unwrap(); }

    // Pops paralelos
    let mut total = 0;
    while stack.pop().is_some() {
        total += 1;
    }
    println!("Total de itens: {}", total);
}

CachePadded - Evitar False Sharing

use crossbeam_utils::CachePadded;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Sem CachePadded: contadores podem compartilhar cache line (false sharing)
struct ContadoresMau {
    a: AtomicU64,
    b: AtomicU64,
}

// Com CachePadded: cada contador em sua própria cache line
struct ContadoresBom {
    a: CachePadded<AtomicU64>,
    b: CachePadded<AtomicU64>,
}

fn main() {
    let contadores = Arc::new(ContadoresBom {
        a: CachePadded::new(AtomicU64::new(0)),
        b: CachePadded::new(AtomicU64::new(0)),
    });

    let c1 = Arc::clone(&contadores);
    let t1 = thread::spawn(move || {
        for _ in 0..10_000_000 {
            c1.a.fetch_add(1, Ordering::Relaxed);
        }
    });

    let c2 = Arc::clone(&contadores);
    let t2 = thread::spawn(move || {
        for _ in 0..10_000_000 {
            c2.b.fetch_add(1, Ordering::Relaxed);
        }
    });

    t1.join().unwrap();
    t2.join().unwrap();

    println!("Contador A: {}", contadores.a.load(Ordering::Relaxed));
    println!("Contador B: {}", contadores.b.load(Ordering::Relaxed));
}

Boas Práticas

1. Prefira Channels Bounded

use crossbeam_channel::bounded;

// BOM: bounded previne uso descontrolado de memória
let (tx, rx) = bounded::<Vec<u8>>(100);

// CUIDADO: unbounded pode consumir memória indefinidamente
// Use apenas quando souber que o consumidor acompanha o produtor

2. Use select! para Múltiplos Channels

use crossbeam_channel::{bounded, select, never, tick};
use std::time::Duration;

fn loop_de_eventos(
    rx_dados: crossbeam_channel::Receiver<String>,
    rx_shutdown: crossbeam_channel::Receiver<()>,
) {
    let heartbeat = tick(Duration::from_secs(30));

    loop {
        select! {
            recv(rx_dados) -> msg => {
                if let Ok(dados) = msg {
                    processar(dados);
                }
            }
            recv(rx_shutdown) -> _ => {
                println!("Shutdown recebido");
                break;
            }
            recv(heartbeat) -> _ => {
                println!("Heartbeat");
            }
        }
    }
}

fn processar(dados: String) {
    println!("Processando: {}", dados);
}

3. Use Scoped Threads em Vez de Arc

use crossbeam::scope;

fn processar_dados(dados: &[Vec<f64>]) -> Vec<f64> {
    let mut resultados = vec![0.0; dados.len()];

    // Sem necessidade de Arc! Scope garante lifetime
    scope(|s| {
        for (i, (chunk, resultado)) in dados.iter().zip(resultados.iter_mut()).enumerate() {
            s.spawn(move |_| {
                *resultado = chunk.iter().sum();
            });
        }
    }).unwrap();

    resultados
}

4. Dimensione Buffers Adequadamente

use crossbeam_channel::bounded;

// Regra geral para buffer:
// - 0: sincronização ponto-a-ponto (rendezvous)
// - 1-10: baixa latência, backpressure agressivo
// - 100-1000: throughput, absorve picos
// - Nunca use unbounded em produção sem monitoramento

let (tx, rx) = bounded::<Job>(num_workers * 2); // 2x workers é um bom ponto de partida

5. Trate Channel Disconnection

use crossbeam_channel::{bounded, RecvError, SendError};

fn produtor(tx: crossbeam_channel::Sender<i32>) {
    for i in 0..100 {
        match tx.send(i) {
            Ok(()) => {},
            Err(SendError(_)) => {
                println!("Consumidor desconectou, parando produtor");
                break;
            }
        }
    }
}

fn consumidor(rx: crossbeam_channel::Receiver<i32>) -> Vec<i32> {
    let mut resultados = Vec::new();
    loop {
        match rx.recv() {
            Ok(valor) => resultados.push(valor),
            Err(RecvError) => {
                println!("Produtor desconectou, parando consumidor");
                break;
            }
        }
    }
    resultados
}

Exemplos Práticos

Exemplo: Pipeline Multi-Estágio Multi-Consumidor

use crossbeam_channel::{bounded, Receiver, Sender};
use std::thread;
use std::time::{Duration, Instant};

/// Dado bruto para processamento
#[derive(Debug, Clone)]
struct DadoBruto {
    id: u64,
    conteudo: String,
}

/// Dado processado
#[derive(Debug)]
struct DadoProcessado {
    id: u64,
    conteudo_original: String,
    resultado: f64,
    tempo_processamento: Duration,
}

/// Cria um estágio do pipeline com múltiplos workers
fn estagio<T, U, F>(
    nome: &str,
    rx: Receiver<T>,
    tx: Sender<U>,
    num_workers: usize,
    processador: F,
) -> Vec<thread::JoinHandle<()>>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Clone + 'static,
{
    let mut handles = vec![];
    for i in 0..num_workers {
        let rx = rx.clone();
        let tx = tx.clone();
        let processador = processador.clone();
        let nome = nome.to_string();

        handles.push(thread::spawn(move || {
            while let Ok(item) = rx.recv() {
                let resultado = processador(item);
                if tx.send(resultado).is_err() {
                    break;
                }
            }
            println!("[{}] Worker {} encerrou", nome, i);
        }));
    }
    drop(tx); // Dropar o sender do estágio principal
    handles
}

fn main() {
    let inicio = Instant::now();

    // Canais entre estágios
    let (tx_entrada, rx_entrada) = bounded::<DadoBruto>(50);
    let (tx_validado, rx_validado) = bounded::<DadoBruto>(50);
    let (tx_processado, rx_processado) = bounded::<DadoProcessado>(50);

    // Estágio 1: Validação (2 workers)
    let validadores = estagio("validação", rx_entrada, tx_validado, 2, |dado: DadoBruto| {
        // Simular validação
        thread::sleep(Duration::from_millis(5));
        DadoBruto {
            id: dado.id,
            conteudo: dado.conteudo.trim().to_uppercase(),
        }
    });

    // Estágio 2: Processamento pesado (4 workers)
    let processadores = estagio("processamento", rx_validado, tx_processado, 4, |dado: DadoBruto| {
        let inicio_proc = Instant::now();
        // Simular processamento CPU-bound
        thread::sleep(Duration::from_millis(20));
        let resultado: f64 = dado.conteudo.len() as f64 * 3.14;

        DadoProcessado {
            id: dado.id,
            conteudo_original: dado.conteudo,
            resultado,
            tempo_processamento: inicio_proc.elapsed(),
        }
    });

    // Estágio 3: Coleta de resultados (1 thread)
    let coletor = thread::spawn(move || {
        let mut resultados = Vec::new();
        while let Ok(processado) = rx_processado.recv() {
            resultados.push(processado);
        }
        resultados
    });

    // Injetar dados no pipeline
    for i in 0..100 {
        let dado = DadoBruto {
            id: i,
            conteudo: format!("Dado de entrada número {}", i),
        };
        tx_entrada.send(dado).unwrap();
    }
    drop(tx_entrada); // Sinalizar fim da entrada

    // Aguardar todos os estágios
    for h in validadores { h.join().unwrap(); }
    drop(tx_validado); // Fechar canal para próximo estágio
    for h in processadores { h.join().unwrap(); }
    drop(tx_processado);

    let resultados = coletor.join().unwrap();

    let tempo_total = inicio.elapsed();
    println!("\n=== Resultados do Pipeline ===");
    println!("Total processado: {}", resultados.len());
    println!("Tempo total: {:?}", tempo_total);

    if let Some(primeiro) = resultados.first() {
        println!("Exemplo: ID={}, resultado={:.2}, tempo={:?}",
            primeiro.id, primeiro.resultado, primeiro.tempo_processamento);
    }

    let tempo_medio: f64 = resultados.iter()
        .map(|r| r.tempo_processamento.as_secs_f64())
        .sum::<f64>() / resultados.len() as f64;
    println!("Tempo médio por item: {:.4}s", tempo_medio);
    println!("Throughput: {:.1} itens/s",
        resultados.len() as f64 / tempo_total.as_secs_f64());
}

Exemplo 2: Worker Pool com Graceful Shutdown

use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::thread;
use std::time::Duration;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct WorkerPool {
    workers: Vec<thread::JoinHandle<()>>,
    tx_jobs: Sender<Job>,
    tx_shutdown: Sender<()>,
}

impl WorkerPool {
    fn new(num_workers: usize) -> Self {
        let (tx_jobs, rx_jobs) = bounded::<Job>(num_workers * 2);
        let (tx_shutdown, rx_shutdown) = bounded::<()>(num_workers);

        let mut workers = vec![];

        for id in 0..num_workers {
            let rx_jobs = rx_jobs.clone();
            let rx_shutdown = rx_shutdown.clone();

            workers.push(thread::spawn(move || {
                loop {
                    select! {
                        recv(rx_jobs) -> job => {
                            match job {
                                Ok(job) => job(),
                                Err(_) => break,
                            }
                        }
                        recv(rx_shutdown) -> _ => {
                            println!("Worker {} recebeu sinal de shutdown", id);
                            break;
                        }
                    }
                }
                println!("Worker {} encerrou", id);
            }));
        }

        WorkerPool {
            workers,
            tx_jobs,
            tx_shutdown,
        }
    }

    fn enviar<F>(&self, job: F) -> Result<(), String>
    where
        F: FnOnce() + Send + 'static,
    {
        self.tx_jobs
            .send(Box::new(job))
            .map_err(|_| "Pool já foi encerrado".to_string())
    }

    fn shutdown(self) {
        // Sinalizar shutdown para todos os workers
        for _ in &self.workers {
            let _ = self.tx_shutdown.send(());
        }

        // Aguardar todos terminarem
        for worker in self.workers {
            worker.join().unwrap();
        }
        println!("Pool encerrado gracefully");
    }
}

fn main() {
    let pool = WorkerPool::new(4);

    // Enviar trabalho
    for i in 0..20 {
        pool.enviar(move || {
            println!("Executando job {}", i);
            thread::sleep(Duration::from_millis(50));
        }).unwrap();
    }

    // Dar tempo para jobs executarem
    thread::sleep(Duration::from_secs(1));

    // Shutdown graceful
    pool.shutdown();
}

Comparação com Alternativas

CaracterísticaCrossbeamstd::syncTokio syncRayon
ChannelsMPMC, select!, bounded/unboundedMPSC apenasmpsc, broadcast, watchNão
Scoped threadsSim (+ std::thread::scope)Sim (1.63+)Não (async)scope()
Lock-free queuesArrayQueue, SegQueueNãoNãoDeques internos
Epoch GCSimNãoNãoNão
CachePaddedSimNãoNãoNão
Performance channelsMuito altaModeradaAlta (async)N/A
MPMCSimNãoBroadcast simN/A
select!SimNãoSim (async)Não
ComplexidadeModeradaSimplesModeradaSimples
Caso de usoConcorrência de baixo nívelBásicoI/O assíncronoParalelismo dados

O Crossbeam se destaca por:

  • Channels MPMC: múltiplos produtores E consumidores, impossível com std::sync::mpsc
  • Performance: channels até 10x mais rápidos que std::sync::mpsc
  • select!: aguardar em múltiplos channels simultaneamente
  • Estruturas lock-free: filas sem locks para máxima performance
  • CachePadded: otimização de cache para evitar false sharing

Conclusão

O Crossbeam fornece as primitivas de concorrência que faltam na biblioteca padrão do Rust, sendo essencial para sistemas de alta performance que precisam de comunicação eficiente entre threads, estruturas lock-free ou pipelines multi-estágio.

Pontos-chave para lembrar:

  • Channels MPMC permitem múltiplos consumidores (impossível com std)
  • select! para esperar em múltiplos channels simultaneamente
  • Scoped threads emprestam referências locais com segurança
  • ArrayQueue/SegQueue para filas lock-free de altíssima performance
  • CachePadded evita false sharing em estruturas compartilhadas
  • Bounded channels previnem consumo descontrolado de memória

Para aprofundar, consulte a documentação do Crossbeam e o repositório no GitHub.

No próximo passo, explore o Axum para construir APIs web de alta performance em Rust.