Skip to content

Commit

Permalink
Ensure correct field order in INSERT and implement order_id sequence …
Browse files Browse the repository at this point in the history
…functionality (ubccr#201)

* Ensure correct field order in INSERT. Implement order_id as sequence.
  • Loading branch information
smgallo authored and ryanrath committed Sep 18, 2017
1 parent 0511bae commit f9b2f3e
Showing 1 changed file with 52 additions and 17 deletions.
69 changes: 52 additions & 17 deletions classes/ETL/Ingestor/pdoIngestor.php
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null)
}

// Get the list of available source query fields. If we have described the source query in
// the JSON config, use the record keys otherwise we need to parse the SQL string.
// the JSON config, use the record keys; otherwise, we need to parse the SQL string. It is
// expected that the source record fields are in the same order as they are in the SQL
// query.

$this->sourceRecordFields =
( null !== $this->etlSourceQuery
Expand Down Expand Up @@ -460,10 +462,19 @@ private function singleDatabaseIngest()
reset($this->etlDestinationTableList);
$qualifiedDestTableName = current($this->etlDestinationTableList)->getFullName();

// Keys are table definition columns (destination) and values are query result columns (source). For a
// single database ingest it is assumed that no mapping is taking place (i.e., all source
// columns are mapped to the same destination columns)
$destColumnList = array_keys(current($this->destinationFieldMappings));
// Generate the list of destination fields. Note that the field list for the INSERT must be
// in the same order as the fields returned by the query or we will get field mismatches.
// Use the source record fields (generated from the source query in initialize()) as the
// correct order. Since the destination field map may have been user-specified we cannot
// guarantee the order.

$firstFieldMap = current($this->destinationFieldMappings);
$destColumnList = array();
foreach ( $this->sourceRecordFields as $sourceField ) {
if ( array_key_exists($sourceField, $firstFieldMap) ) {
$destColumnList[] = $sourceField;
}
}

// The default method for ingestion is INSERT INTO ON DUPLICATE KEY UPDATE because tests
// have shown an approx 40% performance improvement when updating existing data over REPLACE
Expand All @@ -485,7 +496,7 @@ function ($s) {
$destColumnList
);
$updateColumns = implode(',', $updateColumnList);
$sql = "INSERT INTO $qualifiedDestTableName ($destColumns) " . $this->sourceQueryString
$sql = "INSERT INTO $qualifiedDestTableName\n($destColumns)\n" . $this->sourceQueryString
. "\nON DUPLICATE KEY UPDATE $updateColumns";
}

Expand Down Expand Up @@ -525,12 +536,16 @@ private function multiDatabaseIngest()
$loadStatementList = array();
$numDestinationTables = count($this->etlDestinationTableList);

foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) {
// Iterate over the destination field mappings rather than the destination table list because it
// is possible that a table definition is provided but no data is mapped to it.

foreach ( $this->destinationFieldMappings as $etlTableKey => $destFieldToSourceFieldMap ) {
$etlTable = $this->etlDestinationTableList[$etlTableKey];
$qualifiedDestTableName = $etlTable->getFullName();

// If there are no source query columns mapped to this table, skip it.

if ( 0 == count($this->destinationFieldMappings[$etlTableKey]) ) {
if ( 0 == count($destFieldToSourceFieldMap) ) {
continue;
}

Expand All @@ -542,7 +557,7 @@ private function multiDatabaseIngest()
$this->logger->debug("Using temporary file '$infileName' for destination table key '$etlTableKey'");

// Keys are table columns (destination) and values are query result columns (source)
$destColumnList = array_keys($this->destinationFieldMappings[$etlTableKey]);
$destColumnList = array_keys($destFieldToSourceFieldMap);

// The default method for ingestion is INSERT INTO ON DUPLICATE KEY UPDATE because tests
// have shown an approx 40% performance improvement when updating existing data over
Expand Down Expand Up @@ -587,7 +602,7 @@ function ($s) {
$infileList[$etlTableKey] = $infileName;
$loadStatementList[$etlTableKey] = $loadStatement;

} // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable )
} // foreach ( $this->destinationFieldMappings as $etlTableKey => $destFieldToSourceFieldMap )

if ( $this->getEtlOverseerOptions()->isDryrun() ) {
$this->logger->debug("Source query " . $this->sourceEndpoint . ":\n" . $this->sourceQueryString);
Expand Down Expand Up @@ -707,14 +722,29 @@ function ($s) {
// actually *hinders* performance in the tests! This was verified with 4,195,524 and 124,120
// rows.

$orderId = 0;

while ( $srcRecord = $sourceStatement->fetch(PDO::FETCH_ASSOC) ) {

$numSourceRecordsProcessed++;

// Some historical ingestors use an order_id and treat it similar to an auto-increment
// field, setting its value starting at 0 and incrementing for each record. It is not
// clear if this is used anywhere, but the functionality is maintained here for
// compatibility. Note that this works only when the destination table is truncated first
// (the counter restarts at 0 on every run, so it does not work when adding data to an
// already-populated table) and will not work properly in cases where the order is
// relative to a key. For example, if a key is resource_id and the order should be
// maintained for each unique resource_id we cannot use this method. To avoid overwriting
// existing data, only set the order_id if the source field exists and is NULL.

if ( array_key_exists('order_id', $srcRecord) && null === $srcRecord['order_id'] ) {
$srcRecord['order_id'] = $orderId++;
}

// Note that an array of transformed records is returned because a single source
// record may be transformed into multiple records.

$transformedRecords = $this->transform($srcRecord);
$transformedRecords = $this->transform($srcRecord, $orderId);

foreach ( $transformedRecords as $record ) {

Expand Down Expand Up @@ -781,7 +811,7 @@ function ($s) {
$this->logger->err($msg);
throw $e;
}
} // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable )
} // foreach ( $loadStatementList as $etlTableKey => $loadStatement )

$this->logger->debug(
sprintf('Loaded %d files in %ds', $numFilesLoaded, microtime(true) - $loadFileStart)
Expand Down Expand Up @@ -822,7 +852,7 @@ function ($s) {
fclose($outFdList[$etlTableKey]);
@unlink($infileList[$etlTableKey]);

} // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable )
} // foreach ( $loadStatementList as $etlTableKey => $loadStatement )

$this->logger->debug(sprintf('Loaded %d files in %ds', $numFilesLoaded, microtime(true) - $loadFileStart));

Expand Down Expand Up @@ -860,13 +890,15 @@ function ($s) {
*
* @param array $record An associative array containing the source record where the
* keys are the field names.
* @param int $orderId A reference to the relative ordering value used to set the value of an
* order_id field. @see multiDatabaseIngest()
*
* @return array A 2-dimensional array of potentially transformed records where each
* element is an individual record.
* ------------------------------------------------------------------------------------------
*/

protected function transform(array $srcRecord)
protected function transform(array $srcRecord, &$orderId)
{
foreach ( $srcRecord as $key => &$value ) {
if ( null === $value ) {
Expand Down Expand Up @@ -931,18 +963,21 @@ protected function allowSingleDatabaseOptimization()
return false;
}

// Can't optimize more than 1 destination table
// Can't optimize when writing data to more than 1 destination table

if ( count($this->etlDestinationTableList) > 1 ) {
$this->logger->debug("Multiple destination tables being populated");
return false;
}

// Can't optimize if mapping a subset of the query fields
// When creating the INSERT INTO ... SELECT statement in singleDatabaseIngest() we use the
// destination field map keys to generate the destination column list and use
// $this->sourceQueryString as the source query. These need to have the same fields (and be
// in the same order, but we will ensure proper order when generating the field list).
// Note that the check below only verifies that every source query field is present as a
// key in the destination field map; extra keys in the map are not detected here.

reset($this->destinationFieldMappings);

if ( count($this->sourceRecordFields) != count(current($this->destinationFieldMappings)) ) {
if ( 0 != count(array_diff($this->sourceRecordFields, array_keys(current($this->destinationFieldMappings)))) ) {
$this->logger->debug("Mapping a subset of the source query fields");
return false;
}
Expand Down

0 comments on commit f9b2f3e

Please sign in to comment.