Skip to content

Commit

Permalink
Implemented ArrowCommon module
Browse files Browse the repository at this point in the history
  • Loading branch information
elidwa committed Apr 12, 2024
1 parent 85274c7 commit e0036b2
Show file tree
Hide file tree
Showing 13 changed files with 598 additions and 363 deletions.
226 changes: 5 additions & 221 deletions packages/arrow/ArrowBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,6 @@ const struct luaL_Reg ArrowBuilder::LUA_META_TABLE[] = {
{NULL, NULL}
};

/*
 * Record definitions used to return the generated file to the requester.
 * Each table entry lists: field name, field type, byte offset of the field
 * inside the corresponding C struct, element count, and two trailing
 * arguments that are always NULL and NATIVE_FLAGS here.
 */

/* "arrowrec.meta": name and total size of the file; posted by send2Client before any data records */
const char* ArrowBuilder::metaRecType = "arrowrec.meta";
const RecordObject::fieldDef_t ArrowBuilder::metaRecDef[] = {
{"filename", RecordObject::STRING, offsetof(arrow_file_meta_t, filename), FILE_NAME_MAX_LEN, NULL, NATIVE_FLAGS},
{"size", RecordObject::INT64, offsetof(arrow_file_meta_t, size), 1, NULL, NATIVE_FLAGS}
};

/* "arrowrec.data": one chunk of file contents; the data field has element count 0 because its length varies per record */
const char* ArrowBuilder::dataRecType = "arrowrec.data";
const RecordObject::fieldDef_t ArrowBuilder::dataRecDef[] = {
{"filename", RecordObject::STRING, offsetof(arrow_file_data_t, filename), FILE_NAME_MAX_LEN, NULL, NATIVE_FLAGS},
{"data", RecordObject::UINT8, offsetof(arrow_file_data_t, data), 0, NULL, NATIVE_FLAGS} // variable length
};

/* "arrowrec.remote": output URL and uploaded byte count; posted by send2S3 after a successful upload */
const char* ArrowBuilder::remoteRecType = "arrowrec.remote";
const RecordObject::fieldDef_t ArrowBuilder::remoteRecDef[] = {
{"url", RecordObject::STRING, offsetof(arrow_file_remote_t, url), URL_MAX_LEN, NULL, NATIVE_FLAGS},
{"size", RecordObject::INT64, offsetof(arrow_file_remote_t, size), 1, NULL, NATIVE_FLAGS}
};

/* Directory prefix prepended to the builder id when forming the temporary file name ("<prefix><id>.bin") */
const char* ArrowBuilder::TMP_FILE_PREFIX = "/tmp/";

/******************************************************************************
* PUBLIC METHODS
Expand Down Expand Up @@ -108,9 +89,6 @@ int ArrowBuilder::luaCreate (lua_State* L)
*----------------------------------------------------------------------------*/
void ArrowBuilder::init (void)
{
/*
 * Invoke RECDEF once for each file-transfer record type (meta, data, remote),
 * passing the record type name, its field table, and the size of the backing
 * struct; the last argument is NULL in every case.
 * NOTE(review): RECDEF is defined elsewhere - presumably it registers the
 * record definition so RecordObject can be constructed by type name; confirm.
 */
RECDEF(metaRecType, metaRecDef, sizeof(arrow_file_meta_t), NULL);
RECDEF(dataRecType, dataRecDef, sizeof(arrow_file_data_t), NULL);
RECDEF(remoteRecType, remoteRecDef, sizeof(arrow_file_remote_t), NULL);
}

/*----------------------------------------------------------------------------
Expand Down Expand Up @@ -287,37 +265,7 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms,
}

/* Get Path */
if(parms->asset_name)
{
/* Check Private Cluster */
if(OsApi::getIsPublic())
{
throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to stage output on public cluster");
}

/* Generate Output Path */
Asset* asset = dynamic_cast<Asset*>(LuaObject::getLuaObjectByName(parms->asset_name, Asset::OBJECT_TYPE));
const char* path_prefix = StringLib::match(asset->getDriver(), "s3") ? "s3://" : "";
const char* path_suffix = "bin";
if(parms->format == ArrowParms::PARQUET) path_suffix = parms->as_geo ? ".geoparquet" : ".parquet";
if(parms->format == ArrowParms::CSV) path_suffix = "csv";
FString path_name("%s.%016lX", OsApi::getCluster(), OsApi::time(OsApi::CPU_CLK));
bool use_provided_path = ((parms->path != NULL) && (parms->path[0] != '\0'));
FString path_str("%s%s/%s%s", path_prefix, asset->getPath(), use_provided_path ? parms->path : path_name.c_str(), path_suffix);
asset->releaseLuaObject();

/* Set Output Path */
outputPath = path_str.c_str(true);
mlog(INFO, "Generating unique path: %s", outputPath);
}
else if((parms->path == NULL) || (parms->path[0] == '\0'))
{
throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to determine output path");
}
else
{
outputPath = StringLib::duplicate(parms->path);
}
outputPath = ArrowCommon::getOutputPath(parms);

