Skip to content

Commit

Permalink
Merge pull request #942: [proxima-beam-core] debugging external state…
Browse files Browse the repository at this point in the history
… expander
  • Loading branch information
je-ik authored Dec 2, 2024
2 parents 0930ab1 + c079a76 commit 2c65eff
Show file tree
Hide file tree
Showing 16 changed files with 600 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
with:
fetch-depth: 0
- name: Cache maven repository
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: |
~/.m2/repository
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.core;

import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Beam {@link PipelineOptions} read by Proxima's Beam integration.
 *
 * <p>The options are made discoverable to Beam through the nested {@link Factory}, which is
 * registered as a {@link PipelineOptionsRegistrar} service.
 */
public interface ProximaPipelineOptions extends PipelineOptions {

  /** Registrar exposing {@link ProximaPipelineOptions} to Beam's options discovery. */
  @AutoService(PipelineOptionsRegistrar.class)
  class Factory implements PipelineOptionsRegistrar {

    /**
     * @return a single-element list containing {@link ProximaPipelineOptions}
     */
    @Override
    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
      return Collections.singletonList(ProximaPipelineOptions.class);
    }
  }

  /**
   * Whether the generated UDF jar should be kept when the JVM exits.
   *
   * @return {@code true} (the default) to preserve the UDF jar on exit, {@code false} to have the
   *     jar registered for deletion on exit
   */
  @Default.Boolean(true)
  boolean getPreserveUDFJar();

  /**
   * @param preserve {@code true} to preserve the generated UDF jar on exit, {@code false} to
   *     delete it
   */
  void setPreserveUDFJar(boolean preserve);

  /**
   * Directory where generated UDF jar(s) are stored.
   *
   * @return path of the directory, or {@code null} when not set, in which case the jar is created
   *     in the default temporary-file directory
   */
  @Nullable String getUdfJarDirPath();

  /**
   * @param path directory where generated UDF jar(s) should be stored
   */
  void setUdfJarDirPath(String path);

  /**
   * Delay applied by {@link cz.o2.proxima.beam.core.direct.io.BatchLogRead} before it starts
   * reading.
   *
   * @return the delay in milliseconds, {@code 0} by default
   */
  @Default.Long(0L)
  long getStartBatchReadDelayMs();

  /**
   * @param readDelayMs delay in milliseconds before the batch read starts
   */
  void setStartBatchReadDelayMs(long readDelayMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package cz.o2.proxima.beam.core.direct.io;

import com.google.auto.service.AutoService;
import cz.o2.proxima.beam.core.ProximaPipelineOptions;
import cz.o2.proxima.beam.core.direct.io.BatchRestrictionTracker.PartitionList;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.Repository;
Expand All @@ -42,9 +42,6 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.Impulse;
Expand All @@ -64,21 +61,6 @@
@Slf4j
public class BatchLogRead extends PTransform<PBegin, PCollection<StreamElement>> {

public interface BatchLogReadPipelineOptions extends PipelineOptions {
@Default.Long(0L)
long getStartBatchReadDelayMs();

void setStartBatchReadDelayMs(long readDelayMs);
}

@AutoService(PipelineOptionsRegistrar.class)
public static class BatchLogReadOptionsFactory implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return Collections.singletonList(BatchLogReadPipelineOptions.class);
}
}

/**
* Create the {@link BatchLogRead} transform that reads from {@link BatchLogReader} in batch
* manner.
Expand Down Expand Up @@ -351,7 +333,7 @@ public PCollection<StreamElement> expand(PBegin input) {
input
.getPipeline()
.getOptions()
.as(BatchLogReadPipelineOptions.class)
.as(ProximaPipelineOptions.class)
.getStartBatchReadDelayMs();
return input
.apply(Impulse.create())
Expand Down
18 changes: 14 additions & 4 deletions beam/core/src/main/java/cz/o2/proxima/beam/util/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package cz.o2.proxima.beam.util;

import com.google.common.base.Strings;
import cz.o2.proxima.beam.core.ProximaPipelineOptions;
import cz.o2.proxima.core.annotations.Internal;
import cz.o2.proxima.core.util.ExceptionUtils;
import java.io.ByteArrayInputStream;
Expand All @@ -26,6 +28,7 @@
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
Expand Down Expand Up @@ -87,11 +90,18 @@ public static void injectJarIntoContextClassLoader(Collection<File> paths) {
* @param classes map of class to bytecode
* @return generated {@link File}
*/
public static File createJarFromDynamicClasses(Map<? extends Class<?>, byte[]> classes)
throws IOException {
Path tempFile = Files.createTempFile("proxima-beam-dynamic", ".jar");
public static File createJarFromDynamicClasses(
ProximaPipelineOptions opts, Map<? extends Class<?>, byte[]> classes) throws IOException {

final Path tempFile =
Strings.isNullOrEmpty(opts.getUdfJarDirPath())
? Files.createTempFile("proxima-beam-dynamic", ".jar")
: Files.createTempFile(
Paths.get(opts.getUdfJarDirPath()), "proxima-beam-dynamic", ".jar");
File out = tempFile.toFile();
out.deleteOnExit();
if (!opts.getPreserveUDFJar()) {
out.deleteOnExit();
}
try (JarOutputStream output = new JarOutputStream(new FileOutputStream(out))) {
long now = System.currentTimeMillis();
for (Map.Entry<? extends Class<?>, byte[]> e : classes.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.util.state;

import cz.o2.proxima.core.annotations.Experimental;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a {@link org.apache.beam.sdk.transforms.DoFn DoFn} that the external state expansion
 * should leave out.
 *
 * <p>The annotation may be placed on types and on methods and is retained at runtime.
 */
@Documented
@Experimental
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ExcludeExternal {
  // marker annotation, no elements
}
Loading

0 comments on commit 2c65eff

Please sign in to comment.