json-archive/src/archive_writer.rs

475 lines
16 KiB
Rust
Raw Normal View History

// json-archive is a tool for tracking JSON file changes over time
// Copyright (C) 2025 Peoples Grocers LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// To purchase a license under different terms contact admin@peoplesgrocers.com
// To request changes, report bugs, or give user feedback contact
// marxism@peoplesgrocers.com
//
use chrono::{DateTime, Utc};
use serde_json::Value;
use std::io::Write;
use std::path::{Path, PathBuf};
use uuid::Uuid;
use crate::diagnostics::{Diagnostic, DiagnosticCode};
use crate::diff;
use crate::events::{Event, Header, Observation};
pub struct ArchiveWriter {
observation_count: usize,
filename: String,
}
impl ArchiveWriter {
pub fn write_header(&self, writer: &mut impl Write, header: &Header) -> Result<(), Diagnostic> {
let header_json = match serde_json::to_string(header) {
Ok(json) => json,
Err(e) => {
return Err(Diagnostic::fatal(
DiagnosticCode::InvalidEventJson,
format!("I couldn't serialize the header to JSON: {}", e),
)
.with_location(self.filename.clone(), 1));
}
};
if let Err(e) = writeln!(writer, "{}", header_json) {
return Err(Diagnostic::fatal(
DiagnosticCode::PathNotFound,
format!("I couldn't write to the output file: {}", e),
)
.with_location(self.filename.clone(), 1));
}
Ok(())
}
pub fn write_comment(
&self,
writer: &mut impl Write,
comment: &str,
) -> Result<(), Vec<Diagnostic>> {
if let Err(e) = writeln!(writer, "# {}", comment) {
return Err(vec![Diagnostic::fatal(
DiagnosticCode::PathNotFound,
format!("I couldn't write to the output file: {}", e),
)]);
}
Ok(())
}
pub fn write_observation(
&mut self,
writer: &mut impl Write,
observation: Observation,
) -> Result<(), Vec<Diagnostic>> {
let events = observation.to_events();
for event in events {
let event_json = match serde_json::to_string(&event) {
Ok(json) => json,
Err(e) => {
return Err(vec![Diagnostic::fatal(
DiagnosticCode::InvalidEventJson,
format!("I couldn't serialize an event to JSON: {}", e),
)]);
}
};
if let Err(e) = writeln!(writer, "{}", event_json) {
return Err(vec![Diagnostic::fatal(
DiagnosticCode::PathNotFound,
format!("I couldn't write to the output file: {}", e),
)]);
}
}
self.observation_count += 1;
Ok(())
}
pub fn write_snapshot(
&self,
writer: &mut impl Write,
object: &Value,
) -> Result<(), Vec<Diagnostic>> {
let snapshot_id = format!("snapshot-{}", Uuid::new_v4());
let snapshot = Event::Snapshot {
observation_id: snapshot_id,
timestamp: Utc::now(),
object: object.clone(),
};
let event_json = match serde_json::to_string(&snapshot) {
Ok(json) => json,
Err(e) => {
return Err(vec![Diagnostic::fatal(
DiagnosticCode::InvalidEventJson,
format!("I couldn't serialize the snapshot to JSON: {}", e),
)]);
}
};
if let Err(e) = writeln!(writer, "{}", event_json) {
return Err(vec![Diagnostic::fatal(
DiagnosticCode::PathNotFound,
format!("I couldn't write to the output file: {}", e),
)]);
}
Ok(())
}
}
/// Derives the default archive path for an input path.
///
/// * A file name already ending in `.json.archive` is returned unchanged.
/// * A `.json` extension becomes `.json.archive`.
/// * Any other extension `ext` becomes `ext.json.archive`.
/// * A file name without an extension gains `.json.archive`.
///
/// Only the extension of the final path component is rewritten. File names
/// that are not valid UTF-8 keep their original bytes.
pub fn default_output_filename<P: AsRef<Path>>(input_path: P) -> PathBuf {
    let path = input_path.as_ref();

    // Lossy decoding is used only for this suffix test. The suffix is plain
    // ASCII, so replacement characters elsewhere in the name cannot change
    // the outcome, and non-UTF-8 names are still recognised as archives.
    let already_archive = path
        .file_name()
        .map_or(false, |name| name.to_string_lossy().ends_with(".json.archive"));
    if already_archive {
        return path.to_path_buf();
    }

    let mut output = path.to_path_buf();
    match path.extension() {
        Some(extension) if extension != "json" => {
            // Extend the extension as an OsString so non-UTF-8 bytes survive
            // instead of being replaced with U+FFFD.
            let mut new_extension = extension.to_os_string();
            new_extension.push(".json.archive");
            output.set_extension(new_extension);
        }
        // Either a `.json` extension or no extension at all.
        _ => {
            output.set_extension("json.archive");
        }
    }
    output
}
/// Get the file modification time as a DateTime<Utc>
fn get_file_mtime<P: AsRef<Path>>(path: P) -> std::io::Result<DateTime<Utc>> {
let metadata = std::fs::metadata(path)?;
let modified = metadata.modified()?;
Ok(modified.into())
}
/// Reads one JSON input file and appends its record(s) to an archive stream.
///
/// Output handling is independent of files and compression: JSON lines are
/// written to whatever `writer` is supplied. The input, however, is read from
/// disk at `filename`.
///
/// When `*observation_count` is zero the parsed file becomes the initial
/// state of a header line. Otherwise the parsed file is diffed against
/// `current_state`, the resulting events are written as one observation, and
/// a snapshot may follow (see `snapshot_interval`).
///
/// # Arguments
///
/// * `writer` - Any writer to output JSON lines to
/// * `observation_count` - Running count, incremented by one per call. In the
///   header case this happens only after the header is written; otherwise it
///   happens before the events are written.
/// * `snapshot_interval` - When `Some(n)` with `n > 0`, a snapshot of the new
///   state is written whenever the incremented count is a multiple of `n`.
///   `None` and `Some(0)` both disable snapshots.
/// * `current_state` - The state to diff against (not used for the header)
/// * `filename` - Path to the JSON file containing the new state
/// * `source` - Passed to `Header::new`; only used when the header is written
///
/// # Returns
///
/// The parsed JSON state of `filename`, suitable for passing as
/// `current_state` on the next call.
///
/// # Errors
///
/// Returns a fatal [`Diagnostic`] when the file's modification time or
/// contents cannot be read, the contents are not valid JSON, a record cannot
/// be serialized, or a write to `writer` fails.
pub fn write_observation<W: Write, P: AsRef<Path>>(
    writer: &mut W,
    observation_count: &mut usize,
    snapshot_interval: Option<usize>,
    current_state: &Value,
    filename: &P,
    source: Option<String>,
) -> Result<Value, Diagnostic> {
    // Get file modification time
    let file_mtime = get_file_mtime(filename).map_err(|e| {
        Diagnostic::fatal(
            DiagnosticCode::PathNotFound,
            format!("I couldn't get the file modification time: {}", e),
        )
    })?;

    let content = std::fs::read_to_string(filename).map_err(|e| {
        Diagnostic::fatal(
            DiagnosticCode::PathNotFound,
            format!("I couldn't read the input file: {}", e),
        )
    })?;

    let state: Value = serde_json::from_str(&content).map_err(|e| {
        Diagnostic::fatal(
            DiagnosticCode::InvalidEventJson,
            format!("I couldn't parse the input file as JSON: {}", e),
        )
        .with_advice("Make sure the file contains valid JSON.".to_string())
    })?;

    if *observation_count == 0 {
        // TODO: See if we can get rid of this clone on the Value
        let header = Header::new(state.clone(), source);
        let aw = ArchiveWriter {
            observation_count: *observation_count,
            filename: filename.as_ref().display().to_string(),
        };
        aw.write_header(writer, &header)?;
        *observation_count += 1;
    } else {
        let observation_id = format!("obs-{}", Uuid::new_v4());
        let diff_result: Vec<Event> = diff::diff(current_state, &state, "", &observation_id);
        let mut observation = Observation::new(observation_id, file_mtime);
        for event in diff_result {
            observation.add_event(event);
        }
        *observation_count += 1;

        // Write observation events
        for event in observation.to_events() {
            let event_json = serde_json::to_string(&event).map_err(|e| {
                Diagnostic::fatal(
                    DiagnosticCode::InvalidEventJson,
                    format!("I couldn't serialize an event to JSON: {}", e),
                )
            })?;
            writeln!(writer, "{}", event_json).map_err(|e| {
                Diagnostic::fatal(
                    DiagnosticCode::PathNotFound,
                    format!("I couldn't write to the output: {}", e),
                )
            })?;
        }

        // Check if we should write a snapshot. An interval of zero means
        // "never" and must be rejected before the modulo, which would
        // otherwise panic with a division by zero. The count was incremented
        // just above, so it is already non-zero here.
        if let Some(interval) = snapshot_interval {
            if interval > 0 && *observation_count % interval == 0 {
                let snapshot = Event::Snapshot {
                    observation_id: format!("snapshot-{}", Uuid::new_v4()),
                    timestamp: file_mtime,
                    object: state.clone(),
                };
                let snapshot_json = serde_json::to_string(&snapshot).map_err(|e| {
                    Diagnostic::fatal(
                        DiagnosticCode::InvalidEventJson,
                        format!("I couldn't serialize the snapshot to JSON: {}", e),
                    )
                })?;
                writeln!(writer, "{}", snapshot_json).map_err(|e| {
                    Diagnostic::fatal(
                        DiagnosticCode::PathNotFound,
                        format!("I couldn't write to the output: {}", e),
                    )
                })?;
            }
        }
    }

    Ok(state)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::fs::File;
    use std::io::{BufWriter, Write};
    use tempfile::NamedTempFile;

    /// Writes `content` as a single JSON line into a fresh temporary file.
    fn create_json_file(content: &Value) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        let serialized = serde_json::to_string(content).unwrap();
        writeln!(file, "{}", serialized).unwrap();
        file
    }

    /// Feeds `inputs` through `write_observation` in order, starting from a
    /// `Value::Null` state and a zero observation count, writing into a fresh
    /// temporary archive. Returns the archive's lines with `#` comment lines
    /// removed.
    fn archive_lines(
        inputs: &[&NamedTempFile],
        snapshot_interval: Option<usize>,
        source: Option<&str>,
    ) -> Vec<String> {
        let output = NamedTempFile::new().unwrap();
        {
            let mut writer = BufWriter::new(File::create(output.path()).unwrap());
            let mut current_state = Value::Null;
            let mut observation_count: usize = 0;
            for input in inputs {
                let file_path = input.path().to_path_buf();
                current_state = write_observation(
                    &mut writer,
                    &mut observation_count,
                    snapshot_interval,
                    &current_state,
                    &file_path,
                    source.map(str::to_string),
                )
                .unwrap();
            }
            writer.flush().unwrap();
        }

        std::fs::read_to_string(output.path())
            .unwrap()
            .lines()
            .filter(|line| !line.starts_with('#'))
            .map(str::to_owned)
            .collect()
    }

    #[test]
    fn test_single_file_creates_header_only() {
        // A single input file yields an archive holding only the header,
        // with that file's contents as the initial state.
        let input = create_json_file(&json!({"test": "value"}));

        let lines = archive_lines(&[&input], None, Some("test-source"));

        assert_eq!(lines.len(), 1);
        let parsed_header: Header = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(parsed_header.file_type, "@peoplesgrocers/json-archive");
        assert_eq!(parsed_header.version, 1);
        assert_eq!(parsed_header.initial, json!({"test": "value"}));
    }

    #[test]
    fn test_two_files_creates_header_and_observation() {
        // With two input files the first becomes the header's initial state
        // and the second is written as events after it.
        let file1 = create_json_file(&json!({"count": 0, "name": "test"}));
        let file2 = create_json_file(&json!({"count": 1, "name": "test"}));

        let lines = archive_lines(&[&file1, &file2], None, Some("test-source"));

        // The header plus at least one further line for the second file.
        assert!(lines.len() >= 2);

        // The first line must be the header.
        let header: Header = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(header.file_type, "@peoplesgrocers/json-archive");
        assert_eq!(header.version, 1);
        assert_eq!(header.initial, json!({"count": 0, "name": "test"}));
    }

    #[test]
    fn test_snapshot_written_at_interval() {
        // With a snapshot interval of 2 and three inputs, at least one
        // snapshot event must appear in the archive.
        let file1 = create_json_file(&json!({"count": 0}));
        let file2 = create_json_file(&json!({"count": 1}));
        let file3 = create_json_file(&json!({"count": 2}));

        let lines = archive_lines(&[&file1, &file2, &file3], Some(2), None);

        // Lines that do not parse as an `Event` (e.g. the header) are ignored.
        let has_snapshot = lines.iter().any(|line| {
            matches!(
                serde_json::from_str::<Event>(line),
                Ok(Event::Snapshot { .. })
            )
        });
        assert!(
            has_snapshot,
            "Expected a snapshot event after 2 observations"
        );
    }

    #[test]
    fn test_default_output_filename() {
        // (input, expected output) pairs.
        let cases = [
            ("test.json", "test.json.archive"),
            ("test.txt", "test.txt.json.archive"),
            ("test", "test.json.archive"),
            ("test.json.archive", "test.json.archive"),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(default_output_filename(input), PathBuf::from(expected));
        }
    }
}