Padrões de Thread Safety em Rust

Guia completo de padrões de thread safety em Rust: Arc+Mutex, Arc+RwLock, message passing, thread pools, Rayon, actor model e boas práticas.

O que faz e quando usar

Este guia reune os padrões de projeto mais importantes para concorrência segura em Rust. Em vez de abordar uma única primitiva, aqui comparamos as diferentes abordagens e mostramos quando usar cada uma. O sistema de tipos do Rust garante segurança em tempo de compilação, mas escolher o padrão certo é responsabilidade do programador.

As duas filosofias

  ┌─────────────────────────────────────────────────────────┐
  │           Concorrência em Rust                           │
  │                                                          │
  │  ┌──────────────────┐    ┌────────────────────────────┐ │
  │  │ Passagem de       │    │ Estado Compartilhado        │ │
  │  │ Mensagens          │    │                              │ │
  │  │                    │    │  Arc<Mutex<T>>               │ │
  │  │ mpsc::channel      │    │  Arc<RwLock<T>>              │ │
  │  │ Crossbeam channel  │    │  Arc<AtomicT>                │ │
  │  │ Actor model        │    │  Scoped threads + &mut      │ │
  │  └──────────────────┘    └────────────────────────────┘ │
  │                                                          │
  │  "Não comunique compartilhando memória;                  │
  │   compartilhe memória comunicando."                      │
  └─────────────────────────────────────────────────────────┘

Padrão 1: Arc<Mutex> — Estado Compartilhado Simples

O padrão mais comum para compartilhar dados mutáveis entre threads. Arc fornece contagem de referência atômica (múltiplos donos) e Mutex garante acesso exclusivo.

Quando usar: dados compartilhados com leituras e escritas frequentes, acesso de curta duração.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

type CacheCompartilhado = Arc<Mutex<HashMap<String, String>>>;

fn main() {
    let cache: CacheCompartilhado = Arc::new(Mutex::new(HashMap::new()));

    let mut handles = vec![];

    // Escritores
    for i in 0..5 {
        let cache = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            let chave = format!("chave-{}", i);
            let valor = format!("valor-{}", i);

            let mut map = cache.lock().unwrap();
            map.insert(chave.clone(), valor);
            println!("Inserido: {}", chave);
        }));
    }

    // Leitores (após escritores terminarem)
    for h in handles {
        h.join().unwrap();
    }

    // Leitura final
    let map = cache.lock().unwrap();
    println!("Cache: {:?}", *map);
}

Dica: minimizar tempo de lock

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let dados = Arc::new(Mutex::new(Vec::new()));

    let d = Arc::clone(&dados);
    let handle = thread::spawn(move || {
        // RUIM: segurar lock durante cálculo pesado
        // let mut guard = d.lock().unwrap();
        // let resultado = calculo_pesado();  // bloqueia outras threads!
        // guard.push(resultado);

        // BOM: calcular FORA do lock, inserir DENTRO
        let resultado = calculo_pesado();
        {
            d.lock().unwrap().push(resultado);
        }
    });

    handle.join().unwrap();
    println!("{:?}", dados.lock().unwrap());
}

fn calculo_pesado() -> i32 {
    std::thread::sleep(std::time::Duration::from_millis(100));
    42
}

Padrão 2: Arc<RwLock> — Muitas Leituras, Poucas Escritas

Quando leituras são muito mais frequentes que escritas, RwLock permite paralelismo de leitura — múltiplas threads leem simultaneamente.

Quando usar: configuração global, cache read-heavy, dados que mudam raramente.

use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

struct AppConfig {
    max_conexoes: u32,
    timeout_ms: u64,
    modo_debug: bool,
    versao: String,
}

fn main() {
    let config = Arc::new(RwLock::new(AppConfig {
        max_conexoes: 100,
        timeout_ms: 5000,
        modo_debug: false,
        versao: "1.0.0".into(),
    }));

    let mut handles = vec![];

    // 10 threads leitoras (executam em paralelo)
    for id in 0..10 {
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            for _ in 0..3 {
                let cfg = config.read().unwrap();
                println!(
                    "Leitor {}: v{}, max={}",
                    id, cfg.versao, cfg.max_conexoes
                );
                thread::sleep(Duration::from_millis(10));
            }
        }));
    }

    // 1 thread escritora (bloqueia leitores momentaneamente)
    {
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            let mut cfg = config.write().unwrap();
            cfg.max_conexoes = 200;
            cfg.versao = "1.1.0".into();
            println!("Config atualizada para v{}", cfg.versao);
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
}

Padrão 3: Message Passing — Canais mpsc

Transferir ownership de dados entre threads usando canais. Sem compartilhamento de memória, sem locks.

Quando usar: pipeline de processamento, worker threads, quando dados fluem em uma direção.

use std::sync::mpsc;
use std::thread;

enum Tarefa {
    Processar(String),
    Resultado(String, usize),
    Encerrar,
}

