Barrier e Condvar em Rust

Guia completo de Barrier e Condvar em Rust: sincronização de threads, wait, notify_one, notify_all, produtor-consumidor e rendezvous em português.

O que faz e quando usar

Barrier e Condvar (condition variable) são primitivas de sincronização avançadas que permitem coordenar a execução de múltiplas threads.

Barrier (barreira) bloqueia um grupo de threads até que todas tenham chegado ao mesmo ponto. É útil quando você precisa que N threads completem uma fase antes de todas prosseguirem para a próxima.

Condvar (variável de condição) permite que uma thread espere até que uma condição seja sinalizada por outra thread. Diferentemente de um loop de polling, a thread fica suspensa (sem consumir CPU) até ser notificada. Condvar sempre é usada em conjunto com um Mutex.

  Barrier (N=3):                    Condvar:

  T0 ──|                            T0 ── wait() ──────── [acorda] ──>
  T1 ────|     todos chegaram!      T1 ── notify_one() ─┘
  T2 ──────|─> todos prosseguem     T2 ── wait() ─────── [acorda] ──>
                                    T3 ── notify_all() ──┘

Use Barrier quando:

  • Threads precisam sincronizar em “fases” — todas terminam a fase 1 antes de iniciar a fase 2.
  • Quer garantir que todos os workers estão prontos antes de iniciar o processamento.

Use Condvar quando:

  • Uma thread precisa esperar por uma condição que será alterada por outra thread.
  • Quer implementar padrões como produtor-consumidor, fila bloqueante ou semáforo.
  • Precisa de notificação eficiente sem polling ativo (busy-wait).

Tipos e Funções Principais

Barrier

ItemDescrição
Barrier::new(n)Cria barreira para n threads
barrier.wait()Bloqueia até n threads chamarem wait()
BarrierWaitResult::is_leader()true para exatamente uma thread (a “líder”)

Condvar

ItemDescrição
Condvar::new()Cria nova variável de condição
condvar.wait(guard)Libera o lock, espera notificação, readquire o lock
condvar.wait_while(guard, f)Espera enquanto a condição f for true
condvar.wait_timeout(guard, dur)Espera com timeout
condvar.notify_one()Acorda uma thread que está esperando
condvar.notify_all()Acorda todas as threads que estão esperando

Exemplos de Código

Barrier — sincronizando threads em fases

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;

