From da0fed29dea1d2db5f3f70a3b0652e4f3bb86bb1 Mon Sep 17 00:00:00 2001 From: nobody Date: Tue, 30 Sep 2025 11:25:59 -0700 Subject: [PATCH] fix: use ArchiveReader parsing for state subcommand --- src/cmd/state.rs | 351 ++++++++++++++--------------------------------- 1 file changed, 106 insertions(+), 245 deletions(-) diff --git a/src/cmd/state.rs b/src/cmd/state.rs index 3558ae9..92da1b0 100644 --- a/src/cmd/state.rs +++ b/src/cmd/state.rs @@ -21,8 +21,7 @@ use crate::flags; use chrono::{DateTime, Utc}; -use json_archive::reader::{ArchiveReader, ReadMode}; -use json_archive::{Diagnostic, DiagnosticCode, DiagnosticLevel}; +use json_archive::{apply_add, apply_change, apply_move, apply_remove, ArchiveReader, Diagnostic, DiagnosticCode, DiagnosticLevel, Event, ReadMode}; use serde_json::Value; use std::path::Path; @@ -56,47 +55,10 @@ pub fn run(flags: &flags::State) -> Vec { Err(diagnostic) => return vec![diagnostic], }; - // Read the archive using the existing reader - let reader = match ArchiveReader::new(&flags.file, ReadMode::FullValidation) { - Ok(reader) => reader, - Err(e) => { - return vec![Diagnostic::new( - DiagnosticLevel::Fatal, - DiagnosticCode::PathNotFound, - format!("I couldn't open the archive file: {}", e), - )]; - } - }; - - let result = match reader.read(&flags.file) { - Ok(result) => result, - Err(e) => { - return vec![Diagnostic::new( - DiagnosticLevel::Fatal, - DiagnosticCode::PathNotFound, - format!("I couldn't read the archive file: {}", e), - )]; - } - }; - - // If there are fatal diagnostics, return them - if result.diagnostics.has_fatal() { - return result.diagnostics.into_diagnostics(); - } - - // For non-latest access methods, we need to collect observations and find the target - let target_state = match access_method { - AccessMethod::Latest => { - // The reader already gives us the final state after all observations - result.final_state - } - _ => { - // We need to collect observations and replay to the target - match find_and_replay_to_target(&flags.file, &access_method) { - Ok(state) => state, - Err(diagnostics) => return diagnostics, - } - } + // Find and replay to the target observation + let target_state = match find_and_replay_to_target(&flags.file, &access_method) { + Ok(state) => state, + Err(diagnostics) => return diagnostics, }; // Output the JSON state @@ -189,8 +151,107 @@ fn find_and_replay_to_target( file_path: &Path, access_method: &AccessMethod, ) -> Result> { - // We need to collect observations with full details to support all access methods - let observations = collect_observations_with_events(file_path)?; + let reader = match ArchiveReader::new(file_path, ReadMode::AppendSeek) { + Ok(r) => r, + Err(e) => { + return Err(vec![Diagnostic::new( + DiagnosticLevel::Fatal, + DiagnosticCode::PathNotFound, + format!("I couldn't open the archive file: {}", e), + )]); + } + }; + + let (initial_state, mut event_iter) = match reader.events(file_path) { + Ok(r) => r, + Err(e) => { + return Err(vec![Diagnostic::new( + DiagnosticLevel::Fatal, + DiagnosticCode::PathNotFound, + format!("I couldn't read the archive file: {}", e), + )]); + } + }; + + // Check for fatal diagnostics from initial parsing + if event_iter.diagnostics.has_fatal() { + return Err(event_iter.diagnostics.diagnostics().to_vec()); + } + + // Collect observations while replaying events + let mut observations = Vec::new(); + let mut current_state = initial_state.clone(); + let created = event_iter.header.created; + + // Add initial state as observation 0 + observations.push(ObservationWithEvents { + id: "initial".to_string(), + timestamp: created, + final_state: initial_state, + }); + + // Process events and track state at each observation + while let Some(event) = event_iter.next() { + match event { + Event::Observe { observation_id, timestamp, change_count: _ } => { + observations.push(ObservationWithEvents { + id: observation_id, + timestamp, + final_state: current_state.clone(), + }); + } + Event::Add { path, value, .. } => { + let _ = apply_add(&mut current_state, &path, value); + + // Update the final state of the last observation + if let Some(last_obs) = observations.last_mut() { + if last_obs.id != "initial" { + last_obs.final_state = current_state.clone(); + } + } + } + Event::Change { path, new_value, .. } => { + let _ = apply_change(&mut current_state, &path, new_value); + + // Update the final state of the last observation + if let Some(last_obs) = observations.last_mut() { + if last_obs.id != "initial" { + last_obs.final_state = current_state.clone(); + } + } + } + Event::Remove { path, .. } => { + let _ = apply_remove(&mut current_state, &path); + + // Update the final state of the last observation + if let Some(last_obs) = observations.last_mut() { + if last_obs.id != "initial" { + last_obs.final_state = current_state.clone(); + } + } + } + Event::Move { path, moves, .. } => { + let _ = apply_move(&mut current_state, &path, moves); + + // Update the final state of the last observation + if let Some(last_obs) = observations.last_mut() { + if last_obs.id != "initial" { + last_obs.final_state = current_state.clone(); + } + } + } + Event::Snapshot { object, .. } => { + current_state = object; + + // Update the final state of the last observation + if let Some(last_obs) = observations.last_mut() { + if last_obs.id != "initial" { + last_obs.final_state = current_state.clone(); + } + } + } + } + } if observations.is_empty() { return Err(vec![Diagnostic::new( @@ -295,7 +356,6 @@ fn find_and_replay_to_target( } }; - // Now replay events from initial state up to and including the target observation Ok(target_observation.final_state.clone()) } @@ -305,202 +365,3 @@ struct ObservationWithEvents { timestamp: DateTime, final_state: Value, } - -fn collect_observations_with_events( - file_path: &Path, -) -> Result, Vec> { - use std::fs::File; - use std::io::{BufRead, BufReader}; - - let file = match File::open(file_path) { - Ok(f) => f, - Err(e) => { - return Err(vec![Diagnostic::new( - DiagnosticLevel::Fatal, - DiagnosticCode::PathNotFound, - format!("I couldn't open the archive file: {}", e), - )]); - } - }; - - let reader = BufReader::new(file); - let mut lines = reader.lines(); - let mut observations = Vec::new(); - - // Parse header - let header_line = match lines.next() { - Some(Ok(line)) => line, - _ => { - return Err(vec![Diagnostic::new( - DiagnosticLevel::Fatal, - DiagnosticCode::EmptyFile, - "Archive file is empty or unreadable".to_string(), - )]); - } - }; - - let header: Value = match serde_json::from_str(&header_line) { - Ok(h) => h, - Err(e) => { - return Err(vec![Diagnostic::new( - DiagnosticLevel::Fatal, - DiagnosticCode::MissingHeader, - format!("I couldn't parse the header: {}", e), - )]); - } - }; - - let created_str = header["created"].as_str().unwrap_or(""); - let created: DateTime = match created_str.parse() { - Ok(dt) => dt, - Err(_) => Utc::now(), - }; - - let initial_state = header["initial"].clone(); - - // Add initial state as observation 0 - observations.push(ObservationWithEvents { - id: "initial".to_string(), - timestamp: created, - final_state: initial_state.clone(), - }); - - let mut current_state = initial_state; - - // Parse events and track state at each observation - for line in lines { - let line = match line { - Ok(l) => l, - Err(_) => continue, - }; - - if line.trim().starts_with('#') || line.trim().is_empty() { - continue; - } - - let event: Value = match serde_json::from_str(&line) { - Ok(e) => e, - Err(_) => continue, - }; - - if let Some(arr) = event.as_array() { - if arr.is_empty() { - continue; - } - - let event_type = arr[0].as_str().unwrap_or(""); - - if event_type == "observe" && arr.len() >= 4 { - let obs_id = arr[1].as_str().unwrap_or("").to_string(); - let timestamp_str = arr[2].as_str().unwrap_or(""); - - let timestamp: DateTime = match timestamp_str.parse() { - Ok(dt) => dt, - Err(_) => continue, - }; - - observations.push(ObservationWithEvents { - id: obs_id, - timestamp, - final_state: current_state.clone(), // Will be updated as events are applied - }); - } else if event_type == "snapshot" && arr.len() >= 4 { - // Handle snapshot events - let obs_id = arr[1].as_str().unwrap_or("").to_string(); - let timestamp_str = arr[2].as_str().unwrap_or(""); - let snapshot_state = arr[3].clone(); - - let timestamp: DateTime = match timestamp_str.parse() { - Ok(dt) => dt, - Err(_) => continue, - }; - - current_state = snapshot_state.clone(); - observations.push(ObservationWithEvents { - id: obs_id, - timestamp, - final_state: snapshot_state, - }); - } else { - // Apply the event to current_state - apply_event_to_state(&mut current_state, &arr); - - // Update the final state of the last observation - if let Some(last_obs) = observations.last_mut() { - last_obs.final_state = current_state.clone(); - } - } - } - } - - Ok(observations) -} - -fn apply_event_to_state(state: &mut Value, event: &[Value]) { - if event.is_empty() { - return; - } - - let event_type = event[0].as_str().unwrap_or(""); - - match event_type { - "add" if event.len() >= 3 => { - let path = event[1].as_str().unwrap_or(""); - let value = event[2].clone(); - if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) { - let _ = pointer.set(state, value); - } - } - "change" if event.len() >= 3 => { - let path = event[1].as_str().unwrap_or(""); - let value = event[2].clone(); - if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) { - let _ = pointer.set(state, value); - } - } - "remove" if event.len() >= 2 => { - let path = event[1].as_str().unwrap_or(""); - if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) { - let _ = pointer.remove(state); - } - } - "move" if event.len() >= 3 => { - let path = event[1].as_str().unwrap_or(""); - let moves_value = event[2].clone(); - if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) { - if let Ok(array_value) = pointer.get(state) { - if let Some(array) = array_value.as_array() { - let mut arr = array.clone(); - if let Some(moves) = moves_value.as_array() { - for move_pair in moves { - if let Some(pair) = move_pair.as_array() { - if pair.len() == 2 { - if let (Some(from_idx), Some(to_idx)) = ( - pair[0].as_u64().map(|i| i as usize), - pair[1].as_u64().map(|i| i as usize), - ) { - if from_idx < arr.len() && to_idx <= arr.len() { - let element = arr[from_idx].clone(); - arr.insert(to_idx, element); - let remove_idx = if from_idx > to_idx { - from_idx + 1 - } else { - from_idx - }; - if remove_idx < arr.len() { - arr.remove(remove_idx); - } - } - } - } - } - } - } - let _ = pointer.set(state, Value::Array(arr)); - } - } - } - } - _ => {} - } -}