Load Balancer TCP em Rust

Construa um load balancer TCP em Rust com round-robin, health checks e async I/O usando tokio para distribuir conexoes.

Um load balancer e uma peca fundamental em arquiteturas de producao, distribuindo o trafego entre multiplos servidores para garantir alta disponibilidade e desempenho. Neste projeto, vamos construir um load balancer TCP assincrono com algoritmo round-robin, verificacao periodica de saude dos backends e proxy transparente de conexoes usando tokio.

Este projeto aprofunda conceitos de programacao assincrona, concorrencia com dados compartilhados, networking TCP e padroes de resiliencia.

O Que Vamos Construir

  • Proxy TCP transparente que encaminha conexoes para backends
  • Algoritmo de distribuicao round-robin
  • Health checks periodicos dos servidores backend
  • Remocao e re-adicao automatica de backends com falha
  • Contadores de conexoes e estatisticas
  • Configuracao via arquivo TOML

Estrutura do Projeto

load-balancer/
├── Cargo.toml
└── src/
    ├── main.rs
    ├── balanceador.rs
    ├── health_check.rs
    └── proxy.rs

Configurando o Projeto

cargo new load-balancer
cd load-balancer

Edite o Cargo.toml:

[package]
name = "load-balancer"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
toml = "0.8"
colored = "2.1"

Passo 1: Balanceador Round-Robin

O balanceador gerencia a lista de backends e escolhe o proximo servidor para cada nova conexao. Crie src/balanceador.rs:

use std::sync::Arc;
use tokio::sync::RwLock;

/// Representa um servidor backend.
#[derive(Debug, Clone)]
pub struct Backend {
    pub endereco: String,
    pub saudavel: bool,
    pub conexoes_ativas: u64,
    pub total_conexoes: u64,
}

impl Backend {
    pub fn new(endereco: String) -> Self {
        Backend {
            endereco,
            saudavel: true,
            conexoes_ativas: 0,
            total_conexoes: 0,
        }
    }
}

/// Balanceador round-robin com estado compartilhado.
#[derive(Clone)]
pub struct Balanceador {
    backends: Arc<RwLock<Vec<Backend>>>,
    indice_atual: Arc<RwLock<usize>>,
}

impl Balanceador {
    /// Cria um novo balanceador com a lista de enderecos de backend.
    pub fn new(enderecos: Vec<String>) -> Self {
        let backends = enderecos
            .into_iter()
            .map(Backend::new)
            .collect();

        Balanceador {
            backends: Arc::new(RwLock::new(backends)),
            indice_atual: Arc::new(RwLock::new(0)),
        }
    }

    /// Seleciona o proximo backend saudavel usando round-robin.
    pub async fn proximo_backend(&self) -> Option<String> {
        let backends = self.backends.read().await;
        let total = backends.len();

        if total == 0 {
            return None;
        }

        let mut indice = self.indice_atual.write().await;

        // Tentar encontrar um backend saudavel
        for _ in 0..total {
            let idx = *indice % total;
            *indice = (*indice + 1) % total;

            if backends[idx].saudavel {
                return Some(backends[idx].endereco.clone());
            }
        }

        None // Nenhum backend saudavel
    }

    /// Registra uma nova conexao para um backend.
    pub async fn registrar_conexao(&self, endereco: &str) {
        let mut backends = self.backends.write().await;
        if let Some(backend) = backends.iter_mut().find(|b| b.endereco == endereco) {
            backend.conexoes_ativas += 1;
            backend.total_conexoes += 1;
        }
    }

    /// Registra o encerramento de uma conexao.
    pub async fn encerrar_conexao(&self, endereco: &str) {
        let mut backends = self.backends.write().await;
        if let Some(backend) = backends.iter_mut().find(|b| b.endereco == endereco) {
            backend.conexoes_ativas = backend.conexoes_ativas.saturating_sub(1);
        }
    }

    /// Define o status de saude de um backend.
    pub async fn definir_saude(&self, endereco: &str, saudavel: bool) {
        let mut backends = self.backends.write().await;
        if let Some(backend) = backends.iter_mut().find(|b| b.endereco == endereco) {
            backend.saudavel = saudavel;
        }
    }

