feat: add reading compressed archive files

This commit is contained in:
nobody 2025-09-30 11:19:13 -07:00
commit 1f0f41a96c
Signed by: GrocerPublishAgent
GPG key ID: 43B1C298CDDE181C
8 changed files with 552 additions and 272 deletions

View file

@ -22,7 +22,7 @@
use serde_json::Value;
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::io::{BufRead, BufReader, Read};
use std::path::Path;
use crate::diagnostics::{Diagnostic, DiagnosticCode, DiagnosticCollector, DiagnosticLevel};
@ -30,12 +30,29 @@ use crate::event_deserialize::EventDeserializer;
use crate::events::{Event, Header};
use crate::pointer::JsonPointer;
#[cfg(feature = "compression")]
use flate2::read::{DeflateDecoder, GzDecoder, ZlibDecoder};
#[cfg(feature = "compression")]
use brotli::Decompressor;
#[cfg(feature = "compression")]
use zstd::stream::read::Decoder as ZstdDecoder;
/// Selects how strictly `ArchiveReader::read` checks an archive while replaying it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
/// In the code visible here, `read` only raises the fatal "non-existent observation ID"
/// and "snapshot state mismatch" diagnostics when this mode is selected.
FullValidation,
/// Those two checks are skipped in this mode.
/// NOTE(review): presumably meant for finding the end of an archive before appending — confirm.
AppendSeek,
}
/// Compression wrapper that `detect_compression_format` found around an archive file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionFormat {
/// Gzip stream, recognised by the magic bytes `0x1f 0x8b`.
Gzip,
/// Raw deflate stream, recognised from the `.deflate` file extension (no magic number is checked).
Deflate,
/// Zlib stream, recognised by `0x78` followed by `0x01`, `0x5e`, `0x9c` or `0xda`.
Zlib,
/// Brotli stream, recognised from the path rather than from magic bytes.
Brotli,
/// Zstandard stream, recognised by the magic bytes `0x28 0xb5 0x2f 0xfd`.
Zstd,
/// No compression detected; the file is read as-is.
None,
}
pub struct ArchiveReader {
mode: ReadMode,
filename: String,
@ -49,45 +66,210 @@ pub struct ReadResult {
pub observation_count: usize,
}
/// Streaming source of archive events, built by `ArchiveReader::events`.
///
/// Diagnostics raised while iterating are accumulated in `diagnostics` instead of being
/// returned from `next`, so callers inspect that field after (or during) iteration.
pub struct EventIterator {
// Line source positioned after the header line; boxed because the concrete reader
// differs depending on whether a decompressor wraps the file.
reader: Box<dyn BufRead>,
/// Diagnostics collected so far, including any raised while the header was read.
pub diagnostics: DiagnosticCollector,
/// Parsed archive header; `events` stores a `Header::new(Value::Null, None)` placeholder
/// here when the header could not be read or parsed.
pub header: Header,
// File name attached as the location of every diagnostic this iterator emits.
filename: String,
// 1-based number of the most recently read line; `events` initialises it to 1 (the header).
line_number: usize,
}
impl Iterator for EventIterator {
    type Item = Event;

    /// Advances to the next event in the archive.
    ///
    /// Blank lines and lines starting with `#` are skipped. A line that is not valid JSON,
    /// or that deserializes without producing an event, only contributes diagnostics and
    /// iteration moves on to the following line.
    ///
    /// Returns `None` at end of input and on any read error. Only `InvalidData` errors
    /// record a diagnostic (reported as invalid UTF-8); every other I/O error ends the
    /// iteration without leaving a trace in `diagnostics`.
    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = String::new();

