Coletor de Metricas com Prometheus

Construa um coletor de metricas em Rust que expoe contadores, gauges e histogramas no formato Prometheus via HTTP.

Observabilidade e um dos pilares de sistemas modernos em producao. O Prometheus se consolidou como o padrao de fato para coleta de metricas, e entender como seu formato funciona internamente e valioso para qualquer desenvolvedor. Neste projeto, vamos construir um coletor de metricas que implementa contadores, gauges e histogramas do zero, e os expoe via HTTP no formato texto do Prometheus para scraping.

Este projeto ensina concorrencia com dados compartilhados, servidores HTTP minimalistas e o formato de exposicao do Prometheus.

O Que Vamos Construir

  • Tipos de metricas: Counter, Gauge e Histogram
  • Registro centralizado de metricas com labels
  • Endpoint HTTP /metrics no formato texto do Prometheus
  • Thread-safe: metricas acessiveis de multiplas threads
  • Simulacao de uma aplicacao gerando metricas reais
  • Servidor HTTP simples sem framework externo

Estrutura do Projeto

metrics-collector/
├── Cargo.toml
└── src/
    ├── main.rs
    ├── metricas.rs
    ├── registro.rs
    └── servidor.rs

Configurando o Projeto

cargo new metrics-collector
cd metrics-collector

Edite o Cargo.toml:

[package]
name = "metrics-collector"
version = "0.1.0"
edition = "2021"

[dependencies]
# Projeto minimalista: sem dependencias externas alem do necessario

Este projeto demonstra o poder da biblioteca padrao do Rust – construiremos tudo usando apenas std, sem dependencias externas.

Passo 1: Tipos de Metricas

O modulo de metricas define os tres tipos fundamentais do Prometheus. Crie src/metricas.rs:

use std::collections::HashMap;
use std::sync::Mutex;

/// Um counter so pode ser incrementado (nunca decrementado).
/// Usado para: total de requisicoes, erros acumulados, bytes transferidos.
pub struct Counter {
    pub nome: String,
    pub descricao: String,
    pub labels: Vec<String>,
    valores: Mutex<HashMap<Vec<String>, f64>>,
}

impl Counter {
    pub fn new(nome: &str, descricao: &str, labels: Vec<String>) -> Self {
        Counter {
            nome: nome.to_string(),
            descricao: descricao.to_string(),
            labels,
            valores: Mutex::new(HashMap::new()),
        }
    }

    /// Incrementa o counter em 1 para os labels dados.
    pub fn incrementar(&self, label_valores: &[&str]) {
        self.adicionar(label_valores, 1.0);
    }

    /// Adiciona um valor ao counter para os labels dados.
    pub fn adicionar(&self, label_valores: &[&str], valor: f64) {
        let chave: Vec<String> = label_valores.iter().map(|s| s.to_string()).collect();
        let mut valores = self.valores.lock().unwrap();
        let entrada = valores.entry(chave).or_insert(0.0);
        *entrada += valor;
    }

    /// Formata o counter no formato texto do Prometheus.
    pub fn formatar(&self) -> String {
        let mut saida = String::new();
        saida.push_str(&format!("# HELP {} {}\n", self.nome, self.descricao));
        saida.push_str(&format!("# TYPE {} counter\n", self.nome));

        let valores = self.valores.lock().unwrap();
        for (label_vals, valor) in valores.iter() {
            let labels_str = formatar_labels(&self.labels, label_vals);
            saida.push_str(&format!("{}{} {}\n", self.nome, labels_str, valor));
        }

        saida
    }
}

/// Um gauge pode subir e descer livremente.
/// Usado para: temperatura, uso de memoria, conexoes ativas.
pub struct Gauge {
    pub nome: String,
    pub descricao: String,
    pub labels: Vec<String>,
    valores: Mutex<HashMap<Vec<String>, f64>>,
}

impl Gauge {
    pub fn new(nome: &str, descricao: &str, labels: Vec<String>) -> Self {
        Gauge {
            nome: nome.to_string(),
            descricao: descricao.to_string(),
            labels,
            valores: Mutex::new(HashMap::new()),
        }
    }

    /// Define o valor do gauge para os labels dados.
    pub fn definir(&self, label_valores: &[&str], valor: f64) {
        let chave: Vec<String> = label_valores.iter().map(|s| s.to_string()).collect();
        let mut valores = self.valores.lock().unwrap();
        valores.insert(chave, valor);
    }

