Observer em Rust

O padrao Observer em Rust: callbacks com Box<dyn Fn>, observer via canais mpsc, async streams como observer moderno e exemplos praticos de sistema de eventos.

Introducao

O Observer (Observador) e um padrao comportamental que define uma dependencia um-para-muitos entre objetos: quando um objeto (o sujeito) muda de estado, todos os seus dependentes (observadores) sao notificados automaticamente.

Em Rust, o Observer apresenta desafios unicos por causa do sistema de ownership. Diferente de linguagens com garbage collector onde observadores podem manter referencias livremente, em Rust precisamos ser explícitos sobre quem possui o que. Isso nos leva a tres abordagens idiomaticas: callbacks com closures, canais mpsc e streams assincrono.


Problema

Voce esta construindo um sistema de monitoramento de precos de acoes. Varios componentes precisam ser notificados quando o preco muda: o grafico deve atualizar, alertas devem disparar, o historico deve registrar, e a API deve notificar clientes externos.

// Sem Observer: acoplamento direto com todos os dependentes
fn atualizar_preco(acao: &str, novo_preco: f64) {
    // Preciso conhecer TODOS os dependentes aqui
    atualizar_grafico(acao, novo_preco);       // dependente 1
    verificar_alertas(acao, novo_preco);        // dependente 2
    registrar_historico(acao, novo_preco);      // dependente 3
    notificar_api(acao, novo_preco);            // dependente 4
    enviar_webhook(acao, novo_preco);           // dependente 5
    // Adicionar novo dependente = modificar esta funcao
    // Remover dependente = modificar esta funcao
    // Impossivel de manter!
}

Solucao em Rust

Abordagem 1: Callbacks com Box

A forma mais simples e direta do Observer:

use std::collections::HashMap;

/// Evento emitido quando algo acontece
#[derive(Debug, Clone)]
pub struct Evento {
    pub tipo: String,
    pub dados: DadosEvento,
    pub timestamp: u64,
}

#[derive(Debug, Clone)]
pub enum DadosEvento {
    PrecoAtualizado {
        simbolo: String,
        preco_anterior: f64,
        preco_novo: f64,
    },
    VolumeAlto {
        simbolo: String,
        volume: u64,
    },
    MercadoAberto,
    MercadoFechado,
}

/// Tipo do callback de observador
type ObservadorFn = Box<dyn Fn(&Evento) + Send>;

/// Identificador unico para cada observador registrado
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObservadorId(u64);

/// Sistema de eventos com registro de observadores
pub struct EventBus {
    observadores: HashMap<String, Vec<(ObservadorId, ObservadorFn)>>,
    observadores_globais: Vec<(ObservadorId, ObservadorFn)>,
    proximo_id: u64,
}

impl EventBus {
    pub fn new() -> Self {
        Self {
            observadores: HashMap::new(),
            observadores_globais: Vec::new(),
            proximo_id: 0,
        }
    }

    /// Gera um novo ID unico
    fn novo_id(&mut self) -> ObservadorId {
        self.proximo_id += 1;
        ObservadorId(self.proximo_id)
    }

    /// Registra um observador para um tipo especifico de evento
    pub fn registrar(
        &mut self,
        tipo_evento: &str,
        callback: impl Fn(&Evento) + Send + 'static,
    ) -> ObservadorId {
        let id = self.novo_id();
        self.observadores
            .entry(tipo_evento.to_string())
            .or_default()
            .push((id, Box::new(callback)));
        id
    }

    /// Registra um observador para TODOS os eventos
    pub fn registrar_global(
        &mut self,
        callback: impl Fn(&Evento) + Send + 'static,
    ) -> ObservadorId {
        let id = self.novo_id();
        self.observadores_globais.push((id, Box::new(callback)));
        id
    }

    /// Remove um observador pelo ID
    pub fn remover(&mut self, id: ObservadorId) {
        for lista in self.observadores.values_mut() {
            lista.retain(|(obs_id, _)| *obs_id != id);
        }
        self.observadores_globais.retain(|(obs_id, _)| *obs_id != id);
    }

