Rayon: Paralelismo de Dados em Rust

Guia completo do Rayon para paralelismo de dados em Rust. Aprenda sobre iteradores paralelos, par_iter(), thread pools customizados, join() e quando usar Rayon vs Tokio.

O Rayon é a crate de paralelismo de dados mais popular do Rust, permitindo transformar código sequencial em paralelo com mudanças mínimas. Sua proposta é simples e poderosa: substitua .iter() por .par_iter() e o Rayon distribui automaticamente o trabalho entre todas as CPUs disponíveis usando uma estratégia de work-stealing.

Diferente do Tokio, que é focado em concorrência de I/O (muitas operações esperando respostas externas), o Rayon é otimizado para paralelismo computacional — dividir cálculos pesados entre múltiplas threads para terminar mais rápido. Se você tem dados que precisam ser processados, transformados, filtrados ou agregados, e o trabalho é CPU-bound, o Rayon é a ferramenta certa.

A beleza do Rayon está na simplicidade: ele utiliza o sistema de ownership do Rust para garantir data-race freedom em tempo de compilação, permitindo paralelismo seguro sem locks, sem race conditions e sem bugs de concorrência.

Instalação

Adicione o Rayon ao Cargo.toml:

[dependencies]
rayon = "1.10"

Uso Básico

Iteradores Paralelos

use rayon::prelude::*;

fn main() {
    // Sequencial
    let soma_seq: i64 = (0..1_000_000i64).sum();

    // Paralelo - mesma lógica, múltiplas threads
    let soma_par: i64 = (0..1_000_000i64).into_par_iter().sum();

    assert_eq!(soma_seq, soma_par);
    println!("Soma: {}", soma_par);
}

par_iter() vs iter()

use rayon::prelude::*;

fn main() {
    let numeros: Vec<i64> = (0..10_000_000).collect();

    // Sequencial
    let resultado_seq: Vec<i64> = numeros
        .iter()
        .filter(|&&n| n % 2 == 0)
        .map(|&n| n * n)
        .collect();

    // Paralelo - troque .iter() por .par_iter()
    let resultado_par: Vec<i64> = numeros
        .par_iter()
        .filter(|&&n| n % 2 == 0)
        .map(|&n| n * n)
        .collect();

    assert_eq!(resultado_seq.len(), resultado_par.len());
    println!("Processados {} números", resultado_par.len());

    // Diferentes métodos de criação
    let v = vec![1, 2, 3, 4, 5];

    // Iterador paralelo por referência
    let soma: i32 = v.par_iter().sum();

    // Iterador paralelo que consome o vetor
    let quadrados: Vec<i32> = v.into_par_iter().map(|n| n * n).collect();

    // Iterador paralelo mutável
    let mut dados = vec![1, 2, 3, 4, 5];
    dados.par_iter_mut().for_each(|n| *n *= 2);
    println!("Duplicados: {:?}", dados);
}

Operações Comuns com Iteradores Paralelos

use rayon::prelude::*;

fn main() {
    let dados: Vec<f64> = (0..1_000_000).map(|i| i as f64 * 0.5).collect();

    // Map e collect
    let processados: Vec<f64> = dados.par_iter()
        .map(|&x| x.sin() + x.cos())
        .collect();

    // Filter e collect
    let positivos: Vec<&f64> = dados.par_iter()
        .filter(|&&x| x > 0.0)
        .collect();

    // Reduce (fold paralelo)
    let soma: f64 = dados.par_iter()
        .copied()
        .reduce(|| 0.0, |a, b| a + b);

    // Sum (atalho para reduce com soma)
    let soma2: f64 = dados.par_iter().sum();

    // Find (encontrar primeiro que satisfaz condição)
    let encontrado = dados.par_iter()
        .find_any(|&&x| x > 999.0);
    println!("Encontrado: {:?}", encontrado);

    // Any/All
    let tem_negativo = dados.par_iter().any(|&x| x < 0.0);
    let todos_finitos = dados.par_iter().all(|x| x.is_finite());

    // For each (executar efeito para cada elemento)
    dados.par_iter()
        .enumerate()
        .for_each(|(i, &valor)| {
            if i % 100_000 == 0 {
                println!("Processando índice {}: {}", i, valor);
            }
        });

    // Flat map
    let listas: Vec<Vec<i32>> = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    let achatado: Vec<i32> = listas.par_iter()
        .flat_map(|lista| lista.par_iter().copied())
        .collect();
    println!("Achatado: {:?}", achatado);

    // Fold (como reduce, mas com tipo de acumulador diferente)
    let contagem_por_faixa = dados.par_iter()
        .fold(
            || [0usize; 10],  // acumulador inicial por thread
            |mut acc, &valor| {
                let faixa = (valor / 50000.0).min(9.0) as usize;
                acc[faixa] += 1;
                acc
            },
        )
        .reduce(
            || [0usize; 10],
            |mut a, b| {
                for i in 0..10 {
                    a[i] += b[i];
                }
                a
            },
        );
    println!("Distribuição por faixa: {:?}", contagem_por_faixa);
}

