fix: use ArchiveReader parsing for state subcommand

nobody 2025-09-30 11:25:59 -07:00
commit da0fed29de
Signed by: GrocerPublishAgent
GPG key ID: 43B1C298CDDE181C


@@ -21,8 +21,7 @@
use crate::flags;
use chrono::{DateTime, Utc};
use json_archive::reader::{ArchiveReader, ReadMode};
use json_archive::{Diagnostic, DiagnosticCode, DiagnosticLevel};
use json_archive::{apply_add, apply_change, apply_move, apply_remove, ArchiveReader, Diagnostic, DiagnosticCode, DiagnosticLevel, Event, ReadMode};
use serde_json::Value;
use std::path::Path;
@@ -56,47 +55,10 @@ pub fn run(flags: &flags::State) -> Vec<Diagnostic> {
Err(diagnostic) => return vec![diagnostic],
};
// Read the archive using the existing reader
let reader = match ArchiveReader::new(&flags.file, ReadMode::FullValidation) {
Ok(reader) => reader,
Err(e) => {
return vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::PathNotFound,
format!("I couldn't open the archive file: {}", e),
)];
}
};
let result = match reader.read(&flags.file) {
Ok(result) => result,
Err(e) => {
return vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::PathNotFound,
format!("I couldn't read the archive file: {}", e),
)];
}
};
// If there are fatal diagnostics, return them
if result.diagnostics.has_fatal() {
return result.diagnostics.into_diagnostics();
}
// For non-latest access methods, we need to collect observations and find the target
let target_state = match access_method {
AccessMethod::Latest => {
// The reader already gives us the final state after all observations
result.final_state
}
_ => {
// We need to collect observations and replay to the target
match find_and_replay_to_target(&flags.file, &access_method) {
Ok(state) => state,
Err(diagnostics) => return diagnostics,
}
}
// Find and replay to the target observation
let target_state = match find_and_replay_to_target(&flags.file, &access_method) {
Ok(state) => state,
Err(diagnostics) => return diagnostics,
};
// Output the JSON state
@@ -189,8 +151,107 @@ fn find_and_replay_to_target(
file_path: &Path,
access_method: &AccessMethod,
) -> Result<Value, Vec<Diagnostic>> {
// We need to collect observations with full details to support all access methods
let observations = collect_observations_with_events(file_path)?;
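// Open in AppendSeek mode rather than FullValidation. Assumption: AppendSeek
// streams events without re-validating the whole file, which is all we need here.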
let reader = match ArchiveReader::new(file_path, ReadMode::AppendSeek) {
Ok(r) => r,
Err(e) => {
return Err(vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::PathNotFound,
format!("I couldn't open the archive file: {}", e),
)]);
}
};
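// events() hands back the archive's initial state together with an iterator
// that also exposes the parsed header and any accumulated diagnostics.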
let (initial_state, mut event_iter) = match reader.events(file_path) {
Ok(r) => r,
Err(e) => {
return Err(vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::PathNotFound,
format!("I couldn't read the archive file: {}", e),
)]);
}
};
// Check for fatal diagnostics from initial parsing
if event_iter.diagnostics.has_fatal() {
return Err(event_iter.diagnostics.diagnostics().to_vec());
}
// Collect observations while replaying events
let mut observations = Vec::new();
let mut current_state = initial_state.clone();
let created = event_iter.header.created;
// Add initial state as observation 0
observations.push(ObservationWithEvents {
id: "initial".to_string(),
timestamp: created,
final_state: initial_state,
});
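// The `id != "initial"` guards below keep this first entry pinned to the
// pristine header state while later observations are patched in place.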
// Process events and track state at each observation
while let Some(event) = event_iter.next() {
match event {
Event::Observe { observation_id, timestamp, change_count: _ } => {
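// The state captured here is as of the observe event itself; the arms
// below keep updating it while this observation's changes stream in.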
observations.push(ObservationWithEvents {
id: observation_id,
timestamp,
final_state: current_state.clone(),
});
}
Event::Add { path, value, .. } => {
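// The Result from apply_add (and its siblings below) is discarded, so
// a malformed event degrades to a no-op instead of aborting the replay.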
let _ = apply_add(&mut current_state, &path, value);
// Update the final state of the last observation
if let Some(last_obs) = observations.last_mut() {
if last_obs.id != "initial" {
last_obs.final_state = current_state.clone();
}
}
}
Event::Change { path, new_value, .. } => {
let _ = apply_change(&mut current_state, &path, new_value);
// Update the final state of the last observation
if let Some(last_obs) = observations.last_mut() {
if last_obs.id != "initial" {
last_obs.final_state = current_state.clone();
}
}
}
Event::Remove { path, .. } => {
let _ = apply_remove(&mut current_state, &path);
// Update the final state of the last observation
if let Some(last_obs) = observations.last_mut() {
if last_obs.id != "initial" {
last_obs.final_state = current_state.clone();
}
}
}
Event::Move { path, moves, .. } => {
let _ = apply_move(&mut current_state, &path, moves);
// Update the final state of the last observation
if let Some(last_obs) = observations.last_mut() {
if last_obs.id != "initial" {
last_obs.final_state = current_state.clone();
}
}
}
Event::Snapshot { object, .. } => {
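// A snapshot event carries a complete object, so it replaces the
// working state wholesale rather than patching it.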
current_state = object;
// Update the final state of the last observation
if let Some(last_obs) = observations.last_mut() {
if last_obs.id != "initial" {
last_obs.final_state = current_state.clone();
}
}
}
}
}
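// Defensive check: the "initial" entry is pushed unconditionally above, so
// an empty list here should be unreachable.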
if observations.is_empty() {
return Err(vec![Diagnostic::new(
@@ -295,7 +356,6 @@ fn find_and_replay_to_target(
}
};
// Now replay events from initial state up to and including the target observation
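// Per-observation states were materialized during collection, so the target's
// stored final_state is the answer; no second replay pass is needed.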
Ok(target_observation.final_state.clone())
}
@@ -305,202 +365,3 @@ struct ObservationWithEvents {
timestamp: DateTime<Utc>,
final_state: Value,
}
fn collect_observations_with_events(
file_path: &Path,
) -> Result<Vec<ObservationWithEvents>, Vec<Diagnostic>> {
use std::fs::File;
use std::io::{BufRead, BufReader};
let file = match File::open(file_path) {
Ok(f) => f,
Err(e) => {
return Err(vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::PathNotFound,
format!("I couldn't open the archive file: {}", e),
)]);
}
};
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut observations = Vec::new();
// Parse header
let header_line = match lines.next() {
Some(Ok(line)) => line,
_ => {
return Err(vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::EmptyFile,
"Archive file is empty or unreadable".to_string(),
)]);
}
};
let header: Value = match serde_json::from_str(&header_line) {
Ok(h) => h,
Err(e) => {
return Err(vec![Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::MissingHeader,
format!("I couldn't parse the header: {}", e),
)]);
}
};
let created_str = header["created"].as_str().unwrap_or("");
let created: DateTime<Utc> = match created_str.parse() {
Ok(dt) => dt,
Err(_) => Utc::now(),
};
let initial_state = header["initial"].clone();
// Add initial state as observation 0
observations.push(ObservationWithEvents {
id: "initial".to_string(),
timestamp: created,
final_state: initial_state.clone(),
});
let mut current_state = initial_state;
// Parse events and track state at each observation
for line in lines {
let line = match line {
Ok(l) => l,
Err(_) => continue,
};
if line.trim().starts_with('#') || line.trim().is_empty() {
continue;
}
let event: Value = match serde_json::from_str(&line) {
Ok(e) => e,
Err(_) => continue,
};
if let Some(arr) = event.as_array() {
if arr.is_empty() {
continue;
}
let event_type = arr[0].as_str().unwrap_or("");
if event_type == "observe" && arr.len() >= 4 {
let obs_id = arr[1].as_str().unwrap_or("").to_string();
let timestamp_str = arr[2].as_str().unwrap_or("");
let timestamp: DateTime<Utc> = match timestamp_str.parse() {
Ok(dt) => dt,
Err(_) => continue,
};
observations.push(ObservationWithEvents {
id: obs_id,
timestamp,
final_state: current_state.clone(), // Will be updated as events are applied
});
} else if event_type == "snapshot" && arr.len() >= 4 {
// Handle snapshot events
let obs_id = arr[1].as_str().unwrap_or("").to_string();
let timestamp_str = arr[2].as_str().unwrap_or("");
let snapshot_state = arr[3].clone();
let timestamp: DateTime<Utc> = match timestamp_str.parse() {
Ok(dt) => dt,
Err(_) => continue,
};
current_state = snapshot_state.clone();
observations.push(ObservationWithEvents {
id: obs_id,
timestamp,
final_state: snapshot_state,
});
} else {
// Apply the event to current_state
apply_event_to_state(&mut current_state, &arr);
// Update the final state of the last observation
if let Some(last_obs) = observations.last_mut() {
last_obs.final_state = current_state.clone();
}
}
}
}
Ok(observations)
}
fn apply_event_to_state(state: &mut Value, event: &[Value]) {
if event.is_empty() {
return;
}
let event_type = event[0].as_str().unwrap_or("");
match event_type {
"add" if event.len() >= 3 => {
let path = event[1].as_str().unwrap_or("");
let value = event[2].clone();
if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) {
let _ = pointer.set(state, value);
}
}
"change" if event.len() >= 3 => {
let path = event[1].as_str().unwrap_or("");
let value = event[2].clone();
if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) {
let _ = pointer.set(state, value);
}
}
"remove" if event.len() >= 2 => {
let path = event[1].as_str().unwrap_or("");
if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) {
let _ = pointer.remove(state);
}
}
"move" if event.len() >= 3 => {
let path = event[1].as_str().unwrap_or("");
let moves_value = event[2].clone();
if let Ok(pointer) = json_archive::pointer::JsonPointer::new(path) {
if let Ok(array_value) = pointer.get(state) {
if let Some(array) = array_value.as_array() {
let mut arr = array.clone();
if let Some(moves) = moves_value.as_array() {
for move_pair in moves {
if let Some(pair) = move_pair.as_array() {
if pair.len() == 2 {
if let (Some(from_idx), Some(to_idx)) = (
pair[0].as_u64().map(|i| i as usize),
pair[1].as_u64().map(|i| i as usize),
) {
if from_idx < arr.len() && to_idx <= arr.len() {
let element = arr[from_idx].clone();
arr.insert(to_idx, element);
let remove_idx = if from_idx > to_idx {
from_idx + 1
} else {
from_idx
};
if remove_idx < arr.len() {
arr.remove(remove_idx);
}
}
}
}
}
}
}
let _ = pointer.set(state, Value::Array(arr));
}
}
}
}
_ => {}
}
}