/*
* NO THROWING BEYOND THIS POINT
Expand All @@ -344,8 +292,7 @@ ArrowBuilder::ArrowBuilder (lua_State* L, ArrowParms* _parms,
inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth);

/* Create Unique Temporary Filename */
FString tmp_file("%s%s.bin", TMP_FILE_PREFIX, id);
fileName = tmp_file.c_str(true);
fileName = ArrowCommon::getUniqueFileName(id);

/* Allocate Implementation */
impl = new ArrowBuilderImpl(this);
Expand Down Expand Up @@ -383,7 +330,7 @@ void* ArrowBuilder::builderThread(void* parm)
int row_cnt = 0;

/* Start Trace */
uint32_t trace_id = start_trace(INFO, builder->traceId, "parquet_builder", "{\"filename\":\"%s\"}", builder->fileName);
uint32_t trace_id = start_trace(INFO, builder->traceId, "arrow_builder", "{\"filename\":\"%s\"}", builder->fileName);
EventLib::stashId(trace_id);

/* Loop Forever */
Expand Down Expand Up @@ -539,33 +486,8 @@ void* ArrowBuilder::builderThread(void* parm)
builder->recordBatch.clear();

/* Send File to User */
const char* _path = builder->outputPath;
uint32_t send_trace_id = start_trace(INFO, trace_id, "send_file", "{\"path\": \"%s\"}", _path);
int _path_len = StringLib::size(_path);
if( (_path_len > 5) &&
(_path[0] == 's') &&
(_path[1] == '3') &&
(_path[2] == ':') &&
(_path[3] == '/') &&
(_path[4] == '/'))
{
/* Upload File to S3 */
builder->send2S3(&_path[5]);
}
else
{
/* Stream File Back to Client */
builder->send2Client();
}

/* Remove File */
int rc = remove(builder->fileName);
if(rc != 0)
{
mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, builder->fileName, strerror(errno));
}

stop_trace(INFO, send_trace_id);
ArrowCommon::send2User(builder->fileName, builder->outputPath, trace_id, builder->parms, builder->outQ);
ArrowCommon::removeFile(builder->fileName);

/* Signal Completion */
builder->signalComplete();
Expand All @@ -577,141 +499,3 @@ void* ArrowBuilder::builderThread(void* parm)
return NULL;
}

/*----------------------------------------------------------------------------
 * send2S3
 *
 *  Uploads the builder's temporary file (fileName) to S3 and reports progress
 *  to the output queue.
 *
 *  s3dst   destination in the form "<bucket>/<key>"; builderThread passes the
 *          output path with its leading "s3://" stripped
 *
 *  Returns true when the upload call completed without throwing; false when
 *  s3dst is NULL, has no '/' separating bucket from key, the upload throws a
 *  RunTimeException, or the server was built without __aws__.
 *----------------------------------------------------------------------------*/
bool ArrowBuilder::send2S3 (const char* s3dst)
{
    #ifdef __aws__

    /* Nothing to upload to */
    if(s3dst == NULL) return false;

    /* Work on a private copy so the bucket/key split can be done in place */
    char* bucket = StringLib::duplicate(s3dst);

    /* Advance to the first '/' (or the terminator when there is none) */
    char* key = bucket;
    while((*key != '/') && (*key != '\0'))
    {
        key++;
    }

    /* A destination without a '/' has no key */
    if(*key != '/')
    {
        mlog(CRITICAL, "invalid S3 url: %s", s3dst);
        delete [] bucket;
        return false;
    }

    /* Terminate the bucket name; the key starts right after the separator */
    *key = '\0';
    key++;

    /* Announce the Upload */
    alert(INFO, RTE_INFO, outQ, NULL, "Initiated upload of results to S3, bucket = %s, key = %s", bucket, key);

    bool status = true;
    try
    {
        /* Put File */
        int64_t bytes_uploaded = S3CurlIODriver::put(fileName, bucket, key, parms->region, &parms->credentials);

        /* Report Success */
        alert(INFO, RTE_INFO, outQ, NULL, "Upload to S3 completed, bucket = %s, key = %s, size = %ld", bucket, key, bytes_uploaded);

        /* Tell the User Where the File Landed */
        RecordObject remote_record(remoteRecType);
        arrow_file_remote_t* remote = (arrow_file_remote_t*)remote_record.getRecordData();
        StringLib::copy(&remote->url[0], outputPath, URL_MAX_LEN);
        remote->size = bytes_uploaded;

        /* A failed post is logged only; it does not change the returned status */
        if(!remote_record.post(outQ))
        {
            mlog(CRITICAL, "Failed to send remote record back to user for %s", outputPath);
        }
    }
    catch(const RunTimeException& e)
    {
        /* Report Failure */
        alert(e.level(), RTE_ERROR, outQ, NULL, "Upload to S3 failed, bucket = %s, key = %s, error = %s", bucket, key, e.what());
        status = false;
    }

    /* Release the Copy (key points into it) */
    delete [] bucket;

    return status;

    #else
    alert(CRITICAL, RTE_ERROR, outQ, NULL, "Output path specifies S3, but server compiled without AWS support");
    return false;
    #endif
}

