Skip to content

Commit

Permalink
Improve DirectorySanner to properly handle first file being empty (#931)
Browse files Browse the repository at this point in the history
Improve DirectorySanner to properly handle first file being empty
  • Loading branch information
smgallo authored May 21, 2019
2 parents d5eeb65 + 58e6639 commit a41740a
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 105 deletions.
13 changes: 11 additions & 2 deletions classes/ETL/DataEndpoint/DirectoryScanner.php
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,15 @@ public function valid()
$this->firstFileIterator = $this->currentFileIterator;
$this->firstFilename = $this->currentFilename;

} elseif ( ! $this->currentFileIterator->valid() ) {
}

// Verify that the current iterator is valid. It is possible that the file does not contain
// any records.

if ( ! $this->currentFileIterator->valid() ) {

// Does this need to be it's own separate clause to catch cases wherethe current file
// iterator is not valid?!? Yes. This is the way of the future.

// If there are no records available in the current file we will need to move on to the
// next file. Since a file could be empty, move on to the next file if we initialize a
Expand Down Expand Up @@ -1136,11 +1144,12 @@ private function initializeCurrentFileIterator($filename)
}

$fileHandler->verify();
// Must call parse() for fileHandler->valid() to work proplerly.
$record = $fileHandler->parse();

// Save the first record parsed from the first file so we can return it from parse()

