Skip to content

Commit

Permalink
initial commit for refactoring dag manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Meeth Gala authored and arjun4084346 committed Nov 13, 2023
1 parent 9e30c6c commit 1ebc701
Show file tree
Hide file tree
Showing 64 changed files with 2,758 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
public class ServiceConfigKeys {

public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator";

// Gobblin Service Manager Keys
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
Expand All @@ -41,6 +39,7 @@ public class ServiceConfigKeys {
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveDagManager.enabled";
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveScheduler.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
Expand All @@ -54,11 +53,6 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraph.class";
public static final String GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY = GOBBLIN_SERVICE_PREFIX + "flowGraphHelper.class";

// Helix message sub types for FlowSpec
public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
public static final String HELIX_FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE";
public static final String HELIX_FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE";

// Flow Compiler Keys
public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
public static final String COMPILATION_SUCCESSFUL = "compilation.successful";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service;

public class ServiceConstants {
public static final String SERVICE_NAMESPACE = "org.apache.gobblin.service";
public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS =
"org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS =
"org.apache.gobblin.service.modules.orchestration.Orchestrator";

// Helix message sub types for FlowSpec
public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
public static final String HELIX_FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE";
public static final String HELIX_FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void setUp() throws Exception {
Config config = configBuilder.build();
final FlowCatalog flowCatalog = new FlowCatalog(config);
final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
when(mockListener.getName()).thenReturn(ServiceConstants.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
flowCatalog.addListener(mockListener);
flowCatalog.startAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void setUp() throws Exception {
Config config = configBuilder.build();
final FlowCatalog flowCatalog = new FlowCatalog(config);
final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
when(mockListener.getName()).thenReturn(ServiceConstants.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
// NOTE: more general `ArgumentMatchers` (indicating compilation unsuccessful) must precede the specific
when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(null));
when(mockListener.onAddSpec(ArgumentMatchers.argThat((FlowSpec flowSpec) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
/**
* Resource for handling flow configuration requests
*/
@RestLiCollection(name = "flowconfigs", namespace = "org.apache.gobblin.service", keyName = "id")
@RestLiCollection(name = "flowconfigs", namespace = ServiceConstants.SERVICE_NAMESPACE, keyName = "id")
public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> {
private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* Resource for handling flow configuration requests
*/
@Slf4j
@RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id")
@RestLiCollection(name = "flowconfigsV2", namespace = ServiceConstants.SERVICE_NAMESPACE, keyName = "id")
public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> {
private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class);
public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
/**
* Resource for handling flow execution requests
*/
@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
@RestLiCollection(name = "flowexecutions", namespace = ServiceConstants.SERVICE_NAMESPACE, keyName = "id")
public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* Resource for handling flow status requests
*/
@RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
@RestLiCollection(name = "flowstatuses", namespace = ServiceConstants.SERVICE_NAMESPACE, keyName = "id")
public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
public static final String MESSAGE_SEPARATOR = ", ";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum FlowActionType {
class DagAction {
final String flowGroup;
final String flowName;
final String flowExecutionId;
final long flowExecutionId;
final FlowActionType flowActionType;

public FlowId getFlowId() {
Expand All @@ -50,8 +50,7 @@ public FlowId getFlowId() {
* Replace flow execution id with agreed upon event time to easily track the flow
*/
public DagAction updateFlowExecutionId(long eventTimeMillis) {
return new DagAction(this.getFlowGroup(), this.getFlowName(),
String.valueOf(eventTimeMillis), this.getFlowActionType());
return new DagAction(this.getFlowGroup(), this.getFlowName(), eventTimeMillis, this.getFlowActionType());
}
}

Expand All @@ -64,7 +63,7 @@ public DagAction updateFlowExecutionId(long eventTimeMillis) {
* @param flowActionType the value of the dag action
* @throws IOException
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException;
boolean exists(String flowGroup, String flowName, long flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException;

/**
* Persist the dag action in {@link DagActionStore} for durability
Expand All @@ -74,7 +73,7 @@ public DagAction updateFlowExecutionId(long eventTimeMillis) {
* @param flowActionType the value of the dag action
* @throws IOException
*/
void addDagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException;
void addDagAction(String flowGroup, String flowName, long flowExecutionId, FlowActionType flowActionType) throws IOException;

/**
* delete the dag action from {@link DagActionStore}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class LeaseObtainedStatus extends LeaseAttemptStatus {
* @return event time in millis since epoch for the event of this lease acquisition
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
return flowAction.getFlowExecutionId();
}
}

Expand All @@ -113,7 +113,7 @@ class LeasedToAnotherStatus extends LeaseAttemptStatus {
* @return
*/
public long getEventTimeMillis() {
return Long.parseLong(flowAction.getFlowExecutionId());
return flowAction.getFlowExecutionId();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,8 @@ protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throw
return dbStatementExecutor.withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
try {
try (ResultSet resultSet = selectStatement.executeQuery()) {
return createSelectInfoResult(resultSet);
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,13 @@ public MysqlDagActionStore(Config config) throws IOException {
}

@Override
public boolean exists(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException {
return dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, tableName), existStatement -> {
public boolean exists(String flowGroup, String flowName, long flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement existStatement = connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, String.valueOf(flowExecutionId));
existStatement.setString(++i, flowActionType.toString());
ResultSet rs = null;
try {
Expand All @@ -117,18 +118,18 @@ public boolean exists(String flowGroup, String flowName, String flowExecutionId,
rs.close();
}
}
}, true);
}
}

@Override
public void addDagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType)
public void addDagAction(String flowGroup, String flowName, long flowExecutionId, FlowActionType flowActionType)
throws IOException {
dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, String.valueOf(flowExecutionId));
insertStatement.setString(++i, flowActionType.toString());
return insertStatement.executeUpdate();
} catch (SQLException e) {
Expand All @@ -144,7 +145,7 @@ public boolean deleteDagAction(DagAction dagAction) throws IOException {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, String.valueOf(dagAction.getFlowExecutionId()));
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
return result != 0;
Expand All @@ -155,17 +156,17 @@ public boolean deleteDagAction(DagAction dagAction) throws IOException {
}

// TODO: later change this to getDagActions relating to a particular flow execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff exponentialBackoff)
private DagAction getDagActionWithRetry(String flowGroup, String flowName, long flowExecutionId, FlowActionType flowActionType, ExponentialBackoff exponentialBackoff)
throws IOException, SQLException {
return dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, tableName), getStatement -> {
int i = 0;
getStatement.setString(++i, flowGroup);
getStatement.setString(++i, flowName);
getStatement.setString(++i, flowExecutionId);
getStatement.setString(++i, String.valueOf(flowExecutionId));
getStatement.setString(++i, flowActionType.toString());
try (ResultSet rs = getStatement.executeQuery()) {
if (rs.next()) {
return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
return new DagAction(rs.getString(1), rs.getString(2), Long.parseLong(rs.getString(3)), FlowActionType.valueOf(rs.getString(4)));
} else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff);
} else {
Expand All @@ -186,7 +187,7 @@ public Collection<DagAction> getDagActions() throws IOException {
HashSet<DagAction> result = new HashSet<>();
try (ResultSet rs = getAllStatement.executeQuery()) {
while (rs.next()) {
result.add(new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
result.add(new DagAction(rs.getString(1), rs.getString(2), Long.parseLong(rs.getString(3)), FlowActionType.valueOf(rs.getString(4))));
}
return result;
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.ServiceConstants;
import org.apache.gobblin.util.ExponentialBackoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -421,10 +422,10 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
}
AddSpecResponse<String> compileResponse;
if (isWarmStandbyEnabled) {
compileResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS, new AddSpecResponse<>(null));
compileResponse = responseMap.getOrDefault(ServiceConstants.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS, new AddSpecResponse<>(null));
//todo: do we check quota here? or in compiler? Quota manager need dag to check quota which is not accessable from this class
} else {
compileResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
compileResponse = responseMap.getOrDefault(ServiceConstants.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
}
responseMap.put(ServiceConfigKeys.COMPILATION_RESPONSE, compileResponse);

Expand Down
Loading

0 comments on commit 1ebc701

Please sign in to comment.