Sorting Paralelo

use rayon::prelude::*;

fn main() {
    let mut numeros: Vec<i64> = (0..10_000_000).rev().collect();

    // Sorting paralelo estável
    numeros.par_sort();

    // Sorting paralelo instável (mais rápido)
    numeros.par_sort_unstable();

    // Sorting por chave
    let mut pessoas = vec![
        ("Maria", 28),
        ("João", 35),
        ("Ana", 22),
        ("Pedro", 31),
    ];
    pessoas.par_sort_by_key(|&(_, idade)| idade);
    println!("Ordenados por idade: {:?}", pessoas);

    // Sorting com comparador customizado
    let mut textos = vec!["banana", "abacaxi", "caju", "damasco"];
    textos.par_sort_by(|a, b| a.len().cmp(&b.len()));
    println!("Ordenados por tamanho: {:?}", textos);
}

Recursos Avançados

join() - Fork-Join Parallelism

use rayon;

fn fibonacci(n: u64) -> u64 {
    if n <= 1 {
        return n;
    }

    // Para valores grandes, calcular em paralelo
    if n > 20 {
        let (a, b) = rayon::join(
            || fibonacci(n - 1),
            || fibonacci(n - 2),
        );
        a + b
    } else {
        // Para valores pequenos, calcular sequencialmente
        // (evita overhead de scheduling para tarefas pequenas)
        fibonacci(n - 1) + fibonacci(n - 2)
    }
}

fn main() {
    let resultado = fibonacci(40);
    println!("fibonacci(40) = {}", resultado);
}

// Quicksort paralelo usando join
fn quicksort_paralelo<T: Ord + Send>(dados: &mut [T]) {
    if dados.len() <= 32 {
        // Insertion sort para arrays pequenos
        dados.sort();
        return;
    }

    let pivot = particionar(dados);
    let (esquerda, direita) = dados.split_at_mut(pivot);

    // Ordenar as duas metades em paralelo
    rayon::join(
        || quicksort_paralelo(esquerda),
        || quicksort_paralelo(&mut direita[1..]),
    );
}

fn particionar<T: Ord>(dados: &mut [T]) -> usize {
    let len = dados.len();
    let pivot_idx = len / 2;
    dados.swap(pivot_idx, len - 1);

    let mut i = 0;
    for j in 0..len - 1 {
        if dados[j] <= dados[len - 1] {
            dados.swap(i, j);
            i += 1;
        }
    }
    dados.swap(i, len - 1);
    i
}

Thread Pool Customizado

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    // Pool global customizado (chamar uma vez no início do programa)
    ThreadPoolBuilder::new()
        .num_threads(4)                        // 4 threads
        .thread_name(|idx| format!("worker-{}", idx))
        .stack_size(8 * 1024 * 1024)          // 8MB de stack
        .build_global()
        .unwrap();

    // Agora par_iter() usa esse pool
    let resultado: i64 = (0..1_000_000i64).into_par_iter().sum();
    println!("Resultado: {}", resultado);

    // Pool local (para isolar trabalho)
    let pool = ThreadPoolBuilder::new()
        .num_threads(2)
        .thread_name(|idx| format!("custom-{}", idx))
        .build()
        .unwrap();

    // Executar trabalho no pool específico
    let resultado = pool.install(|| {
        (0..1_000_000i64).into_par_iter().sum::<i64>()
    });
    println!("Pool customizado: {}", resultado);

    // Pool com panic handler
    let pool = ThreadPoolBuilder::new()
        .num_threads(4)
        .panic_handler(|_| {
            eprintln!("Uma thread do Rayon deu panic!");
        })
        .build()
        .unwrap();
}

Scope - Paralelismo com Referências Locais

use rayon;

