Tower: Abstrações de Serviço Modulares para Rust

Guia completo do Tower em Rust: trait Service, Layer, middleware embutido (timeout, rate limit, retry), criação de middleware customizado, integração com Axum, Hyper e Tonic.

Introdução

O Tower é uma biblioteca de abstrações de serviço para Rust que define como componentes de rede se comunicam de forma composável e modular. Ele fornece o trait Service – uma interface comum para funções assíncronas que recebem uma requisição e retornam uma resposta – e um sistema de Layer para empilhar middleware de forma elegante.

Se você já usou Axum, Hyper, Tonic ou Reqwest, você já usou Tower indiretamente. Esses frameworks utilizam o trait Service do Tower como base da sua arquitetura, permitindo que middleware escritos para um framework funcionem em qualquer outro que siga o mesmo padrão.

Por que o Tower é importante?

  • Composabilidade: empilhe middleware como blocos de construção
  • Reutilização: o mesmo middleware funciona com Axum, Hyper, Tonic, etc.
  • Middleware pronto: timeout, rate limiting, retry, load balancing, circuit breaker
  • Testabilidade: cada camada pode ser testada isoladamente
  • Performance: abstrações de custo zero com trait objects opcionais
  • Ecossistema: é a cola que conecta todo o stack de rede do Rust

Instalação

Adicione o Tower ao seu Cargo.toml:

[dependencies]
tower = { version = "0.5", features = ["full"] }
tokio = { version = "1", features = ["full"] }

Features disponíveis:

[dependencies]
tower = { version = "0.5", features = [
    "timeout",     # Middleware de timeout
    "retry",       # Middleware de retry
    "limit",       # Rate limiting e concurrency limiting
    "buffer",      # Buffer de requisições
    "load-shed",   # Descarte de carga
    "balance",     # Load balancing
    "discover",    # Service discovery
    "filter",      # Filtragem de requisições
    "hedge",       # Hedged requests
    "util",        # Utilitários gerais
] }

Para usar com Axum, você também vai precisar do tower-http:

[dependencies]
tower = "0.5"
tower-http = { version = "0.6", features = [
    "cors",        # CORS
    "trace",       # Tracing/logging
    "compression-gzip",  # Compressão
    "timeout",     # Timeout de resposta
    "auth",        # Autenticação
    "request-id",  # ID de requisição
] }

Uso Básico

O trait Service

O coração do Tower é o trait Service:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;

// O trait Service simplificado:
// trait Service<Request> {
//     type Response;
//     type Error;
//     type Future: Future<Output = Result<Self::Response, Self::Error>>;
//
//     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
//     fn call(&mut self, req: Request) -> Self::Future;
// }

// Implementando um Service manualmente
#[derive(Clone)]
struct MeuServico;

impl Service<String> for MeuServico {
    type Response = String;
    type Error = std::convert::Infallible;
    type Future = Pin<Box<dyn Future<Output = Result<String, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // Sempre pronto
    }

    fn call(&mut self, req: String) -> Self::Future {
        Box::pin(async move {
            Ok(format!("Olá, {}!", req))
        })
    }
}

#[tokio::main]
async fn main() {
    let mut servico = MeuServico;

    // Verificar se o serviço está pronto
    use tower::ServiceExt; // Importa métodos utilitários
    let resposta = servico.ready().await.unwrap().call("Rust".to_string()).await.unwrap();
    println!("{}", resposta); // "Olá, Rust!"
}

ServiceBuilder e Layers

O ServiceBuilder permite empilhar middleware de forma declarativa:

use std::time::Duration;
use tower::ServiceBuilder;
use tower::timeout::TimeoutLayer;
use tower::limit::RateLimitLayer;
use tower::limit::ConcurrencyLimitLayer;

// Empilhar middleware com ServiceBuilder
fn criar_service_com_middleware() {
    let _service = ServiceBuilder::new()
        // Timeout de 30 segundos
        .layer(TimeoutLayer::new(Duration::from_secs(30)))
        // Limite de 100 requisições por segundo
        .layer(RateLimitLayer::new(100, Duration::from_secs(1)))
        // Máximo de 50 requisições simultâneas
        .layer(ConcurrencyLimitLayer::new(50))
        // Serviço base (o handler real)
        .service_fn(|req: String| async move {
            Ok::<_, std::convert::Infallible>(format!("Processado: {}", req))
        });
}

Usando com Axum

O caso de uso mais comum do Tower é adicionar middleware a aplicações Axum:

