
fix(rust, python): properly handle json with unclosed strings (pola-r…
universalmind303 authored and zundertj committed Jan 7, 2023
1 parent fa62811 commit 8f1c0c4
Showing 5 changed files with 226 additions and 134 deletions.
6 changes: 3 additions & 3 deletions polars/polars-io/Cargo.toml
@@ -11,7 +11,7 @@ description = "IO related logic for the Polars DataFrame library"

[features]
# support for arrows json parsing
json = ["arrow/io_json", "simd-json", "memmap", "lexical", "lexical-core", "csv-core"]
json = ["arrow/io_json", "simd-json", "memmap", "lexical", "lexical-core", "csv-core", "serde_json"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression", "memmap"]
# support for arrows streaming ipc file parsing
@@ -56,8 +56,8 @@ polars-utils = { version = "0.25.1", path = "../polars-utils" }
rayon.workspace = true
regex = "1.6"
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true, default-features = false, features = ["alloc"] }
simd-json = { version = "0.6.0", optional = true, features = ["allow-non-simd", "known-key"] }
serde_json = { version = "1", optional = true, default-features = false, features = ["alloc", "raw_value"] }
simd-json = { version = "0.7.0", optional = true, features = ["allow-non-simd", "known-key"] }
simdutf8 = "0.1"

[dev-dependencies]
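
The dependency changes carry the prerequisites for the fix: enabling `serde_json`'s `raw_value` feature exposes `serde_json::value::RawValue`, which the rewritten reader below uses to split records, and `simd-json` moves from 0.6.0 to 0.7.0. As a minimal sketch (illustrative only, not part of this commit) of what `raw_value` provides, with the feature enabled a value can be kept as its verbatim JSON text instead of being parsed into a tree:

use serde_json::value::RawValue;

fn main() -> serde_json::Result<()> {
    // `Box<RawValue>` captures the exact text of the value, escapes and all,
    // without deserializing any of its fields (requires the "raw_value" feature).
    let raw: Box<RawValue> = serde_json::from_str(r#"{"text": "a \" quote and a \n escape"}"#)?;
    assert_eq!(raw.get(), r#"{"text": "a \" quote and a \n escape"}"#);
    Ok(())
}
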
128 changes: 96 additions & 32 deletions polars/polars-io/src/ndjson_core/ndjson.rs
@@ -5,20 +5,19 @@ use std::path::PathBuf;

pub use arrow::array::StructArray;
pub use arrow::io::ndjson as arrow_ndjson;
use num::traits::Pow;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::POOL;
use rayon::prelude::*;

use crate::csv::parser::*;
use crate::csv::utils::*;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::ndjson_core::buffer::*;
use crate::prelude::*;
const QUOTE_CHAR: u8 = b'"';
const SEP: u8 = b',';
const NEWLINE: u8 = b'\n';
const RETURN: u8 = b'\r';
const CLOSING_BRACKET: u8 = b'}';

#[must_use]
pub struct JsonLineReader<'a, R>
@@ -180,14 +179,7 @@ impl<'a> CoreJsonReader<'a> {
let mut bytes = bytes;
let mut total_rows = 128;

if let Some((mean, std)) = get_line_stats(
bytes,
self.sample_size,
NEWLINE,
self.schema.len(),
SEP,
None,
) {
if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
let line_length_upper_bound = mean + 1.1 * std;

total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
@@ -197,7 +189,7 @@
let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

if n_bytes < bytes.len() {
if let Some(pos) = next_line_position_naive(&bytes[n_bytes..], NEWLINE) {
if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
bytes = &bytes[..n_bytes + pos]
}
}
@@ -217,16 +209,7 @@
std::cmp::min(rows_per_thread, max_proxy)
};

let expected_fields = &self.schema.len();

let file_chunks = get_file_chunks(
bytes,
n_threads,
*expected_fields + 1,
SEP,
Some(QUOTE_CHAR),
NEWLINE,
);
let file_chunks = get_file_chunks_json(bytes, n_threads);
let dfs = POOL.install(|| {
file_chunks
.into_par_iter()
@@ -324,16 +307,14 @@ fn parse_lines<'a>(

let total_bytes = bytes.len();
let mut offset = 0;
for line in SplitLines::new(bytes, QUOTE_CHAR, NEWLINE) {
offset += 1; // the newline
offset += parse_impl(line, buffers, &mut buf)?;
}

// if file doesn't end with a newline, parse the last line
if offset < total_bytes {
let rem = &bytes[offset..];
offset += rem.len();
parse_impl(rem, buffers, &mut buf)?;
// The `RawValue` is a pointer into the original JSON string and does not perform any deserialization.
// It lets us iterate over the lines without re-implementing the `SplitLines` logic, since the streaming deserializer already does the same thing.
let mut iter =
serde_json::Deserializer::from_slice(bytes).into_iter::<Box<serde_json::value::RawValue>>();
while let Some(Ok(value)) = iter.next() {
let bytes = value.get().as_bytes();
offset += bytes.len();
parse_impl(bytes, buffers, &mut buf)?;
}

if offset != total_bytes {
@@ -344,3 +325,86 @@

Ok(())
}
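
The loop above is the heart of the fix: instead of splitting on raw newlines, `serde_json`'s streaming deserializer yields one `RawValue` per top-level value, so escaped quotes and newline escapes inside string fields can no longer produce spurious record boundaries. A self-contained sketch of the same pattern (illustrative, not taken from the commit; requires the `raw_value` feature):

use serde_json::value::RawValue;

fn main() {
    // The second record contains an escaped quote and an escaped newline that
    // would confuse a naive split on '\n'.
    let ndjson = br#"{"id": 1, "text": "plain"}
{"id": 2, "text": "an unmatched \" and a \n inside"}"#;

    let stream = serde_json::Deserializer::from_slice(ndjson)
        .into_iter::<Box<RawValue>>();

    for value in stream {
        // Each item is the verbatim text of one record; no fields are deserialized.
        println!("record: {}", value.unwrap().get());
    }
}
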

/// Find the nearest next line position.
/// Does not check for new line characters embedded in String fields.
/// This just looks for `}\n`
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
let pos = memchr::memchr(NEWLINE, input)?;
if pos == 0 {
return Some(1);
}

let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
if is_closing_bracket {
Some(pos + 1)
} else {
None
}
}
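
An illustrative check of this heuristic (a hypothetical unit test in the same module, not part of the commit): a newline counts as a record boundary only when the preceding byte is a closing bracket, so a newline inside an unterminated or escaped string field is not treated as one.

#[test]
fn next_line_position_naive_json_examples() {
    // `}\n` right after a record: the boundary is just past the newline.
    assert_eq!(next_line_position_naive_json(b"{\"a\":1}\n{\"a\":2}"), Some(8));
    // A newline inside a string field is not preceded by `}`, so it is skipped.
    assert_eq!(next_line_position_naive_json(b"{\"a\":\"x\ny\"}"), None);
    // No newline at all: no boundary is reported.
    assert_eq!(next_line_position_naive_json(b"{\"a\":1}"), None);
}
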

/// Get the mean and standard deviation of length of lines in bytes
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
let mut lengths = Vec::with_capacity(n_lines);

let mut bytes_trunc;
let n_lines_per_iter = n_lines / 2;

let mut n_read = 0;

// sample from the start of the file and from 75% of the way in
for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
bytes_trunc = &bytes[offset..];
let pos = next_line_position_naive_json(bytes_trunc)?;
bytes_trunc = &bytes_trunc[pos + 1..];

for _ in offset..(offset + n_lines_per_iter) {
let pos = next_line_position_naive_json(bytes_trunc);
if let Some(pos) = pos {
lengths.push(pos);
let next_bytes = &bytes_trunc[pos..];
if next_bytes.is_empty() {
return None;
}
bytes_trunc = next_bytes;
n_read += pos;
} else {
break;
}
}
}

let n_samples = lengths.len();
let mean = (n_read as f32) / (n_samples as f32);
let mut std = 0.0;
for &len in lengths.iter() {
std += (len as f32 - mean).pow(2.0)
}
std = (std / n_samples as f32).sqrt();
Some((mean, std))
}
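
For intuition, with invented sample numbers (not from the commit): the mean and standard deviation returned here drive the row-count and byte-budget estimates in `CoreJsonReader` above.

fn main() {
    // Invented sample statistics: mean line length 100 bytes, std 20 bytes.
    let (mean, std) = (100.0_f32, 20.0_f32);
    let file_len = 1_000_000_f32; // total bytes in the file

    // Mirrors the estimates used by CoreJsonReader above.
    let line_length_upper_bound = mean + 1.1 * std; // 122 bytes per line, worst case
    let total_rows = (file_len / (mean - 0.01 * std)) as usize; // ~10_020 rows
    println!("{line_length_upper_bound} bytes/line upper bound, ~{total_rows} rows");
}
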

pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
let mut last_pos = 0;
let total_len = bytes.len();
let chunk_size = total_len / n_threads;
let mut offsets = Vec::with_capacity(n_threads);
for _ in 0..n_threads {
let search_pos = last_pos + chunk_size;

if search_pos >= bytes.len() {
break;
}

let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
Some(pos) => search_pos + pos,
None => {
break;
}
};
offsets.push((last_pos, end_pos));
last_pos = end_pos;
}
offsets.push((last_pos, total_len));
offsets
}
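
A small illustration of the chunking behaviour (a hypothetical unit test in the same module, not part of the commit): each boundary is pushed forward to the next `}\n`, so no record is ever split across two threads, and everything after the last boundary lands in the final chunk.

#[test]
fn get_file_chunks_json_examples() {
    // Four 8-byte records, split for two threads.
    let bytes = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n{\"a\":4}\n";
    // The first boundary search starts at byte 16 (half of 32) and advances to
    // the next `}\n`, yielding (0, 24); the remainder becomes the final chunk.
    assert_eq!(get_file_chunks_json(bytes, 2), vec![(0, 24), (24, 32)]);
}
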
25 changes: 25 additions & 0 deletions polars/tests/it/io/json.rs
@@ -57,6 +57,31 @@ fn read_json_with_whitespace() {
assert_eq!("d", df.get_columns()[3].name());
assert_eq!((12, 4), df.shape());
}
#[test]
fn read_json_with_escapes() {
let escaped_json = r#"{"id": 1, "text": "\""}
{"text": "\n{\n\t\t\"inner\": \"json\n}\n", "id": 10}
{"id": 0, "text":"\"","date":"2013-08-03 15:17:23"}
{"id": 1, "text":"\"123\"","date":"2009-05-19 21:07:53"}
{"id": 2, "text":"/....","date":"2009-05-19 21:07:53"}
{"id": 3, "text":"\n\n..","date":"2"}
{"id": 4, "text":"\"'/\n...","date":"2009-05-19 21:07:53"}
{"id": 5, "text":".h\"h1hh\\21hi1e2emm...","date":"2009-05-19 21:07:53"}
{"id": 6, "text":"xxxx....","date":"2009-05-19 21:07:53"}
{"id": 7, "text":".\"quoted text\".","date":"2009-05-19 21:07:53"}
"#;
let file = Cursor::new(escaped_json);
let df = JsonLineReader::new(file)
.infer_schema_len(Some(6))
.finish()
.unwrap();
assert_eq!("id", df.get_columns()[0].name());
assert_eq!(AnyValue::Utf8("\""), df.column("text").unwrap().get(0));
assert_eq!("text", df.get_columns()[1].name());
assert_eq!((10, 3), df.shape());
}

#[test]
fn read_unordered_json() {
let unordered_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
