Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ETLv2 query debug #46

Merged
merged 7 commits into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions classes/ETL/Aggregator/JobsAggregator.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
use ETL\iAction;
use \PDOException;

class JobsAggregator extends pdoAggregator
implements iAction
class JobsAggregator extends pdoAggregator implements iAction
{
// Name of the status table that we will be updating
const STATUS_TABLE = "jobfactstatus";
Expand Down Expand Up @@ -52,8 +51,10 @@ protected function performPreAggregationUnitTasks($aggregationUnit)
return false;
}
} catch (PDOException $e ) {
$msg = "Error querying {$sourceSchema}.{$tableName}";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error querying {$sourceSchema}.{$tableName}",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this->utilityEndpoint)
);
}

return parent::performPreAggregationUnitTasks($aggregationUnit);
Expand Down Expand Up @@ -106,8 +107,10 @@ protected function performPostAggregationUnitTasks($aggregationUnit, $numAggrega
$this->logger->info("Updated $numRows rows");
}
} catch (PDOException $e ) {
$msg = "Error updating {$sourceSchema}.{$tableName}";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error updating {$sourceSchema}.{$tableName}",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this->destinationEndpoint)
);
}

return parent::performPostAggregationUnitTasks($aggregationUnit, $numAggregationPeriodsProcessed);
Expand Down Expand Up @@ -179,7 +182,7 @@ protected function performPostExecuteTasks($numRecordsProcessed)

// If we always run the full set of aggregation periods, this can be done once at the end...

$sql = "DELETE FROM {$sourceSchema}.{$tableName} WHERE " . implode(" AND " , $whereClauses);
$sql = "DELETE FROM {$sourceSchema}.{$tableName} WHERE " . implode(" AND ", $whereClauses);
$this->logger->debug($sql);

if ( ! $this->etlOverseerOptions->isDryrun() ) {
Expand All @@ -188,8 +191,10 @@ protected function performPostExecuteTasks($numRecordsProcessed)
}

} catch (PDOException $e ) {
$msg = "Error cleaning {$sourceSchema}.{$tableName}";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error cleaning {$sourceSchema}.{$tableName}",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this->destinationEndpoint)
);
}

return parent::performPostExecuteTasks($numRecordsProcessed);
Expand Down Expand Up @@ -239,9 +244,9 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
if ( null !== $startDate && null !== $endDate ) {
$dateRangeSql = "d.${aggregationUnit}_end_ts >= UNIX_TIMESTAMP($startDate) " .
"AND d.${aggregationUnit}_start_ts <= UNIX_TIMESTAMP($endDate)";
} else if ( null !== $startDate ) {
} elseif ( null !== $startDate ) {
$dateRangeSql = "d.${aggregationUnit}_end_ts >= UNIX_TIMESTAMP($startDate)";
} else if ( null !== $endDate ) {
} elseif ( null !== $endDate ) {
$dateRangeSql = "d.${aggregationUnit}_start_ts <= UNIX_TIMESTAMP($endDate)";
}

Expand Down Expand Up @@ -337,7 +342,10 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
$result = $this->utilityHandle->query($sql);
}
} catch (PDOException $e) {
$this->logAndThrowSqlException($sql, $e, "Error querying dirty date ids");
$this->logAndThrowException(
"Error querying aggregation dirty date ids",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this->utilityEndpoint)
);
}

return $result;
Expand Down Expand Up @@ -381,7 +389,7 @@ protected function checkResourceSpecs()
":endDate" => $endDate
);

$this->logger->debug("Verify resource specs exist:\n$sql");
$this->logger->debug("Verify resource specs exist " . $this->sourceEndpoint . ":\n$sql");
$result = $this->sourceHandle->query($sql, $params);
if ( count($result) > 0 ) {
$resources = array();
Expand All @@ -403,5 +411,4 @@ protected function checkResourceSpecs()
$this->verifiedResourceSpecs = true;

} // checkResourceSpecs()

} // class JobsAggregator
81 changes: 58 additions & 23 deletions classes/ETL/Aggregator/pdoAggregator.php
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ protected function performPreAggregationUnitTasks($aggregationUnit)
}
}

$this->executeSqlList($sqlList, $this->destinationHandle, "Pre-aggregation unit tasks");
$this->executeSqlList($sqlList, $this->destinationEndpoint, "Pre-aggregation unit tasks");

return true;
} // performPreAggregationUnitTasks()
Expand Down Expand Up @@ -418,7 +418,7 @@ protected function performPostAggregationUnitTasks($aggregationUnit, $numAggrega
}
}

$this->executeSqlList($sqlList, $this->destinationHandle, "Post-aggregation unit tasks");
$this->executeSqlList($sqlList, $this->destinationEndpoint, "Post-aggregation unit tasks");

return true;