    /// Emite um evento, notificando todos os observadores relevantes
    pub fn emitir(&self, evento: Evento) {
        // Notifica observadores especificos do tipo
        if let Some(lista) = self.observadores.get(&evento.tipo) {
            for (_, callback) in lista {
                callback(&evento);
            }
        }

        // Notifica observadores globais
        for (_, callback) in &self.observadores_globais {
            callback(&evento);
        }
    }

    /// Retorna quantos observadores estao registrados
    pub fn total_observadores(&self) -> usize {
        let especificos: usize = self.observadores.values().map(|v| v.len()).sum();
        especificos + self.observadores_globais.len()
    }
}

fn main() {
    let mut bus = EventBus::new();

    // Observador 1: Grafico de precos
    bus.registrar("preco", |evento| {
        if let DadosEvento::PrecoAtualizado {
            simbolo,
            preco_anterior,
            preco_novo,
        } = &evento.dados
        {
            let variacao = ((preco_novo - preco_anterior) / preco_anterior) * 100.0;
            let seta = if variacao >= 0.0 { "^" } else { "v" };
            println!(
                "[GRAFICO] {} {} R${:.2} -> R${:.2} ({}{:.2}%)",
                simbolo, seta, preco_anterior, preco_novo,
                if variacao >= 0.0 { "+" } else { "" },
                variacao
            );
        }
    });

    // Observador 2: Sistema de alertas
    bus.registrar("preco", |evento| {
        if let DadosEvento::PrecoAtualizado {
            simbolo,
            preco_anterior,
            preco_novo,
        } = &evento.dados
        {
            let variacao_abs = ((preco_novo - preco_anterior) / preco_anterior * 100.0).abs();
            if variacao_abs > 3.0 {
                println!(
                    "[ALERTA!] Variacao significativa em {}: {:.2}%",
                    simbolo, variacao_abs
                );
            }
        }
    });

    // Observador 3: Logger global (observa TODOS os eventos)
    let log_id = bus.registrar_global(|evento| {
        println!("[LOG] Evento '{}' recebido (ts: {})", evento.tipo, evento.timestamp);
    });

    // Observador 4: Historico de precos
    bus.registrar("preco", |evento| {
        if let DadosEvento::PrecoAtualizado { simbolo, preco_novo, .. } = &evento.dados {
            println!(
                "[HISTORICO] Registrando {} = R${:.2}",
                simbolo, preco_novo
            );
        }
    });

    println!("Observadores registrados: {}\n", bus.total_observadores());

    // Emite eventos de atualizacao de precos
    let eventos = vec![
        Evento {
            tipo: "preco".to_string(),
            dados: DadosEvento::PrecoAtualizado {
                simbolo: "PETR4".to_string(),
                preco_anterior: 35.50,
                preco_novo: 36.20,
            },
            timestamp: 1000,
        },
        Evento {
            tipo: "preco".to_string(),
            dados: DadosEvento::PrecoAtualizado {
                simbolo: "VALE3".to_string(),
                preco_anterior: 68.00,
                preco_novo: 64.50,
            },
            timestamp: 1001,
        },
        Evento {
            tipo: "mercado".to_string(),
            dados: DadosEvento::MercadoFechado,
            timestamp: 1002,
        },
    ];

    for evento in eventos {
        println!("--- Emitindo: {} ---", evento.tipo);
        bus.emitir(evento);
        println!();
    }

    // Remove o logger
    bus.remover(log_id);
    println!("Logger removido. Observadores restantes: {}", bus.total_observadores());
}

Abordagem 2: Observer com Canais mpsc

Ideal para comunicacao entre threads:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Mensagem enviada pelo sujeito para os observadores
#[derive(Debug, Clone)]
pub enum Notificacao {
    TemperaturaAtualizada { sensor_id: String, celsius: f64 },
    UmidadeAtualizada { sensor_id: String, percentual: f64 },
    AlertaCritico { sensor_id: String, mensagem: String },
    Encerrar,
}

/// Sensor que emite notificacoes (o Sujeito)
pub struct SensorHub {
    transmissores: Vec<mpsc::Sender<Notificacao>>,
}

impl SensorHub {
    pub fn new() -> Self {
        Self {
            transmissores: Vec::new(),
        }
    }

    /// Registra um novo observador e retorna o receptor de eventos
    pub fn registrar_observador(&mut self) -> mpsc::Receiver<Notificacao> {
        let (tx, rx) = mpsc::channel();
        self.transmissores.push(tx);
        rx
    }

