Banco de Dados Key-Value em Rust

Construa um banco de dados key-value persistente em Rust com write-ahead log, compactação periódica, protocolo binário e serialização com serde.

Bancos de dados key-value são a base de sistemas como Redis, LevelDB e RocksDB. Neste projeto, vamos construir um banco de dados key-value persistente do zero em Rust, implementando um write-ahead log (WAL) para durabilidade, compactação periódica para gerenciar o espaço em disco e um protocolo de comandos para interação. Usaremos serde para serialização eficiente dos dados.

Este projeto ensina conceitos fundamentais de sistemas de armazenamento: durabilidade, consistência, gerenciamento de arquivos e design de protocolos — tudo com a segurança de memória que Rust oferece.

O Que Vamos Construir

Um banco de dados key-value com as seguintes funcionalidades:

  • Comandos: SET, GET, DELETE, LIST, COUNT
  • Persistência via write-ahead log (WAL)
  • Compactação automática do log
  • Serialização com serde e bincode
  • Interface REPL para interação direta
  • Tratamento de erros robusto com Result
  • Operações thread-safe com Mutex

Estrutura do Projeto

database-kv/
├── Cargo.toml
└── src/
    ├── main.rs
    ├── motor.rs
    ├── wal.rs
    └── protocolo.rs

Configurando o Projeto

cargo new database-kv
cd database-kv
[package]
name = "database-kv"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1", features = ["derive"] }
bincode = "1"

Passo 1: O Protocolo de Comandos

Definimos os comandos que nosso banco de dados aceita e as respostas que ele retorna. A serialização com serde permite gravar essas estruturas no disco de forma eficiente.

Crie o arquivo src/protocolo.rs:

use serde::{Deserialize, Serialize};

/// Comando enviado ao banco de dados
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Comando {
    /// Armazena ou atualiza um valor
    Set { chave: String, valor: String },
    /// Recupera um valor pela chave
    Get { chave: String },
    /// Remove uma entrada
    Delete { chave: String },
    /// Lista todas as chaves
    List,
    /// Conta o número de entradas
    Count,
}

/// Resposta do banco de dados
#[derive(Debug)]
pub enum Resposta {
    Ok,
    Valor(String),
    NaoEncontrado,
    Lista(Vec<String>),
    Contagem(usize),
    Erro(String),
}

impl std::fmt::Display for Resposta {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Resposta::Ok => write!(f, "OK"),
            Resposta::Valor(v) => write!(f, "\"{}\"", v),
            Resposta::NaoEncontrado => write!(f, "(nil)"),
            Resposta::Lista(chaves) => {
                if chaves.is_empty() {
                    write!(f, "(lista vazia)")
                } else {
                    for (i, chave) in chaves.iter().enumerate() {
                        if i > 0 {
                            writeln!(f)?;
                        }
                        write!(f, "  {}) \"{}\"", i + 1, chave)?;
                    }
                    Ok(())
                }
            }
            Resposta::Contagem(n) => write!(f, "(integer) {}", n),
            Resposta::Erro(msg) => write!(f, "(erro) {}", msg),
        }
    }
}

/// Analisa uma linha de texto e retorna o comando correspondente
pub fn analisar_comando(linha: &str) -> Result<Comando, String> {
    let partes: Vec<&str> = linha.trim().splitn(3, ' ').collect();

    if partes.is_empty() {
        return Err("Comando vazio".to_string());
    }

    match partes[0].to_uppercase().as_str() {
        "SET" => {
            if partes.len() < 3 {
                Err("Uso: SET <chave> <valor>".to_string())
            } else {
                Ok(Comando::Set {
                    chave: partes[1].to_string(),
                    valor: partes[2].to_string(),
                })
            }
        }
        "GET" => {
            if partes.len() < 2 {
                Err("Uso: GET <chave>".to_string())
            } else {
                Ok(Comando::Get {
                    chave: partes[1].to_string(),
                })
            }
        }
        "DEL" | "DELETE" => {
            if partes.len() < 2 {
                Err("Uso: DEL <chave>".to_string())
            } else {
                Ok(Comando::Delete {
                    chave: partes[1].to_string(),
                })
            }
        }
        "LIST" | "KEYS" => Ok(Comando::List),
        "COUNT" | "DBSIZE" => Ok(Comando::Count),
        outro => Err(format!("Comando desconhecido: '{}'", outro)),
    }
}

