From e0036b297adfcde2d92ba858612f77970ed67eca Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Fri, 12 Apr 2024 01:03:13 +0000 Subject: [PATCH] Implemented ArrowCommon module --- packages/arrow/ArrowBuilder.cpp | 226 +--------- packages/arrow/ArrowBuilder.h | 31 -- packages/arrow/ArrowBuilderImpl.cpp | 15 +- packages/arrow/ArrowBuilderImpl.h | 3 +- packages/arrow/ArrowCommon.cpp | 393 ++++++++++++++++++ packages/arrow/{ArrowImpl.h => ArrowCommon.h} | 65 ++- packages/arrow/ArrowSampler.cpp | 73 +++- packages/arrow/ArrowSampler.h | 26 +- packages/arrow/ArrowSamplerImpl.cpp | 63 +-- packages/arrow/ArrowSamplerImpl.h | 10 +- packages/arrow/CMakeLists.txt | 3 +- packages/arrow/arrow.cpp | 2 + scripts/selftests/parquet_sampler.lua | 51 ++- 13 files changed, 598 insertions(+), 363 deletions(-) create mode 100644 packages/arrow/ArrowCommon.cpp rename packages/arrow/{ArrowImpl.h => ArrowCommon.h} (50%) diff --git a/packages/arrow/ArrowBuilder.cpp b/packages/arrow/ArrowBuilder.cpp index 30cf925ce..c2ddca0ea 100644 --- a/packages/arrow/ArrowBuilder.cpp +++ b/packages/arrow/ArrowBuilder.cpp @@ -52,25 +52,6 @@ const struct luaL_Reg ArrowBuilder::LUA_META_TABLE[] = { {NULL, NULL} }; -const char* ArrowBuilder::metaRecType = "arrowrec.meta"; -const RecordObject::fieldDef_t ArrowBuilder::metaRecDef[] = { - {"filename", RecordObject::STRING, offsetof(arrow_file_meta_t, filename), FILE_NAME_MAX_LEN, NULL, NATIVE_FLAGS}, - {"size", RecordObject::INT64, offsetof(arrow_file_meta_t, size), 1, NULL, NATIVE_FLAGS} -}; - -const char* ArrowBuilder::dataRecType = "arrowrec.data"; -const RecordObject::fieldDef_t ArrowBuilder::dataRecDef[] = { - {"filename", RecordObject::STRING, offsetof(arrow_file_data_t, filename), FILE_NAME_MAX_LEN, NULL, NATIVE_FLAGS}, - {"data", RecordObject::UINT8, offsetof(arrow_file_data_t, data), 0, NULL, NATIVE_FLAGS} // variable length -}; - -const char* ArrowBuilder::remoteRecType = "arrowrec.remote"; -const RecordObject::fieldDef_t ArrowBuilder::remoteRecDef[] = { - {"url", RecordObject::STRING, offsetof(arrow_file_remote_t, url), URL_MAX_LEN, NULL, NATIVE_FLAGS}, - {"size", RecordObject::INT64, offsetof(arrow_file_remote_t, size), 1, NULL, NATIVE_FLAGS} -}; - -const char* ArrowBuilder::TMP_FILE_PREFIX = "/tmp/"; /****************************************************************************** * PUBLIC METHODS @@ -108,9 +89,6 @@ int ArrowBuilder::luaCreate (lua_State* L) *----------------------------------------------------------------------------*/ void ArrowBuilder::init (void) { - RECDEF(metaRecType, metaRecDef, sizeof(arrow_file_meta_t), NULL); - RECDEF(dataRecType, dataRecDef, sizeof(arrow_file_data_t), NULL); - RECDEF(remoteRecType, remoteRecDef, sizeof(arrow_file_remote_t), NULL); } /*---------------------------------------------------------------------------- @@ -287,37 +265,7 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms, } /* Get Path */ - if(parms->asset_name) - { - /* Check Private Cluster */ - if(OsApi::getIsPublic()) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to stage output on public cluster"); - } - - /* Generate Output Path */ - Asset* asset = dynamic_cast(LuaObject::getLuaObjectByName(parms->asset_name, Asset::OBJECT_TYPE)); - const char* path_prefix = StringLib::match(asset->getDriver(), "s3") ? "s3://" : ""; - const char* path_suffix = "bin"; - if(parms->format == ArrowParms::PARQUET) path_suffix = parms->as_geo ? 
".geoparquet" : ".parquet"; - if(parms->format == ArrowParms::CSV) path_suffix = "csv"; - FString path_name("%s.%016lX", OsApi::getCluster(), OsApi::time(OsApi::CPU_CLK)); - bool use_provided_path = ((parms->path != NULL) && (parms->path[0] != '\0')); - FString path_str("%s%s/%s%s", path_prefix, asset->getPath(), use_provided_path ? parms->path : path_name.c_str(), path_suffix); - asset->releaseLuaObject(); - - /* Set Output Path */ - outputPath = path_str.c_str(true); - mlog(INFO, "Generating unique path: %s", outputPath); - } - else if((parms->path == NULL) || (parms->path[0] == '\0')) - { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to determine output path"); - } - else - { - outputPath = StringLib::duplicate(parms->path); - } + outputPath = ArrowCommon::getOutputPath(parms); /* * NO THROWING BEYOND THIS POINT @@ -344,8 +292,7 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms, inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth); /* Create Unique Temporary Filename */ - FString tmp_file("%s%s.bin", TMP_FILE_PREFIX, id); - fileName = tmp_file.c_str(true); + fileName = ArrowCommon::getUniqueFileName(id); /* Allocate Implementation */ impl = new ArrowBuilderImpl(this); @@ -383,7 +330,7 @@ void* ArrowBuilder::builderThread(void* parm) int row_cnt = 0; /* Start Trace */ - uint32_t trace_id = start_trace(INFO, builder->traceId, "parquet_builder", "{\"filename\":\"%s\"}", builder->fileName); + uint32_t trace_id = start_trace(INFO, builder->traceId, "arrow_builder", "{\"filename\":\"%s\"}", builder->fileName); EventLib::stashId(trace_id); /* Loop Forever */ @@ -539,33 +486,8 @@ void* ArrowBuilder::builderThread(void* parm) builder->recordBatch.clear(); /* Send File to User */ - const char* _path = builder->outputPath; - uint32_t send_trace_id = start_trace(INFO, trace_id, "send_file", "{\"path\": \"%s\"}", _path); - int _path_len = StringLib::size(_path); - if( (_path_len > 5) && - (_path[0] == 's') && - (_path[1] == '3') && - (_path[2] == ':') && - (_path[3] == '/') && - (_path[4] == '/')) - { - /* Upload File to S3 */ - builder->send2S3(&_path[5]); - } - else - { - /* Stream File Back to Client */ - builder->send2Client(); - } - - /* Remove File */ - int rc = remove(builder->fileName); - if(rc != 0) - { - mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, builder->fileName, strerror(errno)); - } - - stop_trace(INFO, send_trace_id); + ArrowCommon::send2User(builder->fileName, builder->outputPath, trace_id, builder->parms, builder->outQ); + ArrowCommon::removeFile(builder->fileName); /* Signal Completion */ builder->signalComplete(); @@ -577,141 +499,3 @@ void* ArrowBuilder::builderThread(void* parm) return NULL; } -/*---------------------------------------------------------------------------- - * send2S3 - *----------------------------------------------------------------------------*/ -bool ArrowBuilder::send2S3 (const char* s3dst) -{ - #ifdef __aws__ - - bool status = true; - - /* Check Path */ - if(!s3dst) return false; - - /* Get Bucket and Key */ - char* bucket = StringLib::duplicate(s3dst); - char* key = bucket; - while(*key != '\0' && *key != '/') key++; - if(*key == '/') - { - *key = '\0'; - } - else - { - status = false; - mlog(CRITICAL, "invalid S3 url: %s", s3dst); - } - key++; - - /* Put File */ - if(status) - { - /* Send Initial Status */ - alert(INFO, RTE_INFO, outQ, NULL, "Initiated upload of results to S3, bucket = %s, key = %s", bucket, key); - - try - { - /* Upload to S3 */ - int64_t bytes_uploaded = S3CurlIODriver::put(fileName, 
bucket, key, parms->region, &parms->credentials); - - /* Send Successful Status */ - alert(INFO, RTE_INFO, outQ, NULL, "Upload to S3 completed, bucket = %s, key = %s, size = %ld", bucket, key, bytes_uploaded); - - /* Send Remote Record */ - RecordObject remote_record(remoteRecType); - arrow_file_remote_t* remote = (arrow_file_remote_t*)remote_record.getRecordData(); - StringLib::copy(&remote->url[0], outputPath, URL_MAX_LEN); - remote->size = bytes_uploaded; - if(!remote_record.post(outQ)) - { - mlog(CRITICAL, "Failed to send remote record back to user for %s", outputPath); - } - } - catch(const RunTimeException& e) - { - status = false; - - /* Send Error Status */ - alert(e.level(), RTE_ERROR, outQ, NULL, "Upload to S3 failed, bucket = %s, key = %s, error = %s", bucket, key, e.what()); - } - } - - /* Clean Up */ - delete [] bucket; - - /* Return Status */ - return status; - - #else - alert(CRITICAL, RTE_ERROR, outQ, NULL, "Output path specifies S3, but server compiled without AWS support"); - return false; - #endif -} - -/*---------------------------------------------------------------------------- - * send2Client - *----------------------------------------------------------------------------*/ -bool ArrowBuilder::send2Client (void) -{ - bool status = true; - - /* Reopen Parquet File to Stream Back as Response */ - FILE* fp = fopen(fileName, "r"); - if(fp) - { - /* Get Size of File */ - fseek(fp, 0L, SEEK_END); - long file_size = ftell(fp); - fseek(fp, 0L, SEEK_SET); - - /* Log Status */ - mlog(INFO, "Writing file %s of size %ld", fileName, file_size); - - do - { - /* Send Meta Record */ - RecordObject meta_record(metaRecType); - arrow_file_meta_t* meta = (arrow_file_meta_t*)meta_record.getRecordData(); - StringLib::copy(&meta->filename[0], parms->path, FILE_NAME_MAX_LEN); - meta->size = file_size; - if(!meta_record.post(outQ)) - { - status = false; - break; // early exit on error - } - - /* Send Data Records */ - long offset = 0; - while(offset < file_size) - { - RecordObject data_record(dataRecType, 0, false); - arrow_file_data_t* data = (arrow_file_data_t*)data_record.getRecordData(); - StringLib::copy(&data->filename[0], parms->path, FILE_NAME_MAX_LEN); - size_t bytes_read = fread(data->data, 1, FILE_BUFFER_RSPS_SIZE, fp); - if(!data_record.post(outQ, offsetof(arrow_file_data_t, data) + bytes_read)) - { - status = false; - break; // early exit on error - } - offset += bytes_read; - } - } while(false); - - /* Close File */ - int rc = fclose(fp); - if(rc != 0) - { - status = false; - mlog(CRITICAL, "Failed (%d) to close file %s: %s", rc, fileName, strerror(errno)); - } - } - else // unable to open file - { - status = false; - mlog(CRITICAL, "Failed (%d) to read file %s: %s", errno, fileName, strerror(errno)); - } - - /* Return Status */ - return status; -} diff --git a/packages/arrow/ArrowBuilder.h b/packages/arrow/ArrowBuilder.h index 647ebd9b9..84e408f29 100644 --- a/packages/arrow/ArrowBuilder.h +++ b/packages/arrow/ArrowBuilder.h @@ -69,9 +69,6 @@ class ArrowBuilder: public LuaObject * Constants *--------------------------------------------------------------------*/ - static const int FILE_NAME_MAX_LEN = 128; - static const int URL_MAX_LEN = 512; - static const int FILE_BUFFER_RSPS_SIZE = 0x2000000; // 32MB static const int ROW_GROUP_SIZE = 0x4000000; // 64MB static const int QUEUE_BUFFER_FACTOR = 3; @@ -79,17 +76,6 @@ class ArrowBuilder: public LuaObject static const char* LUA_META_NAME; static const struct luaL_Reg LUA_META_TABLE[]; - static const char* metaRecType; - static const 
RecordObject::fieldDef_t metaRecDef[]; - - static const char* dataRecType; - static const RecordObject::fieldDef_t dataRecDef[]; - - static const char* remoteRecType; - static const RecordObject::fieldDef_t remoteRecDef[]; - - static const char* TMP_FILE_PREFIX; - /*-------------------------------------------------------------------- * Types *--------------------------------------------------------------------*/ @@ -127,21 +113,6 @@ class ArrowBuilder: public LuaObject RecordObject::field_t y_field; } geo_data_t; - typedef struct { - char filename[FILE_NAME_MAX_LEN]; - long size; - } arrow_file_meta_t; - - typedef struct { - char filename[FILE_NAME_MAX_LEN]; - uint8_t data[FILE_BUFFER_RSPS_SIZE]; - } arrow_file_data_t; - - typedef struct { - char url[URL_MAX_LEN]; - long size; - } arrow_file_remote_t; - /*-------------------------------------------------------------------- * Methods *--------------------------------------------------------------------*/ @@ -205,8 +176,6 @@ class ArrowBuilder: public LuaObject const char* rec_type, const char* id); ~ArrowBuilder (void); static void* builderThread (void* parm); - bool send2S3 (const char* s3dst); - bool send2Client (void); }; #endif /* __arrow_builder__ */ diff --git a/packages/arrow/ArrowBuilderImpl.cpp b/packages/arrow/ArrowBuilderImpl.cpp index c3a86c5f4..cb474d4f2 100644 --- a/packages/arrow/ArrowBuilderImpl.cpp +++ b/packages/arrow/ArrowBuilderImpl.cpp @@ -43,6 +43,15 @@ #include "core.h" #include "ArrowBuilderImpl.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -1219,7 +1228,7 @@ void ArrowBuilderImpl::processGeometry (RecordObject::field_t& x_field, RecordOb { arrow::BinaryBuilder builder; (void)builder.Reserve(num_rows); - (void)builder.ReserveData(num_rows * sizeof(wkbpoint_t)); + (void)builder.ReserveData(num_rows * sizeof(ArrowCommon::wkbpoint_t)); for(int i = 0; i < record_batch.length(); i++) { ArrowBuilder::batch_t* batch = record_batch[i]; @@ -1227,7 +1236,7 @@ void ArrowBuilderImpl::processGeometry (RecordObject::field_t& x_field, RecordOb int32_t starting_y_offset = y_field.offset; for(int row = 0; row < batch->rows; row++) { - wkbpoint_t point = { + ArrowCommon::wkbpoint_t point = { #ifdef __be__ .byteOrder = 0, #else @@ -1237,7 +1246,7 @@ void ArrowBuilderImpl::processGeometry (RecordObject::field_t& x_field, RecordOb .x = batch->pri_record->getValueReal(x_field), .y = batch->pri_record->getValueReal(y_field) }; - (void)builder.UnsafeAppend((uint8_t*)&point, sizeof(wkbpoint_t)); + (void)builder.UnsafeAppend((uint8_t*)&point, sizeof(ArrowCommon::wkbpoint_t)); if(x_field.flags & RecordObject::BATCH) x_field.offset += batch_row_size_bits; if(y_field.flags & RecordObject::BATCH) y_field.offset += batch_row_size_bits; } diff --git a/packages/arrow/ArrowBuilderImpl.h b/packages/arrow/ArrowBuilderImpl.h index 6623cdab2..6aeca700c 100644 --- a/packages/arrow/ArrowBuilderImpl.h +++ b/packages/arrow/ArrowBuilderImpl.h @@ -36,7 +36,7 @@ * INCLUDES ******************************************************************************/ -#include "ArrowImpl.h" +#include "ArrowCommon.h" #include "LuaObject.h" #include "Ordering.h" #include "RecordObject.h" @@ -45,6 +45,7 @@ #include "OsApi.h" #include "MsgQ.h" +#include /****************************************************************************** * ARROW BUILDER CLASS diff --git a/packages/arrow/ArrowCommon.cpp b/packages/arrow/ArrowCommon.cpp new file mode 100644 index 000000000..387e6debc --- /dev/null +++ 
b/packages/arrow/ArrowCommon.cpp @@ -0,0 +1,393 @@ +/* + * Copyright (c) 2021, University of Washington + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the University of Washington nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF WASHINGTON AND CONTRIBUTORS + * “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF WASHINGTON OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/****************************************************************************** + * INCLUDES + ******************************************************************************/ + +#include "core.h" +#include "ArrowCommon.h" + +#include +#include + +#ifdef __aws__ +#include "aws.h" +#endif + +/****************************************************************************** + * TYPES + ******************************************************************************/ + +static const int URL_MAX_LEN = 512; +static const int FILE_NAME_MAX_LEN = 128; +static const int FILE_BUFFER_RSPS_SIZE = 0x2000000; // 32MB + +typedef struct { + char filename[FILE_NAME_MAX_LEN]; + long size; +} arrow_file_meta_t; + +typedef struct { + char filename[FILE_NAME_MAX_LEN]; + uint8_t data[FILE_BUFFER_RSPS_SIZE]; +} arrow_file_data_t; + +typedef struct { + char url[URL_MAX_LEN]; + long size; +} arrow_file_remote_t; + + +/****************************************************************************** + * CONSTANTS + ******************************************************************************/ + +static const char* metaRecType = "arrowrec.meta"; +static const char* dataRecType = "arrowrec.data"; +static const char* remoteRecType = "arrowrec.remote"; + +static const RecordObject::fieldDef_t metaRecDef[] = { + {"filename", RecordObject::STRING, offsetof(arrow_file_meta_t, filename), FILE_NAME_MAX_LEN, NULL, NATIVE_FLAGS}, + {"size", RecordObject::INT64, offsetof(arrow_file_meta_t, size), 1, NULL, NATIVE_FLAGS} +}; + +static const RecordObject::fieldDef_t dataRecDef[] = { + {"filename", RecordObject::STRING, offsetof(arrow_file_data_t, filename), FILE_NAME_MAX_LEN, NULL, NATIVE_FLAGS}, + {"data", RecordObject::UINT8, offsetof(arrow_file_data_t, data), 0, NULL, NATIVE_FLAGS} // variable length +}; + +static const RecordObject::fieldDef_t remoteRecDef[] = { + {"url", 
RecordObject::STRING, offsetof(arrow_file_remote_t, url), URL_MAX_LEN, NULL, NATIVE_FLAGS},
+    {"size", RecordObject::INT64, offsetof(arrow_file_remote_t, size), 1, NULL, NATIVE_FLAGS}
+};
+
+static const char* TMP_FILE_PREFIX = "/tmp/";
+
+
+
+/******************************************************************************
+ * NAMESPACES
+ ******************************************************************************/
+namespace ArrowCommon
+{
+
+/*----------------------------------------------------------------------------
+ * init
+ *----------------------------------------------------------------------------*/
+void init(void)
+{
+    static bool initialized = false;
+    if(initialized) return;
+
+    initialized = true;
+    RECDEF(metaRecType, metaRecDef, sizeof(arrow_file_meta_t), NULL);
+    RECDEF(dataRecType, dataRecDef, sizeof(arrow_file_data_t), NULL);
+    RECDEF(remoteRecType, remoteRecDef, sizeof(arrow_file_remote_t), NULL);
+}
+
+/*----------------------------------------------------------------------------
+ * send2User
+ *----------------------------------------------------------------------------*/
+bool send2User (const char* fileName, const char* outputPath,
+                uint32_t traceId, ArrowParms* parms, Publisher* outQ)
+{
+    bool status = false;
+
+    /* Send File to User */
+    const char* _path = outputPath;
+    uint32_t send_trace_id = start_trace(INFO, traceId, "send_file", "{\"path\": \"%s\"}", _path);
+    int _path_len = StringLib::size(_path);
+    if( (_path_len > 5) &&
+        (_path[0] == 's') &&
+        (_path[1] == '3') &&
+        (_path[2] == ':') &&
+        (_path[3] == '/') &&
+        (_path[4] == '/'))
+    {
+        /* Upload File to S3 */
+        status = send2S3(fileName, &_path[5], outputPath, parms, outQ);
+    }
+    else if( (_path_len > 7) &&
+             (_path[0] == 'f') &&
+             (_path[1] == 'i') &&
+             (_path[2] == 'l') &&
+             (_path[3] == 'e') &&
+             (_path[4] == ':') &&
+             (_path[5] == '/') &&
+             (_path[6] == '/'))
+    {
+        /* Rename the file - very fast if both files are on the same partition */
+        renameFile(fileName, &_path[7]);
+        status = true;
+    }
+    else
+    {
+        /* Stream File Back to Client */
+        status = send2Client(fileName, parms, outQ);
+    }
+
+    stop_trace(INFO, send_trace_id);
+    return status;
+}
+
+/*----------------------------------------------------------------------------
+ * send2S3
+ *----------------------------------------------------------------------------*/
+bool send2S3 (const char* fileName, const char* s3dst, const char* outputPath,
+              ArrowParms* parms, Publisher* outQ)
+{
+    #ifdef __aws__
+
+    bool status = true;
+
+    /* Check Path */
+    if(!s3dst) return false;
+
+    /* Get Bucket and Key */
+    char* bucket = StringLib::duplicate(s3dst);
+    char* key = bucket;
+    while(*key != '\0' && *key != '/') key++;
+    if(*key == '/')
+    {
+        *key = '\0';
+    }
+    else
+    {
+        status = false;
+        mlog(CRITICAL, "invalid S3 url: %s", s3dst);
+    }
+    key++;
+
+    /* Put File */
+    if(status)
+    {
+        /* Send Initial Status */
+        alert(INFO, RTE_INFO, outQ, NULL, "Initiated upload of results to S3, bucket = %s, key = %s", bucket, key);
+
+        try
+        {
+            /* Upload to S3 */
+            int64_t bytes_uploaded = S3CurlIODriver::put(fileName, bucket, key, parms->region, &parms->credentials);
+
+            /* Send Successful Status */
+            alert(INFO, RTE_INFO, outQ, NULL, "Upload to S3 completed, bucket = %s, key = %s, size = %ld", bucket, key, bytes_uploaded);
+
+            /* Send Remote Record */
+            RecordObject remote_record(remoteRecType);
+            arrow_file_remote_t* remote = (arrow_file_remote_t*)remote_record.getRecordData();
+            StringLib::copy(&remote->url[0], outputPath, URL_MAX_LEN);
+            remote->size =
bytes_uploaded; + if(!remote_record.post(outQ)) + { + mlog(CRITICAL, "Failed to send remote record back to user for %s", outputPath); + } + } + catch(const RunTimeException& e) + { + status = false; + + /* Send Error Status */ + alert(e.level(), RTE_ERROR, outQ, NULL, "Upload to S3 failed, bucket = %s, key = %s, error = %s", bucket, key, e.what()); + } + } + + /* Clean Up */ + delete [] bucket; + + /* Return Status */ + return status; + + #else + alert(CRITICAL, RTE_ERROR, outQ, NULL, "Output path specifies S3, but server compiled without AWS support"); + return false; + #endif +} + +/*---------------------------------------------------------------------------- + * send2Client + *----------------------------------------------------------------------------*/ +bool send2Client (const char* fileName, ArrowParms* parms, Publisher* outQ) +{ + bool status = true; + + /* Reopen Parquet File to Stream Back as Response */ + FILE* fp = fopen(fileName, "r"); + if(fp) + { + /* Get Size of File */ + fseek(fp, 0L, SEEK_END); + long file_size = ftell(fp); + fseek(fp, 0L, SEEK_SET); + + /* Log Status */ + mlog(INFO, "Writing file %s of size %ld", fileName, file_size); + + do + { + /* Send Meta Record */ + RecordObject meta_record(metaRecType); + arrow_file_meta_t* meta = (arrow_file_meta_t*)meta_record.getRecordData(); + StringLib::copy(&meta->filename[0], parms->path, FILE_NAME_MAX_LEN); + meta->size = file_size; + if(!meta_record.post(outQ)) + { + status = false; + break; // early exit on error + } + + /* Send Data Records */ + long offset = 0; + while(offset < file_size) + { + RecordObject data_record(dataRecType, 0, false); + arrow_file_data_t* data = (arrow_file_data_t*)data_record.getRecordData(); + StringLib::copy(&data->filename[0], parms->path, FILE_NAME_MAX_LEN); + size_t bytes_read = fread(data->data, 1, FILE_BUFFER_RSPS_SIZE, fp); + if(!data_record.post(outQ, offsetof(arrow_file_data_t, data) + bytes_read)) + { + status = false; + break; // early exit on error + } + offset += bytes_read; + } + } while(false); + + /* Close File */ + int rc = fclose(fp); + if(rc != 0) + { + status = false; + mlog(CRITICAL, "Failed (%d) to close file %s: %s", rc, fileName, strerror(errno)); + } + } + else // unable to open file + { + status = false; + mlog(CRITICAL, "Failed (%d) to read file %s: %s", errno, fileName, strerror(errno)); + } + + /* Return Status */ + return status; +} + +/*---------------------------------------------------------------------------- + * getOutputPath + *----------------------------------------------------------------------------*/ +const char* getOutputPath(ArrowParms* parms) +{ + const char* outputPath = NULL; + + if(parms->asset_name) + { + /* Check Private Cluster */ + if(OsApi::getIsPublic()) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to stage output on public cluster"); + } + + /* Generate Output Path */ + Asset* asset = dynamic_cast(LuaObject::getLuaObjectByName(parms->asset_name, Asset::OBJECT_TYPE)); + const char* path_prefix = StringLib::match(asset->getDriver(), "s3") ? "s3://" : ""; + const char* path_suffix = "bin"; + if(parms->format == ArrowParms::PARQUET) path_suffix = parms->as_geo ? ".geoparquet" : ".parquet"; + if(parms->format == ArrowParms::CSV) path_suffix = "csv"; + FString path_name("%s.%016lX", OsApi::getCluster(), OsApi::time(OsApi::CPU_CLK)); + bool use_provided_path = ((parms->path != NULL) && (parms->path[0] != '\0')); + FString path_str("%s%s/%s%s", path_prefix, asset->getPath(), use_provided_path ? 
parms->path : path_name.c_str(), path_suffix); + asset->releaseLuaObject(); + + /* Set Output Path */ + outputPath = path_str.c_str(true); + mlog(INFO, "Generating unique path: %s", outputPath); + } + else if((parms->path == NULL) || (parms->path[0] == '\0')) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to determine output path"); + } + else + { + outputPath = StringLib::duplicate(parms->path); + } + + return outputPath; +} + +/*---------------------------------------------------------------------------- + * getUniqueFileName + *----------------------------------------------------------------------------*/ +const char* getUniqueFileName(const char* id) +{ + char uuid_str[UUID_STR_LEN]; + uuid_t uuid; + uuid_generate(uuid); + uuid_unparse_lower(uuid, uuid_str); + + std::string tmp_file(TMP_FILE_PREFIX); + + if(id) tmp_file.append(id).append("."); + else tmp_file.append("arrow."); + + tmp_file.append(uuid_str).append(".bin"); + return StringLib::duplicate(tmp_file.c_str()); +} + +/*---------------------------------------------------------------------------- + * removeFile + *----------------------------------------------------------------------------*/ +void removeFile(const char* fileName) +{ + if(std::filesystem::exists(fileName)) + { + int rc = std::remove(fileName); + if(rc != 0) + { + mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, fileName, strerror(errno)); + } + } +} + +/*---------------------------------------------------------------------------- + * renameFile + *----------------------------------------------------------------------------*/ +void renameFile (const char* oldName, const char* newName) +{ + if(std::filesystem::exists(oldName)) + { + int rc = std::rename(oldName, newName); + if(rc != 0) + { + mlog(CRITICAL, "Failed (%d) to rename file %s to %s: %s", rc, oldName, newName, strerror(errno)); + } + } +} + +} /* namespace ArrowCommon */ \ No newline at end of file diff --git a/packages/arrow/ArrowImpl.h b/packages/arrow/ArrowCommon.h similarity index 50% rename from packages/arrow/ArrowImpl.h rename to packages/arrow/ArrowCommon.h index c2b79f205..ad41cffe2 100644 --- a/packages/arrow/ArrowImpl.h +++ b/packages/arrow/ArrowCommon.h @@ -29,33 +29,56 @@ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 */
-#ifndef __arrow_impl__
-#define __arrow_impl__
+#ifndef __arrow_common__
+#define __arrow_common__
 /******************************************************************************
  * INCLUDES
  ******************************************************************************/
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
 #include "OsApi.h"
