Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cloud data to xdmod-shredder and xdmod-ingestor #739

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ed0a524
Changes to xdmod-ingestor and xdmod-shredder to support cloud based e…
eiffel777 Oct 24, 2018
56ae3f9
adding checking for exception for when cloud table do not exist. adde…
eiffel777 Nov 8, 2018
c10909b
building filter list after running cloud aggregation in ingestor script
eiffel777 Nov 26, 2018
b55927a
add cloud shredder abstract class to reduce code duplication. renamed…
eiffel777 Nov 26, 2018
a1d9388
added new function to check if a realm is enabled before trying to in…
eiffel777 Nov 28, 2018
7074fc5
adding tests for new shredders
eiffel777 Nov 30, 2018
eef0291
adding phpdocs for realmEnabled function
eiffel777 Nov 30, 2018
901679b
documentation changes
eiffel777 Nov 30, 2018
b6d5609
style and test changes
eiffel777 Nov 30, 2018
ddf1eba
style changes
eiffel777 Nov 30, 2018
29c3801
adding newline that shouldn't have been deleted
eiffel777 Nov 30, 2018
e228e6d
use false instead of null to determine if realm should be aggregated
eiffel777 Dec 3, 2018
ebe18ad
change $realmToAggregate to default to false instead of null
eiffel777 Dec 3, 2018
64abdda
addressing nitpicks from @plessbd
eiffel777 Dec 14, 2018
37564c8
fixing merge conflict
eiffel777 Dec 14, 2018
30a5391
removing minmaxdate statement from bootstrap.sh
eiffel777 Dec 14, 2018
3ba69fc
Merge branch 'xdmod8.1' of https://github.com/ubccr/xdmod into add-cl…
eiffel777 Dec 17, 2018
c1af695
setting etl pipline to use for cloud shredders in constructor.
eiffel777 Dec 17, 2018
45db850
changes to get tests to pass
eiffel777 Dec 17, 2018
b60aa6b
update suggested by @plessbd
eiffel777 Dec 20, 2018
f3dc0ec
Merge branch 'xdmod8.1' into add-cloud-ingestion-to-ingestor-script
Dec 20, 2018
0eb33e0
Merge branch 'xdmod8.1' into add-cloud-ingestion-to-ingestor-script
Dec 20, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions bin/xdmod-ingestor
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function main()
array('q', 'quiet'),

array('', 'ingest'),
array('', 'aggregate'),
array('', 'aggregate::'),

// Dates used by ingestion.
array('', 'start-date:'),
Expand All @@ -57,6 +57,9 @@ function main()
array('', 'ingest-staging'),
array('', 'ingest-hpcdb'),

// Type of data that is being ingested.
array('', 'datatype:'),

// Specify an ingestor.
array('', 'ingestor:'),
);
Expand All @@ -75,9 +78,9 @@ function main()
}

$help = $ingest = $aggregate = $noAppend = $ingestAll = $ingestShredded
= $ingestStaging = $ingestHpcdb = false;
= $ingestStaging = $ingestHpcdb = $datatype = $realmToAggregate = false;

$startDate = $endDate = $lastModifiedStartDate = null;
$startDate = $endDate = $lastModifiedStartDate = $datatypeValue = null;

$logLevel = -1;

Expand Down Expand Up @@ -108,6 +111,7 @@ function main()
break;
case 'aggregate':
$aggregate = true;
$realmToAggregate = $value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the docs the $value for an optional argument is false if it is not specified. Assuming the documentation is correct you'll need to check for === false rather than === null on line 287. Please confirm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpwhite4 Yeah, that does seem to be the case. It should be changed now.

plessbd marked this conversation as resolved.
Show resolved Hide resolved
break;
case 'start-date':
$startDate = $value;
Expand All @@ -130,6 +134,10 @@ function main()
case 'ingest-hpcdb':
$ingestHpcdb = true;
break;
case 'datatype':
$datatype = true;
plessbd marked this conversation as resolved.
Show resolved Hide resolved
$datatypeValue = $value;
break;
default:
fwrite(STDERR, "Unexpected option '$key'\n");
exit(1);
Expand Down Expand Up @@ -185,6 +193,11 @@ function main()
$logger->info("Using today '$endDate' for end date");
}

if($datatype !== false && $datatypeValue === null){
$logger->crit("You must specify the type of data you want to ingest");
exit(1);
}

