Skip to content

Commit

Permalink
Arrange to use Logical Replication protocol metadata. (#155)
Browse files Browse the repository at this point in the history
Instead of relying on wal2json to include the metadata we need in our
implementation of Change Data Capture, use the Logical Decoding protocol
metadata directly.

This prepares for compatibility with other output_plugins in the future.

The XID is still parsed from the "include-xids" option of wal2json at the
moment, but it's used only as a hint and a check. The current plan is to
have compatibility with only output_plugins that can send the XIDs, such as
for example the test_decoding output_plugin.
  • Loading branch information
dimitri authored Nov 23, 2022
1 parent 382d0f6 commit 667f869
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 149 deletions.
247 changes: 143 additions & 104 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,17 @@ startLogicalStreaming(StreamSpecs *specs)
.keywords = {
"format-version",
"include-xids",
"include-lsn",
"include-schemas",
"include-transaction",
"include-timestamp",
"include-types",
"filter-tables"
},
.values = {
"2",
"true",
"true",
"true",
"true",
"false",
"pgcopydb.*"
}
};
Expand Down Expand Up @@ -376,6 +376,7 @@ bool
streamWrite(LogicalStreamContext *context)
{
StreamContext *privateContext = (StreamContext *) context->private;
LogicalMessageMetadata *metadata = &(privateContext->metadata);

/* we might have to rotate to the next on-disk file */
if (!streamRotateFile(context))
Expand All @@ -384,89 +385,96 @@ streamWrite(LogicalStreamContext *context)
return false;
}

LogicalMessageMetadata *metadata = &(privateContext->metadata);
JSON_Value *json = json_parse_string(context->buffer);

/* ensure we have a new all-zero metadata structure for the new message */
(void) memset(metadata, 0, sizeof(LogicalMessageMetadata));

if (!parseMessageMetadata(metadata, context->buffer, json, false))
if (!prepareMessageMetadataFromContext(context))
{
/* errors have already been logged */
if (privateContext->jsonFile != NULL)
{
if (fclose(privateContext->jsonFile) != 0)
{
log_error("Failed to close file \"%s\": %m",
privateContext->walFileName);
}

/* reset the jsonFile FILE * pointer to NULL, it's closed now */
privateContext->jsonFile = NULL;
}

json_value_free(json);
log_error("Failed to prepare Logical Message Metadata from context, "
"see above for details");
return false;
}

json_value_free(json);

(void) updateStreamCounters(privateContext, metadata);

/*
* Write the logical decoding message to disk, appending to the already
* opened file we track in the privateContext.
*/
if (privateContext->jsonFile != NULL)
if (privateContext->jsonFile == NULL)
{
long bytes_left = strlen(context->buffer);
long bytes_written = 0;
log_error("Failed to write Logical Message: jsonFile is NULL");
return false;
}