    /// Notifica todos os observadores
    pub fn notificar(&self, notificacao: Notificacao) {
        // Remove transmissores desconectados mantendo os ativos
        for tx in &self.transmissores {
            let _ = tx.send(notificacao.clone());
        }
    }

    /// Simula leituras dos sensores
    pub fn simular_leituras(&self) {
        let leituras = vec![
            Notificacao::TemperaturaAtualizada {
                sensor_id: "S001".to_string(),
                celsius: 25.3,
            },
            Notificacao::UmidadeAtualizada {
                sensor_id: "S001".to_string(),
                percentual: 65.0,
            },
            Notificacao::TemperaturaAtualizada {
                sensor_id: "S002".to_string(),
                celsius: 42.8,
            },
            Notificacao::AlertaCritico {
                sensor_id: "S002".to_string(),
                mensagem: "Temperatura acima do limite!".to_string(),
            },
            Notificacao::TemperaturaAtualizada {
                sensor_id: "S001".to_string(),
                celsius: 26.1,
            },
        ];

        for leitura in leituras {
            println!("[HUB] Emitindo: {:?}", leitura);
            self.notificar(leitura);
            thread::sleep(Duration::from_millis(100));
        }

        // Encerra todos os observadores
        self.notificar(Notificacao::Encerrar);
    }
}

fn demonstrar_mpsc() {
    let mut hub = SensorHub::new();

    // Observador 1: Display de temperatura (thread separada)
    let rx_display = hub.registrar_observador();
    let handle_display = thread::spawn(move || {
        println!("[DISPLAY] Iniciado");
        loop {
            match rx_display.recv() {
                Ok(Notificacao::TemperaturaAtualizada { sensor_id, celsius }) => {
                    let barra = "=".repeat((celsius as usize).min(50));
                    println!("[DISPLAY] {}: {:.1}C |{}|", sensor_id, celsius, barra);
                }
                Ok(Notificacao::Encerrar) => {
                    println!("[DISPLAY] Encerrando");
                    break;
                }
                Ok(_) => {} // ignora outros eventos
                Err(_) => break, // canal fechado
            }
        }
    });

    // Observador 2: Sistema de alertas (thread separada)
    let rx_alertas = hub.registrar_observador();
    let handle_alertas = thread::spawn(move || {
        println!("[ALERTAS] Iniciado");
        loop {
            match rx_alertas.recv() {
                Ok(Notificacao::AlertaCritico { sensor_id, mensagem }) => {
                    println!("[ALERTAS] *** CRITICO *** Sensor {}: {}", sensor_id, mensagem);
                }
                Ok(Notificacao::TemperaturaAtualizada { celsius, sensor_id, .. }) => {
                    if celsius > 40.0 {
                        println!(
                            "[ALERTAS] Aviso: Sensor {} com temperatura alta: {:.1}C",
                            sensor_id, celsius
                        );
                    }
                }
                Ok(Notificacao::Encerrar) => {
                    println!("[ALERTAS] Encerrando");
                    break;
                }
                Ok(_) => {}
                Err(_) => break,
            }
        }
    });

    // Observador 3: Logger (thread separada)
    let rx_logger = hub.registrar_observador();
    let handle_logger = thread::spawn(move || {
        let mut contagem = 0u64;
        loop {
            match rx_logger.recv() {
                Ok(Notificacao::Encerrar) => {
                    println!("[LOGGER] Total de eventos registrados: {}", contagem);
                    break;
                }
                Ok(notificacao) => {
                    contagem += 1;
                    println!("[LOGGER] #{}: {:?}", contagem, notificacao);
                }
                Err(_) => break,
            }
        }
    });

    // Simula leituras no thread principal
    thread::sleep(Duration::from_millis(200)); // espera observadores iniciarem
    hub.simular_leituras();

    // Aguarda todos os observadores encerrarem
    handle_display.join().unwrap();
    handle_alertas.join().unwrap();
    handle_logger.join().unwrap();

    println!("\nTodos os observadores encerraram.");
}

fn main() {
    demonstrar_mpsc();
}

Diagrama

