diff --git a/classes/ETL/Aggregator/JobsAggregator.php b/classes/ETL/Aggregator/JobsAggregator.php index 159221079c..c6357173af 100644 --- a/classes/ETL/Aggregator/JobsAggregator.php +++ b/classes/ETL/Aggregator/JobsAggregator.php @@ -147,7 +147,7 @@ protected function performPreExecuteTasks() * ------------------------------------------------------------------------------------------ */ - protected function performPostExecuteTasks($numRecordsProcessed) + protected function performPostExecuteTasks($numRecordsProcessed = null) { $sourceSchema = $this->sourceEndpoint->getSchema(true); $tableName = $this->destinationEndpoint->quoteSystemIdentifier(self::STATUS_TABLE); diff --git a/classes/ETL/Aggregator/aAggregator.php b/classes/ETL/Aggregator/aAggregator.php index 2797bd8b80..63801b1396 100644 --- a/classes/ETL/Aggregator/aAggregator.php +++ b/classes/ETL/Aggregator/aAggregator.php @@ -174,32 +174,6 @@ public function execute(EtlOverseerOptions $etlOverseerOptions) } // execute() - /* ------------------------------------------------------------------------------------------ - * Perform any pre-execution tasks. For example, disabling table keys on MyISAM tables, or other - * setup tasks. - * - * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. - * - * @return true on success - * ------------------------------------------------------------------------------------------ - */ - - abstract protected function performPreExecuteTasks(); - - /* ------------------------------------------------------------------------------------------ - * Perform any post-execution tasks. For example, enabling table keys on MyISAM tables, or - * tracking table history. - * - * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. - * - * @param $numRecordsProcessed The number of records processed during this period. - * - * @return true on success - * ------------------------------------------------------------------------------------------ - */ - - abstract protected function performPostExecuteTasks($numRecordsProcessed); - /* ------------------------------------------------------------------------------------------ * Perform any pre-aggregation unit tasks. This are performed prior to aggregating each * aggregation unit (e.g., day, month, quarter) and might be verifying that a status table @@ -240,5 +214,6 @@ abstract protected function performPostAggregationUnitTasks($aggregationUnit, $n * ------------------------------------------------------------------------------------------ */ + // @codingStandardsIgnoreLine abstract protected function _execute($aggregationUnit); } // abstract class Aggregator diff --git a/classes/ETL/Aggregator/pdoAggregator.php b/classes/ETL/Aggregator/pdoAggregator.php index 4583010f14..1d757a4d8d 100644 --- a/classes/ETL/Aggregator/pdoAggregator.php +++ b/classes/ETL/Aggregator/pdoAggregator.php @@ -290,17 +290,20 @@ protected function createDestinationTableObjects() } // createDestinationTableObjects() - /* ------------------------------------------------------------------------------------------ - * By default, there are no pre-execution tasks. + /** ----------------------------------------------------------------------------------------- + * Note that we are not calling aRdbmsDestinationAction::performPreExecuteTasks() + * because we cannot properly manage the aggregation tables without knowing the + * aggregation unit or applying variable substitutions. Tables will be managed in + * performPreAggregationUnitTasks() instead. 
* - * @see aAggregator::performPreExecuteTasks() + * @see aAction::performPreExecuteTasks() * ------------------------------------------------------------------------------------------ */ protected function performPreExecuteTasks() { - // To support programmatic manipulation of the source Query object, save off the description - // of the first join (from) table + // To support programmatic manipulation of the source Query object, save off the + // description of the first join (from) table $sourceJoins = $this->etlSourceQuery->joins; $this->etlSourceQueryOrigFromTable = array_shift($sourceJoins); $this->etlSourceQueryModified = false; @@ -308,14 +311,13 @@ protected function performPreExecuteTasks() return true; } // performPreExecuteTasks() - /* ------------------------------------------------------------------------------------------ - * By default, there are no pre-execution tasks. - * - * @see aAggregator::performPostExecuteTasks() + /** ----------------------------------------------------------------------------------------- + * @see performPostAggregationUnitTasks() + * @see aAction::performPostExecuteTasks() * ------------------------------------------------------------------------------------------ */ - protected function performPostExecuteTasks($numRecordsProcessed) + protected function performPostExecuteTasks($numRecordsProcessed = null) { return true; } // performPostExecuteTasks() @@ -715,6 +717,7 @@ protected function getDirtyAggregationPeriods($aggregationUnit) * ------------------------------------------------------------------------------------------ */ + // @codingStandardsIgnoreLine protected function _execute($aggregationUnit) { $time_start = microtime(true); @@ -770,11 +773,10 @@ protected function _execute($aggregationUnit) $sourceJoins = $this->etlSourceQuery->joins; $firstJoin = array_shift($sourceJoins); $newFirstJoin = clone $firstJoin; - $newFirstJoin->setName($tmpTableName); + $newFirstJoin->name = $tmpTableName; $newFirstJoin->schema = $this->sourceEndpoint->getSchema(); - $this->etlSourceQuery->deleteJoins(); - $this->etlSourceQuery->addJoin($newFirstJoin); + $this->etlSourceQuery->joins = array($newFirstJoin); foreach ( $sourceJoins as $join ) { $this->etlSourceQuery->addJoin($join); } @@ -788,8 +790,7 @@ protected function _execute($aggregationUnit) $sourceJoins = $this->etlSourceQuery->joins; array_shift($sourceJoins); - $this->etlSourceQuery->deleteJoins(); - $this->etlSourceQuery->addJoin($this->etlSourceQueryOrigFromTable); + $this->etlSourceQuery->joins = array($this->etlSourceQueryOrigFromTable); foreach ( $sourceJoins as $join ) { $this->etlSourceQuery->addJoin($join); } @@ -891,7 +892,7 @@ protected function _execute($aggregationUnit) $sourceJoins = $this->etlSourceQuery->joins; $firstJoin = current($sourceJoins); - $tmpTableAlias = $firstJoin->getAlias(); + $tmpTableAlias = $firstJoin->alias; while ( ! 
$done ) { @@ -945,7 +946,7 @@ protected function _execute($aggregationUnit) try { // Use the where clause from the aggregation query to create the temporary table - $whereClause = implode(" AND ", $this->etlSourceQuery->getWheres()); + $whereClause = implode(" AND ", $this->etlSourceQuery->where); $whereClause = Utilities::substituteVariables( $whereClause, diff --git a/classes/ETL/Configuration/EtlConfiguration.php b/classes/ETL/Configuration/EtlConfiguration.php index b22408f3e5..6b480e1420 100644 --- a/classes/ETL/Configuration/EtlConfiguration.php +++ b/classes/ETL/Configuration/EtlConfiguration.php @@ -125,7 +125,7 @@ public function __construct( if ( array_key_exists('option_overrides', $options) && null !== $options['option_overrides'] ) { if ( ! is_array($options['option_overrides']) ) { $this->logAndThrowException("Option overrides must be an array"); - } else { + } elseif ( 0 !== count($options['option_overrides']) ) { $this->optionOverrides = $options['option_overrides']; } } diff --git a/classes/ETL/DataEndpoint/Filter/ExternalProcess.php b/classes/ETL/DataEndpoint/Filter/ExternalProcess.php index 1ddf33af1b..192f3505e4 100644 --- a/classes/ETL/DataEndpoint/Filter/ExternalProcess.php +++ b/classes/ETL/DataEndpoint/Filter/ExternalProcess.php @@ -176,6 +176,10 @@ public function onCreate() $arguments = ( isset($this->params->arguments) ? " " . $this->params->arguments : "" ); $this->command = $this->params->path . $arguments; + if ( isset($this->params->logger) ) { + $this->params->logger->debug(sprintf("Creating filter %s: %s", self::NAME, $this->command)); + } + // stream_bucket_new() needs somewhere to store temporary data but the // documentation doesn't give any details: // http://php.net/manual/en/function.stream-bucket-new.php diff --git a/classes/ETL/DataEndpoint/JsonFile.php b/classes/ETL/DataEndpoint/JsonFile.php index 1f98ef59e5..479f0b9791 100644 --- a/classes/ETL/DataEndpoint/JsonFile.php +++ b/classes/ETL/DataEndpoint/JsonFile.php @@ -73,7 +73,21 @@ protected function decodeRecord($data) ); } - if ( is_array($decoded) ) { + // If we parsed an empty array or object do not include it as a record. + + if ( + (is_array($decoded) && 0 == count($decoded)) || + (is_object($decoded) && 0 == count(get_object_vars($decoded))) + ) { + return true; + } + + // If we have decoded an array of records (either arrays or objects) then merge + // them onto the record list. Be careful that we have not decoded a single record + // that is an array, as this should simply be appended on to the end of the record + // list. + + if ( is_array($decoded) && (is_array(current($decoded)) || is_object(current($decoded))) ) { $this->recordList = array_merge($this->recordList, $decoded); } else { $this->recordList[] = $decoded; @@ -146,6 +160,86 @@ protected function verifyData() } // verifyData() + /** ----------------------------------------------------------------------------------------- + * @see aStructuredFile::discoverRecordFieldNames() + * ------------------------------------------------------------------------------------------ + */ + + protected function discoverRecordFieldNames() + { + // If there are no records in the file then we don't need to set the discovered + // field names. + + if ( 0 == count($this->recordList) ) { + return; + } + + // Determine the record names based on the structure of the JSON that we are + // parsing. 
+ + reset($this->recordList); + $record = current($this->recordList); + + if ( is_array($record) ) { + + if ( $this->hasHeaderRecord ) { + + // If we have a header record skip the first record and use its values as + // the field names + + $this->discoveredRecordFieldNames = array_shift($this->recordList); + + } elseif ( 0 !== count($this->requestedRecordFieldNames) ) { + + // If there is no header record and the requested field names have been + // provided, use them as the discovered field names. If a subsequent + // record contains fewer fields return NULL values for those fields, if a + // subsequent record contains more fields ignore them. + + $this->discoveredRecordFieldNames = $this->requestedRecordFieldNames; + + } else { + $this->logAndThrowException("Record field names must be specified for JSON array records"); + } + + } elseif ( is_object($record) ) { + + // Pull the record field names from the object keys + + $this->discoveredRecordFieldNames = array_keys(get_object_vars($record)); + + } else { + $this->logAndThrowException( + sprintf("Unsupported record type in %s. Got %s, expected array or object", $this->path, gettype($record)) + ); + } + + // If no field names were requested, return all discovered fields + + if ( 0 == count($this->requestedRecordFieldNames) ) { + $this->requestedRecordFieldNames = $this->discoveredRecordFieldNames; + } + + } // setRecordFieldNames() + + /** ----------------------------------------------------------------------------------------- + * @see aStructuredFile::createReturnRecord() + * ------------------------------------------------------------------------------------------ + */ + + protected function createReturnRecord($record) + { + $arrayRecord = parent::createReturnRecord($record); + + // If the original record is a stdClass object, be sure to maintain its type. + + if ( is_object($record) ) { + return (object) $arrayRecord; + } else { + return $arrayRecord; + } + } // createReturnRecord() + /** ----------------------------------------------------------------------------------------- * Implementation of json_last_error_msg() for pre PHP 5.5 systems. * diff --git a/classes/ETL/DataEndpoint/aStructuredFile.php b/classes/ETL/DataEndpoint/aStructuredFile.php index 32ad0d2d35..50ac9dcd6d 100644 --- a/classes/ETL/DataEndpoint/aStructuredFile.php +++ b/classes/ETL/DataEndpoint/aStructuredFile.php @@ -64,6 +64,44 @@ abstract class aStructuredFile extends File */ protected $fieldSeparator = null; + /** + * TRUE if the file is expected to have a header record, FALSE otherwise. + * + * @var boolean + */ + protected $hasHeaderRecord = true; + + /** + * An array of field names to return. If this is a subset of the fields present in the + * record, then return only the fields requested. If there are requested fields that + * are not present in the record return NULL for those fields. If NULL, return all + * discovered record fields. + * + * @var array + */ + protected $requestedRecordFieldNames = array(); + + /** + * An array of field names corresponding to the data in the file. File formats may + * interpret this differently, but all implementations are expected to return data for + * all fields specified here. If a field does not exist in the data, its value + * expected to be NULL. + * + * @var array + */ + protected $discoveredRecordFieldNames = array(); + + /** + * A flag indicating whether or not records should be returned exactly as they were + * found in the data file. 
When set to FALSE, records are normalized into an + * associative array (or other Traversable entity) containing only the fields that + * were requested. If set to TRUE, the normalization step is skipped and the raw + * record is returned. + * + * @var boolean + */ + protected $recordPassthrough = false; + /** ----------------------------------------------------------------------------------------- * @see iDataEndpoint::__construct() * ------------------------------------------------------------------------------------------ @@ -80,7 +118,10 @@ public function __construct(DataEndpointOptions $options, Log $logger = null) 'record_schema_path' => 'string', 'filters' => 'array', 'record_separator' => 'string', - 'field_separator' => 'string' + 'field_separator' => 'string', + 'header_record' => 'bool', + 'field_names' => 'array', + 'record_passthrough' => 'bool' ); if ( ! \xd_utilities\verify_object_property_types($options, $propertyTypes, $messages, true) ) { @@ -112,6 +153,18 @@ public function __construct(DataEndpointOptions $options, Log $logger = null) $this->filterDefinitions = $options->filters; } + if ( isset($options->header_record) ) { + $this->hasHeaderRecord = $options->header_record; + } + + if ( isset($options->field_names) ) { + $this->requestedRecordFieldNames = $options->field_names; + } + + if ( isset($options->record_passthrough) ) { + $this->recordPassthrough = $options->record_passthrough; + } + } // __construct() /** ----------------------------------------------------------------------------------------- @@ -121,11 +174,15 @@ public function __construct(DataEndpointOptions $options, Log $logger = null) public function parse() { - $this->logger->info("Parsing " . $this->path); + $this->logger->debug("Parsing " . $this->path); $this->attachFilters(); $numBytesRead = $this->parseFile($this->path); $this->verifyData(); + // Determine the record field names. This is specific to the type of structured + // data that we are parsing. 
+ $this->discoverRecordFieldNames(); + $this->rewind(); return $this->current(); @@ -183,7 +240,7 @@ protected function attachFilters() } $filterName = 'xdmod.external_process'; $resource = @stream_filter_prepend($fd, $filterName, STREAM_FILTER_READ, $config); - $this->logger->debug(sprintf("Adding filter %s to stream", $filterName)); + $this->logger->debug(sprintf("Adding filter %s to stream: %s", $filterName, $config->path)); if ( false === $resource ) { $error = error_get_last(); @@ -340,6 +397,16 @@ public function getFieldSeparator() return $this->fieldSeparator; } + /** ----------------------------------------------------------------------------------------- + * @see iStructuredFile::hasHeaderRecord() + * ------------------------------------------------------------------------------------------ + */ + + public function hasHeaderRecord() + { + return $this->hasHeaderRecord; + } + /** ----------------------------------------------------------------------------------------- * @see iStructuredFile::getRecordFieldNames() * ------------------------------------------------------------------------------------------ @@ -347,7 +414,17 @@ public function getFieldSeparator() public function getRecordFieldNames() { - return array(); + return $this->requestedRecordFieldNames; + } + + /** ----------------------------------------------------------------------------------------- + * @see iStructuredFile::getDiscoveredRecordFieldNames() + * ------------------------------------------------------------------------------------------ + */ + + public function getDiscoveredRecordFieldNames() + { + return $this->discoveredRecordFieldNames; } /** ----------------------------------------------------------------------------------------- @@ -361,16 +438,50 @@ public function getAttachedFilters() } /** ----------------------------------------------------------------------------------------- - * @see iStructuredFile::getRecordsDefinition() + * Construct a Traversable return record. The return record must contain all of the + * requested field names (keys) along with their values or NULL if a value is not + * present for that field. + * + * Be careful to maintain the type of the record so we do not break functionality + * downstream that relies on it. For example, when parsing a JSON configuration file + * we must maintain the stdClass type and not blindly convert it to an associative + * array. The child class can re-implement this method as needed. + * + * @return array A record that includes all of the data for the requested fields * ------------------------------------------------------------------------------------------ */ - public function getRecordsDefinition() + protected function createReturnRecord($record) { - return array(); - } + // Create an associative array with discovered field names as keys and the + // associated record field values. Since the expected fields can be set using a + // header row, we will need to handle the case where subsequent records could + // contain more or fewer fields than the header record. 
- /** ----------------------------------------------------------------------------------------- + if ( is_object($record) ) { + $arrayRecord = get_object_vars($record); + } else { + $numDiscoveredRecords = count($this->discoveredRecordFieldNames); + if ( count($record) < $numDiscoveredRecords ) { + $record = array_pad($record, $numDiscoveredRecords, null); + } + $arrayRecord = array_combine($this->discoveredRecordFieldNames, array_slice($record, 0, $numDiscoveredRecords)); + } + + // Create an iterable template where the keys are all of the requested fields with + // NULL values. Merge the data record into the template so that the NULL values in + // the template are overwritten with the record values where the fields + // match. + + $dataTemplate = array_fill_keys($this->requestedRecordFieldNames, null); + + return array_merge($dataTemplate, array_intersect_key($arrayRecord, $dataTemplate)); + } // createReturnRecord() + + /** ----------------------------------------------------------------------------------------- + * Return the current record as a Traversable entity such as an associative array or + * stdClass where the keys are field names. + * * @see Iterator::current() * ------------------------------------------------------------------------------------------ */ @@ -379,11 +490,17 @@ public function current() { if ( ! $this->valid() ) { return false; + } elseif ( $this->recordPassthrough ) { + return current($this->recordList); } - return current($this->recordList); + + // The return record must be Traversable. + + return $this->createReturnRecord(current($this->recordList)); + } // current() - /** ----------------------------------------------------------------------------------------- + /** ----------------------------------------------------------------------------------------- * @see Iterator::key() * ------------------------------------------------------------------------------------------ */ @@ -457,4 +574,17 @@ abstract protected function decodeRecord($data); */ abstract protected function verifyData(); + + /** ----------------------------------------------------------------------------------------- + * Set the discovered field names for the records in a file. How the field names are + * determined is specific to the file type. For example, the fields can be inferred + * from a CSV/TSV file with a header or a JSON file representing data as objects but + * must be specified for CSV/TSV without a header or for a JSON file containing + * records as arrays. + * + * @return array The list of record field names. + * ------------------------------------------------------------------------------------------ + */ + + abstract protected function discoverRecordFieldNames(); } // abstract class aStructuredFile diff --git a/classes/ETL/DataEndpoint/iStructuredFile.php b/classes/ETL/DataEndpoint/iStructuredFile.php index 57773940e4..1c8e7d7353 100644 --- a/classes/ETL/DataEndpoint/iStructuredFile.php +++ b/classes/ETL/DataEndpoint/iStructuredFile.php @@ -5,7 +5,8 @@ * Record Separator (RS) and individual fields may be separated by a Field Separator (FS) * if the format allows or requires it (e.g., CSV and TSV). Typically, we will read a * structured file by iterating over the records and operating on the fields within the - * record. + * record. 
Each record returned by the iterator methods must be traversable (either an + * array, object, or implementing the Traversable interface) * * @author Steve Gallo * @date 2017-05-10 @@ -18,43 +19,52 @@ interface iStructuredFile extends iFile, \Iterator, \Countable { /** ----------------------------------------------------------------------------------------- - * @return The record separator, or NULL if none has been set. + * @return string The record separator, or NULL if none has been set. * ------------------------------------------------------------------------------------------ */ public function getRecordSeparator(); /** ----------------------------------------------------------------------------------------- - * @return The field separator, or NULL if none has been set. + * @return string The field separator, or NULL if none has been set. * ------------------------------------------------------------------------------------------ */ public function getFieldSeparator(); /** ----------------------------------------------------------------------------------------- - * @return The list of field names for a record + * @return boolean TRUE if the file is expected to have a header record, FALSE otherwise. + * ------------------------------------------------------------------------------------------ + */ + + public function hasHeaderRecord(); + + /** ----------------------------------------------------------------------------------------- + * @return array The list of field names that will be returned for each record. These + * are set via the field_names option to the StructuredFile endpoint or based on the + * fields found in the record if field_names is not set. They may be the same as the + * discovered fields, a subset, or a superset. * ------------------------------------------------------------------------------------------ */ public function getRecordFieldNames(); /** ----------------------------------------------------------------------------------------- - * @return An associative array filters attached to be applied to the data in this - * file where the key is the filter key used in the configuration. + * @return array The list of field names that were discovered a record. These are + * auto-discovered (via a header record or the object keys of the first record, for + * example) or specified via the field_names option. * ------------------------------------------------------------------------------------------ */ - public function getAttachedFilters(); + public function getDiscoveredRecordFieldNames(); /** ----------------------------------------------------------------------------------------- - * The record definition is a mixed list of scalar field names and objects specifying - * the field name and type. - * - * @return An array containing the record definition. + * @return array An associative array of filter configuration objects to be applied to + * the data in this file. The key is the filter key used in the configuration. * ------------------------------------------------------------------------------------------ */ - public function getRecordsDefinition(); + public function getAttachedFilters(); /** ----------------------------------------------------------------------------------------- * Parse and possibly decode the (possibly filtered) file data. The resulting data @@ -70,7 +80,8 @@ public function getRecordsDefinition(); * record separator. 
However, a JSON file could also contain multiple individual * objects separated by a record separator without the enclosing array. * - * @return mixed The first parsed record parsed from the file. + * @return mixed The first parsed record parsed from the file. This record must be + * iterable, such as an array or an object with public data membbers. * ------------------------------------------------------------------------------------------ */ diff --git a/classes/ETL/Ingestor/RestIngestor.php b/classes/ETL/Ingestor/RestIngestor.php index 29e8e24c85..1177f8895b 100644 --- a/classes/ETL/Ingestor/RestIngestor.php +++ b/classes/ETL/Ingestor/RestIngestor.php @@ -218,25 +218,14 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) } // initialize() /* ------------------------------------------------------------------------------------------ - * @see aIngestor::performPreExecuteTasks() + * @see aAction::performPreExecuteTasks() * ------------------------------------------------------------------------------------------ */ protected function performPreExecuteTasks() { - // ------------------------------------------------------------------------------------------ - // This is not yet updated to fully support multiple ETL destination tables. - try { - - // Bring the destination table in line with the configuration if necessary. - // manageTable() is DRYRUN aware. - - $this->manageTable($this->etlDestinationTable, $this->destinationEndpoint); - - } catch ( Exception $e ) { - $this->logAndThrowException("Error managing ETL table for " . $this->getName() . ": " . $e->getMessage()); - } + parent::performPreExecuteTasks(); $this->destinationTableColumnNames = $this->etlDestinationTable->getColumnNames(); @@ -284,33 +273,10 @@ protected function performPreExecuteTasks() $this->processParameters(); - if ( "myisam" == strtolower($this->etlDestinationTable->engine) ) { - // Disable keys for faster inserts - $qualifiedDestTableName = $this->etlDestinationTable->getFullName(); - $sqlList = array("ALTER TABLE $qualifiedDestTableName DISABLE KEYS"); - $this->executeSqlList($sqlList, $this->destinationEndpoint); - } - return true; } // performPreExecuteTasks() - /* ------------------------------------------------------------------------------------------ - * @see aIngestor::performPostExecuteTasks() - * ------------------------------------------------------------------------------------------ - */ - - protected function performPostExecuteTasks($numRecordsProcessed) - { - if ( "myisam" == strtolower($this->etlDestinationTable->engine) ) { - $qualifiedDestTableName = $this->etlDestinationTable->getFullName(); - $sqlList = array("ALTER TABLE $qualifiedDestTableName ENABLE KEYS"); - $this->executeSqlList($sqlList, $this->destinationEndpoint); - } - - return true; - } // performPostExecuteTasks() - /* ------------------------------------------------------------------------------------------ * @see aIngestor::_execute() * ------------------------------------------------------------------------------------------ diff --git a/classes/ETL/Ingestor/StructuredFileIngestor.php b/classes/ETL/Ingestor/StructuredFileIngestor.php index d7a9572950..37eed0715f 100644 --- a/classes/ETL/Ingestor/StructuredFileIngestor.php +++ b/classes/ETL/Ingestor/StructuredFileIngestor.php @@ -23,28 +23,16 @@ class StructuredFileIngestor extends aIngestor implements iAction { /** - * Execution data set up by this ingestor's pre-execution tasks. 
+ * The custom insert values component is an object that allows us to specify a + * subquery to use when inserting data rather than the raw source value. If the + * destination column is present as a key in the object, the key's value will be used. * - * The pre-execution function creates an array with these keys: - * - destColumns: A numeric array of destination columns. - * - destColumnsToSourceKeys: An array mapping destination columns to - * the corresponding keys in the source data. - * - sourceValues: An array of values to be ingested during execution. - * - customInsertValuesComponents: An object containing replacement SQL - * for the standard placeholders in the - * INSERT statement for the destination - * columns specified in the - * configuration file. - * - * @var array + * @var array|null */ - protected $executionData; - // This action does not (yet) support multiple destination tables. If multiple destination - // tables are present, store the first here and use it. - protected $etlDestinationTable = null; + protected $customInsertValuesComponents = null; - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * @see iAction::__construct() * ------------------------------------------------------------------------------------------ */ @@ -54,9 +42,8 @@ public function __construct(aOptions $options, EtlConfiguration $etlConfig, Log parent::__construct($options, $etlConfig, $logger); } // __construct() - /* ------------------------------------------------------------------------------------------ - * Initialize data required to perform the action. This should be called after the constructor and - * as part of the verification process. + /** ----------------------------------------------------------------------------------------- + * @see iAction::initialize() * * @throws Exception if any query data was not int the correct format. * ------------------------------------------------------------------------------------------ @@ -72,65 +59,14 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) parent::initialize($etlOverseerOptions); - // This ingestor supports an explicit source data endpoint of StructuredFile or - // JSON data specified directly in the definition file using the source_values - // key. If the source_values key is present, ignore the source endpoint as the - // data will have already been parsed. - - if ( isset($this->parsedDefinitionFile->source_values) ) { - $this->sourceEndpoint = null; - $this->sourceHandle = null; - } elseif ( $this->options->source !== null && ! $this->options->ignore_source ) { - if ( ! $this->sourceEndpoint instanceof iStructuredFile ) { - $msg = "Source is not an instance of ETL\\DataEndpoint\\iStructuredFile"; - $this->logAndThrowException($msg); - } - } - - // This action only supports 1 destination table so use the first one and log a warning if - // there are multiple. - - reset($this->etlDestinationTableList); - $this->etlDestinationTable = current($this->etlDestinationTableList); - $etlTableKey = key($this->etlDestinationTableList); - if ( count($this->etlDestinationTableList) > 1 ) { - $msg = $this . " does not support multiple ETL destination tables, using first table with key: '$etlTableKey'"; - $logger->warning($msg); - } - - if ( ! isset($this->parsedDefinitionFile->destination_columns) ) { - $msg = "destination_columns key not present in definition file: " . 
$this->definitionFile; - $this->logAndThrowException($msg); - } - - $destinationColumns = $this->parsedDefinitionFile->destination_columns; - if (is_object($destinationColumns)) { - $destinationColumns = array_keys(get_object_vars( - $destinationColumns - )); - } elseif (! is_array($destinationColumns)) { - $msg = "destination_columns is invalid format: " . $this->definitionFile; - $this->logAndThrowException($msg); - } - - if ( - ! $this->sourceEndpoint - && ! isset($this->parsedDefinitionFile->source_values) - ) { - $msg = "source file not configured and (default) source values not in definition file"; - $this->logAndThrowException($msg); - } - - // Verify that the columns exist - $missingColumnNames = array_diff( - $destinationColumns, - $this->etlDestinationTable->getColumnNames() - ); - - if ( 0 != count($missingColumnNames) ) { - $msg = "The following columns from the data file were not found in table " . $this->etlDestinationTable->getFullName() . ": " . - implode(", ", $missingColumnNames); - $this->logAndThrowException($msg); + if ( ! $this->sourceEndpoint instanceof iStructuredFile ) { + $this->logAndThrowException( + sprintf( + "Source endpoint %s does not implement %s", + get_class($this->sourceEndpoint), + "ETL\\DataEndpoint\\iStructuredFile" + ) + ); } $this->initialized = true; @@ -139,198 +75,163 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) } // initialize() - /** + /** ----------------------------------------------------------------------------------------- * @see aIngestor::_execute() + * ------------------------------------------------------------------------------------------ */ // @codingStandardsIgnoreLine protected function _execute() { - $destColumns = $this->executionData['destColumns']; - $destColumnsToSourceKeys = $this->executionData['destColumnsToSourceKeys']; - $sourceValues = $this->executionData['sourceValues']; - $customInsertValuesComponents = $this->executionData['customInsertValuesComponents']; - - // If no data was provided in the file, use the StructuredFile endpoint - if ( null === $sourceValues ) { - $sourceValues = $this->sourceEndpoint; + $numRecords = 0; + $insertStatements = array(); + + // We will need to get the record fields from the source data. This happens after + // the first record is parsed. + + $this->sourceEndpoint->parse(); + $recordFieldNames = $this->sourceEndpoint->getRecordFieldNames(); + $this->logger->debug( + sprintf("Requested %d record fields: %s", count($recordFieldNames), implode(', ', $recordFieldNames)) + ); + + if ( 0 == count($recordFieldNames) ) { + return $numRecords; } - $numColumns = count($destColumns); - $numRecords = count($sourceValues); + $this->parseDestinationFieldMap($recordFieldNames); - // Insert data for each column. + // The custom_insert_values_components option is an object that allows us to + // specify a subquery to use when inserting data rather than the raw source + // value. If the destination column is present as a key in the object, use the + // subquery, otherwise use "?" as a placeholder. Note that the raw value will be + // provided to the subquery and it should contain a single "?" placeholder. // // NOTE: Null values will not overwrite non-null values in the database. // This is done to handle destinations that can be populated by // multiple sources with varying levels of detail. - $valuesComponents = array_map(function ($destColumn) use ($customInsertValuesComponents) { - return ( - property_exists($customInsertValuesComponents, $destColumn) - ? 
$customInsertValuesComponents->$destColumn - : '?' - ); - }, $destColumns); - $sql = "INSERT INTO " . $this->etlDestinationTable->getFullName() . " (" . - implode(",", $destColumns) . - ") VALUES (" . - implode(",", $valuesComponents) . - ") ON DUPLICATE KEY UPDATE " . - implode(", ", array_map(function ($destColumn) { - return "$destColumn = COALESCE(VALUES($destColumn), $destColumn)"; - }, $destColumns)) - ; + $customInsertValuesComponents = $this->customInsertValuesComponents; + + // The destination field map may specify that the same source field is mapped to + // multiple destination fields and the order that the source record fields is + // returned may be different from the order the fields were specified in the + // map. Maintain a mapping between source fields and the position (index) that + // they were specified in the map so we cam properly build the SQL parameter list. + + $sourceFieldIndexes = array(); + + foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { + $destFieldToSourceFieldMap = $this->destinationFieldMappings[$etlTableKey]; + $destinationFields = array_keys($destFieldToSourceFieldMap); + $sourceFieldIndexes[$etlTableKey] = array_values($destFieldToSourceFieldMap); + + $valuesComponents = array_map( + function ($destField) use ($customInsertValuesComponents) { + return ( property_exists($customInsertValuesComponents, $destField) + ? $customInsertValuesComponents->$destField + : '?' ); + }, + $destinationFields + ); - $this->logger->debug("Insert query " . $this->destinationEndpoint . ":\n$sql"); + // Generate one statement per destination table + + $sql = sprintf( + 'INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s', + $etlTable->getFullName(), + implode(', ', $destinationFields), + implode(', ', $valuesComponents), + implode(', ', array_map( + function ($destField) { + return "$destField = COALESCE(VALUES($destField), $destField)"; + }, + array_keys($destFieldToSourceFieldMap) + )) + ); - if ( ! $this->getEtlOverseerOptions()->isDryrun() ) { try { - $insertStatement = $this->destinationHandle->prepare($sql); - - foreach ( $sourceValues as $sourceValue ) { - $insertStatement->execute($this->convertSourceValueToRow( - $sourceValue, - $destColumns, - $destColumnsToSourceKeys - )); + $this->logger->debug("Insert SQL: $sql"); + if ( ! $this->getEtlOverseerOptions()->isDryrun() ) { + $insertStatements[$etlTableKey] = $this->destinationHandle->prepare($sql); } } catch (PDOException $e) { $this->logAndThrowException( - "Error inserting file data", + "Error preparing insert statement for table key '$etlTableKey'", array('exception' => $e, 'endpoint' => $this) ); } } - return $numRecords; - } // execute() - - /** - * @see aIngestor::performPreExecuteTasks - */ - protected function performPreExecuteTasks() - { - $this->time_start = microtime(true); - - $this->initialize($this->getEtlOverseerOptions()); - - $this->manageTable($this->etlDestinationTable, $this->destinationEndpoint); + if ( $this->getEtlOverseerOptions()->isDryrun() ) { + return $numRecords; + } - $this->truncateDestination(); + // Insert each source record. Note that the source record may be an array or an + // object and must be Traversable. - // The destination columns may be specified as an array when the source - // values are given as arrays and an object for when the source values - // are given as objects. Regardless of form, get a list of destination - // columns and a mapping of those columns to their corresponding keys in - // source values. 
- $destColumns = $this->parsedDefinitionFile->destination_columns; - if (is_array($destColumns)) { - $destColumnsToSourceKeys = array_flip($destColumns); - if ($destColumnsToSourceKeys === null) { - $msg = "destination_columns is an invalid array: " . $this->definitionFile; - $this->logAndThrowException($msg); - } - } else { - $destColumnsToSourceKeys = (array) $destColumns; - $destColumns = array_keys($destColumnsToSourceKeys); - } + foreach ( $this->sourceEndpoint as $sourceRecord ) { + foreach ( $this->destinationFieldMappings as $etlTableKey => $destFieldToSourceFieldMap ) { - // The StructuredFile data endpoint now implements the Iterator interface. Storing - // source values directly in the definition file will be deprecated in the future - // in favor of maintaing a separate data file with a reference to that file in the - // definition. + $parameters = array(); - $sourceValues = null; + // Build up the parameter list for the query. Note that the same source + // value may be used multiple times. The records returned from a + // StructuredFile endpoint will be Traversable as ($key, $value) pairs, + // however this does not mean that we can assume they can be treated as + // arrays (e.g., $sourceRecord[$sourceField]) because they may be objects + // or store data in private members that are exposed by the Iterator + // interface. - // If a source data endpoint was given, use it. Otherwise, use data - // values specified in the definition file. - if ( null !== $this->sourceEndpoint) { - $this->sourceEndpoint->parse(); - } else { - $sourceValues = $this->parsedDefinitionFile->source_values; - } + foreach ($sourceRecord as $sourceField => $sourceValue) { + // Find all indexes that match the current source field + $indexes = array_keys(array_intersect($sourceFieldIndexes[$etlTableKey], array($sourceField))); + foreach ( $indexes as $i ) { + $parameters[$i] = $sourceValue; + } + } - // If any custom SQL fragments for insertion were specified, use them. - $customInsertValuesComponents = $this->parsedDefinitionFile->custom_insert_values_components; - if ($customInsertValuesComponents === null) { - $customInsertValuesComponents = new stdClass(); + try { + $insertStatements[$etlTableKey]->execute($parameters); + } catch (PDOException $e) { + $this->logAndThrowException( + "Error inserting data into table key '$etlTableKey' for record " . 
( $numRecords + 1), + array('exception' => $e, 'endpoint' => $this) + ); + } + } + $numRecords++; } - $this->executionData = array( - 'destColumns' => $destColumns, - 'destColumnsToSourceKeys' => $destColumnsToSourceKeys, - 'sourceValues' => $sourceValues, - 'customInsertValuesComponents' => $customInsertValuesComponents, - ); + return $numRecords; - return true; - } + } // _execute() - /** - * @see aIngestor::performPostExecuteTasks + /** ----------------------------------------------------------------------------------------- + * @see aIngestor::performPreExecuteTasks + * ------------------------------------------------------------------------------------------ */ - protected function performPostExecuteTasks($numRecords = null) - { - $time_start = $this->time_start; - $time_end = microtime(true); - $time = $time_end - $time_start; - - $logArray = array( - 'action' => (string) $this, - 'start_time' => $time_start, - 'end_time' => $time_end, - 'elapsed_time' => round($time, 5), - ); - if ($numRecords !== null) { - $logArray['records_loaded'] = $numRecords; - } - - $this->logger->notice($logArray); + protected function performPreExecuteTasks() + { + parent::performPreExecuteTasks(); - return true; - } + // If any custom SQL fragments for insertion were specified, use them. - /** - * Convert a given data point into a set of values for database insertion. - * - * NOTE: Values not found in the data point will be treated as null. - * - * @param array|stdClass $sourceValue The data point to convert. - * @param array $destColumns An ordered list of columns an - * INSERT statement is expected - * to be provided values for. - * @param array $destColumnsToSourceKeys A mapping of columns to their - * corresponding keys in the data. - * @return array A set of values ready to be - * used with an INSERT statement. - */ - protected function convertSourceValueToRow( - $sourceValue, - $destColumns, - $destColumnsToSourceKeys - ) { - $row = array(); - foreach ($destColumns as $destColumn) { - $sourceKey = $destColumnsToSourceKeys[$destColumn]; - - // If the key is an integer, then the data point should be an array. - // Otherwise, the data point should be an object. - if (is_int($sourceKey)) { - $row[] = ( - isset($sourceValue[$sourceKey]) - ? $sourceValue[$sourceKey] - : null - ); - } else { - $row[] = ( - property_exists($sourceValue, $sourceKey) - ? $sourceValue->$sourceKey - : null + if ( isset($this->parsedDefinitionFile->custom_insert_values_components) ) { + $this->customInsertValuesComponents = $this->parsedDefinitionFile->custom_insert_values_components; + if ( ! 
is_object($this->customInsertValuesComponents) ) { + $this->logAndThrowException( + sprintf( + "custom_insert_values_components must be an object, %s given", + gettype($customInsertValuesComponents) + ) ); } + } else { + $this->customInsertValuesComponents = new stdClass(); } - return $row; + + return true; } } // class StructuredFileIngestor diff --git a/classes/ETL/Ingestor/UpdateIngestor.php b/classes/ETL/Ingestor/UpdateIngestor.php index ad3e65ca0b..7a8cf507c1 100644 --- a/classes/ETL/Ingestor/UpdateIngestor.php +++ b/classes/ETL/Ingestor/UpdateIngestor.php @@ -19,17 +19,20 @@ use ETL\aOptions; use ETL\DataEndpoint; use ETL\DataEndpoint\DataEndpointOptions; +use ETL\DataEndpoint\iStructuredFile; class UpdateIngestor extends aRdbmsDestinationAction implements iAction { - // Data parsed from the source JSON file or inline from the source_data definition - protected $data = null; + /** + * This action does not (yet) support multiple destination tables. If multiple + * destination tables are present, store the first here and use it. + * + * @var Table + */ - // This action does not (yet) support multiple destination tables. If multiple destination - // tables are present, store the first here and use it. protected $etlDestinationTable = null; - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * @see iAction::__construct() * ------------------------------------------------------------------------------------------ */ @@ -39,13 +42,12 @@ public function __construct(aOptions $options, EtlConfiguration $etlConfig, Log parent::__construct($options, $etlConfig, $logger); if ( ! $options instanceof IngestorOptions ) { - $msg = "Options is not an instance of IngestorOptions"; - $this->logAndThrowException($msg); + $this->logAndThrowException("Options is not an instance of IngestorOptions"); } } // __construct() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * @see iAction::initialize() * ------------------------------------------------------------------------------------------ */ @@ -60,6 +62,16 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) parent::initialize($etlOverseerOptions); + if ( ! $this->sourceEndpoint instanceof iStructuredFile ) { + $this->logAndThrowException( + sprintf( + "Source endpoint %s does not implement %s", + get_class($this->sourceEndpoint), + "ETL\\DataEndpoint\\iStructuredFile" + ) + ); + } + // This action only supports 1 destination table so use the first one and log a warning if // there are multiple. @@ -67,53 +79,42 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) $this->etlDestinationTable = current($this->etlDestinationTableList); $etlTableKey = key($this->etlDestinationTableList); if ( count($this->etlDestinationTableList) > 1 ) { - $msg = $this . " does not support multiple ETL destination tables, using first table with key: '$etlTableKey'"; - $logger->warning($msg); + $logger->warning( + sprintf( + "%s does not support multiple ETL destination tables, using first table with key: '%s'", + $this, + $etlTableKey + ) + ); } - // Verify that we have a properly formatted "source_data" and "update" property - - $requiredKeys = array('source_data', 'update'); - - foreach ( $requiredKeys as $key ) { - if ( ! 
isset($this->parsedDefinitionFile->$key) ) { - $msg = "'$key' key missing in definition file: " . $this->definitionFile; - $this->logAndThrowException($msg); - } - } + // Verify that we have a properly "update" property - if ( ! isset($this->parsedDefinitionFile->update->set) || ! is_array($this->parsedDefinitionFile->update->set) ) { - $msg = "'set' key missing or not an array in 'update' block: " . $this->definitionFile; - $this->logAndThrowException($msg); - } + $requiredKeys = array( + 'update' => 'object' + ); - if ( ! isset($this->parsedDefinitionFile->update->where) || ! is_array($this->parsedDefinitionFile->update->where) ) { - $msg = "'where' key missing or not an array in 'update' block: " . $this->definitionFile; - $this->logAndThrowException($msg); + if ( ! \xd_utilities\verify_object_property_types($this->parsedDefinitionFile, $requiredKeys, $messages) ) { + $this->logAndThrowException(sprintf("Definition file error: %s", implode(', ', $messages))); } - if ( ! isset($this->parsedDefinitionFile->source_data->data) ) { - $msg = "'data' key missing in in 'source_data' block: " . $this->definitionFile; - $this->logAndThrowException($msg); - } + $requiredKeys = array( + 'set' => 'array', + 'where' => 'array' + ); - if ( ! isset($this->parsedDefinitionFile->source_data->fields) || ! is_array($this->parsedDefinitionFile->source_data->fields) ) { - $msg = "'fields' key missing or not an array in 'source_data' block: " . $this->definitionFile; - $this->logAndThrowException($msg); + if ( ! \xd_utilities\verify_object_property_types($this->parsedDefinitionFile->update, $requiredKeys, $messages) ) { + $this->logAndThrowException( + sprintf("Error verifying definition file 'update' section: %s", implode(', ', $messages)) + ); } - // 2. Create data & verify source. Data source is iteratable. Instantiate object based on columns and data. - // 3. In execute() - // a. Construct prepared update statement - // b. Iterate over data source and execute statement - // Merge source data fields with update set and where fields and ensure that they exist in // the table definition $updateColumns = array_merge( $this->parsedDefinitionFile->update->set, - $this->parsedDefinitionFile->update->where, - $this->parsedDefinitionFile->source_data->fields + $this->parsedDefinitionFile->update->where ); $missingColumnNames = array_diff( @@ -122,29 +123,13 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) ); if ( 0 != count($missingColumnNames) ) { - $msg = "The following columns from the update configuration were not " . - "found in table " . $this->etlDestinationTable->getFullName() . ": " . - implode(", ", $missingColumnNames); - $this->logAndThrowException($msg); - } - - // If the data is a string assume it is a filename, otherwise assume it is parsed JSON. 
- - if ( is_string($this->parsedDefinitionFile->source_data->data) ) { - $filename = $this->parsedDefinitionFile->source_data->data; - $filename = \xd_utilities\qualify_path($filename, $this->options->paths->base_dir); - - $this->logger->debug("Load data from '$filename'"); - $opt = new DataEndpointOptions(array('name' => "Configuration", - 'path' => $filename, - 'type' => "jsonfile")); - $jsonFile = DataEndpoint::factory($opt, $this->logger); - $this->data = $jsonFile->parse(); - } elseif ( is_array($this->parsedDefinitionFile->source_data->data) ) { - $this->data = $this->parsedDefinitionFile->source_data->data; - } else { - $msg = "Source data must be an inline array or a filename"; - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf( + "The following columns from the update configuration were not found in table '%s': %s", + $this->etlDestinationTable->getFullName(), + implode(", ", $missingColumnNames) + ) + ); } $this->initialized = true; @@ -153,7 +138,7 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) } // initialize() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * @see iAction::execute() * ------------------------------------------------------------------------------------------ */ @@ -184,52 +169,62 @@ public function execute(EtlOverseerOptions $etlOverseerOptions) // Note that the update ingestor does not manage or truncate tables. - $sql = "UPDATE " . $this->etlDestinationTable->getFullName() . " SET " - . implode( - ", ", - array_map( - function ($s) { - return "$s = ?"; - }, - $this->parsedDefinitionFile->update->set - ) + $sql = sprintf( + "UPDATE %s SET %s WHERE %s", + $this->etlDestinationTable->getFullName(), + implode( + ', ', + array_map(function ($s) { + return "$s = ?"; + }, $this->parsedDefinitionFile->update->set) + ), + implode( + ' AND ', + array_map(function ($w) { + return "$w = ?"; + }, $this->parsedDefinitionFile->update->where) ) - . " WHERE " - . implode( - " AND ", - array_map( - function ($w) { - return "$w = ?"; - }, - $this->parsedDefinitionFile->update->where - ) - ); + ); $this->logger->debug("Update query\n$sql"); - // The order and number of the fields must match the update statement - $dataFields = array_merge($this->parsedDefinitionFile->update->set, $this->parsedDefinitionFile->update->where); + // The order and number of the fields must match the placeholders update statement - // Set up the indexes that we will need in the correct order for each data record - $fieldsToIndexes = array_flip($this->parsedDefinitionFile->source_data->fields); - $dataIndexes = array_map( - function ($field) use ($fieldsToIndexes) { - return $fieldsToIndexes[$field]; - }, - $dataFields + $queryDataFields = array_merge( + $this->parsedDefinitionFile->update->set, + $this->parsedDefinitionFile->update->where ); + // Verify that all of the required fields are present in the data + + $firstRecord = $this->sourceEndpoint->parse(); + if ( ! is_array($firstRecord) ) { + $this->logAndThrowException("The current implementation of %s only supports array records", $this); + } + + $missing = array_diff($queryDataFields, $this->sourceEndpoint->getRecordFieldNames()); + if ( 0 != count($missing) ) { + $this->logAndThrowException( + sprintf( + "These fields are required by the update but are not present in the source data: %s", + implode(', ', $missing) + ) + ); + } + if ( ! 
$etlOverseerOptions->isDryrun() ) { try { $updateStatement = $this->destinationHandle->prepare($sql); - foreach ( $this->data as $record ) { + foreach ( $this->sourceEndpoint as $record ) { + $row = array_map( - function ($index) use ($record) { - return $record[$index]; + function ($field) use ($record) { + return $record[$field]; }, - $dataIndexes + $queryDataFields ); + $updateStatement->execute($row); $numRecordsUpdated += $updateStatement->rowCount(); $numRecordsProcessed++; diff --git a/classes/ETL/Ingestor/aIngestor.php b/classes/ETL/Ingestor/aIngestor.php index 1c169825cc..567b24e436 100644 --- a/classes/ETL/Ingestor/aIngestor.php +++ b/classes/ETL/Ingestor/aIngestor.php @@ -149,33 +149,7 @@ public function execute(EtlOverseerOptions $etlOverseerOptions) 'records_examined' => $totalRecordsProcessed, 'records_loaded' => $totalRecordsProcessed )); - } - - /* ------------------------------------------------------------------------------------------ - * Perform any pre-execution tasks. For example, disabling table keys on MyISAM tables, or other - * setup tasks. - * - * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. - * - * @return true on success - * ------------------------------------------------------------------------------------------ - */ - - abstract protected function performPreExecuteTasks(); - - /* ------------------------------------------------------------------------------------------ - * Perform any post-execution tasks. For example, enabling table keys on MyISAM tables, or - * tracking table history. - * - * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. - * - * @param $numRecordsProcessed The number of records processed during this period. - * - * @return true on success - * ------------------------------------------------------------------------------------------ - */ - - abstract protected function performPostExecuteTasks($numRecordsProcessed); + } // execute() /* ------------------------------------------------------------------------------------------ * Perform the actual work of ingestion. 
diff --git a/classes/ETL/Ingestor/pdoIngestor.php b/classes/ETL/Ingestor/pdoIngestor.php index fd09c0a914..58872e2400 100644 --- a/classes/ETL/Ingestor/pdoIngestor.php +++ b/classes/ETL/Ingestor/pdoIngestor.php @@ -58,57 +58,115 @@ class pdoIngestor extends aIngestor { - // Maximum number of times to attempt to execute the source query + + /** ----------------------------------------------------------------------------------------- + * Maximum number of times to attempt to execute the source query + * + * @var int + * ------------------------------------------------------------------------------------------ + */ + const MAX_QUERY_ATTEMPTS = 3; - // Write a log message after processing this many source records + /** ----------------------------------------------------------------------------------------- + * Write a log message after processing this many source records + * + * @var int + * ------------------------------------------------------------------------------------------ + */ + const NUM_RECORDS_PER_LOG_MSG = 100000; - // Maximum number of records to import at once + /** ----------------------------------------------------------------------------------------- + * Maximum number of records to import in one LOAD DATA IN FILE + * + * @var int + * ------------------------------------------------------------------------------------------ + */ + const MAX_RECORDS_PER_INFILE = 250000; - // The number of records per load file to use when calculating the write timeout - const NET_WRITE_TIMEOUT_RECORD_CHUNK = 250000; + /** ----------------------------------------------------------------------------------------- + * The number of records per load file to use when calculating the database write timeout + * + * @var int + * ------------------------------------------------------------------------------------------ + */ - // The number of seconds to allot per file per record chunk - const NET_WRITE_TIMEOUT_SECONDS_PER_FILE_CHUNK = 60; + const NET_WRITE_TIMEOUT_RECORD_CHUNK = 250000; - // ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- + * The number of seconds to allot for the timeout per file per record chunk + * + * @var int + * ------------------------------------------------------------------------------------------ + */ - // An 2-dimensional associative array where the keys are ETL table definition keys and the - // values are a mapping between ETL table columns (keys) and source query columns (values). - protected $destinationFieldMappings = array(); + const NET_WRITE_TIMEOUT_SECONDS_PER_FILE_CHUNK = 60; - // Set to TRUE to indicate a destination field mapping was not specified in the configuration - // file and was auto-generated using all source query columns. This can be used for - // optimizations later. - protected $fullSourceToDestinationMapping = false; + /** ----------------------------------------------------------------------------------------- + * Query used for extracting data from the source endpoint. + * + * @var string | null + * ------------------------------------------------------------------------------------------ + */ - // Query for extracting data from the source endpoint. 
private $sourceQueryString = null; - // A Query object containing the source query for this ingestor + /** ----------------------------------------------------------------------------------------- + * A Query object representing the source query for this action + * + * @var Query | null + * ------------------------------------------------------------------------------------------ + */ + protected $etlSourceQuery = null; - // The list of field names in the source query - protected $availableSourceQueryFields = null; + /** ----------------------------------------------------------------------------------------- + * An array containing the field names available from the source record (query, + * structured file, etc.) + * + * @var array | null + * ------------------------------------------------------------------------------------------ + */ - // Note these values are used so we don't have to escape quotes and such. + protected $sourceRecordFields = null; + + /** ----------------------------------------------------------------------------------------- + * Line separator for MySQL LOAD DATA INFILE LINES TERMINATED BY. + * + * @var string + * ------------------------------------------------------------------------------------------ + */ - // Line separator for MySQL LOAD DATA INFILE LINES TERMINATED BY. protected $lineSeparator = '\n'; - // Field separator for MySQL LOAD DATA INFILE FIELDS TERMINATED BY. + /** ----------------------------------------------------------------------------------------- + * Field separator for MySQL LOAD DATA INFILE FIELDS TERMINATED BY. + * + * @var string + * ------------------------------------------------------------------------------------------ + */ + protected $fieldSeparator = '\t'; - // String enclosure for MySQL LOAD DATA INFILE ENCLOSED BY. + /** ----------------------------------------------------------------------------------------- + * String enclosure for MySQL LOAD DATA INFILE ENCLOSED BY. + * + * @var string + * ------------------------------------------------------------------------------------------ + */ + protected $stringEnclosure = ''; - /* ------------------------------------------------------------------------------------------ - * Set up data endpoints and other options. + /** ----------------------------------------------------------------------------------------- + * General setup. + * + * @see iAction::__construct() * - * @param IngestorOptions $options Options specific to this Ingestor + * @param aOptions $options Options specific to this Ingestor * @param EtlConfiguration $etlConfig Parsed configuration options for this ETL + * @param Log $logger PEAR Log object for system logging * ------------------------------------------------------------------------------------------ */ @@ -128,7 +186,7 @@ public function __construct(aOptions $options, EtlConfiguration $etlConfig, Log } // __construct() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * @see iAction::initialize() * ------------------------------------------------------------------------------------------ */ @@ -146,13 +204,21 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) // Get the handles for the various database endpoints if ( ! 
$this->utilityEndpoint instanceof iRdbmsEndpoint ) { - $msg = "Utility endpoint does not implement of ETL\\DataEndpoint\\iRdbmsEndpoint"; - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf( + "Utility endpoint %s does not implement ETL\\DataEndpoint\\iRdbmsEndpoint", + get_class($this->utilityEndpoint) + ) + ); } if ( ! $this->sourceEndpoint instanceof iRdbmsEndpoint ) { - $msg = "Source endpoint is not an instance of ETL\\DataEndpoint\\iRdbmsEndpoint"; - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf( + "Source endpoint %s does not implement ETL\\DataEndpoint\\iRdbmsEndpoint", + get_class($this->sourceEndpoint) + ) + ); } if ( "mysql" == $this->destinationHandle->_db_engine ) { @@ -178,7 +244,11 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) // in the _execute() function with the current start/end dates but are needed here to // parse the query. - $this->getEtlOverseerOptions()->applyOverseerRestrictions($this->etlSourceQuery, $this->sourceEndpoint, $this); + $this->getEtlOverseerOptions()->applyOverseerRestrictions( + $this->etlSourceQuery, + $this->sourceEndpoint, + $this + ); } // ( null === $this->etlSourceQuery ) @@ -190,124 +260,18 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) if ( null !== $this->sourceQueryString && ! (is_string($this->sourceQueryString) || empty($this->sourceQueryString)) ) { - $msg = "Source query must be null or a non-empty string"; - $this->logAndThrowException($msg); + $this->logAndThrowException("Source query must be null or a non-empty string"); } // Get the list of available source query fields. If we have described the source query in // the JSON config, use the record keys otherwise we need to parse the SQL string. - $this->availableSourceQueryFields = + $this->sourceRecordFields = ( null !== $this->etlSourceQuery ? array_keys($this->etlSourceQuery->records) : $this->getSqlColumnNames($this->sourceQueryString) ); - $this->destinationFieldMappings = $this->getDestinationFields(); - - // Generate and verify destination fields. - // - // Use Cases: - // - // >= 1 table & mismatch in # tables vs # dest fields = error - // 1 table & 0 field list = create destination fields from query - // 1 table & 1 field list = verify columns - // >= 1 table & # tables = # dest fields = verify columns - // - // 1. If a single destination table definition has been provided and destination fields have - // not been defined, create the destination fields assuming all of the columns from the - // query will be used. - // - // 2. If multiple destination tables have been defined the destination fields must be - // provided in the configuration or a subclass. Verify that the number of tables and - // destination field lists match. - // - // 3. Verify that all destination field mappings are valid. - - if ( 1 == count($this->etlDestinationTableList) - && 0 == count($this->destinationFieldMappings) ) - { - // Use all of the source columns as destination fields. Check that the all of the - // parsed columns are found in the table definition. If not, throw an error and the - // developer will need to provide them. 
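As context for the assignment above, $this->sourceRecordFields comes either from the record keys of a JSON-defined source query or from parsing a raw SQL string. A minimal sketch of the two paths, using an invented records block:

<?php
// Hypothetical "records" block from a definition file, decoded into an array
// (field name => formula). The names here are invented.
$records = array(
    'person_id'   => 'p.id',
    'person_name' => 'p.long_name',
);

// Path 1: the source query is described in JSON, so the field names are the keys.
$sourceRecordFields = array_keys($records);   // array('person_id', 'person_name')

// Path 2: only a raw SQL string was supplied, so the names are parsed from the
// SELECT clause; getSqlColumnNames() is shown later in this patch.
// $sourceRecordFields = $this->getSqlColumnNames('SELECT p.id AS person_id, p.long_name AS person_name FROM person p');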
- - reset($this->etlDestinationTableList); - $etlTableKey = key($this->etlDestinationTableList); - - // We only need to parse the SQL if it has been provided as a string, otherwise use: - // array_keys($this->etlSourceQuery->records); - - $this->destinationFieldMappings[$etlTableKey] = - array_combine($this->availableSourceQueryFields, $this->availableSourceQueryFields); - $this->logger->debug("Destination fields parsed from source query (table definition key '$etlTableKey'): " . - implode(", ", $this->destinationFieldMappings[$etlTableKey])); - $this->fullSourceToDestinationMapping = true; - } elseif ( count($this->etlDestinationTableList) > 1 - && count($this->destinationFieldMappings) != count($this->etlDestinationTableList) ) - { - if ( 0 == count($this->destinationFieldMappings) ) { - $msg = "destination_field_map must be defined when > 1 table definitions are provided"; - } else { - $msg = "Destination fields missing for destination tables (" . - implode(",", array_diff(array_keys($this->etlDestinationTableList), array_keys($this->destinationFieldMappings))) . - ")"; - } - $this->logAndThrowException($msg); - } - - // Ensure that the keys in the destination record map match a defined table - - foreach ( array_keys($this->destinationFieldMappings) as $destinationTableKey ) { - if ( ! array_key_exists($destinationTableKey, $this->etlDestinationTableList) ) { - $msg = "Destination record map references undefined table: $destinationTableKey"; - $this->logAndThrowException($msg); - } - } - - // Verify that the destination column keys match the table columns and the values match a - // column in the query. - - $undefinedDestinationTableColumns = array(); - $undefinedSourceQueryColumns = array(); - - foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { - $availableTableFields = $etlTable->getColumnNames(); - - // Ensure destination table columns exist (keys) - - $destinationTableMap = array_keys($this->destinationFieldMappings[$etlTableKey]); - $missing = array_diff($destinationTableMap, $availableTableFields); - if ( 0 != count($missing) ) { - $undefinedDestinationTableColumns[] = "Table '$etlTableKey' has undefined table columns/keys (" . - implode(",", $missing) . ")"; - } - - // Ensure source query columns exist (values) - $sourceQueryFields = $this->destinationFieldMappings[$etlTableKey]; - $missing = array_diff($sourceQueryFields, $this->availableSourceQueryFields); - if ( 0 != count($missing) ) { - $missing = array_map( - function ($k, $v) { - return "$k = $v"; - }, - array_keys($missing), - $missing - ); - $undefinedSourceQueryColumns[] = "Table '$etlTableKey' has undefined source query records for keys (" . - implode(", ", $missing) . ")"; - } - - } // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) - - if ( 0 != count($undefinedDestinationTableColumns) || 0 != count($undefinedSourceQueryColumns) ) { - $msg = "Undefined keys or values in ETL destination_record_map. "; - if ( 0 != count($undefinedDestinationTableColumns) ) { - $msg .= implode("; ", $undefinedDestinationTableColumns) . 
", "; - } - if ( 0 != count($undefinedSourceQueryColumns) ) { - $msg .= implode("; ", $undefinedSourceQueryColumns); - } - $this->logAndThrowException($msg); - } + $this->parseDestinationFieldMap($this->sourceRecordFields); $this->initialized = true; @@ -315,57 +279,21 @@ function ($k, $v) { } // initialize() - /* ------------------------------------------------------------------------------------------ - * By default, we will attempt to parse the destination fields from the source query unless this - * method returns a non-null value. Child classes may override this method if parsing the source - * query is not appropriate. + /** ----------------------------------------------------------------------------------------- + * Get the query to be run against the source data endpoint that will be used to + * extract the data. * - * @return NULL to attempt to parse the destination fields from the source query, or an array - * where the keys match etl table definitions and values map table columns (destination) to - * query result columns (source). - * ------------------------------------------------------------------------------------------ - */ - - protected function getDestinationFields() - { - if ( ! isset($this->parsedDefinitionFile->destination_record_map) ) { - return null; - } elseif ( ! is_object($this->parsedDefinitionFile->destination_record_map) ) { - $msg = "destination_fields must be an object where keys match table definition keys"; - $this->logAndThrowException($msg); - } - - $destinationFieldMappings = array(); - - foreach ( $this->parsedDefinitionFile->destination_record_map as $etlTableKey => $fieldMap ) { - if ( ! is_object($fieldMap) ) { - $msg = "Destination field map for table '$etlTableKey' must be an object"; - $this->logAndThrowException($msg); - } elseif ( 0 == count(array_keys((array) $fieldMap)) ) { - $msg = "destination_record_map for '$etlTableKey' is empty"; - $this->logger->warning($msg); - } - // Convert the field map from an object to an associative array. Keys are table columns - // (destination) and values are query result columns (source) - $destinationFieldMappings[$etlTableKey] = (array) $fieldMap; - } - - return $destinationFieldMappings; - } // getDestinationFields() - - /* ------------------------------------------------------------------------------------------ - * Get the query to be run against the source data endpoint that will extract the data. - * - * @return A string containing the query on the source endpoint. + * @return string The query on the source endpoint. * ------------------------------------------------------------------------------------------ */ protected function getSourceQueryString() { if ( null === $this->etlSourceQuery ) { - $msg ="ETL source query object not instantiated. " . - "Perhaps it is not specified in the definition file and not implemented in the Ingestor."; - $this->logAndThrowException($msg); + $this->logAndThrowException( + "ETL source query object not instantiated. Perhaps it is not specified in " + . "the definition file and not implemented in the Ingestor." + ); } $sql = $this->etlSourceQuery->getSql(); @@ -383,7 +311,7 @@ protected function getSourceQueryString() } // getSourceQueryString() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * Perform the query on the data source. * * @return A PDOStatement with the results of the source query. 
@@ -425,8 +353,9 @@ protected function getSourceData() array('exception' => $e, 'sql' => $this->sourceQueryString, 'endpoint' => $this->sourceEndpoint) ); } elseif ( $n_attempts > self::MAX_QUERY_ATTEMPTS ) { - $msg = "Could not execute source query after " . self::MAX_QUERY_ATTEMPTS . " attempts. Exiting."; - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf("Could not execute source query after %d attempts. Exiting.", self::MAX_QUERY_ATTEMPTS) + ); } $this->logger->info( @@ -447,104 +376,29 @@ protected function getSourceData() } // getSourceData() - /* ------------------------------------------------------------------------------------------ - * By default, there are no pre-execution tasks. - * - * @see iAction::performPreExecuteTasks() - * ------------------------------------------------------------------------------------------ - */ - - protected function performPreExecuteTasks() { - - // ------------------------------------------------------------------------------------------ - // Update the start/end dates for this query and get the source query string. It is important - // to do it in the pre-execute stage because if we are chunking our ingest it will get - // updated every time. - - $this->sourceQueryString = $this->getSourceQueryString(); - - // ------------------------------------------------------------------------------------------ - // ETL table management. We can extract the table name, schema, and column names from this - // object. - - $sqlList = array(); - $disableForeignKeys = false; - - try { - - // Bring the destination table in line with the configuration if necessary. Note that - // manageTable() is DRYRUN aware. - - foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { - $qualifiedDestTableName = $etlTable->getFullName(); - - if ( "myisam" == strtolower($etlTable->engine) ) { - $disableForeignKeys = true; - if ( $this->options->disable_keys ) { - $this->logger->info("Disable keys on $qualifiedDestTableName"); - $sqlList[] = "ALTER TABLE $qualifiedDestTableName DISABLE KEYS"; - } - } - - $this->manageTable($etlTable, $this->destinationEndpoint); - - } // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) - - } catch ( Exception $e ) { - $msg = "Error managing ETL table for " . $this->getName() . ": " . $e->getMessage(); - $this->logAndThrowException($msg); - } - - if ( $disableForeignKeys ) { - // See http://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_foreign_key_checks - $sqlList[] = "SET FOREIGN_KEY_CHECKS = 0"; - } - - $this->executeSqlList($sqlList, $this->destinationEndpoint, "Pre-execute tasks"); - - return true; - - } // performPreExecuteTasks() - - /* ------------------------------------------------------------------------------------------ - * By default, there are no pre-execution tasks. + /** ----------------------------------------------------------------------------------------- + * Perform pre-execution tasks including disabling foreign key constraints and + * managing table structure. 
* - * @see iAction::performPostExecuteTasks() + * @see iAction::performPreExecuteTasks() * ------------------------------------------------------------------------------------------ */ - protected function performPostExecuteTasks($numRecordsProcessed) - { - $sqlList = array(); - $enableForeignKeys = false; - - foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { - $qualifiedDestTableName = $etlTable->getFullName(); - - if ( "myisam" == strtolower($etlTable->engine) ) { - $enableForeignKeys = true; - if ( $this->options->disable_keys ) { - $this->logger->info("Enable keys on $qualifiedDestTableName"); - $sqlList[] = "ALTER TABLE $qualifiedDestTableName ENABLE KEYS"; - } - } + protected function performPreExecuteTasks() { - if ( $numRecordsProcessed > 0 ) { - $sqlList[] = "ANALYZE TABLE $qualifiedDestTableName"; - } - } + parent::performPreExecuteTasks(); - if ( $enableForeignKeys ) { - // See http://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_foreign_key_checks - $sqlList[] = "SET FOREIGN_KEY_CHECKS = 1"; - } + // Update the start/end dates for this query and get the source query string. It + // is important to do it in the pre-execute stage because if we are chunking our + // ingest it will get updated every time. - $this->executeSqlList($sqlList, $this->destinationEndpoint, "Post-execute tasks"); + $this->sourceQueryString = $this->getSourceQueryString(); return true; - } // performPostExecuteTasks() - /* ------------------------------------------------------------------------------------------ + } // performPreExecuteTasks() + + /** ------------------------------------------------------------------------------------------ * @see iAction::execute() * ------------------------------------------------------------------------------------------ */ @@ -583,18 +437,19 @@ protected function _execute() } // _execute() - /* ------------------------------------------------------------------------------------------ - * If the source and destination endpoints are on the same database server we can optimize the - * ingestion by executing an INSERT...SELECT statement directly on the server rather than - * selecting the data, brining it back to this host, chunking it into a file, and running LOAD - * DATA INFILE to load it back into the database. Tests on 875K records show a 29% improvement - * using INSERT...SELECT and a 48% improvement using INSERT...SELECT and disabling keys during - * load. + /** ----------------------------------------------------------------------------------------- + * If the source and destination endpoints are on the same database server we can + * optimize the ingestion by executing an INSERT...SELECT statement directly on the + * server rather than selecting the data, bringing it back to this host, chunking it + * into a file, and running LOAD DATA INFILE to load it back into the database. Tests + * on 875K records show a 29% improvement using INSERT...SELECT and a 48% improvement + * using INSERT...SELECT and disabling keys during load. + * + * NOTE: This method assumes that data is being mapped from one source table to one + * destination table and all columns are being used. If any translation is performed + * then this method cannot be used.
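To make the optimization described above concrete, here is a rough sketch of the two ingestion paths. The table and column names are placeholders and the SQL shown is not the statement singleDatabaseIngest() actually builds; it only illustrates the shape of the INSERT...SELECT shortcut versus the file-based path.

<?php
// Single-database path (sketch): one source table, one destination table, no
// translation, both on the same server, so the data never leaves the database.
$destTable   = 'modw.job_record_example';                                   // hypothetical
$destColumns = 'resource_id, person_id, wallduration';                      // hypothetical
$sourceSql   = 'SELECT resource_id, person_id, wallduration FROM staging.job_example';

$insertSelect = "INSERT INTO $destTable ($destColumns) $sourceSql";
// $rowsAffected = $destinationHandle->execute($insertSelect);

// General path (sketch): run the SELECT on the source, stream rows into local
// load files, then LOAD DATA INFILE on the destination; this is what
// multiDatabaseIngest() below implements.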
* - * @return The number of records processed + * @return int The number of records processed * ------------------------------------------------------------------------------------------ */ @@ -648,19 +503,20 @@ function ($s) { } // singleDatabaseIngest() - /* ------------------------------------------------------------------------------------------ - * If the source and destination endpoints are not on the same database server (or other - * criteria are met such as needing to update rather than replace a row) we will need to select - * the data, brining it back to this host, chunk it into a file, and run LOAD DATA INFILE to - * load it into the database. + /** ----------------------------------------------------------------------------------------- + * If the source and destination endpoints are not on the same database server, we are + * populating multiple tables from a single query, or translation is being performed + * on the data, we will need to select the data, bringing it back to this host, chunk it + * into a file, and run LOAD DATA INFILE to load it into the database. * - * @return The number of records processed + * @see allowSingleDatabaseOptimization() + * + * @return int The number of records processed * ------------------------------------------------------------------------------------------ */ private function multiDatabaseIngest() { - // Set up one infile and output file descriptor for each destination $infileList = array(); @@ -745,8 +601,9 @@ function ($s) { foreach ( $infileList as $etlTableKey => $infileName ) { if ( false === ($outFd = fopen($infileName, 'w')) ) { - $msg = "Failed to open temporary file for database ingest: '$infileName'"; - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf("Failed to open temporary file for database ingest: '%s'", $infileName) + ); } $outFdList[$etlTableKey] = $outFd; } // foreach ( $infileList as $etlTableKey => $infileName ) @@ -925,7 +782,9 @@ function ($s) { } } // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) - $this->logger->debug(sprintf('Loaded %d files in %ds', $numFilesLoaded, microtime(true) - $loadFileStart)); + $this->logger->debug( + sprintf('Loaded %d files in %ds', $numFilesLoaded, microtime(true) - $loadFileStart) + ); $loadFileStart = microtime(true); $numRecordsInFile = 0; @@ -969,13 +828,20 @@ function ($s) { } // if ( $numRecordsInFile) $this->logger->info( - sprintf('%s: Processed %s records (%s source records)', get_class($this), number_format($totalRecordsProcessed), number_format($numSourceRecordsProcessed)) + sprintf( + '%s: Processed %s records (%s source records)', + get_class($this), + number_format($totalRecordsProcessed), + number_format($numSourceRecordsProcessed) + ) ); // Return buffering to its original state. This is a MySQL specific optimization. if ( ! $this->options->buffered_query && $this->sourceEndpoint instanceof Mysql ) { - $this->logger->info("Returning buffered query mode to: " . ($originalBufferedQueryAttribute ? "true" : "false") ); + $this->logger->info( + sprintf("Returning buffered query mode to: %s", ($originalBufferedQueryAttribute ?
"true" : "false")) + ); $pdo = $this->sourceHandle->handle(); $pdo->setAttribute(PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, $originalBufferedQueryAttribute); } @@ -984,16 +850,18 @@ function ($s) { } // multiDatabaseIngest() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * Perform a transformation on a single data record (row). Transformation may alter * the values of the record and may create multiple records from a single record, but * it should not modify the struture of the record itself (e.g., the keys). Because we * support the ability to transform a single source record into multiple result * records, an array of records is returned, even for a single result record. * - * @param $record An associative array containing the source record + * @param array $record An associative array containing the source record where the + * keys are the field names. * - * @return An array of transformed records. + * @return array A 2-dimensional array of potentially transformed records where each + * element is an individual record. * ------------------------------------------------------------------------------------------ */ @@ -1015,14 +883,14 @@ protected function transform(array $srcRecord) } // transform() - /* ------------------------------------------------------------------------------------------ - * Determine if we can support optimizations for queries within a single database. Not only must - * our source and destination databases be the same, but we cannot optimize if we are dealing - * with multiple destination tables, or other factors. Optimization may be performed as a - * "INSERT...SELECT" directly in the database rather than a SELECT returning the data and then a - * separate INSERT. + /** ------------------------------------------------------------------------------------------ + * Determine if we can support optimizations for queries within a single database. Not + * only must our source and destination databases be the same, but we cannot optimize + * if we are dealing with multiple destination tables, or other factors. Optimization + * may be performed as a "INSERT...SELECT" directly in the database rather than a + * SELECT returning the data and then a separate INSERT. * - * @return true If both the source and destination are the same server. + * @return boolean TRUE if database optimization is allowed, FALSE if not. 
* ------------------------------------------------------------------------------------------ */ @@ -1073,7 +941,7 @@ protected function allowSingleDatabaseOptimization() reset($this->destinationFieldMappings); - if ( count($this->availableSourceQueryFields) != count(current($this->destinationFieldMappings)) ) { + if ( count($this->sourceRecordFields) != count(current($this->destinationFieldMappings)) ) { $this->logger->debug("Mapping a subset of the source query fields"); return false; } diff --git a/classes/ETL/Maintenance/ExecuteSql.php b/classes/ETL/Maintenance/ExecuteSql.php index 4fefda41cc..077a79c928 100644 --- a/classes/ETL/Maintenance/ExecuteSql.php +++ b/classes/ETL/Maintenance/ExecuteSql.php @@ -15,9 +15,9 @@ use ETL\iAction; use ETL\aAction; use ETL\DataEndpoint\iRdbmsEndpoint; -use \PDOException; +use PDOException; use ETL\Utilities; -use \Log; +use Log; use PHPSQLParser\PHPSQLParser; use PHPSQLParser\PHPSQLCreator; @@ -147,6 +147,26 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) } // initialize() + /** ----------------------------------------------------------------------------------------- + * @see aAction::performPreExecuteTasks() + * ------------------------------------------------------------------------------------------ + */ + + protected function performPreExecuteTasks() + { + return true; + } // performPreExecuteTasks() + + /** ----------------------------------------------------------------------------------------- + * @see aAction::performPostExecuteTasks() + * ------------------------------------------------------------------------------------------ + */ + + protected function performPostExecuteTasks($numRecordsProcessed = null) + { + return true; + } // performPostExecuteTasks() + /* ------------------------------------------------------------------------------------------ * @see iAction::execute() * ------------------------------------------------------------------------------------------ diff --git a/classes/ETL/aAction.php b/classes/ETL/aAction.php index df80787de3..e7df5b8325 100644 --- a/classes/ETL/aAction.php +++ b/classes/ETL/aAction.php @@ -106,7 +106,6 @@ public function __construct(aOptions $options, EtlConfiguration $etlConfig, Log // has already been set by a child constructor leave it alone. if ( null === $this->parsedDefinitionFile ) { - $this->logger->info("Parse definition file: '" . $this->definitionFile . "'"); $this->parsedDefinitionFile = new Configuration( $this->definitionFile, $this->options->paths->base_dir, @@ -401,5 +400,30 @@ public function initializeDestinationEndpoint() return $this; } - abstract public function execute(EtlOverseerOptions $etlOverseerOptions); + /** ----------------------------------------------------------------------------------------- + * Perform any pre-execution tasks. For example, disabling table keys on MyISAM + * tables, or other setup tasks. + * + * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. + * + * @return true on success + * ------------------------------------------------------------------------------------------ + */ + + abstract protected function performPreExecuteTasks(); + + /** ----------------------------------------------------------------------------------------- + * Perform any post-execution tasks. For example, enabling table keys on MyISAM + * tables, or tracking table history. + * + * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. 
+ * + * @param integer|null $numRecordsProcessed The number of records processed during + * execution, or NULL if it is not used by this action. + * + * @return true on success + * ------------------------------------------------------------------------------------------ + */ + + abstract protected function performPostExecuteTasks($numRecordsProcessed = null); } // abstract class aAction diff --git a/classes/ETL/aRdbmsDestinationAction.php b/classes/ETL/aRdbmsDestinationAction.php index 0188582174..0dc79b11e0 100644 --- a/classes/ETL/aRdbmsDestinationAction.php +++ b/classes/ETL/aRdbmsDestinationAction.php @@ -1,7 +1,8 @@ destinationEndpoint instanceof iRdbmsEndpoint ) { $this->destinationEndpoint = null; - $msg = "Destination endpoint does not implement ETL\\DataEndpoint\\iRdbmsEndpoint"; - $this->logAndThrowException($msg); + $this->logAndThrowException( + "Destination endpoint does not implement ETL\\DataEndpoint\\iRdbmsEndpoint" + ); + } + + if ( ! isset($this->parsedDefinitionFile->table_definition) ) { + $this->logAndThrowException("Definition file does not contain a 'table_definition' key"); } // Create the objects representing the destination tables. This method can be @@ -89,15 +115,15 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) $this->createDestinationTableObjects(); if ( 0 == count($this->etlDestinationTableList) ) { - $msg = "No ETL destination tables defined"; - $this->logAndThrowException($msg); + $this->logAndThrowException("No ETL destination tables defined"); } foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { if ( ! $etlTable instanceof Table ) { - $msg = "ETL destination table with key '$etlTableKey' is not an instance of Table"; - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf("ETL destination table with key '%s' is not an instance of Table", $etlTableKey) + ); } $etlTable->verify(); @@ -109,31 +135,25 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null) } // initialize() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- * Populate the $etlDestinationTableList with Table objects representing the tables * described in the table definition configuration block from the definition file. If * another type of table object is needed (e.g., AggregationTable for aggregation - * actions, then this method can be overriden. + * actions) then this method can be overriden. + * + * @return int The number of table definitions processed. * ------------------------------------------------------------------------------------------ */ protected function createDestinationTableObjects() { - if ( ! isset($this->parsedDefinitionFile->table_definition) ) { - $msg = "Definition file does not contain a 'table_definition' key"; - $this->logAndThrowException($msg); - } - // A table definition can be either: // // (1) A single table definition object (current default for a single destination - // table) or (2) An array of one or more table definitions (how we will initially - // handle multiple destination tables). - // - // In the future, we will support an object with key value pairs where the key is - // the table name and the value is the definition object. In the mean time, we - // will generate this format here so the rest of the code does not need to change - // later. + // table) or (2) An array of one or more table definitions. 
Both are stored + // internally as an associative array where the key is the name of the table. We + // could also represent multiple tables using the name as the key but I can't + // think of a current use case where we would need to do this // Normalize the table definition into a set of key-value pairs where the key is the // table name and the value is the definition object. @@ -154,39 +174,309 @@ protected function createDestinationTableObjects() $this->logger ); $this->logger->debug( - "Created ETL destination table object for table definition key '" - . $etlTable->name - . "'" + sprintf("Created ETL destination table object for table definition key '%s'", $etlTable->name) ); $etlTable->schema = $this->destinationEndpoint->getSchema(); - $tableName = $etlTable->getFullName(); - if ( ! is_string($tableName) || empty($tableName) ) - { - $msg = "Destination table name must be a non-empty string"; - $this->logAndThrowException($msg); + if ( ! is_string($etlTable->name) || empty($etlTable->name) ) { + $this->logAndThrowException("Destination table name must be a non-empty string"); } $this->etlDestinationTableList[$etlTable->name] = $etlTable; } catch (Exception $e) { - $this->logAndThrowException($e->getMessage() . " in file '" . $this->definitionFile . "'"); + $this->logAndThrowException(sprintf("%s in file '%s'", $e->getMessage(), $this->definitionFile)); + continue; } } // foreach ( $tableDefinitionList as $etlTableKey => $tableDefinition ) - if ( 0 == count($this->etlDestinationTableList) ) { - $msg = "No table definitions specified"; - $this->logAndThrowException($msg); + $numTableDefinitions = count($this->etlDestinationTableList); + if ( 0 == $numTableDefinitions ) { + $this->logAndThrowException("No table definitions specified"); } + return $numTableDefinitions; } // createDestinationTableObjects() - /* ------------------------------------------------------------------------------------------ - * Truncate the destination table. Note that performTruncateDestinationTasks() will be called to - * do the actual work. + /** ----------------------------------------------------------------------------------------- + * Parse and verify the mapping between source record fields and destination table + * fields. If a mapping has not been provided, generate one automatically. The + * destination field map specifies a mapping from source record fields to destination + * table fields for one or more destination tables. + * + * Use Cases: + * + * 1. There are >= 1 destination tables and no destination field map * - * @return TRUE on success - * @throws Exception If any operations failed + * Automatically create the destination field map by mapping source fields to + * destination table fields **where the fields match, excluding non-matching fields.** + * Log a warning for any fields that do not match. + * + * 2. There are >= 1 destination tables and a destination field map is specified. + * + * Verify that the destination fields specified in the mapping are valid fields for + * the destination table that they references. Also verify that the source fields are + * valid. It is not required that all source fields are mapped to destination fields + * but care should be exercised that resonable defaults are specified in the table + * definitions. 
+ * + * @param array An array containing the fields available from the source record + * + * @return array | null A 2-dimensional array where the keys match etl table + * definitions and values map table columns (destination) to query result columns + * (source), or null if no destination record map was specified. + * ------------------------------------------------------------------------------------------ + */ + + protected function parseDestinationFieldMap(array $sourceFields) + { + $this->destinationFieldMappings = array(); + + if ( ! isset($this->parsedDefinitionFile->destination_record_map) ) { + + $this->destinationFieldMappings = $this->generateDestinationFieldMap($sourceFields); + + } elseif ( ! is_object($this->parsedDefinitionFile->destination_record_map) ) { + + $this->logAndThrowException("destination_record_map must be an object"); + + } else { + + foreach ( $this->parsedDefinitionFile->destination_record_map as $etlTableKey => $fieldMap ) { + + if ( ! is_object($fieldMap) ) { + $this->logAndThrowException( + sprintf("destination_record_map for table '%s' must be an object", $etlTableKey) + ); + } elseif ( 0 == count(array_keys((array) $fieldMap)) ) { + $this->logger->warning( + sprintf("%s: destination_record_map for table '%s' is empty", $this, $etlTableKey) + ); + } + + // Convert the field map from an object to an associative array where keys + // are destination table columns and values are source record fields + $this->destinationFieldMappings[$etlTableKey] = (array) $fieldMap; + + } + } + + $success = true; + $success &= $this->verifyDestinationMapKeys(); + $success &= $this->verifyDestinationMapValues($sourceFields); + + return $success; + + } // parseDestinationFieldMap() + + /** ----------------------------------------------------------------------------------------- + * Generate a destination field map for each destination table based on the + * intersection of the source record fields and table fields. Only fields common to + * both source and destination are mapped with unknown fields logged as warnings. + * + * @param array An array containing the fields available from the source record + * + * @return array A 2-dimensional array where the keys match ETL table names and values + * are a 2-dimensional array mapping destination table fields to source record + * fields. 
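The auto-generation described above reduces to an intersection of the destination table columns and the source record fields. A stripped-down sketch of that step with invented field lists, mirroring the array_intersect/array_combine logic in the method body that follows:

<?php
// Invented example inputs.
$availableTableFields = array('resource_id', 'person_id', 'wallduration', 'last_modified');
$sourceFields         = array('resource_id', 'person_id', 'wall_time');

// Fields present on both sides become an identity mapping; leftover source
// fields are reported rather than treated as errors.
$commonFields         = array_intersect($availableTableFields, $sourceFields);
$unmappedSourceFields = array_diff($sourceFields, $availableTableFields);

$generatedMap = array_combine($commonFields, $commonFields);
// $generatedMap         => array('resource_id' => 'resource_id', 'person_id' => 'person_id')
// $unmappedSourceFields => array('wall_time')   -- logged as a warning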
+ * ------------------------------------------------------------------------------------------ + */ + + protected function generateDestinationFieldMap(array $sourceFields) + { + $destinationFieldMap = array(); + $fieldMapDebugOutput = ''; + $numSourceFields = count($sourceFields); + + $this->logger->debug( + sprintf( + "Auto-generating destination_field_map from %d source fields: %s", + $numSourceFields, + implode(', ', $sourceFields) + ) + ); + + foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { + + $availableTableFields = $etlTable->getColumnNames(); + + $this->logger->debug( + sprintf("Available fields for table key '%s': %s", $etlTableKey, implode(', ', $availableTableFields)) + ); + + if ( 0 == $numSourceFields ) { + $destinationFieldMap[$etlTableKey] = array(); + continue; + } + + // Map common fields and log warnings for fields that are not mapped + + $commonFields = array_intersect($availableTableFields, $sourceFields); + $unmappedSourceFields = array_diff($sourceFields, $availableTableFields); + + $destinationFieldMap[$etlTableKey] = array_combine($commonFields, $commonFields); + + // Generate a more succinct representation of the field map + + $fieldMapDebugOutput .= sprintf( + "Table: %s%s", + $etlTableKey, + array_reduce( + $commonFields, + function ($carry, $item) { + $carry .= sprintf("%s %s -> %s", PHP_EOL, $item, $item); + return $carry; + }, + '' + ) + ); + + if ( 0 != count($unmappedSourceFields) ) { + $this->logger->warning( + sprintf( + "%s: The following source record fields were not mapped for table '%s': (%s)", + $this, + $etlTableKey, + implode(', ', $unmappedSourceFields) + ) + ); + } + } + + if ( 0 == count($sourceFields) ) { + $this->logger->debug( + sprintf( + "Generated empty destination_field_map for table keys: %s", + implode(', ', array_keys($destinationFieldMap)) + ) + ); + + } else { + $this->logger->debug( + sprintf("Generated destination_field_map:\n%s", $fieldMapDebugOutput) + ); + } + + return $destinationFieldMap; + + } // generateDestinationFieldMap() + + /** ----------------------------------------------------------------------------------------- + * Verify that the destination map keys are valid table fields. Remember that the + * destination record map translates source (query, structured file, etc.) fields to + * destination table fields. The keys in the map must be valid destination table + * fields. + * + * Note that when a destination map is auto-generated, source fields not found in the + * destination are not added. + * + * @return bool TRUE on success + * + * @throws Exception If a key is not a valid table field. + * ------------------------------------------------------------------------------------------ + */ + + protected function verifyDestinationMapKeys() + { + // For each table field specified in the destination table field mapping, verify + // that it is present in one of the destination table definitions. + + $undefinedFields = array(); + + foreach ( $this->destinationFieldMappings as $etlTableKey => $destinationTableMap ) { + if ( ! 
array_key_exists($etlTableKey, $this->etlDestinationTableList) ) { + $this->logAndThrowException( + sprintf("Unknown table '%s' referenced in destination_record_map", $etlTableKey) + ); + } + $availableTableFields = $this->etlDestinationTableList[$etlTableKey]->getColumnNames(); + // Remember that the keys in the field map are table field names + $destinationTableFields = array_keys($destinationTableMap); + $missing = array_diff($destinationTableFields, $availableTableFields); + + if ( 0 != count($missing) ) { + $undefinedFields[] = sprintf( + "Table '%s' has undefined table columns/keys (%s)", + $etlTableKey, + implode(",", $missing) + ); + } + } + + if ( 0 != count($undefinedFields) ) { + $this->logAndThrowException( + sprintf( + "Undefined keys (destination table fields) in ETL destination_record_map: (%s)", + implode(', ', $undefinedFields) + ) + ); + } + + return true; + + } // verifyDestinationMapKeys() + + /** ----------------------------------------------------------------------------------------- + * Verify that the destination map values are valid source record fields. Remember that the + * destination record map translates source (query, structured file, etc.) fields to + * destination table fields. The values in the map must be valid source record fields. + * + * @param array An array containing the fields available from the source record + * + * @return bool TRUE on success + * + * @throws Exception If a value is not a valid source record field. + * ------------------------------------------------------------------------------------------ + */ + + protected function verifyDestinationMapValues(array $sourceFields) + { + $undefinedFields = array(); + + foreach ( $this->destinationFieldMappings as $etlTableKey => $destinationTableMap ) { + if ( ! array_key_exists($etlTableKey, $this->etlDestinationTableList) ) { + $this->logAndThrowException( + sprintf("Unknown table '%s' referenced in destination_record_map", $etlTableKey) + ); + } + + $missing = array_diff($destinationTableMap, $sourceFields); + + if ( 0 != count($missing) ) { + $missing = array_map( + function ($k, $v) { + return "$k = $v"; + }, + array_keys($missing), + $missing + ); + $undefinedFields[] = sprintf( + "Table '%s' has undefined source query fields for keys (%s)", + $etlTableKey, + implode(",", $missing) + ); + } + + } // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) + + if ( 0 != count($undefinedFields) ) { + $this->logAndThrowException( + sprintf( + "Undefined values (source record fields) in ETL destination_record_map: (%s)", + implode(', ', $undefinedFields) + ) + ); + } + + return true; + + } // verifyDestinationMapValues() + + /** ----------------------------------------------------------------------------------------- + * Truncate records from the destination table. Note that + * performTruncateDestinationTasks() will be called to do the actual work. + * + * @return bool TRUE on success * ------------------------------------------------------------------------------------------ */ @@ -196,21 +486,19 @@ protected function truncateDestination() return; } - // Truncate the old table, if requested. If queries are provided use them, otherwise truncate - // the table. + // Truncate the old table, if requested. If queries are provided use them, + // otherwise truncate the table. 
return $this->performTruncateDestinationTasks(); } // truncateDestination() - /* ------------------------------------------------------------------------------------------ - * The default task for truncating the destination table is executing a single TRUNCATE statement - * on the table. If other actions are required, this method should be extended. Note that DELETE - * triggers will not fire when the table is truncated. + /** ----------------------------------------------------------------------------------------- + * The default task for truncating the destination table is executing a single + * TRUNCATE statement on the table. If other actions are required, this method should + * be extended. Note that DELETE triggers will not fire when the table is truncated. * * NOTE: This method must check if we are in DRYRUN mode before executing any tasks. - * - * @see iIngestor::truncateDestinationTasks() * ------------------------------------------------------------------------------------------ */ @@ -253,15 +541,104 @@ protected function performTruncateDestinationTasks() } // performTruncateDestinationTasks() - /* ------------------------------------------------------------------------------------------ + /** ----------------------------------------------------------------------------------------- + * Manage destination tables and disable foreign keys if needed. + * + * @see aAction::performPreExecuteTasks() + * ------------------------------------------------------------------------------------------ + */ + + protected function performPreExecuteTasks() + { + $sqlList = array(); + $disableForeignKeys = false; + + try { + + // Bring the destination table in line with the configuration if necessary. Note that + // manageTable() is DRYRUN aware so we don't need to handle that here. + + foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { + $qualifiedDestTableName = $etlTable->getFullName(); + + if ( "myisam" == strtolower($etlTable->engine) ) { + $disableForeignKeys = true; + if ( $this->options->disable_keys ) { + $this->logger->info("Disable keys on $qualifiedDestTableName"); + $sqlList[] = "ALTER TABLE $qualifiedDestTableName DISABLE KEYS"; + } + } + + $this->manageTable($etlTable, $this->destinationEndpoint); + + } // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) + + } catch ( Exception $e ) { + $this->logAndThrowException( + sprintf("Error managing ETL table for '%s': %s", $this->getName(), $e->getMessage()) + ); + } + + if ( $disableForeignKeys ) { + // See http://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_foreign_key_checks + $sqlList[] = "SET FOREIGN_KEY_CHECKS = 0"; + } + + $this->executeSqlList($sqlList, $this->destinationEndpoint, "Pre-execute tasks"); + + return true; + + } // performPreExecuteTasks() + + /** ----------------------------------------------------------------------------------------- + * Perform post-execution tasks such as re-enabling foreign key constraints and + * analyzing or optimizing the table. 
+ * + * @see aAction::performPostExecuteTasks() + * ------------------------------------------------------------------------------------------ + */ + + protected function performPostExecuteTasks($numRecordsProcessed = null) + { + $sqlList = array(); + $enableForeignKeys = false; + + foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) { + $qualifiedDestTableName = $etlTable->getFullName(); + + if ( "myisam" == strtolower($etlTable->engine) ) { + $enableForeignKeys = true; + if ( $this->options->disable_keys ) { + $this->logger->info("Enable keys on $qualifiedDestTableName"); + $sqlList[] = "ALTER TABLE $qualifiedDestTableName ENABLE KEYS"; + } + } + + if ( null !== $numRecordsProcessed && $numRecordsProcessed > 0 ) { + $sqlList[] = "ANALYZE TABLE $qualifiedDestTableName"; + } + } + + if ( $enableForeignKeys ) { + // See http://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_foreign_key_checks + $sqlList[] = "SET FOREIGN_KEY_CHECKS = 1"; + } + + $this->executeSqlList($sqlList, $this->destinationEndpoint, "Post-execute tasks"); + + return true; + + } // performPostExecuteTasks() + + /** ----------------------------------------------------------------------------------------- * Execute a list of SQL statements on the specified database handle, throwing an exception if * there was an error. * - * @param $sqlList An array of SQL statements to execute - * @param $endpoint An endpoint implementing iDataEndpoint - * @param $msgPrefix String to prefix log messages with + * @param array $sqlList The list of SQL statements to execute + * @param iDataEndpoint $endpoint An endpoint implementing iDataEndpoint + * @param string $msgPrefix Log message with prefix * - * @return TRUE on success + * @return bool TRUE on success * * @throws Exception If there was an error executing a statement * ------------------------------------------------------------------------------------------ @@ -283,7 +660,7 @@ protected function executeSqlList(array $sqlList, iDataEndpoint $endpoint, $msgP } catch (PDOException $e) { $this->logAndThrowException( - "Error executing " . ( "" != $msgPrefix ? "$msgPrefix " : "" ) . "SQL", + sprintf("Error executing %s SQL", ( "" != $msgPrefix ? "$msgPrefix " : "" )), array('exception' => $e, 'sql' => $sql, 'endpoint' => $endpoint) ); } @@ -293,15 +670,17 @@ protected function executeSqlList(array $sqlList, iDataEndpoint $endpoint, $msgP } // executeSqlList() - /* ------------------------------------------------------------------------------------------ - * Parse an SQL statement to retrieve column names, tables used, etc. - * @ See https://code.google.com/p/php-sql-parser/ + /** ----------------------------------------------------------------------------------------- + * Parse an SQL statement to retrieve column names, tables used, etc. This uses the + * Google SQL parser. 
* - * @param $sql The SQL statement to parse + * @see https://code.google.com/p/php-sql-parser/ * - * @return An associative array containing the parsed SQL + * @param string $sql The SQL statement to parse * - * @throws Exception If the SQL was empty + * @return array An associative array containing the parsed SQL + * + * @throws Exception If $sql was empty * ------------------------------------------------------------------------------------------ */ @@ -316,15 +695,15 @@ public function parseSql($sql) } // parseSql() - /* ------------------------------------------------------------------------------------------ - * Parse an SQL SELECT statement and return the selected colum names. - * @ See https://code.google.com/p/php-sql-parser/ + /** ----------------------------------------------------------------------------------------- + * Parse an SQL SELECT statement and return the fields (columns) that are being queried. + * @see https://code.google.com/p/php-sql-parser/ * - * @param $sql The SQL statement to parse + * @param string $sql The SQL statement to parse * - * @return An array containing the parsed column names + * @return array A lit of the parsed fieldnames * - * @throws Exception If the SQL was empty + * @throws Exception If $sql was empty * @throws Exception If there was no SELECT clause detected * ------------------------------------------------------------------------------------------ */ @@ -334,8 +713,7 @@ public function getSqlColumnNames($sql) $parsedSql = $this->parseSql($sql); if ( ! array_key_exists("SELECT", $parsedSql) ) { - $msg = "Select block not found in parsed SQL"; - $this->logAndThrowException($msg); + $this->logAndThrowException("Select block not found in parsed SQL"); } $columnNames = array(); @@ -343,7 +721,8 @@ public function getSqlColumnNames($sql) foreach ( $parsedSql['SELECT'] as $item ) { if ( array_key_exists('alias', $item) && $item['alias']['as'] - && array_key_exists('name', $item['alias']) ) { + && array_key_exists('name', $item['alias']) + ) { $columnNames[] = $item['alias']['name']; } else { $pos = strrpos($item['base_expr'], "."); @@ -355,19 +734,18 @@ public function getSqlColumnNames($sql) } // getSqlColumnNames() - /* ------------------------------------------------------------------------------------------ - * Compare the columns from the table object to those parsed from the SQL SELECT clause and verify - * that all of the parsed SQL columns are present in the table object. If the table object - * contains all columns parsed from SELECT clause of the SQL statement return the list of parsed - * column names, otherwise throw an exception. + /** ----------------------------------------------------------------------------------------- + * Compare the fields from the table object to those parsed from the SQL SELECT + * statement and verify that all of the parsed SQL fields are present in the table + * object. If the table object contains all columns parsed from SELECT clause of the + * SQL statement return the list of parsed column names, otherwise throw an exception. * - * @param $sql The SQL statement to parse. - * @param $table A Table object containing a table definition + * @param string $sql The SQL statement to parse. + * @param Table $table An object containing the table definition * - * @return An array containing all columns in the SELECT clause of the $sql parameter. 
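As a concrete illustration of the alias handling in getSqlColumnNames() above, the sketch below shows the expected result for a small query; the query text is invented and $action stands in for any instance exposing the method.

<?php
// Hypothetical query: one aliased column, one table-qualified column, one
// aliased aggregate.
$sql = 'SELECT p.id AS person_id, p.long_name, COUNT(*) AS n FROM person p GROUP BY p.id';

// Aliased expressions contribute the alias; unaliased columns contribute the
// name after the last '.', i.e. the table qualifier is stripped.
// $action->getSqlColumnNames($sql)  =>  array('person_id', 'long_name', 'n')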
+ * @return array A list of all field names found in $sql * - * @throws Exception If any of the columns from the SQL SELECT clause were not found in the Table - * object + * @throws Exception If any of the fields from $sql were not found in the table object * ------------------------------------------------------------------------------------------ */ @@ -378,27 +756,30 @@ public function verifySqlColumns($sql, Table $table) $missingColumnNames = array_diff($sqlColumnNames, $tableColumnNames); if ( 0 != count($missingColumnNames) ) { - $msg = "The following columns from the SQL SELECT were not found in table definition for '{$table->name}': " . - implode(", ", $missingColumnNames); - $this->logAndThrowException($msg); + $this->logAndThrowException( + sprintf( + "The following columns from the SQL SELECT were not found in table definition for '%s': %s", + $table->name, + implode(', ', $missingColumnNames) + ) + ); } return $sqlColumnNames; } // verifySqlColumns() - /* ------------------------------------------------------------------------------------------ - * Manage an ETL tables. Based on the table object, create a new table or alter an existing table - * to bring it in line with the configuration in the table object. If we are in dryrun mode, do - * not perform any actions, only logging. - * - * @param $table A Table object - * @param $endpoint The destination data endpoint where the table will be created + /** ----------------------------------------------------------------------------------------- + * Manage an ETL table in a data endpoint to bring it in line with the structure + * specified in the table object. This includes creating a new table or alter an + * existing table. If we are in dryrun mode, do not perform any actions, only + * logging. * - * @return The Table object generated from the table configuration file + * @param Table $table An object describing the desired table structure + * @param iDataEndpoint $endpoint The destination data endpoint where the table will + * be created/altered * - * @throws Exception If any query data was not int the correct format. - * @throws Exception If the ETLOverseerOptions have not been set. + * @return Table The table object to support method chaining * ------------------------------------------------------------------------------------------ */ @@ -412,12 +793,12 @@ public function manageTable(Table $table, iDataEndpoint $endpoint) if ( false === $existingTable->discover($table->name, $endpoint) ) { - $this->logger->notice("Table " . $table->getFullName() . " does not exist, creating."); + $this->logger->notice(sprintf("Table %s does not exist, creating.", $table->getFullName())); $sqlList = $table->getSql(); foreach ( $sqlList as $sql ) { - $this->logger->debug("Create table SQL " . $endpoint . ":\n$sql"); + $this->logger->debug(sprintf("Create table SQL %s:\n%s", $endpoint, $sql)); if ( ! $this->getEtlOverseerOptions()->isDryrun() ) { $endpoint->getHandle()->execute($sql); } @@ -428,10 +809,10 @@ public function manageTable(Table $table, iDataEndpoint $endpoint) $sqlList = $existingTable->getAlterSql($table); if ( false !== $sqlList ) { - $this->logger->notice("Altering table " . $existingTable->getFullName()); + $this->logger->notice(sprintf("Altering table %s", $existingTable->getFullName())); foreach ( $sqlList as $sql ) { - $this->logger->debug("Alter table SQL " . $endpoint . ":\n$sql"); + $this->logger->debug(sprintf("Alter table SQL %s:\n%s", $endpoint, $sql)); if ( ! 
$this->getEtlOverseerOptions()->isDryrun() ) { $endpoint->getHandle()->execute($sql); } diff --git a/configuration/etl/etl.d.samples/static.json b/configuration/etl/etl.d.samples/static.json index 6c966ad32e..abe22b4271 100644 --- a/configuration/etl/etl.d.samples/static.json +++ b/configuration/etl/etl.d.samples/static.json @@ -9,13 +9,27 @@ "class": "StructuredFileIngestor", "definition_file": "static/job_times.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Job times bucket", + "path": "static/job_times.json" + } + } },{ "name": "ProcessorBucketsIngestor", "description": "Processor bucket ingestor", "class": "StructuredFileIngestor", "definition_file": "static/processor_buckets.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Processor bucket", + "path": "static/processor_buckets.json" + } + } }] } diff --git a/configuration/etl/etl.d/jobs_cloud.json b/configuration/etl/etl.d/jobs_cloud.json index 09883b890b..fe4274ecb3 100644 --- a/configuration/etl/etl.d/jobs_cloud.json +++ b/configuration/etl/etl.d/jobs_cloud.json @@ -19,7 +19,7 @@ } }, - "cloud-jobs": { + "jobs-cloud": { "namespace": "ETL\\Ingestor", "options_class": "IngestorOptions" } @@ -27,7 +27,7 @@ "#": "Current Cloud job ingestion", - "cloud-jobs": [ + "jobs-cloud": [ { "name": "CloudTableManagement", "class": "ManageTables", @@ -54,7 +54,14 @@ "class": "StructuredFileIngestor", "definition_file": "cloud/event_type.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Cloud event types", + "path": "cloud/event_type.json" + } + } }, { "name": "CloudAssetTypeIngestor", @@ -62,7 +69,14 @@ "class": "StructuredFileIngestor", "definition_file": "cloud/asset_type.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Cloud asset types", + "path": "cloud/asset_type.json" + } + } }, { "name": "CloudInstanceTypeIngestor", @@ -70,7 +84,14 @@ "class": "StructuredFileIngestor", "definition_file": "cloud/instance_type.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Cloud instance types", + "path": "cloud/instance_type.json" + } + } }, { "name": "CloudRegionIngestor", @@ -78,7 +99,14 @@ "class": "StructuredFileIngestor", "definition_file": "cloud/region.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Cloud regions", + "path": "cloud/region.json" + } + } }, { "name": "CloudAvailabilityZoneIngestor", @@ -86,7 +114,14 @@ "class": "StructuredFileIngestor", "definition_file": "cloud/avail_zone.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Cloud availability zones", + "path": "cloud/avail_zone.json" + } + } } ] } diff --git a/configuration/etl/etl.d/jobs_common.json b/configuration/etl/etl.d/jobs_common.json index 6ce4808f01..b441443439 100644 --- a/configuration/etl/etl.d/jobs_common.json +++ b/configuration/etl/etl.d/jobs_common.json @@ -49,7 +49,14 @@ "class": "StructuredFileIngestor", "definition_file": "jobs/countable_type.json", "enabled": true, 
- "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Countable types", + "path": "jobs/countable_type.json" + } + } }, { "name": "JobRecordTypeIngestor", @@ -57,7 +64,14 @@ "class": "StructuredFileIngestor", "definition_file": "jobs/job_record_type.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Job record types", + "path": "jobs/job_record_type.json" + } + } }, { "name": "JobTaskTypeIngestor", @@ -65,7 +79,14 @@ "class": "StructuredFileIngestor", "definition_file": "jobs/job_task_type.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Job task types", + "path": "jobs/job_task_type.json" + } + } }, { "name": "UnitIngestor", @@ -73,7 +94,14 @@ "class": "StructuredFileIngestor", "definition_file": "jobs/unit.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Units of measure", + "path": "jobs/unit.json" + } + } }, { "name": "SubmissionVenueIngestor", @@ -81,7 +109,14 @@ "class": "StructuredFileIngestor", "definition_file": "jobs/submission_venue.json", "enabled": true, - "truncate_destination": true + "truncate_destination": true, + "endpoints": { + "source": { + "type": "jsonfile", + "name": "Submission venues", + "path": "jobs/submission_venue.json" + } + } } ] diff --git a/configuration/etl/etl_data.d.samples/static/job_times.json b/configuration/etl/etl_data.d.samples/static/job_times.json new file mode 100644 index 0000000000..1af27c3c61 --- /dev/null +++ b/configuration/etl/etl_data.d.samples/static/job_times.json @@ -0,0 +1,11 @@ +[ + ["id", "min_duration", "max_duration", "description"], + [ 0, 0, 0, "0 - 1s" ], + [ 1, 1, 29, "1 - 30s" ], + [ 2, 30, 1799, "30s - 30min" ], + [ 3, 1800, 3599, "30 - 60min" ], + [ 4, 3600, 17999, "1 - 5hr" ], + [ 5, 18000, 35999, "5 - 10hr" ], + [ 6, 36000, 64799, "10 - 18hr" ], + [ 7, 64800, 2147483647, "18+hr" ] +] diff --git a/configuration/etl/etl_data.d.samples/static/processor_buckets.json b/configuration/etl/etl_data.d.samples/static/processor_buckets.json new file mode 100644 index 0000000000..5a5702fc1a --- /dev/null +++ b/configuration/etl/etl_data.d.samples/static/processor_buckets.json @@ -0,0 +1,14 @@ +[ + ["id", "min_processors", "max_processors", "description"], + [1, 1, 1, "1"], + [2, 2, 4, "2 - 4"], + [3, 5, 8, "5 - 8"], + [4, 9, 64, "9 - 64"], + [5, 65, 256, "65 - 256"], + [6, 257, 512, "257 - 512"], + [7, 513, 1024, "513 - 1024"], + [8, 1025, 8192, "1k - 8k"], + [9, 8193, 32768, "8k - 32k"], + [10, 32769, 131072, "32k - 131k"], + [11, 131073, 2147483647, "> 131k"] +] diff --git a/configuration/etl/etl_data.d/cloud/asset_type.json b/configuration/etl/etl_data.d/cloud/asset_type.json new file mode 100644 index 0000000000..a6860d3b6e --- /dev/null +++ b/configuration/etl/etl_data.d/cloud/asset_type.json @@ -0,0 +1,8 @@ +[ + ["asset_type_id", "asset_type", "display", "description"], + [-1, "unknown", "Unknown", "Unknown" ], + [1, "vol-ebs", "EBS Volume", "EBS Volume" ], + [2, "vol-instance", "Instance Volume", "Instance Store Volume" ], + [3, "ip", "IP", "IP Address" ], + [4, "s3-bucket", "S3 Bucket", "S3 Bucket" ] +] diff --git a/configuration/etl/etl_data.d/cloud/avail_zone.json b/configuration/etl/etl_data.d/cloud/avail_zone.json new file mode 100644 index 
0000000000..6eadf7b468 --- /dev/null +++ b/configuration/etl/etl_data.d/cloud/avail_zone.json @@ -0,0 +1,4 @@ +[ + ["avail_zone_id", "avail_zone", "display", "description"], + [-1, "unknown", "Unknown", "Unknown availability zone" ] +] diff --git a/configuration/etl/etl_data.d/cloud/event_type.json b/configuration/etl/etl_data.d/cloud/event_type.json new file mode 100644 index 0000000000..09bf119ac0 --- /dev/null +++ b/configuration/etl/etl_data.d/cloud/event_type.json @@ -0,0 +1,14 @@ +[ + ["event_type_id", "event_type", "display", "description"], + [-1, "unknown", "Unknown", "Unknown event type" ], + [1, "request-start", "Request Start", "Request to start instance" ], + [2, "start", "Start", "Instance started" ], + [3, "request-stop", "Request Stop", "Request to stop instance" ], + [4, "stop", "Stop", "Instance stopped" ], + [5, "request-terminate", "Request Terminate", "Request to terminate instance" ], + [6, "terminate", "Terminate", "Instance terminated" ], + [7, "request-vol-attach", "Request Attach Volume", "Request to attach storage volume" ], + [8, "vol-attach", "Volume Attach", "Storage volume attached" ], + [9, "request-vol-detach", "Request Detach Volume", "Request to detach storage volume" ], + [10, "request-ip-attach", "Request Attach IP", "Request to attach IP address" ] +] diff --git a/configuration/etl/etl_data.d/cloud/instance_type.json b/configuration/etl/etl_data.d/cloud/instance_type.json new file mode 100644 index 0000000000..14a7b96e2b --- /dev/null +++ b/configuration/etl/etl_data.d/cloud/instance_type.json @@ -0,0 +1,4 @@ +[ + ["instance_type_id", "instance_type", "display", "description"], + [-1, "unknown", "Unknown", "Unknown instance type" ] +] diff --git a/configuration/etl/etl_data.d/cloud/region.json b/configuration/etl/etl_data.d/cloud/region.json new file mode 100644 index 0000000000..38ce7be137 --- /dev/null +++ b/configuration/etl/etl_data.d/cloud/region.json @@ -0,0 +1,4 @@ +[ + ["region_id", "region", "display", "description"], + [-1, "unknown", "Unknown", "Unknown region" ] +] diff --git a/configuration/etl/etl_data.d/jobs/countable_type.json b/configuration/etl/etl_data.d/jobs/countable_type.json new file mode 100644 index 0000000000..490a3a6899 --- /dev/null +++ b/configuration/etl/etl_data.d/jobs/countable_type.json @@ -0,0 +1,8 @@ +[ + ["countable_type_id", "unit_id", "countable_type", "display", "description"], + [-1, null, "unknown", "Unknown", "Unknown" ], + [1, null, "gpu", "GPUs", "GPUs" ], + [2, null, "mic", "MICs", "MICs" ], + [3, null, "database", "Databases", "Databases" ], + [4, 2, "storage", "Storage", "Storage" ] +] diff --git a/configuration/etl/etl_data.d/jobs/job_record_type.json b/configuration/etl/etl_data.d/jobs/job_record_type.json new file mode 100644 index 0000000000..6b5e846cd8 --- /dev/null +++ b/configuration/etl/etl_data.d/jobs/job_record_type.json @@ -0,0 +1,9 @@ +[ + ["job_record_type_id", "job_record_type", "display", "description"], + [-1, "unknown", "Unknown", "Unknown" ], + [1, "hpc", "HPC", "HPC Job" ], + [2, "reservation", "Reservation", "HPC Reservation" ], + [3, "array", "Job Array", "HPC Job Array" ], + [4, "cloud", "Cloud", "Cloud Job" ] +] + diff --git a/configuration/etl/etl_data.d/jobs/job_task_type.json b/configuration/etl/etl_data.d/jobs/job_task_type.json new file mode 100644 index 0000000000..e328c83683 --- /dev/null +++ b/configuration/etl/etl_data.d/jobs/job_task_type.json @@ -0,0 +1,7 @@ +[ + ["job_task_type_id", "job_task_type", "display", "description"], + [-1, "unknown", "Unknown", "Unknown" ], + 
[1, "start", "Job Start", "Start of a job" ], + [2, "end", "Job End", "End of a job" ], + [3, "intermediate", "Intermediate", "Intermediate job reporting" ] +] diff --git a/configuration/etl/etl_data.d/jobs/submission_venue.json b/configuration/etl/etl_data.d/jobs/submission_venue.json new file mode 100644 index 0000000000..d373a4b53f --- /dev/null +++ b/configuration/etl/etl_data.d/jobs/submission_venue.json @@ -0,0 +1,7 @@ +[ + ["submission_venue_id", "submission_venue", "display", "description"], + [-1, "unknown", "Unknown", "Unknown" ], + [1, "cli", "CLI", "Command Line Interface" ], + [2, "gateway", "Gateway", "Science Gateway" ] +] + diff --git a/configuration/etl/etl_data.d/jobs/unit.json b/configuration/etl/etl_data.d/jobs/unit.json new file mode 100644 index 0000000000..d8f659065b --- /dev/null +++ b/configuration/etl/etl_data.d/jobs/unit.json @@ -0,0 +1,9 @@ +[ + ["unit_id", "unit", "display", "description"], + [-1, "unknown", "Unknown", "Unknown" ], + [1, "megabytes", "MB", "Megabytes" ], + [2, "gigabytes", "GB", "Gigabytes" ], + [3, "terabytes", "TB", "Terabytes" ], + [4, "petabytes", "PB", "Petabytes" ], + [5, "sus", "SUs", "Service Units" ] +] diff --git a/configuration/etl/etl_tables.d.samples/resource_allocations/historical_su_recommended.json b/configuration/etl/etl_tables.d.samples/resource_allocations/historical_su_recommended.json index cb58035a61..7172493cb4 100644 --- a/configuration/etl/etl_tables.d.samples/resource_allocations/historical_su_recommended.json +++ b/configuration/etl/etl_tables.d.samples/resource_allocations/historical_su_recommended.json @@ -1,15 +1,9 @@ { "#": "Include the resource allocaitons table definition", - "include": "etl_tables.d/resource_allocations/resource_allocations.json", - - "#": "Specify the source data file and metadata, in this case an array of JSON data.", - - "source_data": { - "type": "JsonArray", - "fields": [ "resource", "id", "recommended", "start_alloc_date" ], - "#": "It would be nice to be able to reference path variables like ${data_dir} here", - "data": "etl_data.d/resource_allocations/historical_su_recommended.json" + "#": "Include the resource allocaitons table definition", + "table_definition": { + "$ref": "etl_tables.d/resource_allocations/resource_allocations.json#/table_definition" }, "#": "Generate UPDATE table SET recommended = ? WHERE id = ? 
AND start_alloc_date = ?", diff --git a/configuration/etl/etl_tables.d.samples/static/job_times.json b/configuration/etl/etl_tables.d.samples/static/job_times.json index 48c88af1dc..d9e7b731e7 100644 --- a/configuration/etl/etl_tables.d.samples/static/job_times.json +++ b/configuration/etl/etl_tables.d.samples/static/job_times.json @@ -50,20 +50,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["id", "min_duration", "max_duration", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [ 0, 0, 0, "0 - 1s" ], - [ 1, 1, 29, "1 - 30s" ], - [ 2, 30, 1799, "30s - 30min" ], - [ 3, 1800, 3599, "30 - 60min" ], - [ 4, 3600, 17999, "1 - 5hr" ], - [ 5, 18000, 35999, "5 - 10hr" ], - [ 6, 36000, 64799, "10 - 18hr" ], - [ 7, 64800, 2147483647, "18+hr" ] - ] + } } diff --git a/configuration/etl/etl_tables.d.samples/static/processor_buckets.json b/configuration/etl/etl_tables.d.samples/static/processor_buckets.json index eab9897f4c..7f4b10c016 100644 --- a/configuration/etl/etl_tables.d.samples/static/processor_buckets.json +++ b/configuration/etl/etl_tables.d.samples/static/processor_buckets.json @@ -50,24 +50,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["id", "min_processors", "max_processors", "description"], - - "#": "Provide the raw source values.", - "source_values":[ - [1, 1, 1, "1"], - [2, 2, 4, "2 - 4"], - [3, 5, 8, "5 - 8"], - [4, 9, 64, "9 - 64"], - [5, 65, 256, "65 - 256"], - [6, 257, 512, "257 - 512"], - [7, 513, 1024, "513 - 1024"], - [8, 1025, 8192, "1k - 8k"], - [9, 8193, 32768, "8k - 32k"], - [10, 32769, 131072, "32k - 131k"], - [11, 131073, 2147483647, "> 131k"] - ] - + } } diff --git a/configuration/etl/etl_tables.d/cloud/asset_type.json b/configuration/etl/etl_tables.d/cloud/asset_type.json index 117d33547e..754fab4c4f 100644 --- a/configuration/etl/etl_tables.d/cloud/asset_type.json +++ b/configuration/etl/etl_tables.d/cloud/asset_type.json @@ -53,18 +53,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["asset_type_id", "asset_type", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown" ], - [1, "vol-ebs", "EBS Volume", "EBS Volume" ], - [2, "vol-instance", "Instance Volume", "Instance Store Volume" ], - [3, "ip", "IP", "IP Address" ], - [4, "s3-bucket", "S3 Bucket", "S3 Bucket" ] - ] - + } } diff --git a/configuration/etl/etl_tables.d/cloud/avail_zone.json b/configuration/etl/etl_tables.d/cloud/avail_zone.json index 62063030cf..cebe38ba74 100644 --- a/configuration/etl/etl_tables.d/cloud/avail_zone.json +++ b/configuration/etl/etl_tables.d/cloud/avail_zone.json @@ -39,13 +39,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["avail_zone_id", "avail_zone", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown availability zone" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/cloud/event_type.json b/configuration/etl/etl_tables.d/cloud/event_type.json index 1a357dd7b9..686ba02039 100644 --- a/configuration/etl/etl_tables.d/cloud/event_type.json +++ b/configuration/etl/etl_tables.d/cloud/event_type.json @@ -39,23 +39,5 @@ } ], "triggers": 
[] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["event_type_id", "event_type", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown event type" ], - [1, "request-start", "Request Start", "Request to start instance" ], - [2, "start", "Start", "Instance started" ], - [3, "request-stop", "Request Stop", "Request to stop instance" ], - [4, "stop", "Stop", "Instance stopped" ], - [5, "request-terminate", "Request Terminate", "Request to terminate instance" ], - [6, "terminate", "Terminate", "Instance terminated" ], - [7, "request-vol-attach", "Request Attach Volume", "Request to attach storage volume" ], - [8, "vol-attach", "Volume Attach", "Storage volume attached" ], - [9, "request-vol-detach", "Request Detach Volume", "Request to detach storage volume" ], - [10, "request-ip-attach", "Request Attach IP", "Request to attach IP address" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/cloud/instance_type.json b/configuration/etl/etl_tables.d/cloud/instance_type.json index eb8319a14d..0e6a03d51e 100644 --- a/configuration/etl/etl_tables.d/cloud/instance_type.json +++ b/configuration/etl/etl_tables.d/cloud/instance_type.json @@ -73,13 +73,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["instance_type_id", "instance_type", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown instance type" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/cloud/region.json b/configuration/etl/etl_tables.d/cloud/region.json index daa05c936d..c6a84489c8 100644 --- a/configuration/etl/etl_tables.d/cloud/region.json +++ b/configuration/etl/etl_tables.d/cloud/region.json @@ -39,13 +39,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["region_id", "region", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown region" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/jobs/countable_type.json b/configuration/etl/etl_tables.d/jobs/countable_type.json index 52355dd95d..88ca0e5896 100644 --- a/configuration/etl/etl_tables.d/jobs/countable_type.json +++ b/configuration/etl/etl_tables.d/jobs/countable_type.json @@ -45,17 +45,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["countable_type_id", "unit_id", "countable_type", "display", "description"], - - "#": "Provide the raw source values.", - "source_values":[ - [-1, null, "unknown", "Unknown", "Unknown" ], - [1, null, "gpu", "GPUs", "GPUs" ], - [2, null, "mic", "MICs", "MICs" ], - [3, null, "database", "Databases", "Databases" ], - [4, 2, "storage", "Storage", "Storage" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/jobs/job_record_type.json b/configuration/etl/etl_tables.d/jobs/job_record_type.json index b1ee4cdbb3..40a877a156 100644 --- a/configuration/etl/etl_tables.d/jobs/job_record_type.json +++ b/configuration/etl/etl_tables.d/jobs/job_record_type.json @@ -38,17 +38,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["job_record_type_id", "job_record_type", "display", "description"], - 
- "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown" ], - [1, "hpc", "HPC", "HPC Job" ], - [2, "reservation", "Reservation", "HPC Reservation" ], - [3, "array", "Job Array", "HPC Job Array" ], - [4, "cloud", "Cloud", "Cloud Job" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/jobs/job_task_type.json b/configuration/etl/etl_tables.d/jobs/job_task_type.json index 61b9f98c75..a3df54da12 100644 --- a/configuration/etl/etl_tables.d/jobs/job_task_type.json +++ b/configuration/etl/etl_tables.d/jobs/job_task_type.json @@ -53,16 +53,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["job_task_type_id", "job_task_type", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown" ], - [1, "start", "Job Start", "Start of a job" ], - [2, "end", "Job End", "End of a job" ], - [3, "intermediate", "Intermediate", "Intermediate job reporting" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/jobs/jobfact_hpc_aggregation.json- b/configuration/etl/etl_tables.d/jobs/jobfact_hpc_aggregation.json- new file mode 100644 index 0000000000..e7b1c55442 --- /dev/null +++ b/configuration/etl/etl_tables.d/jobs/jobfact_hpc_aggregation.json- @@ -0,0 +1,458 @@ +{ + "#": "Aggregation of HPC job records and tasks ingested from the XDCDB", + + "table_definition": { + "name": "jobfact_by_", + "engine": "MyISAM", + "comment": "Jobfacts aggregated by ${AGGREGATION_UNIT}.", + "columns": [ + { + "name": "${AGGREGATION_UNIT}_id", + "type": "int(10) unsigned", + "nullable": false, + "comment": "DIMENSION: The id related to modw.${AGGREGATION_UNIT}s." + },{ + "name": "year", + "type": "smallint(5) unsigned", + "nullable": false, + "comment": "DIMENSION: The year of the ${AGGREGATION_UNIT}" + },{ + "name": "${AGGREGATION_UNIT}", + "type": "smallint(5) unsigned", + "nullable": false, + "comment": "DIMENSION: The ${AGGREGATION_UNIT} of the year." + },{ + "name": "record_resource_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The resource on which the request was made" + },{ + "name": "resource_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The resource on which the jobs ran" + },{ + "name": "organization_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The organization of the resource that the jobs ran on." + },{ + "name": "resourcetype_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The type of the resource on which the jobs ran. References resourcetype.id" + },{ + "name": "systemaccount_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The id of the resource system account (local username) under which the job ran. References systemaccount.id" + },{ + "name": "submission_venue_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The method used to submit this job: cli, gateway, ..." + },{ + "name": "job_record_type_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: Task type or event." + },{ + "name": "job_task_type_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: Type of job: hpc, cloud, hpc-reservation, ...." + },{ + "name": "queue_id", + "type": "char(50)", + "nullable": false, + "comment": "DIMENSION: The queue of the resource on which the jobs ran." 
+ },{ + "name": "allocation_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The id of allocation these jobs used to run" + },{ + "name": "account_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The id of the account record from which one can get charge number" + },{ + "name": "requesting_person_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The id of the person that requested the resources (e.g., made the reservation or submitted the job." + },{ + "name": "person_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The id of the person that ran the jobs." + },{ + "name": "person_organization_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The organization of the person that ran the jobs." + },{ + "name": "person_nsfstatuscode_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The NSF status code of the person that ran the jobs. References person.nsfstatuscode_id" + },{ + "name": "fos_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The field of science of the project to which the jobs belong." + },{ + "name": "principalinvestigator_person_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The PI that owns the allocations that these jobs ran under. References principalinvestigator.person_id" + },{ + "name": "piperson_organization_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: The organization of the PI that owns the project that funds these jobs. References piperson.organization_id" + },{ + "name": "jobtime_id", + "type": "int(4)", + "nullable": false, + "comment": "DIMENSION: Job time is bucketing of wall time based on prechosen intervals in the modw.job_times table." + },{ + "name": "nodecount_id", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: Number of nodes each of the jobs used." + },{ + "name": "processors", + "type": "int(11)", + "nullable": false, + "comment": "DIMENSION: Number of processors each of the jobs used." + },{ + "name": "processorbucket_id", + "type": "int(4)", + "nullable": true, + "comment": "FACT: Pre-determined processor bucket sizes. References processorbucket.id" + },{ + "name": "submitted_job_count", + "type": "int(11)", + "nullable": true, + "comment": "FACT: The number of jobs that started during this day." + },{ + "name": "job_count", + "type": "int(11)", + "nullable": true, + "comment": "FACT: The number of jobs that ended during this day." + },{ + "name": "started_job_count", + "type": "int(11)", + "nullable": true, + "comment": "FACT: The number of jobs that started during this day." + },{ + "name": "running_job_count", + "type": "int(11)", + "nullable": true, + "comment": "FACT: The number of jobs that were running during this day." + },{ + "name": "wallduration", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: (seconds) The wallduration of the jobs that were running during this period. This will only count the walltime of the jobs that fell during this day. If a job started in the previous day(s) the wall time for that day will be added to that day. Same logic is true if a job ends not in this day, but upcoming days." + },{ + "name": "sum_wallduration_squared", + "type": "decimal(36,4)", + "nullable": true, + "comment": "FACT: (seconds) The sum of the square of wallduration of the jobs that were running during this period. This will only count the walltime of the jobs that fell during this day. 
If a job started in the previous day(s) the wall time for that day will be added to that day. Same logic is true if a job ends not in this day, but upcoming days." + },{ + "name": "waitduration", + "type": "decimal(18,0)", + "nullable": true, + "default": null, + "comment": "FACT: (seconds) The amount of time jobs waited to execute during this day." + },{ + "name": "sum_waitduration_squared", + "type": "decimal(36,4)", + "nullable": true, + "default": null, + "comment": "FACT: (seconds) The sum of the square of the amount of time jobs waited to execute during this day." + },{ + "name": "local_charge_su", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: The amount of local SUs charged to jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "local_charge", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: The amount of XDSUs charged to jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "local_charge_nu", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: The amount of NUs charged to jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "adjusted_charge_su", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: The amount of local SUs charged to jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "adjusted_charge", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: The amount of XDSUs charged to jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "adjusted_charge_nu", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: The amount of NUs charged to jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "sum_local_charge_squared", + "type": "decimal(36,4)", + "nullable": true, + "comment": "FACT: The sum of the square of local_charge of jobs pertaining to this day. If a job took more than one day, its local_charge is distributed linearly across the days it used." + },{ + "name": "cpu_time", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: (seconds) The amount of the cpu time (processor_count * wallduration) of the jobs pertaining to this day. If a job took more than one day, its cpu_time is distributed linearly across the days it used." + },{ + "name": "sum_cpu_time_squared", + "type": "decimal(36,4)", + "nullable": true, + "comment": "FACT: (seconds) The sum of the square of the amount of the cpu_time of the jobs pertaining to this day. If a job took more than one day, its cpu_time is distributed linearly across the days it used." + },{ + "name": "node_time", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: (seconds) The amount of the node time (nodes * wallduration) of the jobs pertaining to this day. If a job took more than one day, its node_time is distributed linearly across the days it used." + },{ + "name": "sum_node_time_squared", + "type": "decimal(36,4)", + "nullable": true, + "comment": "FACT: (seconds) The sum of the square of the amount of the node_time of the jobs pertaining to this day. 
If a job took more than one day, its node_time is distributed linearly across the days it used." + },{ + "name": "sum_weighted_expansion_factor", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: This is the sum of expansion factor per job multiplied by node_count and the [adjusted] duration of jobs that ran in this days." + },{ + "name": "sum_job_weights", + "type": "decimal(18,0)", + "nullable": true, + "comment": "FACT: this is the sum of (node_count multipled by the [adjusted] duration) for jobs that ran in this days." + } + ], + + "indexes": [ + { + "name": "index_account", + "columns": [ "account_id" ] + },{ + "name": "index_allocation", + "columns": [ "allocation_id" ] + },{ + "name": "index_fos", + "columns": [ "fos_id" ] + },{ + "name": "index_job_time_bucket_id", + "columns": [ "jobtime_id" ] + },{ + "name": "index_node_count", + "columns": [ "nodecount_id" ] + },{ + "name": "index_resource_organization", + "columns": [ "organization_id" ] + },{ + "name": "index_person", + "columns": [ "person_id" ] + },{ + "name": "index_person_nsf_status_code", + "columns": [ "person_nsfstatuscode_id" ] + },{ + "name": "index_person_organization", + "columns": [ "person_organization_id" ] + },{ + "name": "index_pi_organization", + "columns": [ "piperson_organization_id" ] + },{ + "name": "index_pi_person", + "columns": [ "principalinvestigator_person_id" ] + },{ + "name": "index_processor_count", + "columns": [ "processors" ] + },{ + "name": "index_queue", + "columns": [ "queue_id" ] + },{ + "name": "index_resource_type", + "columns": [ "resourcetype_id" ] + },{ + "name": "index_resource", + "columns": [ "record_resource_id" ] + },{ + "name": "index_system_account", + "columns": [ "systemaccount_id" ] + },{ + "name": "index_period_value", + "columns": [ "${AGGREGATION_UNIT}" ] + },{ + "name": "index_period", + "columns": [ "${AGGREGATION_UNIT}_id" ] + }] + }, + + "#": "The aggregation period query determines which periods need to be aggregated based on added or modified", + "#": "records. The overseer_restrictions block specifies the criteria for selecting periods requiring", + "#": "aggregation. If this clause is not specified or no restrictions match then all records will be", + "#": "considered. The first table specified in source_query.joins will be used to determine periods that", + "#": "need aggregation.", + + "aggregation_period_query": { + "overseer_restrictions": { + "last_modified_start_date": "last_modified >= ${VALUE}", + "last_modified_end_date": "last_modified <= ${VALUE}", + "include_only_resource_codes": "resource_id IN ${VALUE}", + "exclude_resource_codes": "resource_id NOT IN ${VALUE}" + } + }, + + "#": "The destination query block allows us to specify overseer restrictions that apply to operations on", + "#": "the destination table (e.g., deleting records from the table during aggregation). If no restrictions", + "#": "are specified then the entire aggregation period will be deleted. 
Note that if there is a restriction", + "#": "on the source_query block it is possible to delete an aggregation period from the destination table", + "#": "with no restictions and replace it with aggregated data that has been restricted.", + + "destination_query": { + "overseer_restrictions": { + "include_only_resource_codes": "record_resource_id IN ${VALUE}", + "exclude_resource_codes": "record_resource_id NOT IN ${VALUE}" + } + }, + + "source_query": { + + "overseer_restrictions": { + "include_only_resource_codes": "record.resource_id IN ${VALUE}", + "exclude_resource_codes": "record.resource_id NOT IN ${VALUE}" + }, + + "query_hint": "SQL_NO_CACHE", + "records": { + "${AGGREGATION_UNIT}_id": "${:PERIOD_ID}", + "year": "${:YEAR_VALUE}", + "${AGGREGATION_UNIT}": "${:PERIOD_VALUE}", + "record_resource_id": "record.resource_id", + "resource_id": "task.resource_id", + "organization_id": "requested_resource.organization_id", + "resourcetype_id": "requested_resource.resourcetype_id", + "systemaccount_id": "task.systemaccount_id", + "submission_venue_id": "record.submission_venue_id", + "job_record_type_id": "record.job_record_type_id", + "job_task_type_id": "task.job_task_type_id", + "queue_id": "record.queue", + "allocation_id": "record.allocation_id", + "account_id": "record.account_id", + "requesting_person_id": "record.person_id", + "person_id": "task.person_id", + "person_organization_id": "task.person_organization_id", + "person_nsfstatuscode_id": "task.person_nsfstatuscode_id", + "fos_id": "record.fos_id", + "principalinvestigator_person_id": "record.principalinvestigator_person_id", + "piperson_organization_id": "COALESCE(record.piperson_organization_id, 0)", + "jobtime_id": "(SELECT id FROM ${UTILITY_SCHEMA}.job_times jt WHERE task.wallduration >= jt.min_duration AND task.wallduration <= jt.max_duration)", + "nodecount_id": "task.node_count", + "processors": "task.processor_count", + "processorbucket_id": "(SELECT id FROM ${UTILITY_SCHEMA}.processor_buckets pb WHERE task.processor_count BETWEEN pb.min_processors AND pb.max_processors)", + "submitted_job_count": "SUM( IF(task.submit_time_ts BETWEEN ${:PERIOD_START_TS} AND ${:PERIOD_END_TS}, 1, 0) )", + "job_count": "SUM( IF(task.end_day_id BETWEEN ${:PERIOD_START_DAY_ID} AND ${:PERIOD_END_DAY_ID}, 1, 0) )", + "started_job_count": "SUM( IF(task.start_day_id BETWEEN ${:PERIOD_START_DAY_ID} AND ${:PERIOD_END_DAY_ID}, 1, 0) )", + "running_job_count": "SUM(1)", + "wallduration": "COALESCE(SUM( ${wallduration_case_statement}), 0)", + "sum_wallduration_squared": "COALESCE(SUM( CAST(POW(${wallduration_case_statement}, 2) AS DECIMAL(36,4)) ), 0)", + "waitduration": "SUM( IF(task.start_day_id BETWEEN ${:PERIOD_START_DAY_ID} AND ${:PERIOD_END_DAY_ID}, task.waitduration, NULL) )", + "sum_waitduration_squared": "SUM( CAST(IF(task.start_day_id BETWEEN ${:PERIOD_START_DAY_ID} AND ${:PERIOD_END_DAY_ID}, pow(task.waitduration, 2), NULL) AS DECIMAL(36,4)) )", + "local_charge": "SUM(${local_charge_xdsu_case_statement})", + "sum_local_charge_squared": "SUM( CAST(POW(${local_charge_xdsu_case_statement}, 2) AS DECIMAL(36,4)) )", + "cpu_time": "COALESCE(SUM(task.processor_count * ${wallduration_case_statement}), 0)", + "sum_cpu_time_squared": "COALESCE(SUM( CAST(POW(task.processor_count * ${wallduration_case_statement}, 2) AS DECIMAL(36,4)) ), 0)", + "node_time": "COALESCE(SUM(task.node_count * ${wallduration_case_statement}), 0)", + "sum_node_time_squared": "COALESCE(SUM( CAST(POW(task.node_count * ${wallduration_case_statement}, 2) AS DECIMAL(36,4)) ), 
0)", + "sum_weighted_expansion_factor": "SUM( ((task.wallduration + task.waitduration) / task.wallduration) * task.node_count * COALESCE(${wallduration_case_statement}, 0))", + "sum_job_weights": "SUM(task.node_count * COALESCE(${wallduration_case_statement}, 0))" + }, + + "groupby": [ + "${AGGREGATION_UNIT}_id", + "year", + "${AGGREGATION_UNIT}", + "piperson_organization_id", + "jobtime_id", + "nodecount_id", + "processors", + "person_id", + "organization_id", + "person_organization_id", + "person_nsfstatuscode_id", + "resource_id", + "resourcetype_id", + "queue_id", + "fos_id", + "account_id", + "systemaccount_id", + "allocation_id", + "principalinvestigator_person_id" + ], + + "joins": [ + { + "name": "job_tasks", + "schema": "${SOURCE_SCHEMA}", + "alias": "task" + },{ + "name": "job_records", + "schema": "${SOURCE_SCHEMA}", + "alias": "record", + "on": "record.job_record_id = task.job_record_id", + "type": "STRAIGHT" + },{ + "name": "resourcefact", + "schema": "${UTILITY_SCHEMA}", + "alias": "requested_resource", + "on": "requested_resource.id = record.resource_id" + },{ + "name": "resourcefact", + "schema": "${UTILITY_SCHEMA}", + "alias": "task_resource", + "on": "task_resource.id = task.resource_id" + } + ], + + "where": [ + "task.start_day_id <= ${:PERIOD_END_DAY_ID} AND task.end_day_id >= ${:PERIOD_START_DAY_ID}", + "task.is_deleted = 0" + ], + + "macros": [{ + "name": "wallduration_case_statement", + "file": "statistic_ratio_case.sql", + "args": { + "statistic": "task.wallduration", + "max": "${:PERIOD_SECONDS}", + "src_start_ts": "task.start_time_ts", + "src_end_ts": "task.end_time_ts", + "dest_start_ts": "${:PERIOD_START_TS}", + "dest_end_ts": "${:PERIOD_END_TS}" + } + },{ + "name": "local_charge_xdsu_case_statement", + "file": "statistic_ratio_as_datatype_case.sql", + "args": { + "data_type": "DECIMAL(18,3)", + "statistic": "task.local_charge_xdsu", + "max": "${:PERIOD_SECONDS}", + "src_start_ts": "task.start_time_ts", + "src_end_ts": "task.end_time_ts", + "dest_start_ts": "${:PERIOD_START_TS}", + "dest_end_ts": "${:PERIOD_END_TS}" + } + }] + } +} diff --git a/configuration/etl/etl_tables.d/jobs/submission_venue.json b/configuration/etl/etl_tables.d/jobs/submission_venue.json index b6da13c303..719805822d 100644 --- a/configuration/etl/etl_tables.d/jobs/submission_venue.json +++ b/configuration/etl/etl_tables.d/jobs/submission_venue.json @@ -38,15 +38,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["submission_venue_id", "submission_venue", "display", "description"], - - "#": "Provide the raw source values.", - "source_values": [ - [-1, "unknown", "Unknown", "Unknown" ], - [1, "cli", "CLI", "Command Line Interface" ], - [2, "gateway", "Gateway", "Science Gateway" ] - ] + } } diff --git a/configuration/etl/etl_tables.d/jobs/unit.json b/configuration/etl/etl_tables.d/jobs/unit.json index 15ebf843d9..4412df980f 100644 --- a/configuration/etl/etl_tables.d/jobs/unit.json +++ b/configuration/etl/etl_tables.d/jobs/unit.json @@ -38,18 +38,5 @@ } ], "triggers": [] - }, - - "#": "Define the mapping between the source value fields and the table columns.", - "destination_columns": ["unit_id", "unit", "display", "description"], - - "#": "Provide the raw source values.", - "source_values":[ - [-1, "unknown", "Unknown", "Unknown" ], - [1, "megabytes", "MB", "Megabytes" ], - [2, "gigabytes", "GB", "Gigabytes" ], - [3, "terabytes", "TB", "Terabytes" ], - [4, "petabytes", "PB", "Petabytes" ], - [5, 
"sus", "SUs", "Service Units" ] - ] + } } diff --git a/libraries/utilities.php b/libraries/utilities.php index f49f4258f9..7eca3d9035 100644 --- a/libraries/utilities.php +++ b/libraries/utilities.php @@ -631,8 +631,12 @@ function resolve_path($path) * @return TRUE if the object contains all of the required properties, FALSE otherwise. */ -function verify_required_object_properties(\stdClass $obj, array $propertyList, array &$missing = null) +function verify_required_object_properties($obj, array $propertyList, array &$missing = null) { + if ( ! is_object($obj) ) { + throw new Exception(sprintf("First argument must be an object, %s given", gettype($obj))); + } + $missing = array(); foreach ( $propertyList as $p ) { @@ -664,11 +668,15 @@ function verify_required_object_properties(\stdClass $obj, array $propertyList, */ function verify_object_property_types( - \stdClass $obj, + $obj, array $propertyList, array &$messages = null, $skipMissingProperties = false ) { + if ( ! is_object($obj) ) { + throw new Exception(sprintf("First argument must be an object, %s given", gettype($obj))); + } + $messages = array(); foreach ( $propertyList as $p => $type ) { diff --git a/open_xdmod/modules/xdmod/tests/lib/ETL/DataEndpoint/StructuredFileTest.php b/open_xdmod/modules/xdmod/tests/lib/ETL/DataEndpoint/StructuredFileTest.php index f32171e60a..d2301fd5d4 100644 --- a/open_xdmod/modules/xdmod/tests/lib/ETL/DataEndpoint/StructuredFileTest.php +++ b/open_xdmod/modules/xdmod/tests/lib/ETL/DataEndpoint/StructuredFileTest.php @@ -1,9 +1,34 @@ - * @date 2017-05-15 + * @date 2017-06-29 * ------------------------------------------------------------------------------------------ */ @@ -34,7 +59,7 @@ public function __construct() } // __construct() /** - * Test parsing a simple JSON file containing an array of objects. + * Test #1: Parsing a simple JSON file containing an array of objects. */ public function testParseJsonFileArray() @@ -110,8 +135,8 @@ public function testParseJsonFileArray() } // testParseJsonFileArray() /** - * Test parsing a simple JSON file containing multiple objects, each on a single line, - * separated by a newline. + * Test #2: Parsing a simple JSON file containing multiple objects, each on a single + * line, separated by a newline. */ public function testParseJsonFileRecords() @@ -198,7 +223,7 @@ public function testParseJsonFileRecords() } /** - * Test error reporting when config is not valid. + * Test #3: Error reporting when config is not valid. * * @expectedException Exception */ @@ -225,7 +250,7 @@ public function testInvalidFilterConfig() } // testInvalidFilterConfig() /** - * Test error reporting when a filter type is not provided. + * Test #4: Error reporting when a filter type is not provided. * * @expectedException Exception */ @@ -256,7 +281,7 @@ public function testMissingFilterType() } // testMissingFilterType() /** - * Test filter syntax error. + * Test #5: Filter syntax error. * * @expectedException Exception */ @@ -286,7 +311,7 @@ public function testFilterSyntaxError() } // testFilterSyntaxError() /** - * Test unknown filter executable. + * Test #6: Unknown filter executable. * * @expectedException Exception */ @@ -315,7 +340,7 @@ public function testInvalidFilter() } // testInvalidFilter() /** - * Test parsing of an empty file. + * Test #7: Parsing of an empty file. 
*/ public function testEmptyFile() @@ -343,8 +368,8 @@ public function testEmptyFile() } // testEmptyFile() /** - * Test parsing a simple JSON file containing an array of objects filtered through an - * external process. + * Test #8: Parsing a simple JSON file containing an array of objects filtered through + * an external process. */ public function testParseJsonFileFilteredArray() @@ -379,8 +404,8 @@ public function testParseJsonFileFilteredArray() } // testParseJsonFileFilteredArray() /** - * Test parsing a simple JSON file containing multiple records separated by a newline - * and filtered through an external process. + * Test #9: Parsing a simple JSON file containing multiple records separated by a + * newline and filtered through an external process. */ public function testParseJsonFileFilteredRecords() @@ -432,7 +457,7 @@ public function testParseJsonFileFilteredRecords() } // testParseJsonFileFilteredRecords() /** - * Test successful JSON schema validation. + * Test #10: Successful JSON schema validation. */ public function testSchemaValidationSuccess() @@ -472,7 +497,7 @@ public function testSchemaValidationSuccess() } // testSchemaValidationSuccess() /** - * Test failed JSON schema validation. + * Test #11: Failed JSON schema validation. * * @expectedException Exception */ @@ -493,4 +518,265 @@ public function testSchemaValidationFailure() $generated = $file->parse(); } // testSchemaValidationFailure() + + /** + * Test #13: Parse JSON array of objects, subset of field names specified. + */ + + public function testParseJsonArrayOfObjectsWithFieldNameSubset() + { + $expected = array( + (object) array( + 'first_name' => 'Helen', + 'last_name' => 'Green' + ), + (object) array( + 'first_name' => 'Dorothy', + 'last_name' => 'Green' + ), + (object) array( + 'first_name' => 'Mario', + 'last_name' => 'Johnson' + ) + ); + + $path = self::TEST_ARTIFACT_INPUT_PATH . '/xdmod_va_users.json'; + $config = array( + 'name' => 'xdmod_va_users.json', + 'path' => $path, + 'type' => 'jsonfile', + // Only return these fields + 'field_names' => array('first_name', 'last_name') + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + foreach ($file as $index => $record) { + $this->assertEquals($expected[$index], $record); + } + } // testParseJsonArrayOfObjectsWithFieldNameSubset() + + /** + * Test #14: Parse JSON array of objects, extra field names specified (expect null + * values). + */ + + public function testParseJsonArrayOfObjectsWithExtraFieldName() + { + $expected = array( + (object) array( + 'first_name' => 'Helen', + 'last_name' => 'Green', + 'extra' => null + ), + (object) array( + 'first_name' => 'Dorothy', + 'last_name' => 'Green', + 'extra' => null + ), + (object) array( + 'first_name' => 'Mario', + 'last_name' => 'Johnson', + 'extra' => null + ) + ); + + $path = self::TEST_ARTIFACT_INPUT_PATH . '/xdmod_va_users.json'; + $config = array( + 'name' => 'xdmod_va_users.json', + 'path' => $path, + 'type' => 'jsonfile', + // Only return these fields + 'field_names' => array('first_name', 'last_name', 'extra') + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + foreach ($file as $index => $record) { + $this->assertEquals($expected[$index], $record); + } + } // testParseJsonArrayOfObjectsWithExtraFieldName() + + /** + * Test #15: Parse JSON 2d array, no header row, no field names (excpect Exception). 
+ * + * @expectedException Exception + */ + + public function testParseJsonArrayNoHeaderNoFieldNames() + { + $path = self::TEST_ARTIFACT_INPUT_PATH . '/event_types_no_header.json'; + $config = array( + 'name' => 'event_types_no_header.json', + 'path' => $path, + 'type' => 'jsonfile', + 'header_record' => false + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + } // testParseJsonArrayNoHeaderNoFieldNames() + + /** + * Test #16: Parse JSON 2d array, no header row, with field names. + */ + + public function testParseJsonArrayNoHeaderWithFieldNames() + { + $expected = array( + array( + 'field1' => -1, + 'field2' => 'unknown', + 'field3' => 'Unknown', + 'field4' => 'Unknown event type' + ), + array( + 'field1' => 1, + 'field2' => 'request-start', + 'field3' => 'Request Start', + 'field4' => 'Request to start instance' + ) + ); + + $path = self::TEST_ARTIFACT_INPUT_PATH . '/event_types_no_header.json'; + $config = array( + 'name' => 'event_types_no_header.json', + 'path' => $path, + 'type' => 'jsonfile', + 'header_record' => false, + 'field_names' => array('field1', 'field2', 'field3', 'field4') + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + foreach ($file as $index => $record) { + $this->assertEquals($expected[$index], $record); + } + } // testParseJsonArrayNoHeaderWithFieldNames() + + /** + * Test #17: Parse JSON 2d array, with header row. + */ + + public function testParseJsonArrayWithHeader() + { + $expected = array( + array( + 'event_type_id' => -1, + 'event_type' => 'unknown', + 'display' => 'Unknown', + 'description' => 'Unknown event type' + ), + array( + 'event_type_id' => 1, + 'event_type' => 'request-start', + 'display' => 'Request Start', + 'description' => 'Request to start instance' + ) + ); + + $path = self::TEST_ARTIFACT_INPUT_PATH . '/event_types_with_header.json'; + $config = array( + 'name' => 'event_types_with_header.json', + 'path' => $path, + 'type' => 'jsonfile', + 'header_record' => true + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + foreach ($file as $index => $record) { + $this->assertEquals($expected[$index], $record); + } + } // testParseJsonArrayWithHeader() + + /** + * Test #18: Parse JSON 2d array, with header row and field names subset. + */ + + public function testParseJsonArrayWithHeaderAndFieldNames() + { + $expected = array( + array( + 'event_type_id' => -1, + 'display' => 'Unknown' + ), + array( + 'event_type_id' => 1, + 'display' => 'Request Start' + ) + ); + + $path = self::TEST_ARTIFACT_INPUT_PATH . '/event_types_with_header.json'; + $config = array( + 'name' => 'event_types_with_header.json', + 'path' => $path, + 'type' => 'jsonfile', + 'header_record' => true, + 'field_names' => array('event_type_id', 'display') + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + foreach ($file as $index => $record) { + $this->assertEquals($expected[$index], $record); + } + } // testParseJsonArrayWithHeaderAndFieldNames() + + /** + * Test #19: Parse JSON 2d array, with header row, subset of field names specified + * with extra field (expect null values). 
+ */ + + public function testParseJsonArrayWithHeaderAndExtraFieldNames() + { + $expected = array( + array( + 'event_type_id' => -1, + 'display' => 'Unknown', + 'extra' => null + ), + array( + 'event_type_id' => 1, + 'display' => 'Request Start', + 'extra' => null + ) + ); + + $path = self::TEST_ARTIFACT_INPUT_PATH . '/event_types_with_header.json'; + $config = array( + 'name' => 'event_types_with_header.json', + 'path' => $path, + 'type' => 'jsonfile', + 'header_record' => true, + 'field_names' => array('event_type_id', 'display', 'extra') + ); + + $options = new DataEndpointOptions($config); + $file = DataEndpoint::factory($options, $this->logger); + $file->verify(); + $file->parse(); + + foreach ($file as $index => $record) { + $this->assertEquals($expected[$index], $record); + } + } // testParseJsonArrayWithHeaderAndExtraFieldNames() } // class StructuredFileTest diff --git a/tools/dev/verify_table_data.php b/tools/dev/verify_table_data.php index 2aaebc10bf..2e0dff4a3a 100755 --- a/tools/dev/verify_table_data.php +++ b/tools/dev/verify_table_data.php @@ -453,10 +453,14 @@ function ($c) { function getTableRows($table, $schema) { - global $dbh, $logger; + global $dbh, $logger, $scriptOptions; $tableName = "`$schema`.`$table`"; - $sql = "SELECT COUNT(*) AS table_rows FROM $tableName"; + $sql = "SELECT COUNT(*) AS table_rows FROM $tableName src"; + + if ( 0 != count($scriptOptions['wheres']) ) { + $sql .= ' WHERE ' . implode(' AND ', $scriptOptions['wheres']); + } try { $stmt = $dbh->prepare($sql); diff --git a/tools/etl/etl_overseer.php b/tools/etl/etl_overseer.php index b82430808d..f29b37242f 100755 --- a/tools/etl/etl_overseer.php +++ b/tools/etl/etl_overseer.php @@ -393,7 +393,7 @@ $scriptOptions['config-file'], $scriptOptions['base-dir'], $logger, - $scriptOptions['option-overrides'] + array('option_overrides' => $scriptOptions['option-overrides']) ); $etlConfig->setLogger($logger); $etlConfig->initialize();