Agregador de Logs em Rust

Construa um agregador de logs em Rust que coleta, analisa e unifica logs de múltiplos arquivos com filtragem por nível e timestamp.

Logs sao a principal ferramenta de diagnostico em sistemas de producao. Quando voce tem dezenas de servicos gerando arquivos de log simultaneamente, encontrar a informacao relevante se torna um desafio. Neste projeto, vamos construir um agregador de logs em Rust que monitora multiplos arquivos, analisa cada linha extraindo timestamp e nivel de severidade, e apresenta uma visao unificada e filtrada em tempo real.

Este projeto explora conceitos fundamentais de I/O de arquivos, parsing de texto, observacao de mudancas no sistema de arquivos e manipulacao de datas – habilidades essenciais para qualquer ferramenta de DevOps.

O Que Vamos Construir

  • Monitoramento simultaneo de multiplos arquivos de log
  • Parsing automatico de timestamps e niveis (ERROR, WARN, INFO, DEBUG)
  • Filtragem por nivel de severidade
  • Saida unificada ordenada por timestamp
  • Deteccao de novas linhas em tempo real com a crate notify
  • Formatacao colorida no terminal

Estrutura do Projeto

log-aggregator/
├── Cargo.toml
└── src/
    ├── main.rs
    ├── parser.rs
    ├── watcher.rs
    └── aggregator.rs

Configurando o Projeto

cargo new log-aggregator
cd log-aggregator

Edite o Cargo.toml com as dependencias necessarias:

[package]
name = "log-aggregator"
version = "0.1.0"
edition = "2021"

[dependencies]
notify = "6.1"
chrono = "0.4"
colored = "2.1"
clap = { version = "4.5", features = ["derive"] }

Passo 1: Parser de Linhas de Log

O parser e responsavel por extrair informacoes estruturadas de cada linha de log. Vamos suportar o formato comum [YYYY-MM-DD HH:MM:SS] [NIVEL] mensagem.

Crie o arquivo src/parser.rs:

use chrono::NaiveDateTime;
use std::fmt;

/// Niveis de severidade suportados pelo agregador.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum NivelLog {
    Debug,
    Info,
    Warn,
    Error,
}

impl NivelLog {
    /// Converte uma string para o nivel de log correspondente.
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_uppercase().as_str() {
            "DEBUG" => Some(NivelLog::Debug),
            "INFO" => Some(NivelLog::Info),
            "WARN" | "WARNING" => Some(NivelLog::Warn),
            "ERROR" | "ERR" => Some(NivelLog::Error),
            _ => None,
        }
    }
}

impl fmt::Display for NivelLog {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            NivelLog::Debug => write!(f, "DEBUG"),
            NivelLog::Info => write!(f, "INFO"),
            NivelLog::Warn => write!(f, "WARN"),
            NivelLog::Error => write!(f, "ERROR"),
        }
    }
}

/// Representa uma entrada de log ja parseada.
#[derive(Debug, Clone)]
pub struct EntradaLog {
    pub timestamp: NaiveDateTime,
    pub nivel: NivelLog,
    pub mensagem: String,
    pub arquivo_origem: String,
}

impl fmt::Display for EntradaLog {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{}] [{}] [{}] {}",
            self.timestamp.format("%Y-%m-%d %H:%M:%S"),
            self.nivel,
            self.arquivo_origem,
            self.mensagem
        )
    }
}