/*----------------------------------------------------------------------------
 * send2Client
 *
 *  Streams the builder's temporary file (fileName) back to the requester over
 *  the output queue: one meta record carrying the file size, followed by data
 *  records each carrying up to FILE_BUFFER_RSPS_SIZE bytes of file contents.
 *  Both record kinds are labeled with parms->path.
 *
 *  Returns true only if the file was opened, sized, fully read, every record
 *  was posted, and the file was closed without error.
 *----------------------------------------------------------------------------*/
bool ArrowBuilder::send2Client (void)
{
    bool status = true;

    /* Reopen File to Stream Back as Response (binary mode: contents are sent verbatim) */
    FILE* fp = fopen(fileName, "rb");
    if(fp)
    {
        do
        {
            /* Get Size of File */
            long file_size = -1;
            if(fseek(fp, 0L, SEEK_END) == 0)
            {
                file_size = ftell(fp);
            }
            if((file_size < 0) || (fseek(fp, 0L, SEEK_SET) != 0))
            {
                /* Never advertise a bogus size to the client */
                status = false;
                mlog(CRITICAL, "Failed (%d) to determine size of file %s: %s", errno, fileName, strerror(errno));
                break; // early exit on error
            }

            /* Log Status */
            mlog(INFO, "Writing file %s of size %ld", fileName, file_size);

            /* Send Meta Record */
            RecordObject meta_record(metaRecType);
            arrow_file_meta_t* meta = (arrow_file_meta_t*)meta_record.getRecordData();
            StringLib::copy(&meta->filename[0], parms->path, FILE_NAME_MAX_LEN);
            meta->size = file_size;
            if(!meta_record.post(outQ))
            {
                status = false;
                break; // early exit on error
            }

            /* Send Data Records */
            long offset = 0;
            while(offset < file_size)
            {
                RecordObject data_record(dataRecType, 0, false);
                arrow_file_data_t* data = (arrow_file_data_t*)data_record.getRecordData();
                StringLib::copy(&data->filename[0], parms->path, FILE_NAME_MAX_LEN);
                size_t bytes_read = fread(data->data, 1, FILE_BUFFER_RSPS_SIZE, fp);
                if(bytes_read == 0)
                {
                    /*
                     * Read error, or the file shrank after it was sized; without
                     * this check offset would never advance and the loop would
                     * post empty records forever.
                     */
                    status = false;
                    mlog(CRITICAL, "Failed to read file %s at offset %ld of %ld", fileName, offset, file_size);
                    break; // early exit on error
                }

                /* Post only the fixed portion of the record plus the bytes actually read */
                if(!data_record.post(outQ, offsetof(arrow_file_data_t, data) + bytes_read))
                {
                    status = false;
                    break; // early exit on error
                }
                offset += (long)bytes_read;
            }
        } while(false);

        /* Close File (always reached once the file has been opened) */
        int rc = fclose(fp);
        if(rc != 0)
        {
            status = false;
            mlog(CRITICAL, "Failed (%d) to close file %s: %s", rc, fileName, strerror(errno));
        }
    }
    else // unable to open file
    {
        status = false;
        mlog(CRITICAL, "Failed (%d) to read file %s: %s", errno, fileName, strerror(errno));
    }

    /* Return Status */
    return status;
}
31 changes: 0 additions & 31 deletions packages/arrow/ArrowBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,13 @@ class ArrowBuilder: public LuaObject
* Constants
*--------------------------------------------------------------------*/

static const int FILE_NAME_MAX_LEN = 128;
static const int URL_MAX_LEN = 512;
static const int FILE_BUFFER_RSPS_SIZE = 0x2000000; // 32MB
static const int ROW_GROUP_SIZE = 0x4000000; // 64MB
static const int QUEUE_BUFFER_FACTOR = 3;

static const char* OBJECT_TYPE;
static const char* LUA_META_NAME;
static const struct luaL_Reg LUA_META_TABLE[];