Expand Down Expand Up @@ -448,7 +448,10 @@ protected function verifyAggregationUnitTable($aggregationUnit)
continue;
}
} catch (PDOException $e) {
$this->logAndThrowSqlException($sql, $e, "Error verifying aggregation unit table for '$aggregationUnit'");
$this->logAndThrowException(
"Error verifying aggregation unit table for '$aggregationUnit'",
array('exception' => $e, 'sql' => $sql)
);
}

return true;
Expand Down Expand Up @@ -532,7 +535,7 @@ protected function getDirtyAggregationPeriods($aggregationUnit)

if ( null === $startDayIdField || null === $endDayIdField ) {
$t = Utilities::substituteVariables($firstTable->getFullName(false), $this->variableMap);
$this->logger->debug("Discover $t");
$this->logger->debug("Discover table $t");
$firstTableDef = Table::discover($t, $this->sourceEndpoint, null, $this->logger);

// If we are in dryrun mode the table may not have been created yet but we still want to
Expand Down Expand Up @@ -698,13 +701,15 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
$result = array();

try {
$this->logger->debug("Select dirty aggregation periods:\n$sql");
$this->logger->debug("Select dirty aggregation periods SQL " . $this->sourceEndpoint . ":\n$sql");
if ( ! $this->etlOverseerOptions->isDryrun() ) {
// $result = $this->utilityHandle->query($sql);
$result = $this->sourceHandle->query($sql);
}
} catch (PDOException $e) {
$this->logAndThrowSqlException($sql, $e, "Error querying dirty date ids");
$this->logAndThrowException(
"Error querying dirty date ids",
array('exception' => $e, 'sql' => $sql)
);
}

return $result;
Expand Down Expand Up @@ -825,7 +830,10 @@ protected function _execute($aggregationUnit)
try {
$insertStmt = $this->destinationHandle->prepare($this->optimizedInsertSql);
} catch (PDOException $e) {
$this->logAndThrowSqlException($this->optimizedInsertSql, $e, "Error preparing optimized aggregation insert statement");
$this->logAndThrowException(
"Error preparing optimized aggregation insert statement",
array('exception' => $e, 'sql' => $this->optimizedInsertSql)
);
}

// Detect the bind variables used in the query so we can filter these later. PDO will
Expand All @@ -835,20 +843,26 @@ protected function _execute($aggregationUnit)
preg_match_all($bindParamRegex, $this->optimizedInsertSql, $matches);
$discoveredBindParams['insert'] = array_unique($matches[0]);

$this->logger->debug("Aggregation optimized insert query ($aggregationUnit)\n" . $this->optimizedInsertSql);
$this->logger->debug("Aggregation optimized INSERT SQL ($aggregationUnit) " . $this->destinationEndpoint . ":\n" . $this->optimizedInsertSql);

} else {

try {
$selectStmt = $this->sourceHandle->prepare($this->selectSql);
} catch (PDOException $e) {
$this->logAndThrowSqlException($this->selectSql, $e, "Error preparing aggregation select statement");
$this->logAndThrowException(
"Error preparing aggregation select statement",
array('exception' => $e, 'sql' => $this->selectSql)
);
}

try {
$insertStmt = $this->destinationHandle->prepare($this->insertSql);
} catch (PDOException $e) {
$this->logAndThrowSqlException($this->insertSql, $e, "Error preparing aggregation insert statement");
$this->logAndThrowException(
"Error preparing aggregation insert statement",
array('exception' => $e, 'sql' => $this->insertSql)
);
}

// Detect the bind variables used in the query so we can filter these later. PDO will
Expand All @@ -860,8 +874,8 @@ protected function _execute($aggregationUnit)
preg_match_all($bindParamRegex, $this->insertSql, $matches);
$discoveredBindParams['insert'] = $matches[0];

$this->logger->debug("Aggregation select query ($aggregationUnit)\n" . $this->selectSql);
$this->logger->debug("Aggregation insert query ($aggregationUnit)\n" . $this->insertSql);
$this->logger->debug("Aggregation SELECT SQL ($aggregationUnit) " . $this->sourceEndpoint . ":\n" . $this->selectSql);
$this->logger->debug("Aggregation INSERT SQL ($aggregationUnit) " . $this->destinationEndpoint . ":\n" . $this->insertSql);

} // else ($optimize)

Expand Down Expand Up @@ -930,7 +944,10 @@ protected function _execute($aggregationUnit)
$sql = "DROP TEMPORARY TABLE IF EXISTS $qualifiedTmpTableName";
$result = $this->sourceHandle->execute($sql);
} catch (PDOException $e ) {
$this->logAndThrowSqlException($sql, $e, "Error removing temporary batch aggregation table");
$this->logAndThrowException(
"Error removing temporary batch aggregation table",
array('exception' => $e, 'sql' => $sql)
);
}