while (bytes_left > 0)
{
int ret;
/* first write our own metadata, formatted in JSON */
int ret =
fformat(privateContext->jsonFile,
"{\"action\":\"%c\","
"\"xid\":\"%lld\","
"\"lsn\":\"%X/%X\","
"\"timestamp\":\"%s\","
"\"message\":",
metadata->action,
(long long) metadata->xid,
LSN_FORMAT_ARGS(metadata->lsn),
metadata->timestamp);

if (ret == -1)
{
log_error("Failed to write message metadata for action %d at LSN %X/%X "
"to file \"%s\": %m",
metadata->action,
LSN_FORMAT_ARGS(metadata->lsn),
privateContext->walFileName);
return false;
}

ret = fwrite(context->buffer + bytes_written,
sizeof(char),
bytes_left,
privateContext->jsonFile);
/* then add the logical output plugin data, inside our own JSON format */
long bytes_left = strlen(context->buffer);
long bytes_written = 0;

if (ret < 0)
{
log_error("Failed to write %ld bytes to file \"%s\": %m",
bytes_left,
privateContext->walFileName);
return false;
}
while (bytes_left > 0)
{
int ret;

/* Write was successful, advance our position */
bytes_written += ret;
bytes_left -= ret;
}
ret = fwrite(context->buffer + bytes_written,
sizeof(char),
bytes_left,
privateContext->jsonFile);

if (fwrite("\n", sizeof(char), 1, privateContext->jsonFile) != 1)
if (ret < 0)
{
log_error("Failed to write 1 byte to file \"%s\": %m",
log_error("Failed to write %ld bytes to file \"%s\": %m",
bytes_left,
privateContext->walFileName);
return false;
}

if (privateContext->jsonFile != NULL)
{
if (fclose(privateContext->jsonFile) != 0)
{
log_error("Failed to close file \"%s\": %m",
privateContext->walFileName);
}
/* Write was successful, advance our position */
bytes_written += ret;
bytes_left -= ret;
}

/* reset the jsonFile FILE * pointer to NULL, it's closed now */
privateContext->jsonFile = NULL;
if (fwrite("}\n", sizeof(char), 2, privateContext->jsonFile) != 2)
{
log_error("Failed to write 2 bytes to file \"%s\": %m",
privateContext->walFileName);

if (privateContext->jsonFile != NULL)
{
if (fclose(privateContext->jsonFile) != 0)
{
log_error("Failed to close file \"%s\": %m",
privateContext->walFileName);
}
return false;
}

/* update the LSN tracking that's reported in the feedback */
context->tracking->written_lsn = context->cur_record_lsn;
/* reset the jsonFile FILE * pointer to NULL, it's closed now */
privateContext->jsonFile = NULL;
}
return false;
}

/* update the LSN tracking that's reported in the feedback */
context->tracking->written_lsn = context->cur_record_lsn;

log_debug("Received action %c for XID %u in LSN %X/%X",
metadata->action,
metadata->xid,
Expand Down Expand Up @@ -751,50 +759,26 @@ streamKeepalive(LogicalStreamContext *context)
/* register progress made through receiving keepalive messages */
if (privateContext->jsonFile != NULL)
{
/* Postgres Epoch is 2000-01-01, Unix Epoch usually is 1970-01-01 */
char *pgepoch_str = "2000-01-01";
struct tm pgepoch = { 0 };

if (strptime(pgepoch_str, "%Y-%m-%d", &pgepoch) == NULL)
{
log_error("Failed to parse Postgres epoch \"%s\": %m", pgepoch_str);
return false;
}

time_t e = mktime(&pgepoch);
char sendTimeStr[BUFSIZE] = { 0 };

if (e == (time_t) -1)
/* add the server sendTime to the LogicalMessageMetadata */
if (!pgsql_timestamptz_to_string(context->sendTime,
sendTimeStr,
sizeof(sendTimeStr)))
{
log_error("Failed to compute Postgres epoch: %m");
log_error("Failed to format server send time %lld to time string",
(long long) context->sendTime);
return false;
}

/*
* Postgres Timestamps are stored as int64 values with units of
* microseconds. time_t are the number of seconds since the Epoch.
*/
time_t t = ((time_t) (context->now / 1000000)) + e;

struct tm lt = { 0 };

if (localtime_r(&t, &lt) == NULL)
{
log_error("Failed to format Keepalive timestamp: %m");
return false;
}

char now[BUFSIZE] = { 0 };

strftime(now, sizeof(now), "%Y-%m-%d %H:%M:%S%z", &lt);

fformat(privateContext->jsonFile,
"{\"action\":\"K\",\"lsn\":\"%X/%X\",\"timestamp\":\"%s\"}\n",
LSN_FORMAT_ARGS(context->cur_record_lsn),
now);
sendTimeStr);

log_debug("Inserted action KEEPALIVE for lsn %X/%X @%s",
LSN_FORMAT_ARGS(context->cur_record_lsn),
now);
sendTimeStr);

/* update the LSN tracking that's reported in the feedback */
context->tracking->written_lsn = context->cur_record_lsn;
Expand Down Expand Up @@ -897,6 +881,64 @@ streamFeedback(LogicalStreamContext *context)
}


/*
 * prepareMessageMetadataFromContext prepares the Logical Message Metadata from
 * the fields grabbed in the logical streaming protocol.
 *
 * See XLogData (B) protocol message description at:
 *
 * https://www.postgresql.org/docs/current/protocol-replication.html
 */