+#include "MsgQ.h"
+#include "ArrowParms.h"
+
+
+/******************************************************************************
+ * NAMESPACES
+ ******************************************************************************/
+namespace ArrowCommon
+{
+
+    /******************************************************************************
+     * CONSTANTS
+     ******************************************************************************/
+
+    /******************************************************************************
+     * TYPES
+     ******************************************************************************/
+
+    typedef struct WKBPoint {
+        uint8_t byteOrder;
+        uint32_t wkbType;
+        double x;
+        double y;
+    } ALIGN_PACKED wkbpoint_t;
+
+
+    /******************************************************************************
+     * METHODS
+     ******************************************************************************/
+
+    void init (void);
+    bool send2User (const char* fileName, const char* outputPath,
+                    uint32_t traceId, ArrowParms* parms, Publisher* outQ);
+    bool send2S3 (const char* fileName, const char* s3dst, const char* outputPath,
+                  ArrowParms* parms, Publisher* outQ);
+    bool send2Client(const char* fileName, ArrowParms* parms, Publisher* outQ);
-typedef struct WKBPoint {
-    uint8_t byteOrder;
-    uint32_t wkbType;
-    double x;
-    double y;
-} ALIGN_PACKED wkbpoint_t;
+    const char* getOutputPath(ArrowParms* parms);
+    const char* getUniqueFileName(const char* id = NULL);
+    void removeFile (const char* fileName);
+    void renameFile (const char* oldName, const char* newName);
+}
-#endif /* __arrow_impl__ */
+#endif /* __arrow_common__ */
diff --git a/packages/arrow/ArrowSampler.cpp b/packages/arrow/ArrowSampler.cpp
index 0a5bd5ad4..e2c31ba85 100644
--- a/packages/arrow/ArrowSampler.cpp
+++ b/packages/arrow/ArrowSampler.cpp
@@ -64,16 +64,17 @@ int ArrowSampler::luaCreate(lua_State* L)
         /* Get Parameters */
         _parms = dynamic_cast<ArrowParms*>(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE));
         const char* input_file = getLuaString(L, 2);