use axum::{routing::get, Router};
use std::time::Duration;
use tower::ServiceBuilder;
use tower_http::cors::CorsLayer;
use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;

async fn handler() -> &'static str {
    "Olá, mundo!"
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/", get(handler))
        .layer(
            ServiceBuilder::new()
                // Tracing/logging de requisições
                .layer(TraceLayer::new_for_http())
                // Timeout de resposta
                .layer(TimeoutLayer::new(Duration::from_secs(30)))
                // CORS
                .layer(CorsLayer::permissive())
        );

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Recursos Avançados

Middleware de Timeout

use std::time::Duration;
use tower::timeout::Timeout;
use tower::ServiceExt;

async fn exemplo_timeout() {
    // Serviço que demora muito
    let servico_lento = tower::service_fn(|_req: ()| async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok::<_, std::convert::Infallible>("Concluído")
    });

    // Adicionar timeout de 2 segundos
    let mut servico_com_timeout = Timeout::new(servico_lento, Duration::from_secs(2));

    match servico_com_timeout.ready().await.unwrap().call(()).await {
        Ok(resp) => println!("Resposta: {}", resp),
        Err(e) => println!("Timeout! Erro: {}", e), // Vai cair aqui
    }
}

Middleware de Rate Limiting

use std::time::Duration;
use tower::limit::RateLimit;
use tower::ServiceExt;

async fn exemplo_rate_limit() {
    let servico = tower::service_fn(|req: u32| async move {
        Ok::<_, std::convert::Infallible>(format!("Processado: {}", req))
    });

    // Máximo 5 requisições por segundo
    let mut servico_limitado = RateLimit::new(servico, 5, Duration::from_secs(1));

    for i in 0..10 {
        match servico_limitado.ready().await {
            Ok(svc) => {
                let resp = svc.call(i).await.unwrap();
                println!("{}", resp);
            }
            Err(e) => println!("Rate limited: {}", e),
        }
    }
}

Middleware de Retry

use std::time::Duration;
use tower::retry::{Policy, Retry};
use tower::ServiceExt;

// Política de retry customizada
#[derive(Clone)]
struct MinhaRetryPolicy {
    tentativas_restantes: u32,
}

impl Policy<String, String, String> for MinhaRetryPolicy {
    type Future = std::future::Ready<()>;

    fn retry(
        &mut self,
        _req: &mut String,
        resultado: &mut Result<String, String>,
    ) -> Option<Self::Future> {
        match resultado {
            Ok(_) => None, // Sucesso, não tentar novamente
            Err(_) if self.tentativas_restantes > 0 => {
                self.tentativas_restantes -= 1;
                println!(
                    "Retry! Tentativas restantes: {}",
                    self.tentativas_restantes
                );
                Some(std::future::ready(()))
            }
            Err(_) => None, // Sem tentativas restantes
        }
    }

    fn clone_request(&mut self, req: &String) -> Option<String> {
        Some(req.clone())
    }
}

async fn exemplo_retry() {
    let mut contador = 0u32;

    let servico = tower::service_fn(move |_req: String| {
        contador += 1;
        async move {
            if contador < 3 {
                Err::<String, String>("Falha temporária".to_string())
            } else {
                Ok("Sucesso após retries!".to_string())
            }
        }
    });

    let policy = MinhaRetryPolicy {
        tentativas_restantes: 3,
    };

    let mut servico_com_retry = Retry::new(policy, servico);

    let resultado = servico_com_retry
        .ready()
        .await
        .unwrap()
        .call("teste".to_string())
        .await;

    println!("Resultado: {:?}", resultado);
}

Middleware de Concurrency Limit

use tower::limit::ConcurrencyLimit;
use tower::ServiceExt;

async fn exemplo_concurrency_limit() {
    let servico = tower::service_fn(|req: u32| async move {
        // Simular processamento demorado
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok::<_, std::convert::Infallible>(format!("Processado: {}", req))
    });

    // Máximo 3 requisições simultâneas
    let servico_limitado = ConcurrencyLimit::new(servico, 3);

    // As requisições além do limite vão esperar
    let mut handles = vec![];
    for i in 0..10 {
        let mut svc = servico_limitado.clone();
        handles.push(tokio::spawn(async move {
            let resp = svc.ready().await.unwrap().call(i).await.unwrap();
            println!("{}", resp);
        }));
    }

    for h in handles {
        let _ = h.await;
    }
}

Criando middleware customizado

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use tower::{Layer, Service};

// === Layer: fábrica de middleware ===