Passo 2: O Write-Ahead Log (WAL)

O WAL garante durabilidade: antes de qualquer modificação na memória, gravamos a operação em disco. Se o programa falhar, podemos reconstruir o estado a partir do log. A compactação remove entradas obsoletas.

Crie o arquivo src/wal.rs:

use crate::protocolo::Comando;
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};

/// Entrada serializada no WAL
#[derive(serde::Serialize, serde::Deserialize, Debug)]
enum EntradaWal {
    Set { chave: String, valor: String },
    Delete { chave: String },
}

/// Gerenciador do Write-Ahead Log
pub struct Wal {
    caminho: PathBuf,
    escritor: BufWriter<File>,
    total_entradas: usize,
}

impl Wal {
    /// Abre ou cria um WAL no caminho especificado
    pub fn abrir(caminho: &Path) -> io::Result<Self> {
        let arquivo = OpenOptions::new()
            .create(true)
            .append(true)
            .open(caminho)?;

        let total_entradas = Self::contar_entradas(caminho)?;

        Ok(Self {
            caminho: caminho.to_path_buf(),
            escritor: BufWriter::new(arquivo),
            total_entradas,
        })
    }

    /// Registra um comando de escrita no WAL
    pub fn registrar(&mut self, comando: &Comando) -> io::Result<()> {
        let entrada = match comando {
            Comando::Set { chave, valor } => EntradaWal::Set {
                chave: chave.clone(),
                valor: valor.clone(),
            },
            Comando::Delete { chave } => EntradaWal::Delete {
                chave: chave.clone(),
            },
            _ => return Ok(()), // Operações de leitura não são registradas
        };

        let bytes = bincode::serialize(&entrada)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

        // Formato: [tamanho: u32][dados: bytes]
        let tamanho = bytes.len() as u32;
        self.escritor.write_all(&tamanho.to_le_bytes())?;
        self.escritor.write_all(&bytes)?;
        self.escritor.flush()?;

        self.total_entradas += 1;
        Ok(())
    }

    /// Reconstrói o estado a partir do WAL
    pub fn reconstruir(caminho: &Path) -> io::Result<HashMap<String, String>> {
        let mut mapa = HashMap::new();

        if !caminho.exists() {
            return Ok(mapa);
        }

        let arquivo = File::open(caminho)?;
        let mut leitor = BufReader::new(arquivo);

        loop {
            // Lê o tamanho da próxima entrada
            let mut buf_tamanho = [0u8; 4];
            match leitor.read_exact(&mut buf_tamanho) {
                Ok(()) => {}
                Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }

            let tamanho = u32::from_le_bytes(buf_tamanho) as usize;

            // Lê os dados da entrada
            let mut buf_dados = vec![0u8; tamanho];
            leitor.read_exact(&mut buf_dados)?;

            let entrada: EntradaWal = bincode::deserialize(&buf_dados)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

            match entrada {
                EntradaWal::Set { chave, valor } => {
                    mapa.insert(chave, valor);
                }
                EntradaWal::Delete { chave } => {
                    mapa.remove(&chave);
                }
            }
        }

        Ok(mapa)
    }