fn main() {
    let mut dados = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let mut resultados_a = Vec::new();
    let mut resultados_b = Vec::new();

    // scope permite referenciar variáveis locais em tasks paralelas
    rayon::scope(|s| {
        let (esquerda, direita) = dados.split_at(4);

        s.spawn(|_| {
            // Pode referenciar 'esquerda' emprestada
            resultados_a.extend(esquerda.iter().map(|&x| x * 2));
        });

        s.spawn(|_| {
            // Pode referenciar 'direita' emprestada
            resultados_b.extend(direita.iter().map(|&x| x * 3));
        });
    }); // Bloqueia até todas as tasks completarem

    println!("A: {:?}", resultados_a);
    println!("B: {:?}", resultados_b);
}

Chunks Paralelos

use rayon::prelude::*;

fn main() {
    let dados: Vec<i32> = (0..1000).collect();

    // Processar em chunks paralelos
    let resultados: Vec<i32> = dados
        .par_chunks(100)  // Dividir em chunks de 100
        .flat_map(|chunk| {
            // Cada chunk é processado por uma thread
            chunk.iter()
                .map(|&x| x * x + 1)
                .collect::<Vec<_>>()
        })
        .collect();

    println!("Processados {} resultados", resultados.len());

    // Chunks mutáveis
    let mut buffer = vec![0u8; 1024];
    buffer.par_chunks_mut(256).enumerate().for_each(|(i, chunk)| {
        // Preencher cada chunk com um padrão
        for byte in chunk.iter_mut() {
            *byte = i as u8;
        }
    });
    println!("Buffer preenchido: {:?}", &buffer[..10]);
}

Par Bridge - Converter Iteradores Sequenciais

use rayon::prelude::*;
use rayon::iter::ParallelBridge;

fn main() {
    // Converter um iterador sequencial qualquer em paralelo
    let resultado: Vec<String> = (0..1000)
        .par_bridge()  // Converte Iterator -> ParallelIterator
        .map(|n| format!("item_{}", n))
        .collect();

    println!("Total: {}", resultado.len());

    // Útil para iteradores que não implementam IntoParallelIterator
    use std::collections::BTreeMap;
    let mut mapa = BTreeMap::new();
    for i in 0..100 {
        mapa.insert(format!("chave_{}", i), i * 2);
    }

    let soma: i32 = mapa.iter()
        .par_bridge()
        .map(|(_, &v)| v)
        .sum();
    println!("Soma dos valores: {}", soma);
}

Boas Práticas

1. Meça Antes de Paralelizar

use rayon::prelude::*;
use std::time::Instant;

fn benchmark_sequencial_vs_paralelo() {
    let dados: Vec<f64> = (0..10_000_000).map(|i| i as f64).collect();

    // Medir sequencial
    let inicio = Instant::now();
    let _: Vec<f64> = dados.iter().map(|x| x.sqrt().sin().cos()).collect();
    let tempo_seq = inicio.elapsed();

    // Medir paralelo
    let inicio = Instant::now();
    let _: Vec<f64> = dados.par_iter().map(|x| x.sqrt().sin().cos()).collect();
    let tempo_par = inicio.elapsed();

    println!("Sequencial: {:?}", tempo_seq);
    println!("Paralelo:   {:?}", tempo_par);
    println!("Speedup:    {:.2}x", tempo_seq.as_secs_f64() / tempo_par.as_secs_f64());
}

2. Evite Paralelismo para Dados Pequenos

use rayon::prelude::*;

fn processar(dados: &[i32]) -> Vec<i32> {
    // O overhead de scheduling do Rayon é ~1-10 microsegundos
    // Para poucos dados, sequencial é mais rápido
    if dados.len() < 10_000 {
        dados.iter().map(|&x| x * x).collect()
    } else {
        dados.par_iter().map(|&x| x * x).collect()
    }
}

3. Minimize Sincronização

use rayon::prelude::*;
use std::sync::Mutex;

fn main() {
    let dados: Vec<i32> = (0..1_000_000).collect();

    // RUIM: Mutex em cada iteração mata a performance
    let resultado_ruim = Mutex::new(Vec::new());
    dados.par_iter().for_each(|&x| {
        resultado_ruim.lock().unwrap().push(x * 2); // Lock a cada item!
    });

    // BOM: Usar collect() que combina resultados eficientemente
    let resultado_bom: Vec<i32> = dados.par_iter()
        .map(|&x| x * 2)
        .collect();

    // BOM: Usar fold + reduce para acumulação customizada
    let soma: i64 = dados.par_iter()
        .fold(|| 0i64, |acc, &x| acc + x as i64)
        .sum();
}

