Add codeql analysis (#25)
* [GOBBLIN-1921] Properly handle reminder events (apache#3790)

* Add millisecond level precision to timestamp cols & proper timezone conversion

	- existing tests pass with minor modifications

* Handle reminder events properly

* Fix compilation errors & add isReminder flag

* Add unit tests

* Address review comments

* Add newline to address comment

* Include reminder/original tag in logging

* Clarify timezone issues in comment

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
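
The timestamp handling above boils down to reading MySQL TIMESTAMP(3) columns against an explicit UTC calendar so millisecond precision survives and the JVM default timezone never leaks in. A minimal sketch under an assumed table and column name (not the actual lease-arbiter schema):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.util.Calendar;
    import java.util.TimeZone;

    public class LeaseTimestampReader {
      /** Read an event time as epoch millis, interpreting the column in UTC. */
      public static long readEventTimeMillis(Connection conn, String flowAction) throws SQLException {
        // An explicit UTC calendar stops the JDBC driver from applying the JVM
        // default timezone; TIMESTAMP(3) retains millisecond precision.
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        String sql = "SELECT event_timestamp FROM lease_arbiter_table WHERE flow_action = ?"; // hypothetical schema
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
          stmt.setString(1, flowAction);
          try (ResultSet rs = stmt.executeQuery()) {
            if (!rs.next()) {
              throw new SQLException("No lease row for flow action: " + flowAction);
            }
            Timestamp ts = rs.getTimestamp(1, utc);
            return ts.getTime();
          }
        }
      }
    }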

* [GOBBLIN-1924] Reminder event flag true (apache#3795)

* Set reminder event flag to true for reminders

* Update unit tests

* remove unused variable

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1923] Add retention for lease arbiter table (apache#3792)

* Add retention for lease arbiter table

* Replace blocking thread with scheduled thread pool executor

* Make Calendar instance thread-safe

* Rename variables, make values more clear

* Update timestamp related cols

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
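
A hedged sketch of the retention approach above: a single-threaded scheduled executor (replacing the earlier blocking thread) periodically deletes rows older than the retention period that the diff below configures via `MysqlMultiActiveLeaseArbiter.retentionPeriodMillis` (default 3 days). Table and column names here are illustrative.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import javax.sql.DataSource;

    public class LeaseArbiterRetention {
      private final DataSource dataSource;
      private final long retentionPeriodMillis;
      private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

      public LeaseArbiterRetention(DataSource dataSource, long retentionPeriodMillis) {
        this.dataSource = dataSource;
        this.retentionPeriodMillis = retentionPeriodMillis;
      }

      public void start() {
        // A scheduled pool instead of a dedicated thread stuck in a sleep/delete loop.
        scheduler.scheduleAtFixedRate(this::purge, retentionPeriodMillis, retentionPeriodMillis, TimeUnit.MILLISECONDS);
      }

      private void purge() {
        String sql = "DELETE FROM lease_arbiter_table WHERE event_timestamp < ?"; // hypothetical table
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
          stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - retentionPeriodMillis));
          stmt.executeUpdate();
        } catch (SQLException e) {
          // Log and retry on the next run rather than killing the scheduler thread.
        }
      }
    }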

* Change debug statements to info temporarily to debug (apache#3796)

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (apache#3797)

* Fix Reminder Event Epsilon Comparison

* Add TODO comment

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
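
At its core, the epsilon comparison is a tolerance check between a reminder's trigger time and the original event time, with epsilon coming from `MysqlMultiActiveLeaseArbiter.epsilonMillis` in the diff below; schematically (logic illustrative):

    public final class EpsilonCheck {
      private EpsilonCheck() {}

      /**
       * Two trigger timestamps count as the same scheduler event when they fall
       * within epsilon of each other (absorbing clock skew between participants).
       */
      public static boolean isWithinEpsilon(long reminderEventMillis, long originalEventMillis, long epsilonMillis) {
        return Math.abs(reminderEventMillis - originalEventMillis) <= epsilonMillis;
      }
    }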

* [GOBBLIN-1930] Improve Multi-active related logs and metrics (apache#3800)

* Improve Multi-active related logs and metrics

* Add more metrics and logs around forwarding dag action to DagManager

* Improve logs in response to review comments

* Replace flow execution id with trigger timestamp from multi-active

* Update flow action execution id within lease arbiter

* Fix test & make Lease Statuses more lean

* Update javadoc

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* add dataset root in some common types of datasets

* [GOBBLIN-1927] Add topic validation support in KafkaSource, and add TopicNameValidator (apache#3793)

* Add generic topic validation support
* Add the first validator TopicNameValidator into the validator chain, as a refactor of the existing code

* Refine to address comments

* Refine

---------

Co-authored-by: Tao Qin <tqin@linkedin.com>
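
A sketch of the validator-chain shape this commit introduces; the interface and class names here are illustrative rather than the exact Gobblin API:

    import java.util.List;
    import java.util.stream.Collectors;

    interface TopicValidator {
      boolean validate(String topicName);
    }

    /** Rejects topic names with characters outside a conservative allowed set. */
    class TopicNameValidator implements TopicValidator {
      @Override
      public boolean validate(String topicName) {
        return topicName != null && !topicName.isEmpty() && topicName.matches("[a-zA-Z0-9._-]+");
      }
    }

    class TopicValidators {
      private final List<TopicValidator> validators;

      TopicValidators(List<TopicValidator> validators) {
        this.validators = validators;
      }

      /** A topic survives only if every validator in the chain accepts it. */
      List<String> filterValid(List<String> topics) {
        return topics.stream()
            .filter(t -> validators.stream().allMatch(v -> v.validate(t)))
            .collect(Collectors.toList());
      }
    }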

* [GOBBLIN-1931] Refactor dag action updating method & add clarifying comment (apache#3801)

* Refactor dag action updating method & add clarifying comment

* Log filtered out duplicate messages

* logs and metrics for missing messages from change monitor

* Only add gobblin.service prefix for dagActionStoreChangeMonitor

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1934] Monitor High Level Consumer queue size (apache#3805)

* Emit metrics to monitor high level consumer queue size

* Empty commit to trigger tests

* Use BlockingQueue.size() func instead of atomic integer array

* Remove unused import & add DagActionChangeMonitor prefix to metric

* Refactor to avoid repeating code

* Make protected variables private where possible

* Fix white space

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
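
The queue monitoring reduces to registering a gauge backed by BlockingQueue.size() instead of mirroring counts in an atomic integer array; a minimal sketch using plain Dropwizard Metrics (Gobblin's own MetricContext wrapper differs in detail):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    public class QueueSizeGaugeExample {
      public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // BlockingQueue.size() is already thread-safe, so the gauge can read it
        // directly; no separate counter has to be kept in sync.
        registry.register("dagActionChangeMonitor.queue.size", (Gauge<Integer>) queue::size);
      }
    }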

* [GOBBLIN-1935] Skip null dag action types unable to be processed (apache#3807)

* Skip over null dag actions from malformed messages

* Add new metric for skipped messages

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1922] Add function in Kafka Source to recompute workUnits for filtered partitions (apache#3798)

* add function in Kafka Source to recompute workUnits for filtered partitions

* address comments

* set default min container value to 1

* add condition when creating an empty workunit

* update the condition

* Expose functions to fetch record partitionColumn value (apache#3810)

* [GOBBLIN-1938] preserve x bit in manifest file based copy (apache#3804)

* preserve x bit in manifest file based copy
* fix project structure preventing running unit tests from intellij
* fix unit test
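
"Preserve x bit" here means carrying the source file's execute bits onto the copied file; a hedged sketch with Hadoop's permission API (the real manifest-copy integration differs):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PreserveExecuteBit {
      /** Apply the source permission, execute bits included, to the destination file. */
      public static void copyPermission(FileSystem srcFs, Path src, FileSystem dstFs, Path dst) throws IOException {
        FileStatus srcStatus = srcFs.getFileStatus(src);
        FsPermission srcPerm = srcStatus.getPermission();
        // Setting the full source permission keeps the executable flag that a
        // rw-only copy would silently drop for scripts and binaries.
        dstFs.setPermission(dst, srcPerm);
      }
    }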

* [GOBBLIN-1919] Simplify a few elements of MR-related job exec before reusing code in Temporal-based execution (apache#3784)

* Simplify a few elements of MR-related job exec before reusing code in Temporal-based execution

* Add JSON-ification to several foundational config-state representations, plus encapsulated convenience method `JobState.getJobIdFromProps`

* Update javadoc comments

* Encapsulate check for whether a path has the extension of a multi-work-unit
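
The JSON-ification mentioned above surfaces in the WorkUnit diff further down as toJsonString(); usage is simply (assuming a fully initialized work unit):

    import org.apache.gobblin.source.workunit.WorkUnit;
    import org.slf4j.Logger;

    public class WorkUnitDebugLogging {
      /** Dump a work unit's id, properties, and extract as pretty-printed JSON. */
      public static void logWorkUnit(Logger log, WorkUnit workUnit) {
        // toJsonString() assumes the work unit's Extract has been set, as it is
        // for work units produced by a real Source.
        log.info("WorkUnit detail: {}", workUnit.toJsonString());
      }
    }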

* [GOBBLIN-1939] Bump AWS version to use a compatible version of Jackson with Gobblin (apache#3809)

* Bump AWS version to use a compatible version of jackson with Gobblin

* use shared aws version

* [GOBBLIN-1937] Quantify Missed Work Completed by Reminders (apache#3808)

* Quantify Missed Work Completed by Reminders
   Also fix bug to filter out heartbeat events before extracting field

* Refactor changeMonitorUtils & add delimiter to metrics prefix

* Re-order params to group similar ones

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1933] Change the logic in completeness verifier to support multi reference tier (apache#3806)

* address comments

* use connection manager when httpclient is not closeable

* [GOBBLIN-1933] Change the logic in completeness verifier to support multi reference tier

* add unit test

* fix typo

* change the javadoc

* change the javadoc

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>

* [GOBBLIN-1943] Use AWS version 1.12.261 to fix a security vulnerability in the previous version (apache#3813)

* [GOBBLIN-1941] Develop Temporal abstractions, including `Workload` for workflows of unbounded size through sub-workflow nesting (apache#3811)

* Define `Workload` abstraction for Temporal workflows of unbounded size through sub-workflow nesting

* Adjust Gobblin-Temporal configurability for consistency and abstraction

* Define `WorkerConfig`, to pass the `TemporalWorker`'s configuration to the workflows and activities it hosts

* Improve javadoc

* Javadoc fixup

* Minor changes

* Update per review suggestions

* Insert pause, to spread the load on the temporal server, before launch of each child workflow that may have direct leaves of its own

* Appease findbugs by having `SeqSliceBackedWorkSpan::next` throw `NoSuchElementException`

* Add comment
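
The "insert pause" bullet corresponds to a deterministic sleep inside workflow code in the Temporal Java SDK; a rough sketch (the staggering policy shown is illustrative):

    import java.time.Duration;
    import io.temporal.workflow.Workflow;

    public class StaggeredChildLaunch {
      /**
       * Pause before launching each child workflow that has direct leaves of its
       * own, spreading scheduling load on the Temporal server.
       */
      public static void pauseBeforeChild(int childIndex, Duration baseDelay) {
        // Workflow.sleep is durable and replay-safe, unlike Thread.sleep,
        // so it may be called from workflow code.
        Workflow.sleep(baseDelay.multipliedBy(childIndex % 4));
      }
    }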

* [GOBBLIN-1944] Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath (apache#3815)

* Add gobblin-temporal load generator for a single subsuming super-workflow with a configurable number of activities nested beneath

* Update per findbugs advice

* Improve processing of int props

* [GOBBLIN-1945] Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` evaluation (apache#3816)

* Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` evaluation

* Adjust work unit processing tuning for start-to-close timeout and nested execution branching

* Rework `ProcessWorkUnitImpl` and fix `FileSystem` misuse; plus convenience abstractions to load `FileSystem`, `JobState`, and `StateStore<TaskState>`

* Fix `FileSystem` resource lifecycle, uniquely name each workflow, and drastically reduce worker concurrent task execution

* Heed findbugs advice

* prep before commit

* Improve processing of required props

* Update comment in response to PR feedback

* [GOBBLIN-1942] Create MySQL util class for re-usable methods and setup MysqlDagActio… (apache#3812)

* Create MySQL util class for re-usable methods and setup MysqlDagActionStore retention

* Add a java doc

* Address review comments

* Close scheduled executors on shutdown & clarify naming and comments

* Remove extra period making config key invalid

* implement Closeable

* Use try with resources

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
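
The Closeable and try-with-resources bullets tie the retention executor's lifecycle to the store itself; schematically (class name hypothetical):

    import java.io.Closeable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class DagActionStoreWithRetention implements Closeable {
      private final ScheduledExecutorService retentionExecutor = Executors.newSingleThreadScheduledExecutor();

      public DagActionStoreWithRetention(long retentionPeriodSeconds) {
        retentionExecutor.scheduleAtFixedRate(this::purgeExpired, retentionPeriodSeconds, retentionPeriodSeconds, TimeUnit.SECONDS);
      }

      private void purgeExpired() {
        // DELETE rows older than the retention period (elided).
      }

      @Override
      public void close() {
        // Implementing Closeable lets callers use try-with-resources, so the
        // executor cannot outlive the store on shutdown or leadership change.
        retentionExecutor.shutdownNow();
      }
    }

    // Usage: try (DagActionStoreWithRetention store = new DagActionStoreWithRetention(3 * 24 * 60 * 60)) { ... }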

* [Hotfix][GOBBLIN-1949] add option to detect malformed orc during commit (apache#3818)

* add option to detect malformed ORC during commit phase

* better logging

* address comment

* catch more generic exception

* validate ORC file after close

* move validate in between close and commit

* syntax

* whitespace

* update log
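
A hedged sketch of the validation step: after closing the writer but before commit, try to read the staged file's footer with the Apache ORC reader; any failure marks the file corrupted. The actual GobblinOrcWriter hook differs.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;

    public class OrcValidation {
      /** Returns true if the closed ORC file has a readable footer and schema. */
      public static boolean isValidOrcFile(Configuration conf, Path stagingFile) {
        try {
          // Reading the footer forces parsing of the postscript and metadata,
          // which is exactly where truncated or corrupted files fail.
          Reader reader = OrcFile.createReader(stagingFile, OrcFile.readerOptions(conf));
          return reader.getSchema() != null;
        } catch (IOException | RuntimeException e) {
          // Catch broadly: corruption can surface as runtime errors from the
          // protobuf/decompression layers, not only IOException.
          return false;
        }
      }
    }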

* [GOBBLIN-1948] Use same flowExecutionId across participants (apache#3819)

* Use same flowExecutionId across participants
* Set config field as well in new FlowSpec
* Use gobblin util to create config
* Rename function and move to util
---------
Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
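
The shared-id idea: derive the flowExecutionId once from the multi-active consensus timestamp and pin it into the spec's config, so every participant orchestrates under the same id. A sketch with the Typesafe Config API; the key name is an assumption.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigValueFactory;

    public class FlowExecutionIdPinning {
      private static final String FLOW_EXECUTION_ID_KEY = "flow.executionId"; // assumed key name

      /** Pin the consensus trigger timestamp as the flow execution id. */
      public static Config withFlowExecutionId(Config flowConfig, long consensusEventTimeMillis) {
        // Each participant derives the same id from the same consensus timestamp
        // instead of minting one from its local clock.
        return flowConfig.withValue(FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(consensusEventTimeMillis));
      }
    }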

* Allow extension of functions in GobblinMCEPublisher and customization of fileList file metrics are calculated for (apache#3820)

* [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (apache#3821)

* [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files

This commit adds ORC file validation during the commit phase and deletes
corrupted files. It also includes a test for ORC file validation.

* Linter fixes

* Add framework and unit tests for DagActionStoreChangeMonitor (apache#3817)

* Add framework and unit tests for DagActionStoreChangeMonitor

* Add more test cases and validation

* Add header for new file

* Move FlowSpec static function to Utils class

* Remove unused import

* Fix compile error

* Fix unit tests

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1952] Make jobname shortening in GaaS more aggressive (apache#3822)

* Make jobname shortening in GaaS more aggressive

* Change long name prefix to flowgroup

* Make KafkaTopicGroupingWorkUnitPacker pack with the desired number of containers (apache#3814)

* Make KafkaTopicGroupingWorkUnitPacker pack with the desired number of containers

* update comment

* [GOBBLIN-1953] Add an exception message to orc writer validation GTE (apache#3826)

* Fix FlowSpec Updating Function (apache#3823)

* Fix FlowSpec Updating Function
   * makes Config object with FlowSpec mutable
   * adds unit test to ensure flow compiles after updating FlowSpec
   * ensure DagManager resilient to exceptions on leadership change

* Only update Properties obj not Config to avoid GC overhead

* Address findbugs error

* Avoid updating or creating new FlowSpec objects by passing flowExecutionId directly to metadata

* Remove changes that are not needed anymore

* Add TODO to handle failed DagManager leadership change

* Overload function and add more documentation

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* Emit metric to tune LeaseArbiter Linger metric  (apache#3824)

* Monitor number of failed persisting leases to tune linger

* Increase default linger and epsilon values

* Add metric for lease persisting success

* Rename metrics

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1956] Make the Kafka streaming pipeline able to configure max poll records at runtime (apache#3827)

* address comments

* use connection manager when httpclient is not closeable

* add unit test

* fix typo

* [GOBBLIN-1956] Make the Kafka streaming pipeline able to configure max poll records at runtime

* small refactor

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>

* Add semantics for failure on partial success (apache#3831)

* Consistently handle Rest.li /flowexecutions KILL and RESUME actions (apache#3830)

* [GOBBLIN-1957] GobblinOrcwriter improvements for large records (apache#3828)

* WIP

* Optimization to limit batchsize based on large record sizes

* Address review

* Use DB-qualified table ID as `IcebergTable` dataset descriptor (apache#3834)

* [GOBBLIN-1961] Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table (apache#3835)

* Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table

* Adjust Mockito.verify to pass test

* Prevent NPE in `FlowCompilationValidationHelper.validateAndHandleConcurrentExecution` (apache#3836)

* Prevent NPE in `FlowCompilationValidationHelper.validateAndHandleConcurrentExecution`

* improved `MultiHopFlowCompiler` javadoc

* Delete Launch Action Events After Processing (apache#3837)

* Delete launch action event after persisting

* Fix default value for flowExecutionId retrieval from metadata map

* Address review comments and add unit test

* Code clean up

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1960] Emit audit count after commit in IcebergMetadataWriter (apache#3833)

* Emit audit count after commit in IcebergMetadataWriter

* Unit tests by extracting to a post commit

* Emit audit count first

* find bugs complaint

* [GOBBLIN-1967] Add external data node for generic ingress/egress on GaaS (apache#3838)

* Add external data node for generic ingress/egress on GaaS

* Address reviews and cleanup

* Use URI representation for external dataset descriptor node

* Fix error message in containing check

* Address review

* [GOBBLIN-1971] Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates (apache#3842)

* Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates

* small method javadoc

* [GOBBLIN-1970] Consolidate processing dag actions to one code path (apache#3841)

* Consolidate processing dag actions to one code path

* Delete dag action in failure cases too

* Distinguish metrics for startup

* Refactor to avoid duplicated code and create static metrics proxy class

* Remove DagManager checks that don't apply on startup

* Add test to check kill/resume dag action removal after processing

* Remove unused import

* Initialize metrics proxy with Null Pattern

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
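
"Initialize metrics proxy with Null Pattern" means a no-op stand-in so processing code never null-checks its metrics; schematically (names illustrative):

    interface DagProcEngineMetrics {
      void markKillProcessed();
      void markResumeProcessed();
    }

    /** Null-object implementation: a safe default until real metrics are wired in. */
    class NoopDagProcEngineMetrics implements DagProcEngineMetrics {
      @Override public void markKillProcessed() { }
      @Override public void markResumeProcessed() { }
    }

    class DagActionProcessor {
      private final DagProcEngineMetrics metrics;

      DagActionProcessor(DagProcEngineMetrics metrics) {
        // The null object keeps every call site branch-free.
        this.metrics = (metrics != null) ? metrics : new NoopDagProcEngineMetrics();
      }

      void onKillProcessed() {
        metrics.markKillProcessed();
      }
    }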

* [GOBBLIN-1972] Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run (apache#3844)

* Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run

* fixup findbugsMain

* [GOBBLIN-1975] Keep job.name configuration immutable if specified on GaaS (apache#3847)

* Revert "[GOBBLIN-1952] Make jobname shortening in GaaS more aggressive (apache#3822)"

This reverts commit 5619a0a.

* use configuration to keep specified jobname if enabled

* Cleanup

* [GOBBLIN-1974] Ensure Adhoc Flows can be Executed in Multi-active Scheduler state (apache#3846)

* Ensure Adhoc Flows can be Executed in Multi-active Scheduler state

* Only delete spec for adhoc flows & always after orchestration

* Delete adhoc flows when dagManager is not present as well

* Fix flaky test for scheduler

* Add clarifying comment about failure recovery

* Re-ordered private method

* Move private methods again

* Enforce sequential ordering of unit tests to make more reliable

---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>

* [GOBBLIN-1973] Change Manifest distcp logic to compare permissions of source and dest files even when source is older (apache#3845)

* change should copy logic

* Add tests, address review

* Fix checkstyle

* Remove unused imports

* [GOBBLIN-1976] Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform name for the `IcebergTable`s it creates (apache#3848)

* Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform for the `IcebergTable`s it creates

* fixup javadoc

* Log when `PasswordManager` fails to load any master password (apache#3849)

* [GOBBLIN-1968] Temporal commit step integration (apache#3829)

Add commit step to Gobblin temporal workflow for job publish

* Add codeql analysis

* Make gradle specific

* Add codeql as part of build script

* Initialize codeql

* Use a separate workflow for CodeQL with a custom build function, since autobuild seems not to work

* Add jdk jar for global dependencies script

---------

Co-authored-by: umustafi <umust77@gmail.com>
Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
Co-authored-by: Arjun <abora@linkedin.com>
Co-authored-by: Tao Qin <35046097+wsarecv@users.noreply.github.com>
Co-authored-by: Tao Qin <tqin@linkedin.com>
Co-authored-by: Hanghang Nate Liu <nate.hanghang.liu@gmail.com>
Co-authored-by: Andy Jiang <andy.jiang99@outlook.com>
Co-authored-by: Kip Kohn <ckohn@linkedin.com>
Co-authored-by: Zihan Li <zihli@linkedin.com>
Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>
Co-authored-by: Matthew Ho <mho@linkedin.com>
12 people authored Dec 20, 2023
1 parent 028b85f commit edb3726
Showing 143 changed files with 6,405 additions and 1,260 deletions.
74 changes: 74 additions & 0 deletions .github/workflows/codeql.yaml
@@ -0,0 +1,74 @@
name: "CodeQL"

on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
schedule:
- cron: '28 23 * * 4'

jobs:
analyze:
name: Analyze
# Runner size impacts CodeQL analysis time. To learn more, please see:
# - https://gh.io/recommended-hardware-resources-for-running-codeql
# - https://gh.io/supported-runners-and-hardware-resources
# - https://gh.io/using-larger-runners
# Consider using larger runners for possible analysis time improvements.
runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }}
timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }}
permissions:
actions: read
contents: read
security-events: write

strategy:
fail-fast: false
matrix:
language: [ 'java-kotlin' ]
# CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ]
# Use only 'java-kotlin' to analyze code written in Java, Kotlin or both
# Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support

steps:
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8

- name: Checkout repository
uses: actions/checkout@v4

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.

# For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality

- name: Build repository
run: |
./gradlew --no-daemon clean build -x test -x rat -x javadoc -x findbugsMain -x findbugsTest -x checkstyleMain \
-x checkstyleJmh -x checkstyleTest -x checkstyleMainGeneratedDataTemplate -x checkstyleMainGeneratedRest -Dorg.gradle.parallel=true
# ℹ️ Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun

# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.

# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
with:
category: "/language:${{matrix.language}}"
ConfigurationKeys.java
@@ -95,12 +95,18 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Mysql Dag Action Store configuration
public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
public static final long DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 * 60 * 60 * 1000; // (3 days in ms)
// Refers to the event we originally tried to acquire a lease which achieved `consensus` among participants through
// the database
public static final String SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY = "preservedConsensusEventTimeMillis";
@@ -109,13 +115,14 @@ public class ConfigurationKeys {
// Event time of flow action to orchestrate using the multi-active lease arbiter
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = "orchestratorTriggerEventTimeMillis";
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
-public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
+public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000;
// Note: linger should be on the order of seconds even though we measure in millis
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
-public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 90000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
-public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
+public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;

// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
@@ -198,6 +205,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";

public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit";
// If true, commit of different datasets will be performed in parallel
// only turn on if publisher is thread-safe
public static final String PARALLELIZE_DATASET_COMMIT = "job.commit.parallelize";
@@ -722,7 +731,7 @@ public class ConfigurationKeys {
public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL = false;
-public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
+public static final boolean DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = false;

/**
* Configuration properties used by the distributed job launcher.
State.java
@@ -53,6 +53,7 @@ public class State implements WritableShim {

private static final Joiner LIST_JOINER = Joiner.on(",");
private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();
private static final JsonParser JSON_PARSER = new JsonParser();

private String id;

@@ -62,8 +63,6 @@ public class State implements WritableShim {
@Getter
private Properties specProperties;

-private final JsonParser jsonParser = new JsonParser();

public State() {
this.specProperties = new Properties();
this.commonProperties = new Properties();
@@ -476,7 +475,7 @@ public boolean getPropAsBoolean(String key, boolean def) {
* @return {@link JsonArray} value associated with the key
*/
public JsonArray getPropAsJsonArray(String key) {
-JsonElement jsonElement = this.jsonParser.parse(getProp(key));
+JsonElement jsonElement = this.JSON_PARSER.parse(getProp(key));
Preconditions.checkArgument(jsonElement.isJsonArray(),
"Value for key " + key + " is malformed, it must be a JsonArray: " + jsonElement);
return jsonElement.getAsJsonArray();
PasswordManager.java
@@ -113,6 +113,7 @@ private List<TextEncryptor> getEncryptors(CachedInstanceKey cacheKey) {
try (Closer closer = Closer.create()) {
if (!fs.exists(currentMasterPasswordFile) ||
fs.getFileStatus(currentMasterPasswordFile).isDirectory()) {
LOG.warn("Master password path '" + currentMasterPasswordFile + "' not a FileSystem file.");
continue;
}
InputStream in = closer.register(fs.open(currentMasterPasswordFile));
@@ -124,18 +125,23 @@ private List<TextEncryptor> getEncryptors(CachedInstanceKey cacheKey) {
suffix = "." + String.valueOf(i);
} catch (FileNotFoundException fnf) {
// It is ok for password files not being present
LOG.warn("Master password file " + currentMasterPasswordFile + " not found.");
LOG.warn("Master password file '" + currentMasterPasswordFile + "' not found.");
} catch (IOException ioe) {
exception = ioe;
LOG.warn("Master password could not be read from file " + currentMasterPasswordFile);
LOG.warn("Master password file could not be read from '" + currentMasterPasswordFile + "'");
} catch (Exception e) {
LOG.warn("Encryptor could not be instantiated.");
LOG.warn("Encryptor could not be instantiated using file '" + currentMasterPasswordFile + "'.", e);
}
} while (i++ < numOfEncryptionKeys);

// Throw exception if could not read any existing password file
-if (encryptors.size() < 1 && exception != null) {
-throw new RuntimeException("Master Password could not be read from any master password file.", exception);
+if (encryptors.size() < 1) {
+if (exception != null) {
+throw new RuntimeException("Master password could not be read from any master password file.", exception);
+} else {
+// TODO: determine whether to always throw whenever no encryptors, despite `exception == null`! (for now, at least give notice by logging)
+LOG.error("No master password loaded, despite " + numOfEncryptionKeys + " encryption keys!");
+}
}
return encryptors;
}
MultiWorkUnit.java
@@ -55,6 +55,11 @@ public MultiWorkUnit() {
super();
}

@Override
public boolean isMultiWorkUnit() {
return true;
}

/**
* Get an immutable list of {@link WorkUnit}s wrapped by this {@link MultiWorkUnit}.
*
@@ -155,4 +160,15 @@ public int hashCode() {
public static MultiWorkUnit createEmpty() {
return new MultiWorkUnit();
}

/**
* Create a new {@link MultiWorkUnit} instance based on provided collection of {@link WorkUnit}s.
*
* @return a the {@link MultiWorkUnit} instance with the provided collection of {@link WorkUnit}s.
*/
public static MultiWorkUnit createMultiWorkUnit(Collection<WorkUnit> workUnits) {
MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
multiWorkUnit.addWorkUnits(workUnits);
return multiWorkUnit;
}
}
WorkUnit.java
@@ -24,10 +24,12 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringWriter;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonWriter;

import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.Watermark;
@@ -134,6 +136,11 @@ public WorkUnit(WorkUnit other) {
this.extract = other.getExtract();
}

/** @return whether a multi-work-unit (or else a singular one) */
public boolean isMultiWorkUnit() {
return false; // more efficient than `this instanceof MultiWorkUnit` plus no circular dependency
}

/**
* Factory method.
*
@@ -365,6 +372,57 @@ public int hashCode() {
return result;
}

/** @return pretty-printed JSON, including all properties */
public String toJsonString() {
StringWriter stringWriter = new StringWriter();
try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
jsonWriter.setIndent("\t");
this.toJson(jsonWriter);
} catch (IOException ioe) {
// Ignored
}
return stringWriter.toString();
}

public void toJson(JsonWriter jsonWriter) throws IOException {
jsonWriter.beginObject();

jsonWriter.name("id").value(this.getId());
jsonWriter.name("properties");
jsonWriter.beginObject();
for (String key : this.getPropertyNames()) {
jsonWriter.name(key).value(this.getProp(key));
}
jsonWriter.endObject();

jsonWriter.name("extract");
jsonWriter.beginObject();
jsonWriter.name("extractId").value(this.getExtract().getId());
jsonWriter.name("extractProperties");
jsonWriter.beginObject();
for (String key : this.getExtract().getPropertyNames()) {
jsonWriter.name(key).value(this.getExtract().getProp(key));
}
jsonWriter.endObject();

State prevTableState = this.getExtract().getPreviousTableState();
if (prevTableState != null) {
jsonWriter.name("extractPrevTableState");
jsonWriter.beginObject();
jsonWriter.name("prevStateId").value(prevTableState.getId());
jsonWriter.name("prevStateProperties");
jsonWriter.beginObject();
for (String key : prevTableState.getPropertyNames()) {
jsonWriter.name(key).value(prevTableState.getProp(key));
}
jsonWriter.endObject();
jsonWriter.endObject();
}
jsonWriter.endObject();

jsonWriter.endObject();
}

public String getOutputFilePath() {
// Search for the properties in the workunit.
// This search for the property first in State and then in the Extract of this workunit.
GobblinHelixJobLauncher.java
@@ -71,7 +71,6 @@
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -110,8 +109,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {

private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class);

-private static final String WORK_UNIT_FILE_EXTENSION = ".wu";

private final HelixManager helixManager;
private final TaskDriver helixTaskDriver;
private final String helixWorkFlowName;
@@ -345,7 +342,7 @@ JobConfig.Builder createHelixJob(List<WorkUnit> workUnits) throws IOException {
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
int multiTaskIdSequence = 0;
for (WorkUnit workUnit : workUnits) {
-if (workUnit instanceof MultiWorkUnit) {
+if (workUnit.isMultiWorkUnit()) {
workUnit.setId(JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++));
}
addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap);
@@ -535,15 +532,12 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map
private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stateSerDeRunner) {
String workUnitFilePath =
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
-final StateStore stateStore;
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
-if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
-stateStore = stateStores.getMwuStateStore();
-} else {
-stateStore = stateStores.getWuStateStore();
-}
+final StateStore stateStore = JobLauncherUtils.hasMultiWorkUnitExtension(workUnitFile)
+? stateStores.getMwuStateStore()
+: stateStores.getWuStateStore();
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -561,11 +555,11 @@ private String persistWorkUnit(final Path workUnitFileDir, final WorkUnit workUn
final StateStore stateStore;
String workUnitFileName = workUnit.getId();

-if (workUnit instanceof MultiWorkUnit) {
-workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION;
+if (workUnit.isMultiWorkUnit()) {
+workUnitFileName += JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getMwuStateStore();
} else {
-workUnitFileName += WORK_UNIT_FILE_EXTENSION;
+workUnitFileName += JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getWuStateStore();
}

SingleTask.java
@@ -39,7 +39,6 @@
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.StateStores;
@@ -184,7 +183,7 @@ protected List<WorkUnit> getWorkUnits()
WorkUnit workUnit;

try {
-if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
+if (JobLauncherUtils.hasMultiWorkUnitExtension(_workUnitFilePath)) {
workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
} else {
workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);