$hpcdbDb = DB::factory('hpcdb');
$dwDb = DB::factory('datawarehouse');

Expand Down Expand Up @@ -220,7 +233,7 @@ function main()
$ingest = $aggregate = true;
}
// If any ingestion phase is specified, don't aggregate.
if ($ingestAll || $ingestShredded || $ingestStaging || $ingestHpcdb) {
if ($ingestAll || $ingestShredded || $ingestStaging || $ingestHpcdb || $datatype) {
$aggregate = false;
}

Expand All @@ -229,7 +242,7 @@ function main()
try {

// If no ingestion phase is specified, ingest all.
if (!$ingestShredded && !$ingestStaging && !$ingestHpcdb) {
if (!$ingestShredded && !$ingestStaging && !$ingestHpcdb && !$datatype){
$ingestAll = true;
}

Expand All @@ -248,6 +261,14 @@ function main()
if ($ingestHpcdb) {
$dwi->ingestAllHpcdb($startDate, $endDate);
}

if($datatypeValue == 'openstack'){
$dwi->ingestCloudDataOpenStack();
}

if($datatypeValue == 'genericcloud'){
$dwi->ingestCloudDataGeneric();
}
}
} catch (Exception $e) {
$logger->crit(array(
Expand All @@ -262,7 +283,14 @@ function main()
if ($aggregate) {
$logger->info('Aggregating data');
try {
$dwi->aggregateAllJobs($lastModifiedStartDate);
// If there is no realm specified to aggregate then all realms should be aggregated
if($realmToAggregate == 'job' || $realmToAggregate === false){
$dwi->aggregateAllJobs($lastModifiedStartDate);
}

if($realmToAggregate == 'cloud' || $realmToAggregate === false){
$dwi->aggregateCloudData();
}
} catch (Exception $e) {
$logger->crit(array(
'message' => 'Aggregation failed: ' . $e->getMessage(),
Expand Down
9 changes: 7 additions & 2 deletions bin/xdmod-shredder
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,16 @@ function main()
}

if (!$dryRun) {
$logger->notice('Normalizing data');
$logger->notice('Normalizing data!');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am so excited we are normalizing this data, before I was just fine.


try {
$ingestor = $shredder->getJobIngestor();
$ingestor->ingest();
// The cloud shredders do not have jobs to ingest and return false when
// getJobIngestor is called for them so we don't have to hard code skipping
// those formats here.
if($ingestor !== false){
$ingestor->ingest();
}
} catch (Exception $e) {
$logger->crit(array(
'message' => 'Ingestion failed: ' . $e->getMessage(),
Expand Down
50 changes: 50 additions & 0 deletions classes/ETL/Utilities.php
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,54 @@ public static function quoteVariables(array $variables, VariableStore $variableS

return $localVariableMap;
} // quoteVariables()

public static function runEtlPipeline(array $pipelines, $logger, array $params = array())
Copy link
Contributor

@plessbd plessbd Dec 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like this should be a separate pull request that also removes similar code from the code base:

$overseer = new EtlOverseer($overseerOptions, $this->logger);

$overseer = new EtlOverseer($overseerOptions, $this->logger);

$overseer = new \ETL\EtlOverseer($overseerOptions, $logger);

$overseer = new EtlOverseer($overseerOptions, $log);

Specifically so we don't have the many places this is done in our code base and we don't forget about it.

You do state in the description that there is a subsequent PR for this; can you please link to it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the branch, https://github.com/eiffel777/xdmod/tree/move-runEtlPipeline-references-to-etl-utilities-class, that has the changes talked about in the last paragraph of the description. It's a bit behind xdmod8.1 right now, so there is some extra code in the diff that can be ignored.

{
$logger->debug(
sprintf(
'Shredding directory using ETL pipeline "%s" with parameters %s',
implode(', ', $pipelines),
json_encode($params)
)
);

$configOptions = array('default_module_name' => 'xdmod');
if( array_key_exists('variable-overrides', $params) ){
$configOptions['config_variables'] = $params['variable-overrides'];
}

$etlConfig = new EtlConfiguration(
CONFIG_DIR . '/etl/etl.json',
null,
$logger,
$configOptions
);
$etlConfig->initialize();
self::setEtlConfig($etlConfig);

$scriptOptions = array_merge(
array(
'default-module-name' => 'xdmod',
'process-sections' => $pipelines,
),
$params
);
$logger->debug(
sprintf(
'Running ETL pipeline with script options %s',
json_encode($scriptOptions)
)
);

$overseerOptions = new EtlOverseerOptions(
$scriptOptions,
$logger
);

$utilitySchema = $etlConfig->getGlobalEndpoint('utility')->getSchema();
$overseerOptions->setResourceCodeToIdMapSql(sprintf("SELECT id, code from %s.resourcefact", $utilitySchema));

$overseer = new EtlOverseer($overseerOptions, $logger);
$overseer->execute($etlConfig);
} // runEtlPipeline
} // class Utilities
152 changes: 118 additions & 34 deletions classes/OpenXdmod/DataWarehouseInitializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public function ingestAll($startDate = null, $endDate = null)
$this->ingestAllShredded($startDate, $endDate);
$this->ingestAllStaging($startDate, $endDate);
$this->ingestAllHpcdb($startDate, $endDate);
$this->ingestCloudDataGeneric();
$this->ingestCloudDataOpenStack();
}

/**
Expand All @@ -136,9 +138,11 @@ public function ingestAll($startDate = null, $endDate = null)
*/
public function ingestAllShredded($startDate = null, $endDate = null)
{
$this->logger->debug('Ingesting shredded data to staging tables');
$this->runEtlPipeline('staging-ingest-common');
$this->runEtlPipeline('staging-ingest-jobs');
if( $this->realmEnabled('Jobs')){
$this->logger->debug('Ingesting shredded data to staging tables');
$this->runEtlPipeline('staging-ingest-common');
$this->runEtlPipeline('staging-ingest-jobs');
}
}

/**
Expand All @@ -149,9 +153,11 @@ public function ingestAllShredded($startDate = null, $endDate = null)
*/
public function ingestAllStaging($startDate = null, $endDate = null)
{
$this->logger->debug('Ingesting staging data to HPCDB');
$this->runEtlPipeline('hpcdb-ingest-common');
$this->runEtlPipeline('hpcdb-ingest-jobs');
if( $this->realmEnabled('Jobs')){
$this->logger->debug('Ingesting staging data to HPCDB');
$this->runEtlPipeline('hpcdb-ingest-common');
$this->runEtlPipeline('hpcdb-ingest-jobs');
}
}

/**
Expand All @@ -162,33 +168,98 @@ public function ingestAllStaging($startDate = null, $endDate = null)
*/
public function ingestAllHpcdb($startDate = null, $endDate = null)
{
$this->logger->debug('Ingesting HPCDB data to modw');

if ($startDate !== null || $endDate !== null) {
$params = array();
if ($startDate !== null) {
$params['start-date'] = $startDate . ' 00:00:00';
}
if ($endDate !== null) {
$params['end-date'] = $endDate . ' 23:59:59';
if( $this->realmEnabled('Jobs')){
$this->logger->debug('Ingesting HPCDB data to modw');

if ($startDate !== null || $endDate !== null) {
$params = array();
if ($startDate !== null) {
$params['start-date'] = $startDate . ' 00:00:00';
}
if ($endDate !== null) {
$params['end-date'] = $endDate . ' 23:59:59';
}
$this->runEtlPipeline(
'hpcdb-prep-xdw-job-ingest-by-date-range',
$params
);
} else {
$this->runEtlPipeline('hpcdb-prep-xdw-job-ingest-by-new-jobs');
}

// Use current time from the database in case clocks are not
// synchronized.
$lastModifiedStartDate
= $this->hpcdbDb->query('SELECT NOW() AS now FROM dual')[0]['now'];

$this->runEtlPipeline(
'hpcdb-prep-xdw-job-ingest-by-date-range',
$params
'hpcdb-xdw-ingest',
array('last-modified-start-date' => $lastModifiedStartDate)
);
} else {
$this->runEtlPipeline('hpcdb-prep-xdw-job-ingest-by-new-jobs');
}
}

// Use current time from the database in case clocks are not
// synchronized.
$lastModifiedStartDate
= $this->hpcdbDb->query('SELECT NOW() AS now FROM dual')[0]['now'];
/**
 * Ingest OpenStack cloud events by running the
 * "jobs-cloud-extract-openstack" ETL pipeline.
 *
 * Nothing is done unless the Cloud realm is enabled. An exception whose
 * code equals 1146 is treated as "nothing to ingest" and is only logged;
 * any other exception is rethrown to the caller.
 *
 * NOTE(review): 1146 is presumably the MySQL "table doesn't exist" error
 * raised when the OpenStack raw event tables have not been created yet.
 * Confirm that the ETL layer exposes that code through getCode().
 *
 * @return void
 *
 * @throws Exception Any pipeline failure whose code is not 1146.
 */
public function ingestCloudDataOpenStack()
{
    if (!$this->realmEnabled('Cloud')) {
        return;
    }

    try {
        $this->logger->notice('Ingesting OpenStack event log data');
        $this->runEtlPipeline('jobs-cloud-extract-openstack');
    } catch (Exception $e) {
        // The loose comparison also matches a numeric-string code such
        // as '1146'.
        if ($e->getCode() != 1146) {
            throw $e;
        }

        $this->logger->notice('No OpenStack events to ingest');
    }
}

$this->runEtlPipeline(
'hpcdb-xdw-ingest',
array('last-modified-start-date' => $lastModifiedStartDate)
);
/**
 * Ingest generic cloud log data by running the
 * "jobs-cloud-extract-eucalyptus" ETL pipeline.
 *
 * Nothing is done unless the Cloud realm is enabled. An exception whose
 * code equals 1146 is treated as "nothing to ingest" and is only logged;
 * any other exception is rethrown to the caller.
 *
 * NOTE(review): 1146 is presumably the MySQL "table doesn't exist" error
 * raised when the generic raw event tables have not been created yet.
 * Confirm that the ETL layer exposes that code through getCode().
 *
 * @return void
 *
 * @throws Exception Any pipeline failure whose code is not 1146.
 */
public function ingestCloudDataGeneric()
{
    if (!$this->realmEnabled('Cloud')) {
        return;
    }

    try {
        $this->logger->notice('Ingesting generic cloud log files');
        $this->runEtlPipeline('jobs-cloud-extract-eucalyptus');
    } catch (Exception $e) {
        // The loose comparison also matches a numeric-string code such
        // as '1146'.
        if ($e->getCode() != 1146) {
            throw $e;
        }

        $this->logger->notice('No cloud event data to ingest');
    }
}

/**
 * Aggregate cloud data and build the Cloud realm filter lists.
 *
 * Runs the "cloud-state-pipeline" ETL pipeline and then asks a
 * FilterListBuilder to build the lists for the Cloud realm. Nothing is
 * done unless the Cloud realm is enabled.
 *
 * No exception is caught here: any failure raised by the pipeline or by
 * the filter list builder propagates to the caller.
 *
 * @return void
 */
public function aggregateCloudData()
{
    if (!$this->realmEnabled('Cloud')) {
        return;
    }

    $this->logger->notice('Aggregating Cloud data');
    $this->runEtlPipeline('cloud-state-pipeline');

    $cloudFilterLists = new FilterListBuilder();
    $cloudFilterLists->setLogger($this->logger);
    $cloudFilterLists->buildRealmLists('Cloud');
}

/**
Expand Down Expand Up @@ -217,13 +288,15 @@ public function initializeAggregation($startDate = null, $endDate = null)
*/
public function aggregateAllJobs($lastModifiedStartDate)
{
$this->runEtlPipeline(
'jobs-xdw-aggregate',
array('last-modified-start-date' => $lastModifiedStartDate)
);
$filterListBuilder = new FilterListBuilder();
$filterListBuilder->setLogger($this->logger);
$filterListBuilder->buildRealmLists('Jobs');
if( $this->realmEnabled('Jobs') ){
$this->runEtlPipeline(
'jobs-xdw-aggregate',
array('last-modified-start-date' => $lastModifiedStartDate)
);
$filterListBuilder = new FilterListBuilder();
$filterListBuilder->setLogger($this->logger);
$filterListBuilder->buildRealmLists('Jobs');
}
}

/**
Expand Down Expand Up @@ -275,6 +348,17 @@ public function aggregate(
));
}

/**
 * Determine whether a realm is enabled, i.e. has a row in moddb.realms.
 *
 * @param string $realm Value matched against the "display" column of
 *                      moddb.realms.
 *
 * @return bool True when the lookup returns at least one row.
 */
private function realmEnabled($realm)
{
    $sql = "SELECT * FROM moddb.realms WHERE display = :realm";
    $matchingRealms = $this->warehouseDb->query($sql, [':realm' => $realm]);

    return count($matchingRealms) > 0;
}

/**
* Run an ETL pipeline.
*
Expand Down
Loading