Skip to content

Commit

Permalink
Support ndjson
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Stein <steinlink@gmail.com>
  • Loading branch information
texodus committed Dec 9, 2024
1 parent da17600 commit cbadc6d
Show file tree
Hide file tree
Showing 24 changed files with 955 additions and 45 deletions.
51 changes: 51 additions & 0 deletions cpp/perspective/src/cpp/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ needs_poll(const proto::Request::ClientReqCase proto_case) {
case ReqCase::kViewToColumnsStringReq:
case ReqCase::kViewToCsvReq:
case ReqCase::kViewToRowsStringReq:
case ReqCase::kViewToNdjsonStringReq:
case ReqCase::kViewToArrowReq:
case ReqCase::kViewSchemaReq:
case ReqCase::kViewGetMinMaxReq:
Expand Down Expand Up @@ -910,6 +911,7 @@ entity_type_is_table(const proto::Request::ClientReqCase proto_case) {
case ReqCase::kViewDimensionsReq:
case ReqCase::kViewToColumnsStringReq:
case ReqCase::kViewToCsvReq:
case ReqCase::kViewToNdjsonStringReq:
case ReqCase::kViewToRowsStringReq:
case ReqCase::kViewToArrowReq:
case ReqCase::kViewSchemaReq:
Expand Down Expand Up @@ -1281,6 +1283,12 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
Table::from_rows(index, r.data().from_rows(), limit);
break;
}
case proto::MakeTableData::kFromNdjson: {
table = Table::from_ndjson(
index, r.data().from_ndjson(), limit
);
break;
}
case proto::MakeTableData::kFromSchema: {
std::vector<std::string> columns;
std::vector<t_dtype> types;
Expand Down Expand Up @@ -1489,6 +1497,10 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
table->update_cols(r.data().from_cols(), r.port_id());
break;
}
case proto::MakeTableData::kFromNdjson: {
table->update_ndjson(r.data().from_ndjson(), r.port_id());
break;
}
case proto::MakeTableData::kFromSchema:
case proto::MakeTableData::DATA_NOT_SET:
default: {
Expand Down Expand Up @@ -2038,6 +2050,45 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
push_resp(std::move(resp));
break;
}
case proto::Request::kViewToNdjsonStringReq: {
auto view = m_resources.get_view(req.entity_id());
const auto& r = req.view_to_rows_string_req();
auto config = view->get_view_config();
std::string nidx{view_sides_to_string(*view)};
auto num_hidden = calculate_num_hidden(*view, *config);

auto dims = parse_format_options(
r.viewport(),
view->num_columns(),
view->num_rows(),
view->sides(),
config->is_column_only(),
num_hidden
);

auto json_str = view->to_ndjson(
dims.start_row,
dims.end_row,
dims.start_col,
dims.end_col,
num_hidden,
r.formatted(),
r.index(),
r.id(),
r.leaves_only(),
view->sides(),
view->sides() > 0 && !config->is_column_only(),
nidx,
config->get_columns().size(),
view->get_view_config()->get_row_pivots().size()
);

proto::Response resp;
auto* view_cols_str = resp.mutable_view_to_ndjson_string_resp();
view_cols_str->set_ndjson_string(json_str);
push_resp(std::move(resp));
break;
}
case proto::Request::kViewToRowsStringReq: {
auto view = m_resources.get_view(req.entity_id());
const auto& r = req.view_to_rows_string_req();
Expand Down
260 changes: 254 additions & 6 deletions cpp/perspective/src/cpp/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1305,12 +1305,6 @@ Table::from_rows(
const auto& psp_pkey_col = data_table.get_column("psp_pkey");
const auto& psp_okey_col = data_table.get_column("psp_okey");

// rapidjson::StringBuffer buffer;
// buffer.Clear();
// rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
// document.Accept(writer);
// std::cout << buffer.GetString() << std::endl;

// 3.) Fill table
for (const auto& row : document.GetArray()) {
for (const auto& it : row.GetObject()) {
Expand Down Expand Up @@ -1352,6 +1346,260 @@ Table::from_rows(
return tbl;
}

void
Table::update_ndjson(const std::string_view& data, std::uint32_t port_id) {
rapidjson::Document document;
rapidjson::StringStream s(data.data());
document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);
if (document.Size() == 0) {
return;
}

if (!document.IsObject()) {
// TODO Legacy error message
PSP_COMPLAIN_AND_ABORT(
"Cannot determine data types without column names!\n"
)
}

bool is_implicit = m_index.empty();
t_schema table_schema = get_schema();

// 2.) Create table
t_data_table data_table(table_schema);
data_table.init();

// 2a.) Estimate row size to reduce malloc pressure.
auto newlines = 0;
for (char i : data) {
if (i == '\n') {
newlines++;
}
}

data_table.reserve(newlines + 1);
if (is_implicit) {
data_table.add_column("psp_pkey", DTYPE_INT32, true);
} else {
data_table.add_column(
"psp_pkey", table_schema.get_dtype(m_index), true
);
}

t_uindex ii = 0;
const auto& psp_pkey_col = data_table.get_column("psp_pkey");
auto schema = data_table.get_schema();
bool is_first_row = true;
std::vector<std::string> missing_columns = m_column_names;

// 3.) Fill table
bool is_finished = false;
while (!is_finished) {
if (is_implicit) {
psp_pkey_col->set_nth<std::uint32_t>(ii, (ii + m_offset) % m_limit);
}

for (const auto& it : document.GetObject()) {
std::shared_ptr<t_column> col;
std::string_view col_name = it.name.GetString();
if (std::string_view{it.name.GetString()} == "__INDEX__") {
col_name = "psp_pkey";
}

if (!schema.has_column(col_name)) {
LOG_DEBUG("Ignoring column " << col_name);
LOG_DEBUG("Schema:\n" << schema);
continue;
}

if (is_first_row) {
missing_columns.erase(
std ::remove(
missing_columns.begin(), missing_columns.end(), col_name
),
missing_columns.end()
);
}

col = data_table.get_column(col_name);
auto promote = fill_column_json(col, ii, it.value, true);
if (promote) {
std::stringstream ss;
ss << "Cannot append value of type " << dtype_to_str(*promote)
<< " to column of type " << dtype_to_str(col->get_dtype())
<< std::endl;
PSP_COMPLAIN_AND_ABORT(ss.str());
}

if (!is_implicit && m_index == it.name.GetString()) {
fill_column_json(psp_pkey_col, ii, it.value, true);
}
}

is_first_row = false;
if (ii + m_offset >= m_limit) {
for (auto& col_name : missing_columns) {
data_table.get_column(col_name)->unset(ii);
}
}

ii++;

document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);
if (document.HasParseError()) {
is_finished = true;
}
}

data_table.extend(ii);
data_table.clone_column("psp_pkey", "psp_okey");
process_op_column(data_table, t_op::OP_INSERT);
calculate_offset(ii);
m_pool->send(get_gnode()->get_id(), port_id, data_table);
}

std::shared_ptr<Table>
Table::from_ndjson(
const std::string& index, const std::string_view& data, std::uint32_t limit
) {
auto pool = std::make_shared<t_pool>();
pool->init();

// 1.) Infer schema
rapidjson::Document document;
rapidjson::StringStream s(data.data());
document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);

if (document.Size() > 0 && !document.IsObject()) {
std::stringstream ss;
ss << "Received non-object " << document[0].GetType();
PSP_COMPLAIN_AND_ABORT(ss.str())
}

std::vector<std::string> column_names;
std::vector<t_dtype> data_types;
bool is_implicit = true;
std::set<std::string> columns_known_type;
std::set<std::string> columns_seen;

// TODO I don't think it makes sense to do the same incremental-schema
// enhancement we do for regular JSON. For now this only checks the first
// row.
[&]() {
for (const auto& col : document.GetObject()) {
columns_seen.insert(col.name.GetString());
}

// https://github.com/Tencent/rapidjson/issues/1994
for (const auto& col : document.GetObject()) {
if (col.name.GetString() == index) {
is_implicit = false;
}

if (columns_known_type.count(col.name.GetString()) > 0) {
continue;
}

auto dtype = rapidjson_type_to_dtype(col.value);
if (dtype != DTYPE_NONE) {
columns_known_type.insert(col.name.GetString());
data_types.push_back(rapidjson_type_to_dtype(col.value));
column_names.emplace_back(col.name.GetString());
}

// Theoretically there can end too early if the first
// few rows are missing columns that are present in later rows.
if (columns_known_type.size() == columns_seen.size()) {
return;
}
}
}();

auto untyped_columns = columns_seen;
for (const auto& col : columns_seen) {
if (columns_known_type.count(col) == 0) {
// Default all null columns to string
data_types.push_back(DTYPE_STR);
column_names.emplace_back(col);
}
}

t_schema schema(column_names, data_types);

// 2.) Create table
t_data_table data_table(schema);
data_table.init();

if (is_implicit) {
data_table.add_column("psp_pkey", DTYPE_INT32, true);
data_table.add_column("psp_okey", DTYPE_INT32, true);
} else {
data_table.add_column("psp_pkey", schema.get_dtype(index), true);
data_table.add_column("psp_okey", schema.get_dtype(index), true);
}

std::int32_t ii = 0;
const auto& psp_pkey_col = data_table.get_column("psp_pkey");
const auto& psp_okey_col = data_table.get_column("psp_okey");

// 2a.) Estimate row size to reduce malloc pressure.
auto newlines = 0;
for (char i : data) {
if (i == '\n') {
newlines++;
}
}

data_table.reserve(newlines + 1);

// 3.) Fill table
bool is_finished = false;
while (!is_finished) {
for (const auto& it : document.GetObject()) {
auto col = data_table.get_column(it.name.GetString());
const auto* col_name = it.name.GetString();
const auto& cell = it.value;
auto promote = fill_column_json(col, ii, cell, false);
if (promote) {
LOG_DEBUG(
"Promoting column " << col_name << " from "
<< dtype_to_str(col->get_dtype())
<< " to " << dtype_to_str(*promote)
);

data_table.promote_column(col_name, *promote, ii, true);
col = data_table.get_column(col_name);
fill_column_json(col, ii, cell, false);
}

if (!is_implicit && index == it.name.GetString()) {
fill_column_json(psp_pkey_col, ii, it.value, false);
fill_column_json(psp_okey_col, ii, it.value, false);
}
}

if (is_implicit) {
psp_pkey_col->set_nth<std::int32_t>(ii, ii % limit);
psp_okey_col->set_nth<std::int32_t>(ii, ii % limit);
}

ii++;
document.ParseStream<rapidjson::kParseStopWhenDoneFlag>(s);
if (document.HasParseError()) {
is_finished = true;
}
}

data_table.extend(ii);
auto tbl = std::make_shared<Table>(
pool, schema.columns(), schema.types(), limit, index
);

tbl->init(data_table, ii, t_op::OP_INSERT, 0);
pool->_process();
return tbl;
}

std::shared_ptr<Table>
Table::from_schema(
const std::string& index, const t_schema& schema, std::uint32_t limit
Expand Down
Loading

0 comments on commit cbadc6d

Please sign in to comment.