if ( null === $this->firstRecord ) {
if ( null === $this->firstRecord && false !== $record ) {
$this->firstRecord = $record;
}

Expand Down
2 changes: 1 addition & 1 deletion classes/ETL/Ingestor/StructuredFileIngestor.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected function _execute()
// If there are no records we can bail out. Otherwise we may not even be able to infer the
// source field names.

if ( 0 == $this->sourceEndpoint->count() ) {
if ( false === $firstRecord || 0 == $this->sourceEndpoint->count() ) {
$this->logger->info(
sprintf("Source endpoint %s returned 0 records, skipping.", $this->sourceEndpoint)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,88 +1,127 @@
{
"defaults": {
"global": {
"endpoints": {
"source": {
"type": "mysql",
"name": "Cloud DB",
"config": "datawarehouse",
"schema": "modw_cloud"
"defaults": {

"global": {
"endpoints": {
"source": {
"type": "mysql",
"name": "Cloud DB",
"config": "datawarehouse",
"schema": "modw_cloud"
},
"destination": {
"type": "mysql",
"name": "Cloud DB",
"config": "datawarehouse",
"schema": "modw_cloud",
"create_schema_if_not_exists": true
}
}
},
"destination": {
"type": "mysql",
"name": "Cloud DB",
"config": "datawarehouse",
"schema": "modw_cloud",
"create_schema_if_not_exists": true

"cloud-jobs": {
"namespace": "ETL\\Ingestor",
"options_class": "IngestorOptions"
}
}
},
"cloud-jobs": {
"namespace": "ETL\\Ingestor",
"options_class": "IngestorOptions"
}
},
"#": "Current Cloud job ingestion",
"cloud-jobs": [
{
"name": "CloudTableManagement",
"class": "ManageTables",
"description": "Manage job tables",
"namespace": "ETL\\Maintenance",
"options_class": "MaintenanceOptions",
"definition_file_list": [
"cloud/account.json",
"cloud/asset.json",
"cloud/asset_event.json",
"cloud/cloud_resource_metadata.json",
"cloud/event.json",
"cloud/host.json",
"cloud/image.json",
"cloud/instance_data.json",
"cloud/instance.json",
"cloud/job_record_event.json"
],
"enabled": true
},
{
"name": "CloudEventTypeIngestor",
"description": "Cloud event types",
"class": "StructuredFileIngestor",
"definition_file": "cloud/event_type.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudAssetTypeIngestor",
"description": "Cloud asset types",
"class": "StructuredFileIngestor",
"definition_file": "cloud/asset_type.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudInstanceTypeIngestor",
"description": "Cloud instance types",
"class": "StructuredFileIngestor",
"definition_file": "cloud/instance_type.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudRegionIngestor",
"description": "Cloud regions",
"class": "StructuredFileIngestor",
"definition_file": "cloud/region.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudAvailabilityZoneIngestor",
"description": "Cloud availability zones",
"class": "StructuredFileIngestor",
"definition_file": "cloud/avail_zone.json",
"enabled": true,
"truncate_destination": true
}
]

"#": "Current Cloud job ingestion",

"cloud-jobs": [
{
"name": "CloudTableManagement",
"class": "ManageTables",
"description": "Manage job tables",
"namespace": "ETL\\Maintenance",
"options_class": "MaintenanceOptions",
"definition_file_list": [
"cloud/account.json",
"cloud/asset.json",
"cloud/asset_event.json",
"cloud/cloud_resource_metadata.json",
"cloud/event.json",
"cloud/host.json",
"cloud/image.json",
"cloud/instance_data.json",
"cloud/instance.json",
"cloud/job_record_event.json"
],
"enabled": true
},
{
"name": "CloudEventTypeIngestor",
"description": "Cloud event types",
"class": "StructuredFileIngestor",
"definition_file": "cloud/event_type.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudAssetTypeIngestor",
"description": "Cloud asset types",
"class": "StructuredFileIngestor",
"definition_file": "cloud/asset_type.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudInstanceTypeIngestor",
"description": "Cloud instance types",
"class": "StructuredFileIngestor",
"definition_file": "cloud/instance_type.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudRegionIngestor",
"description": "Cloud regions",
"class": "StructuredFileIngestor",
"definition_file": "cloud/region.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "CloudAvailabilityZoneIngestor",
"description": "Cloud availability zones",
"class": "StructuredFileIngestor",
"definition_file": "cloud/avail_zone.json",
"enabled": true,
"truncate_destination": true
},
{
"name": "GenericRawCloudEventIngestor",
"description": "Loading generic cloud data",
"class": "StructuredFileIngestor",
"definition_file": "raw_generic_cloud_job_logs.json",
"endpoints": {
"source": {
"type": "directoryscanner",
"name": "Generic cloud event logs",
"path": "${CLOUD_EVENT_LOG_DIR}",
"file_pattern": "/\\.json$/",
"#": "Recursion depth is relative to the path",
"recursion_depth": 1,
"handler": {
"type": "jsonfile",
"record_separator": "\n",
"field_names": [
"instance_id",
"event_type",
"event_data",
"event_time",
"record_type",
"account",
"node_controller",
"image_type",
"instance_type",
"root_type",
"block_devices",
"public_ip",
"private_ip"
]
}
}
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"#": "Load raw event data from generic cloud log files. The resource id will be specified on",
"#": "the ETL command line at run time.",

"table_definition": [
{
"$ref": "${table_definition_dir}/cloud_generic/raw_event.json#/table_definition"
},
{
"$ref": "${table_definition_dir}/cloud_generic/raw_instance_type.json#/table_definition"
}
],

"#": "Note that JSON pointers are used below to access data in nested JSON objects",

"destination_record_map": {
"generic_cloud_raw_event": {
"event_type": "event_type",
"record_type": "record_type",
"event_time_utc": "event_time",
"provider_instance_identifier": "instance_id",
"hostname": "node_controller",
"resource_id": "${RESOURCE_ID}",
"image": "image_type",
"instance_type": "/instance_type/name",
"provider_account": "account",
"event_data": "event_data",
"first_volume": "/block_devices/0/id",
"root_volume_type": "root_type"
},
"generic_cloud_raw_instance_type": {
"instance_type": "/instance_type/name",
"resource_id": "${RESOURCE_ID}",
"display": "/instance_type/name",
"num_cores": "/instance_type/cpu",
"memory_mb": "/instance_type/memory",
"disk_gb": "/instance_type/disk",
"start_time": "event_time"
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{
"#": "Raw event information from the generic cloud log files.",

"#": "Note that almost any field in the raw event logs can be NULL so most fields are nullable.",
"#": "These will be stored here and filtered out later. For example, several events with type",
"#": "REQUEST_START have no instance id",

"table_definition": {
"name": "generic_cloud_raw_event",
"engine": "MyISAM",
"comment": "Raw events from the log file.",
"columns": [
{
"name": "resource_id",
"type": "int(11)",
"nullable": false
},
{
"name": "provider_instance_identifier",
"type": "varchar(64)",
"nullable": true,
"default": null,
"comment": "Optional instance event is associated with."
},
{
"name": "event_time_utc",
"type": "char(26)",
"nullable": false,
"default": "0000-00-00T00:00:00.000000",
"comment": "String representation of timestamp directly from the logs."
},
{
"name": "event_type",
"type": "varchar(64)",
"nullable": false
},
{
"name": "record_type",
"type": "varchar(64)",
"nullable": false
},
{
"name": "hostname",
"type": "varchar(64)",
"nullable": true,
"default": null
},
{
"name": "instance_type",
"type": "varchar(64)",
"nullable": true,
"default": null,
"comment": "Short version or abbrev"
},
{
"name": "image",
"type": "varchar(64)",
"nullable": true,
"default": null
},
{
"name": "provider_account",
"type": "varchar(64)",
"nullable": true,
"default": null
},
{
"name": "event_data",
"type": "varchar(256)",
"nullable": true,
"default": null,
"comment": "Additional data specific to an event (e.g., volume, IP address, etc.)"
},
{
"name": "first_volume",
"type": "varchar(256)",
"nullable": true,
"default": null
},
{
"name": "root_volume_type",
"type": "varchar(64)",
"nullable": true,
"default": null
}
],
"indexes": [
{
"name": "resource_id",
"columns": [
"resource_id"
],
"is_unique": false
}
]
}
}
Loading

0 comments on commit a41740a

Please sign in to comment.