#[derive(Clone)]
struct LoggingLayer;

impl<S> Layer<S> for LoggingLayer {
    type Service = LoggingMiddleware<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LoggingMiddleware { inner }
    }
}

// === Middleware: o wrapper do serviço ===

#[derive(Clone)]
struct LoggingMiddleware<S> {
    inner: S,
}

impl<S, Req> Service<Req> for LoggingMiddleware<S>
where
    S: Service<Req>,
    S::Future: Send + 'static,
    S::Response: std::fmt::Debug + Send + 'static,
    S::Error: std::fmt::Debug + Send + 'static,
    Req: std::fmt::Debug + Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        let inicio = Instant::now();
        println!("[LOG] Requisição recebida: {:?}", req);

        let future = self.inner.call(req);

        Box::pin(async move {
            let resultado = future.await;
            let duracao = inicio.elapsed();

            match &resultado {
                Ok(resp) => println!("[LOG] Resposta OK em {:?}: {:?}", duracao, resp),
                Err(err) => println!("[LOG] Erro em {:?}: {:?}", duracao, err),
            }

            resultado
        })
    }
}

Middleware com estado compartilhado

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

// Métricas compartilhadas
#[derive(Default)]
struct Metricas {
    total_requisicoes: AtomicU64,
    total_erros: AtomicU64,
    total_sucesso: AtomicU64,
}

// Layer com estado compartilhado
#[derive(Clone)]
struct MetricasLayer {
    metricas: Arc<Metricas>,
}

impl MetricasLayer {
    fn new() -> (Self, Arc<Metricas>) {
        let metricas = Arc::new(Metricas::default());
        (
            Self {
                metricas: metricas.clone(),
            },
            metricas,
        )
    }
}

impl<S> Layer<S> for MetricasLayer {
    type Service = MetricasMiddleware<S>;

    fn layer(&self, inner: S) -> Self::Service {
        MetricasMiddleware {
            inner,
            metricas: self.metricas.clone(),
        }
    }
}

#[derive(Clone)]
struct MetricasMiddleware<S> {
    inner: S,
    metricas: Arc<Metricas>,
}

impl<S, Req> Service<Req> for MetricasMiddleware<S>
where
    S: Service<Req>,
    S::Future: Send + 'static,
    S::Response: Send + 'static,
    S::Error: Send + 'static,
    Req: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        self.metricas.total_requisicoes.fetch_add(1, Ordering::Relaxed);
        let metricas = self.metricas.clone();
        let future = self.inner.call(req);

        Box::pin(async move {
            let resultado = future.await;
            match &resultado {
                Ok(_) => metricas.total_sucesso.fetch_add(1, Ordering::Relaxed),
                Err(_) => metricas.total_erros.fetch_add(1, Ordering::Relaxed),
            };
            resultado
        })
    }
}

// Uso
async fn exemplo_metricas() {
    let (layer, metricas) = MetricasLayer::new();

    let _service = tower::ServiceBuilder::new()
        .layer(layer)
        .service_fn(|req: String| async move {
            Ok::<_, String>(format!("OK: {}", req))
        });

    // Em outro lugar, ler as métricas
    println!("Total: {}", metricas.total_requisicoes.load(Ordering::Relaxed));
    println!("Sucesso: {}", metricas.total_sucesso.load(Ordering::Relaxed));
    println!("Erros: {}", metricas.total_erros.load(Ordering::Relaxed));
}

Integração com tower-http para aplicações web

use axum::{
    extract::Request,
    http::StatusCode,
    middleware::{self, Next},
    response::Response,
    routing::get,
    Router,
};
use std::time::Duration;
use tower::ServiceBuilder;
use tower_http::{
    compression::CompressionLayer,
    cors::{Any, CorsLayer},
    request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
    timeout::TimeoutLayer,
    trace::TraceLayer,
};

async fn handler() -> &'static str {
    "API funcionando!"
}

async fn saude() -> &'static str {
    "OK"
}

fn criar_app() -> Router {
    // CORS configurado
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    Router::new()
        .route("/", get(handler))
        .route("/saude", get(saude))
        .layer(
            ServiceBuilder::new()
                // ID único para cada requisição
                .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
                .layer(PropagateRequestIdLayer::x_request_id())
                // Logging/tracing
                .layer(TraceLayer::new_for_http())
                // Timeout
                .layer(TimeoutLayer::new(Duration::from_secs(30)))
                // Compressão
                .layer(CompressionLayer::new())
                // CORS
                .layer(cors)
        )
}

