Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for VARBINARY columns #131

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 153 additions & 110 deletions odbc_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ PG_MODULE_MAGIC;
#define SQLTABLES_NAME_COLUMN 3

#define ODBC_SQLSTATE_FRACTIONAL_TRUNCATION "01S07"
#define ODBC_SQLSTATE_STRING_TRUNCATION "01004"
#define ODBC_SQLSTATE_LENGTH 5
typedef enum { NO_TRUNCATION, FRACTIONAL_TRUNCATION, STRING_TRUNCATION } GetDataTruncation;

typedef struct odbcFdwOptions
{
char *schema; /* Foreign schema name */
Expand Down Expand Up @@ -174,7 +178,62 @@ static struct odbcFdwOption valid_options[] =
{ NULL, InvalidOid}
};

typedef enum { TEXT_CONVERSION, HEX_CONVERSION, BIN_CONVERSION, BOOL_CONVERSION } ColumnConversion;
typedef enum { TEXT_CONVERSION, BIN_CONVERSION, BOOL_CONVERSION } ColumnConversion;

/*
 * Classify the truncation (if any) signalled by an ODBC call.
 *
 * ret:  return code of the call being examined.
 * stmt: statement handle the call was issued on; only its first
 *       diagnostic record is inspected.
 *
 * Returns STRING_TRUNCATION when the SQLSTATE of that record is
 * ODBC_SQLSTATE_STRING_TRUNCATION, FRACTIONAL_TRUNCATION when it is
 * ODBC_SQLSTATE_FRACTIONAL_TRUNCATION, and NO_TRUNCATION otherwise.
 * Any ret other than SQL_SUCCESS_WITH_INFO yields NO_TRUNCATION without
 * querying the diagnostics, so errors must be checked by the caller.
 */
static GetDataTruncation
result_truncation(SQLRETURN ret, SQLHSTMT stmt)
{
    /*
     * Zero-filled on purpose: the result of SQLGetDiagRec is not checked,
     * so if it fails to provide a record the comparisons below see an
     * empty string (no match) instead of indeterminate stack bytes.
     */
    SQLCHAR sqlstate[ODBC_SQLSTATE_LENGTH + 1] = {0};

    if (ret != SQL_SUCCESS_WITH_INFO)
    {
        return NO_TRUNCATION;
    }

    /* Only the SQLSTATE is of interest; native code and message are skipped */
    SQLGetDiagRec(SQL_HANDLE_STMT, stmt, 1, sqlstate, NULL, NULL, 0, NULL);

    if (strncmp((char*)sqlstate, ODBC_SQLSTATE_STRING_TRUNCATION, ODBC_SQLSTATE_LENGTH) == 0)
    {
        return STRING_TRUNCATION;
    }
    if (strncmp((char*)sqlstate, ODBC_SQLSTATE_FRACTIONAL_TRUNCATION, ODBC_SQLSTATE_LENGTH) == 0)
    {
        return FRACTIONAL_TRUNCATION;
    }
    return NO_TRUNCATION;
}

/*
 * Make sure *buffer can hold at least required_size bytes.
 *
 * buffer:        in/out pointer to the palloc'd buffer (may point to NULL
 *                when nothing has been allocated yet).
 * size:          in/out current capacity of *buffer in bytes.
 * used_size:     number of leading bytes of *buffer holding data that must
 *                survive the resize.
 * required_size: minimum capacity needed after the call.
 *
 * When the current capacity is already sufficient nothing happens.
 * Otherwise a new buffer of exactly required_size bytes is allocated, the
 * used prefix is copied over, the old buffer is released, and *buffer and
 * *size are updated.
 */
static void
resize_buffer(char ** buffer, int *size, int used_size, int required_size)
{
    char *new_buffer;

    if (required_size <= *size)
    {
        return;
    }

    /*
     * TODO: use min increment size, maybe in relation to current size.
     * No NULL check is needed: palloc raises an error itself when the
     * allocation cannot be satisfied.
     */
    new_buffer = (char *) palloc(required_size);

    if (*buffer != NULL)
    {
        if (used_size > 0)
        {
            /* Fresh allocation, so source and destination cannot overlap */
            memcpy(new_buffer, *buffer, used_size);
        }
        /*
         * Release the old buffer even if none of it was in use, so that an
         * allocated-but-empty buffer is not leaked when it has to grow.
         */
        pfree(*buffer);
    }

    *buffer = new_buffer;
    *size = required_size;
}

