Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanrath authored and Ryan Rathsam committed Jan 15, 2021
1 parent 95b013c commit 9c018bc
Showing 1 changed file with 99 additions and 15 deletions.
114 changes: 99 additions & 15 deletions classes/Rest/Controllers/ETLControllerProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Rest\Controllers;

use ArrayObject;
use CCR\Json;
use Configuration\Configuration;
use ETL\Configuration\EtlConfiguration;
Expand Down Expand Up @@ -41,13 +42,17 @@ public function setupRoutes(Application $app, ControllerCollection $controller)

$controller->get("$root/pipelines/actions", "$class::getActionsForPipelines");
$controller->get("$root/pipelines/{pipeline}/actions", "$class::getActionsForPipeline");
$controller->get("$root/pipelines/{pipeline}/endpoints", "$class::getEndpointsForPipeline");

$controller->get("$root/pipelines", "$class::getPipelines");
$controller->post("$root/pipelines", "$class::getPipelines");

$controller->get("$root/files", "$class::getFileNames");
$controller->post("$root/files", "$class::getFileNames");

$controller->get("$root/endpoints", "$class::getDataEndpoints");


}

/**
Expand All @@ -58,8 +63,6 @@ public function setupRoutes(Application $app, ControllerCollection $controller)
*/
public function getPipelines(Request $request, Application $app)
{
$this->authorize($request, array(ROLE_ID_MANAGER));

$etlConfig = $this->retrieveETLConfig();

$pipelineNames = $etlConfig->getSectionNames();
Expand Down Expand Up @@ -150,6 +153,13 @@ public function getDataEndpoints(Request $request, Application $app)

$etlConfig = $this->retrieveETLConfig();

$pipelineNames = $etlConfig->getSectionNames();
sort($pipelineNames);

foreach($pipelineNames as $pipelineName) {
$results[$pipelineName] = $this->getPipelineEndpoints($pipelineName);
}

return $app->json(
array(
'results' => $results
Expand Down Expand Up @@ -284,14 +294,47 @@ public function getActionsForPipelines(Request $request, Application $app)

public function getActionsForPipeline(Request $request, Application $app, $pipeline)
{
$results = $this->getPipelineActions($pipeline);
return $app->json(
$this->getPipelineActions($pipeline)
);
}

public function getEndpointsForPipeline(Request $request, Application $app, $pipeline)
{
$flattened = $request->get('flatten', false);

$endpoints = $this->getPipelineEndpoints($pipeline);

return $app->json(
$results
$flattened ? $this->flattenEndpoints($endpoints) : $endpoints
);
}

private function getPipelineEndpoints($pipeline)
{
$results = array();
list($actions, $endpoints) = $this->getPipelineActionsAndEndpoints($pipeline);

foreach($endpoints as $key => $value) {
if (!isset($results[$value['type']])) {
$results[$value['type']] = array(
'name'=> $value['type'],
'endpoints' => array()
);
}
$results[$value['type']]['endpoints'][] = $value;
}
return array_values($results);
}

private function getPipelineActions($pipeline)
{
list($actions, $endpoints) = $this->getPipelineActionsAndEndpoints($pipeline);

return $actions;
}

private function getPipelineActionsAndEndpoints($pipeline)
{
$configOptions = array('default_module_name' => 'xdmod');
$configOptions['config_variables'] = array(
Expand Down Expand Up @@ -327,9 +370,7 @@ private function getPipelineActions($pipeline)

$actions = $overseer->verifySections($etlConfig, array($pipeline));

list($results, $endpoints) = $this->parseActions(json_decode(json_encode($actions)), $etlConfig);

return $results;
return $this->parseActions(json_decode(json_encode($actions)), $etlConfig);
}

/**
Expand Down Expand Up @@ -361,13 +402,6 @@ function ($carry, $item) {
$source = json_decode(json_encode($sourceEndpoint), true);
$destination = json_decode(json_encode($destinationEndpoint), true);

if (!array_key_exists($source->key, $endpoints)) {
$endpoints[$source->key] = $source;
}
if (!array_key_exists($destination->key, $endpoints)) {
$endpoints[$destination->key] = $destination;
}

switch ($configClass) {
case "DatabaseIngestor":
case "JobListAggregator":
Expand All @@ -378,7 +412,7 @@ function ($carry, $item) {
$sourceTables = array_reduce(
$parsed->source_query->joins,
function ($carry, $item) {
$carry[$item->alias] = $item->name;
$carry[] = $item->name;
return $carry;
},
array()
Expand All @@ -397,10 +431,27 @@ function ($carry, $item) {
$destinationTables = array_keys(get_object_vars($action->etl_destination_table_list));
$destination['tables'] = $destinationTables;
break;
case "StructuredFileIngestor":
$parsed = $action->parsed_definition_file;

$destination['tables'] = array_keys(get_object_vars($parsed->destination_record_map));
break;
default:
break;
}

if (!array_key_exists($source['key'], $endpoints)) {
if (isset($source['tables']) && count($source['tables']) > 1) {

} else {
$endpoints[$source['key']] = $source;
}

}

if (!array_key_exists($destination['key'], $endpoints)) {
$endpoints[$destination['key']] = $destination;
}

$results[$pipelineName][] = array(
'name' => $actionName,
Expand Down Expand Up @@ -432,6 +483,9 @@ private function getEndpointData($endpoint)

$result->path = $path;
$result->handlerType = $endpoint->handler->type;
$result->directoryPattern = $endpoint->directory_pattern;
$result->filePattern = $endpoint->file_pattern;
$result->recursionDepth = $endpoint->recursion_depth;
break;
case "configurationfile":
case "file":
Expand All @@ -458,4 +512,34 @@ private function getEndpointData($endpoint)

return $result;
}

/**
* @param array $typedEndpoints
*
* @return array
*/
private function flattenEndpoints($typedEndpoints)
{
$results = array();

foreach($typedEndpoints as $typedEndpoint) {
if (in_array($typedEndpoint['name'], array('mysql', 'oracle', 'postgres'))) {
$tableEndpoints = array();
$endpoints = $typedEndpoint['endpoints'];
foreach($endpoints as $endpoint) {
$endpointObject =new ArrayObject($endpoint) ;
$tableEndpoint = $endpointObject->getArrayCopy();
unset($tableEndpoint['tables']);
foreach($endpoint['tables'] as $table) {
$tableEndpoint['table'] = $table;
$tableEndpoints[$table] = $tableEndpoint;
}
}
$typedEndpoint['endpoints'] = array_values($tableEndpoints);
}
$results[] = $typedEndpoint;
}

return $results;
}
}

0 comments on commit 9c018bc

Please sign in to comment.