bool
prepareMessageMetadataFromContext(LogicalStreamContext *context)
{
	StreamContext *privateContext = (StreamContext *) context->private;
	LogicalMessageMetadata *metadata = &(privateContext->metadata);

	/* start from an all-zero metadata structure for this new message */
	memset(metadata, 0, sizeof(*metadata));

	/* the record LSN comes straight from the streaming protocol fields */
	metadata->lsn = context->cur_record_lsn;

	/* format the server sendTime into the metadata timestamp string */
	bool timeOk = pgsql_timestamptz_to_string(context->sendTime,
											  metadata->timestamp,
											  sizeof(metadata->timestamp));

	if (!timeOk)
	{
		log_error("Failed to format server send time %lld to time string",
				  (long long) context->sendTime);
		return false;
	}

	/* now parse the output_plugin buffer itself (wal2json) */
	JSON_Value *json = json_parse_string(context->buffer);

	bool parsedOk = parseMessageMetadata(metadata, context->buffer, json, false);

	json_value_free(json);

	if (parsedOk)
	{
		return true;
	}

	/* errors have already been logged; close the current JSON file, if any */
	if (privateContext->jsonFile != NULL)
	{
		if (fclose(privateContext->jsonFile) != 0)
		{
			log_error("Failed to close file \"%s\": %m",
					  privateContext->walFileName);
		}

		/* reset the jsonFile FILE * pointer to NULL, it's closed now */
		privateContext->jsonFile = NULL;
	}

	return false;
}


/*
* parseMessageMetadata parses just the metadata of the JSON replication
* message we got from wal2json.
Expand Down Expand Up @@ -953,16 +995,13 @@ parseMessageMetadata(LogicalMessageMetadata *metadata,

char *lsn = (char *) json_object_get_string(jsobj, "lsn");

if (lsn == NULL)
if (lsn != NULL)
{
log_error("Failed to parse JSON message LSN: \"%s\"", buffer);
return false;
}

if (!parseLSN(lsn, &(metadata->lsn)))
{
log_error("Failed to parse LSN \"%s\"", lsn);
return false;
if (!parseLSN(lsn, &(metadata->lsn)))
{
log_error("Failed to parse LSN \"%s\"", lsn);
return false;
}
}

char *timestamp = (char *) json_object_get_string(jsobj, "timestamp");
Expand Down
2 changes: 2 additions & 0 deletions src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ bool streamCloseFile(LogicalStreamContext *context, bool time_to_abort);

bool streamWaitForSubprocess(LogicalStreamContext *context);

bool prepareMessageMetadataFromContext(LogicalStreamContext *context);

bool parseMessageMetadata(LogicalMessageMetadata *metadata,
const char *buffer,
JSON_Value *json,
Expand Down
18 changes: 11 additions & 7 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,12 @@ parseMessage(LogicalTransaction *txn,
if (metadata->action != STREAM_ACTION_SWITCH &&
metadata->action != STREAM_ACTION_KEEPALIVE)
{
schema = (char *) json_object_get_string(jsobj, "schema");
table = (char *) json_object_get_string(jsobj, "table");
schema = (char *) json_object_dotget_string(jsobj, "message.schema");
table = (char *) json_object_dotget_string(jsobj, "message.table");

if (schema == NULL || table == NULL)
{
log_error("Failed to parse truncate message missing "
log_error("Failed to parse truncated message missing "
"schema or table property: %s",
message);
return false;
Expand Down Expand Up @@ -603,7 +603,8 @@ parseMessage(LogicalTransaction *txn,

case STREAM_ACTION_INSERT:
{
JSON_Array *jscols = json_object_get_array(jsobj, "columns");
JSON_Array *jscols =
json_object_dotget_array(jsobj, "message.columns");

stmt->action = metadata->action;

Expand Down Expand Up @@ -658,7 +659,8 @@ parseMessage(LogicalTransaction *txn,
}

LogicalMessageTuple *old = &(stmt->stmt.update.old.array[0]);
JSON_Array *jsids = json_object_get_array(jsobj, "identity");
JSON_Array *jsids =
json_object_dotget_array(jsobj, "message.identity");

if (!SetColumnNamesAndValues(old, message, jsids))
{
Expand All @@ -669,7 +671,8 @@ parseMessage(LogicalTransaction *txn,
}

LogicalMessageTuple *new = &(stmt->stmt.update.new.array[0]);
JSON_Array *jscols = json_object_get_array(jsobj, "columns");
JSON_Array *jscols =
json_object_dotget_array(jsobj, "message.columns");

if (!SetColumnNamesAndValues(new, message, jscols))
{
Expand Down Expand Up @@ -701,7 +704,8 @@ parseMessage(LogicalTransaction *txn,
}

LogicalMessageTuple *old = &(stmt->stmt.update.old.array[0]);
JSON_Array *jsids = json_object_get_array(jsobj, "identity");
JSON_Array *jsids =
json_object_dotget_array(jsobj, "message.identity");

if (!SetColumnNamesAndValues(old, message, jsids))
{
Expand Down
Loading

0 comments on commit 667f869

Please sign in to comment.