    /// Incrementa o gauge.
    pub fn incrementar(&self, label_valores: &[&str]) {
        let chave: Vec<String> = label_valores.iter().map(|s| s.to_string()).collect();
        let mut valores = self.valores.lock().unwrap();
        let entrada = valores.entry(chave).or_insert(0.0);
        *entrada += 1.0;
    }

    /// Decrementa o gauge.
    pub fn decrementar(&self, label_valores: &[&str]) {
        let chave: Vec<String> = label_valores.iter().map(|s| s.to_string()).collect();
        let mut valores = self.valores.lock().unwrap();
        let entrada = valores.entry(chave).or_insert(0.0);
        *entrada -= 1.0;
    }

    /// Formata o gauge no formato texto do Prometheus.
    pub fn formatar(&self) -> String {
        let mut saida = String::new();
        saida.push_str(&format!("# HELP {} {}\n", self.nome, self.descricao));
        saida.push_str(&format!("# TYPE {} gauge\n", self.nome));

        let valores = self.valores.lock().unwrap();
        for (label_vals, valor) in valores.iter() {
            let labels_str = formatar_labels(&self.labels, label_vals);
            saida.push_str(&format!("{}{} {}\n", self.nome, labels_str, valor));
        }

        saida
    }
}

/// Um histogram distribui observacoes em buckets predefinidos.
/// Usado para: latencia de requisicoes, tamanho de payloads.
pub struct Histogram {
    pub nome: String,
    pub descricao: String,
    pub buckets: Vec<f64>,
    contagens: Mutex<Vec<u64>>,
    soma: Mutex<f64>,
    total: Mutex<u64>,
}

impl Histogram {
    pub fn new(nome: &str, descricao: &str, buckets: Vec<f64>) -> Self {
        let num_buckets = buckets.len();
        Histogram {
            nome: nome.to_string(),
            descricao: descricao.to_string(),
            buckets,
            contagens: Mutex::new(vec![0; num_buckets]),
            soma: Mutex::new(0.0),
            total: Mutex::new(0),
        }
    }

    /// Registra uma observacao no histogram.
    pub fn observar(&self, valor: f64) {
        let mut contagens = self.contagens.lock().unwrap();
        for (i, limite) in self.buckets.iter().enumerate() {
            if valor <= *limite {
                contagens[i] += 1;
            }
        }

        let mut soma = self.soma.lock().unwrap();
        *soma += valor;

        let mut total = self.total.lock().unwrap();
        *total += 1;
    }

    /// Formata o histogram no formato texto do Prometheus.
    pub fn formatar(&self) -> String {
        let mut saida = String::new();
        saida.push_str(&format!("# HELP {} {}\n", self.nome, self.descricao));
        saida.push_str(&format!("# TYPE {} histogram\n", self.nome));

        let contagens = self.contagens.lock().unwrap();
        let mut acumulado = 0u64;

        for (i, limite) in self.buckets.iter().enumerate() {
            acumulado += contagens[i];
            saida.push_str(&format!(
                "{}_bucket{{le=\"{}\"}} {}\n",
                self.nome,
                if *limite == f64::INFINITY {
                    "+Inf".to_string()
                } else {
                    format!("{}", limite)
                },
                acumulado
            ));
        }

        let soma = self.soma.lock().unwrap();
        let total = self.total.lock().unwrap();

        saida.push_str(&format!("{}_sum {}\n", self.nome, *soma));
        saida.push_str(&format!("{}_count {}\n", self.nome, *total));

        saida
    }
}

/// Formata labels no formato Prometheus: {label1="valor1",label2="valor2"}
fn formatar_labels(nomes: &[String], valores: &[String]) -> String {
    if nomes.is_empty() || valores.is_empty() {
        return String::new();
    }

    let pares: Vec<String> = nomes
        .iter()
        .zip(valores.iter())
        .map(|(nome, valor)| format!("{}=\"{}\"", nome, valor))
        .collect();

    format!("{{{}}}", pares.join(","))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn teste_counter_incrementar() {
        let counter = Counter::new("requisicoes_total", "Total de requisicoes", vec!["metodo".to_string()]);
        counter.incrementar(&["GET"]);
        counter.incrementar(&["GET"]);
        counter.incrementar(&["POST"]);

        let texto = counter.formatar();
        assert!(texto.contains("requisicoes_total{metodo=\"GET\"} 2"));
        assert!(texto.contains("requisicoes_total{metodo=\"POST\"} 1"));
    }

    #[test]
    fn teste_gauge_definir() {
        let gauge = Gauge::new("temperatura", "Temperatura atual", vec![]);
        gauge.definir(&[], 23.5);

        let texto = gauge.formatar();
        assert!(texto.contains("temperatura 23.5"));
    }

    #[test]
    fn teste_histogram_observar() {
        let hist = Histogram::new(
            "latencia",
            "Latencia em ms",
            vec![10.0, 50.0, 100.0, 500.0],
        );
        hist.observar(5.0);
        hist.observar(25.0);
        hist.observar(75.0);

        let texto = hist.formatar();
        assert!(texto.contains("latencia_count 3"));
    }
}