Boas Práticas

1. Ordem dos layers importa

Os layers são aplicados de fora para dentro. O primeiro layer no ServiceBuilder é o mais externo:

use tower::ServiceBuilder;
use tower::timeout::TimeoutLayer;
use tower::limit::RateLimitLayer;
use std::time::Duration;

fn exemplo_ordem() {
    let _service = ServiceBuilder::new()
        // 1. Timeout (mais externo) - cancela se tudo demorar mais que 30s
        .layer(TimeoutLayer::new(Duration::from_secs(30)))
        // 2. Rate limit - limita ANTES de processar
        .layer(RateLimitLayer::new(100, Duration::from_secs(1)))
        // 3. Serviço base (mais interno)
        .service_fn(|_req: ()| async { Ok::<_, String>("OK".to_string()) });
}

2. Use ServiceBuilder para composição legível

use tower::ServiceBuilder;
use std::time::Duration;

// Bom: ServiceBuilder com layers nomeados
fn criar_service_bom() {
    let _svc = ServiceBuilder::new()
        .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(30)))
        .layer(tower::limit::ConcurrencyLimitLayer::new(100))
        .service_fn(|_r: ()| async { Ok::<_, String>("OK".to_string()) });
}

3. Extraia middleware reutilizável em crates separados

// meu_middleware/src/lib.rs
use tower::Layer;

/// Layer de autenticação reutilizável
#[derive(Clone)]
pub struct AuthLayer {
    secret: String,
}

impl AuthLayer {
    pub fn new(secret: impl Into<String>) -> Self {
        Self {
            secret: secret.into(),
        }
    }
}

impl<S> Layer<S> for AuthLayer {
    type Service = AuthMiddleware<S>;

    fn layer(&self, inner: S) -> Self::Service {
        AuthMiddleware {
            inner,
            secret: self.secret.clone(),
        }
    }
}

#[derive(Clone)]
pub struct AuthMiddleware<S> {
    inner: S,
    secret: String,
}

4. Teste middleware isoladamente

#[cfg(test)]
mod tests {
    use super::*;
    use tower::ServiceExt;

    #[tokio::test]
    async fn test_logging_middleware() {
        // Serviço mock
        let servico = tower::service_fn(|_req: String| async {
            Ok::<_, String>("resposta".to_string())
        });

        // Aplicar o middleware
        let mut servico_com_log = LoggingLayer.layer(servico);

        // Testar
        let resultado = servico_com_log
            .ready()
            .await
            .unwrap()
            .call("teste".to_string())
            .await;

        assert_eq!(resultado.unwrap(), "resposta");
    }

    #[tokio::test]
    async fn test_metricas_middleware() {
        let (layer, metricas) = MetricasLayer::new();

        let servico = tower::service_fn(|_req: u32| async {
            Ok::<_, String>("ok".to_string())
        });

        let mut svc = layer.layer(servico);

        // Fazer 3 requisições
        for i in 0..3 {
            svc.ready().await.unwrap().call(i).await.unwrap();
        }

        assert_eq!(metricas.total_requisicoes.load(std::sync::atomic::Ordering::Relaxed), 3);
        assert_eq!(metricas.total_sucesso.load(std::sync::atomic::Ordering::Relaxed), 3);
    }
}

Exemplos Práticos

Middleware completo de logging de requisições HTTP

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use tower::{Layer, Service};
use hyper::{Request, Response, body::Bytes};
use http_body_util::Full;

#[derive(Clone)]
pub struct RequestLoggerLayer;

impl<S> Layer<S> for RequestLoggerLayer {
    type Service = RequestLogger<S>;

    fn layer(&self, inner: S) -> Self::Service {
        RequestLogger { inner }
    }
}

#[derive(Clone)]
pub struct RequestLogger<S> {
    inner: S,
}

impl<S, B> Service<Request<B>> for RequestLogger<S>
where
    S: Service<Request<B>, Response = Response<Full<Bytes>>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    S::Error: std::fmt::Display + Send + 'static,
    B: Send + 'static,
{
    type Response = Response<Full<Bytes>>;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request<B>) -> Self::Future {
        let metodo = req.method().clone();
        let uri = req.uri().clone();
        let versao = req.version();
        let user_agent = req
            .headers()
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("desconhecido")
            .to_string();

        let inicio = Instant::now();
        let future = self.inner.call(req);

        Box::pin(async move {
            let resultado = future.await;
            let duracao = inicio.elapsed();

            match &resultado {
                Ok(resp) => {
                    let status = resp.status();
                    let nivel = if status.is_success() {
                        "INFO"
                    } else if status.is_client_error() {
                        "WARN"
                    } else {
                        "ERROR"
                    };

                    println!(
                        "[{}] {} {} {:?} -> {} ({:?}) UA: {}",
                        nivel, metodo, uri, versao, status, duracao, user_agent
                    );
                }
                Err(err) => {
                    println!(
                        "[ERROR] {} {} -> Erro: {} ({:?})",
                        metodo, uri, err, duracao
                    );
                }
            }

            resultado
        })
    }
}