    /// Compacta o WAL, removendo entradas obsoletas
    pub fn compactar(&mut self, estado_atual: &HashMap<String, String>) -> io::Result<()> {
        let caminho_temp = self.caminho.with_extension("tmp");

        {
            let arquivo_temp = File::create(&caminho_temp)?;
            let mut escritor_temp = BufWriter::new(arquivo_temp);

            // Grava apenas o estado final de cada chave
            for (chave, valor) in estado_atual {
                let entrada = EntradaWal::Set {
                    chave: chave.clone(),
                    valor: valor.clone(),
                };
                let bytes = bincode::serialize(&entrada)
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
                let tamanho = bytes.len() as u32;
                escritor_temp.write_all(&tamanho.to_le_bytes())?;
                escritor_temp.write_all(&bytes)?;
            }
            escritor_temp.flush()?;
        }

        // Substitui o WAL antigo pelo compactado
        fs::rename(&caminho_temp, &self.caminho)?;

        // Reabre o arquivo para append
        let arquivo = OpenOptions::new()
            .append(true)
            .open(&self.caminho)?;
        self.escritor = BufWriter::new(arquivo);
        self.total_entradas = estado_atual.len();

        Ok(())
    }

    /// Retorna o número total de entradas no WAL
    pub fn total_entradas(&self) -> usize {
        self.total_entradas
    }

    fn contar_entradas(caminho: &Path) -> io::Result<usize> {
        if !caminho.exists() {
            return Ok(0);
        }

        let arquivo = File::open(caminho)?;
        let mut leitor = BufReader::new(arquivo);
        let mut contagem = 0;

        loop {
            let mut buf_tamanho = [0u8; 4];
            match leitor.read_exact(&mut buf_tamanho) {
                Ok(()) => {}
                Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
            let tamanho = u32::from_le_bytes(buf_tamanho) as usize;
            let mut buf = vec![0u8; tamanho];
            leitor.read_exact(&mut buf)?;
            contagem += 1;
        }

        Ok(contagem)
    }
}

Passo 3: O Motor do Banco de Dados

O motor combina o HashMap em memória com o WAL, garantindo que os dados sejam persistidos e que a compactação ocorra automaticamente quando o log fica muito grande.

Crie o arquivo src/motor.rs:

use crate::protocolo::{Comando, Resposta};
use crate::wal::Wal;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;

/// Limite de entradas no WAL antes de compactar
const LIMITE_COMPACTACAO: usize = 100;

/// Motor principal do banco de dados
pub struct Motor {
    dados: Mutex<HashMap<String, String>>,
    wal: Mutex<Wal>,
}

impl Motor {
    /// Abre o banco de dados, reconstruindo o estado a partir do WAL
    pub fn abrir(caminho_wal: &Path) -> Result<Self, String> {
        // Reconstrói o estado a partir do WAL existente
        let dados = Wal::reconstruir(caminho_wal)
            .map_err(|e| format!("Erro ao reconstruir WAL: {}", e))?;

        let num_chaves = dados.len();

        let wal = Wal::abrir(caminho_wal)
            .map_err(|e| format!("Erro ao abrir WAL: {}", e))?;

        if num_chaves > 0 {
            println!(
                "[Motor] {} chaves recuperadas do WAL ({} entradas no log)",
                num_chaves,
                wal.total_entradas()
            );
        }

        Ok(Self {
            dados: Mutex::new(dados),
            wal: Mutex::new(wal),
        })
    }

    /// Processa um comando e retorna a resposta
    pub fn processar(&self, comando: Comando) -> Resposta {
        match comando {
            Comando::Set { ref chave, ref valor } => {
                // Primeiro grava no WAL (durabilidade)
                if let Err(e) = self.wal.lock().unwrap().registrar(&comando) {
                    return Resposta::Erro(format!("Falha no WAL: {}", e));
                }

                // Depois atualiza a memória
                self.dados
                    .lock()
                    .unwrap()
                    .insert(chave.clone(), valor.clone());

                self.verificar_compactacao();
                Resposta::Ok
            }

            Comando::Get { ref chave } => {
                let dados = self.dados.lock().unwrap();
                match dados.get(chave) {
                    Some(valor) => Resposta::Valor(valor.clone()),
                    None => Resposta::NaoEncontrado,
                }
            }

            Comando::Delete { ref chave } => {
                // Grava no WAL
                if let Err(e) = self.wal.lock().unwrap().registrar(&comando) {
                    return Resposta::Erro(format!("Falha no WAL: {}", e));
                }

                let mut dados = self.dados.lock().unwrap();
                if dados.remove(chave).is_some() {
                    drop(dados);
                    self.verificar_compactacao();
                    Resposta::Ok
                } else {
                    Resposta::NaoEncontrado
                }
            }

            Comando::List => {
                let dados = self.dados.lock().unwrap();
                let mut chaves: Vec<String> = dados.keys().cloned().collect();
                chaves.sort();
                Resposta::Lista(chaves)
            }

            Comando::Count => {
                let dados = self.dados.lock().unwrap();
                Resposta::Contagem(dados.len())
            }
        }
    }