/// Tenta parsear uma linha de log no formato:
/// [2026-01-15 10:30:45] [INFO] Mensagem aqui
pub fn parsear_linha(linha: &str, arquivo_origem: &str) -> Option<EntradaLog> {
    let linha = linha.trim();
    if linha.is_empty() {
        return None;
    }

    // Extrair timestamp entre colchetes
    let inicio_ts = linha.find('[')? + 1;
    let fim_ts = linha.find(']')?;
    let timestamp_str = &linha[inicio_ts..fim_ts];

    let timestamp = NaiveDateTime::parse_from_str(timestamp_str, "%Y-%m-%d %H:%M:%S").ok()?;

    // Extrair nivel entre o segundo par de colchetes
    let restante = &linha[fim_ts + 1..].trim_start();
    let inicio_nivel = restante.find('[')? + 1;
    let fim_nivel = restante.find(']')?;
    let nivel_str = &restante[inicio_nivel..fim_nivel];
    let nivel = NivelLog::from_str(nivel_str)?;

    // O restante e a mensagem
    let mensagem = restante[fim_nivel + 1..].trim().to_string();

    Some(EntradaLog {
        timestamp,
        nivel,
        mensagem,
        arquivo_origem: arquivo_origem.to_string(),
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn teste_parsear_linha_info() {
        let linha = "[2026-01-15 10:30:45] [INFO] Servidor iniciado na porta 8080";
        let entrada = parsear_linha(linha, "app.log").unwrap();
        assert_eq!(entrada.nivel, NivelLog::Info);
        assert_eq!(entrada.mensagem, "Servidor iniciado na porta 8080");
    }

    #[test]
    fn teste_parsear_linha_error() {
        let linha = "[2026-01-15 10:31:00] [ERROR] Falha na conexao com banco";
        let entrada = parsear_linha(linha, "app.log").unwrap();
        assert_eq!(entrada.nivel, NivelLog::Error);
    }

    #[test]
    fn teste_linha_invalida() {
        let resultado = parsear_linha("linha sem formato", "app.log");
        assert!(resultado.is_none());
    }
}

O parser usa o chrono para interpretar timestamps e retorna uma estrutura EntradaLog contendo todas as informacoes extraidas. Linhas que nao seguem o formato esperado retornam None, permitindo que o agregador simplesmente as ignore.

Passo 2: Observador de Arquivos

O modulo watcher usa a crate notify para detectar quando novos dados sao escritos nos arquivos monitorados. Crie src/watcher.rs:

use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;

/// Inicia o monitoramento de arquivos e retorna um receptor de caminhos modificados.
pub fn iniciar_monitoramento(
    caminhos: Vec<PathBuf>,
) -> Result<(RecommendedWatcher, mpsc::Receiver<PathBuf>), Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel::<PathBuf>();

    let mut watcher = RecommendedWatcher::new(
        move |resultado: Result<Event, notify::Error>| {
            if let Ok(evento) = resultado {
                // Nos interessam apenas eventos de modificacao
                if matches!(evento.kind, EventKind::Modify(_)) {
                    for caminho in evento.paths {
                        let _ = tx.send(caminho);
                    }
                }
            }
        },
        Config::default().with_poll_interval(Duration::from_secs(1)),
    )?;

    // Registrar cada arquivo para monitoramento
    for caminho in &caminhos {
        if caminho.exists() {
            watcher.watch(caminho, RecursiveMode::NonRecursive)?;
            println!("Monitorando: {}", caminho.display());
        } else {
            eprintln!("Aviso: arquivo nao encontrado: {}", caminho.display());
        }
    }

    Ok((watcher, rx))
}

O watcher notifica o programa sempre que um dos arquivos monitorados e modificado. Usamos um canal (mpsc::channel) para comunicar as mudancas de forma thread-safe.

Passo 3: Agregador de Entradas

O agregador coleta todas as entradas parseadas, filtra por nivel e apresenta de forma ordenada. Crie src/aggregator.rs:

use crate::parser::{EntradaLog, NivelLog};
use colored::*;

/// Armazena e gerencia as entradas de log coletadas.
pub struct Agregador {
    entradas: Vec<EntradaLog>,
    nivel_minimo: NivelLog,
}

impl Agregador {
    pub fn new(nivel_minimo: NivelLog) -> Self {
        Agregador {
            entradas: Vec::new(),
            nivel_minimo,
        }
    }

    /// Adiciona uma nova entrada se ela atender o nivel minimo.
    pub fn adicionar(&mut self, entrada: EntradaLog) {
        if entrada.nivel >= self.nivel_minimo {
            self.imprimir_colorido(&entrada);
            self.entradas.push(entrada);
        }
    }

    /// Imprime uma entrada com cores baseadas no nivel.
    fn imprimir_colorido(&self, entrada: &EntradaLog) {
        let timestamp = entrada
            .timestamp
            .format("%Y-%m-%d %H:%M:%S")
            .to_string()
            .dimmed();

        let nivel = match entrada.nivel {
            NivelLog::Debug => "DEBUG".blue(),
            NivelLog::Info => " INFO".green(),
            NivelLog::Warn => " WARN".yellow(),
            NivelLog::Error => "ERROR".red().bold(),
        };

        let origem = entrada.arquivo_origem.as_str().cyan();

        println!("{} [{}] [{}] {}", timestamp, nivel, origem, entrada.mensagem);
    }

    /// Retorna todas as entradas ordenadas por timestamp.
    pub fn entradas_ordenadas(&mut self) -> &[EntradaLog] {
        self.entradas
            .sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
        &self.entradas
    }

    /// Retorna o total de entradas coletadas.
    pub fn total(&self) -> usize {
        self.entradas.len()
    }
}

O agregador usa a crate colored para diferenciar visualmente os niveis de log no terminal. Entradas de erro aparecem em vermelho e negrito, avisos em amarelo, informacoes em verde e debug em azul.

Passo 4: Juntando Tudo no main.rs

Agora vamos conectar todos os modulos no ponto de entrada do programa. Crie src/main.rs:

mod aggregator;
mod parser;
mod watcher;

use aggregator::Agregador;
use clap::Parser;
use parser::{parsear_linha, NivelLog};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use std::time::Duration;

/// Agregador de Logs - coleta e unifica logs de multiplos arquivos
#[derive(Parser)]
#[command(name = "log-aggregator")]
#[command(about = "Agrega logs de multiplos arquivos em uma visao unificada")]
struct Argumentos {
    /// Arquivos de log para monitorar
    #[arg(required = true)]
    arquivos: Vec<PathBuf>,

    /// Nivel minimo de log (debug, info, warn, error)
    #[arg(short, long, default_value = "info")]
    nivel: String,

    /// Modo de monitoramento continuo (watch)
    #[arg(short, long)]
    watch: bool,
}

/// Le todas as linhas atuais de um arquivo a partir de uma posicao.
fn ler_novas_linhas(
    arquivo: &PathBuf,
    posicao: u64,
) -> Result<(Vec<String>, u64), std::io::Error> {
    let mut file = File::open(arquivo)?;
    file.seek(SeekFrom::Start(posicao))?;

    let leitor = BufReader::new(&file);
    let mut linhas = Vec::new();
    let mut nova_posicao = posicao;

    for linha in leitor.lines() {
        let linha = linha?;
        nova_posicao += linha.len() as u64 + 1; // +1 para o \n
        linhas.push(linha);
    }

    Ok((linhas, nova_posicao))
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Argumentos::parse();

    let nivel_minimo = NivelLog::from_str(&args.nivel).unwrap_or_else(|| {
        eprintln!("Nivel '{}' invalido. Usando 'info' como padrao.", args.nivel);
        NivelLog::Info
    });

    let mut agregador = Agregador::new(nivel_minimo);

    // Rastrear posicao de leitura de cada arquivo
    let mut posicoes: HashMap<PathBuf, u64> = HashMap::new();

    // Leitura inicial de todos os arquivos
    for arquivo in &args.arquivos {
        if !arquivo.exists() {
            eprintln!("Arquivo nao encontrado: {}", arquivo.display());
            continue;
        }

        let nome = arquivo
            .file_name()
            .unwrap_or_default()
            .to_string_lossy()
            .to_string();

        match ler_novas_linhas(arquivo, 0) {
            Ok((linhas, posicao)) => {
                for linha in &linhas {
                    if let Some(entrada) = parsear_linha(linha, &nome) {
                        agregador.adicionar(entrada);
                    }
                }
                posicoes.insert(arquivo.clone(), posicao);
            }
            Err(e) => eprintln!("Erro ao ler {}: {}", arquivo.display(), e),
        }
    }

    println!(
        "\n--- {} entradas coletadas ---\n",
        agregador.total()
    );

    // Modo watch: monitorar mudancas em tempo real
    if args.watch {
        println!("Modo watch ativado. Pressione Ctrl+C para sair.\n");

        let caminhos: Vec<PathBuf> = args
            .arquivos
            .iter()
            .filter(|p| p.exists())
            .cloned()
            .collect();

        let (_watcher, rx) = watcher::iniciar_monitoramento(caminhos)?;

        loop {
            match rx.recv_timeout(Duration::from_millis(500)) {
                Ok(caminho_modificado) => {
                    let nome = caminho_modificado
                        .file_name()
                        .unwrap_or_default()
                        .to_string_lossy()
                        .to_string();

                    let posicao_atual = posicoes
                        .get(&caminho_modificado)
                        .copied()
                        .unwrap_or(0);

                    if let Ok((linhas, nova_posicao)) =
                        ler_novas_linhas(&caminho_modificado, posicao_atual)
                    {
                        for linha in &linhas {
                            if let Some(entrada) = parsear_linha(linha, &nome) {
                                agregador.adicionar(entrada);
                            }
                        }
                        posicoes.insert(caminho_modificado, nova_posicao);
                    }
                }
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
                Err(_) => break,
            }
        }
    }

    Ok(())
}

O programa aceita uma lista de arquivos de log como argumentos, le o conteudo existente e, opcionalmente, continua monitorando mudancas no modo --watch. O HashMap de posicoes garante que apenas linhas novas sejam processadas a cada modificacao.

Como Executar

Primeiro, crie alguns arquivos de log de exemplo para testar:

# Criar arquivo de log do servidor web
cat > /tmp/web.log << 'EOF'
[2026-01-15 10:30:00] [INFO] Servidor web iniciado na porta 8080
[2026-01-15 10:30:05] [DEBUG] Carregando configuracoes
[2026-01-15 10:31:00] [WARN] Tempo de resposta alto: 2500ms
[2026-01-15 10:32:00] [ERROR] Falha na conexao com banco de dados
[2026-01-15 10:32:01] [INFO] Tentando reconexao automatica
EOF

# Criar arquivo de log do servico de autenticacao
cat > /tmp/auth.log << 'EOF'
[2026-01-15 10:30:02] [INFO] Servico de autenticacao iniciado
[2026-01-15 10:30:30] [INFO] Login bem-sucedido: usuario@email.com
[2026-01-15 10:31:15] [WARN] Tentativa de login com senha incorreta
[2026-01-15 10:31:45] [ERROR] Token JWT expirado para sessao abc123
EOF

Agora compile e execute:

# Compilar o projeto
cargo build --release

# Executar com nivel padrao (info)
cargo run -- /tmp/web.log /tmp/auth.log

# Filtrar apenas erros
cargo run -- /tmp/web.log /tmp/auth.log --nivel error

# Modo watch (monitoramento continuo)
cargo run -- /tmp/web.log /tmp/auth.log --watch

Saida esperada (com cores no terminal):

Monitorando: /tmp/web.log
Monitorando: /tmp/auth.log
2026-01-15 10:30:00 [ INFO] [web.log] Servidor web iniciado na porta 8080
2026-01-15 10:30:02 [ INFO] [auth.log] Servico de autenticacao iniciado
2026-01-15 10:30:30 [ INFO] [auth.log] Login bem-sucedido: usuario@email.com
2026-01-15 10:31:00 [ WARN] [web.log] Tempo de resposta alto: 2500ms
2026-01-15 10:31:15 [ WARN] [auth.log] Tentativa de login com senha incorreta
2026-01-15 10:31:45 [ERROR] [auth.log] Token JWT expirado para sessao abc123
2026-01-15 10:32:00 [ERROR] [web.log] Falha na conexao com banco de dados
2026-01-15 10:32:01 [ INFO] [web.log] Tentando reconexao automatica

--- 8 entradas coletadas ---

Desafios para Expandir

  1. Suporte a multiplos formatos: Adicione parsers para formatos como syslog, JSON lines e logs do nginx, detectando automaticamente o formato de cada arquivo.
  2. Busca por texto: Implemente um filtro --buscar que aceite expressoes regulares para buscar mensagens especificas usando a crate regex.
  3. Exportacao estruturada: Adicione opcoes para exportar as entradas agregadas em formato JSON ou CSV, facilitando a analise em outras ferramentas.
  4. Estatisticas resumidas: Ao encerrar o monitoramento, exiba um resumo com contagem de entradas por nivel, por arquivo e picos de atividade ao longo do tempo.
  5. Interface TUI: Substitua a saida simples por uma interface de terminal interativa usando a crate ratatui, com paineis divididos por arquivo e scroll em tempo real.

Veja Tambem