fn main() {
    let (tarefa_tx, tarefa_rx) = mpsc::channel();
    let (resultado_tx, resultado_rx) = mpsc::channel();

    // Worker thread — recebe tarefas, envia resultados
    let worker = thread::spawn(move || {
        for tarefa in tarefa_rx {
            match tarefa {
                Tarefa::Processar(texto) => {
                    let palavras = texto.split_whitespace().count();
                    resultado_tx
                        .send(Tarefa::Resultado(texto, palavras))
                        .unwrap();
                }
                Tarefa::Encerrar => break,
                _ => {}
            }
        }
    });

    // Enviar tarefas
    let textos = vec![
        "Rust é seguro e rápido",
        "Concorrência sem medo",
        "Sistema de tipos poderoso do Rust Brasil",
    ];

    for texto in &textos {
        tarefa_tx
            .send(Tarefa::Processar(texto.to_string()))
            .unwrap();
    }
    tarefa_tx.send(Tarefa::Encerrar).unwrap();

    // Coletar resultados
    drop(tarefa_tx);
    for resultado in resultado_rx {
        if let Tarefa::Resultado(texto, palavras) = resultado {
            println!("\"{}\" -> {} palavras", texto, palavras);
        }
    }

    worker.join().unwrap();
}

Padrão 4: Thread Pool

Reutilizar um conjunto fixo de threads para processar tarefas. Evita o overhead de criar/destruir threads.

Quando usar: servidor que processa muitas requisições, processamento em lote.

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

type Trabalho = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: Option<mpsc::Sender<Trabalho>>,
}

