Create HACC pipeline for the modern analysis
zetwal committed Apr 8, 2020
1 parent fe18a19 commit c41335c
Showing 5 changed files with 337 additions and 324 deletions.
46 changes: 25 additions & 21 deletions CBench/main.cpp
@@ -72,29 +72,40 @@ int main(int argc, char *argv[])
 
     //
     // Load in the global input parameters
-    std::string inputFilename = jsonInput["input"]["filename"];
 
 
-    // timesteps
+    // file timesteps
+    std::string inputFilename = "";
     int minTimestep = 0;
     int maxTimestep = 1;
-    if (jsonInput["input"].contains("timesteps"))
+    std::vector<std::string> filenameTs;
+    if (jsonInput["input"].contains("timesteps"))   // files in order
     {
        minTimestep = jsonInput["input"]["timesteps"][0];   // inclusive
        maxTimestep = jsonInput["input"]["timesteps"][1];   // exclusive
+
+       inputFilename = jsonInput["input"]["filename"];
     }
+    else if (jsonInput["input"].contains("filename-timesteps"))   // arbitrary file names
+    {
+       maxTimestep = jsonInput["input"]["filename-timesteps"].size();
+       for (int i=0; i<maxTimestep; i++)
+           filenameTs.push_back( jsonInput["input"]["filename-timesteps"][i] );
+    }
+    else
+       inputFilename = jsonInput["input"]["filename"];   // single timestep
 
     int numTimesteps = maxTimestep - minTimestep;
 
 
     // write out decompressed files + output name
     bool writeData = false;
     std::string outputFilename = "";
-    if (numTimesteps == 1)
+    if (jsonInput["data-reduction"]["cbench-output"].contains("output-decompressed"))
     {
        writeData = jsonInput["data-reduction"]["cbench-output"]["output-decompressed"];
-       if (writeData)
-           outputFilename = extractFileName(inputFilename);
 
 
        // Initial a random number in case output name is not provided
        srand(time(NULL));
     }
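The hunk above distinguishes three ways of specifying the input. Roughly, as hypothetical JSON fragments (the key names timesteps, filename, and filename-timesteps come from the diff; the file names and values are invented):

Patterned timesteps, where "%" in the filename stands for the timestep index and the [min, max) range is inclusive-exclusive:

    "input": { "filename": "run_%.gio", "timesteps": [0, 5] }

Arbitrary per-timestep file names, listed in order:

    "input": { "filename-timesteps": ["early.gio", "middle.gio", "late.gio"] }

A single timestep:

    "input": { "filename": "snapshot.gio" }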
@@ -167,10 +178,13 @@ int main(int argc, char *argv[])
        // Get filename
        fileToLoad = inputFilename;
        if (numTimesteps > 1)
-       {
-           std::string tempStr = inputFilename;
-           fileToLoad = tempStr.replace( tempStr.find("%"), tempStr.find("%")+1, strConvert::toStr(ts) );
-       }
+           if (filenameTs.size() > 0)
+               fileToLoad = filenameTs[ts];
+           else
+           {
+               std::string tempStr = inputFilename;
+               fileToLoad = tempStr.replace( tempStr.find("%"), tempStr.find("%")+1, strConvert::toStr(ts) );
+           }
 
        metricsInfo << "Input file: " << fileToLoad << std::endl;
        if (myRank == 0)
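For reference, the "%" substitution can be shown standalone. A minimal sketch with an invented helper name (expandTimestep is not CBench code); note that std::string::replace takes a length as its second argument, so the length-1 call below replaces exactly the "%" character, whereas the committed call passes find("%")+1 as the length:

    #include <iostream>
    #include <string>

    // Hypothetical helper: substitute the first "%" in a filename pattern
    // with the timestep index.
    std::string expandTimestep(std::string pattern, int ts)
    {
        size_t pos = pattern.find("%");
        if (pos != std::string::npos)
            pattern.replace(pos, 1, std::to_string(ts));   // length 1: just the "%"
        return pattern;
    }

    int main()
    {
        std::cout << expandTimestep("run_%.gio", 3) << std::endl;   // prints run_3.gio
        return 0;
    }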
@@ -412,9 +426,6 @@ int main(int argc, char *argv[])
        double compress_time = clock.getDuration("compress");
        double decompress_time = clock.getDuration("decompress");
 
-       debugLog << "compress_time: " << compress_time << std::endl;
-       debugLog << "decompress_time: " << decompress_time << std::endl;
-
        double compress_throughput = ((double) (ioMgr->getNumElements() * ioMgr->getTypeSize()) / (1024.0 * 1024.0)) / compress_time;   // MB/s
        double decompress_throughput = ((double) (ioMgr->getNumElements() * ioMgr->getTypeSize()) / (1024.0 * 1024.0)) / decompress_time;   // MB/s
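As a worked example of the throughput formula, with invented numbers: 100,000,000 particles of a 4-byte type give (100,000,000 * 4) / (1024 * 1024) ≈ 381.5 MB, so a compress_time of 2 s yields a compress_throughput of about 190.7 MB/s. Strictly, dividing by 1024² gives MiB/s; the "// MB/s" comment uses the binary megabyte.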

@@ -522,14 +533,7 @@ int main(int argc, char *argv[])
            else
                decompressedOutputName = "__" + compressorMgr->getCompressorName() + "_" + std::to_string(rand());
 
-           // deal with timesteps
-           if (outputFilename.find("%") != std::string::npos)
-           {
-               std::string tempStr = outputFilename;
-               fileToOutput = tempStr.replace( outputFilename.find("%"), outputFilename.find("%")+1, strConvert::toStr(ts) );
-           }
-           else
-               fileToOutput = outputFilename;
+           fileToOutput = extractFileName(fileToLoad);
 
            // path for folders
            if (outputPath != ".")
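extractFileName is a CBench helper whose definition lies outside this diff. Judging only from its use here, turning the full input path into a bare output name, a plausible stand-in (an assumption, not the project's actual implementation) is:

    #include <string>

    // Assumed behavior: strip the directory part of a path, keep the file name.
    std::string extractFileName(const std::string &path)
    {
        size_t pos = path.find_last_of("/\\");
        return (pos == std::string::npos) ? path : path.substr(pos + 1);
    }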
33 changes: 1 addition & 32 deletions Workflow/cfdns/workflow.py
@@ -82,38 +82,7 @@ def add_data_reduction_jobs(self):
         #self.add_cbench_job(dependency="single", filters="preprocess")
 
         self.add_cbench_job()
-        self.postprocess_cbench()
-
-
-    # Re-write the json data to include the analysis; ["pat"]["analysis"]
-    def create_analysis_input(self):
-
-        if "analysis-results" in self.json_data["input"]:
-            analysis_path = self.json_data["input"]["analysis-results"]
-        else:
-            analysis_path = self.json_data["project-home"] + self.json_data['wflow-path']
-
-        # Remove all entries if any
-        self.json_data['pat']['analysis'].clear()
-
-
-        # Add analysis entries
-        for ana in self.json_data['pat']['analysis-tool']['analytics']:
-            for item in ana['type']:
-                json_item = {
-                    "title" : ana['name'] + "_" + item,
-                    "files" : []
-                }
-
-                for inputItem in self.json_data['pat']['input-files']:
-                    input_item = {
-                        'name' : inputItem["output-prefix"],
-                        'path' : analysis_path + "/" + ana['name'] + "/" + inputItem['output-prefix'] + item + ana['postfix']
-                    }
-
-                    json_item['files'].append(input_item)
-
-                self.json_data['pat']['analysis'].append(json_item)
+        #self.postprocess_cbench()
 
 
     # Create the analysis job; run gimlet
18 changes: 9 additions & 9 deletions Workflow/hacc/cinema.py
@@ -16,21 +16,21 @@
 from pat.utils import job as j
 
 
-class PATCinema(cinema.CinemaWorkflow):
+class HACCCinema(cinema.CinemaWorkflow):
 
     def prepare_cinema(self):
 
         # Open CSV file
-        if "metrics-csv" in self.json_data["input"]:
-            metrics_csv = self.json_data["input"]["metrics-csv"]
+        if "metrics-file" in self.json_data['data-reduction']['cbench-output']:
+            metrics_csv = self.json_data['data-reduction']['cbench-output']['metrics-file']
         else:
-            metrics_csv = self.json_data["project-home"] + self.json_data['wflow-path'] + "/cbench/" + self.json_data['cbench']['output']['metrics-file'] + ".csv"
+            metrics_csv = self.json_data['project-home'] + self.json_data['wflow-path'] + "/reduction/cbench/" + metrics_csv + ".csv"
 
         print(metrics_csv)
 
         # get list of all images
         image_data = {}
-        for sec in self.json_data["pat"]["analysis"]:
+        for sec in self.json_data["analysis"]["output-files"]:
             col_name = sec["output-column"]
             prefix = sec["output-prefix"]
             path = sec["path"]
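In the new lookup, the CSV name comes from the data-reduction / cbench-output / metrics-file key, and the fallback path is assembled under <project-home><wflow-path>/reduction/cbench/. A hypothetical configuration fragment (only the key names come from the diff; the values are invented):

    {
        "project-home": "/projects/exasky",
        "wflow-path": "run_01",
        "data-reduction": {
            "cbench-output": { "metrics-file": "metrics" }
        }
    }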
@@ -127,18 +127,18 @@ def prepare_cinema(self):
 
 
 # parse Input
-parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
+parser = argparse.ArgumentParser()
 parser.add_argument("--input-file", required=True)
 parser.add_argument("--output-file", default="results.cdb")
 opts = parser.parse_args()
 
-# load data
-cinema = PATCinema(opts.input_file)
-
 # create directory
 if not os.path.exists(opts.output_file):
     os.mkdir(opts.output_file)
 os.chdir(opts.output_file)
 
-# generate plots
+
+# Create Cinema DB
+cinema = HACCCinema(opts.input_file)
 cinema.prepare_cinema()
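Assuming the script is invoked directly, usage would be along these lines (the input file name is hypothetical):

    python Workflow/hacc/cinema.py --input-file hacc-wflow.json --output-file results.cdb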