    /// Verifica se é necessário compactar o WAL
    fn verificar_compactacao(&self) {
        let mut wal = self.wal.lock().unwrap();
        if wal.total_entradas() >= LIMITE_COMPACTACAO {
            let dados = self.dados.lock().unwrap();
            println!(
                "[Motor] Compactando WAL ({} entradas -> {} chaves)...",
                wal.total_entradas(),
                dados.len()
            );
            if let Err(e) = wal.compactar(&dados) {
                eprintln!("[Motor] Erro na compactação: {}", e);
            } else {
                println!("[Motor] Compactação concluída.");
            }
        }
    }
}

Passo 4: A Interface REPL (main.rs)

mod motor;
mod protocolo;
mod wal;

use std::io::{self, BufRead, Write};
use std::path::Path;

fn main() {
    println!("=== Banco de Dados Key-Value em Rust ===");
    println!("Comandos: SET, GET, DEL, LIST, COUNT");
    println!("Digite 'sair' para encerrar.\n");

    let caminho_wal = Path::new("dados.wal");

    let banco = match motor::Motor::abrir(caminho_wal) {
        Ok(m) => m,
        Err(e) => {
            eprintln!("Falha ao iniciar o banco: {}", e);
            return;
        }
    };

    let stdin = io::stdin();

    loop {
        print!("kv> ");
        io::stdout().flush().unwrap();

        let mut linha = String::new();
        if stdin.lock().read_line(&mut linha).unwrap() == 0 {
            break;
        }
        let entrada = linha.trim();

        if entrada.is_empty() {
            continue;
        }

        if entrada == "sair" || entrada == "exit" {
            println!("Até logo!");
            break;
        }

        match protocolo::analisar_comando(entrada) {
            Ok(comando) => {
                let resposta = banco.processar(comando);
                println!("{}", resposta);
            }
            Err(e) => eprintln!("Erro: {}", e),
        }
    }
}

Como Executar

# Compilar e executar
cargo run

# Sessão de exemplo:
kv> SET nome Rust
OK

kv> SET versao 1.75
OK

kv> SET linguagem sistemas
OK

kv> GET nome
"Rust"

kv> LIST
  1) "linguagem"
  2) "nome"
  3) "versao"

kv> COUNT
(integer) 3

kv> DEL versao
OK

kv> GET versao
(nil)

kv> sair
Até logo!

# Reiniciando — os dados persistem!
cargo run
[Motor] 2 chaves recuperadas do WAL (3 entradas no log)

kv> GET nome
"Rust"

# Executar os testes
cargo test

Desafios para Expandir

  1. TTL (Time To Live): Adicione suporte a expiração de chaves com SET chave valor EX 60 (expira em 60 segundos), verificando a validade no GET.
  2. Servidor TCP: Transforme o REPL em um servidor TCP que aceita múltiplas conexões simultâneas usando tokio, permitindo clientes remotos.
  3. Tipos de dados: Além de strings, suporte listas (LPUSH, RPUSH, LRANGE) e conjuntos (SADD, SMEMBERS), como no Redis.
  4. Snapshots: Implemente snapshots periódicos do estado completo do banco em um formato binário compacto para recuperação rápida.
  5. Transações: Adicione suporte a MULTI/EXEC para agrupar comandos em transações atômicas, garantindo que todos sejam aplicados ou nenhum.

Veja Também