$origTableName =
Expand Down Expand Up @@ -991,10 +1008,13 @@ function ($k, $first, $last) {
$sql =
"CREATE TEMPORARY TABLE $qualifiedTmpTableName AS "
. "SELECT * FROM $origTableName $tmpTableAlias WHERE " . $whereClause;
$this->logger->debug("[EXPERIMENTAL] Batch temp table: $sql");
$this->logger->debug("[EXPERIMENTAL] Batch temp table " . $this->sourceEndpoint . ": $sql");
$result = $this->sourceHandle->execute($sql, $usedParams);
} catch (PDOException $e ) {
$this->logAndThrowSqlException($sql, $e, "Error creating temporary batch aggregation table ");
$this->logAndThrowException(
"Error creating temporary batch aggregation table",
array('exception' => $e, 'sql' => $sql)
);
}

$this->logger->info("[EXPERIMENTAL] Setup for batch $minPeriodId - $maxPeriodId (day_id $minDayId - $maxDayId): "
Expand Down Expand Up @@ -1024,7 +1044,10 @@ function ($k, $first, $last) {
$sql = "DROP TEMPORARY TABLE IF EXISTS $tmpTableName";
$result = $this->sourceHandle->execute($sql);
} catch (PDOException $e ) {
$this->logAndThrowSqlException($sql, $e, "Error removing temporary batch aggregation table");
$this->logAndThrowException(
"Error removing temporary batch aggregation table",
array('exception' => $e, 'sql' => $sql)
);
}

} // else ( ! $enableBatchAggregation )
Expand Down Expand Up @@ -1131,12 +1154,15 @@ protected function processAggregationPeriods(
$deleteSql .= " AND " . implode(" AND ", $dummyQuery->getOverseerRestrictionValues());
}

$this->logger->debug($deleteSql);
$this->logger->debug("Delete aggregation unit SQL " . $this->destinationEndpoint . ":\n$deleteSql");
$this->destinationHandle->execute($deleteSql);
}

} catch (PDOException $e ) {
$this->logAndThrowSqlException($deleteSql, $e, "Error removing existing aggregation data");
$this->logAndThrowException(
"Error removing existing aggregation data",
array('exception' => $e, 'sql' => $deleteSql)
);
}
} // if ( ! $this->options->truncate_destination )

Expand All @@ -1153,10 +1179,12 @@ protected function processAggregationPeriods(
$numRecords = $insertStmt->rowCount();
}
} catch (PDOException $e ) {
$this->logAndThrowSqlException($this->optimizedInsertSql, $e, "Error processing aggregation period");
$this->logAndThrowException(
"Error processing aggregation period",
array('exception' => $e, 'sql' => $this->optimizedInsertSql)
);
}


} else {

// Query the source table and put the results into the destination table in 2 steps
Expand All @@ -1166,7 +1194,11 @@ protected function processAggregationPeriods(
$selectStmt->execute($bindParams);
$numRecords = $selectStmt->rowCount();
} catch (PDOException $e ) {
$this->logAndThrowSqlException($this->selectSql, $e, "Error selecting raw job data");
$this->logAndThrowException(
"Error selecting raw job data",
array('exception' => $e, 'sql' => $this->selectSql)
);

}

$msg = array(
Expand All @@ -1182,7 +1214,10 @@ protected function processAggregationPeriods(
$insertStmt->execute($row);
}
} catch (PDOException $e ) {
$this->logAndThrowSqlException($this->insertSql, $e, "Error inserting aggregated data");
$this->logAndThrowException(
"Error inserting aggregated data",
array('exception' => $e, 'sql' => $this->insertSql)
);
}

} // else ( $optimize )
Expand Down
6 changes: 4 additions & 2 deletions classes/ETL/DataEndpoint/Mysql.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ public function createSchema($schemaName = null)
$dbh = $this->getHandle();
$result = $dbh->execute($sql);
} catch (\PdoException $e) {
$msg = "Error creating schema '$schemaName'";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error creating schema '$schemaName'",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this)
);
}

return true;
Expand Down
12 changes: 8 additions & 4 deletions classes/ETL/DataEndpoint/Oracle.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ public function tableExists($tableName, $schemaName = null)
return false;
}
} catch (PDOException $e) {
$msg = "Error querying for table '$schema'.'$tableName':";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error querying for table '$schema'.'$tableName':",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this)
);
}

return true;
Expand Down Expand Up @@ -152,8 +154,10 @@ public function getTableColumnNames($tableName, $schemaName = null)
$this->logAndThrowException($msg);
}
} catch (PDOException $e) {
$msg = "Error retrieving column names from '" . $this->getSchema() . ".'$tableName' ";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error retrieving column names from '" . $this->getSchema() . ".'$tableName' ",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this)
);
}

$columnNames = array();
Expand Down
6 changes: 4 additions & 2 deletions classes/ETL/DataEndpoint/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ public function createSchema($schemaName = null)
$dbh = $this->getHandle();
$result = $dbh->query($sql, $params);
} catch (\PdoException $e) {
$msg = "Error creating schema '$schemaName'";
$this->logAndThrowSqlException($sql, $e, $msg);
$this->logAndThrowException(
"Error creating schema '$schemaName'",
array('exception' => $e, 'sql' => $sql, 'endpoint' => $this)
);
}

return true;
Expand Down
Loading