+        const char* outq_name = getLuaString(L, 3);
         std::vector rasters;
-        /* Check if the third parameter is a table */
-        luaL_checktype(L, 3, LUA_TTABLE);
+        /* Check if the fourth parameter is a table */
+        luaL_checktype(L, 4, LUA_TTABLE);
         /* first key for iteration */
         lua_pushnil(L);
-        while(lua_next(L, 3) != 0)
+        while(lua_next(L, 4) != 0)
         {
             const char* rkey = getLuaString(L, -2);
             RasterObject* robj = dynamic_cast<RasterObject*>(getLuaObject(L, -1, RasterObject::OBJECT_TYPE));
@@ -84,7 +85,7 @@ int ArrowSampler::luaCreate(lua_State* L)
         }
         /* Create Dispatch */
-        return createLuaObject(L, new ArrowSampler(L, _parms, input_file, rasters));
+        return createLuaObject(L, new ArrowSampler(L, _parms, input_file, outq_name, rasters));
     }
     catch(const RunTimeException& e)
     {
@@ -174,6 +175,10 @@ void ArrowSampler::sample(void)
     if(alreadySampled) return;
     alreadySampled = true;
+    /* Start Trace */
+    uint32_t trace_id = start_trace(INFO, traceId, "arrow_sampler", "{\"filename\":\"%s\"}", parquetFile);
+    EventLib::stashId(trace_id);
+
     /* Start sampling threads */
     for(sampler_t* sampler : samplers)
     {
@@ -190,13 +195,28 @@ void
ArrowSampler::sample(void) try { - impl->createOutpuFile(); + impl->createOutpuFiles(); + + /* Send Parquet File to User */ + ArrowCommon::send2User(parquetFile, outputPath, trace_id, parms, outQ); + ArrowCommon::removeFile(parquetFile); + + if(metadataFile) + { + /* Send Metadata File to User */ + ArrowCommon::send2User(metadataFile, outputMetadataPath, trace_id, parms, outQ); + ArrowCommon::removeFile(metadataFile); + } + } catch(const RunTimeException& e) { mlog(e.level(), "Error creating output file: %s", e.what()); + stop_trace(INFO, trace_id); throw; } + + stop_trace(INFO, trace_id); } @@ -208,9 +228,10 @@ void ArrowSampler::sample(void) * Constructor *----------------------------------------------------------------------------*/ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_file, - const std::vector& rasters): + const char* outq_name, const std::vector& rasters): LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE), parms(_parms), + metadataFile(NULL), alreadySampled(false) { /* Add Lua sample function */ @@ -219,11 +240,14 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f if (parms == NULL) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); + if((parms->path == NULL) || (parms->path[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path"); + if ((input_file == NULL) || (input_file[0] == '\0')) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path"); - if((parms->path == NULL) || (parms->path[0] == '\0')) - throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path"); + if ((outq_name == NULL) || (outq_name[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input queue name"); try { @@ -243,6 +267,19 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f /* Allocate Implementation */ impl = new ArrowSamplerImpl(this); + /* Get Paths */ + outputPath = ArrowCommon::getOutputPath(parms); + outputMetadataPath = createMetadataFileName(outputPath); + + /* Create Unique Temporary Filenames */ + parquetFile = ArrowCommon::getUniqueFileName(); + metadataFile = createMetadataFileName(parquetFile); + + /* Initialize Queues */ + const int qdepth = 0x4000000; // 64MB + outQ = new Publisher(outq_name, Publisher::defaultFree, qdepth); + + /* Process Input File */ impl->processInputFile(input_file, points); } catch(const RunTimeException& e) @@ -277,9 +314,29 @@ void ArrowSampler::Delete(void) for(point_info_t* pinfo : points) delete pinfo; + delete [] parquetFile; + delete [] metadataFile; + delete [] outputPath; + delete [] outputMetadataPath; + delete outQ; delete impl; } +/*---------------------------------------------------------------------------- +* createMetadataFileName +*----------------------------------------------------------------------------*/ +char* ArrowSampler::createMetadataFileName(const char* file_path) +{ + std::string path(file_path); + size_t dotIndex = path.find_last_of("."); + if(dotIndex != std::string::npos) + { + path = path.substr(0, dotIndex); + } + path.append("_metadata.json"); + return StringLib::duplicate(path.c_str()); +} + /*---------------------------------------------------------------------------- * samplerThread *----------------------------------------------------------------------------*/ diff --git a/packages/arrow/ArrowSampler.h b/packages/arrow/ArrowSampler.h index 89d80061d..b575b9b75 100644 --- a/packages/arrow/ArrowSampler.h +++ 
b/packages/arrow/ArrowSampler.h @@ -108,13 +108,15 @@ class ArrowSampler: public LuaObject * Methods *--------------------------------------------------------------------*/ - static int luaCreate (lua_State* L); - static int luaSample (lua_State* L); - static void init (void); - static void deinit (void); - void sample (void); - const ArrowParms* getParms (void) {return parms;} - const std::vector& getSamplers (void) {return samplers;} + static int luaCreate (lua_State* L); + static int luaSample (lua_State* L); + static void init (void); + static void deinit (void); + void sample (void); + const ArrowParms* getParms (void) {return parms;} + const char* getParquetFile (void) {return parquetFile;} + const char* getMetadataFile(void) {return metadataFile;} + const std::vector& getSamplers (void) {return samplers;} private: @@ -127,10 +129,15 @@ class ArrowSampler: public LuaObject *--------------------------------------------------------------------*/ ArrowParms* parms; + Publisher* outQ; std::vector samplerPids; std::vector points; std::vector samplers; - ArrowSamplerImpl* impl; // private arrow data + ArrowSamplerImpl* impl; + const char* parquetFile; // used locally to build parquet file + const char* metadataFile; // used locally to build json metadata file + const char* outputPath; // final destination of the data file + const char* outputMetadataPath; // final destination of the metadata file bool alreadySampled; /*-------------------------------------------------------------------- @@ -138,9 +145,10 @@ class ArrowSampler: public LuaObject *--------------------------------------------------------------------*/ ArrowSampler (lua_State* L, ArrowParms* _parms, const char* input_file, - const std::vector& rasters); + const char* outq_name, const std::vector& rasters); ~ArrowSampler (void); void Delete (void); + char* createMetadataFileName(const char* file_path); static void* samplerThread (void* parm); }; diff --git a/packages/arrow/ArrowSamplerImpl.cpp b/packages/arrow/ArrowSamplerImpl.cpp index 201d9deb0..6aad7553a 100644 --- a/packages/arrow/ArrowSamplerImpl.cpp +++ b/packages/arrow/ArrowSamplerImpl.cpp @@ -42,12 +42,17 @@ #include "ArrowSamplerImpl.h" +#include +#include +#include +#include +#include +#include +#include #include #include #include #include -#include -#include /****************************************************************************** @@ -120,20 +125,12 @@ bool ArrowSamplerImpl::processSamples(ArrowSampler::sampler_t* sampler) } /*---------------------------------------------------------------------------- -* createOutputFile +* createOutputFiles *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::createOutpuFile(void) +void ArrowSamplerImpl::createOutpuFiles(void) { const ArrowParms* parms = arrowSampler->getParms(); - - if(std::filesystem::exists(parms->path)) - { - int rc = std::remove(parms->path); - if(rc != 0) - { - mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, parms->path, strerror(errno)); - } - } + const char* parquetFile = arrowSampler->getParquetFile(); auto table = inputFileToTable(); auto updated_table = addNewColumns(table); @@ -141,18 +138,19 @@ void ArrowSamplerImpl::createOutpuFile(void) if(parms->format == ArrowParms::PARQUET) { - tableToParquetFile(updated_table, parms->path); + tableToParquetFile(updated_table, parquetFile); } else if(parms->format == ArrowParms::CSV) { /* Arrow csv writer cannot handle columns with WKB data */ table = removeGeometryColumn(updated_table); - 
tableToCsvFile(table, parms->path); + tableToCsvFile(table, parquetFile); /* Generate metadata file since Arrow csv writer ignores it */ - std::string mfile = createMetadataFileName(parms->path); - tableMetadataToJson(table, mfile.c_str()); + const char* metadataFile = arrowSampler->getMetadataFile(); + ArrowCommon::removeFile(metadataFile); + tableMetadataToJson(table, metadataFile); } else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format"); } @@ -293,7 +291,7 @@ void ArrowSamplerImpl::getGeoPoints(std::vector& po for(int64_t i = 0; i < binary_array->length(); i++) { std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ - wkbpoint_t point = convertWKBToPoint(wkb_data); + ArrowCommon::wkbpoint_t point = convertWKBToPoint(wkb_data); ArrowSampler::point_info_t* pinfo = new ArrowSampler::point_info_t({point.x, point.y, 0.0}); points.push_back(pinfo); } @@ -781,11 +779,11 @@ std::shared_ptr ArrowSamplerImpl::removeGeometryColumn(const std:: /*---------------------------------------------------------------------------- * convertWKBToPoint *----------------------------------------------------------------------------*/ -wkbpoint_t ArrowSamplerImpl::convertWKBToPoint(const std::string& wkb_data) +ArrowCommon::wkbpoint_t ArrowSamplerImpl::convertWKBToPoint(const std::string& wkb_data) { - wkbpoint_t point; + ArrowCommon::wkbpoint_t point; - if(wkb_data.size() < sizeof(wkbpoint_t)) + if(wkb_data.size() < sizeof(ArrowCommon::wkbpoint_t)) { throw std::runtime_error("Invalid WKB data size."); } @@ -906,29 +904,6 @@ std::string ArrowSamplerImpl::createFileMap(void) return serialized_json; } -/*---------------------------------------------------------------------------- -* createMetadataFileName -*----------------------------------------------------------------------------*/ -std::string ArrowSamplerImpl::createMetadataFileName(const char* file_path) -{ - /* If file has extension .csv or .txt replace it with _metadata.json else append it */ - - std::vector extensions = {".csv", ".CSV", ".txt", ".TXT"}; - std::string path(file_path); - size_t dotIndex = path.find_last_of("."); - if(dotIndex != std::string::npos) - { - std::string extension = path.substr(dotIndex); - if(std::find(extensions.begin(), extensions.end(), extension) != extensions.end()) - { - path = path.substr(0, dotIndex); - } - } - path += "_metadata.json"; - return path; -} - - /*---------------------------------------------------------------------------- * tableMetadataToJson *----------------------------------------------------------------------------*/ diff --git a/packages/arrow/ArrowSamplerImpl.h b/packages/arrow/ArrowSamplerImpl.h index 619a6a269..64e9524ae 100644 --- a/packages/arrow/ArrowSamplerImpl.h +++ b/packages/arrow/ArrowSamplerImpl.h @@ -36,11 +36,14 @@ * INCLUDES ******************************************************************************/ -#include "ArrowImpl.h" +#include "ArrowCommon.h" #include "LuaObject.h" #include "ArrowSampler.h" #include "OsApi.h" +#include +#include + /****************************************************************************** * ARROW SAMPLER CLASS ******************************************************************************/ @@ -62,7 +65,7 @@ class ArrowSamplerImpl void processInputFile (const char* file_path, std::vector& points); bool processSamples (ArrowSampler::sampler_t* sampler); - void createOutpuFile (void); + void createOutpuFiles (void); private: @@ -109,10 +112,9 @@ class ArrowSamplerImpl void tableToCsvFile (const 
std::shared_ptr table, const char* file_path); std::shared_ptr removeGeometryColumn (const std::shared_ptr table); - wkbpoint_t convertWKBToPoint (const std::string& wkb_data); + ArrowCommon::wkbpoint_t convertWKBToPoint (const std::string& wkb_data); void printParquetMetadata (const char* file_path); std::string createFileMap (void); - std::string createMetadataFileName (const char* file_path); void tableMetadataToJson (const std::shared_ptr table, const char* file_path); }; diff --git a/packages/arrow/CMakeLists.txt b/packages/arrow/CMakeLists.txt index f8a30cb9f..9e3ef8b18 100644 --- a/packages/arrow/CMakeLists.txt +++ b/packages/arrow/CMakeLists.txt @@ -16,6 +16,7 @@ if (Arrow_FOUND AND Parquet_FOUND) PRIVATE ${CMAKE_CURRENT_LIST_DIR}/arrow.cpp ${CMAKE_CURRENT_LIST_DIR}/ArrowParms.cpp + ${CMAKE_CURRENT_LIST_DIR}/ArrowCommon.cpp ${CMAKE_CURRENT_LIST_DIR}/ArrowBuilderImpl.cpp ${CMAKE_CURRENT_LIST_DIR}/ArrowSamplerImpl.cpp ${CMAKE_CURRENT_LIST_DIR}/ArrowBuilder.cpp @@ -32,7 +33,7 @@ if (Arrow_FOUND AND Parquet_FOUND) FILES ${CMAKE_CURRENT_LIST_DIR}/arrow.h ${CMAKE_CURRENT_LIST_DIR}/ArrowParms.h - ${CMAKE_CURRENT_LIST_DIR}/ArrowImpl.h + ${CMAKE_CURRENT_LIST_DIR}/ArrowCommon.h ${CMAKE_CURRENT_LIST_DIR}/ArrowBuilderImpl.h ${CMAKE_CURRENT_LIST_DIR}/ArrowSamplerImpl.h ${CMAKE_CURRENT_LIST_DIR}/ArrowBuilder.h diff --git a/packages/arrow/arrow.cpp b/packages/arrow/arrow.cpp index 11d7ee988..480b40d30 100644 --- a/packages/arrow/arrow.cpp +++ b/packages/arrow/arrow.cpp @@ -35,6 +35,7 @@ #include "core.h" #include "arrow.h" +#include "ArrowCommon.h" /****************************************************************************** * DEFINES @@ -75,6 +76,7 @@ extern "C" { void initarrow (void) { /* Initialize Modules */ + ArrowCommon::init(); ArrowBuilder::init(); ArrowSampler::init(); diff --git a/scripts/selftests/parquet_sampler.lua b/scripts/selftests/parquet_sampler.lua index 2c0468871..903bc2bc3 100644 --- a/scripts/selftests/parquet_sampler.lua +++ b/scripts/selftests/parquet_sampler.lua @@ -4,12 +4,23 @@ asset = require("asset") local assets = asset.loaddir() local td = runner.rootdir(arg[0]) +local outq_name = "outq-luatest" + local in_geoparquet = td.."atl06_10rows.geoparquet" local in_parquet = td.."atl06_10rows.parquet" -local out_geoparquet = td.."samples.geoparquet" -local out_parquet = td.."samples.parquet" -local out_csv = td.."samples.csv" -local out_metadata = td.."samples_metadata.json" + +-- Indicates local file system (no s3 or client) +local prefix = "file://" + +local _out_geoparquet = "/tmp/samples.geoparquet" +local _out_parquet = "/tmp/samples.parquet" +local _out_csv = "/tmp/samples.csv" +local _out_metadata = "/tmp/samples_metadata.json" + +local out_geoparquet = prefix .. _out_geoparquet +local out_parquet = prefix .. _out_parquet +local out_csv = prefix .. _out_csv +local out_metadata = prefix .. 
_out_metadata -- console.monitor:config(core.LOG, core.DEBUG) -- sys.setlvl(core.LOG, core.DEBUG) @@ -32,73 +43,73 @@ local dem2 = geo.raster(geo.parms({asset="arcticdem-strips", algorithm="NearestN runner.check(dem2 ~= nil) print('\n--------------------------------------\nTest01: input/output geoparquet (geo)\n--------------------------------------') -local parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, {["mosaic"] = dem1}) +local parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, outq_name, {["mosaic"] = dem1}) runner.check(parquet_sampler ~= nil) local in_file_size = getFileSize(in_geoparquet); print("Input geoparquet file size: " .. in_file_size .. " bytes") local status = parquet_sampler:sample() -local out_file_size = getFileSize(out_geoparquet); +local out_file_size = getFileSize(_out_geoparquet); print("Output geoparquet file size: " .. out_file_size .. " bytes") runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") print('\n--------------------------------------\nTest02: input/output parquet (x, y)\n--------------------------------------') -parquet_sampler = arrow.sampler(arrow.parms({path=out_parquet, format="parquet"}), in_parquet, {["mosaic"] = dem1}) +parquet_sampler = arrow.sampler(arrow.parms({path=out_parquet, format="parquet"}), in_parquet, outq_name, {["mosaic"] = dem1}) runner.check(parquet_sampler ~= nil) in_file_size = getFileSize(in_parquet); print("Input parquet file size: " .. in_file_size .. " bytes") status = parquet_sampler:sample() -out_file_size = getFileSize(out_parquet); +out_file_size = getFileSize(_out_parquet); print("Output parquet file size: " .. out_file_size .. " bytes") runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") --NOTE: generated CSV files are much smaller than the input parquet/geoparquet files print('\n--------------------------------------\nTest03: input geoparquet, output CSV\n--------------------------------------') -parquet_sampler = arrow.sampler(arrow.parms({path=out_csv, format="csv"}), in_geoparquet, {["mosaic"] = dem1}) +parquet_sampler = arrow.sampler(arrow.parms({path=out_csv, format="csv"}), in_geoparquet, outq_name, {["mosaic"] = dem1}) runner.check(parquet_sampler ~= nil) in_file_size = getFileSize(in_geoparquet); print("Input geoparquet file size: " .. in_file_size .. " bytes") status = parquet_sampler:sample() -out_file_size = getFileSize(out_csv); +out_file_size = getFileSize(_out_csv); print("Output CSV file size: " .. out_file_size .. " bytes") runner.check(out_file_size < in_file_size, "Output CSV file size is not smaller than input file size: ") -meta_file_size = getFileSize(out_metadata); +meta_file_size = getFileSize(_out_metadata); print("Output metadata file size: " .. meta_file_size .. " bytes") runner.check(meta_file_size > 0, "Output metadata json file size is empty: ") print('\n--------------------------------------\nTest04: input parquet, output CSV \n--------------------------------------') -parquet_sampler = arrow.sampler(arrow.parms({path=out_csv, format="csv"}), in_parquet, {["mosaic"] = dem1, ["strips"] = dem2}) +parquet_sampler = arrow.sampler(arrow.parms({path=out_csv, format="csv"}), in_parquet, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) runner.check(parquet_sampler ~= nil) in_file_size = getFileSize(in_parquet); print("Input parquet file size: " .. in_file_size .. 
" bytes") status = parquet_sampler:sample() -out_file_size = getFileSize(out_csv); +out_file_size = getFileSize(_out_csv); print("Output CSV file size: " .. out_file_size .. " bytes") runner.check(out_file_size < in_file_size, "Output CSV file size is not smaller than input file size: ") -meta_file_size = getFileSize(out_metadata); +meta_file_size = getFileSize(_out_metadata); print("Output metadata file size: " .. meta_file_size .. " bytes") runner.check(meta_file_size > 0, "Output metadata json file size is empty: ") print('\n--------------------------------------\nTest05: input/output geoparquet (geo)\n--------------------------------------') -local parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, {["mosaic"] = dem1, ["strips"] = dem2}) +local parquet_sampler = arrow.sampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, outq_name, {["mosaic"] = dem1, ["strips"] = dem2}) runner.check(parquet_sampler ~= nil) local in_file_size = getFileSize(in_geoparquet); print("Input geoparquet file size: " .. in_file_size .. " bytes") local status = parquet_sampler:sample() -local out_file_size = getFileSize(out_geoparquet); +local out_file_size = getFileSize(_out_geoparquet); print("Output geoparquet file size: " .. out_file_size .. " bytes") runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") @@ -107,10 +118,10 @@ runner.check(out_file_size > in_file_size, "Output file size is not greater than -- the files were tested with python scripts -- Remove the output files -os.remove(out_geoparquet) -os.remove(out_parquet) -os.remove(out_csv) -os.remove(out_metadata) +os.remove(_out_geoparquet) +os.remove(_out_parquet) +os.remove(_out_csv) +os.remove(_out_metadata) -- Report Results --