Cada tipo de metrica implementa o formato de exposicao do Prometheus. Counters incluem # TYPE counter, gauges incluem # TYPE gauge, e histograms geram buckets acumulativos com _bucket, _sum e _count.

Passo 2: Registro de Metricas

O registro centraliza todas as metricas e gera a saida combinada. Crie src/registro.rs:

use crate::metricas::{Counter, Gauge, Histogram};
use std::sync::Arc;

/// Registro centralizado de todas as metricas.
pub struct Registro {
    counters: Vec<Arc<Counter>>,
    gauges: Vec<Arc<Gauge>>,
    histograms: Vec<Arc<Histogram>>,
}

impl Registro {
    pub fn new() -> Self {
        Registro {
            counters: Vec::new(),
            gauges: Vec::new(),
            histograms: Vec::new(),
        }
    }

    /// Registra um novo counter e retorna uma referencia compartilhada.
    pub fn registrar_counter(
        &mut self,
        nome: &str,
        descricao: &str,
        labels: Vec<String>,
    ) -> Arc<Counter> {
        let counter = Arc::new(Counter::new(nome, descricao, labels));
        self.counters.push(Arc::clone(&counter));
        counter
    }

    /// Registra um novo gauge e retorna uma referencia compartilhada.
    pub fn registrar_gauge(
        &mut self,
        nome: &str,
        descricao: &str,
        labels: Vec<String>,
    ) -> Arc<Gauge> {
        let gauge = Arc::new(Gauge::new(nome, descricao, labels));
        self.gauges.push(Arc::clone(&gauge));
        gauge
    }

    /// Registra um novo histogram e retorna uma referencia compartilhada.
    pub fn registrar_histogram(
        &mut self,
        nome: &str,
        descricao: &str,
        buckets: Vec<f64>,
    ) -> Arc<Histogram> {
        let hist = Arc::new(Histogram::new(nome, descricao, buckets));
        self.histograms.push(Arc::clone(&hist));
        hist
    }

    /// Gera a saida completa no formato Prometheus.
    pub fn formatar_todas(&self) -> String {
        let mut saida = String::new();

        for counter in &self.counters {
            saida.push_str(&counter.formatar());
            saida.push('\n');
        }

        for gauge in &self.gauges {
            saida.push_str(&gauge.formatar());
            saida.push('\n');
        }

        for histogram in &self.histograms {
            saida.push_str(&histogram.formatar());
            saida.push('\n');
        }

        saida
    }
}

O registro usa Arc para compartilhar as metricas entre a thread que as modifica (a aplicacao) e a thread que as expoe (o servidor HTTP). Isso garante seguranca de concorrencia sem duplicar dados.

Passo 3: Servidor HTTP

O servidor expoe o endpoint /metrics para o Prometheus consultar. Crie src/servidor.rs:

use crate::registro::Registro;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::sync::Arc;
use std::sync::RwLock;

