Skip to content

Commit

Permalink
Merge pull request #739 from eiffel777/add-cloud-ingestion-to-ingesto…
Browse files Browse the repository at this point in the history
…r-script

Add cloud data to xdmod-shredder and xdmod-ingestor
  • Loading branch information
Greg Dean authored Dec 20, 2018
2 parents d290456 + 0eb33e0 commit 0febec4
Show file tree
Hide file tree
Showing 13 changed files with 598 additions and 52 deletions.
40 changes: 34 additions & 6 deletions bin/xdmod-ingestor
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function main()
array('q', 'quiet'),

array('', 'ingest'),
array('', 'aggregate'),
array('', 'aggregate::'),

// Dates used by ingestion.
array('', 'start-date:'),
Expand All @@ -57,6 +57,9 @@ function main()
array('', 'ingest-staging'),
array('', 'ingest-hpcdb'),

// Type of data that is being ingested.
array('', 'datatype:'),

// Specify an ingestor.
array('', 'ingestor:'),
);
Expand All @@ -75,9 +78,9 @@ function main()
}

$help = $ingest = $aggregate = $noAppend = $ingestAll = $ingestShredded
= $ingestStaging = $ingestHpcdb = false;
= $ingestStaging = $ingestHpcdb = $datatype = $realmToAggregate = false;

$startDate = $endDate = $lastModifiedStartDate = null;
$startDate = $endDate = $lastModifiedStartDate = $datatypeValue = null;

$logLevel = -1;

Expand Down Expand Up @@ -108,6 +111,7 @@ function main()
break;
case 'aggregate':
$aggregate = true;
$realmToAggregate = $value;
break;
case 'start-date':
$startDate = $value;
Expand All @@ -130,6 +134,10 @@ function main()
case 'ingest-hpcdb':
$ingestHpcdb = true;
break;
case 'datatype':
$datatype = true;
$datatypeValue = $value;
break;
default:
fwrite(STDERR, "Unexpected option '$key'\n");
exit(1);
Expand Down Expand Up @@ -185,6 +193,11 @@ function main()
$logger->info("Using today '$endDate' for end date");
}

if($datatype !== false && $datatypeValue === null){
$logger->crit("You must specify the type of data you want to ingest");
exit(1);
}

$hpcdbDb = DB::factory('hpcdb');
$dwDb = DB::factory('datawarehouse');