impl ThreadPool {
    fn new(tamanho: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Trabalho>();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(tamanho);

        for id in 0..tamanho {
            let receiver = Arc::clone(&receiver);
            workers.push(thread::spawn(move || {
                loop {
                    // Cada worker tenta pegar uma tarefa
                    let trabalho = {
                        let rx = receiver.lock().unwrap();
                        rx.recv()
                    };

                    match trabalho {
                        Ok(trabalho) => {
                            println!("[Worker {}] Executando tarefa", id);
                            trabalho();
                        }
                        Err(_) => {
                            println!("[Worker {}] Canal fechado, encerrando", id);
                            break;
                        }
                    }
                }
            }));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    fn executar<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        if let Some(ref sender) = self.sender {
            sender.send(Box::new(f)).unwrap();
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Dropar o sender fecha o canal
        self.sender.take();

        // Esperar todos os workers terminarem
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..10 {
        pool.executar(move || {
            println!("Tarefa {} executada na thread {:?}", i, thread::current().id());
            thread::sleep(std::time::Duration::from_millis(100));
        });
    }

    // Pool é dropado aqui: espera todas as tarefas terminarem
    drop(pool);
    println!("Todas as tarefas completadas!");
}

Padrão 5: Paralelismo de Dados com Rayon

A crate rayon transforma iteradores sequenciais em paralelos com mudanças mínimas de código. Ela gerencia um thread pool global automaticamente.

Quando usar: processar coleções grandes em paralelo, map/reduce, qualquer operação sobre iteradores que pode ser paralelizada.

# Cargo.toml
[dependencies]
rayon = "1"
use rayon::prelude::*;

fn eh_primo(n: u64) -> bool {
    if n < 2 { return false; }
    if n < 4 { return true; }
    if n % 2 == 0 || n % 3 == 0 { return false; }
    let mut i = 5;
    while i * i <= n {
        if n % i == 0 || n % (i + 2) == 0 { return false; }
        i += 6;
    }
    true
}

fn main() {
    let numeros: Vec<u64> = (2..=100_000).collect();

    // Sequencial
    let inicio = std::time::Instant::now();
    let primos_seq: Vec<u64> = numeros.iter()
        .filter(|&&n| eh_primo(n))
        .copied()
        .collect();
    println!("Sequencial: {} primos em {:?}", primos_seq.len(), inicio.elapsed());

    // Paralelo com Rayon — apenas troque iter() por par_iter()!
    let inicio = std::time::Instant::now();
    let primos_par: Vec<u64> = numeros.par_iter()
        .filter(|&&n| eh_primo(n))
        .copied()
        .collect();
    println!("Paralelo:   {} primos em {:?}", primos_par.len(), inicio.elapsed());

    assert_eq!(primos_seq.len(), primos_par.len());
}

Rayon com map-reduce

use rayon::prelude::*;

fn main() {
    let textos = vec![
        "Rust é uma linguagem de programação",
        "focada em segurança e performance",
        "com concorrência sem medo",
        "e um sistema de tipos poderoso",
    ];

    // Contar palavras em paralelo
    let total_palavras: usize = textos
        .par_iter()
        .map(|texto| texto.split_whitespace().count())
        .sum();

    println!("Total de palavras: {}", total_palavras);

    // Processar chunks em paralelo
    let dados: Vec<i64> = (1..=1_000_000).collect();
    let soma: i64 = dados.par_iter().sum();
    println!("Soma: {}", soma);
}

Padrão 6: Actor Model

Cada “ator” é uma thread com seu próprio estado, comunicando-se exclusivamente via mensagens. Não há estado compartilhado.

Quando usar: sistemas complexos com múltiplos componentes independentes, microserviços internos.

  ┌─────────┐   msg   ┌──────────┐   msg   ┌──────────┐
  │  Ator A  │ ──────> │  Ator B   │ ──────> │  Ator C   │
  │ (estado) │ <────── │ (estado)  │ <────── │ (estado)  │
  └─────────┘   resp   └──────────┘   resp   └──────────┘
use std::sync::mpsc;
use std::thread;

// Mensagens para o ator Contador
enum MsgContador {
    Incrementar,
    Decrementar,
    ObterValor(mpsc::Sender<i64>),
    Encerrar,
}

struct AtorContador {
    receiver: mpsc::Receiver<MsgContador>,
    valor: i64,
}

impl AtorContador {
    fn new(receiver: mpsc::Receiver<MsgContador>) -> Self {
        AtorContador { receiver, valor: 0 }
    }

    fn executar(mut self) {
        for msg in self.receiver {
            match msg {
                MsgContador::Incrementar => self.valor += 1,
                MsgContador::Decrementar => self.valor -= 1,
                MsgContador::ObterValor(resposta) => {
                    let _ = resposta.send(self.valor);
                }
                MsgContador::Encerrar => break,
            }
        }
        println!("Ator Contador encerrado. Valor final: {}", self.valor);
    }
}

// Handle para interagir com o ator
struct ContadorHandle {
    sender: mpsc::Sender<MsgContador>,
}

impl ContadorHandle {
    fn spawn() -> Self {
        let (sender, receiver) = mpsc::channel();
        thread::spawn(|| {
            let ator = AtorContador::new(receiver);
            ator.executar();
        });
        ContadorHandle { sender }
    }

    fn incrementar(&self) {
        self.sender.send(MsgContador::Incrementar).unwrap();
    }

    fn decrementar(&self) {
        self.sender.send(MsgContador::Decrementar).unwrap();
    }

    fn obter_valor(&self) -> i64 {
        let (tx, rx) = mpsc::channel();
        self.sender.send(MsgContador::ObterValor(tx)).unwrap();
        rx.recv().unwrap()
    }

    fn encerrar(self) {
        let _ = self.sender.send(MsgContador::Encerrar);
    }
}

fn main() {
    let contador = ContadorHandle::spawn();

    contador.incrementar();
    contador.incrementar();
    contador.incrementar();
    contador.decrementar();

    println!("Valor atual: {}", contador.obter_valor());
    // Valor atual: 2

    contador.encerrar();
    thread::sleep(std::time::Duration::from_millis(50));
}

Comparativo: Quando Usar Cada Padrão

PadrãoComplexidadePerformanceMelhor para
Arc<Mutex<T>>BaixaBoaEstado compartilhado simples
Arc<RwLock<T>>BaixaMuito boa (leitura)Cache, config read-heavy
mpsc::channelMédiaBoaPipeline, worker queues
thread::scopeBaixaExcelenteProcessamento paralelo local
Thread PoolMédiaMuito boaServidor, tarefas repetitivas
RayonMuito baixaExcelenteParalelismo de dados
Actor ModelAltaBoaSistemas complexos
AtômicosMédia/AltaExcelenteContadores, flags, lock-free

Regra geral

  Preciso de paralelismo de dados?
  │
  ├─ SIM → Use Rayon (par_iter)
  │        ou thread::scope com chunks_mut
  │
  └─ NÃO → Preciso de estado compartilhado?
            │
            ├─ NÃO → Use channels (mpsc)
            │        ou Actor Model para sistemas complexos
            │
            └─ SIM → Muitas leituras, poucas escritas?
                     │
                     ├─ SIM → Arc<RwLock<T>>
                     │
                     └─ NÃO → Dado simples (int, bool)?
                              │
                              ├─ SIM → Arc<AtomicT>
                              │
                              └─ NÃO → Arc<Mutex<T>>

Anti-padrão: Estado Compartilhado Desnecessário

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // RUIM: usar Mutex quando channel seria mais simples
    let resultados = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];

    for i in 0..5 {
        let resultados = Arc::clone(&resultados);
        handles.push(thread::spawn(move || {
            let r = i * i;
            resultados.lock().unwrap().push(r);
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
    println!("Mutex: {:?}", resultados.lock().unwrap());

    // BOM: usar channel — sem locks, sem Arc
    let (tx, rx) = std::sync::mpsc::channel();

    for i in 0..5 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i * i).unwrap();
        });
    }
    drop(tx);

    let resultados: Vec<i32> = rx.into_iter().collect();
    println!("Channel: {:?}", resultados);
}

Garantias de Thread Safety

  • O compilador do Rust previne data races em tempo de compilação via Send e Sync.
  • Arc<Mutex<T>> garante que apenas uma thread acessa o dado por vez.
  • Arc<RwLock<T>> permite múltiplos leitores ou um escritor.
  • Channels transferem ownership, eliminando possibilidade de acesso concorrente ao mesmo dado.
  • thread::scope garante que threads terminam antes que dados locais sejam destruídos.
  • Rayon usa work-stealing scheduling para balancear carga entre cores automaticamente.
  • Nenhum desses padrões previne deadlocks — isso requer disciplina do programador (ex: sempre adquirir locks na mesma ordem).

Veja Também