4. Use with_min_len para Controlar Granularidade

use rayon::prelude::*;

fn main() {
    let dados: Vec<f64> = (0..1_000_000).map(|i| i as f64).collect();

    // Garantir que cada thread processa pelo menos 1000 itens
    let resultado: Vec<f64> = dados.par_iter()
        .with_min_len(1000)  // Mínimo de itens por task
        .with_max_len(50000) // Máximo de itens por task
        .map(|&x| x.sqrt())
        .collect();
}

5. Prefira Operações Funcionais a Efeitos Colaterais

use rayon::prelude::*;

fn main() {
    let dados: Vec<i32> = (0..100).collect();

    // RUIM: efeitos colaterais com for_each
    // A ordem não é determinística!
    dados.par_iter().for_each(|x| {
        println!("{}", x); // Ordem aleatória!
    });

    // BOM: transformações puras com map + collect
    let resultados: Vec<String> = dados.par_iter()
        .map(|x| format!("Item: {}", x))
        .collect();

    // A ordem é preservada no collect!
    for r in &resultados[..5] {
        println!("{}", r);
    }
}

Exemplos Práticos

Exemplo 1: Processamento Paralelo de Imagens

use rayon::prelude::*;
use std::path::PathBuf;

/// Representa um pixel RGB
#[derive(Clone, Copy)]
struct Pixel {
    r: u8,
    g: u8,
    b: u8,
}

/// Imagem simples em memória
struct Imagem {
    largura: usize,
    altura: usize,
    pixels: Vec<Pixel>,
}

impl Imagem {
    fn nova(largura: usize, altura: usize) -> Self {
        Imagem {
            largura,
            altura,
            pixels: vec![Pixel { r: 0, g: 0, b: 0 }; largura * altura],
        }
    }

    /// Aplicar filtro de escala de cinza em paralelo
    fn escala_de_cinza(&mut self) {
        self.pixels.par_iter_mut().for_each(|pixel| {
            // Fórmula de luminância ITU-R BT.709
            let cinza = (0.2126 * pixel.r as f32
                + 0.7152 * pixel.g as f32
                + 0.0722 * pixel.b as f32) as u8;
            pixel.r = cinza;
            pixel.g = cinza;
            pixel.b = cinza;
        });
    }

    /// Aplicar brilho em paralelo
    fn ajustar_brilho(&mut self, fator: f32) {
        self.pixels.par_iter_mut().for_each(|pixel| {
            pixel.r = ((pixel.r as f32 * fator).min(255.0)) as u8;
            pixel.g = ((pixel.g as f32 * fator).min(255.0)) as u8;
            pixel.b = ((pixel.b as f32 * fator).min(255.0)) as u8;
        });
    }

    /// Calcular histograma em paralelo
    fn histograma(&self) -> [u32; 256] {
        self.pixels
            .par_iter()
            .fold(
                || [0u32; 256],
                |mut hist, pixel| {
                    let luminancia = ((pixel.r as u32 + pixel.g as u32 + pixel.b as u32) / 3) as usize;
                    hist[luminancia] += 1;
                    hist
                },
            )
            .reduce(
                || [0u32; 256],
                |mut a, b| {
                    for i in 0..256 {
                        a[i] += b[i];
                    }
                    a
                },
            )
    }

    /// Gerar Mandelbrot em paralelo por linhas
    fn gerar_mandelbrot(largura: usize, altura: usize, max_iter: u32) -> Self {
        let mut img = Imagem::nova(largura, altura);

        // Processar linhas em paralelo
        img.pixels
            .par_chunks_mut(largura)
            .enumerate()
            .for_each(|(y, linha)| {
                for (x, pixel) in linha.iter_mut().enumerate() {
                    let cx = (x as f64 / largura as f64) * 3.5 - 2.5;
                    let cy = (y as f64 / altura as f64) * 2.0 - 1.0;

                    let mut zx = 0.0f64;
                    let mut zy = 0.0f64;
                    let mut iter = 0u32;

                    while zx * zx + zy * zy <= 4.0 && iter < max_iter {
                        let tmp = zx * zx - zy * zy + cx;
                        zy = 2.0 * zx * zy + cy;
                        zx = tmp;
                        iter += 1;
                    }

                    // Colorir baseado no número de iterações
                    let t = iter as f32 / max_iter as f32;
                    pixel.r = (9.0 * (1.0 - t) * t * t * t * 255.0) as u8;
                    pixel.g = (15.0 * (1.0 - t) * (1.0 - t) * t * t * 255.0) as u8;
                    pixel.b = (8.5 * (1.0 - t) * (1.0 - t) * (1.0 - t) * t * 255.0) as u8;
                }
            });

        img
    }
}

