Rustelo/server/src/database/migrations.rs
2025-07-07 23:05:19 +01:00

838 lines
28 KiB
Rust

//! Database-agnostic migration system
//!
//! This module provides a unified interface for database migrations
//! that works with both SQLite and PostgreSQL databases.
use super::{Database, DatabaseType};
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sqlx::Row;
use std::collections::HashMap;
use std::path::Path;
/// Migration status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStatus {
pub version: i64,
pub name: String,
pub applied: bool,
pub applied_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Migration definition
#[derive(Debug, Clone)]
pub struct Migration {
pub version: i64,
pub name: String,
pub up_sql_postgres: String,
#[allow(dead_code)]
pub down_sql_postgres: String,
pub up_sql_sqlite: String,
#[allow(dead_code)]
pub down_sql_sqlite: String,
}
#[allow(dead_code)]
impl Migration {
/// Create a new migration
pub fn new(
version: i64,
name: String,
up_sql_postgres: String,
down_sql_postgres: String,
up_sql_sqlite: String,
down_sql_sqlite: String,
) -> Self {
Self {
version,
name,
up_sql_postgres,
down_sql_postgres,
up_sql_sqlite,
down_sql_sqlite,
}
}
/// Get the appropriate up SQL for the database type
pub fn up_sql(&self, db_type: &DatabaseType) -> &str {
match db_type {
DatabaseType::PostgreSQL => &self.up_sql_postgres,
DatabaseType::SQLite => &self.up_sql_sqlite,
}
}
/// Get the appropriate down SQL for the database type
pub fn down_sql(&self, db_type: &DatabaseType) -> &str {
match db_type {
DatabaseType::PostgreSQL => &self.down_sql_postgres,
DatabaseType::SQLite => &self.down_sql_sqlite,
}
}
}
/// Database-agnostic migration runner trait
#[async_trait]
#[allow(dead_code)]
pub trait MigrationRunnerTrait: Send + Sync {
/// Run all pending migrations
async fn run_migrations(&self) -> Result<Vec<MigrationStatus>>;
/// Run migrations up to a specific version
async fn migrate_to(&self, target_version: i64) -> Result<Vec<MigrationStatus>>;
/// Rollback the last migration
async fn rollback_last(&self) -> Result<Option<MigrationStatus>>;
/// Rollback to a specific version
async fn rollback_to(&self, target_version: i64) -> Result<Vec<MigrationStatus>>;
/// Get migration status
async fn get_status(&self) -> Result<Vec<MigrationStatus>>;
/// Check if all migrations are applied
async fn is_up_to_date(&self) -> Result<bool>;
/// Reset database (drop all tables)
async fn reset(&self) -> Result<()>;
/// Validate migration integrity
async fn validate(&self) -> Result<()>;
}
/// Database-agnostic migration runner implementation
#[derive(Debug, Clone)]
pub struct MigrationRunner {
database: Database,
migrations: Vec<Migration>,
}
#[allow(dead_code)]
impl MigrationRunner {
/// Create a new migration runner
pub fn new(database: Database) -> Self {
Self {
database,
migrations: Self::get_default_migrations(),
}
}
/// Create migration runner with custom migrations
pub fn with_migrations(database: Database, migrations: Vec<Migration>) -> Self {
Self {
database,
migrations,
}
}
/// Add a migration
pub fn add_migration(&mut self, migration: Migration) {
self.migrations.push(migration);
self.migrations.sort_by_key(|m| m.version);
}
/// Get the database type
pub fn database_type(&self) -> DatabaseType {
self.database.pool().database_type()
}
/// Load migrations from directory
pub fn load_migrations_from_dir<P: AsRef<Path>>(&mut self, migrations_dir: P) -> Result<()> {
let dir = migrations_dir.as_ref();
if !dir.exists() {
return Ok(());
}
let mut entries: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry.path().is_file() && entry.path().extension().map_or(false, |ext| ext == "sql")
})
.collect();
entries.sort_by_key(|entry| entry.path());
for entry in entries {
let path = entry.path();
let filename = path.file_stem().unwrap().to_string_lossy();
// Parse filename format: "001_create_users_table"
let parts: Vec<&str> = filename.splitn(2, '_').collect();
if parts.len() != 2 {
continue;
}
let version: i64 = parts[0].parse().map_err(|_| {
anyhow::anyhow!("Invalid migration version in filename: {}", filename)
})?;
let name = parts[1].replace('_', " ");
let content = std::fs::read_to_string(&path)?;
// Look for database-specific sections
let postgres_sql = self.extract_sql_section(&content, "postgres")?;
let sqlite_sql = self.extract_sql_section(&content, "sqlite")?;
let migration = Migration::new(
version,
name,
postgres_sql.up,
postgres_sql.down,
sqlite_sql.up,
sqlite_sql.down,
);
self.add_migration(migration);
}
Ok(())
}
/// Extract SQL sections from migration file
fn extract_sql_section(&self, content: &str, db_type: &str) -> Result<SqlSection> {
let up_marker = format!("-- {} up", db_type);
let down_marker = format!("-- {} down", db_type);
let lines = content.lines();
let mut up_sql = String::new();
let mut down_sql = String::new();
let mut current_section = None;
for line in lines {
let line_lower = line.to_lowercase();
if line_lower.starts_with(&up_marker) {
current_section = Some("up");
continue;
} else if line_lower.starts_with(&down_marker) {
current_section = Some("down");
continue;
} else if line_lower.starts_with("--") && line_lower.contains("up") {
current_section = None;
continue;
} else if line_lower.starts_with("--") && line_lower.contains("down") {
current_section = None;
continue;
}
match current_section {
Some("up") => {
up_sql.push_str(line);
up_sql.push('\n');
}
Some("down") => {
down_sql.push_str(line);
down_sql.push('\n');
}
_ => {}
}
}
Ok(SqlSection {
up: up_sql.trim().to_string(),
down: down_sql.trim().to_string(),
})
}
/// Get default migrations for auth system
fn get_default_migrations() -> Vec<Migration> {
vec![
Migration::new(
1,
"create_users_table".to_string(),
// PostgreSQL up
r#"
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255),
display_name VARCHAR(255),
is_verified BOOLEAN DEFAULT FALSE,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_active ON users(is_active);
"#
.to_string(),
// PostgreSQL down
r#"
DROP TABLE IF EXISTS users CASCADE;
"#
.to_string(),
// SQLite up
r#"
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
email TEXT UNIQUE NOT NULL,
password_hash TEXT,
display_name TEXT,
is_verified BOOLEAN DEFAULT 0,
is_active BOOLEAN DEFAULT 1,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_active ON users(is_active);
"#
.to_string(),
// SQLite down
r#"
DROP TABLE IF EXISTS users;
"#
.to_string(),
),
Migration::new(
2,
"create_sessions_table".to_string(),
// PostgreSQL up
r#"
CREATE TABLE IF NOT EXISTS user_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token VARCHAR(255) UNIQUE NOT NULL,
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_used_at TIMESTAMP WITH TIME ZONE,
user_agent TEXT,
ip_address INET,
is_active BOOLEAN DEFAULT TRUE
);
CREATE INDEX idx_sessions_token ON user_sessions(token);
CREATE INDEX idx_sessions_user_id ON user_sessions(user_id);
CREATE INDEX idx_sessions_expires_at ON user_sessions(expires_at);
"#
.to_string(),
// PostgreSQL down
r#"
DROP TABLE IF EXISTS user_sessions CASCADE;
"#
.to_string(),
// SQLite up
r#"
CREATE TABLE IF NOT EXISTS user_sessions (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token TEXT UNIQUE NOT NULL,
expires_at TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
last_used_at TEXT,
user_agent TEXT,
ip_address TEXT,
is_active BOOLEAN DEFAULT 1
);
CREATE INDEX idx_sessions_token ON user_sessions(token);
CREATE INDEX idx_sessions_user_id ON user_sessions(user_id);
CREATE INDEX idx_sessions_expires_at ON user_sessions(expires_at);
"#
.to_string(),
// SQLite down
r#"
DROP TABLE IF EXISTS user_sessions;
"#
.to_string(),
),
Migration::new(
3,
"create_roles_table".to_string(),
// PostgreSQL up
r#"
CREATE TABLE IF NOT EXISTS user_roles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(50) UNIQUE NOT NULL,
description TEXT,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS user_role_assignments (
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role_id UUID NOT NULL REFERENCES user_roles(id) ON DELETE CASCADE,
assigned_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (user_id, role_id)
);
INSERT INTO user_roles (name, description) VALUES
('admin', 'Administrator with full access'),
('user', 'Regular user with basic access')
ON CONFLICT (name) DO NOTHING;
"#
.to_string(),
// PostgreSQL down
r#"
DROP TABLE IF EXISTS user_role_assignments CASCADE;
DROP TABLE IF EXISTS user_roles CASCADE;
"#
.to_string(),
// SQLite up
r#"
CREATE TABLE IF NOT EXISTS user_roles (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
name TEXT UNIQUE NOT NULL,
description TEXT,
is_active BOOLEAN DEFAULT 1,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_role_assignments (
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role_id TEXT NOT NULL REFERENCES user_roles(id) ON DELETE CASCADE,
assigned_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (user_id, role_id)
);
INSERT OR IGNORE INTO user_roles (name, description) VALUES
('admin', 'Administrator with full access'),
('user', 'Regular user with basic access');
"#
.to_string(),
// SQLite down
r#"
DROP TABLE IF EXISTS user_role_assignments;
DROP TABLE IF EXISTS user_roles;
"#
.to_string(),
),
]
}
}
#[async_trait]
impl MigrationRunnerTrait for MigrationRunner {
async fn run_migrations(&self) -> Result<Vec<MigrationStatus>> {
match self.database.pool().database_type() {
DatabaseType::PostgreSQL => {
let pool = self
.database
.as_pg_pool()
.ok_or_else(|| anyhow::anyhow!("Expected PostgreSQL pool"))?;
self.run_postgres_migrations(&pool).await
}
DatabaseType::SQLite => {
let pool = self
.database
.as_sqlite_pool()
.ok_or_else(|| anyhow::anyhow!("Expected SQLite pool"))?;
self.run_sqlite_migrations(&pool).await
}
}
}
async fn migrate_to(&self, target_version: i64) -> Result<Vec<MigrationStatus>> {
match self.database.pool().database_type() {
DatabaseType::PostgreSQL => {
let pool = self
.database
.as_pg_pool()
.ok_or_else(|| anyhow::anyhow!("Expected PostgreSQL pool"))?;
self.migrate_to_postgres(&pool, target_version).await
}
DatabaseType::SQLite => {
let pool = self
.database
.as_sqlite_pool()
.ok_or_else(|| anyhow::anyhow!("Expected SQLite pool"))?;
self.migrate_to_sqlite(&pool, target_version).await
}
}
}
async fn rollback_last(&self) -> Result<Option<MigrationStatus>> {
match self.database.pool().database_type() {
DatabaseType::PostgreSQL => {
let pool = self
.database
.as_pg_pool()
.ok_or_else(|| anyhow::anyhow!("Expected PostgreSQL pool"))?;
self.rollback_last_postgres(&pool).await
}
DatabaseType::SQLite => {
let pool = self
.database
.as_sqlite_pool()
.ok_or_else(|| anyhow::anyhow!("Expected SQLite pool"))?;
self.rollback_last_sqlite(&pool).await
}
}
}
async fn rollback_to(&self, target_version: i64) -> Result<Vec<MigrationStatus>> {
match self.database.pool().database_type() {
DatabaseType::PostgreSQL => {
let pool = self
.database
.as_pg_pool()
.ok_or_else(|| anyhow::anyhow!("Expected PostgreSQL pool"))?;
self.rollback_to_postgres(&pool, target_version).await
}
DatabaseType::SQLite => {
let pool = self
.database
.as_sqlite_pool()
.ok_or_else(|| anyhow::anyhow!("Expected SQLite pool"))?;
self.rollback_to_sqlite(&pool, target_version).await
}
}
}
async fn get_status(&self) -> Result<Vec<MigrationStatus>> {
match self.database.pool().database_type() {
DatabaseType::PostgreSQL => {
let pool = self
.database
.as_pg_pool()
.ok_or_else(|| anyhow::anyhow!("Expected PostgreSQL pool"))?;
self.get_status_postgres(&pool).await
}
DatabaseType::SQLite => {
let pool = self
.database
.as_sqlite_pool()
.ok_or_else(|| anyhow::anyhow!("Expected SQLite pool"))?;
self.get_status_sqlite(&pool).await
}
}
}
async fn is_up_to_date(&self) -> Result<bool> {
let status = self.get_status().await?;
Ok(status.iter().all(|s| s.applied))
}
async fn reset(&self) -> Result<()> {
match self.database.pool().database_type() {
DatabaseType::PostgreSQL => {
let pool = self
.database
.as_pg_pool()
.ok_or_else(|| anyhow::anyhow!("Expected PostgreSQL pool"))?;
self.reset_postgres(&pool).await
}
DatabaseType::SQLite => {
let pool = self
.database
.as_sqlite_pool()
.ok_or_else(|| anyhow::anyhow!("Expected SQLite pool"))?;
self.reset_sqlite(&pool).await
}
}
}
async fn validate(&self) -> Result<()> {
// Check for duplicate versions
let mut versions = HashMap::new();
for migration in &self.migrations {
if versions.contains_key(&migration.version) {
return Err(anyhow::anyhow!(
"Duplicate migration version: {}",
migration.version
));
}
versions.insert(migration.version, &migration.name);
}
// Validate SQL syntax (basic check)
for migration in &self.migrations {
if migration.up_sql_postgres.trim().is_empty() {
return Err(anyhow::anyhow!(
"Empty PostgreSQL up migration for version {}",
migration.version
));
}
if migration.up_sql_sqlite.trim().is_empty() {
return Err(anyhow::anyhow!(
"Empty SQLite up migration for version {}",
migration.version
));
}
}
Ok(())
}
}
impl MigrationRunner {
/// Initialize migrations table for PostgreSQL
async fn init_postgres_migrations_table(&self, pool: &sqlx::PgPool) -> Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS _migrations (
version BIGINT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
)
"#,
)
.execute(pool)
.await?;
Ok(())
}
/// Initialize migrations table for SQLite
async fn init_sqlite_migrations_table(&self, pool: &sqlx::SqlitePool) -> Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS _migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT DEFAULT (datetime('now'))
)
"#,
)
.execute(pool)
.await?;
Ok(())
}
/// Run PostgreSQL migrations
async fn run_postgres_migrations(&self, pool: &sqlx::PgPool) -> Result<Vec<MigrationStatus>> {
self.init_postgres_migrations_table(pool).await?;
let applied_versions = self.get_applied_versions_postgres(pool).await?;
let mut results = Vec::new();
for migration in &self.migrations {
if !applied_versions.contains(&migration.version) {
tracing::info!(
"Applying migration {}: {}",
migration.version,
migration.name
);
sqlx::query(&migration.up_sql_postgres)
.execute(pool)
.await?;
sqlx::query("INSERT INTO _migrations (version, name) VALUES ($1, $2)")
.bind(migration.version)
.bind(&migration.name)
.execute(pool)
.await?;
results.push(MigrationStatus {
version: migration.version,
name: migration.name.clone(),
applied: true,
applied_at: Some(chrono::Utc::now()),
});
}
}
Ok(results)
}
/// Run SQLite migrations
async fn run_sqlite_migrations(&self, pool: &sqlx::SqlitePool) -> Result<Vec<MigrationStatus>> {
self.init_sqlite_migrations_table(pool).await?;
let applied_versions = self.get_applied_versions_sqlite(pool).await?;
let mut results = Vec::new();
for migration in &self.migrations {
if !applied_versions.contains(&migration.version) {
tracing::info!(
"Applying migration {}: {}",
migration.version,
migration.name
);
sqlx::query(&migration.up_sql_sqlite).execute(pool).await?;
sqlx::query("INSERT INTO _migrations (version, name) VALUES (?1, ?2)")
.bind(migration.version)
.bind(&migration.name)
.execute(pool)
.await?;
results.push(MigrationStatus {
version: migration.version,
name: migration.name.clone(),
applied: true,
applied_at: Some(chrono::Utc::now()),
});
}
}
Ok(results)
}
/// Get applied migration versions for PostgreSQL
async fn get_applied_versions_postgres(&self, pool: &sqlx::PgPool) -> Result<Vec<i64>> {
let rows = sqlx::query("SELECT version FROM _migrations ORDER BY version")
.fetch_all(pool)
.await?;
Ok(rows.into_iter().map(|row| row.get("version")).collect())
}
/// Get applied migration versions for SQLite
async fn get_applied_versions_sqlite(&self, pool: &sqlx::SqlitePool) -> Result<Vec<i64>> {
let rows = sqlx::query("SELECT version FROM _migrations ORDER BY version")
.fetch_all(pool)
.await?;
Ok(rows.into_iter().map(|row| row.get("version")).collect())
}
// Placeholder implementations for other methods
async fn migrate_to_postgres(
&self,
_pool: &sqlx::PgPool,
_target_version: i64,
) -> Result<Vec<MigrationStatus>> {
// TODO: Implement
Ok(vec![])
}
async fn migrate_to_sqlite(
&self,
_pool: &sqlx::SqlitePool,
_target_version: i64,
) -> Result<Vec<MigrationStatus>> {
// TODO: Implement
Ok(vec![])
}
async fn rollback_last_postgres(
&self,
_pool: &sqlx::PgPool,
) -> Result<Option<MigrationStatus>> {
// TODO: Implement
Ok(None)
}
async fn rollback_last_sqlite(
&self,
_pool: &sqlx::SqlitePool,
) -> Result<Option<MigrationStatus>> {
// TODO: Implement
Ok(None)
}
async fn rollback_to_postgres(
&self,
_pool: &sqlx::PgPool,
_target_version: i64,
) -> Result<Vec<MigrationStatus>> {
// TODO: Implement
Ok(vec![])
}
async fn rollback_to_sqlite(
&self,
_pool: &sqlx::SqlitePool,
_target_version: i64,
) -> Result<Vec<MigrationStatus>> {
// TODO: Implement
Ok(vec![])
}
async fn get_status_postgres(&self, pool: &sqlx::PgPool) -> Result<Vec<MigrationStatus>> {
self.init_postgres_migrations_table(pool).await?;
let applied_versions = self.get_applied_versions_postgres(pool).await?;
let mut status = Vec::new();
for migration in &self.migrations {
status.push(MigrationStatus {
version: migration.version,
name: migration.name.clone(),
applied: applied_versions.contains(&migration.version),
applied_at: None, // TODO: Get actual applied_at from database
});
}
Ok(status)
}
async fn get_status_sqlite(&self, pool: &sqlx::SqlitePool) -> Result<Vec<MigrationStatus>> {
self.init_sqlite_migrations_table(pool).await?;
let applied_versions = self.get_applied_versions_sqlite(pool).await?;
let mut status = Vec::new();
for migration in &self.migrations {
status.push(MigrationStatus {
version: migration.version,
name: migration.name.clone(),
applied: applied_versions.contains(&migration.version),
applied_at: None, // TODO: Get actual applied_at from database
});
}
Ok(status)
}
async fn reset_postgres(&self, pool: &sqlx::PgPool) -> Result<()> {
sqlx::query("DROP SCHEMA public CASCADE; CREATE SCHEMA public;")
.execute(pool)
.await?;
Ok(())
}
async fn reset_sqlite(&self, pool: &sqlx::SqlitePool) -> Result<()> {
// Get all table names
let rows = sqlx::query(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'",
)
.fetch_all(pool)
.await?;
// Drop all tables
for row in rows {
let table_name: String = row.get("name");
sqlx::query(&format!("DROP TABLE IF EXISTS {};", table_name))
.execute(pool)
.await?;
}
Ok(())
}
}
/// Helper structure for SQL sections
#[allow(dead_code)]
struct SqlSection {
up: String,
down: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_migration_creation() {
let migration = Migration::new(
1,
"test_migration".to_string(),
"CREATE TABLE test_pg ();".to_string(),
"DROP TABLE test_pg;".to_string(),
"CREATE TABLE test_sqlite ();".to_string(),
"DROP TABLE test_sqlite;".to_string(),
);
assert_eq!(migration.version, 1);
assert_eq!(migration.name, "test_migration");
assert_eq!(
migration.up_sql(&DatabaseType::PostgreSQL),
"CREATE TABLE test_pg ();"
);
assert_eq!(
migration.up_sql(&DatabaseType::SQLite),
"CREATE TABLE test_sqlite ();"
);
}
#[test]
fn test_default_migrations() {
let migrations = MigrationRunner::get_default_migrations();
assert!(!migrations.is_empty());
assert_eq!(migrations[0].version, 1);
assert!(migrations[0].name.contains("users"));
}
}