Skip to content

Commit

Permalink
ArrowBuilder generates metadata file for Feather/CSV formats
Browse files Browse the repository at this point in the history
  • Loading branch information
elidwa committed Apr 15, 2024
1 parent 29d5c27 commit 0fdadcd
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 36 deletions.
17 changes: 10 additions & 7 deletions clients/python/sliderule/sliderule.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,9 @@ def procoutputfile(parm, rsps):
else:
# Return GeoParquet File as GeoDataFrame
return geopandas.read_parquet(path)
elif output["format"] == "feather":
# Return Feather File as DataFrame
return geopandas.pd.read_feather(path)
elif output["format"] == "csv":
# Return CSV File as DataFrame
return geopandas.pd.read_csv(path)
Expand Down Expand Up @@ -602,7 +605,7 @@ def todataframe(columns, time_key="time", lon_key="longitude", lat_key="latitude

# Set Default Keyword Arguments
kwargs['index_key'] = "time"
kwargs['crs'] = SLIDERULE_EPSG
kwargs['crs'] = SLIDERULE_EPSG

# Check Empty Columns
if len(columns) <= 0:
Expand Down Expand Up @@ -832,7 +835,7 @@ def set_verbose (enable, loglevel=logging.INFO):
'''
Sets up a console logger to print log messages to screen
If you want more control over the behavior of the log messages being captured, do not call this function but instead
If you want more control over the behavior of the log messages being captured, do not call this function but instead
create and configure a Python log handler of your own and attach it to `sliderule.logger`.
Parameters
Expand Down Expand Up @@ -1297,10 +1300,10 @@ def toregion(source, tolerance=0.0, cellsize=0.01, n_clusters=1):
region = {
"gdf": <GeoDataFrame of region>
"poly": [{"lat": <lat1>, "lon": <lon1> }, ...],
"raster": {"data": <geojson file as string>,
"poly": [{"lat": <lat1>, "lon": <lon1> }, ...],
"raster": {"data": <geojson file as string>,
"clusters": [[{"lat": <lat1>, "lon": <lon1>}, ...], [{"lat": <lat1>, "lon": <lon1>}, ...]] }
Expand Down Expand Up @@ -1346,7 +1349,7 @@ def toregion(source, tolerance=0.0, cellsize=0.01, n_clusters=1):
"lon": -108.20772968780051,
"lat": 38.8232055291981
}
]
]
'''

tstart = time.perf_counter()
Expand Down
54 changes: 50 additions & 4 deletions clients/python/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ def test_atl08_ancillary(self, init):
"srt": icesat2.SRT_LAND,
"len": 30,
"res": 30,
"pass_invalid": True,
"pass_invalid": True,
"atl08_class": ["atl08_ground", "atl08_canopy", "atl08_top_of_canopy"],
"atl08_fields": ancillary_fields,
"atl08_fields": ancillary_fields,
"phoreal": {"binsize": 1.0, "geoloc": "center", "above_classifier": True, "use_abs_h": False, "send_waveform": False},
"output": {
"path": "testfile6.parquet",
Expand Down Expand Up @@ -205,7 +205,7 @@ def test_atl06_csv(self, init):
# sort values
gdf_from_parquet = gdf_from_parquet.sort_values('extent_id')
gdf_from_csv = gdf_from_csv.sort_values('extent_id')

# checks
assert init
assert len(gdf_from_parquet) == 957
Expand All @@ -218,4 +218,50 @@ def test_atl06_csv(self, init):
parquet_val = gdf_from_parquet[column].iloc[row]
csv_val = gdf_from_csv[column].iloc[row]
assert abs(parquet_val - csv_val) < 0.0001, f'mismatch in column <{column}>: {parquet_val} != {csv_val}'



# def test_atl06_feather(self, init):
# resource = "ATL03_20190314093716_11600203_005_01.h5"
# region = sliderule.toregion(os.path.join(TESTDIR, "data/dicksonfjord.geojson"))
# parms = { "poly": region['poly'],
# "cnf": "atl03_high",
# "ats": 20.0,
# "cnt": 10,
# "len": 40.0,
# "res": 20.0,
# "maxi": 1,
# "output": { "path": "", "format": "", "open_on_complete": True, "as_geo": False } }

# # create parquet file
# parms["output"]["path"] = "testfile8.parquet"
# parms["output"]["format"] = "parquet"
# gdf_from_parquet = icesat2.atl06p(parms, resources=[resource], keep_id=True)
# os.remove("testfile8.parquet")

# # create feather file
# parms["output"]["path"] = "testfile8.feather"
# parms["output"]["format"] = "feather"
# gdf_from_feather = icesat2.atl06p(parms, resources=[resource], keep_id=True)
# print(gdf_from_feather)
# os.remove("testfile8.feather")

# # sort values
# gdf_from_parquet = gdf_from_parquet.sort_values('extent_id')
# gdf_from_feather = gdf_from_feather.sort_values('extent_id')

# # checks
# assert init
# assert len(gdf_from_parquet) == 957
# assert len(gdf_from_parquet.keys()) == 18, f'keys are {list(gdf_from_parquet.keys())}'
# assert len(gdf_from_feather) == 957
# assert len(gdf_from_feather.keys()) == 18, f'keys are {list(gdf_from_feather.keys())}'
# columns_to_check = ["dh_fit_dx","n_fit_photons","longitude"]
# for column in columns_to_check:
# for row in range(len(gdf_from_parquet)):
# parquet_val = gdf_from_parquet[column].iloc[row]
# feather_val = gdf_from_feather[column].iloc[row]
# assert abs(parquet_val - feather_val) < 0.0001, f'mismatch in column <{column}>: {parquet_val} != {feather_val}'



### TODO: for CSV and Feather, check that the metadata files are created
39 changes: 29 additions & 10 deletions packages/arrow/ArrowBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ const char* ArrowBuilder::getSubField (const char* field_name)
}

/*----------------------------------------------------------------------------
* getFileName
* getDataFile
*----------------------------------------------------------------------------*/
const char* ArrowBuilder::getFileName (void)
const char* ArrowBuilder::getDataFile(void)
{
return fileName;
return dataFile;
}

/*----------------------------------------------------------------------------
 * getMetadataFile
 *
 * Accessor for the unique temporary filename used locally to build the
 * JSON metadata file (derived from the data filename via
 * ArrowCommon::createMetadataFileName; distinct from outputMetadataPath,
 * the metadata file's final destination).
 *----------------------------------------------------------------------------*/
const char* ArrowBuilder::getMetadataFile(void)
{
    return metadataFile; /* owned by this object; freed in the destructor */
}

/*----------------------------------------------------------------------------
Expand Down Expand Up @@ -264,11 +272,13 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms,
}
}

/* Get Path */
/* Get Paths */
outputPath = ArrowCommon::getOutputPath(parms);
outputMetadataPath = ArrowCommon::createMetadataFileName(outputPath);

/* Create Unique Temporary Filename */
fileName = ArrowCommon::getUniqueFileName(id);
/* Create Unique Temporary Filenames */
dataFile = ArrowCommon::getUniqueFileName(id);
metadataFile = ArrowCommon::createMetadataFileName(dataFile);

/*
* NO THROWING BEYOND THIS POINT
Expand Down Expand Up @@ -310,8 +320,10 @@ ArrowBuilder::~ArrowBuilder(void)
active = false;
delete builderPid;
parms->releaseLuaObject();
delete [] fileName;
delete [] dataFile;
delete [] metadataFile;
delete [] outputPath;
delete [] outputMetadataPath;
delete [] recType;
delete [] timeKey;
delete [] xKey;
Expand All @@ -330,7 +342,7 @@ void* ArrowBuilder::builderThread(void* parm)
int row_cnt = 0;

/* Start Trace */
uint32_t trace_id = start_trace(INFO, builder->traceId, "arrow_builder", "{\"filename\":\"%s\"}", builder->fileName);
uint32_t trace_id = start_trace(INFO, builder->traceId, "arrow_builder", "{\"filename\":\"%s\"}", builder->dataFile);
EventLib::stashId(trace_id);

/* Loop Forever */
Expand Down Expand Up @@ -486,8 +498,15 @@ void* ArrowBuilder::builderThread(void* parm)
builder->recordBatch.clear();

/* Send File to User */
ArrowCommon::send2User(builder->fileName, builder->outputPath, trace_id, builder->parms, builder->outQ);
ArrowCommon::removeFile(builder->fileName);
ArrowCommon::send2User(builder->dataFile, builder->outputPath, trace_id, builder->parms, builder->outQ);
ArrowCommon::removeFile(builder->dataFile);

/* Send Metadata File to User */
if(ArrowCommon::fileExists(builder->metadataFile))
{
ArrowCommon::send2User(builder->metadataFile, builder->outputMetadataPath, trace_id, builder->parms, builder->outQ);
ArrowCommon::removeFile(builder->metadataFile);
}

/* Signal Completion */
builder->signalComplete();
Expand Down
8 changes: 6 additions & 2 deletions packages/arrow/ArrowBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class ArrowBuilder: public LuaObject

const char* getSubField (const char* field_name);
const char* getFileName (void);
const char* getDataFile (void);
const char* getMetadataFile (void);
const char* getRecType (void);
const char* getTimeKey (void);
const char* getXKey (void);
Expand Down Expand Up @@ -161,9 +163,11 @@ class ArrowBuilder: public LuaObject
int rowSizeBytes;
int batchRowSizeBytes;
int maxRowsInGroup;
const char* fileName; // used locally to build file
const char* outputPath; // final destination of the file
geo_data_t geoData;
const char* dataFile; // used locally to build data file
const char* metadataFile; // used locally to build json metadata file
const char* outputPath; // final destination of the data file
const char* outputMetadataPath; // final destination of the metadata file

ArrowBuilderImpl* impl; // private arrow data

Expand Down
49 changes: 46 additions & 3 deletions packages/arrow/ArrowBuilderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
#include <parquet/file_writer.h>
#include <arrow/csv/writer.h>
#include <regex>
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>

#ifdef __aws__
#include "aws.h"
Expand Down Expand Up @@ -207,7 +210,7 @@ void ArrowBuilderImpl::createSchema (void)
{
/* Set Arrow Output Stream */
shared_ptr<arrow::io::FileOutputStream> file_output_stream;
PARQUET_ASSIGN_OR_THROW(file_output_stream, arrow::io::FileOutputStream::Open(arrowBuilder->getFileName()));
PARQUET_ASSIGN_OR_THROW(file_output_stream, arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile()));

/* Set Writer Properties */
parquet::WriterProperties::Builder writer_props_builder;
Expand Down Expand Up @@ -239,8 +242,10 @@ void ArrowBuilderImpl::createSchema (void)
}
else if(arrowBuilder->getParms()->format == ArrowParms::FEATHER)
{
createMetadataFile();

/* Create FEATHER Writer */
auto result = arrow::io::FileOutputStream::Open(arrowBuilder->getFileName());
auto result = arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile());
if(result.ok())
{
featherWriter = result.ValueOrDie();
Expand All @@ -253,8 +258,10 @@ void ArrowBuilderImpl::createSchema (void)
}
else if(arrowBuilder->getParms()->format == ArrowParms::CSV)
{
createMetadataFile();

/* Create CSV Writer */
auto result = arrow::io::FileOutputStream::Open(arrowBuilder->getFileName());
auto result = arrow::io::FileOutputStream::Open(arrowBuilder->getDataFile());
if(result.ok())
{
csvWriter = result.ValueOrDie();
Expand Down Expand Up @@ -581,6 +588,42 @@ void ArrowBuilderImpl::appendPandasMetaData (const std::shared_ptr<arrow::KeyVal
metadata->Append("pandas", pandasstr.c_str());
}

/*----------------------------------------------------------------------------
 * createMetadataFile
 *
 * Serializes the server metadata key/value pairs to a flat JSON object and
 * writes it to the builder's temporary metadata file.  Used for output
 * formats (Feather, CSV) that cannot embed metadata in the data file itself;
 * the file is later shipped to the user by the builder thread.
 *
 * Throws RunTimeException if the metadata file cannot be opened or fully
 * written.
 *----------------------------------------------------------------------------*/
void ArrowBuilderImpl::createMetadataFile(void)
{
    /* Collect server metadata key/value pairs */
    auto metadata = std::make_shared<arrow::KeyValueMetadata>();
    appendServerMetaData(metadata);

    /* Build flat JSON object: one member per metadata key/value pair */
    rapidjson::Document doc;
    doc.SetObject();
    rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();

    /* KeyValueMetadata::size() returns int64_t - use matching index type */
    for(int64_t i = 0; i < metadata->size(); ++i)
    {
        rapidjson::Value key(metadata->key(i).c_str(), allocator);
        rapidjson::Value value(metadata->value(i).c_str(), allocator);
        doc.AddMember(key, value, allocator);
    }

    /* Serialize the JSON document to a string */
    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
    doc.Accept(writer);

    /* Write the JSON string to the temporary metadata file */
    const char* file_path = arrowBuilder->getMetadataFile();
    FILE* jsonFile = fopen(file_path, "w");
    if(jsonFile == NULL)
    {
        throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to open metadata file: %s", file_path);
    }
    const size_t bytes_written = fwrite(buffer.GetString(), 1, buffer.GetSize(), jsonFile);
    fclose(jsonFile);
    if(bytes_written != buffer.GetSize()) /* detect short writes (e.g. disk full) instead of leaving a truncated file */
    {
        throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to write metadata file: %s", file_path);
    }
}

/*----------------------------------------------------------------------------
* processField
*----------------------------------------------------------------------------*/
Expand Down
1 change: 1 addition & 0 deletions packages/arrow/ArrowBuilderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ArrowBuilderImpl
void appendGeoMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
void appendServerMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
void appendPandasMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
void createMetadataFile (void);
void processField (RecordObject::field_t& field,
shared_ptr<arrow::Array>* column,
batch_list_t& record_batch,
Expand Down
10 changes: 5 additions & 5 deletions packages/arrow/ArrowCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ bool send2User (const char* fileName, const char* outputPath,
else
{
/* Stream File Back to Client */
status = send2Client(fileName, parms, outQ);
status = send2Client(fileName, outputPath, outQ);
}

stop_trace(INFO, send_trace_id);
Expand Down Expand Up @@ -235,11 +235,11 @@ bool send2S3 (const char* fileName, const char* s3dst, const char* outputPath,
/*----------------------------------------------------------------------------
* send2Client
*----------------------------------------------------------------------------*/
bool send2Client (const char* fileName, ArrowParms* parms, Publisher* outQ)
bool send2Client (const char* fileName, const char* outPath, Publisher* outQ)
{
bool status = true;

/* Reopen Parquet File to Stream Back as Response */
/* Reopen File to Stream Back as Response */
FILE* fp = fopen(fileName, "r");
if(fp)
{
Expand All @@ -256,7 +256,7 @@ bool send2Client (const char* fileName, ArrowParms* parms, Publisher* outQ)
/* Send Meta Record */
RecordObject meta_record(metaRecType);
arrow_file_meta_t* meta = (arrow_file_meta_t*)meta_record.getRecordData();
StringLib::copy(&meta->filename[0], parms->path, FILE_NAME_MAX_LEN);
StringLib::copy(&meta->filename[0], outPath, FILE_NAME_MAX_LEN);
meta->size = file_size;
if(!meta_record.post(outQ))
{
Expand All @@ -270,7 +270,7 @@ bool send2Client (const char* fileName, ArrowParms* parms, Publisher* outQ)
{
RecordObject data_record(dataRecType, 0, false);
arrow_file_data_t* data = (arrow_file_data_t*)data_record.getRecordData();
StringLib::copy(&data->filename[0], parms->path, FILE_NAME_MAX_LEN);
StringLib::copy(&data->filename[0], outPath, FILE_NAME_MAX_LEN);
size_t bytes_read = fread(data->data, 1, FILE_BUFFER_RSPS_SIZE, fp);
if(!data_record.post(outQ, offsetof(arrow_file_data_t, data) + bytes_read))
{
Expand Down
2 changes: 1 addition & 1 deletion packages/arrow/ArrowCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ namespace ArrowCommon
uint32_t traceId, ArrowParms* parms, Publisher* outQ);
bool send2S3 (const char* fileName, const char* s3dst, const char* outputPath,
ArrowParms* parms, Publisher* outQ);
bool send2Client(const char* fileName, ArrowParms* parms, Publisher* outQ);
bool send2Client(const char* fileName, const char* outPath, Publisher* outQ);

const char* getOutputPath(ArrowParms* parms);
const char* getUniqueFileName(const char* id = NULL);
Expand Down
Loading

0 comments on commit 0fdadcd

Please sign in to comment.