From 41dc5eb3b7e0c609c16dd7a7fb6402ba4c546677 Mon Sep 17 00:00:00 2001 From: Steven Gallo Date: Mon, 20 May 2019 12:05:59 -0400 Subject: [PATCH 1/2] Do not assign first record if parse failed --- classes/ETL/DataEndpoint/DirectoryScanner.php | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/classes/ETL/DataEndpoint/DirectoryScanner.php b/classes/ETL/DataEndpoint/DirectoryScanner.php index bebd054937..a4f023fc32 100644 --- a/classes/ETL/DataEndpoint/DirectoryScanner.php +++ b/classes/ETL/DataEndpoint/DirectoryScanner.php @@ -1059,7 +1059,15 @@ public function valid() $this->firstFileIterator = $this->currentFileIterator; $this->firstFilename = $this->currentFilename; - } elseif ( ! $this->currentFileIterator->valid() ) { + } + + // Verify that the current iterator is valid. It is possible that the file does not contain + // any records. + + if ( ! $this->currentFileIterator->valid() ) { + + // Does this need to be it's own separate clause to catch cases wherethe current file + // iterator is not valid?!? Yes. This is the way of the future. // If there are no records available in the current file we will need to move on to the // next file. Since a file could be empty, move on to the next file if we initialize a @@ -1136,11 +1144,12 @@ private function initializeCurrentFileIterator($filename) } $fileHandler->verify(); + // Must call parse() for fileHandler->valid() to work proplerly. $record = $fileHandler->parse(); // Save the first record parsed from the first file so we can return it from parse() - if ( null === $this->firstRecord ) { + if ( null === $this->firstRecord && false !== $record ) { $this->firstRecord = $record; } From 1ea2e62e187995563bcf6345d8fda75e4844be5d Mon Sep 17 00:00:00 2001 From: Steven Gallo Date: Mon, 20 May 2019 12:32:12 -0400 Subject: [PATCH 2/2] Add test for structured file ingestor and directory scanner --- .../ETL/Ingestor/StructuredFileIngestor.php | 2 +- .../input/etl_8.0.0.d/jobs_cloud.json | 205 +++++++++++------- .../raw_generic_cloud_job_logs.json | 42 ++++ .../cloud_generic/raw_event.json | 97 +++++++++ .../cloud_generic/raw_instance_type.json | 73 +++++++ .../input/generic_cloud_logs/euca_acct1.json | 2 + .../generic_cloud_logs/euca_acct_empty.json | 0 .../euca_acct_empty_array.json | 1 + tests/component/lib/ETL/IngestorTest.php | 72 ++++-- 9 files changed, 391 insertions(+), 103 deletions(-) create mode 100644 tests/artifacts/xdmod/etlv2/configuration/input/etl_action_defs_8.0.0.d/raw_generic_cloud_job_logs.json create mode 100644 tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_event.json create mode 100644 tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_instance_type.json create mode 100644 tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct1.json create mode 100644 tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty.json create mode 100644 tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty_array.json diff --git a/classes/ETL/Ingestor/StructuredFileIngestor.php b/classes/ETL/Ingestor/StructuredFileIngestor.php index a4353501bc..8573c8d36d 100644 --- a/classes/ETL/Ingestor/StructuredFileIngestor.php +++ b/classes/ETL/Ingestor/StructuredFileIngestor.php @@ -98,7 +98,7 @@ protected function _execute() // If there are no records we can bail out. Otherwise we may not even be able to infer the // source field names. - if ( 0 == $this->sourceEndpoint->count() ) { + if ( false === $firstRecord || 0 == $this->sourceEndpoint->count() ) { $this->logger->info( sprintf("Source endpoint %s returned 0 records, skipping.", $this->sourceEndpoint) ); diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/etl_8.0.0.d/jobs_cloud.json b/tests/artifacts/xdmod/etlv2/configuration/input/etl_8.0.0.d/jobs_cloud.json index f8cb62c583..d1cfb8ca66 100644 --- a/tests/artifacts/xdmod/etlv2/configuration/input/etl_8.0.0.d/jobs_cloud.json +++ b/tests/artifacts/xdmod/etlv2/configuration/input/etl_8.0.0.d/jobs_cloud.json @@ -1,88 +1,127 @@ { - "defaults": { - "global": { - "endpoints": { - "source": { - "type": "mysql", - "name": "Cloud DB", - "config": "datawarehouse", - "schema": "modw_cloud" + "defaults": { + + "global": { + "endpoints": { + "source": { + "type": "mysql", + "name": "Cloud DB", + "config": "datawarehouse", + "schema": "modw_cloud" + }, + "destination": { + "type": "mysql", + "name": "Cloud DB", + "config": "datawarehouse", + "schema": "modw_cloud", + "create_schema_if_not_exists": true + } + } }, - "destination": { - "type": "mysql", - "name": "Cloud DB", - "config": "datawarehouse", - "schema": "modw_cloud", - "create_schema_if_not_exists": true + + "cloud-jobs": { + "namespace": "ETL\\Ingestor", + "options_class": "IngestorOptions" } - } }, - "cloud-jobs": { - "namespace": "ETL\\Ingestor", - "options_class": "IngestorOptions" - } - }, - "#": "Current Cloud job ingestion", - "cloud-jobs": [ - { - "name": "CloudTableManagement", - "class": "ManageTables", - "description": "Manage job tables", - "namespace": "ETL\\Maintenance", - "options_class": "MaintenanceOptions", - "definition_file_list": [ - "cloud/account.json", - "cloud/asset.json", - "cloud/asset_event.json", - "cloud/cloud_resource_metadata.json", - "cloud/event.json", - "cloud/host.json", - "cloud/image.json", - "cloud/instance_data.json", - "cloud/instance.json", - "cloud/job_record_event.json" - ], - "enabled": true - }, - { - "name": "CloudEventTypeIngestor", - "description": "Cloud event types", - "class": "StructuredFileIngestor", - "definition_file": "cloud/event_type.json", - "enabled": true, - "truncate_destination": true - }, - { - "name": "CloudAssetTypeIngestor", - "description": "Cloud asset types", - "class": "StructuredFileIngestor", - "definition_file": "cloud/asset_type.json", - "enabled": true, - "truncate_destination": true - }, - { - "name": "CloudInstanceTypeIngestor", - "description": "Cloud instance types", - "class": "StructuredFileIngestor", - "definition_file": "cloud/instance_type.json", - "enabled": true, - "truncate_destination": true - }, - { - "name": "CloudRegionIngestor", - "description": "Cloud regions", - "class": "StructuredFileIngestor", - "definition_file": "cloud/region.json", - "enabled": true, - "truncate_destination": true - }, - { - "name": "CloudAvailabilityZoneIngestor", - "description": "Cloud availability zones", - "class": "StructuredFileIngestor", - "definition_file": "cloud/avail_zone.json", - "enabled": true, - "truncate_destination": true - } - ] + + "#": "Current Cloud job ingestion", + + "cloud-jobs": [ + { + "name": "CloudTableManagement", + "class": "ManageTables", + "description": "Manage job tables", + "namespace": "ETL\\Maintenance", + "options_class": "MaintenanceOptions", + "definition_file_list": [ + "cloud/account.json", + "cloud/asset.json", + "cloud/asset_event.json", + "cloud/cloud_resource_metadata.json", + "cloud/event.json", + "cloud/host.json", + "cloud/image.json", + "cloud/instance_data.json", + "cloud/instance.json", + "cloud/job_record_event.json" + ], + "enabled": true + }, + { + "name": "CloudEventTypeIngestor", + "description": "Cloud event types", + "class": "StructuredFileIngestor", + "definition_file": "cloud/event_type.json", + "enabled": true, + "truncate_destination": true + }, + { + "name": "CloudAssetTypeIngestor", + "description": "Cloud asset types", + "class": "StructuredFileIngestor", + "definition_file": "cloud/asset_type.json", + "enabled": true, + "truncate_destination": true + }, + { + "name": "CloudInstanceTypeIngestor", + "description": "Cloud instance types", + "class": "StructuredFileIngestor", + "definition_file": "cloud/instance_type.json", + "enabled": true, + "truncate_destination": true + }, + { + "name": "CloudRegionIngestor", + "description": "Cloud regions", + "class": "StructuredFileIngestor", + "definition_file": "cloud/region.json", + "enabled": true, + "truncate_destination": true + }, + { + "name": "CloudAvailabilityZoneIngestor", + "description": "Cloud availability zones", + "class": "StructuredFileIngestor", + "definition_file": "cloud/avail_zone.json", + "enabled": true, + "truncate_destination": true + }, + { + "name": "GenericRawCloudEventIngestor", + "description": "Loading generic cloud data", + "class": "StructuredFileIngestor", + "definition_file": "raw_generic_cloud_job_logs.json", + "endpoints": { + "source": { + "type": "directoryscanner", + "name": "Generic cloud event logs", + "path": "${CLOUD_EVENT_LOG_DIR}", + "file_pattern": "/\\.json$/", + "#": "Recursion depth is relative to the path", + "recursion_depth": 1, + "handler": { + "type": "jsonfile", + "record_separator": "\n", + "field_names": [ + "instance_id", + "event_type", + "event_data", + "event_time", + "record_type", + "account", + "node_controller", + "image_type", + "instance_type", + "root_type", + "block_devices", + "public_ip", + "private_ip" + ] + } + } + } + } + ] } diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/etl_action_defs_8.0.0.d/raw_generic_cloud_job_logs.json b/tests/artifacts/xdmod/etlv2/configuration/input/etl_action_defs_8.0.0.d/raw_generic_cloud_job_logs.json new file mode 100644 index 0000000000..aafb7d1626 --- /dev/null +++ b/tests/artifacts/xdmod/etlv2/configuration/input/etl_action_defs_8.0.0.d/raw_generic_cloud_job_logs.json @@ -0,0 +1,42 @@ +{ + "#": "Load raw event data from generic cloud log files. The resource id will be specified on", + "#": "the ETL command line at run time.", + + "table_definition": [ + { + "$ref": "${table_definition_dir}/cloud_generic/raw_event.json#/table_definition" + }, + { + "$ref": "${table_definition_dir}/cloud_generic/raw_instance_type.json#/table_definition" + } + ], + + "#": "Note that JSON pointers are used below to access data in nested JSON objects", + + "destination_record_map": { + "generic_cloud_raw_event": { + "event_type": "event_type", + "record_type": "record_type", + "event_time_utc": "event_time", + "provider_instance_identifier": "instance_id", + "hostname": "node_controller", + "resource_id": "${RESOURCE_ID}", + "image": "image_type", + "instance_type": "/instance_type/name", + "provider_account": "account", + "event_data": "event_data", + "first_volume": "/block_devices/0/id", + "root_volume_type": "root_type" + }, + "generic_cloud_raw_instance_type": { + "instance_type": "/instance_type/name", + "resource_id": "${RESOURCE_ID}", + "display": "/instance_type/name", + "num_cores": "/instance_type/cpu", + "memory_mb": "/instance_type/memory", + "disk_gb": "/instance_type/disk", + "start_time": "event_time" + } + } + +} diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_event.json b/tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_event.json new file mode 100644 index 0000000000..76a3c6954e --- /dev/null +++ b/tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_event.json @@ -0,0 +1,97 @@ +{ + "#": "Raw event information from the generic cloud log files.", + + "#": "Note that almost any field in the raw event logs can be NULL so most fields are nullable.", + "#": "These will be stored here and filtered out later. For example, several events with type", + "#": "REQUEST_START have no instance id", + + "table_definition": { + "name": "generic_cloud_raw_event", + "engine": "MyISAM", + "comment": "Raw events from the log file.", + "columns": [ + { + "name": "resource_id", + "type": "int(11)", + "nullable": false + }, + { + "name": "provider_instance_identifier", + "type": "varchar(64)", + "nullable": true, + "default": null, + "comment": "Optional instance event is associated with." + }, + { + "name": "event_time_utc", + "type": "char(26)", + "nullable": false, + "default": "0000-00-00T00:00:00.000000", + "comment": "String representation of timestamp directly from the logs." + }, + { + "name": "event_type", + "type": "varchar(64)", + "nullable": false + }, + { + "name": "record_type", + "type": "varchar(64)", + "nullable": false + }, + { + "name": "hostname", + "type": "varchar(64)", + "nullable": true, + "default": null + }, + { + "name": "instance_type", + "type": "varchar(64)", + "nullable": true, + "default": null, + "comment": "Short version or abbrev" + }, + { + "name": "image", + "type": "varchar(64)", + "nullable": true, + "default": null + }, + { + "name": "provider_account", + "type": "varchar(64)", + "nullable": true, + "default": null + }, + { + "name": "event_data", + "type": "varchar(256)", + "nullable": true, + "default": null, + "comment": "Additional data specific to an event (e.g., volume, IP address, etc.)" + }, + { + "name": "first_volume", + "type": "varchar(256)", + "nullable": true, + "default": null + }, + { + "name": "root_volume_type", + "type": "varchar(64)", + "nullable": true, + "default": null + } + ], + "indexes": [ + { + "name": "resource_id", + "columns": [ + "resource_id" + ], + "is_unique": false + } + ] + } +} diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_instance_type.json b/tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_instance_type.json new file mode 100644 index 0000000000..e2acbea094 --- /dev/null +++ b/tests/artifacts/xdmod/etlv2/configuration/input/etl_tables_8.0.0.d/cloud_generic/raw_instance_type.json @@ -0,0 +1,73 @@ +{ + "#": "Raw instance type information from the generic cloud log files.", + + "#": "Note that some log entries do not have an instance type so all fields are nullable", + "#": "and will be filtered out when ingesting into the final table.", + + "table_definition": { + "name": "generic_cloud_raw_instance_type", + "engine": "MyISAM", + "comment": "Raw instance type data parsed from log files.", + "columns": [ + { + "name": "instance_type", + "type": "varchar(64)", + "nullable": true, + "comment": "Short version or abbrev" + }, + { + "name": "resource_id", + "type": "int(11)", + "nullable": false, + "comment": "Resource to which this type belongs" + }, + { + "name": "display", + "type": "varchar(256)", + "nullable": true, + "comment": "What to show the user" + }, + { + "name": "description", + "type": "varchar(1024)", + "nullable": true, + "default": null + }, + { + "name": "num_cores", + "type": "int(11)", + "nullable": true, + "default": 0 + }, + { + "name": "memory_mb", + "type": "int(11)", + "nullable": true, + "default": 0 + }, + { + "name": "disk_gb", + "type": "int(11)", + "nullable": true, + "default": 0, + "comment": "Disk size configured in image" + }, + { + "name": "start_time", + "type": "char(26)", + "nullable": true, + "default": "0000-00-00T00:00:00.000000", + "comment": "First time this configuration was encountered as a unix timestamp to the microsecond, defaults to unknown." + } + ], + "indexes": [ + { + "name": "resource_id", + "columns": [ + "resource_id" + ], + "is_unique": false + } + ] + } +} diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct1.json b/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct1.json new file mode 100644 index 0000000000..dad3a7e9f8 --- /dev/null +++ b/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct1.json @@ -0,0 +1,2 @@ +{"node_controller": null, "public_ip": null, "account": "000048934329", "event_type": "STATE_REPORT", "event_time": "2017-05-16T03:55:04Z", "instance_type": {"name": "c1.medium", "cpu": "4", "memory": "16384", "disk": "40", "networkInterfaces": "2"}, "image_type": "emi-521695e8", "instance_id": "i-cb13943e", "record_type": "ADMINISTRATIVE", "block_devices": [{"account": "big", "attach_time": "2017-04-19T13:47:38.609Z", "backing": "ebs", "create_time": "2017-04-19T13:47:38.550Z", "user": "tyearke", "id": "vol-6a9b5bc2", "size": "40"}], "private_ip": null, "root_type": "ebs"} +{"node_controller": "172.17.0.31", "public_ip": "199.109.192.61", "account": "000669660540", "event_type": "STATE_REPORT", "event_time": "2017-05-16T03:55:04Z", "instance_type": {"name": "m1.medium", "cpu": "2", "memory": "4096", "disk": "20", "networkInterfaces": "2"}, "image_type": "emi-3f83abf8", "instance_id": "i-dd04e6bf", "record_type": "ADMINISTRATIVE", "block_devices": [{"account": "redfly", "attach_time": "2017-03-21T16:57:45.376Z", "backing": "ebs", "create_time": "2017-03-21T16:57:45.330Z", "user": "riveraj", "id": "vol-dae393e0", "size": "10"}], "private_ip": "172.17.47.126", "root_type": "ebs"} diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty.json b/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty.json new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty_array.json b/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty_array.json new file mode 100644 index 0000000000..1e3ec7217a --- /dev/null +++ b/tests/artifacts/xdmod/etlv2/configuration/input/generic_cloud_logs/euca_acct_empty_array.json @@ -0,0 +1 @@ +[ ] diff --git a/tests/component/lib/ETL/IngestorTest.php b/tests/component/lib/ETL/IngestorTest.php index 495b92e22d..9402d06cb5 100644 --- a/tests/component/lib/ETL/IngestorTest.php +++ b/tests/component/lib/ETL/IngestorTest.php @@ -9,13 +9,25 @@ use CCR\DB; /** - * Test various components of the ETLv2 ingestors. + * Test various components of the ETLv2 ingestors. All tests in this file run the etl_overseer.php + * script to test the entire pipeline. The following tests are performed: + * + * 1. Load invalid data and ensure that LOAD DATA INFILE returns appropriate warning messages. + * 2. Insert truncated or out of range data and ensure that the SQL statements returns warning messages. + * 3. Insert truncated and out of range data but hide SQL warnings. + * 4. Insert truncated and out of range data but hide SQL warnings for incorrect values, leaving + * warnings for out of range values. + * 5. Test the structured file ingestor using the directory scanner with one empty (0 byte) + * file, one file containing an empty JSON array, and another file containing 2 records of + * data. */ class IngestorTest extends \PHPUnit_Framework_TestCase { + const TEST_INPUT_DIR = '/tests/artifacts/xdmod/etlv2/configuration/input'; + /** - * Load invalid data and ensure that LOAD DATA INFILE returns appropriate warning messages. + * 1. Load invalid data and ensure that LOAD DATA INFILE returns appropriate warning messages. */ public function testLoadDataInfileWarnings() { @@ -45,7 +57,8 @@ public function testLoadDataInfileWarnings() { } /** - * Insert truncated or out of range data and ensure that the SQL statements returns warning messages. + * 2. Insert truncated or out of range data and ensure that the SQL statements returns warning + * messages. */ public function testSqlWarnings() { @@ -65,8 +78,9 @@ public function testSqlWarnings() { if ( ! empty($result['stdout']) ) { foreach ( explode(PHP_EOL, trim($result['stdout'])) as $line ) { - $this->assertRegExp('/\[warning\]/', $line); - $numWarnings++; + if ( false !== strpos($line, '[warning]') ) { + $numWarnings++; + } } } @@ -75,7 +89,7 @@ public function testSqlWarnings() { } /** - * Insert truncated and out of range data but hide SQL warnings. + * 3. Insert truncated and out of range data but hide SQL warnings. */ public function testHideSqlWarnings() { @@ -85,22 +99,18 @@ public function testHideSqlWarnings() { // We are expecting no warnings to be returned - $numWarnings = 0; - if ( ! empty($result['stdout']) ) { foreach ( explode(PHP_EOL, trim($result['stdout'])) as $line ) { $this->assertNotRegExp('/\[warning\]/', $line); - $numWarnings++; } } - $this->assertEquals(0, $numWarnings, 'Expected number of SQL warnings'); $this->assertEquals('', $result['stderr'], "Std Error"); } /** - * Insert truncated and out of range data but hide SQL warnings for incorrect values, - * leaving warnings for out of range values. + * 4. Insert truncated and out of range data but hide SQL warnings for incorrect values, leaving + * warnings for out of range values. */ public function testHideSqlWarningCodes() { @@ -118,8 +128,9 @@ public function testHideSqlWarningCodes() { if ( ! empty($result['stdout']) ) { foreach ( explode(PHP_EOL, trim($result['stdout'])) as $line ) { - $this->assertRegExp('/\[warning\]/', $line); - $numWarnings++; + if ( false !== strpos($line, '[warning]') ) { + $numWarnings++; + } } } @@ -134,8 +145,9 @@ public function testHideSqlWarningCodes() { if ( ! empty($result['stdout']) ) { foreach ( explode(PHP_EOL, trim($result['stdout'])) as $line ) { - $this->assertRegExp('/\[warning\]/', $line); - $numWarnings++; + if ( false !== strpos($line, '[warning]') ) { + $numWarnings++; + } } } @@ -143,6 +155,22 @@ public function testHideSqlWarningCodes() { $this->assertEquals('', $result['stderr'], "Std Error"); } + /** + * 5. Test the structured file ingestor using the directory scanner with one empty (0 byte) + * file, one file containing an empty JSON array, and another file containing 2 records of + * dat. + */ + + public function testStructuredFileIngestorWithDirectoryScanner() { + $result = $this->executeOverseerAction( + 'xdmod.cloud-jobs.GenericRawCloudEventIngestor', + sprintf('-v notice -d "CLOUD_EVENT_LOG_DIR=%s/generic_cloud_logs"', BASE_DIR . self::TEST_INPUT_DIR) + ); + + $this->assertEquals(0, $result['exit_status'], 'Exit code'); + $this->assertEquals('', $result['stderr'], 'Std Error'); + } + /** * Execute the ETL overseer. * @@ -154,10 +182,16 @@ private function executeOverseerAction($action, $localOptions = "") { // Note that tests are run in the directory where the PHP class is defined. $overseer = realpath(BASE_DIR . '/tools/etl/etl_overseer.php'); - $configFile = realpath(BASE_DIR . '/tests/artifacts/xdmod/etlv2/configuration/input/xdmod_etl_config_8.0.0.json'); - $options = sprintf('-c %s -a %s %s -v warning', $configFile, $action, $localOptions); - $command = sprintf('%s %s', $overseer, $options); + $configFile = realpath(BASE_DIR . self::TEST_INPUT_DIR . '/xdmod_etl_config_8.0.0.json'); + $options = sprintf('-c %s -a %s %s', $configFile, $action, $localOptions); + + // Add a verbosity flag if the local options do not already contain one + if ( "" == $localOptions || false === strpos($localOptions, '-v') ) { + $options = sprintf('%s -v warning', $options); + } + $pipes = array(); + $command = sprintf('%s %s', $overseer, $options); $process = proc_open( $command,