diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/IllustrationItemActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/IllustrationItemActivity.java new file mode 100644 index 00000000000..a73c845d87f --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/IllustrationItemActivity.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.activity; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; + + +/** + * Activity for processing {@link IllustrationItem}s + * + * CAUTION/FINDING: an `@ActivityInterface` must not be parameterized (e.g. here, by WORK_ITEM), as doing so results in: + * io.temporal.failure.ApplicationFailure: message='class java.util.LinkedHashMap cannot be cast to class + * org.apache.gobblin.temporal.loadgen.work.IllustrationItem', type='java.lang.ClassCastException' + */ +@ActivityInterface +public interface IllustrationItemActivity { + @ActivityMethod + String handleItem(IllustrationItem item); +} + + diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/impl/IllustrationItemActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/impl/IllustrationItemActivityImpl.java new file mode 100644 index 00000000000..70e6b77a7f3 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/activity/impl/IllustrationItemActivityImpl.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.activity.impl; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity; +import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; + + +@Slf4j +public class IllustrationItemActivityImpl implements IllustrationItemActivity { + @Override + public String handleItem(final IllustrationItem item) { + log.info("Now illustrating - '" + item.getName() + "'"); + return item.getName(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java new file mode 100644 index 00000000000..62a7e4cfe23 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.launcher; + +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import io.temporal.client.WorkflowOptions; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; +import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher; +import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler; +import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; +import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload; +import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; +import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; +import org.apache.gobblin.util.PropertiesUtils; + +import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX; + + +/** + * A {@link JobLauncher} for the initial triggering of a Temporal workflow that generates arbitrary load of many + * activities nested beneath a single subsuming super-workflow. see: {@link NestingExecWorkflow} + * + *

+ * This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job. + * The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process. + *

+ */ +@Alpha +@Slf4j +public class GenArbitraryLoadJobLauncher extends GobblinTemporalJobLauncher { + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NUM_ACTIVITIES = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "num.activities"; + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_BRANCHES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "max.branches.per.tree"; + public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "max.sub.trees.per.tree"; + + public GenArbitraryLoadJobLauncher( + Properties jobProps, + Path appWorkDir, + List> metadataTags, + ConcurrentHashMap runningMap + ) throws Exception { + super(jobProps, appWorkDir, metadataTags, runningMap); + } + + @Override + public void submitJob(List workunits) { + int numActivities = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NUM_ACTIVITIES); + int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_BRANCHES_PER_TREE); + int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE); + + Workload workload = SimpleGeneratedWorkload.createAs(numActivities); + WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(this.queueName).build(); + + // WARNING: although type param must agree w/ that of `workload`, it's entirely unverified by type checker! + // ...and more to the point, mismatch would occur at runtime (`performWorkload` on the workflow type given to the stub)! + NestingExecWorkflow workflow = this.client.newWorkflowStub(NestingExecWorkflow.class, options); + + workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty()); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/IllustrationItem.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/IllustrationItem.java new file mode 100644 index 00000000000..aec6a6ffb53 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/IllustrationItem.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.work; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Generally, this would specify what "work" needs performing plus how to perform, but for now merely a unique name (to log) */ +@Data +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class IllustrationItem { + @NonNull + private String name; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/SimpleGeneratedWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/SimpleGeneratedWorkload.java new file mode 100644 index 00000000000..78bbecb207c --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/work/SimpleGeneratedWorkload.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.work; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.AccessLevel; +import org.apache.gobblin.temporal.util.nesting.work.SeqBackedWorkSpan; +import org.apache.gobblin.temporal.util.nesting.work.Workload; + + +/** Example, illustration workload that synthesizes its work items; genuine {@link Workload}s generally arise from query/calc */ +@lombok.AllArgsConstructor(access = AccessLevel.PRIVATE) +@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@lombok.ToString +public class SimpleGeneratedWorkload implements Workload { + private int numItems; + + /** Factory method */ + public static SimpleGeneratedWorkload createAs(final int numItems) { + return new SimpleGeneratedWorkload(numItems); + } + + @Override + public Optional> getSpan(final int startIndex, final int numElements) { + if (startIndex >= numItems || startIndex < 0) { + return Optional.empty(); + } else { + List elems = IntStream.range(startIndex, Math.min(startIndex + numElements, numItems)) + .mapToObj(n -> new IllustrationItem("item-" + n + "-of-" + numItems)) + .collect(Collectors.toList()); + return Optional.of(new SeqBackedWorkSpan<>(elems, startIndex)); + } + } + + @Override + public boolean isIndexKnownToExceed(final int index) { + return isDefiniteSize() && index >= numItems; + } + + @Override + @JsonIgnore // (because no-arg method resembles 'java bean property') + public boolean isDefiniteSize() { + return true; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/worker/ArbitraryLoadWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/worker/ArbitraryLoadWorker.java new file mode 100644 index 00000000000..29635619008 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/worker/ArbitraryLoadWorker.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.worker; + +import com.typesafe.config.Config; +import io.temporal.client.WorkflowClient; +import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.loadgen.activity.impl.IllustrationItemActivityImpl; +import org.apache.gobblin.temporal.loadgen.workflow.impl.NestingExecOfIllustrationItemActivityWorkflowImpl; + + +/** Worker for {@link NestingExecOfIllustrationItemActivityWorkflowImpl} and said activity impl */ +public class ArbitraryLoadWorker extends AbstractTemporalWorker { + public ArbitraryLoadWorker(Config config, WorkflowClient workflowClient) { + super(config, workflowClient); + } + + @Override + protected Class[] getWorkflowImplClasses() { + return new Class[] { NestingExecOfIllustrationItemActivityWorkflowImpl.class }; + } + + @Override + protected Object[] getActivityImplInstances() { + return new Object[] { new IllustrationItemActivityImpl() }; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java new file mode 100644 index 00000000000..4346eecfdfa --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.workflow.impl; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity; +import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; +import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; + + +/** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link IllustrationItem} */ +public class NestingExecOfIllustrationItemActivityWorkflowImpl + extends AbstractNestingExecWorkflowImpl { + + // RetryOptions specify how to automatically handle retries when Activities fail. + private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(1)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(3) + .build(); + + private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions(ACTIVITY_RETRY_OPTS) + .build(); + + private final IllustrationItemActivity activityStub = + Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS); + + @Override + protected Promise launchAsyncActivity(final IllustrationItem item) { + return Async.function(activityStub::handleItem, item); + } +} diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index bb53404d8b8..d6361b92779 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -70,6 +70,13 @@ public static int getPropAsInt(Properties properties, String key, int defaultVal return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue))); } + /** @throws {@link NullPointerException} when `key` not in `properties` */ + public static int getRequiredPropAsInt(Properties properties, String key) { + String value = properties.getProperty(key); + Preconditions.checkNotNull(value, "'" + key + "' must be set (to an integer)"); + return Integer.parseInt(value); + } + public static long getPropAsLong(Properties properties, String key, long defaultValue) { return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue))); }