---
title: "Async Streams em Rust: Processamento de Dados em Tempo Real — 2026"
url: "https://rustlang.com.br/blog/async-streams-rust-processamento-tempo-real-2026/"
markdown_url: "https://rustlang.com.br/blog/async-streams-rust-processamento-tempo-real-2026.MD"
description: "Aprenda a usar async streams em Rust com tokio-stream, futures e StreamExt para processar dados em tempo real. Exemplos práticos e padrões modernos."
date: "2026-04-10"
author: "Equipe Rust Brasil"
---

# Async Streams em Rust: Processamento de Dados em Tempo Real — 2026

Aprenda a usar async streams em Rust com tokio-stream, futures e StreamExt para processar dados em tempo real. Exemplos práticos e padrões modernos.


## Introdução

Se você já trabalha com [async/await em Rust](/artigos/async-await-profundidade/), sabe que `Future` resolve **um único valor**. Mas e quando precisa processar uma sequência contínua de dados — mensagens de WebSocket, linhas de um arquivo enorme, eventos de sensores IoT ou updates de uma API? É aí que entram os **async streams**.

Async streams são a versão assíncrona dos [iteradores](/artigos/iteradores-rust/): em vez de bloquear a thread para cada item, eles suspendem a task e liberam o runtime para fazer outras coisas. Em 2026, com o ecossistema do [Tokio](/ecossistema/tokio/) maduro e a trait `Stream` consolidada, dominar async streams é essencial para qualquer dev Rust que trabalhe com I/O.

## O que é um Async Stream?

Um async stream implementa a trait `Stream` do crate `futures-core`:

```rust
pub trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}
```

Parece complexo, mas na prática você raramente implementa `poll_next` manualmente. Assim como usamos `async fn` em vez de implementar `Future::poll`, existem ferramentas que criam streams de forma ergonômica.

A analogia é direta:

| Síncrono       | Assíncrono         |
|----------------|--------------------|
| `Iterator`     | `Stream`           |
| `next()`       | `next().await`     |
| `for item in iter` | `while let Some(item) = stream.next().await` |

## Criando Streams com tokio-stream

O crate `tokio-stream` oferece adaptadores prontos. Adicione ao seu `Cargo.toml`:

```toml
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures = "0.3"
```

### Stream a partir de um intervalo

```rust
use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() {
    let stream = IntervalStream::new(interval(Duration::from_secs(1)));

    // Pegar apenas os 5 primeiros ticks
    let mut stream = stream.take(5);

    while let Some(tick) = stream.next().await {
        println!("Tick em: {:?}", tick);
    }

    println!("Stream finalizado!");
}
```

O `IntervalStream` emite um item a cada intervalo. O `.take(5)` limita a 5 elementos — exatamente como faria com um iterador síncrono.

### Stream a partir de um canal

Canais [mpsc do Tokio](/stdlib/channels/) se integram naturalmente com streams:

```rust
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(100);
    let mut stream = ReceiverStream::new(rx);

    // Produtor: simula dados chegando de uma API
    tokio::spawn(async move {
        for i in 1..=10 {
            tx.send(format!("evento_{}", i)).await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        }
    });

    // Consumidor: processa cada evento conforme chega
    while let Some(evento) = stream.next().await {
        println!("Recebido: {}", evento);
    }
}
```

Esse padrão é a base de praticamente qualquer pipeline de processamento em tempo real: um produtor envia dados e o consumidor processa sob demanda.

## Transformando Streams com StreamExt

A trait `StreamExt` (de `tokio-stream` ou `futures`) adiciona combinadores poderosos, análogos aos de [iteradores](/stdlib/iterator/):

```rust
use tokio_stream::StreamExt;
use tokio_stream::iter;

#[tokio::main]
async fn main() {
    let dados = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    let resultado: Vec<i32> = iter(dados)
        .filter(|x| x % 2 == 0)      // só pares
        .map(|x| x * x)               // eleva ao quadrado
        .take(3)                       // apenas 3 primeiros
        .collect()
        .await;

    println!("Resultado: {:?}", resultado); // [4, 16, 36]
}
```

### Combinadores mais úteis

| Combinador | Descrição |
|-----------|-----------|
| `.map(f)` | Transforma cada item |
| `.filter(f)` | Filtra itens por predicado |
| `.take(n)` | Limita a N itens |
| `.skip(n)` | Pula os N primeiros |
| `.throttle(dur)` | Emite no máximo 1 item por intervalo |
| `.timeout(dur)` | Erro se nenhum item chegar no prazo |
| `.merge(outro)` | Combina dois streams em um |
| `.chain(outro)` | Concatena dois streams |

## Criando Streams Customizados com async_stream

Para streams com lógica mais complexa, o crate `async-stream` permite usar a macro `stream!` com sintaxe natural:

```rust
use async_stream::stream;
use tokio_stream::StreamExt;
use std::time::Duration;

fn dados_sensor(sensor_id: &'static str) -> impl tokio_stream::Stream<Item = f64> {
    stream! {
        let mut leitura = 20.0;
        loop {
            // Simula leitura de sensor com variação
            leitura += (rand::random::<f64>() - 0.5) * 2.0;
            yield leitura;
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut stream = dados_sensor("temp_01").take(10);

    while let Some(temp) = stream.next().await {
        println!("Temperatura: {:.1}°C", temp);
    }
}
```

A palavra-chave `yield` dentro de `stream!` emite um item, e o stream suspende automaticamente nos pontos `.await`.

## Exemplo Prático: Pipeline de Logs em Tempo Real

Vamos montar um pipeline realista que lê logs, filtra por severidade e agrega métricas:

