Logs sao a principal ferramenta de diagnostico em sistemas de producao. Quando voce tem dezenas de servicos gerando arquivos de log simultaneamente, encontrar a informacao relevante se torna um desafio. Neste projeto, vamos construir um agregador de logs em Rust que monitora multiplos arquivos, analisa cada linha extraindo timestamp e nivel de severidade, e apresenta uma visao unificada e filtrada em tempo real.
Este projeto explora conceitos fundamentais de I/O de arquivos, parsing de texto, observacao de mudancas no sistema de arquivos e manipulacao de datas – habilidades essenciais para qualquer ferramenta de DevOps.
O Que Vamos Construir
- Monitoramento simultaneo de multiplos arquivos de log
- Parsing automatico de timestamps e niveis (ERROR, WARN, INFO, DEBUG)
- Filtragem por nivel de severidade
- Saida unificada ordenada por timestamp
- Deteccao de novas linhas em tempo real com a crate
notify - Formatacao colorida no terminal
Estrutura do Projeto
log-aggregator/
├── Cargo.toml
└── src/
├── main.rs
├── parser.rs
├── watcher.rs
└── aggregator.rs
Configurando o Projeto
cargo new log-aggregator
cd log-aggregator
Edite o Cargo.toml com as dependencias necessarias:
[package]
name = "log-aggregator"
version = "0.1.0"
edition = "2021"
[dependencies]
notify = "6.1"
chrono = "0.4"
colored = "2.1"
clap = { version = "4.5", features = ["derive"] }
Passo 1: Parser de Linhas de Log
O parser e responsavel por extrair informacoes estruturadas de cada linha de log. Vamos suportar o formato comum [YYYY-MM-DD HH:MM:SS] [NIVEL] mensagem.
Crie o arquivo src/parser.rs:
use chrono::NaiveDateTime;
use std::fmt;
/// Niveis de severidade suportados pelo agregador.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum NivelLog {
Debug,
Info,
Warn,
Error,
}
impl NivelLog {
/// Converte uma string para o nivel de log correspondente.
pub fn from_str(s: &str) -> Option<Self> {
match s.to_uppercase().as_str() {
"DEBUG" => Some(NivelLog::Debug),
"INFO" => Some(NivelLog::Info),
"WARN" | "WARNING" => Some(NivelLog::Warn),
"ERROR" | "ERR" => Some(NivelLog::Error),
_ => None,
}
}
}
impl fmt::Display for NivelLog {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
NivelLog::Debug => write!(f, "DEBUG"),
NivelLog::Info => write!(f, "INFO"),
NivelLog::Warn => write!(f, "WARN"),
NivelLog::Error => write!(f, "ERROR"),
}
}
}
/// Representa uma entrada de log ja parseada.
#[derive(Debug, Clone)]
pub struct EntradaLog {
pub timestamp: NaiveDateTime,
pub nivel: NivelLog,
pub mensagem: String,
pub arquivo_origem: String,
}
impl fmt::Display for EntradaLog {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"[{}] [{}] [{}] {}",
self.timestamp.format("%Y-%m-%d %H:%M:%S"),
self.nivel,
self.arquivo_origem,
self.mensagem
)
}
}
/// Tenta parsear uma linha de log no formato:
/// [2026-01-15 10:30:45] [INFO] Mensagem aqui
pub fn parsear_linha(linha: &str, arquivo_origem: &str) -> Option<EntradaLog> {
let linha = linha.trim();
if linha.is_empty() {
return None;
}
// Extrair timestamp entre colchetes
let inicio_ts = linha.find('[')? + 1;
let fim_ts = linha.find(']')?;
let timestamp_str = &linha[inicio_ts..fim_ts];
let timestamp = NaiveDateTime::parse_from_str(timestamp_str, "%Y-%m-%d %H:%M:%S").ok()?;
// Extrair nivel entre o segundo par de colchetes
let restante = &linha[fim_ts + 1..].trim_start();
let inicio_nivel = restante.find('[')? + 1;
let fim_nivel = restante.find(']')?;
let nivel_str = &restante[inicio_nivel..fim_nivel];
let nivel = NivelLog::from_str(nivel_str)?;
// O restante e a mensagem
let mensagem = restante[fim_nivel + 1..].trim().to_string();
Some(EntradaLog {
timestamp,
nivel,
mensagem,
arquivo_origem: arquivo_origem.to_string(),
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn teste_parsear_linha_info() {
let linha = "[2026-01-15 10:30:45] [INFO] Servidor iniciado na porta 8080";
let entrada = parsear_linha(linha, "app.log").unwrap();
assert_eq!(entrada.nivel, NivelLog::Info);
assert_eq!(entrada.mensagem, "Servidor iniciado na porta 8080");
}
#[test]
fn teste_parsear_linha_error() {
let linha = "[2026-01-15 10:31:00] [ERROR] Falha na conexao com banco";
let entrada = parsear_linha(linha, "app.log").unwrap();
assert_eq!(entrada.nivel, NivelLog::Error);
}
#[test]
fn teste_linha_invalida() {
let resultado = parsear_linha("linha sem formato", "app.log");
assert!(resultado.is_none());
}
}
O parser usa o chrono para interpretar timestamps e retorna uma estrutura EntradaLog contendo todas as informacoes extraidas. Linhas que nao seguem o formato esperado retornam None, permitindo que o agregador simplesmente as ignore.
Passo 2: Observador de Arquivos
O modulo watcher usa a crate notify para detectar quando novos dados sao escritos nos arquivos monitorados. Crie src/watcher.rs:
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;
/// Inicia o monitoramento de arquivos e retorna um receptor de caminhos modificados.
pub fn iniciar_monitoramento(
caminhos: Vec<PathBuf>,
) -> Result<(RecommendedWatcher, mpsc::Receiver<PathBuf>), Box<dyn std::error::Error>> {
let (tx, rx) = mpsc::channel::<PathBuf>();
let mut watcher = RecommendedWatcher::new(
move |resultado: Result<Event, notify::Error>| {
if let Ok(evento) = resultado {
// Nos interessam apenas eventos de modificacao
if matches!(evento.kind, EventKind::Modify(_)) {
for caminho in evento.paths {
let _ = tx.send(caminho);
}
}
}
},
Config::default().with_poll_interval(Duration::from_secs(1)),
)?;
// Registrar cada arquivo para monitoramento
for caminho in &caminhos {
if caminho.exists() {
watcher.watch(caminho, RecursiveMode::NonRecursive)?;
println!("Monitorando: {}", caminho.display());
} else {
eprintln!("Aviso: arquivo nao encontrado: {}", caminho.display());
}
}
Ok((watcher, rx))
}
O watcher notifica o programa sempre que um dos arquivos monitorados e modificado. Usamos um canal (mpsc::channel) para comunicar as mudancas de forma thread-safe.
Passo 3: Agregador de Entradas
O agregador coleta todas as entradas parseadas, filtra por nivel e apresenta de forma ordenada. Crie src/aggregator.rs:
use crate::parser::{EntradaLog, NivelLog};
use colored::*;
/// Armazena e gerencia as entradas de log coletadas.
pub struct Agregador {
entradas: Vec<EntradaLog>,
nivel_minimo: NivelLog,
}
impl Agregador {
pub fn new(nivel_minimo: NivelLog) -> Self {
Agregador {
entradas: Vec::new(),
nivel_minimo,
}
}
/// Adiciona uma nova entrada se ela atender o nivel minimo.
pub fn adicionar(&mut self, entrada: EntradaLog) {
if entrada.nivel >= self.nivel_minimo {
self.imprimir_colorido(&entrada);
self.entradas.push(entrada);
}
}
/// Imprime uma entrada com cores baseadas no nivel.
fn imprimir_colorido(&self, entrada: &EntradaLog) {
let timestamp = entrada
.timestamp
.format("%Y-%m-%d %H:%M:%S")
.to_string()
.dimmed();
let nivel = match entrada.nivel {
NivelLog::Debug => "DEBUG".blue(),
NivelLog::Info => " INFO".green(),
NivelLog::Warn => " WARN".yellow(),
NivelLog::Error => "ERROR".red().bold(),
};
let origem = entrada.arquivo_origem.as_str().cyan();
println!("{} [{}] [{}] {}", timestamp, nivel, origem, entrada.mensagem);
}
/// Retorna todas as entradas ordenadas por timestamp.
pub fn entradas_ordenadas(&mut self) -> &[EntradaLog] {
self.entradas
.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
&self.entradas
}
/// Retorna o total de entradas coletadas.
pub fn total(&self) -> usize {
self.entradas.len()
}
}
O agregador usa a crate colored para diferenciar visualmente os niveis de log no terminal. Entradas de erro aparecem em vermelho e negrito, avisos em amarelo, informacoes em verde e debug em azul.
Passo 4: Juntando Tudo no main.rs
Agora vamos conectar todos os modulos no ponto de entrada do programa. Crie src/main.rs:
mod aggregator;
mod parser;
mod watcher;
use aggregator::Agregador;
use clap::Parser;
use parser::{parsear_linha, NivelLog};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use std::time::Duration;
/// Agregador de Logs - coleta e unifica logs de multiplos arquivos
#[derive(Parser)]
#[command(name = "log-aggregator")]
#[command(about = "Agrega logs de multiplos arquivos em uma visao unificada")]
struct Argumentos {
/// Arquivos de log para monitorar
#[arg(required = true)]
arquivos: Vec<PathBuf>,
/// Nivel minimo de log (debug, info, warn, error)
#[arg(short, long, default_value = "info")]
nivel: String,
/// Modo de monitoramento continuo (watch)
#[arg(short, long)]
watch: bool,
}
/// Le todas as linhas atuais de um arquivo a partir de uma posicao.
fn ler_novas_linhas(
arquivo: &PathBuf,
posicao: u64,
) -> Result<(Vec<String>, u64), std::io::Error> {
let mut file = File::open(arquivo)?;
file.seek(SeekFrom::Start(posicao))?;
let leitor = BufReader::new(&file);
let mut linhas = Vec::new();
let mut nova_posicao = posicao;
for linha in leitor.lines() {
let linha = linha?;
nova_posicao += linha.len() as u64 + 1; // +1 para o \n
linhas.push(linha);
}
Ok((linhas, nova_posicao))
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Argumentos::parse();
let nivel_minimo = NivelLog::from_str(&args.nivel).unwrap_or_else(|| {
eprintln!("Nivel '{}' invalido. Usando 'info' como padrao.", args.nivel);
NivelLog::Info
});
let mut agregador = Agregador::new(nivel_minimo);
// Rastrear posicao de leitura de cada arquivo
let mut posicoes: HashMap<PathBuf, u64> = HashMap::new();
// Leitura inicial de todos os arquivos
for arquivo in &args.arquivos {
if !arquivo.exists() {
eprintln!("Arquivo nao encontrado: {}", arquivo.display());
continue;
}
let nome = arquivo
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
match ler_novas_linhas(arquivo, 0) {
Ok((linhas, posicao)) => {
for linha in &linhas {
if let Some(entrada) = parsear_linha(linha, &nome) {
agregador.adicionar(entrada);
}
}
posicoes.insert(arquivo.clone(), posicao);
}
Err(e) => eprintln!("Erro ao ler {}: {}", arquivo.display(), e),
}
}
println!(
"\n--- {} entradas coletadas ---\n",
agregador.total()
);
// Modo watch: monitorar mudancas em tempo real
if args.watch {
println!("Modo watch ativado. Pressione Ctrl+C para sair.\n");
let caminhos: Vec<PathBuf> = args
.arquivos
.iter()
.filter(|p| p.exists())
.cloned()
.collect();
let (_watcher, rx) = watcher::iniciar_monitoramento(caminhos)?;
loop {
match rx.recv_timeout(Duration::from_millis(500)) {
Ok(caminho_modificado) => {
let nome = caminho_modificado
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
let posicao_atual = posicoes
.get(&caminho_modificado)
.copied()
.unwrap_or(0);
if let Ok((linhas, nova_posicao)) =
ler_novas_linhas(&caminho_modificado, posicao_atual)
{
for linha in &linhas {
if let Some(entrada) = parsear_linha(linha, &nome) {
agregador.adicionar(entrada);
}
}
posicoes.insert(caminho_modificado, nova_posicao);
}
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
Err(_) => break,
}
}
}
Ok(())
}
O programa aceita uma lista de arquivos de log como argumentos, le o conteudo existente e, opcionalmente, continua monitorando mudancas no modo --watch. O HashMap de posicoes garante que apenas linhas novas sejam processadas a cada modificacao.
Como Executar
Primeiro, crie alguns arquivos de log de exemplo para testar:
# Criar arquivo de log do servidor web
cat > /tmp/web.log << 'EOF'
[2026-01-15 10:30:00] [INFO] Servidor web iniciado na porta 8080
[2026-01-15 10:30:05] [DEBUG] Carregando configuracoes
[2026-01-15 10:31:00] [WARN] Tempo de resposta alto: 2500ms
[2026-01-15 10:32:00] [ERROR] Falha na conexao com banco de dados
[2026-01-15 10:32:01] [INFO] Tentando reconexao automatica
EOF
# Criar arquivo de log do servico de autenticacao
cat > /tmp/auth.log << 'EOF'
[2026-01-15 10:30:02] [INFO] Servico de autenticacao iniciado
[2026-01-15 10:30:30] [INFO] Login bem-sucedido: usuario@email.com
[2026-01-15 10:31:15] [WARN] Tentativa de login com senha incorreta
[2026-01-15 10:31:45] [ERROR] Token JWT expirado para sessao abc123
EOF
Agora compile e execute:
# Compilar o projeto
cargo build --release
# Executar com nivel padrao (info)
cargo run -- /tmp/web.log /tmp/auth.log
# Filtrar apenas erros
cargo run -- /tmp/web.log /tmp/auth.log --nivel error
# Modo watch (monitoramento continuo)
cargo run -- /tmp/web.log /tmp/auth.log --watch
Saida esperada (com cores no terminal):
Monitorando: /tmp/web.log
Monitorando: /tmp/auth.log
2026-01-15 10:30:00 [ INFO] [web.log] Servidor web iniciado na porta 8080
2026-01-15 10:30:02 [ INFO] [auth.log] Servico de autenticacao iniciado
2026-01-15 10:30:30 [ INFO] [auth.log] Login bem-sucedido: usuario@email.com
2026-01-15 10:31:00 [ WARN] [web.log] Tempo de resposta alto: 2500ms
2026-01-15 10:31:15 [ WARN] [auth.log] Tentativa de login com senha incorreta
2026-01-15 10:31:45 [ERROR] [auth.log] Token JWT expirado para sessao abc123
2026-01-15 10:32:00 [ERROR] [web.log] Falha na conexao com banco de dados
2026-01-15 10:32:01 [ INFO] [web.log] Tentando reconexao automatica
--- 8 entradas coletadas ---
Desafios para Expandir
- Suporte a multiplos formatos: Adicione parsers para formatos como syslog, JSON lines e logs do nginx, detectando automaticamente o formato de cada arquivo.
- Busca por texto: Implemente um filtro
--buscarque aceite expressoes regulares para buscar mensagens especificas usando a crateregex. - Exportacao estruturada: Adicione opcoes para exportar as entradas agregadas em formato JSON ou CSV, facilitando a analise em outras ferramentas.
- Estatisticas resumidas: Ao encerrar o monitoramento, exiba um resumo com contagem de entradas por nivel, por arquivo e picos de atividade ao longo do tempo.
- Interface TUI: Substitua a saida simples por uma interface de terminal interativa usando a crate
ratatui, com paineis divididos por arquivo e scroll em tempo real.