    /// Retorna uma copia do estado atual de todos os backends.
    pub async fn estado_backends(&self) -> Vec<Backend> {
        self.backends.read().await.clone()
    }
}

O balanceador usa Arc<RwLock<>> do tokio para permitir acesso concorrente seguro. O RwLock permite multiplas leituras simultaneas (ao selecionar backends) e acesso exclusivo para escrita (ao atualizar contadores e status).

Passo 2: Verificacao de Saude

O health check verifica periodicamente se os backends estao respondendo. Crie src/health_check.rs:

use crate::balanceador::Balanceador;
use colored::*;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time;

/// Inicia a verificacao periodica de saude dos backends.
pub async fn iniciar_verificacao(
    balanceador: Balanceador,
    intervalo_segundos: u64,
    timeout_ms: u64,
) {
    let mut intervalo = time::interval(Duration::from_secs(intervalo_segundos));

    loop {
        intervalo.tick().await;

        let backends = balanceador.estado_backends().await;

        for backend in &backends {
            let saudavel = verificar_backend(&backend.endereco, timeout_ms).await;

            let status_anterior = backend.saudavel;
            balanceador
                .definir_saude(&backend.endereco, saudavel)
                .await;

            // Logar mudancas de status
            if status_anterior != saudavel {
                if saudavel {
                    println!(
                        "{} {} voltou ao servico",
                        "[HEALTH]".green(),
                        backend.endereco.cyan()
                    );
                } else {
                    println!(
                        "{} {} removido do pool (falha no health check)",
                        "[HEALTH]".red().bold(),
                        backend.endereco.cyan()
                    );
                }
            }
        }
    }
}

/// Verifica se um backend esta respondendo tentando uma conexao TCP.
async fn verificar_backend(endereco: &str, timeout_ms: u64) -> bool {
    let timeout = Duration::from_millis(timeout_ms);

    match time::timeout(timeout, TcpStream::connect(endereco)).await {
        Ok(Ok(_)) => true,
        _ => false,
    }
}

O health check tenta abrir uma conexao TCP com cada backend. Se a conexao falhar ou exceder o timeout, o backend e marcado como indisponivel e removido do pool de distribuicao. Quando volta a responder, e automaticamente readicionado.

Passo 3: Proxy TCP

O modulo proxy encaminha dados entre o cliente e o backend escolhido. Crie src/proxy.rs:

use colored::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

/// Realiza o proxy bidirecional entre duas conexoes TCP.
pub async fn proxy_bidirecional(
    mut cliente: TcpStream,
    endereco_backend: &str,
) -> Result<(u64, u64), Box<dyn std::error::Error + Send + Sync>> {
    let mut backend = TcpStream::connect(endereco_backend).await?;

    let (mut leitura_cliente, mut escrita_cliente) = cliente.split();
    let (mut leitura_backend, mut escrita_backend) = backend.split();

    // Copiar dados em ambas as direcoes simultaneamente
    let cliente_para_backend = async {
        let mut total = 0u64;
        let mut buffer = [0u8; 8192];
        loop {
            match leitura_cliente.read(&mut buffer).await {
                Ok(0) => break,
                Ok(n) => {
                    if escrita_backend.write_all(&buffer[..n]).await.is_err() {
                        break;
                    }
                    total += n as u64;
                }
                Err(_) => break,
            }
        }
        let _ = escrita_backend.shutdown().await;
        total
    };

    let backend_para_cliente = async {
        let mut total = 0u64;
        let mut buffer = [0u8; 8192];
        loop {
            match leitura_backend.read(&mut buffer).await {
                Ok(0) => break,
                Ok(n) => {
                    if escrita_cliente.write_all(&buffer[..n]).await.is_err() {
                        break;
                    }
                    total += n as u64;
                }
                Err(_) => break,
            }
        }
        let _ = escrita_cliente.shutdown().await;
        total
    };

    let (enviados, recebidos) = tokio::join!(cliente_para_backend, backend_para_cliente);

    Ok((enviados, recebidos))
}