OBSERVER COM CALLBACKS:

    +--------------------+
    |     EventBus       |
    |     (Sujeito)      |
    +--------+-----------+
             | emitir(evento)
             |
    +--------v-----------+
    | HashMap<tipo,      |
    |   Vec<callbacks>>  |
    +----+-----+-----+---+
         |     |     |
         v     v     v
    callback  callback  callback
    (Grafico) (Alerta)  (Historico)


OBSERVER COM CANAIS (mpsc):

    +----------------+
    |   SensorHub    |
    |   (Sujeito)    |
    +---+---+---+----+
        |   |   |
        |   |   |  mpsc::Sender
        v   v   v
    +---+ +---+ +---+
    |rx1| |rx2| |rx3|     <- mpsc::Receiver (um por thread)
    +---+ +---+ +---+
      |     |     |
      v     v     v
    Thread Thread Thread
    (Disp) (Alert) (Log)

    Cada observador roda em sua propria thread!
    Comunicacao segura via canais.

Exemplo do Mundo Real

Sistema de eventos tipado para uma aplicacao GUI simplificada:

use std::collections::HashMap;
use std::any::{Any, TypeId};

/// Trait marcadora para tipos de eventos
pub trait EventoGui: Any + Send + 'static {
    fn nome_tipo(&self) -> &'static str;
}

/// Eventos concretos do GUI
#[derive(Debug, Clone)]
pub struct CliqueEvento {
    pub x: f64,
    pub y: f64,
    pub botao: BotaoMouse,
}

#[derive(Debug, Clone)]
pub enum BotaoMouse {
    Esquerdo,
    Direito,
    Meio,
}

impl EventoGui for CliqueEvento {
    fn nome_tipo(&self) -> &'static str {
        "CliqueEvento"
    }
}

#[derive(Debug, Clone)]
pub struct TeclaEvento {
    pub tecla: String,
    pub ctrl: bool,
    pub shift: bool,
}

impl EventoGui for TeclaEvento {
    fn nome_tipo(&self) -> &'static str {
        "TeclaEvento"
    }
}

#[derive(Debug, Clone)]
pub struct RedimensionarEvento {
    pub largura: u32,
    pub altura: u32,
}

impl EventoGui for RedimensionarEvento {
    fn nome_tipo(&self) -> &'static str {
        "RedimensionarEvento"
    }
}

/// Despachante de eventos tipado
pub struct Despachante {
    handlers: HashMap<TypeId, Vec<Box<dyn Fn(&dyn Any) + Send>>>,
}

impl Despachante {
    pub fn new() -> Self {
        Self {
            handlers: HashMap::new(),
        }
    }

    /// Registra um handler para um tipo especifico de evento
    pub fn ao_receber<E: 'static>(
        &mut self,
        handler: impl Fn(&E) + Send + 'static,
    ) {
        let type_id = TypeId::of::<E>();
        let handler_wrapper: Box<dyn Fn(&dyn Any) + Send> = Box::new(move |any| {
            if let Some(evento) = any.downcast_ref::<E>() {
                handler(evento);
            }
        });

        self.handlers
            .entry(type_id)
            .or_default()
            .push(handler_wrapper);
    }

    /// Despacha um evento para todos os handlers registrados
    pub fn despachar<E: 'static>(&self, evento: &E) {
        let type_id = TypeId::of::<E>();
        if let Some(handlers) = self.handlers.get(&type_id) {
            for handler in handlers {
                handler(evento as &dyn Any);
            }
        }
    }
}