/// Inicia o servidor HTTP para exposicao de metricas.
pub fn iniciar_servidor(
    endereco: &str,
    registro: Arc<RwLock<Registro>>,
) -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind(endereco)?;
    println!("Servidor de metricas escutando em {}", endereco);
    println!("Endpoint: http://{}/metrics\n", endereco);

    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                let mut buffer = [0u8; 1024];
                let _ = stream.read(&mut buffer);

                let requisicao = String::from_utf8_lossy(&buffer);
                let primeira_linha = requisicao.lines().next().unwrap_or("");

                if primeira_linha.contains("/metrics") {
                    // Gerar metricas
                    let registro = registro.read().unwrap();
                    let corpo = registro.formatar_todas();

                    let resposta = format!(
                        "HTTP/1.1 200 OK\r\n\
                         Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n\
                         Content-Length: {}\r\n\
                         \r\n\
                         {}",
                        corpo.len(),
                        corpo
                    );
                    let _ = stream.write_all(resposta.as_bytes());
                } else if primeira_linha.contains("/ ") || primeira_linha.contains("/health") {
                    let corpo = "OK\n";
                    let resposta = format!(
                        "HTTP/1.1 200 OK\r\n\
                         Content-Type: text/plain\r\n\
                         Content-Length: {}\r\n\
                         \r\n\
                         {}",
                        corpo.len(),
                        corpo
                    );
                    let _ = stream.write_all(resposta.as_bytes());
                } else {
                    let resposta = "HTTP/1.1 404 Not Found\r\n\
                                    Content-Length: 0\r\n\
                                    \r\n";
                    let _ = stream.write_all(resposta.as_bytes());
                }
            }
            Err(e) => eprintln!("Erro ao aceitar conexao: {}", e),
        }
    }

    Ok(())
}

O servidor HTTP e implementado usando apenas std::net::TcpListener, sem nenhum framework externo. Ele responde ao endpoint /metrics com o formato texto do Prometheus e a /health com um simples “OK”.

Passo 4: Juntando Tudo no main.rs

Crie src/main.rs:

mod metricas;
mod registro;
mod servidor;

use registro::Registro;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let endereco = std::env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:9090".to_string());

    // Criar registro de metricas
    let mut registro = Registro::new();

    // Registrar metricas da aplicacao
    let requisicoes_total = registro.registrar_counter(
        "app_requisicoes_total",
        "Total de requisicoes HTTP recebidas",
        vec!["metodo".to_string(), "endpoint".to_string(), "status".to_string()],
    );

    let conexoes_ativas = registro.registrar_gauge(
        "app_conexoes_ativas",
        "Numero de conexoes ativas no momento",
        vec![],
    );

    let uso_memoria_bytes = registro.registrar_gauge(
        "app_uso_memoria_bytes",
        "Uso de memoria da aplicacao em bytes",
        vec![],
    );

    let latencia_requisicao = registro.registrar_histogram(
        "app_latencia_requisicao_ms",
        "Latencia das requisicoes em milissegundos",
        vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY],
    );

    let erros_total = registro.registrar_counter(
        "app_erros_total",
        "Total de erros da aplicacao",
        vec!["tipo".to_string()],
    );

    let registro_compartilhado = Arc::new(RwLock::new(registro));

    // Iniciar servidor HTTP em thread separada
    let registro_servidor = Arc::clone(&registro_compartilhado);
    let endereco_clone = endereco.clone();
    thread::spawn(move || {
        if let Err(e) = servidor::iniciar_servidor(&endereco_clone, registro_servidor) {
            eprintln!("Erro no servidor: {}", e);
        }
    });

    println!("=== Coletor de Metricas Prometheus ===");
    println!("Simulando atividade da aplicacao...\n");

    // Simular atividade da aplicacao gerando metricas
    let mut rng_estado: u64 = 42;
    let inicio = Instant::now();

    loop {
        // Gerador simples de numeros pseudo-aleatorios (LCG)
        rng_estado = rng_estado.wrapping_mul(6364136223846793005).wrapping_add(1);
        let aleatorio = ((rng_estado >> 33) as f64) / (u32::MAX as f64);

        // Simular requisicoes HTTP
        let metodos = ["GET", "POST", "PUT", "DELETE"];
        let endpoints = ["/api/usuarios", "/api/produtos", "/api/pedidos", "/health"];
        let status_codes = ["200", "201", "400", "404", "500"];

        let metodo_idx = (aleatorio * metodos.len() as f64) as usize % metodos.len();
        let endpoint_idx = (aleatorio * 7.0) as usize % endpoints.len();

        // Maioria das requisicoes com sucesso
        let status_idx = if aleatorio > 0.1 {
            if aleatorio > 0.3 { 0 } else { 1 }
        } else if aleatorio > 0.05 {
            (aleatorio * 4.0) as usize % 3 + 2
        } else {
            4 // 500 - erro do servidor
        };

        requisicoes_total.incrementar(&[
            metodos[metodo_idx],
            endpoints[endpoint_idx],
            status_codes[status_idx],
        ]);

        // Simular latencia (maioria rapida, algumas lentas)
        let latencia = if aleatorio > 0.95 {
            aleatorio * 1000.0 // Requisicao lenta
        } else {
            aleatorio * 50.0 // Requisicao normal
        };
        latencia_requisicao.observar(latencia);

        // Simular erros ocasionais
        if aleatorio > 0.95 {
            let tipos_erro = ["timeout", "conexao_recusada", "parse_error"];
            let tipo_idx = (aleatorio * 10.0) as usize % tipos_erro.len();
            erros_total.incrementar(&[tipos_erro[tipo_idx]]);
        }

        // Atualizar gauges
        let conexoes = 10.0 + aleatorio * 90.0;
        conexoes_ativas.definir(&[], conexoes);

        let memoria = 50_000_000.0 + aleatorio * 200_000_000.0;
        uso_memoria_bytes.definir(&[], memoria);

        // Exibir progresso a cada 10 segundos
        let decorrido = inicio.elapsed().as_secs();
        if decorrido % 10 == 0 && decorrido > 0 {
            println!(
                "[{:>4}s] Metricas sendo geradas... Acesse http://{}/metrics",
                decorrido, endereco
            );
        }

        thread::sleep(Duration::from_millis(100));
    }
}