static const char* metaRecType;
static const RecordObject::fieldDef_t metaRecDef[];

static const char* dataRecType;
static const RecordObject::fieldDef_t dataRecDef[];

static const char* remoteRecType;
static const RecordObject::fieldDef_t remoteRecDef[];

static const char* TMP_FILE_PREFIX;

/*--------------------------------------------------------------------
* Types
*--------------------------------------------------------------------*/
Expand Down Expand Up @@ -127,21 +113,6 @@ class ArrowBuilder: public LuaObject
RecordObject::field_t y_field;
} geo_data_t;

/*
 * Backing struct for the "arrowrec.meta" record: announces a file that is
 * about to be streamed to the client.
 * NOTE(review): `size` is declared `long` but the record definition describes
 * it as INT64 - these only agree where long is 64 bits; confirm target platforms.
 */
typedef struct {
char filename[FILE_NAME_MAX_LEN]; // name reported to the client (send2Client copies parms->path here)
long size; // total size of the file in bytes
} arrow_file_meta_t;

/*
 * Backing struct for the "arrowrec.data" record: one chunk of file contents.
 * The data array is sized for the largest chunk; send2Client posts only the
 * bytes actually read into it.
 */
typedef struct {
char filename[FILE_NAME_MAX_LEN]; // same name as in the meta record
uint8_t data[FILE_BUFFER_RSPS_SIZE]; // raw file bytes for this chunk
} arrow_file_data_t;

/*
 * Backing struct for the "arrowrec.remote" record: reports a file that was
 * uploaded rather than streamed.
 * NOTE(review): same `long` vs INT64 caveat as arrow_file_meta_t.
 */
typedef struct {
char url[URL_MAX_LEN]; // output path of the uploaded file (send2S3 copies outputPath here)
long size; // number of bytes uploaded
} arrow_file_remote_t;

/*--------------------------------------------------------------------
* Methods
*--------------------------------------------------------------------*/
Expand Down Expand Up @@ -205,8 +176,6 @@ class ArrowBuilder: public LuaObject
const char* rec_type, const char* id);
~ArrowBuilder (void);
static void* builderThread (void* parm);
bool send2S3 (const char* s3dst);
bool send2Client (void);
};

#endif /* __arrow_builder__ */
15 changes: 12 additions & 3 deletions packages/arrow/ArrowBuilderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@
#include "core.h"
#include "ArrowBuilderImpl.h"

#include <parquet/arrow/schema.h>
#include <parquet/arrow/writer.h>
#include <parquet/arrow/schema.h>
#include <parquet/properties.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/io/file.h>
#include <arrow/builder.h>
#include <parquet/file_writer.h>
#include <arrow/csv/writer.h>
#include <regex>

Expand Down Expand Up @@ -1219,15 +1228,15 @@ void ArrowBuilderImpl::processGeometry (RecordObject::field_t& x_field, RecordOb
{
arrow::BinaryBuilder builder;
(void)builder.Reserve(num_rows);
(void)builder.ReserveData(num_rows * sizeof(wkbpoint_t));
(void)builder.ReserveData(num_rows * sizeof(ArrowCommon::wkbpoint_t));
for(int i = 0; i < record_batch.length(); i++)
{
ArrowBuilder::batch_t* batch = record_batch[i];
int32_t starting_x_offset = x_field.offset;
int32_t starting_y_offset = y_field.offset;
for(int row = 0; row < batch->rows; row++)
{
wkbpoint_t point = {
ArrowCommon::wkbpoint_t point = {
#ifdef __be__
.byteOrder = 0,
#else
Expand All @@ -1237,7 +1246,7 @@ void ArrowBuilderImpl::processGeometry (RecordObject::field_t& x_field, RecordOb
.x = batch->pri_record->getValueReal(x_field),
.y = batch->pri_record->getValueReal(y_field)
};
(void)builder.UnsafeAppend((uint8_t*)&point, sizeof(wkbpoint_t));
(void)builder.UnsafeAppend((uint8_t*)&point, sizeof(ArrowCommon::wkbpoint_t));
if(x_field.flags & RecordObject::BATCH) x_field.offset += batch_row_size_bits;
if(y_field.flags & RecordObject::BATCH) y_field.offset += batch_row_size_bits;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/arrow/ArrowBuilderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* INCLUDES
******************************************************************************/

#include "ArrowImpl.h"
#include "ArrowCommon.h"
#include "LuaObject.h"
#include "Ordering.h"
#include "RecordObject.h"
Expand All @@ -45,6 +45,7 @@
#include "OsApi.h"
#include "MsgQ.h"

#include <parquet/arrow/schema.h>

/******************************************************************************
* ARROW BUILDER CLASS
Expand Down
Loading

0 comments on commit e0036b2

Please sign in to comment.