fn main() {
    let mut despachante = Despachante::new();

    // Registra handlers tipados - sem casts inseguros!
    despachante.ao_receber::<CliqueEvento>(|evento| {
        println!(
            "[UI] Clique {:?} em ({:.0}, {:.0})",
            evento.botao, evento.x, evento.y
        );
    });

    despachante.ao_receber::<CliqueEvento>(|evento| {
        if let BotaoMouse::Direito = evento.botao {
            println!("[UI] Menu de contexto aberto em ({:.0}, {:.0})", evento.x, evento.y);
        }
    });

    despachante.ao_receber::<TeclaEvento>(|evento| {
        if evento.ctrl && evento.tecla == "S" {
            println!("[UI] Ctrl+S: Salvando documento...");
        } else if evento.ctrl && evento.tecla == "Z" {
            println!("[UI] Ctrl+Z: Desfazendo...");
        } else {
            println!("[UI] Tecla pressionada: {}", evento.tecla);
        }
    });

    despachante.ao_receber::<RedimensionarEvento>(|evento| {
        println!(
            "[UI] Janela redimensionada: {}x{}",
            evento.largura, evento.altura
        );
    });

    // Simula eventos do GUI
    println!("=== Simulacao de eventos GUI ===\n");

    despachante.despachar(&CliqueEvento {
        x: 100.0,
        y: 200.0,
        botao: BotaoMouse::Esquerdo,
    });

    despachante.despachar(&CliqueEvento {
        x: 300.0,
        y: 150.0,
        botao: BotaoMouse::Direito,
    });

    despachante.despachar(&TeclaEvento {
        tecla: "S".to_string(),
        ctrl: true,
        shift: false,
    });

    despachante.despachar(&TeclaEvento {
        tecla: "A".to_string(),
        ctrl: false,
        shift: false,
    });

    despachante.despachar(&RedimensionarEvento {
        largura: 1920,
        altura: 1080,
    });
}

Quando Usar

  • Sistemas de eventos em GUIs, jogos, aplicacoes reativas
  • Desacoplamento entre produtor e consumidor de eventos
  • Pub/Sub para comunicacao entre modulos
  • Notificacoes de mudanca de estado (model -> view, por exemplo)
  • Plugins que reagem a eventos do sistema hospedeiro

Quando NAO Usar

  • Dependencias claras e diretas - uma chamada de funcao e mais simples
  • Ordem de processamento importa - Observer nao garante ordem (a menos que voce force)
  • Poucos observadores fixos - o overhead do sistema de eventos nao se justifica
  • Debugging complexo - observers tornam o fluxo de execucao dificil de rastrear
// Para poucos dependentes fixos, uma chamada direta e melhor:
struct Sistema {
    display: Display,
    logger: Logger,
}

impl Sistema {
    fn atualizar(&mut self, dados: &Dados) {
        // Claro, simples, rastreavel
        self.display.atualizar(dados);
        self.logger.registrar(dados);
    }
}

Variacoes em Rust

1. Observer com Weak references (evita ciclos)

use std::sync::{Arc, Weak};

struct Sujeito {
    observadores: Vec<Weak<dyn Observador>>,
}

impl Sujeito {
    fn notificar(&mut self) {
        // Remove observadores que ja foram destruidos
        self.observadores.retain(|obs| obs.strong_count() > 0);

        for obs in &self.observadores {
            if let Some(obs) = obs.upgrade() {
                obs.atualizar();
            }
        }
    }
}

2. Observer reativo com iteradores

// Usando iteradores como "streams sincronos"
fn observar_mudancas(valores: &[i32]) -> impl Iterator<Item = (i32, i32)> + '_ {
    valores.windows(2).filter_map(|w| {
        if w[0] != w[1] {
            Some((w[0], w[1]))
        } else {
            None
        }
    })
}

3. Observer com tokio broadcast (async)

// Para aplicacoes async, tokio::sync::broadcast e ideal
// use tokio::sync::broadcast;
//
// let (tx, mut rx1) = broadcast::channel(100);
// let mut rx2 = tx.subscribe();
//
// tx.send(evento)?; // envia para TODOS os receptores

Padroes Relacionados

  • Strategy - Observers sao essencialmente strategies de reacao a eventos
  • Decorator - Middleware e um Observer/Decorator hibrido em web frameworks
  • Composite - Eventos podem propagar por uma arvore de componentes
  • Facade - O EventBus pode servir como Facade para o sistema de eventos

Conclusao

O Observer em Rust requer mais reflexao sobre ownership do que em linguagens com garbage collection, mas as solucoes resultantes sao mais robustas e livres de memory leaks. A abordagem com callbacks (Box<dyn Fn>) e a mais simples para cenarios single-threaded. Canais mpsc sao a escolha natural para comunicacao entre threads, aproveitando o modelo de concorrencia segura de Rust. Para aplicacoes async, tokio::broadcast e tokio::watch oferecem padroes Observer performantes e escaláveis. Independente da abordagem escolhida, o sistema de tipos de Rust garante que observadores nao causem data races, dangling references ou memory leaks.