fn main() {
    use std::time::Instant;

    let inicio = Instant::now();
    let img = Imagem::gerar_mandelbrot(4096, 2304, 1000);
    let tempo = inicio.elapsed();

    println!("Mandelbrot 4096x2304 gerado em {:?}", tempo);
    println!("Total de pixels: {}", img.pixels.len());

    let histograma = img.histograma();
    let pixel_mais_comum = histograma.iter()
        .enumerate()
        .max_by_key(|(_, &count)| count)
        .unwrap();
    println!("Luminância mais comum: {} ({} pixels)", pixel_mais_comum.0, pixel_mais_comum.1);
}

Exemplo 2: Processamento de Textos em Lote

use rayon::prelude::*;
use std::collections::HashMap;

/// Contar palavras em paralelo em múltiplos documentos
fn contar_palavras_paralelo(documentos: &[String]) -> HashMap<String, usize> {
    documentos
        .par_iter()
        .map(|doc| {
            // Contagem local por documento (sem sincronização)
            let mut contagem_local: HashMap<String, usize> = HashMap::new();
            for palavra in doc.split_whitespace() {
                let palavra = palavra.to_lowercase();
                let palavra = palavra.trim_matches(|c: char| !c.is_alphanumeric());
                if !palavra.is_empty() {
                    *contagem_local.entry(palavra.to_string()).or_insert(0) += 1;
                }
            }
            contagem_local
        })
        .reduce(
            HashMap::new,
            |mut acc, local| {
                // Combinar contagens de cada thread
                for (palavra, contagem) in local {
                    *acc.entry(palavra).or_insert(0) += contagem;
                }
                acc
            },
        )
}

fn main() {
    // Simular documentos
    let documentos: Vec<String> = (0..100)
        .map(|i| {
            format!(
                "Este é o documento número {}. Rust é uma linguagem de programação. \
                 O ecossistema Rust inclui Cargo, Clippy e muitas crates. \
                 Programação em Rust é segura e eficiente.",
                i
            )
        })
        .collect();

    let contagem = contar_palavras_paralelo(&documentos);

    // Top 10 palavras mais frequentes
    let mut palavras: Vec<_> = contagem.into_iter().collect();
    palavras.par_sort_by(|a, b| b.1.cmp(&a.1));

    println!("Top 10 palavras:");
    for (palavra, contagem) in palavras.iter().take(10) {
        println!("  {}: {}", palavra, contagem);
    }
}

Comparação com Alternativas

CaracterísticaRayonTokioCrossbeamstd::thread
FocoParalelismo de dadosConcorrência I/OPrimitivas de concorrênciaThreads de baixo nível
Caso de usoCPU-boundI/O-boundComunicação entre threadsControle total
FacilidadeMuito fácilModeradaModeradaDifícil
Work-stealingSimSimNãoNão
Iteradores paralelosSimNãoNãoNão
Fork-joinjoin()JoinSetscope()JoinHandle
OverheadMínimoModeradoMínimoNenhum
DeterminismoOrdem preservadaNão garantidoManualManual
Async/awaitNãoSimNãoNão

Quando usar cada um:

  • Rayon: Processar grandes volumes de dados (map, filter, sort, reduce)
  • Tokio: Servidores web, I/O de rede, operações assíncronas
  • Crossbeam: Comunicação entre threads, estruturas lock-free
  • std::thread: Controle fino de threads, quando outras abstrações não servem

Conclusão

O Rayon é a maneira mais ergonômica de adicionar paralelismo ao seu código Rust. Com uma mudança de .iter() para .par_iter(), você pode aproveitar todos os cores da CPU sem se preocupar com data races, deadlocks ou bugs de concorrência.

Pontos-chave para lembrar:

  • par_iter() para paralelizar iterações existentes
  • join() para fork-join de duas computações
  • scope() para paralelismo com referências locais
  • fold() + reduce() para acumulação eficiente sem locks
  • Meça sempre antes de paralelizar — nem tudo melhora com threads
  • Evite Mutex dentro de iteradores paralelos; use transformações funcionais
  • with_min_len() para controlar a granularidade das tasks

Para aprofundar, consulte a documentação do Rayon e o Rayon FAQ.

No próximo passo, aprenda sobre o Crossbeam para primitivas de concorrência avançadas em Rust.