        loop {
            buf.clear();
            // Counted before the read so diagnostics for this line carry its 1-based number.
            self.line_number += 1;

            match self.reader.read_line(&mut buf) {
                Ok(0) => return None, // EOF
                Ok(_) => {}
                Err(err) => {
                    if err.kind() == std::io::ErrorKind::InvalidData {
                        let invalid_utf8 = Diagnostic::new(
                            DiagnosticLevel::Fatal,
                            DiagnosticCode::InvalidUtf8,
                            format!("I found invalid UTF-8 bytes at line {}.", self.line_number),
                        )
                        .with_location(self.filename.clone(), self.line_number)
                        .with_advice(
                            "The JSON Archive format requires UTF-8 encoding. Make sure the file \
                             was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
                                .to_string(),
                        );
                        self.diagnostics.add(invalid_utf8);
                    }
                    return None;
                }
            }

            let content = buf.trim();
            if content.is_empty() || content.starts_with('#') {
                continue;
            }

            // The untrimmed line is handed to serde, exactly as it was read.
            let parsed = match serde_json::from_str::<EventDeserializer>(&buf) {
                Ok(parsed) => parsed,
                Err(err) => {
                    let invalid_json = Diagnostic::new(
                        DiagnosticLevel::Fatal,
                        DiagnosticCode::InvalidEventJson,
                        format!("I couldn't parse this line as JSON: {}", err),
                    )
                    .with_location(self.filename.clone(), self.line_number)
                    .with_snippet(format!("{} | {}", self.line_number, content))
                    .with_advice(
                        "Each line after the header must be either:\n\
                         - A comment starting with #\n\
                         - A valid JSON array representing an event\n\n\
                         Check for missing commas, quotes, or brackets."
                            .to_string(),
                    );
                    self.diagnostics.add(invalid_json);
                    continue;
                }
            };

            // Deserialization reports its own problems; pin each one to this line.
            for diagnostic in parsed.diagnostics {
                let located = diagnostic
                    .with_location(self.filename.clone(), self.line_number)
                    .with_snippet(format!("{} | {}", self.line_number, content));
                self.diagnostics.add(located);
            }

            if let Some(event) = parsed.event {
                return Some(event);
            }
            // No event on this line (diagnostics only): fall through to the next one.
        }
    }
}
/// Guesses which compression wrapper, if any, surrounds the archive at `path`.
///
/// `bytes` holds the first bytes of the file (the caller passes up to four). Formats with
/// a magic number (gzip, zlib, zstd) are recognised from those bytes. Brotli and raw
/// deflate have no reliable magic number, so they are recognised from the file name.
///
/// Each magic-number check only needs as many bytes as its own signature. A file that is
/// shorter than four bytes, or a short first read, therefore still reaches the file-name
/// fallback instead of being classified as uncompressed up front.
///
/// Returns `CompressionFormat::None` when nothing matches.
fn detect_compression_format(path: &Path, bytes: &[u8]) -> CompressionFormat {
    // Gzip magic number: 0x1f 0x8b
    if bytes.starts_with(&[0x1f, 0x8b]) {
        return CompressionFormat::Gzip;
    }

    // Zlib magic number: 0x78 followed by 0x01, 0x5e, 0x9c, or 0xda
    if matches!(bytes, [0x78, 0x01 | 0x5e | 0x9c | 0xda, ..]) {
        return CompressionFormat::Zlib;
    }

    // Zstd magic number: 0x28 0xb5 0x2f 0xfd
    if bytes.starts_with(&[0x28, 0xb5, 0x2f, 0xfd]) {
        return CompressionFormat::Zstd;
    }

    // Check file extension for brotli (no reliable magic number) and deflate
    if let Some(ext) = path.extension() {
        let ext_str = ext.to_string_lossy();

        // Only the final path component is inspected for an inner ".br." marker, so a
        // directory such as "backups.br.d/" cannot turn a plain file into a brotli one.
        let name_has_br_marker = path
            .file_name()
            .map_or(false, |name| name.to_string_lossy().contains(".br."));

        if ext_str == "br" || name_has_br_marker {
            return CompressionFormat::Brotli;
        }
        if ext_str == "deflate" {
            return CompressionFormat::Deflate;
        }
    }

    CompressionFormat::None
}
impl ArchiveReader {
/// Builds a reader that validates according to `mode`.
///
/// `path` is only rendered to a display string here; that string is kept as the file
/// name for later use. Nothing is opened, so this constructor always returns `Ok`.
pub fn new<P: AsRef<Path>>(path: P, mode: ReadMode) -> std::io::Result<Self> {
    Ok(Self {
        filename: path.as_ref().display().to_string(),
        mode,
    })
}
pub fn read<P: AsRef<Path>>(&self, path: P) -> std::io::Result<ReadResult> {
let file = File::open(path)?;
let reader = BufReader::new(file);
pub fn events<P: AsRef<Path>>(&self, path: P) -> std::io::Result<(Value, EventIterator)> {
let path = path.as_ref();
let mut file = File::open(path)?;
// Detect compression format
let mut magic_bytes = [0u8; 4];
let bytes_read = file.read(&mut magic_bytes)?;
let compression_format = detect_compression_format(path, &magic_bytes[..bytes_read]);
// Re-open file to reset position
file = File::open(path)?;
let mut diagnostics = DiagnosticCollector::new();
let mut lines_iter = reader.lines().enumerate();
// Check if compression is detected but not supported
#[cfg(not(feature = "compression"))]
if compression_format != CompressionFormat::None {
let format_name = match compression_format {
CompressionFormat::Gzip => "gzip",
CompressionFormat::Deflate => "deflate",
CompressionFormat::Zlib => "zlib",
CompressionFormat::Brotli => "brotli",
CompressionFormat::Zstd => "zstd",
CompressionFormat::None => unreachable!(),
};
let (header_line_number, header_line) = match lines_iter.next() {
Some((idx, Ok(line))) => (idx + 1, line),
Some((idx, Err(e))) if e.kind() == std::io::ErrorKind::InvalidData => {
let line_number = idx + 1;
diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::InvalidUtf8,
format!("I found invalid UTF-8 bytes at line {}.", line_number)
)
.with_location(self.filename.clone(), line_number)
.with_advice(
"The JSON Archive format requires UTF-8 encoding. Make sure the file \
was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
.to_string()
)
);
return Ok(ReadResult {
header: Header::new(Value::Null, None),
final_state: Value::Null,
diagnostics,
observation_count: 0,
});
}
Some((_, Err(e))) => return Err(e),
None => {
diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::UnsupportedVersion,
format!("I detected a {}-compressed archive, but this build doesn't support compression.", format_name)
)
.with_location(self.filename.clone(), 1)
.with_advice(
"This binary was built without compression support to reduce binary size and dependencies.\n\
You have two options:\n\
1. Install the version with compression support: cargo install json-archive --features compression\n\
2. Manually decompress the file first, then use this tool on the uncompressed archive"
.to_string()
)
);
// Return dummy values with fatal diagnostic
let iterator = EventIterator {
reader: Box::new(BufReader::new(std::io::empty())),
diagnostics,
header: Header::new(Value::Null, None),
filename: self.filename.clone(),
line_number: 1,
};
return Ok((Value::Null, iterator));
}
// Create appropriate reader based on compression format
#[cfg(feature = "compression")]
let reader: Box<dyn BufRead> = match compression_format {
CompressionFormat::Gzip => Box::new(BufReader::new(GzDecoder::new(file))),
CompressionFormat::Deflate => Box::new(BufReader::new(DeflateDecoder::new(file))),
CompressionFormat::Zlib => Box::new(BufReader::new(ZlibDecoder::new(file))),
CompressionFormat::Brotli => Box::new(BufReader::new(Decompressor::new(file, 4096))),
CompressionFormat::Zstd => Box::new(BufReader::new(ZstdDecoder::new(file)?)),
CompressionFormat::None => Box::new(BufReader::new(file)),
};
#[cfg(not(feature = "compression"))]
let reader: Box<dyn BufRead> = Box::new(BufReader::new(file));
let mut reader = reader;
let mut header_line = String::new();
let _bytes_read = match reader.read_line(&mut header_line) {
Ok(0) => {
// Empty file
diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
@ -96,124 +278,101 @@ impl ArchiveReader {
)
.with_location(self.filename.clone(), 1)
.with_advice(
"A valid JSON Archive file must start with a header object containing:\n\
- type: \"@peoplesgrocers/json-archive\"\n\
- version: 1\n\
- created: an ISO-8601 timestamp\n\
- initial: the initial state of the object"
"See the file format specification for header structure."
.to_string(),
),
);
return Ok(ReadResult {
header: Header::new(Value::Null, None),
final_state: Value::Null,
let iterator = EventIterator {
reader,
diagnostics,
observation_count: 0,
});
header: Header::new(Value::Null, None),
filename: self.filename.clone(),
line_number: 1,
};
return Ok((Value::Null, iterator));
}
Ok(n) => n,
Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
// UTF-8 error
diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::InvalidUtf8,
"I found invalid UTF-8 bytes at line 1.".to_string()
)
.with_location(self.filename.clone(), 1)
.with_advice(
"The JSON Archive format requires UTF-8 encoding. Make sure the file \
was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
.to_string()
)
);
let iterator = EventIterator {
reader,
diagnostics,
header: Header::new(Value::Null, None),
filename: self.filename.clone(),
line_number: 1,
};
return Ok((Value::Null, iterator));
}
Err(e) => return Err(e),
};
let header = match self.parse_header(&header_line, header_line_number, &mut diagnostics) {
let header = match self.parse_header(&header_line, 1, &mut diagnostics) {
Some(h) => h,
None => {
return Ok(ReadResult {
header: Header::new(Value::Null, None),
final_state: Value::Null,
let iterator = EventIterator {
reader,
diagnostics,
observation_count: 0,
});
header: Header::new(Value::Null, None),
filename: self.filename.clone(),
line_number: 1,
};
return Ok((Value::Null, iterator));
}
};
let mut state = header.initial.clone();
let iterator = EventIterator {
reader,
diagnostics,
header: header.clone(),
filename: self.filename.clone(),
line_number: 1,
};
Ok((header.initial, iterator))
}
pub fn read<P: AsRef<Path>>(&self, path: P) -> std::io::Result<ReadResult> {
let (initial_value, mut event_iter) = self.events(&path)?;
// Check for early fatal diagnostics (like compression not supported)
if event_iter.diagnostics.has_fatal() {
return Ok(ReadResult {
header: Header::new(Value::Null, None),
final_state: Value::Null,
diagnostics: event_iter.diagnostics,
observation_count: 0,
});
}
let header = Header::new(initial_value.clone(), None);
let mut state = initial_value;
let mut seen_observations: HashSet<String> = HashSet::new();
let mut current_observation: Option<(String, usize, usize)> = None;
let mut events_in_observation = 0;
let mut observation_count = 0;
// This manual dispatcher mirrors what serde would expand but stays explicit so we can
// attach Elm-style diagnostics with precise spans and guidance for each failure case.
for (idx, line_result) in lines_iter {
let line_number = idx + 1;
let line = match line_result {
Ok(line) => line,
Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::InvalidUtf8,
format!("I found invalid UTF-8 bytes at line {}.", line_number)
)
.with_location(self.filename.clone(), line_number)
.with_advice(
"The JSON Archive format requires UTF-8 encoding. Make sure the file \
was saved with UTF-8 encoding, not Latin-1, Windows-1252, or another encoding."
.to_string()
)
);
return Ok(ReadResult {
header: Header::new(Value::Null, None),
final_state: Value::Null,
diagnostics,
observation_count: 0,
});
}
Err(e) => return Err(e),
};
let trimmed = line.trim();
if trimmed.starts_with('#') || trimmed.is_empty() {
continue;
}
let event_deserializer = match serde_json::from_str::<EventDeserializer>(&line) {
Ok(d) => d,
Err(e) => {
diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::InvalidEventJson,
format!("I couldn't parse this line as JSON: {}", e),
)
.with_location(self.filename.clone(), line_number)
.with_snippet(format!("{} | {}", line_number, line))
.with_advice(
"Each line after the header must be either:\n\
- A comment starting with #\n\
- A valid JSON array representing an event\n\n\
Check for missing commas, quotes, or brackets."
.to_string(),
),
);
continue;
}
};
// Add any diagnostics from deserialization with location info
for diagnostic in event_deserializer.diagnostics {
diagnostics.add(
diagnostic
.with_location(self.filename.clone(), line_number)
.with_snippet(format!("{} | {}", line_number, line))
);
}
// Continue processing to collect additional errors before failing.
// Even though this function must now return an error, we continue to help
// the user identify all issues in the file at once rather than one at a time.
let event = match event_deserializer.event {
Some(e) => e,
None => {
assert!(diagnostics.has_fatal(), "Expected a fatal diagnostic when deserialization fails");
continue
},
};
// Process events from iterator
while let Some(event) = event_iter.next() {
let line_number = event_iter.line_number;
match event {
Event::Observe { observation_id, timestamp: _, change_count } => {
if let Some((_obs_id, obs_line, expected_count)) = &current_observation {
if events_in_observation != *expected_count {
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Warning,
DiagnosticCode::ChangeCountMismatch,
@ -233,7 +392,7 @@ impl ArchiveReader {
}
if seen_observations.contains(&observation_id) {
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Warning,
DiagnosticCode::DuplicateObservationId,
@ -260,14 +419,13 @@ impl ArchiveReader {
if self.mode == ReadMode::FullValidation
&& !seen_observations.contains(&observation_id)
{
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::NonExistentObservationId,
format!("I found a reference to observation '{}', but I haven't seen an observe event with that ID yet.", observation_id)
)
.with_location(self.filename.clone(), line_number)
.with_snippet(format!("{} | {}", line_number, line))
.with_advice(
"Each add/change/remove/move event must reference an observation ID from a preceding observe event."
.to_string()
@ -277,7 +435,7 @@ impl ArchiveReader {
}
if let Err(diag) = apply_add(&mut state, &path, value) {
diagnostics.add(diag.with_location(self.filename.clone(), line_number));
event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
continue;
}
}
@ -288,7 +446,7 @@ impl ArchiveReader {
if self.mode == ReadMode::FullValidation
&& !seen_observations.contains(&observation_id)
{
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::NonExistentObservationId,
@ -300,7 +458,7 @@ impl ArchiveReader {
}
if let Err(diag) = apply_change(&mut state, &path, new_value) {
diagnostics.add(diag.with_location(self.filename.clone(), line_number));
event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
continue;
}
}
@ -311,7 +469,7 @@ impl ArchiveReader {
if self.mode == ReadMode::FullValidation
&& !seen_observations.contains(&observation_id)
{
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::NonExistentObservationId,
@ -323,7 +481,7 @@ impl ArchiveReader {
}
if let Err(diag) = apply_remove(&mut state, &path) {
diagnostics.add(diag.with_location(self.filename.clone(), line_number));
event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
continue;
}
}
@ -334,7 +492,7 @@ impl ArchiveReader {
if self.mode == ReadMode::FullValidation
&& !seen_observations.contains(&observation_id)
{
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::NonExistentObservationId,
@ -346,14 +504,14 @@ impl ArchiveReader {
}
if let Err(diag) = apply_move(&mut state, &path, moves) {
diagnostics.add(diag.with_location(self.filename.clone(), line_number));
event_iter.diagnostics.add(diag.with_location(self.filename.clone(), line_number));
continue;
}
}
Event::Snapshot { observation_id: _, timestamp: _, object } => {
if self.mode == ReadMode::FullValidation && state != object {
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Fatal,
DiagnosticCode::SnapshotStateMismatch,
@ -376,7 +534,7 @@ impl ArchiveReader {
if let Some((_obs_id, obs_line, expected_count)) = &current_observation {
if events_in_observation != *expected_count {
diagnostics.add(
event_iter.diagnostics.add(
Diagnostic::new(
DiagnosticLevel::Warning,
DiagnosticCode::ChangeCountMismatch,
@ -393,10 +551,11 @@ impl ArchiveReader {
Ok(ReadResult {
header,
final_state: state,
diagnostics,
diagnostics: event_iter.diagnostics,
observation_count,
})
}
fn parse_header(
&self,
line: &str,
@ -470,7 +629,7 @@ impl ArchiveReader {
}
fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnostic> {
pub fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnostic> {
let pointer = JsonPointer::new(path).map_err(|diag| {
diag.with_advice(
"JSON Pointer paths must start with '/' and use '/' to separate segments.\n\
@ -488,19 +647,19 @@ fn apply_add(state: &mut Value, path: &str, value: Value) -> Result<(), Diagnost
})
}
fn apply_change(state: &mut Value, path: &str, new_value: Value) -> Result<(), Diagnostic> {
pub fn apply_change(state: &mut Value, path: &str, new_value: Value) -> Result<(), Diagnostic> {
let pointer = JsonPointer::new(path)?;
pointer.set(state, new_value)?;
Ok(())
}
fn apply_remove(state: &mut Value, path: &str) -> Result<(), Diagnostic> {
pub fn apply_remove(state: &mut Value, path: &str) -> Result<(), Diagnostic> {
let pointer = JsonPointer::new(path)?;
pointer.remove(state)?;
Ok(())
}
fn apply_move(
pub fn apply_move(
state: &mut Value,
path: &str,
moves: Vec<(usize, usize)>,