/// Formata o tamanho em bytes de forma legivel.
pub fn formatar_bytes(bytes: u64) -> String {
    if bytes < 1024 {
        format!("{} B", bytes)
    } else if bytes < 1_048_576 {
        format!("{:.1} KB", bytes as f64 / 1024.0)
    } else {
        format!("{:.1} MB", bytes as f64 / 1_048_576.0)
    }
}

O proxy bidirecional usa tokio::join! para copiar dados em ambas as direcoes simultaneamente. Quando qualquer lado fecha a conexao, o proxy encerra graciosamente. O buffer de 8KB oferece bom desempenho para a maioria dos casos de uso.

Passo 4: Juntando Tudo no main.rs

Crie src/main.rs:

mod balanceador;
mod health_check;
mod proxy;

use balanceador::Balanceador;
use colored::*;
use serde::Deserialize;
use std::path::PathBuf;
use tokio::net::TcpListener;

/// Configuracao do load balancer.
#[derive(Debug, Deserialize)]
struct Configuracao {
    escutar: String,
    backends: Vec<String>,
    health_check_intervalo: Option<u64>,
    health_check_timeout: Option<u64>,
}

/// Carrega a configuracao de um arquivo TOML.
fn carregar_configuracao(caminho: &PathBuf) -> Result<Configuracao, Box<dyn std::error::Error>> {
    let conteudo = std::fs::read_to_string(caminho)?;
    let config: Configuracao = toml::from_str(&conteudo)?;
    Ok(config)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = std::env::args().collect();

    let caminho_config = if args.len() > 1 {
        PathBuf::from(&args[1])
    } else {
        PathBuf::from("balancer.toml")
    };

    // Tentar carregar configuracao ou usar valores padrao
    let config = if caminho_config.exists() {
        carregar_configuracao(&caminho_config)?
    } else {
        eprintln!(
            "Arquivo '{}' nao encontrado. Usando configuracao padrao.\n",
            caminho_config.display()
        );
        eprintln!("Crie um arquivo 'balancer.toml' com:");
        eprintln!(
            r#"
escutar = "127.0.0.1:8080"
backends = ["127.0.0.1:3001", "127.0.0.1:3002", "127.0.0.1:3003"]
health_check_intervalo = 10
health_check_timeout = 2000
"#
        );
        Configuracao {
            escutar: "127.0.0.1:8080".to_string(),
            backends: vec![
                "127.0.0.1:3001".to_string(),
                "127.0.0.1:3002".to_string(),
                "127.0.0.1:3003".to_string(),
            ],
            health_check_intervalo: Some(10),
            health_check_timeout: Some(2000),
        }
    };

    let balanceador = Balanceador::new(config.backends.clone());

    println!("{}", "=== Load Balancer TCP ===".green().bold());
    println!("Escutando em: {}", config.escutar.cyan());
    println!("Backends:");
    for (i, backend) in config.backends.iter().enumerate() {
        println!("  {}. {}", i + 1, backend.yellow());
    }
    println!();

    // Iniciar health checks em background
    let balanceador_health = balanceador.clone();
    let intervalo_hc = config.health_check_intervalo.unwrap_or(10);
    let timeout_hc = config.health_check_timeout.unwrap_or(2000);

    tokio::spawn(async move {
        health_check::iniciar_verificacao(balanceador_health, intervalo_hc, timeout_hc).await;
    });

    // Iniciar tarefa de exibicao de estatisticas
    let balanceador_stats = balanceador.clone();
    tokio::spawn(async move {
        let mut intervalo = tokio::time::interval(tokio::time::Duration::from_secs(30));
        loop {
            intervalo.tick().await;
            let backends = balanceador_stats.estado_backends().await;
            println!("\n{}", "--- Estatisticas ---".dimmed());
            for b in &backends {
                let status = if b.saudavel {
                    "OK".green().to_string()
                } else {
                    "FALHA".red().to_string()
                };
                println!(
                    "  {} [{}] ativas: {} total: {}",
                    b.endereco, status, b.conexoes_ativas, b.total_conexoes
                );
            }
            println!();
        }
    });

    // Aceitar conexoes
    let listener = TcpListener::bind(&config.escutar).await?;
    println!(
        "{}\n",
        "Aguardando conexoes... (Ctrl+C para sair)".dimmed()
    );

    loop {
        let (cliente, addr_cliente) = listener.accept().await?;
        let balanceador_clone = balanceador.clone();

        tokio::spawn(async move {
            // Selecionar backend
            let backend_endereco = match balanceador_clone.proximo_backend().await {
                Some(endereco) => endereco,
                None => {
                    eprintln!(
                        "{} Nenhum backend disponivel para {}",
                        "[ERRO]".red().bold(),
                        addr_cliente
                    );
                    return;
                }
            };

            println!(
                "{} {} -> {}",
                "[CONN]".blue(),
                addr_cliente.to_string().dimmed(),
                backend_endereco.cyan()
            );

            balanceador_clone
                .registrar_conexao(&backend_endereco)
                .await;

            // Fazer proxy da conexao
            match proxy::proxy_bidirecional(cliente, &backend_endereco).await {
                Ok((enviados, recebidos)) => {
                    println!(
                        "{} {} fechada (enviado: {}, recebido: {})",
                        "[DONE]".green(),
                        addr_cliente.to_string().dimmed(),
                        proxy::formatar_bytes(enviados).cyan(),
                        proxy::formatar_bytes(recebidos).cyan()
                    );
                }
                Err(e) => {
                    println!(
                        "{} Erro com {} -> {}: {}",
                        "[ERRO]".red(),
                        addr_cliente,
                        backend_endereco,
                        e
                    );
                }
            }

            balanceador_clone
                .encerrar_conexao(&backend_endereco)
                .await;
        });
    }
}