O main registra as metricas, inicia o servidor HTTP em uma thread separada e entra em um loop que simula a atividade de uma aplicacao real, gerando requisicoes com diferentes metodos, endpoints, latencias e erros. As metricas sao continuamente atualizadas e podem ser consultadas a qualquer momento via HTTP.

Como Executar

# Compilar
cargo build --release

# Iniciar o coletor (porta padrao 9090)
cargo run

# Ou especificar porta
cargo run -- 127.0.0.1:8080

# Em outro terminal, consultar as metricas
curl http://127.0.0.1:9090/metrics

Saida esperada do endpoint /metrics:

# HELP app_requisicoes_total Total de requisicoes HTTP recebidas
# TYPE app_requisicoes_total counter
app_requisicoes_total{metodo="GET",endpoint="/api/usuarios",status="200"} 42
app_requisicoes_total{metodo="POST",endpoint="/api/produtos",status="201"} 15
app_requisicoes_total{metodo="GET",endpoint="/health",status="200"} 38

# HELP app_conexoes_ativas Numero de conexoes ativas no momento
# TYPE app_conexoes_ativas gauge
app_conexoes_ativas 67

# HELP app_uso_memoria_bytes Uso de memoria da aplicacao em bytes
# TYPE app_uso_memoria_bytes gauge
app_uso_memoria_bytes 156234567

# HELP app_latencia_requisicao_ms Latencia das requisicoes em milissegundos
# TYPE app_latencia_requisicao_ms histogram
app_latencia_requisicao_ms_bucket{le="1"} 5
app_latencia_requisicao_ms_bucket{le="5"} 18
app_latencia_requisicao_ms_bucket{le="10"} 35
app_latencia_requisicao_ms_bucket{le="50"} 89
app_latencia_requisicao_ms_bucket{le="100"} 93
app_latencia_requisicao_ms_bucket{le="+Inf"} 100
app_latencia_requisicao_ms_sum 2345.67
app_latencia_requisicao_ms_count 100

Para configurar o Prometheus para coletar estas metricas, adicione ao prometheus.yml:

scrape_configs:
  - job_name: 'minha_app_rust'
    scrape_interval: 15s
    static_configs:
      - targets: ['127.0.0.1:9090']

Desafios para Expandir

  1. Summary: Implemente o tipo Summary do Prometheus, que calcula quantis (p50, p90, p99) de observacoes em uma janela de tempo deslizante.
  2. Labels dinamicos: Adicione suporte a labels em histograms e permita registrar metricas com conjuntos de labels variaveis em tempo de execucao.
  3. Push gateway: Implemente o envio ativo de metricas para um Prometheus Pushgateway via HTTP POST, util para jobs de curta duracao que terminam antes do scraping.
  4. Metricas do processo: Adicione metricas automaticas do processo como process_cpu_seconds_total, process_resident_memory_bytes e process_open_fds usando a crate sysinfo.
  5. Dashboard Grafana: Crie um arquivo de provisioning do Grafana com dashboards predefinidos para visualizar as metricas geradas, incluindo graficos de latencia e taxa de requisicoes.

Veja Tambem