Stack completo de middleware para produção com Axum

use axum::{
    body::Body,
    http::{header, Method, StatusCode},
    response::IntoResponse,
    routing::{get, post},
    Json, Router,
};
use serde_json::json;
use std::time::Duration;
use tower::ServiceBuilder;
use tower_http::{
    compression::CompressionLayer,
    cors::CorsLayer,
    timeout::TimeoutLayer,
    trace::TraceLayer,
};

async fn listar_itens() -> impl IntoResponse {
    Json(json!({
        "itens": [
            {"id": 1, "nome": "Item 1"},
            {"id": 2, "nome": "Item 2"},
        ]
    }))
}

async fn criar_item(Json(body): Json<serde_json::Value>) -> impl IntoResponse {
    (
        StatusCode::CREATED,
        Json(json!({
            "criado": true,
            "dados": body
        })),
    )
}

async fn saude() -> impl IntoResponse {
    Json(json!({"status": "ok", "versao": "1.0.0"}))
}

fn criar_app_producao() -> Router {
    // Configurar CORS para produção
    let cors = CorsLayer::new()
        .allow_origin([
            "https://meusite.com.br".parse().unwrap(),
            "https://admin.meusite.com.br".parse().unwrap(),
        ])
        .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
        .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION])
        .max_age(Duration::from_secs(3600));

    // Stack de middleware
    let middleware_stack = ServiceBuilder::new()
        // Tracing
        .layer(TraceLayer::new_for_http())
        // Timeout global
        .layer(TimeoutLayer::new(Duration::from_secs(30)))
        // Compressão de resposta
        .layer(CompressionLayer::new())
        // CORS
        .layer(cors);

    Router::new()
        .route("/saude", get(saude))
        .route("/itens", get(listar_itens).post(criar_item))
        .layer(middleware_stack)
}

#[tokio::main]
async fn main() {
    // Inicializar tracing
    tracing_subscriber::fmt::init();

    let app = criar_app_producao();

    println!("Servidor de produção em http://0.0.0.0:3000");
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Comparação com Alternativas

CaracterísticaToweractix-web MiddlewareWarp Filters
AbordagemService + LayerTransform traitFilter composition
ComposabilidadeExcelenteBoaBoa
ReutilizaçãoCross-frameworkActix-onlyWarp-only
Middleware prontostower + tower-httpactix-web embutidoWarp embutido
TipagemForteForteForte (tipos complexos)
Curva de aprendizadoMédia-altaMédiaMédia-alta
EcossistemaAxum, Hyper, TonicActix WebWarp
  • Tower vs Actix Web Middleware: Tower é cross-framework, enquanto middleware do Actix só funciona com Actix Web. Se usar Axum/Hyper/Tonic, Tower é a escolha natural.
  • Tower vs Warp Filters: Warp usa composição de filtros que é conceitualmente diferente. Tower é mais flexível e reutilizável entre frameworks.
  • Quando usar Tower diretamente: quando precisa de middleware que funcione em múltiplos frameworks ou quando está construindo infraestrutura de rede customizada.

Conclusão

O Tower é a infraestrutura invisível que torna o ecossistema de rede do Rust tão poderoso e composável. Mesmo que você nunca crie um Layer customizado, entender como o Tower funciona vai ajudá-lo a usar melhor frameworks como Axum e Tonic, e a aproveitar a rica biblioteca de middleware disponível.

A chave do Tower é a separação clara entre o que um serviço faz (implementação do Service trait) e como ele é decorado (composição de Layers). Essa separação permite construir stacks de middleware complexos de forma legível e testável.

Próximos passos

  • Use tower-http para middleware HTTP pronto (CORS, compressão, tracing)
  • Explore Axum para ver Tower em ação em um framework web completo
  • Estude Tonic para usar Tower com gRPC
  • Aprenda Tracing para observabilidade dos seus serviços Tower