549 lines
18 KiB
Rust
Raw Normal View History

2026-01-23 16:13:23 +00:00
//! Bidirectional synchronization between storage backends
//!
//! Provides synchronization between two storage backends (typically filesystem
//! ↔ `SurrealDB`). Supports multiple sync strategies:
//! - `sync_to_target()`: One-way sync from source to target
//! - `sync_from_target()`: One-way sync from target to source
//! - `sync_bidirectional()`: Two-way sync with conflict resolution
//!
//! # Conflict Resolution
//!
//! When the same node exists in both storages with different content:
//! - **`LastWriteWins`**: Use the node with the most recent `modified`
//!   timestamp; on a tie the target node is kept
//! - **`SourceWins`**: Always prefer the source node
//! - **`TargetWins`**: Always prefer the target node
//!
//! # Example
//!
//! ```no_run
//! use kogral_core::sync::{SyncManager, ConflictStrategy};
//! use kogral_core::storage::memory::MemoryStorage;
//! use kogral_core::config::SyncConfig;
//! use std::sync::Arc;
//! use tokio::sync::RwLock;
//!
//! # async fn example() -> kogral_core::error::Result<()> {
//! let source = Arc::new(RwLock::new(MemoryStorage::new()));
//! let target = Arc::new(RwLock::new(MemoryStorage::new()));
//! let config = SyncConfig::default();
//!
//! let manager = SyncManager::new(source, target, config);
//! manager.sync_to_target().await?;
//! # Ok(())
//! # }
//! ```
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use crate::config::SyncConfig;
use crate::error::{KbError, Result};
use crate::models::Node;
use crate::storage::Storage;
/// How to choose between two differing copies of the same node.
///
/// Consulted during bidirectional sync whenever a node ID exists in both
/// storages. The default is [`ConflictStrategy::LastWriteWins`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ConflictStrategy {
    /// Keep whichever copy carries the later `modified` timestamp
    /// (the target copy is kept when both timestamps are equal).
    #[default]
    LastWriteWins,
    /// Keep the source copy unconditionally.
    SourceWins,
    /// Keep the target copy unconditionally.
    TargetWins,
}
/// Coordinates synchronization between two storage backends.
///
/// The *source* side is usually the filesystem; the *target* side is usually
/// `SurrealDB` or another backend. Each side is held as an
/// `Arc<RwLock<dyn Storage>>` so the storages can be shared with other async
/// tasks; the sync methods acquire the locks they need for each operation.
pub struct SyncManager {
    /// Storage treated as the source side (typically filesystem).
    source: Arc<RwLock<dyn Storage>>,
    /// Storage treated as the target side (typically `SurrealDB`).
    target: Arc<RwLock<dyn Storage>>,
    /// Configuration supplied at construction time.
    config: SyncConfig,
    /// Strategy applied when a node differs between the two storages.
    conflict_strategy: ConflictStrategy,
}
impl SyncManager {
/// Create a new sync manager
///
/// # Arguments
///
/// * `source` - Source storage backend (typically filesystem)
/// * `target` - Target storage backend (typically `SurrealDB`)
/// * `config` - Sync configuration
///
/// # Example
///
/// ```no_run
/// use kogral_core::sync::SyncManager;
/// use kogral_core::storage::memory::MemoryStorage;
/// use kogral_core::config::SyncConfig;
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// let source = Arc::new(RwLock::new(MemoryStorage::new()));
/// let target = Arc::new(RwLock::new(MemoryStorage::new()));
/// let config = SyncConfig::default();
///
/// let manager = SyncManager::new(source, target, config);
/// ```
pub fn new(
source: Arc<RwLock<dyn Storage>>,
target: Arc<RwLock<dyn Storage>>,
config: SyncConfig,
) -> Self {
Self {
source,
target,
config,
conflict_strategy: ConflictStrategy::default(),
}
}
/// Create a sync manager with custom conflict resolution strategy
pub fn with_conflict_strategy(
source: Arc<RwLock<dyn Storage>>,
target: Arc<RwLock<dyn Storage>>,
config: SyncConfig,
conflict_strategy: ConflictStrategy,
) -> Self {
Self {
source,
target,
config,
conflict_strategy,
}
}
/// Synchronize from source to target (one-way)
///
/// Copies all graphs and nodes from source storage to target storage.
/// Does not modify source storage.
///
/// # Errors
///
/// Returns error if:
/// - Failed to list graphs from source
/// - Failed to load graph from source
/// - Failed to save graph to target
///
/// # Example
///
/// ```no_run
/// # use kogral_core::sync::SyncManager;
/// # use kogral_core::storage::memory::MemoryStorage;
/// # use kogral_core::config::SyncConfig;
/// # use std::sync::Arc;
/// # use tokio::sync::RwLock;
/// # async fn example() -> kogral_core::error::Result<()> {
/// # let source = Arc::new(RwLock::new(MemoryStorage::new()));
/// # let target = Arc::new(RwLock::new(MemoryStorage::new()));
/// # let manager = SyncManager::new(source, target, SyncConfig::default());
/// manager.sync_to_target().await?;
/// # Ok(())
/// # }
/// ```
pub async fn sync_to_target(&self) -> Result<()> {
info!("Starting sync from source to target");
let source = self.source.read().await;
let graphs = source.list_graphs().await?;
debug!("Found {} graphs in source", graphs.len());
let mut target = self.target.write().await;
for graph_name in graphs {
let graph = source.load_graph(&graph_name).await?;
info!(
"Syncing graph '{}' to target ({} nodes)",
graph_name,
graph.nodes.len()
);
target.save_graph(&graph).await?;
}
info!("Sync to target completed successfully");
Ok(())
}
/// Synchronize from target to source (one-way)
///
/// Copies all graphs and nodes from target storage to source storage.
/// Does not modify target storage.
///
/// # Errors
///
/// Returns error if:
/// - Failed to list graphs from target
/// - Failed to load graph from target
/// - Failed to save graph to source
pub async fn sync_from_target(&self) -> Result<()> {
info!("Starting sync from target to source");
let target = self.target.read().await;
let graphs = target.list_graphs().await?;
debug!("Found {} graphs in target", graphs.len());
let mut source = self.source.write().await;
for graph_name in graphs {
let graph = target.load_graph(&graph_name).await?;
info!(
"Syncing graph '{}' to source ({} nodes)",
graph_name,
graph.nodes.len()
);
source.save_graph(&graph).await?;
}
info!("Sync from target completed successfully");
Ok(())
}
/// Bidirectional synchronization with conflict resolution
///
/// Syncs nodes in both directions, resolving conflicts based on the
/// configured conflict strategy.
///
/// # Algorithm
///
/// 1. Load all graph names from both storages
/// 2. For each unique graph:
/// - Load from both storages (if exists)
/// - Compare node IDs
/// - For nodes only in source: copy to target
/// - For nodes only in target: copy to source
/// - For nodes in both: resolve conflict
///
/// # Conflict Resolution
///
/// - `LastWriteWins`: Compare `modified` timestamps
/// - `SourceWins`: Always use source node
/// - `TargetWins`: Always use target node
///
/// # Errors
///
/// Returns error if storage operations fail
pub async fn sync_bidirectional(&self) -> Result<()> {
info!(
"Starting bidirectional sync with strategy: {:?}",
self.conflict_strategy
);
let source = self.source.read().await;
let target = self.target.read().await;
let source_graphs = source.list_graphs().await?;
let target_graphs = target.list_graphs().await?;
// Combine unique graph names
let mut all_graphs: Vec<String> = source_graphs.clone();
for graph in target_graphs {
if !all_graphs.contains(&graph) {
all_graphs.push(graph);
}
}
debug!(
"Found {} unique graphs across both storages",
all_graphs.len()
);
// Drop read locks before acquiring write locks
drop(source);
drop(target);
for graph_name in all_graphs {
self.sync_graph_bidirectional(&graph_name).await?;
}
info!("Bidirectional sync completed successfully");
Ok(())
}
/// Sync a single graph bidirectionally
async fn sync_graph_bidirectional(&self, graph_name: &str) -> Result<()> {
let source = self.source.read().await;
let target = self.target.read().await;
let source_graph = source.load_graph(graph_name).await;
let target_graph = target.load_graph(graph_name).await;
match (source_graph, target_graph) {
(Ok(mut src), Ok(mut tgt)) => {
debug!("Graph '{}' exists in both storages", graph_name);
// Find nodes only in source
let source_only: Vec<String> = src
.nodes
.keys()
.filter(|id| !tgt.nodes.contains_key(*id))
.cloned()
.collect();
// Find nodes only in target
let target_only: Vec<String> = tgt
.nodes
.keys()
.filter(|id| !src.nodes.contains_key(*id))
.cloned()
.collect();
// Find nodes in both (potential conflicts)
let in_both: Vec<String> = src
.nodes
.keys()
.filter(|id| tgt.nodes.contains_key(*id))
.cloned()
.collect();
debug!(
"Graph '{}': {} source-only, {} target-only, {} in both",
graph_name,
source_only.len(),
target_only.len(),
in_both.len()
);
// Copy source-only nodes to target
for node_id in source_only {
if let Some(node) = src.nodes.get(&node_id) {
tgt.nodes.insert(node_id.clone(), node.clone());
}
}
// Copy target-only nodes to source
for node_id in target_only {
if let Some(node) = tgt.nodes.get(&node_id) {
src.nodes.insert(node_id.clone(), node.clone());
}
}
// Resolve conflicts for nodes in both
for node_id in in_both {
if let (Some(src_node), Some(tgt_node)) =
(src.nodes.get(&node_id), tgt.nodes.get(&node_id))
{
let winning_node = self.resolve_conflict(src_node, tgt_node);
src.nodes.insert(node_id.clone(), winning_node.clone());
tgt.nodes.insert(node_id.clone(), winning_node);
}
}
// Drop read locks
drop(source);
drop(target);
// Save updated graphs
let mut source_write = self.source.write().await;
let mut target_write = self.target.write().await;
source_write.save_graph(&src).await?;
target_write.save_graph(&tgt).await?;
}
(Ok(graph), Err(_)) => {
debug!("Graph '{}' only in source, copying to target", graph_name);
drop(source);
drop(target);
let mut target_write = self.target.write().await;
target_write.save_graph(&graph).await?;
}
(Err(_), Ok(graph)) => {
debug!("Graph '{}' only in target, copying to source", graph_name);
drop(source);
drop(target);
let mut source_write = self.source.write().await;
source_write.save_graph(&graph).await?;
}
(Err(e1), Err(e2)) => {
warn!(
"Graph '{}' not found in either storage: source={:?}, target={:?}",
graph_name, e1, e2
);
return Err(KbError::Graph(format!(
"Graph '{graph_name}' not found in either storage"
)));
}
}
Ok(())
}
/// Resolve conflict between two versions of the same node
fn resolve_conflict(&self, source_node: &Node, target_node: &Node) -> Node {
match self.conflict_strategy {
ConflictStrategy::LastWriteWins => {
if source_node.modified > target_node.modified {
debug!("Conflict resolved: source wins (newer modified time)");
source_node.clone()
} else {
debug!("Conflict resolved: target wins (newer modified time)");
target_node.clone()
}
}
ConflictStrategy::SourceWins => {
debug!("Conflict resolved: source wins (strategy)");
source_node.clone()
}
ConflictStrategy::TargetWins => {
debug!("Conflict resolved: target wins (strategy)");
target_node.clone()
}
}
}
/// Get the sync configuration
#[must_use]
pub fn config(&self) -> &SyncConfig {
&self.config
}
/// Get the current conflict resolution strategy
#[must_use]
pub fn conflict_strategy(&self) -> ConflictStrategy {
self.conflict_strategy
}
}
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;
    use crate::models::{Graph, Node, NodeType};
    use crate::storage::memory::MemoryStorage;

    /// Build a note node whose UUID is replaced by a readable test ID.
    fn create_test_node(id: &str, title: &str) -> Node {
        let mut node = Node::new(NodeType::Note, title.to_string());
        node.id = id.to_string(); // Override UUID with test ID
        node
    }

    /// Build a graph named `test` containing just `node`, keyed by its ID.
    fn single_node_graph(node: Node) -> Graph {
        let mut graph = Graph::new("test".to_string());
        graph.nodes.insert(node.id.clone(), node);
        graph
    }

    /// Put `source_node` / `target_node` (both with ID `conflict`) into graph
    /// `test` on each side, run a bidirectional sync with `strategy`, and
    /// assert both storages end up with a node titled `expected_title`.
    async fn assert_conflict_winner(
        source_node: Node,
        target_node: Node,
        strategy: ConflictStrategy,
        expected_title: &str,
    ) {
        let mut source_storage = MemoryStorage::new();
        let mut target_storage = MemoryStorage::new();
        source_storage
            .save_graph(&single_node_graph(source_node))
            .await
            .unwrap();
        target_storage
            .save_graph(&single_node_graph(target_node))
            .await
            .unwrap();

        let source = Arc::new(RwLock::new(source_storage));
        let target = Arc::new(RwLock::new(target_storage));
        let manager = SyncManager::with_conflict_strategy(
            source.clone(),
            target.clone(),
            SyncConfig::default(),
            strategy,
        );
        manager.sync_bidirectional().await.unwrap();

        let source_read = source.read().await;
        let target_read = target.read().await;
        let source_node = source_read.load_node("test", "conflict").await.unwrap();
        let target_node = target_read.load_node("test", "conflict").await.unwrap();
        assert_eq!(source_node.title, expected_title);
        assert_eq!(target_node.title, expected_title);
    }

    #[tokio::test]
    async fn test_sync_to_target() {
        let mut source_storage = MemoryStorage::new();
        let target_storage = MemoryStorage::new();
        source_storage
            .save_graph(&single_node_graph(create_test_node("node-1", "Test Node")))
            .await
            .unwrap();

        let source = Arc::new(RwLock::new(source_storage));
        let target = Arc::new(RwLock::new(target_storage));
        let manager = SyncManager::new(source.clone(), target.clone(), SyncConfig::default());
        manager.sync_to_target().await.unwrap();

        // Verify node exists in target
        let target_read = target.read().await;
        let loaded_node = target_read.load_node("test", "node-1").await.unwrap();
        assert_eq!(loaded_node.title, "Test Node");
    }

    #[tokio::test]
    async fn test_sync_from_target() {
        let source_storage = MemoryStorage::new();
        let mut target_storage = MemoryStorage::new();
        target_storage
            .save_graph(&single_node_graph(create_test_node("node-2", "Target Node")))
            .await
            .unwrap();

        let source = Arc::new(RwLock::new(source_storage));
        let target = Arc::new(RwLock::new(target_storage));
        let manager = SyncManager::new(source.clone(), target.clone(), SyncConfig::default());
        manager.sync_from_target().await.unwrap();

        // Verify node exists in source
        let source_read = source.read().await;
        let loaded_node = source_read.load_node("test", "node-2").await.unwrap();
        assert_eq!(loaded_node.title, "Target Node");
    }

    #[tokio::test]
    async fn test_bidirectional_sync_source_only() {
        let mut source_storage = MemoryStorage::new();
        let target_storage = MemoryStorage::new();
        source_storage
            .save_graph(&single_node_graph(create_test_node("node-src", "Source Only")))
            .await
            .unwrap();

        let source = Arc::new(RwLock::new(source_storage));
        let target = Arc::new(RwLock::new(target_storage));
        let manager = SyncManager::new(source.clone(), target.clone(), SyncConfig::default());
        manager.sync_bidirectional().await.unwrap();

        // Node should exist in both
        let source_read = source.read().await;
        let target_read = target.read().await;
        assert!(source_read.load_node("test", "node-src").await.is_ok());
        assert!(target_read.load_node("test", "node-src").await.is_ok());
    }

    #[tokio::test]
    async fn test_bidirectional_sync_target_only() {
        let source_storage = MemoryStorage::new();
        let mut target_storage = MemoryStorage::new();
        target_storage
            .save_graph(&single_node_graph(create_test_node("node-tgt", "Target Only")))
            .await
            .unwrap();

        let source = Arc::new(RwLock::new(source_storage));
        let target = Arc::new(RwLock::new(target_storage));
        let manager = SyncManager::new(source.clone(), target.clone(), SyncConfig::default());
        manager.sync_bidirectional().await.unwrap();

        // Graph existing only in the target must be copied to the source
        let source_read = source.read().await;
        let target_read = target.read().await;
        assert!(source_read.load_node("test", "node-tgt").await.is_ok());
        assert!(target_read.load_node("test", "node-tgt").await.is_ok());
    }

    #[tokio::test]
    async fn test_bidirectional_sync_merges_disjoint_nodes() {
        let mut source_storage = MemoryStorage::new();
        let mut target_storage = MemoryStorage::new();
        source_storage
            .save_graph(&single_node_graph(create_test_node("node-a", "Only In Source")))
            .await
            .unwrap();
        target_storage
            .save_graph(&single_node_graph(create_test_node("node-b", "Only In Target")))
            .await
            .unwrap();

        let source = Arc::new(RwLock::new(source_storage));
        let target = Arc::new(RwLock::new(target_storage));
        let manager = SyncManager::new(source.clone(), target.clone(), SyncConfig::default());
        manager.sync_bidirectional().await.unwrap();

        // Same graph on both sides: each side must gain the other's node
        let source_read = source.read().await;
        let target_read = target.read().await;
        for id in ["node-a", "node-b"] {
            assert!(
                source_read.load_node("test", id).await.is_ok(),
                "source missing {id}"
            );
            assert!(
                target_read.load_node("test", id).await.is_ok(),
                "target missing {id}"
            );
        }
    }

    #[tokio::test]
    async fn test_conflict_resolution_last_write_wins() {
        let mut old_node = create_test_node("conflict", "Old Version");
        old_node.modified = Utc::now() - chrono::Duration::hours(1);
        let new_node = create_test_node("conflict", "New Version");

        // Newer copy in the source, older in the target
        assert_conflict_winner(
            new_node,
            old_node,
            ConflictStrategy::LastWriteWins,
            "New Version",
        )
        .await;
    }

    #[tokio::test]
    async fn test_conflict_resolution_source_wins() {
        // Source holds the *older* copy, so only the strategy can make it win
        let mut source_node = create_test_node("conflict", "Source Version");
        source_node.modified = Utc::now() - chrono::Duration::hours(1);
        let target_node = create_test_node("conflict", "Target Version");

        assert_conflict_winner(
            source_node,
            target_node,
            ConflictStrategy::SourceWins,
            "Source Version",
        )
        .await;
    }

    #[tokio::test]
    async fn test_conflict_resolution_target_wins() {
        // Target holds the *older* copy, so only the strategy can make it win
        let source_node = create_test_node("conflict", "Source Version");
        let mut target_node = create_test_node("conflict", "Target Version");
        target_node.modified = Utc::now() - chrono::Duration::hours(1);

        assert_conflict_winner(
            source_node,
            target_node,
            ConflictStrategy::TargetWins,
            "Target Version",
        )
        .await;
    }
}