fn main() {
    let num_threads = 4;
    let barreira = Arc::new(Barrier::new(num_threads));

    let mut handles = vec![];

    for id in 0..num_threads {
        let barreira = Arc::clone(&barreira);
        handles.push(thread::spawn(move || {
            // Fase 1: preparação (cada thread leva tempo diferente)
            println!("[T{}] Preparando...", id);
            thread::sleep(Duration::from_millis(100 * id as u64));
            println!("[T{}] Pronta!", id);

            // Esperar todas as threads ficarem prontas
            let resultado = barreira.wait();
            if resultado.is_leader() {
                println!("--- Todas prontas! Líder: T{} ---", id);
            }

            // Fase 2: processamento (todas começam juntas)
            println!("[T{}] Processando...", id);
            thread::sleep(Duration::from_millis(50));
            println!("[T{}] Concluída!", id);
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
}

Barrier reutilizável — múltiplas fases

A Barrier pode ser reutilizada: após todas as threads passarem, ela “reinicia”:

use std::sync::{Arc, Barrier, Mutex};
use std::thread;

fn main() {
    let num_workers = 3;
    let num_fases = 3;
    let barreira = Arc::new(Barrier::new(num_workers));
    let resultados = Arc::new(Mutex::new(vec![0i32; num_workers]));

    let mut handles = vec![];

    for id in 0..num_workers {
        let barreira = Arc::clone(&barreira);
        let resultados = Arc::clone(&resultados);
        handles.push(thread::spawn(move || {
            for fase in 1..=num_fases {
                // Cada thread calcula seu resultado da fase
                let valor = (id as i32 + 1) * fase as i32;
                resultados.lock().unwrap()[id] = valor;
                println!("Worker {} fase {}: calculou {}", id, fase, valor);

                // Sincronizar: esperar todos terminarem a fase
                let wait_result = barreira.wait();

                // Apenas o líder imprime o resumo da fase
                if wait_result.is_leader() {
                    let res = resultados.lock().unwrap();
                    let soma: i32 = res.iter().sum();
                    println!("=== Fase {} completa. Soma: {} ===", fase, soma);
                }

                // Sincronizar novamente antes da próxima fase
                barreira.wait();
            }
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
}

Condvar básica — esperar por uma condição

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // O par (Mutex, Condvar) é o padrão canônico
    let par = Arc::new((Mutex::new(false), Condvar::new()));

    // Thread que espera
    let par_clone = Arc::clone(&par);
    let esperador = thread::spawn(move || {
        let (lock, cvar) = &*par_clone;
        let mut pronto = lock.lock().unwrap();

        println!("[esperador] Aguardando sinal...");

        // wait_while: espera enquanto a condição for true (ou seja, enquanto !pronto)
        while !*pronto {
            pronto = cvar.wait(pronto).unwrap();
        }

        println!("[esperador] Sinal recebido! Continuando...");
    });

    // Thread que sinaliza
    thread::sleep(Duration::from_millis(500));
    let (lock, cvar) = &*par;
    {
        let mut pronto = lock.lock().unwrap();
        *pronto = true;
        println!("[sinalizador] Enviando sinal!");
    }
    cvar.notify_one(); // Acorda a thread que está esperando

    esperador.join().unwrap();
}

Condvar com wait_while (mais idiomático)

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let estado = Arc::new((Mutex::new(0u32), Condvar::new()));

    let estado_clone = Arc::clone(&estado);
    let esperador = thread::spawn(move || {
        let (lock, cvar) = &*estado_clone;
        let guard = lock.lock().unwrap();

        // wait_while: espera enquanto a condição retorna true
        // Mais seguro que um loop manual (protege contra spurious wakeups)
        let guard = cvar.wait_while(guard, |valor| *valor < 5).unwrap();
        println!("Valor atingiu: {}", *guard);
    });

    // Incrementar o valor gradualmente
    for i in 1..=5 {
        thread::sleep(Duration::from_millis(200));
        let (lock, cvar) = &*estado;
        let mut valor = lock.lock().unwrap();
        *valor = i;
        println!("Valor atualizado para: {}", i);
        cvar.notify_one();
    }

    esperador.join().unwrap();
}

Padrão produtor-consumidor com Condvar

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct FilaBloqueante<T> {
    fila: Mutex<VecDeque<T>>,
    nao_vazia: Condvar,
}

impl<T> FilaBloqueante<T> {
    fn new() -> Self {
        FilaBloqueante {
            fila: Mutex::new(VecDeque::new()),
            nao_vazia: Condvar::new(),
        }
    }

    fn push(&self, item: T) {
        let mut fila = self.fila.lock().unwrap();
        fila.push_back(item);
        self.nao_vazia.notify_one(); // Acordar um consumidor
    }

    fn pop(&self) -> T {
        let mut fila = self.fila.lock().unwrap();
        // Esperar enquanto a fila estiver vazia
        while fila.is_empty() {
            fila = self.nao_vazia.wait(fila).unwrap();
        }
        fila.pop_front().unwrap()
    }
}

fn main() {
    let fila = Arc::new(FilaBloqueante::new());

    // 2 produtores
    for id in 0..2 {
        let fila = Arc::clone(&fila);
        thread::spawn(move || {
            for i in 0..5 {
                let msg = format!("P{}-msg{}", id, i);
                println!("[produtor {}] enviando: {}", id, msg);
                fila.push(msg);
                thread::sleep(Duration::from_millis(100));
            }
        });
    }

    // 1 consumidor
    let fila_c = Arc::clone(&fila);
    let consumidor = thread::spawn(move || {
        for _ in 0..10 {
            let item = fila_c.pop(); // bloqueia se vazio
            println!("[consumidor] recebeu: {}", item);
        }
    });

    consumidor.join().unwrap();
    println!("Todas as mensagens processadas!");
}

Condvar com timeout

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let par = Arc::new((Mutex::new(false), Condvar::new()));

    let par_clone = Arc::clone(&par);
    let esperador = thread::spawn(move || {
        let (lock, cvar) = &*par_clone;
        let guard = lock.lock().unwrap();

        // Esperar com timeout de 1 segundo
        let (guard, timeout_result) = cvar
            .wait_timeout_while(guard, Duration::from_secs(1), |pronto| !*pronto)
            .unwrap();

        if timeout_result.timed_out() {
            println!("Timeout! Não recebeu sinal a tempo.");
        } else {
            println!("Sinal recebido! Valor: {}", *guard);
        }
    });

    // Simular atraso maior que o timeout
    thread::sleep(Duration::from_secs(2));
    let (lock, cvar) = &*par;
    *lock.lock().unwrap() = true;
    cvar.notify_one();

    esperador.join().unwrap();
}

Rendezvous — duas threads se encontram

use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread;

fn main() {
    // Usando Barrier como rendezvous simples
    let ponto_encontro = Arc::new(Barrier::new(2));

    let p1 = Arc::clone(&ponto_encontro);
    let t1 = thread::spawn(move || {
        println!("Thread 1: preparando dados...");
        thread::sleep(std::time::Duration::from_millis(200));
        println!("Thread 1: pronta, esperando Thread 2...");
        p1.wait();
        println!("Thread 1: ambas prontas, continuando!");
    });

    let p2 = Arc::clone(&ponto_encontro);
    let t2 = thread::spawn(move || {
        println!("Thread 2: preparando dados...");
        thread::sleep(std::time::Duration::from_millis(500));
        println!("Thread 2: pronta, esperando Thread 1...");
        p2.wait();
        println!("Thread 2: ambas prontas, continuando!");
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Padrões Comuns e Anti-padrões

Anti-padrão: esquecer de verificar a condição em loop

use std::sync::{Condvar, Mutex};

fn exemplo_anti_padrao() {
    let par = (Mutex::new(false), Condvar::new());
    let (lock, cvar) = &par;

    let guard = lock.lock().unwrap();

    // ERRADO: wait() pode retornar sem que a condição tenha mudado
    // (spurious wakeup)
    // let guard = cvar.wait(guard).unwrap();

    // CORRETO: sempre verificar a condição em loop
    // Opção 1: loop manual
    let mut guard = guard;
    while !*guard {
        guard = cvar.wait(guard).unwrap();
    }

    // Opção 2: wait_while (recomendado — faz o loop internamente)
    // let guard = cvar.wait_while(guard, |val| !*val).unwrap();
}

Anti-padrão: notify antes do wait

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let par = Arc::new((Mutex::new(false), Condvar::new()));

    // Se notify_one() for chamado ANTES de wait(), o sinal é perdido!
    // Solução: sempre verificar o estado no Mutex antes de esperar
    let par_clone = Arc::clone(&par);
    let esperador = thread::spawn(move || {
        let (lock, cvar) = &*par_clone;
        let guard = lock.lock().unwrap();

        // Correto: verificar se já está pronto ANTES de esperar
        if *guard {
            println!("Já estava pronto, sem necessidade de esperar!");
            return;
        }

        let _guard = cvar.wait_while(guard, |pronto| !*pronto).unwrap();
        println!("Pronto!");
    });

    // Sinalizar
    let (lock, cvar) = &*par;
    *lock.lock().unwrap() = true;
    cvar.notify_one();

    esperador.join().unwrap();
}

Garantias de Thread Safety

  • Barrier é Send + Sync — pode ser compartilhada entre threads via Arc.
  • Condvar é Send + Sync — thread-safe para uso compartilhado.
  • Condvar::wait libera o lock atomicamente e readquire antes de retornar.
  • Spurious wakeups podem ocorrer — sempre use wait_while ou verifique a condição em loop.
  • Barrier é automaticamente reutilizável: após todas as threads passarem, ela pode ser usada novamente.
  • Se uma thread entrar em panic enquanto espera em um Condvar, o Mutex associado pode ficar poisoned.

Veja Também