O main inicia tres tarefas concorrentes: o health check periodico, a exibicao de estatisticas e o loop principal de aceitacao de conexoes. Cada conexao recebida e tratada em sua propria task, permitindo milhares de conexoes simultaneas.

Como Executar

Primeiro, crie o arquivo de configuracao balancer.toml:

escutar = "127.0.0.1:8080"
backends = ["127.0.0.1:3001", "127.0.0.1:3002", "127.0.0.1:3003"]
health_check_intervalo = 10
health_check_timeout = 2000

Para testar, inicie alguns servidores simples de backend (em terminais separados):

# Terminal 1: Backend 1
python3 -m http.server 3001

# Terminal 2: Backend 2
python3 -m http.server 3002

# Terminal 3: Backend 3
python3 -m http.server 3003

# Terminal 4: Iniciar o load balancer
cargo run

# Terminal 5: Testar com curl
curl http://127.0.0.1:8080/
curl http://127.0.0.1:8080/
curl http://127.0.0.1:8080/

Saida esperada do load balancer:

=== Load Balancer TCP ===
Escutando em: 127.0.0.1:8080
Backends:
  1. 127.0.0.1:3001
  2. 127.0.0.1:3002
  3. 127.0.0.1:3003

Aguardando conexoes... (Ctrl+C para sair)

[CONN] 127.0.0.1:54321 -> 127.0.0.1:3001
[DONE] 127.0.0.1:54321 fechada (enviado: 78 B, recebido: 1.2 KB)
[CONN] 127.0.0.1:54322 -> 127.0.0.1:3002
[DONE] 127.0.0.1:54322 fechada (enviado: 78 B, recebido: 1.2 KB)
[CONN] 127.0.0.1:54323 -> 127.0.0.1:3003
[DONE] 127.0.0.1:54323 fechada (enviado: 78 B, recebido: 1.2 KB)

Desafios para Expandir

  1. Algoritmos alternativos: Implemente least-connections (encaminha para o backend com menos conexoes ativas) e weighted round-robin (backends com pesos diferentes).
  2. Sticky sessions: Adicione suporte a sessoes fixas baseadas no IP do cliente, garantindo que o mesmo cliente sempre se conecte ao mesmo backend.
  3. Metricas Prometheus: Exponha metricas (conexoes por backend, latencia, erros) em formato Prometheus para monitoramento com Grafana.
  4. Limite de conexoes: Implemente rate limiting por IP e limite maximo de conexoes simultaneas por backend, rejeitando excedentes com uma mensagem apropriada.
  5. TLS termination: Adicione suporte a TLS usando a crate tokio-rustls para que o balanceador aceite conexoes HTTPS e encaminhe para backends HTTP.

Veja Tambem