/* Digit table: indexing it with a 4-bit value gives that value's uppercase hex digit */
static const char * HEX_DIGITS = "0123456789ABCDEF";

/*
 * Encode raw bytes as an uppercase hexadecimal string.
 *
 * buffer:      bytes to encode.
 * buffer_size: number of bytes to read from buffer.
 *
 * Returns a newly palloc'd, zero-terminated string of 2 * buffer_size
 * characters (high nibble first for each byte). The caller owns the result.
 */
static char * binary_to_hex(char * buffer, int buffer_size)
{
    int encoded_len = 2 * buffer_size;
    char *encoded = (char *) palloc(encoded_len + 1);
    const unsigned char *src = (const unsigned char *) buffer;
    char *dst = encoded;
    int remaining;

    for (remaining = buffer_size; remaining > 0; remaining--)
    {
        unsigned char value = *src++;

        *dst++ = HEX_DIGITS[value >> 4];   /* high nibble */
        *dst++ = HEX_DIGITS[value & 0xF];  /* low nibble */
    }
    *dst = 0;

    return encoded;
}

/*
* SQL functions
Expand Down Expand Up @@ -726,17 +785,16 @@ odbcGetTableOptions(Oid foreigntableid, odbcFdwOptions *extracted_options)
static void
check_return(SQLRETURN ret, char *msg, SQLHANDLE handle, SQLSMALLINT type)
{
static char error_msg[MAX_ERROR_MSG_LENGTH+1];
int err_code = ERRCODE_SYSTEM_ERROR;

strncpy(error_msg, msg, MAX_ERROR_MSG_LENGTH);

SQLINTEGER i = 0;
SQLINTEGER native;
SQLCHAR state[ 7 ];
SQLCHAR text[256];
SQLSMALLINT len;
SQLRETURN diag_ret;
static char error_msg[MAX_ERROR_MSG_LENGTH+1];
int err_code = ERRCODE_SYSTEM_ERROR;

strncpy(error_msg, msg, MAX_ERROR_MSG_LENGTH);

if (!SQL_SUCCEEDED(ret))
{
Expand All @@ -754,7 +812,7 @@ check_return(SQLRETURN ret, char *msg, SQLHANDLE handle, SQLSMALLINT type)
elog(DEBUG1, " %s:%ld:%ld:%s\n", state, (long int) i, (long int) native, text);
#endif
strncat(error_msg, ERROR_MSG_SEP, MAX_ERROR_MSG_LENGTH - strlen(ERROR_MSG_SEP));
strncat(error_msg, text, MAX_ERROR_MSG_LENGTH - strlen(error_msg));
strncat(error_msg, (char *)text, MAX_ERROR_MSG_LENGTH - strlen(error_msg));
}
}
while( diag_ret == SQL_SUCCESS );
Expand Down Expand Up @@ -1581,7 +1639,7 @@ odbcIterateForeignScan(ForeignScanState *node)
sql_data_type(DataTypePtr, ColumnSizePtr, DecimalDigitsPtr, NullablePtr, &sql_type);
if (strcmp("bytea", (char*)sql_type.data) == 0)
{
conversion = HEX_CONVERSION;
conversion = BIN_CONVERSION;
}
if (strcmp("boolean", (char*)sql_type.data) == 0)
{
Expand Down Expand Up @@ -1645,163 +1703,148 @@ odbcIterateForeignScan(ForeignScanState *node)
/* Loop through the columns */
for (i = 1; i <= columns; i++)
{
SQLLEN indicator;
char * buf;

int mask_index = i - 1;
int col_size = list_nth_int(col_size_array, mask_index);
int mapped_pos = list_nth_int(col_position_mask, mask_index);
ColumnConversion conversion = list_nth_int(col_conversion_array, mask_index);
SQLSMALLINT target_type = SQL_C_CHAR;
SQLLEN result_size;
int chunk_size, effective_chunk_size;
int buffer_size = 0;
char * buffer = 0;
char * hex;
int used_buffer_size = 0;
GetDataTruncation truncation;
bool binary_data = false;
if (conversion == BIN_CONVERSION)
{
target_type = SQL_C_BINARY;
binary_data = true;
}

if (col_size == 0) {
if (col_size == 0)
{
col_size = 1024;
}

chunk_size = binary_data ? col_size : col_size + 1;

/* Ignore this column if position is marked as invalid */
if (mapped_pos == -1)
continue;

buf = (char *) palloc(sizeof(char) * (col_size+1));

/* retrieve column data as a zero-terminated string */
/* TODO:
binary fields (SQL_C_BIT, SQL_C_BINARY) do not have
a trailing zero; they should be copied as now but without
adding 1 to col_size, or using SQL_C_BIT or SQL_C_BINARY
and then encoded into a binary PG literal (e.g. X'...'
or B'...')
For floating point types we should use SQL_C_FLOAT/SQL_C_DOUBLE
to avoid precision loss.
For date/time/timestamp these structures can be used:
SQL_C_TYPE_DATE/SQL_C_TYPE_TIME/SQL_C_TYPE_TIMESTAMP.
And finally, SQL_C_NUMERIC and SQL_C_GUID could also be used.
*/
buf[0] = 0;
ret = SQLGetData(stmt, i, SQL_C_CHAR,
buf, sizeof(char) * (col_size+1), &indicator);

if (ret == SQL_SUCCESS_WITH_INFO)
do // Loop for reading the field in chunks
{
SQLCHAR sqlstate[5];
SQLGetDiagRec(SQL_HANDLE_STMT, stmt, 1, sqlstate, NULL, NULL, 0, NULL);
if (strcmp((char*)sqlstate, ODBC_SQLSTATE_FRACTIONAL_TRUNCATION) == 0)
resize_buffer(&buffer, &buffer_size, used_buffer_size, used_buffer_size + chunk_size);
ret = SQLGetData(stmt, i, target_type, buffer + used_buffer_size, chunk_size, &result_size);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this missing error handling? result_truncation only does something if it was ok, but no error is raised if there was an error here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error checking is performed further below (check_return); first we handle the non-error cases 🤔, which I now see is not very elegant... but I'll leave that for when I take on the TODO comment associated with said error checking.

effective_chunk_size = chunk_size;
if (!binary_data && buffer[used_buffer_size + chunk_size - 1] == 0)
{
/* Fractional truncation has occurred;
* at this point we cannot obtain the lost digits
*/
if (buf[col_size])
{
/* The driver has omitted the trailing */
char *buf2 = (char *) palloc(sizeof(char) * (col_size+2));
strncpy(buf2, buf, col_size+1);
buf2[col_size+1] = 0;
pfree(buf);
buf = buf2;
}
elog(NOTICE,"Truncating number: %s",buf);
effective_chunk_size--;
}
else
truncation = result_truncation(ret, stmt);
if (truncation == STRING_TRUNCATION)
{
/* The output is incomplete, we need to obtain the rest of the data */
char* accum_buffer;
size_t accum_buffer_size;
size_t accum_used = 0;
if (indicator == SQL_NO_TOTAL)
if (result_size == SQL_NO_TOTAL)
{
/* Unknown total size, must copy part by part */
accum_buffer_size = 0;
accum_buffer = NULL;
while (1)
// no info about remaining data size; keep reading with same chunk_size
used_buffer_size += effective_chunk_size;
}
else
{
// we read chunk_size, but there was result_size pending in total;
// adjust chunk_size for the remaining data, so the next read will hopefully be the final chunk
used_buffer_size += effective_chunk_size;
// note that we need to read result_size - effective_chunk_size more data bytes,
chunk_size = (int)result_size - effective_chunk_size;
// wait, maybe we don't need to read, just append a zero!
if (chunk_size == 0)
{
size_t buf_len = buf[col_size] ? col_size + 1 : col_size;
// Allocate new accumulation buffer if necessary
if (accum_used + buf_len > accum_buffer_size)
if (!binary_data)
{
char *new_buff;
accum_buffer_size = accum_buffer_size == 0 ? col_size*2 : accum_buffer_size*2;
new_buff = (char *) palloc(sizeof(char) * (accum_buffer_size+1));
if (accum_buffer)
{
memmove(new_buff, accum_buffer, accum_used);
pfree(accum_buffer);
}
accum_buffer = new_buff;
accum_buffer[accum_used] = 0;
resize_buffer(&buffer, &buffer_size, used_buffer_size, used_buffer_size + 1);
buffer[used_buffer_size - 1] = 0;
}
// Copy part to the accumulation buffer
strncpy(accum_buffer+accum_used, buf, buf_len);
accum_used += buf_len;
accum_buffer[accum_used] = 0;
// Get new part
if (ret != SQL_SUCCESS_WITH_INFO)
break;
ret = SQLGetData(stmt, i, SQL_C_CHAR, buf, sizeof(char) * (col_size+1), &indicator);
};

break;
}
if (!binary_data)
{
chunk_size += 1;
}
}
else
}
else if (truncation == FRACTIONAL_TRUNCATION)
{
/* Fractional truncation has occurred;
* at this point we cannot obtain the lost digits
*/
used_buffer_size += effective_chunk_size;
if (chunk_size == effective_chunk_size)
{
/* We need to retrieve indicator more characters */
size_t buf_len = buf[col_size] ? col_size + 1 : col_size;
accum_buffer_size = buf_len + indicator;
accum_buffer = (char *) palloc(sizeof(char) * (accum_buffer_size+1));
strncpy(accum_buffer, buf, buf_len);
accum_buffer[buf_len] = 0;
ret = SQLGetData(stmt, i, SQL_C_CHAR, accum_buffer+buf_len, sizeof(char) * (indicator+1), &indicator);
/* The driver has omitted the trailing zero */
resize_buffer(&buffer, &buffer_size, used_buffer_size, used_buffer_size + 1);
buffer[used_buffer_size] = 0;
}
pfree(buf);
buf = accum_buffer;
elog_debug("Truncating number: %s", buffer);
}
else // NO_TRUNCATION: finish reading
{
used_buffer_size += effective_chunk_size;
}
} while (truncation == STRING_TRUNCATION && chunk_size > 0);

if (!binary_data)
{
used_buffer_size = strnlen(buffer, used_buffer_size);
}
else

if (ret != SQL_SUCCESS_WITH_INFO)
{
// TODO: review check_return behaviour for SQL_SUCCESS_WITH_INFO (it should not fail, right?)
check_return(ret, "Reading data", stmt, SQL_HANDLE_STMT);
}

if (SQL_SUCCEEDED(ret))
{
/* Handle null columns */
if (indicator == SQL_NULL_DATA)
if (result_size == SQL_NULL_DATA)
{
// BuildTupleFromCStrings expects NULLs to be NULL pointers
values[mapped_pos] = NULL;
}
else
{
if (festate->encoding != -1)
if (festate->encoding != -1 && !binary_data)
{
/* Convert character encoding */
buf = pg_any_to_server(buf, strlen(buf), festate->encoding);
buffer = pg_any_to_server(buffer, used_buffer_size, festate->encoding);
}
initStringInfo(&col_data);
switch (conversion)
{
case TEXT_CONVERSION :
appendStringInfoString (&col_data, buf);
break;
case HEX_CONVERSION :
appendStringInfoString (&col_data, "\\x");
appendStringInfoString (&col_data, buf);
appendStringInfoString (&col_data, buffer);
break;
case BOOL_CONVERSION :
if (buf[0] == 0)
strcpy(buf, "F");
else if (buf[0] == 1)
strcpy(buf, "T");
appendStringInfoString (&col_data, buf);
if (buffer[0] == 0)
strcpy(buffer, "F");
else if (buffer[0] == 1)
strcpy(buffer, "T");
appendStringInfoString (&col_data, buffer);
break;
case BIN_CONVERSION :
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_DATA_TYPE),
errmsg("Bit string columns are not supported")
));
/* TODO: avoid hex conversion by building the tuple from Datum values instead of using BuildTupleFromCStrings */
hex = binary_to_hex(buffer, used_buffer_size);
appendStringInfoString (&col_data, "\\x");
appendStringInfoString (&col_data, hex);
pfree(hex);
break;
}

values[mapped_pos] = col_data.data;
}
}
pfree(buf);
pfree(buffer);
}

tuple = BuildTupleFromCStrings(festate->attinmeta, values);
Expand Down
Loading