improve debuggability of json ser/de errors (#1119)
# Description

JSON ser/de errors are very hard to debug because they are usually surfaced with a vague error message like the following:

```
Error: Invalid JSON in log record: EOF while parsing a value at line 1 column 1

Caused by:
    EOF while parsing a value at line 1 column 1
```

This PR separates JSON ser/de errors into scenario-specific variants and adds more context to each error wherever applicable.
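
As a rough illustration of the intent (a minimal sketch, not part of this change): callers can now match on the scenario-specific variants and keep the extra context instead of a bare `serde_json` message. The sketch assumes the variants are exposed through the crate's public `deltalake::DeltaTableError` type as shown in this diff; the `describe` helper is hypothetical.

```rust
use deltalake::DeltaTableError;

/// Hypothetical helper: turn the scenario-specific variants introduced here
/// into a message that keeps the extra context (table version, offending line).
fn describe(err: &DeltaTableError) -> String {
    match err {
        // A malformed commit log line now carries the table version and the raw line.
        DeltaTableError::InvalidJsonLog { json_err, line, version } => {
            format!("commit version {version} has a malformed log line `{line}`: {json_err}")
        }
        // File stats that fail to parse are reported separately from log parsing.
        DeltaTableError::InvalidStatsJson { json_err } => {
            format!("file stats contain invalid JSON: {json_err}")
        }
        // Serialization failures (log entries, schema) get dedicated variants as well.
        DeltaTableError::SerializeLogJson { json_err }
        | DeltaTableError::SerializeSchemaJson { json_err } => {
            format!("failed to serialize to JSON: {json_err}")
        }
        other => other.to_string(),
    }
}
```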
houqp authored Feb 11, 2023
1 parent 8a82f79 commit 02d5d88
Showing 5 changed files with 110 additions and 46 deletions.
21 changes: 20 additions & 1 deletion rust/src/action/mod.rs
@@ -8,7 +8,7 @@ mod parquet_read;
#[cfg(feature = "parquet2")]
pub mod parquet2_read;

use crate::{schema::*, DeltaTableMetaData};
use crate::{schema::*, DeltaTableError, DeltaTableMetaData};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
@@ -332,6 +332,25 @@ impl MetaData {
}
}

impl TryFrom<DeltaTableMetaData> for MetaData {
type Error = DeltaTableError;

fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
let schema_string = serde_json::to_string(&metadata.schema)
.map_err(|e| DeltaTableError::SerializeSchemaJson { json_err: e })?;
Ok(Self {
id: metadata.id,
name: metadata.name,
description: metadata.description,
format: metadata.format,
schema_string,
partition_columns: metadata.partition_columns,
created_time: metadata.created_time,
configuration: metadata.configuration,
})
}
}

/// Represents a tombstone (deleted file) in the Delta log.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Clone, Eq, Debug, Default)]
100 changes: 68 additions & 32 deletions rust/src/delta.rs
@@ -115,11 +115,28 @@ pub enum DeltaTableError {
},

/// Error returned when the log record has an invalid JSON.
#[error("Invalid JSON in log record: {}", .source)]
InvalidJson {
/// JSON error details returned when the log record has an invalid JSON.
#[from]
source: serde_json::error::Error,
#[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
InvalidJsonLog {
/// JSON error details returned when parsing the record JSON.
json_err: serde_json::error::Error,
/// invalid log entry content.
line: String,
/// corresponding table version for the log file.
version: DeltaDataTypeVersion,
},
/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in file stats: {}", .json_err)]
InvalidStatsJson {
/// JSON error details returned when parsing the stats JSON.
json_err: serde_json::error::Error,
},
    /// Error returned when an invariant expression in the schema contains invalid JSON.
#[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
InvalidInvariantJson {
/// JSON error details returned when parsing the invariant expression JSON.
json_err: serde_json::error::Error,
/// Invariant expression.
line: String,
},
/// Error returned when the DeltaTable has an invalid version.
#[error("Invalid table version: {0}")]
@@ -211,6 +228,18 @@ pub enum DeltaTableError {
#[error("Cannot infer storage location from: {0}")]
InvalidTableLocation(String),
    /// Error returned when a log entry cannot be serialized to JSON.
#[error("Log JSON serialization error: {json_err}")]
SerializeLogJson {
/// JSON serialization error
json_err: serde_json::error::Error,
},
    /// Error returned when the table schema cannot be serialized to JSON.
#[error("Schema JSON serialization error: {json_err}")]
SerializeSchemaJson {
/// JSON serialization error
json_err: serde_json::error::Error,
},
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
/// Generic Delta Table error
@@ -321,32 +350,14 @@ impl TryFrom<action::MetaData> for DeltaTableMetaData {
}
}

impl TryFrom<DeltaTableMetaData> for action::MetaData {
type Error = serde_json::error::Error;

fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
let schema_string = serde_json::to_string(&metadata.schema)?;
Ok(Self {
id: metadata.id,
name: metadata.name,
description: metadata.description,
format: metadata.format,
schema_string,
partition_columns: metadata.partition_columns,
created_time: metadata.created_time,
configuration: metadata.configuration,
})
}
}

/// Error related to Delta log application
#[derive(thiserror::Error, Debug)]
pub enum ApplyLogError {
/// Error returned when the end of transaction log is reached.
#[error("End of transaction log")]
EndOfLog,
/// Error returned when the JSON of the log record is invalid.
#[error("Invalid JSON in log record")]
#[error("Invalid JSON found when applying log record")]
InvalidJson {
/// JSON error details returned when reading the JSON log record.
#[from]
@@ -612,6 +623,7 @@ impl DeltaTable {

async fn get_last_checkpoint(&self) -> Result<CheckPoint, LoadCheckpointError> {
let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
debug!("loading checkpoint from {last_checkpoint_path}");
match self.storage.get(&last_checkpoint_path).await {
Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?),
Err(ObjectStoreError::NotFound { .. }) => {
@@ -711,6 +723,8 @@ impl DeltaTable {
}
};

debug!("start with latest checkpoint version: {version}");

// scan logs after checkpoint
loop {
match self
@@ -772,11 +786,19 @@ impl DeltaTable {
Ok(result) => result.bytes().await,
}?;

debug!("parsing commit with version {next_version}...");
let reader = BufReader::new(Cursor::new(commit_log_bytes));

let mut actions = Vec::new();
for line in reader.lines() {
let action: action::Action = serde_json::from_str(line?.as_str())?;
for re_line in reader.lines() {
let line = re_line?;
let lstr = line.as_str();
let action =
serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
json_err: e,
version: next_version,
line,
})?;
actions.push(action);
}
Ok(PeekCommit::New(next_version, actions))
@@ -788,6 +810,7 @@ impl DeltaTable {
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
match self.get_last_checkpoint().await {
Ok(last_check_point) => {
debug!("update with latest checkpoint {last_check_point:?}");
if Some(last_check_point) == self.last_check_point {
self.update_incremental(None).await
} else {
@@ -796,7 +819,10 @@ impl DeltaTable {
self.update_incremental(None).await
}
}
Err(LoadCheckpointError::NotFound) => self.update_incremental(None).await,
Err(LoadCheckpointError::NotFound) => {
debug!("update without checkpoint");
self.update_incremental(None).await
}
Err(source) => Err(DeltaTableError::LoadCheckpoint { source }),
}
}
@@ -813,9 +839,15 @@ impl DeltaTable {
&mut self,
max_version: Option<DeltaDataTypeVersion>,
) -> Result<(), DeltaTableError> {
debug!(
"incremental update with version({}) and max_version({max_version:?})",
self.version(),
);

while let PeekCommit::New(new_version, actions) =
self.peek_next_commit(self.version()).await?
{
debug!("merging table state with version: {new_version}");
let s = DeltaTableState::from_actions(actions, new_version)?;
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
@@ -864,6 +896,7 @@ impl DeltaTable {
}
}

debug!("update incrementally from version {version}");
// 2. apply all logs starting from checkpoint
self.update_incremental(Some(version)).await?;

@@ -1002,10 +1035,10 @@ impl DeltaTable {

/// Returns statistics for files, in order
pub fn get_stats(&self) -> impl Iterator<Item = Result<Option<Stats>, DeltaTableError>> + '_ {
self.state
.files()
.iter()
.map(|add| add.get_stats().map_err(DeltaTableError::from))
self.state.files().iter().map(|add| {
add.get_stats()
.map_err(|e| DeltaTableError::InvalidStatsJson { json_err: e })
})
}

/// Returns partition values for files, in order
@@ -1378,7 +1411,10 @@ impl<'a> DeltaTransaction<'a> {
}

// Serialize all actions that are part of this log entry.
let log_entry = bytes::Bytes::from(log_entry_from_actions(&self.actions)?);
let log_entry = bytes::Bytes::from(
log_entry_from_actions(&self.actions)
.map_err(|e| DeltaTableError::SerializeLogJson { json_err: e })?,
);

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
16 changes: 9 additions & 7 deletions rust/src/operations/transaction.rs
@@ -16,11 +16,10 @@ enum TransactionError {
VersionAlreadyExists(DeltaDataTypeVersion),

/// Error returned when reading the delta log object failed.
#[error("Error serializing commit: {}", .source)]
Serialize {
/// Storage error details when reading the delta log object failed.
#[from]
source: serde_json::Error,
#[error("Error serializing commit log to json: {json_err}")]
SerializeLogJson {
/// Commit log record JSON serialization error.
json_err: serde_json::error::Error,
},

/// Error returned when reading the delta log object failed.
@@ -38,7 +37,9 @@ impl From<TransactionError> for DeltaTableError {
TransactionError::VersionAlreadyExists(version) => {
DeltaTableError::VersionAlreadyExists(version)
}
TransactionError::Serialize { source } => DeltaTableError::InvalidJson { source },
TransactionError::SerializeLogJson { json_err } => {
DeltaTableError::SerializeLogJson { json_err }
}
TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
}
}
@@ -54,7 +55,8 @@ fn commit_uri_from_version(version: DeltaDataTypeVersion) -> Path {
fn log_entry_from_actions(actions: &[Action]) -> Result<String, TransactionError> {
let mut jsons = Vec::<String>::new();
for action in actions {
let json = serde_json::to_string(action)?;
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
}
Ok(jsons.join("\n"))
17 changes: 12 additions & 5 deletions rust/src/schema.rs
@@ -6,6 +8,8 @@ use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashMap;

use crate::DeltaTableError;

/// Type alias for a string expected to match a GUID/UUID format
pub type Guid = String;
/// Type alias for i64/Delta long
@@ -64,25 +66,25 @@ impl SchemaTypeStruct {
}

/// Returns an immutable reference of a specific `Field` instance selected by name.
pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, crate::DeltaTableError> {
pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, DeltaTableError> {
Ok(&self.fields[self.index_of(name)?])
}

/// Find the index of the column with the given name.
pub fn index_of(&self, name: &str) -> Result<usize, crate::DeltaTableError> {
pub fn index_of(&self, name: &str) -> Result<usize, DeltaTableError> {
for i in 0..self.fields.len() {
if self.fields[i].get_name() == name {
return Ok(i);
}
}
let valid_fields: Vec<String> = self.fields.iter().map(|f| f.name.clone()).collect();
Err(crate::DeltaTableError::Generic(format!(
Err(DeltaTableError::Generic(format!(
"Unable to get field named \"{name}\". Valid fields: {valid_fields:?}"
)))
}

/// Get all invariants in the schemas
pub fn get_invariants(&self) -> Result<Vec<Invariant>, crate::DeltaTableError> {
pub fn get_invariants(&self) -> Result<Vec<Invariant>, DeltaTableError> {
let mut remaining_fields: Vec<(String, SchemaField)> = self
.get_fields()
.iter()
@@ -135,7 +137,12 @@ impl SchemaTypeStruct {
}
// JSON format: {"expression": {"expression": "<SQL STRING>"} }
if let Some(Value::String(invariant_json)) = field.metadata.get("delta.invariants") {
let json: Value = serde_json::from_str(invariant_json)?;
let json: Value = serde_json::from_str(invariant_json).map_err(|e| {
DeltaTableError::InvalidInvariantJson {
json_err: e,
line: invariant_json.to_string(),
}
})?;
if let Value::Object(json) = json {
if let Some(Value::Object(expr1)) = json.get("expression") {
if let Some(Value::String(sql)) = expr1.get("expression") {
2 changes: 1 addition & 1 deletion rust/src/table_state_arrow.rs
@@ -283,7 +283,7 @@ impl DeltaTableState {
.iter()
.map(|f| {
f.get_stats()
.map_err(|err| DeltaTableError::InvalidJson { source: err })
.map_err(|err| DeltaTableError::InvalidStatsJson { json_err: err })
})
.collect::<Result<_, DeltaTableError>>()?;