Expand Down Expand Up @@ -220,7 +233,7 @@ function main()
$ingest = $aggregate = true;
}
// If any ingestion phase is specified, don't aggregate.
if ($ingestAll || $ingestShredded || $ingestStaging || $ingestHpcdb) {
if ($ingestAll || $ingestShredded || $ingestStaging || $ingestHpcdb || $datatype) {
$aggregate = false;
}

Expand All @@ -229,7 +242,7 @@ function main()
try {

// If no ingestion phase is specified, ingest all.
if (!$ingestShredded && !$ingestStaging && !$ingestHpcdb) {
if (!$ingestShredded && !$ingestStaging && !$ingestHpcdb && !$datatype){
$ingestAll = true;
}

Expand All @@ -248,6 +261,14 @@ function main()
if ($ingestHpcdb) {
$dwi->ingestAllHpcdb($startDate, $endDate);
}

if($datatypeValue == 'openstack'){
$dwi->ingestCloudDataOpenStack();
}

if($datatypeValue == 'genericcloud'){
$dwi->ingestCloudDataGeneric();
}
}
} catch (Exception $e) {
$logger->crit(array(
Expand All @@ -262,7 +283,14 @@ function main()
if ($aggregate) {
$logger->info('Aggregating data');
try {
$dwi->aggregateAllJobs($lastModifiedStartDate);
// If there is no realm specified to aggregate then all realms should be aggregated
if($realmToAggregate == 'job' || $realmToAggregate === false){
$dwi->aggregateAllJobs($lastModifiedStartDate);
}

if($realmToAggregate == 'cloud' || $realmToAggregate === false){
$dwi->aggregateCloudData();
}
} catch (Exception $e) {
$logger->crit(array(
'message' => 'Aggregation failed: ' . $e->getMessage(),
Expand Down
9 changes: 7 additions & 2 deletions bin/xdmod-shredder
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,16 @@ function main()
}

if (!$dryRun) {
$logger->notice('Normalizing data');
$logger->notice('Normalizing data!');

try {
$ingestor = $shredder->getJobIngestor();
$ingestor->ingest();
// The cloud shredders do not have jobs to ingest and return false when
// getJobIngestor is called for them, so we don't have to hard-code skipping
// those formats here.
if($ingestor !== false){
$ingestor->ingest();
}
} catch (Exception $e) {
$logger->crit(array(
'message' => 'Ingestion failed: ' . $e->getMessage(),
Expand Down
50 changes: 50 additions & 0 deletions classes/ETL/Utilities.php
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,54 @@ public static function quoteVariables(array $variables, VariableStore $variableS

return $localVariableMap;
} // quoteVariables()

/**
 * Run one or more ETL pipeline sections with the given options.
 *
 * Builds an EtlConfiguration from CONFIG_DIR/etl/etl.json, merges the
 * supplied parameters into the overseer script options, and executes the
 * requested pipeline sections via an EtlOverseer.
 *
 * @param array $pipelines Names of the ETL pipeline sections to run.
 * @param mixed $logger    Logger used for debug output (assumes a PEAR
 *                         Log-style object with a debug() method — TODO confirm).
 * @param array $params    Extra overseer options merged over the defaults.
 *                         The optional 'variable-overrides' key is passed to
 *                         the ETL configuration as 'config_variables'.
 */
public static function runEtlPipeline(array $pipelines, $logger, array $params = array())
{
    // NOTE(review): the previous message said "Shredding directory …",
    // a copy/paste error — this helper runs arbitrary ETL pipelines
    // (ingestion and aggregation alike), so the message now says so.
    $logger->debug(
        sprintf(
            'Running ETL pipeline "%s" with parameters %s',
            implode(', ', $pipelines),
            json_encode($params)
        )
    );

    $configOptions = array('default_module_name' => 'xdmod');
    if (array_key_exists('variable-overrides', $params)) {
        $configOptions['config_variables'] = $params['variable-overrides'];
    }

    $etlConfig = new EtlConfiguration(
        CONFIG_DIR . '/etl/etl.json',
        null,
        $logger,
        $configOptions
    );
    $etlConfig->initialize();
    self::setEtlConfig($etlConfig);

    $scriptOptions = array_merge(
        array(
            'default-module-name' => 'xdmod',
            'process-sections' => $pipelines,
        ),
        $params
    );
    $logger->debug(
        sprintf(
            'Running ETL pipeline with script options %s',
            json_encode($scriptOptions)
        )
    );

    $overseerOptions = new EtlOverseerOptions(
        $scriptOptions,
        $logger
    );

    // Map resource codes to ids via the utility schema's resourcefact table.
    $utilitySchema = $etlConfig->getGlobalEndpoint('utility')->getSchema();
    $overseerOptions->setResourceCodeToIdMapSql(sprintf("SELECT id, code from %s.resourcefact", $utilitySchema));

    $overseer = new EtlOverseer($overseerOptions, $logger);
    $overseer->execute($etlConfig);
} // runEtlPipeline
} // class Utilities
152 changes: 118 additions & 34 deletions classes/OpenXdmod/DataWarehouseInitializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public function ingestAll($startDate = null, $endDate = null)
$this->ingestAllShredded($startDate, $endDate);
$this->ingestAllStaging($startDate, $endDate);
$this->ingestAllHpcdb($startDate, $endDate);
$this->ingestCloudDataGeneric();
$this->ingestCloudDataOpenStack();
}

/**
Expand All @@ -136,9 +138,11 @@ public function ingestAll($startDate = null, $endDate = null)
*/
public function ingestAllShredded($startDate = null, $endDate = null)
{
$this->logger->debug('Ingesting shredded data to staging tables');
$this->runEtlPipeline('staging-ingest-common');
$this->runEtlPipeline('staging-ingest-jobs');
if( $this->realmEnabled('Jobs')){
$this->logger->debug('Ingesting shredded data to staging tables');
$this->runEtlPipeline('staging-ingest-common');
$this->runEtlPipeline('staging-ingest-jobs');
}
}

/**
Expand All @@ -149,9 +153,11 @@ public function ingestAllShredded($startDate = null, $endDate = null)
*/
public function ingestAllStaging($startDate = null, $endDate = null)
{
$this->logger->debug('Ingesting staging data to HPCDB');
$this->runEtlPipeline('hpcdb-ingest-common');
$this->runEtlPipeline('hpcdb-ingest-jobs');
if( $this->realmEnabled('Jobs')){
$this->logger->debug('Ingesting staging data to HPCDB');
$this->runEtlPipeline('hpcdb-ingest-common');
$this->runEtlPipeline('hpcdb-ingest-jobs');
}
}

/**
Expand All @@ -162,33 +168,98 @@ public function ingestAllStaging($startDate = null, $endDate = null)
*/
public function ingestAllHpcdb($startDate = null, $endDate = null)
{
$this->logger->debug('Ingesting HPCDB data to modw');

if ($startDate !== null || $endDate !== null) {
$params = array();
if ($startDate !== null) {
$params['start-date'] = $startDate . ' 00:00:00';
}
if ($endDate !== null) {
$params['end-date'] = $endDate . ' 23:59:59';
if( $this->realmEnabled('Jobs')){
$this->logger->debug('Ingesting HPCDB data to modw');

if ($startDate !== null || $endDate !== null) {
$params = array();
if ($startDate !== null) {
$params['start-date'] = $startDate . ' 00:00:00';
}
if ($endDate !== null) {
$params['end-date'] = $endDate . ' 23:59:59';
}
$this->runEtlPipeline(
'hpcdb-prep-xdw-job-ingest-by-date-range',
$params
);
} else {
$this->runEtlPipeline('hpcdb-prep-xdw-job-ingest-by-new-jobs');
}

// Use current time from the database in case clocks are not
// synchronized.
$lastModifiedStartDate
= $this->hpcdbDb->query('SELECT NOW() AS now FROM dual')[0]['now'];

$this->runEtlPipeline(
'hpcdb-prep-xdw-job-ingest-by-date-range',
$params
'hpcdb-xdw-ingest',
array('last-modified-start-date' => $lastModifiedStartDate)
);
} else {
$this->runEtlPipeline('hpcdb-prep-xdw-job-ingest-by-new-jobs');
}
}

// Use current time from the database in case clocks are not
// synchronized.
$lastModifiedStartDate
= $this->hpcdbDb->query('SELECT NOW() AS now FROM dual')[0]['now'];
/**
 * Ingest OpenStack cloud event data.
 *
 * Runs the jobs-cloud-extract-openstack ETL pipeline when the Cloud realm
 * is enabled. If the pipeline fails because the raw event tables do not
 * exist (error code 1146, MySQL "table doesn't exist"), the exception is
 * swallowed and a notice is logged instead, since that simply means there
 * is no OpenStack data to ingest. Any other exception is re-thrown.
 */
public function ingestCloudDataOpenStack()
{
    if (!$this->realmEnabled('Cloud')) {
        return;
    }

    try {
        $this->logger->notice('Ingesting OpenStack event log data');
        $this->runEtlPipeline('jobs-cloud-extract-openstack');
    } catch (Exception $e) {
        // 1146 = missing raw-event table; treat as "nothing to ingest".
        if ($e->getCode() != 1146) {
            throw $e;
        }
        $this->logger->notice('No OpenStack events to ingest');
    }
}

$this->runEtlPipeline(
'hpcdb-xdw-ingest',
array('last-modified-start-date' => $lastModifiedStartDate)
);
/**
 * Ingest generic-format cloud log data.
 *
 * Runs the jobs-cloud-extract-eucalyptus ETL pipeline when the Cloud realm
 * is enabled. If the pipeline fails because the raw event tables do not
 * exist (error code 1146, MySQL "table doesn't exist"), the exception is
 * swallowed and a notice is logged instead, since that simply means there
 * is no generic cloud data to ingest. Any other exception is re-thrown.
 */
public function ingestCloudDataGeneric()
{
    if (!$this->realmEnabled('Cloud')) {
        return;
    }

    try {
        $this->logger->notice('Ingesting generic cloud log files');
        $this->runEtlPipeline('jobs-cloud-extract-eucalyptus');
    } catch (Exception $e) {
        // 1146 = missing raw-event table; treat as "nothing to ingest".
        if ($e->getCode() != 1146) {
            throw $e;
        }
        $this->logger->notice('No cloud event data to ingest');
    }
}

/**
 * Aggregate all ingested cloud data.
 *
 * When the Cloud realm is enabled, runs the cloud-state-pipeline ETL
 * pipeline and then rebuilds the Cloud realm's filter lists. Does nothing
 * when the Cloud realm is not present in the realms table.
 *
 * NOTE(review): unlike the cloud ingestion methods, no exception is caught
 * here — a missing-table error from the pipeline will propagate to the
 * caller.
 */
public function aggregateCloudData()
{
    if (!$this->realmEnabled('Cloud')) {
        return;
    }

    $this->logger->notice('Aggregating Cloud data');
    $this->runEtlPipeline('cloud-state-pipeline');

    $listBuilder = new FilterListBuilder();
    $listBuilder->setLogger($this->logger);
    $listBuilder->buildRealmLists('Cloud');
}

/**
Expand Down Expand Up @@ -217,13 +288,15 @@ public function initializeAggregation($startDate = null, $endDate = null)
*/
public function aggregateAllJobs($lastModifiedStartDate)
{
$this->runEtlPipeline(
'jobs-xdw-aggregate',
array('last-modified-start-date' => $lastModifiedStartDate)
);
$filterListBuilder = new FilterListBuilder();
$filterListBuilder->setLogger($this->logger);
$filterListBuilder->buildRealmLists('Jobs');
if( $this->realmEnabled('Jobs') ){
$this->runEtlPipeline(
'jobs-xdw-aggregate',
array('last-modified-start-date' => $lastModifiedStartDate)
);
$filterListBuilder = new FilterListBuilder();
$filterListBuilder->setLogger($this->logger);
$filterListBuilder->buildRealmLists('Jobs');
}
}

/**
Expand Down Expand Up @@ -275,6 +348,17 @@ public function aggregate(
));
}

/**
 * Determine whether a realm is present in the moddb.realms table.
 *
 * @param string $realm Display name of the realm to look up
 *                      (e.g. 'Jobs', 'Cloud').
 *
 * @return bool True when at least one matching row exists.
 */
private function realmEnabled($realm)
{
    $rows = $this->warehouseDb->query(
        "SELECT * FROM moddb.realms WHERE display = :realm",
        [':realm' => $realm]
    );

    return count($rows) > 0;
}

/**
* Run an ETL pipeline.
*
Expand Down
Loading

0 comments on commit 0febec4

Please sign in to comment.