```rust
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct LogEntry {
    nivel: String,
    mensagem: String,
    timestamp: u64,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<LogEntry>(1000);
    let stream = ReceiverStream::new(rx);

    // Produtor: simula logs chegando
    tokio::spawn(async move {
        let niveis = ["INFO", "WARN", "ERROR", "DEBUG", "ERROR"];
        for (i, nivel) in niveis.iter().cycle().take(50).enumerate() {
            let entry = LogEntry {
                nivel: nivel.to_string(),
                mensagem: format!("Evento #{} processado", i),
                timestamp: i as u64,
            };
            if tx.send(entry).await.is_err() {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
    });

    // Pipeline: filtra erros e acumula contagem
    let mut contagem: HashMap<String, usize> = HashMap::new();

    let mut stream = stream
        .filter(|log| log.nivel == "ERROR" || log.nivel == "WARN")
        .take(20);

    while let Some(log) = stream.next().await {
        *contagem.entry(log.nivel.clone()).or_insert(0) += 1;
        println!("[{}] {}", log.nivel, log.mensagem);
    }

    println!("\n--- Resumo ---");
    for (nivel, total) in &contagem {
        println!("{}: {} ocorrências", nivel, total);
    }
}
```

Esse padrão se aplica diretamente a cenários de produção: ingestão de métricas, monitoramento de infraestrutura, processamento de eventos de [microsserviços](/artigos/rust-para-microsservicos/).

## Streams vs Channels vs Iteradores: Quando Usar Cada Um

- **Iterador síncrono**: dados já estão na memória, processamento CPU-bound
- **Canal (mpsc)**: comunicação entre tasks, quando produtor e consumidor têm lógica separada
- **Async stream**: processamento sequencial de dados assíncronos com transformações compostas (filter, map, throttle)

Na prática, canais e streams se complementam: você cria um canal para receber dados e envolve o `Receiver` em um `ReceiverStream` para usar os combinadores.

## Tratamento de Erros em Streams

Streams podem emitir `Result` como item, e existem combinadores específicos:

```rust
use tokio_stream::StreamExt;
use tokio_stream::iter;

#[tokio::main]
async fn main() {
    let dados: Vec<Result<i32, String>> = vec![
        Ok(1), Ok(2), Err("falha na rede".into()), Ok(4), Ok(5),
    ];

    // Processar apenas os Ok, parando no primeiro erro
    let mut stream = iter(dados);

    while let Some(item) = stream.next().await {
        match item {
            Ok(valor) => println!("Processado: {}", valor),
            Err(e) => {
                eprintln!("Erro no stream: {}. Reconectando...", e);
                break;
            }
        }
    }
}
```

Para cenários de produção, combine com [tratamento de erros robusto usando thiserror e anyhow](/blog/tratamento-erros-rust-thiserror-anyhow/).

## Performance: Backpressure e Buffering

Async streams no Tokio têm **backpressure natural**: se o consumidor está lento, o produtor automaticamente suspende. Isso é uma vantagem enorme sobre modelos push-based.

Para ajustar performance:

```rust
use tokio_stream::StreamExt;

// Buffer: acumula itens antes de processar em lote
let mut chunks = stream.chunks_timeout(100, Duration::from_secs(5));

while let Some(lote) = chunks.next().await {
    processar_lote(&lote).await;
}
```

O `chunks_timeout` acumula até 100 itens **ou** espera no máximo 5 segundos — o que vier primeiro. Perfeito para batch inserts em bancos de dados como [PostgreSQL](/tutoriais/rust-postgresql/).

## Comparação com Outras Linguagens

Se você vem de <a href="https://golang.com.br/" target="_blank" rel="noopener" onclick="umami.track('portfolio-site-click', { destination: 'golang.com.br' })">Go</a>, async streams são como channels com a ergonomia de range loops mais combinadores funcionais. Em Go você faria `for msg := range ch`, em Rust é `while let Some(msg) = stream.next().await` — mas com `.filter()`, `.map()`, `.throttle()` compostos.

Em <a href="https://python.dev.br/" target="_blank" rel="noopener" onclick="umami.track('portfolio-site-click', { destination: 'python.dev.br' })">Python</a>, o equivalente são async generators (`async for item in gen`). A diferença é que Rust garante zero-cost abstractions — sem overhead de runtime para cada yield.

E em <a href="https://kotlin.dev.br/" target="_blank" rel="noopener" onclick="umami.track('portfolio-site-click', { destination: 'kotlin.dev.br' })">Kotlin</a>, o conceito mais próximo são Flows do coroutines, que também suportam backpressure e combinadores. O modelo é bastante similar ao Rust.

## Conclusão

Async streams são a ferramenta certa quando você precisa processar sequências de dados assíncronos com composição elegante. Em 2026, o ecossistema está maduro:

- **tokio-stream** para adaptadores e wrappers
- **async-stream** para criar streams com `yield`
- **StreamExt** para combinadores poderosos

Combinados com o runtime do [Tokio](/ecossistema/tokio/) e o [sistema de tipos do Rust](/tutoriais/traits-generics/), você consegue pipelines de dados em tempo real que são seguros, rápidos e legíveis.

## Leia Também

- [Async/Await em Profundidade](/artigos/async-await-profundidade/) — fundamentos de programação assíncrona em Rust
- [Tokio: Guia Completo](/artigos/tokio-guia-completo/) — o runtime assíncrono padrão
- [Async Rust: Ecossistema em 2026](/blog/async-rust-ecossistema-2026/) — panorama completo do async em Rust
- [Rayon e Paralelismo de Dados](/blog/rayon-paralelismo-dados-rust/) — quando usar paralelismo em vez de async
- <a href="https://ziglang.com.br/" target="_blank" rel="noopener" onclick="umami.track('portfolio-site-click', { destination: 'ziglang.com.br' })">Zig Brasil</a> — outra linguagem de sistemas com abordagem diferente para async I/O
