diff --git a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml new file mode 100644 index 0000000000000..6a482b21b1657 --- /dev/null +++ b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Configures patch for ../base/configmap.yaml +# See https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/ + +- op: replace + path: /metadata/labels/quota-id + value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota +- op: replace + path: /data/QUOTA_ID + value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota +- op: replace + path: /data/QUOTA_SIZE + value: "10" +- op: replace + path: /data/QUOTA_REFRESH_INTERVAL + value: 1s diff --git a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml new file mode 100644 index 0000000000000..cff2f994cd63f --- /dev/null +++ b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Configures patch for ../base/deployment.yaml +# See https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/ + +- op: replace + path: /metadata/labels/quota-id + value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota +- op: replace + path: /spec/selector/matchLabels/quota-id + value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota +- op: replace + path: /spec/template/metadata/labels/quota-id + value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota diff --git a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml new file mode 100644 index 0000000000000..d10598be51f47 --- /dev/null +++ b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Configures the overlay for .test-infra/mock-apis/infrastructure/kubernetes/refresher/base +# Using the Quota Id: +# echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota + +resources: +- ../../base + +nameSuffix: -throttle-with-external-resource-test-10-per-1s + +patches: +- path: configmap.yaml + target: + kind: ConfigMap + name: refresher + +- path: deployment.yaml + target: + kind: Deployment + name: refresher diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle index bfd030ce61dc5..4ecdf4e91df7a 100644 --- a/sdks/java/io/rrio/build.gradle +++ b/sdks/java/io/rrio/build.gradle @@ -50,6 +50,7 @@ dependencies { testImplementation platform(library.java.google_cloud_platform_libraries_bom) testImplementation library.java.google_http_client testImplementation library.java.junit + testImplementation library.java.hamcrest testImplementation library.java.testcontainers_base testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java new file mode 100644 index 0000000000000..b8e526c4829b0 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** Transforms for reading and writing request/response associations to a cache. */ +final class Cache { + + /** + * Instantiates a {@link Call} {@link PTransform} that reads {@link RequestT} {@link ResponseT} + * associations from a cache. The {@link KV} value is null when no association exists. This method + * does not enforce {@link Coder#verifyDeterministic} and defers to the user to determine whether + * to enforce this given the cache implementation. + */ + static < + RequestT, + @Nullable ResponseT, + CallerSetupTeardownT extends + Caller> & SetupTeardown> + PTransform, Call.Result>> read( + CallerSetupTeardownT implementsCallerSetupTeardown, + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder) { + return Call.ofCallerAndSetupTeardown( + implementsCallerSetupTeardown, KvCoder.of(requestTCoder, responseTCoder)); + } + + /** + * Instantiates a {@link Call} {@link PTransform}, calling {@link #read} with a {@link Caller} + * that employs a redis client. + * + *

This method requires both the {@link RequestT} and {@link ResponseT}s' {@link + * Coder#verifyDeterministic}. Otherwise, it throws a {@link NonDeterministicException}. + * + *

Redis is designed for multiple workloads, simultaneously + * reading and writing to a shared instance. See Redis FAQ for more information on important + * considerations when using this method to achieve cache reads. + */ + static + PTransform, Call.Result>> + readUsingRedis( + RedisClient client, + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder) + throws NonDeterministicException { + return read( + new UsingRedis<>(requestTCoder, responseTCoder, client).read(), + requestTCoder, + responseTCoder); + } + + /** + * Write a {@link RequestT} {@link ResponseT} association to a cache. This method does not enforce + * {@link Coder#verifyDeterministic} and defers to the user to determine whether to enforce this + * given the cache implementation. + */ + static < + RequestT, + ResponseT, + CallerSetupTeardownT extends + Caller, KV> & SetupTeardown> + PTransform>, Call.Result>> write( + CallerSetupTeardownT implementsCallerSetupTeardown, + KvCoder kvCoder) { + return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown, kvCoder); + } + + /** + * Instantiates a {@link Call} {@link PTransform}, calling {@link #write} with a {@link Caller} + * that employs a redis client. + * + *

This method requires both the {@link RequestT} and {@link ResponseT}s' {@link + * Coder#verifyDeterministic}. Otherwise, it throws a {@link NonDeterministicException}. + * + *

Redis is designed for multiple workloads, simultaneously + * reading and writing to a shared instance. See Redis FAQ for more information on important + * considerations when using this method to achieve cache writes. + */ + static + PTransform>, Call.Result>> + writeUsingRedis( + Duration expiry, + RedisClient client, + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder) + throws NonDeterministicException { + return write( + new UsingRedis<>(requestTCoder, responseTCoder, client).write(expiry), + KvCoder.of(requestTCoder, responseTCoder)); + } + + private static class UsingRedis { + private final Coder requestTCoder; + private final Coder<@Nullable ResponseT> responseTCoder; + private final RedisClient client; + + private UsingRedis( + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder, + RedisClient client) + throws Coder.NonDeterministicException { + this.client = client; + requestTCoder.verifyDeterministic(); + responseTCoder.verifyDeterministic(); + this.requestTCoder = requestTCoder; + this.responseTCoder = responseTCoder; + } + + private Read read() { + return new Read<>(requestTCoder, responseTCoder, client); + } + + private Write write(Duration expiry) { + return new Write<>(expiry, requestTCoder, responseTCoder, client); + } + + /** Reads associated {@link RequestT} {@link ResponseT} using a {@link RedisClient}. */ + private static class Read + implements Caller>, SetupTeardown { + + private final Coder requestTCoder; + private final Coder<@Nullable ResponseT> responseTCoder; + private final RedisClient client; + + private Read( + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder, + RedisClient client) { + this.requestTCoder = requestTCoder; + this.responseTCoder = responseTCoder; + this.client = client; + } + + @Override + public KV call(RequestT request) + throws UserCodeExecutionException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + requestTCoder.encode(request, baos); + byte[] encodedRequest = baos.toByteArray(); + byte[] encodedResponse = client.getBytes(encodedRequest); + if (encodedResponse == null) { + return KV.of(request, null); + } + ResponseT response = + checkStateNotNull( + responseTCoder.decode(ByteSource.wrap(encodedResponse).openStream())); + return KV.of(request, response); + } catch (IllegalStateException | IOException e) { + throw new UserCodeExecutionException(e); + } + } + + @Override + public void setup() throws UserCodeExecutionException { + client.setup(); + } + + @Override + public void teardown() throws UserCodeExecutionException { + client.teardown(); + } + } + } + + private static class Write + implements Caller, KV>, SetupTeardown { + private final Duration expiry; + private final Coder requestTCoder; + private final Coder<@Nullable ResponseT> responseTCoder; + private final RedisClient client; + + private Write( + Duration expiry, + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder, + RedisClient client) { + this.expiry = expiry; + this.requestTCoder = requestTCoder; + this.responseTCoder = responseTCoder; + this.client = client; + } + + @Override + public KV call(KV request) + throws UserCodeExecutionException { + ByteArrayOutputStream keyStream = new ByteArrayOutputStream(); + ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); + try { + requestTCoder.encode(request.getKey(), keyStream); + responseTCoder.encode(request.getValue(), valueStream); + } catch (IOException e) { + throw new UserCodeExecutionException(e); + } + client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry); + return request; + } + + @Override + public void setup() throws UserCodeExecutionException { + client.setup(); + } + + @Override + public void teardown() throws UserCodeExecutionException { + client.teardown(); + } + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java deleted file mode 100644 index 3765d25370a66..0000000000000 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.requestresponse; - -import com.google.auto.value.AutoValue; -import java.util.Map; -import org.apache.beam.io.requestresponse.CacheRead.Result; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; - -/** - * {@link CacheRead} reads associated {@link ResponseT} types from {@link RequestT} types, if any - * exist. - */ -class CacheRead - extends PTransform, Result> { - - private static final TupleTag FAILURE_TAG = new TupleTag() {}; - - // TODO(damondouglas): remove suppress warnings after instance utilized. - @SuppressWarnings({"unused"}) - private final Configuration configuration; - - private CacheRead(Configuration configuration) { - this.configuration = configuration; - } - - /** Configuration details for {@link CacheRead}. */ - @AutoValue - abstract static class Configuration { - - static Builder builder() { - return new AutoValue_CacheRead_Configuration.Builder<>(); - } - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Configuration build(); - } - } - - @Override - public Result expand(PCollection input) { - return Result.of( - new TupleTag>() {}, PCollectionTuple.empty(input.getPipeline())); - } - - /** - * The {@link Result} of reading RequestT {@link PCollection} elements yielding ResponseT {@link - * PCollection} elements. - */ - static class Result implements POutput { - - static Result of( - TupleTag> responseTag, PCollectionTuple pct) { - return new Result<>(responseTag, pct); - } - - private final Pipeline pipeline; - private final TupleTag> responseTag; - private final PCollection> responses; - private final PCollection failures; - - private Result(TupleTag> responseTag, PCollectionTuple pct) { - this.pipeline = pct.getPipeline(); - this.responseTag = responseTag; - this.responses = pct.get(responseTag); - this.failures = pct.get(FAILURE_TAG); - } - - PCollection> getResponses() { - return responses; - } - - PCollection getFailures() { - return failures; - } - - @Override - public Pipeline getPipeline() { - return this.pipeline; - } - - @Override - public Map, PValue> expand() { - return ImmutableMap.of( - responseTag, responses, - FAILURE_TAG, failures); - } - - @Override - public void finishSpecifyingOutput( - String transformName, PInput input, PTransform transform) {} - } -} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java deleted file mode 100644 index 25249c3e41b42..0000000000000 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.requestresponse; - -import com.google.auto.value.AutoValue; -import java.util.Map; -import org.apache.beam.io.requestresponse.CacheWrite.Result; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; - -/** - * {@link CacheWrite} writes associated {@link RequestT} and {@link ResponseT} pairs to a cache. - * Using {@link RequestT} and {@link ResponseT}'s {@link org.apache.beam.sdk.coders.Coder}, this - * transform writes encoded representations of this association. - */ -class CacheWrite - extends PTransform>, Result> { - - private static final TupleTag FAILURE_TAG = new TupleTag() {}; - - // TODO(damondouglas): remove suppress warnings after configuration is used. - @SuppressWarnings({"unused"}) - private final Configuration configuration; - - private CacheWrite(Configuration configuration) { - this.configuration = configuration; - } - - /** Configuration details for {@link CacheWrite}. */ - @AutoValue - abstract static class Configuration { - - static Builder builder() { - return new AutoValue_CacheWrite_Configuration.Builder<>(); - } - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Configuration build(); - } - } - - @Override - public Result expand(PCollection> input) { - return Result.of( - new TupleTag>() {}, PCollectionTuple.empty(input.getPipeline())); - } - - /** The {@link Result} of writing a request/response {@link KV} {@link PCollection}. */ - static class Result implements POutput { - - static Result of( - TupleTag> responseTag, PCollectionTuple pct) { - return new Result<>(responseTag, pct); - } - - private final Pipeline pipeline; - private final TupleTag> responseTag; - private final PCollection> responses; - private final PCollection failures; - - private Result(TupleTag> responseTag, PCollectionTuple pct) { - this.pipeline = pct.getPipeline(); - this.responseTag = responseTag; - this.responses = pct.get(responseTag); - this.failures = pct.get(FAILURE_TAG); - } - - public PCollection> getResponses() { - return responses; - } - - public PCollection getFailures() { - return failures; - } - - @Override - public Pipeline getPipeline() { - return this.pipeline; - } - - @Override - public Map, PValue> expand() { - return ImmutableMap.of( - responseTag, responses, - FAILURE_TAG, failures); - } - - @Override - public void finishSpecifyingOutput( - String transformName, PInput input, PTransform transform) {} - } -} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java index 52181af534ed3..d52ca971ca47a 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java @@ -100,7 +100,11 @@ Call ofCallerAndSetupTeardown( .build()); } - private static final TupleTag FAILURE_TAG = new TupleTag() {}; + // TupleTags need to be instantiated for each Call instance. We cannot use a shared + // static instance that is shared for multiple PCollectionTuples when Call is + // instantiated multiple times as it is reused throughout code in this library. + private final TupleTag responseTag = new TupleTag() {}; + private final TupleTag failureTag = new TupleTag() {}; private final Configuration configuration; @@ -128,27 +132,30 @@ Call withTimeout(Duration timeout) { @Override public @NonNull Result expand(PCollection input) { - TupleTag responseTag = new TupleTag() {}; PCollectionTuple pct = input.apply( CallFn.class.getSimpleName(), - ParDo.of(new CallFn<>(responseTag, configuration)) - .withOutputTags(responseTag, TupleTagList.of(FAILURE_TAG))); + ParDo.of(new CallFn<>(responseTag, failureTag, configuration)) + .withOutputTags(responseTag, TupleTagList.of(failureTag))); - return Result.of(configuration.getResponseCoder(), responseTag, pct); + return Result.of(configuration.getResponseCoder(), responseTag, failureTag, pct); } private static class CallFn extends DoFn { private final TupleTag responseTag; + private final TupleTag failureTag; private final CallerWithTimeout caller; private final SetupTeardownWithTimeout setupTeardown; private transient @MonotonicNonNull ExecutorService executor; private CallFn( - TupleTag responseTag, Configuration configuration) { + TupleTag responseTag, + TupleTag failureTag, + Configuration configuration) { this.responseTag = responseTag; + this.failureTag = failureTag; this.caller = new CallerWithTimeout<>(configuration.getTimeout(), configuration.getCaller()); this.setupTeardown = new SetupTeardownWithTimeout( @@ -194,7 +201,7 @@ public void process(@Element @NonNull RequestT request, MultiOutputReceiver rece ResponseT response = this.caller.call(request); receiver.get(responseTag).output(response); } catch (UserCodeExecutionException e) { - receiver.get(FAILURE_TAG).output(ApiIOError.of(e, request)); + receiver.get(failureTag).output(ApiIOError.of(e, request)); } } } @@ -269,21 +276,29 @@ final Configuration build() { static class Result implements POutput { static Result of( - Coder responseTCoder, TupleTag responseTag, PCollectionTuple pct) { - return new Result<>(responseTCoder, responseTag, pct); + Coder responseTCoder, + TupleTag responseTag, + TupleTag failureTag, + PCollectionTuple pct) { + return new Result<>(responseTCoder, responseTag, pct, failureTag); } private final Pipeline pipeline; private final TupleTag responseTag; + private final TupleTag failureTag; private final PCollection responses; private final PCollection failures; private Result( - Coder responseTCoder, TupleTag responseTag, PCollectionTuple pct) { + Coder responseTCoder, + TupleTag responseTag, + PCollectionTuple pct, + TupleTag failureTag) { this.pipeline = pct.getPipeline(); this.responseTag = responseTag; + this.failureTag = failureTag; this.responses = pct.get(responseTag).setCoder(responseTCoder); - this.failures = pct.get(FAILURE_TAG); + this.failures = pct.get(this.failureTag); } public PCollection getResponses() { @@ -303,7 +318,7 @@ public PCollection getFailures() { public @NonNull Map, PValue> expand() { return ImmutableMap.of( responseTag, responses, - FAILURE_TAG, failures); + failureTag, failures); } @Override diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java new file mode 100644 index 0000000000000..d2e538cf7cf39 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import java.io.Serializable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * A data class that expresses a quota. Web API providers typically define a quota as the number of + * requests per time interval. + */ +public class Quota implements Serializable { + private final long numRequests; + private final @NonNull Duration interval; + + public Quota(long numRequests, @NonNull Duration interval) { + this.numRequests = numRequests; + this.interval = interval; + } + + /** The number of allowed requests. */ + public long getNumRequests() { + return numRequests; + } + + /** The duration context within which to allow requests. */ + public @NonNull Duration getInterval() { + return interval; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Quota quota = (Quota) o; + return numRequests == quota.numRequests && Objects.equal(interval, quota.interval); + } + + @Override + public int hashCode() { + return Objects.hashCode(numRequests, interval); + } + + @Override + public String toString() { + return "Quota{" + "numRequests=" + numRequests + ", interval=" + interval + '}'; + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java index a87f5c191e4b0..a347a18524131 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import redis.clients.jedis.JedisPooled; import redis.clients.jedis.exceptions.JedisException; @@ -61,6 +62,15 @@ long decr(String key) throws UserCodeExecutionException { } } + /** Get a byte array associated with a byte array key. Returns null if key does not exist. */ + byte @Nullable [] getBytes(byte[] key) throws UserCodeExecutionException { + try { + return getSafeClient().get(key); + } catch (JedisException e) { + throw new UserCodeExecutionException(e); + } + } + /** * Get the long value stored by the key. Yields zero when key does not exist, keeping consistency * with Redis convention. Consider using {@link #exists} to query key existance. diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java deleted file mode 100644 index 085b13b5e1120..0000000000000 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.requestresponse; - -import com.google.auto.value.AutoValue; -import java.util.Map; -import org.apache.beam.io.requestresponse.ThrottleDequeue.Result; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Instant; - -/** - * {@link ThrottleDequeue} dequeues {@link RequestT} elements at a fixed rate yielding a {@link - * Result} containing the dequeued {@link RequestT} {@link PCollection} and a {@link ApiIOError} - * {@link PCollection} of any errors. - */ -class ThrottleDequeue extends PTransform, Result> { - - private static final TupleTag FAILURE_TAG = new TupleTag() {}; - - // TODO(damondouglas): remove suppress warnings after instance utilized. - @SuppressWarnings({"unused"}) - private final Configuration configuration; - - private ThrottleDequeue(Configuration configuration) { - this.configuration = configuration; - } - - @Override - public Result expand(PCollection input) { - // TODO(damondouglas): expand in a future PR. - return new Result<>(new TupleTag() {}, PCollectionTuple.empty(input.getPipeline())); - } - - @AutoValue - abstract static class Configuration { - - @AutoValue.Builder - abstract static class Builder { - abstract Configuration build(); - } - } - - /** The {@link Result} of dequeuing {@link RequestT}s. */ - static class Result implements POutput { - - static Result of(TupleTag requestsTag, PCollectionTuple pct) { - return new Result<>(requestsTag, pct); - } - - private final Pipeline pipeline; - private final TupleTag requestsTag; - private final PCollection requests; - private final PCollection failures; - - private Result(TupleTag requestsTag, PCollectionTuple pct) { - this.pipeline = pct.getPipeline(); - this.requestsTag = requestsTag; - this.requests = pct.get(requestsTag); - this.failures = pct.get(FAILURE_TAG); - } - - @Override - public Pipeline getPipeline() { - return pipeline; - } - - @Override - public Map, PValue> expand() { - return ImmutableMap.of( - requestsTag, requests, - FAILURE_TAG, failures); - } - - @Override - public void finishSpecifyingOutput( - String transformName, PInput input, PTransform transform) {} - } -} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java deleted file mode 100644 index 505ef86be48b3..0000000000000 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.requestresponse; - -import com.google.auto.value.AutoValue; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * {@link ThrottleEnqueue} enqueues {@link RequestT} elements yielding an {@link ApiIOError} {@link - * PCollection} of any enqueue errors. - */ -class ThrottleEnqueue extends PTransform, PCollection> { - - @SuppressWarnings({"unused"}) - private final Configuration configuration; - - private ThrottleEnqueue(Configuration configuration) { - this.configuration = configuration; - } - - /** Configuration details for {@link ThrottleEnqueue}. */ - @AutoValue - abstract static class Configuration { - - static Builder builder() { - return new AutoValue_ThrottleEnqueue_Configuration.Builder<>(); - } - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Configuration build(); - } - } - - @Override - public PCollection expand(PCollection input) { - // TODO(damondouglas): expand in a future PR. - return input.getPipeline().apply(Create.empty(TypeDescriptor.of(ApiIOError.class))); - } -} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java deleted file mode 100644 index 57e57528db4bc..0000000000000 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.requestresponse; - -import com.google.auto.value.AutoValue; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.joda.time.Instant; - -/** - * {@link ThrottleRefreshQuota} refreshes a quota per {@link Instant} processing events emitting any - * errors into an {@link ApiIOError} {@link PCollection}. - */ -class ThrottleRefreshQuota extends PTransform, PCollection> { - - // TODO: remove suppress warnings after configuration utilized. - @SuppressWarnings({"unused"}) - private final Configuration configuration; - - private ThrottleRefreshQuota(Configuration configuration) { - this.configuration = configuration; - } - - @Override - public PCollection expand(PCollection input) { - // TODO(damondouglas): expand in a later PR. - return input.getPipeline().apply(Create.empty(TypeDescriptor.of(ApiIOError.class))); - } - - @AutoValue - abstract static class Configuration { - - @AutoValue.Builder - abstract static class Builder { - abstract Configuration build(); - } - } -} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java new file mode 100644 index 0000000000000..dffc034770aa7 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Throttles a {@link T} {@link PCollection} using an external resource. + * + *

{@link ThrottleWithExternalResource} makes use of {@link PeriodicImpulse} as it needs to + * coordinate three {@link PTransform}s concurrently. Usage of {@link ThrottleWithExternalResource} + * should consider the impact of {@link PeriodicImpulse} on the pipeline. + * + *

Usage of {@link ThrottleWithExternalResource} is completely optional and serves as one of many + * methods by {@link RequestResponseIO} to protect against API overuse. Usage should not depend on + * {@link ThrottleWithExternalResource} alone to achieve API overuse prevention for several reasons. + * The underlying external resource may not scale at all or as fast as a Beam Runner. The external + * resource itself may be an API with its own quota that {@link ThrottleWithExternalResource} does + * not consider. + * + *

{@link ThrottleWithExternalResource} makes use of several {@link Caller}s that work together + * to achieve its aim of throttling a {@link T} {@link PCollection}. A {@link RefresherT} is a + * {@link Caller} that takes an {@link Instant} and refreshes a shared {@link Quota}. An {@link + * EnqueuerT} enqueues a {@link T} element while a {@link DequeuerT} dequeues said element when the + * {@link ReporterT} reports that the stored {@link Quota#getNumRequests} is >0. Finally, a {@link + * DecrementerT} decrements from the shared {@link Quota} value, additionally reporting the value + * after performing the action. + * + *

{@link ThrottleWithExternalResource} instantiates and applies two {@link Call} {@link + * PTransform}s using the aforementioned {@link Caller}s {@link RefresherT} and {@link EnqueuerT}. + * {@link ThrottleWithExternalResource} calls {@link ReporterT}, {@link DequeuerT}, {@link + * DecrementerT} within its {@link DoFn}, emitting the dequeued {@link T} when the {@link ReporterT} + * reports a value >0. As an additional safety check, the DoFn checks whether the {@link Quota} + * value after {@link DecrementerT}'s action is <0, signaling that multiple workers are attempting + * the same too fast and therefore exists the DoFn allowing for the next refresh. + * + *

{@link ThrottleWithExternalResource} flattens errors emitted from {@link EnqueuerT}, {@link + * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError} {@link PCollection} that + * is encapsulated, with a {@link T} {@link PCollection} output into a {@link Call.Result}. + */ +class ThrottleWithExternalResource< + T, + ReporterT extends Caller & SetupTeardown, + EnqueuerT extends Caller & SetupTeardown, + DequeuerT extends Caller & SetupTeardown, + DecrementerT extends Caller & SetupTeardown, + RefresherT extends Caller & SetupTeardown> + extends PTransform, Call.Result> { + + /** + * Instantiate a {@link ThrottleWithExternalResource} using a {@link RedisClient}. + * + *

Redis is designed for multiple workloads, simultaneously + * reading and writing to a shared instance. See Redis FAQ for more information on important + * considerations when using Redis as {@link ThrottleWithExternalResource}'s external resource. + */ + static + ThrottleWithExternalResource< + T, + RedisReporter, + RedisEnqueuer, + RedisDequeuer, + RedisDecrementer, + RedisRefresher> + usingRedis(URI uri, String quotaIdentifier, String queueKey, Quota quota, Coder coder) + throws Coder.NonDeterministicException { + return new ThrottleWithExternalResource< + T, RedisReporter, RedisEnqueuer, RedisDequeuer, RedisDecrementer, RedisRefresher>( + quota, + quotaIdentifier, + coder, + new RedisReporter(uri), + new RedisEnqueuer<>(uri, queueKey, coder), + new RedisDequeuer<>(uri, coder, queueKey), + new RedisDecrementer(uri, queueKey), + new RedisRefresher(uri, quota, quotaIdentifier)); + } + + private static final Duration THROTTLE_INTERVAL = Duration.standardSeconds(1L); + + private final Quota quota; + private final String quotaIdentifier; + private final Coder coder; + private final ReporterT reporterT; + private final EnqueuerT enqueuerT; + private final DequeuerT dequeuerT; + private final DecrementerT decrementerT; + private final RefresherT refresherT; + + ThrottleWithExternalResource( + Quota quota, + String quotaIdentifier, + Coder coder, + ReporterT reporterT, + EnqueuerT enqueuerT, + DequeuerT dequeuerT, + DecrementerT decrementerT, + RefresherT refresherT) + throws Coder.NonDeterministicException { + this.quotaIdentifier = quotaIdentifier; + this.reporterT = reporterT; + coder.verifyDeterministic(); + checkArgument(!quotaIdentifier.isEmpty()); + this.quota = quota; + this.coder = coder; + this.enqueuerT = enqueuerT; + this.dequeuerT = dequeuerT; + this.decrementerT = decrementerT; + this.refresherT = refresherT; + } + + @Override + public Call.Result expand(PCollection input) { + Pipeline pipeline = input.getPipeline(); + + // Refresh known quota to control the throttle rate. + Call.Result refreshResult = + pipeline + .apply("quota impulse", PeriodicImpulse.create().withInterval(quota.getInterval())) + .apply("quota refresh", getRefresher()); + + // Enqueue T elements. + Call.Result enqueuResult = input.apply("enqueue", getEnqueuer()); + + TupleTag outputTag = new TupleTag() {}; + TupleTag failureTag = new TupleTag() {}; + + // Perform Throttle. + PCollectionTuple pct = + pipeline + .apply("throttle impulse", PeriodicImpulse.create().withInterval(THROTTLE_INTERVAL)) + .apply( + "throttle fn", + ParDo.of( + new ThrottleFn( + quotaIdentifier, + dequeuerT, + decrementerT, + reporterT, + outputTag, + failureTag)) + .withOutputTags(outputTag, TupleTagList.of(failureTag))); + + PCollection errors = + PCollectionList.of(refreshResult.getFailures()) + .and(enqueuResult.getFailures()) + .and(pct.get(failureTag)) + .apply("errors flatten", Flatten.pCollections()); + + TupleTag resultOutputTag = new TupleTag() {}; + TupleTag resultFailureTag = new TupleTag() {}; + + return Call.Result.of( + coder, + resultOutputTag, + resultFailureTag, + PCollectionTuple.of(resultOutputTag, pct.get(outputTag)).and(resultFailureTag, errors)); + } + + private Call getRefresher() { + return Call.ofCallerAndSetupTeardown(refresherT, VoidCoder.of()); + } + + private Call getEnqueuer() { + return Call.ofCallerAndSetupTeardown(enqueuerT, VoidCoder.of()); + } + + private class ThrottleFn extends DoFn { + private final String quotaIdentifier; + private final DequeuerT dequeuerT; + private final DecrementerT decrementerT; + private final ReporterT reporterT; + private final TupleTag outputTag; + private final TupleTag failureTag; + + private ThrottleFn( + String quotaIdentifier, + DequeuerT dequeuerT, + DecrementerT decrementerT, + ReporterT reporterT, + TupleTag outputTag, + TupleTag failureTag) { + this.quotaIdentifier = quotaIdentifier; + this.dequeuerT = dequeuerT; + this.decrementerT = decrementerT; + this.reporterT = reporterT; + this.outputTag = outputTag; + this.failureTag = failureTag; + } + + @ProcessElement + public void process(@Element Instant instant, MultiOutputReceiver receiver) { + // Check for available quota. + try { + if (reporterT.call(quotaIdentifier) <= 0L) { + return; + } + + // Decrement the quota. + Long quotaAfterDecrement = decrementerT.call(instant); + + // As an additional protection we check what the quota is after decrementing. A value + // < 0 signals that multiple simultaneous workers have attempted to decrement too quickly. + // We don't bother adding the quota back to prevent additional workers from doing the same + // and just wait for the next refresh, exiting the DoFn. + if (quotaAfterDecrement < 0) { + return; + } + + // Dequeue an element if quota available. An error here would not result in loss of data + // as no element would successfully dequeue from the external resource but instead throw. + T element = dequeuerT.call(instant); + + // Finally, emit the element. + receiver.get(outputTag).output(element); + + } catch (UserCodeExecutionException e) { + receiver + .get(failureTag) + .output( + ApiIOError.builder() + // no request to emit as part of the error. + .setRequestAsJsonString("") + .setMessage(Optional.ofNullable(e.getMessage()).orElse("")) + .setObservedTimestamp(Instant.now()) + .setStackTrace(Throwables.getStackTraceAsString(e)) + .build()); + } + } + + @Setup + public void setup() throws UserCodeExecutionException { + enqueuerT.setup(); + dequeuerT.setup(); + decrementerT.setup(); + reporterT.setup(); + } + + @Teardown + public void teardown() throws UserCodeExecutionException { + List messages = new ArrayList<>(); + String format = "%s encountered error during teardown: %s"; + try { + enqueuerT.teardown(); + } catch (UserCodeExecutionException e) { + messages.add(String.format(format, "enqueuerT", e)); + } + try { + dequeuerT.teardown(); + } catch (UserCodeExecutionException e) { + messages.add(String.format(format, "dequeuerT", e)); + } + try { + decrementerT.teardown(); + } catch (UserCodeExecutionException e) { + messages.add(String.format(format, "decrementerT", e)); + } + try { + reporterT.teardown(); + } catch (UserCodeExecutionException e) { + messages.add(String.format(format, "reporterT", e)); + } + + if (!messages.isEmpty()) { + throw new UserCodeExecutionException(String.join("; ", messages)); + } + } + } + + private static class RedisReporter extends RedisSetupTeardown implements Caller { + private RedisReporter(URI uri) { + super(new RedisClient(uri)); + } + + @Override + public Long call(String request) throws UserCodeExecutionException { + return client.getLong(request); + } + } + + private static class RedisEnqueuer extends RedisSetupTeardown implements Caller { + private final String key; + private final Coder coder; + + private RedisEnqueuer(URI uri, String key, Coder coder) { + super(new RedisClient(uri)); + this.key = key; + this.coder = coder; + } + + @Override + public Void call(T request) throws UserCodeExecutionException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + coder.encode(request, baos); + } catch (IOException e) { + throw new UserCodeExecutionException(e); + } + client.rpush(key, baos.toByteArray()); + return null; + } + } + + private static class RedisDequeuer extends RedisSetupTeardown implements Caller { + + private final Coder coder; + private final String key; + + private RedisDequeuer(URI uri, Coder coder, String key) { + super(new RedisClient(uri)); + this.coder = coder; + this.key = key; + } + + @Override + public T call(Instant request) throws UserCodeExecutionException { + byte[] bytes = client.lpop(key); + try { + return checkStateNotNull(coder.decode(ByteSource.wrap(bytes).openStream())); + + } catch (IOException e) { + throw new UserCodeExecutionException(e); + } + } + } + + private static class RedisDecrementer extends RedisSetupTeardown + implements Caller { + + private final String key; + + private RedisDecrementer(URI uri, String key) { + super(new RedisClient(uri)); + this.key = key; + } + + @Override + public Long call(Instant request) throws UserCodeExecutionException { + return client.decr(key); + } + } + + private static class RedisRefresher extends RedisSetupTeardown implements Caller { + private final Quota quota; + private final String key; + + private RedisRefresher(URI uri, Quota quota, String key) { + super(new RedisClient(uri)); + this.quota = quota; + this.key = key; + } + + @Override + public Void call(Instant request) throws UserCodeExecutionException { + client.setex(key, quota.getNumRequests(), quota.getInterval()); + return null; + } + } + + private abstract static class RedisSetupTeardown implements SetupTeardown { + protected final RedisClient client; + + private RedisSetupTeardown(RedisClient client) { + this.client = client; + } + + @Override + public void setup() throws UserCodeExecutionException { + client.setup(); + } + + @Override + public void teardown() throws UserCodeExecutionException { + client.teardown(); + } + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java new file mode 100644 index 0000000000000..95497e6013af1 --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import java.net.URI; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.io.requestresponse.CallTest.Request; +import org.apache.beam.io.requestresponse.CallTest.Response; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** Integration tests for {@link Cache}. */ +@RunWith(JUnit4.class) +public class CacheIT { + @Rule public TestPipeline writePipeline = TestPipeline.create(); + + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine"; + private static final Integer PORT = 6379; + + @Rule + public GenericContainer redis = + new GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT); + + @Rule + public RedisExternalResourcesRule externalClients = + new RedisExternalResourcesRule( + () -> { + redis.start(); + return URI.create( + String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort())); + }); + + @Test + public void givenRequestResponsesCached_writeThenReadYieldsMatches() + throws NonDeterministicException { + List> toWrite = + ImmutableList.of( + KV.of(new Request("a"), new Response("a")), + KV.of(new Request("b"), new Response("b")), + KV.of(new Request("c"), new Response("c"))); + List toRead = ImmutableList.of(new Request("a"), new Request("b"), new Request("c")); + writeThenReadThenPAssert(toWrite, toRead, toWrite); + } + + @Test + public void givenNoMatchingRequestResponsePairs_yieldsKVsWithNullValues() + throws NonDeterministicException { + List> toWrite = + ImmutableList.of( + KV.of(new Request("a"), new Response("a")), + KV.of(new Request("b"), new Response("b")), + KV.of(new Request("c"), new Response("c"))); + List toRead = ImmutableList.of(new Request("d"), new Request("e"), new Request("f")); + List> expected = + toRead.stream() + .>map(request -> KV.of(request, null)) + .collect(Collectors.toList()); + writeThenReadThenPAssert(toWrite, toRead, expected); + } + + private void writeThenReadThenPAssert( + List> toWrite, + List toRead, + List> expected) + throws NonDeterministicException { + PCollection> toWritePCol = writePipeline.apply(Create.of(toWrite)); + toWritePCol.apply( + Cache.writeUsingRedis( + Duration.standardHours(1L), + externalClients.getActualClient(), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER)); + + PCollection requests = + readPipeline.apply(Create.of(toRead)).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER); + + Call.Result> gotKVsResult = + requests.apply( + Cache.readUsingRedis( + externalClients.getActualClient(), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER)); + + PAssert.that(gotKVsResult.getFailures()).empty(); + PAssert.that(gotKVsResult.getResponses()).containsInAnyOrder(expected); + + writePipeline.run().waitUntilFinish(); + readPipeline.run(); + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java new file mode 100644 index 0000000000000..fcb7862e991ee --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; + +import java.net.URI; +import org.apache.beam.io.requestresponse.CallTest.Request; +import org.apache.beam.io.requestresponse.CallTest.Response; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link Cache}. */ +@RunWith(JUnit4.class) +public class CacheTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void givenNonDeterministicCoder_readUsingRedis_throwsError() + throws Coder.NonDeterministicException { + URI uri = URI.create("redis://localhost:6379"); + assertThrows( + NonDeterministicException.class, + () -> + Cache.readUsingRedis( + new RedisClient(uri), + CallTest.NON_DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER)); + + assertThrows( + NonDeterministicException.class, + () -> + Cache.readUsingRedis( + new RedisClient(uri), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.NON_DETERMINISTIC_RESPONSE_CODER)); + + Cache.readUsingRedis( + new RedisClient(uri), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER); + } + + @Test + public void givenNonDeterministicCoder_writeUsingRedis_throwsError() + throws Coder.NonDeterministicException { + URI uri = URI.create("redis://localhost:6379"); + Duration expiry = Duration.standardSeconds(1L); + assertThrows( + NonDeterministicException.class, + () -> + Cache.writeUsingRedis( + expiry, + new RedisClient(uri), + CallTest.NON_DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER)); + + assertThrows( + NonDeterministicException.class, + () -> + Cache.writeUsingRedis( + expiry, + new RedisClient(uri), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.NON_DETERMINISTIC_RESPONSE_CODER)); + + Cache.writeUsingRedis( + expiry, + new RedisClient(uri), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER); + } + + @Test + public void givenWrongRedisURI_throwsError() throws NonDeterministicException { + URI uri = URI.create("redis://1.2.3.4:6379"); + Duration expiry = Duration.standardSeconds(1L); + PCollection requests = + pipeline + .apply("create requests", Create.of(new Request(""))) + .setCoder(CallTest.DETERMINISTIC_REQUEST_CODER); + requests.apply( + "readUsingRedis", + Cache.readUsingRedis( + new RedisClient(uri), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER)); + + PCollection> kvs = + pipeline.apply("create kvs", Create.of(KV.of(new Request(""), new Response("")))); + kvs.apply( + "writeUsingRedis", + Cache.writeUsingRedis( + expiry, + new RedisClient(uri), + CallTest.DETERMINISTIC_REQUEST_CODER, + CallTest.DETERMINISTIC_RESPONSE_CODER)); + + UncheckedExecutionException error = + assertThrows(UncheckedExecutionException.class, pipeline::run); + assertThat( + error.getCause().getMessage(), + containsString("Failed to connect to host: redis://1.2.3.4:6379")); + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java index 18574b00978dc..1566d1725295e 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java @@ -22,9 +22,17 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import org.apache.beam.io.requestresponse.Call.Result; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -36,6 +44,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.NotNull; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; @@ -47,13 +57,23 @@ public class CallTest { @Rule public TestPipeline pipeline = TestPipeline.create(); - private static final SerializableCoder<@NonNull Response> RESPONSE_CODER = + static final SerializableCoder<@NonNull Request> NON_DETERMINISTIC_REQUEST_CODER = + SerializableCoder.of(Request.class); + + static final Coder<@NonNull Request> DETERMINISTIC_REQUEST_CODER = + new DeterministicRequestCoder(); + + static final SerializableCoder<@NonNull Response> NON_DETERMINISTIC_RESPONSE_CODER = SerializableCoder.of(Response.class); + static final Coder<@NonNull Response> DETERMINISTIC_RESPONSE_CODER = + new DeterministicResponseCoder(); + @Test public void givenCallerNotSerializable_throwsError() { assertThrows( - IllegalArgumentException.class, () -> Call.of(new UnSerializableCaller(), RESPONSE_CODER)); + IllegalArgumentException.class, + () -> Call.of(new UnSerializableCaller(), NON_DETERMINISTIC_RESPONSE_CODER)); } @Test @@ -62,7 +82,7 @@ public void givenSetupTeardownNotSerializable_throwsError() { IllegalArgumentException.class, () -> Call.ofCallerAndSetupTeardown( - new UnSerializableCallerWithSetupTeardown(), RESPONSE_CODER)); + new UnSerializableCallerWithSetupTeardown(), NON_DETERMINISTIC_RESPONSE_CODER)); } @Test @@ -70,7 +90,10 @@ public void givenCallerThrowsUserCodeExecutionException_emitsIntoFailurePCollect Result result = pipeline .apply(Create.of(new Request("a"))) - .apply(Call.of(new CallerThrowsUserCodeExecutionException(), RESPONSE_CODER)); + .apply( + Call.of( + new CallerThrowsUserCodeExecutionException(), + NON_DETERMINISTIC_RESPONSE_CODER)); PCollection failures = result.getFailures(); PAssert.thatSingleton(countStackTracesOf(failures, UserCodeExecutionException.class)) @@ -87,7 +110,7 @@ public void givenCallerThrowsQuotaException_emitsIntoFailurePCollection() { Result result = pipeline .apply(Create.of(new Request("a"))) - .apply(Call.of(new CallerInvokesQuotaException(), RESPONSE_CODER)); + .apply(Call.of(new CallerInvokesQuotaException(), NON_DETERMINISTIC_RESPONSE_CODER)); PCollection failures = result.getFailures(); PAssert.thatSingleton(countStackTracesOf(failures, UserCodeExecutionException.class)) @@ -105,7 +128,9 @@ public void givenCallerTimeout_emitsFailurePCollection() { Result result = pipeline .apply(Create.of(new Request("a"))) - .apply(Call.of(new CallerExceedsTimeout(timeout), RESPONSE_CODER).withTimeout(timeout)); + .apply( + Call.of(new CallerExceedsTimeout(timeout), NON_DETERMINISTIC_RESPONSE_CODER) + .withTimeout(timeout)); PCollection failures = result.getFailures(); PAssert.thatSingleton(countStackTracesOf(failures, UserCodeExecutionException.class)) @@ -122,7 +147,7 @@ public void givenCallerThrowsTimeoutException_emitsFailurePCollection() { Result result = pipeline .apply(Create.of(new Request("a"))) - .apply(Call.of(new CallerThrowsTimeout(), RESPONSE_CODER)); + .apply(Call.of(new CallerThrowsTimeout(), NON_DETERMINISTIC_RESPONSE_CODER)); PCollection failures = result.getFailures(); PAssert.thatSingleton(countStackTracesOf(failures, UserCodeExecutionException.class)) @@ -139,7 +164,7 @@ public void givenSetupThrowsUserCodeExecutionException_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new SetupThrowsUserCodeExecutionException())); assertPipelineThrows(UserCodeExecutionException.class, pipeline); @@ -150,7 +175,7 @@ public void givenSetupThrowsQuotaException_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new SetupThrowsUserCodeQuotaException())); assertPipelineThrows(UserCodeQuotaException.class, pipeline); @@ -163,7 +188,7 @@ public void givenSetupTimeout_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new SetupExceedsTimeout(timeout)) .withTimeout(timeout)); @@ -175,7 +200,7 @@ public void givenSetupThrowsTimeoutException_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new SetupThrowsUserCodeTimeoutException())); assertPipelineThrows(UserCodeTimeoutException.class, pipeline); @@ -186,7 +211,7 @@ public void givenTeardownThrowsUserCodeExecutionException_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new TeardownThrowsUserCodeExecutionException())); // Exceptions thrown during teardown do not populate with the cause @@ -198,7 +223,7 @@ public void givenTeardownThrowsQuotaException_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new TeardownThrowsUserCodeQuotaException())); // Exceptions thrown during teardown do not populate with the cause @@ -211,7 +236,7 @@ public void givenTeardownTimeout_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withTimeout(timeout) .withSetupTeardown(new TeardownExceedsTimeout(timeout))); @@ -224,7 +249,7 @@ public void givenTeardownThrowsTimeoutException_throwsError() { pipeline .apply(Create.of(new Request(""))) .apply( - Call.of(new ValidCaller(), RESPONSE_CODER) + Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER) .withSetupTeardown(new TeardownThrowsUserCodeTimeoutException())); // Exceptions thrown during teardown do not populate with the cause @@ -236,7 +261,7 @@ public void givenValidCaller_emitValidResponse() { Result result = pipeline .apply(Create.of(new Request("a"))) - .apply(Call.of(new ValidCaller(), RESPONSE_CODER)); + .apply(Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)); PAssert.thatSingleton(result.getFailures().apply(Count.globally())).isEqualTo(0L); PAssert.that(result.getResponses()).containsInAnyOrder(new Response("a")); @@ -275,7 +300,7 @@ public void teardown() throws UserCodeExecutionException {} private static class UnSerializable {} - private static class Request implements Serializable { + static class Request implements Serializable { final String id; @@ -305,7 +330,7 @@ public int hashCode() { } } - private static class Response implements Serializable { + static class Response implements Serializable { final String id; Response(String id) { @@ -490,4 +515,55 @@ private static void sleep(Duration timeout) { } catch (InterruptedException ignored) { } } + + private static class DeterministicRequestCoder extends CustomCoder<@NonNull Request> { + private static final Coder ID_CODER = StringUtf8Coder.of(); + + @Override + public void encode(Request value, @NotNull OutputStream outStream) + throws CoderException, IOException { + ID_CODER.encode(checkStateNotNull(value).id, outStream); + } + + @Override + public @NonNull Request decode(@NotNull InputStream inStream) + throws CoderException, IOException { + String id = ID_CODER.decode(inStream); + return new Request(id); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + ID_CODER.verifyDeterministic(); + } + } + + private static class DeterministicResponseCoder extends CustomCoder { + private static final NullableCoder ID_CODER = NullableCoder.of(StringUtf8Coder.of()); + + @Override + public void encode(@Nullable Response value, @NotNull OutputStream outStream) + throws CoderException, IOException { + if (value == null) { + ID_CODER.encode(null, outStream); + return; + } + ID_CODER.encode(checkStateNotNull(value).id, outStream); + } + + @Override + public Response decode(@NotNull InputStream inStream) throws CoderException, IOException { + try { + String id = ID_CODER.decode(inStream); + return new Response(id); + } catch (CoderException ignored) { + return null; + } + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + ID_CODER.verifyDeterministic(); + } + } } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java similarity index 91% rename from sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java rename to sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java index 14b6e9e6433d4..c10b7ee1609e5 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME; import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.junit.Assert.assertEquals; @@ -27,6 +28,7 @@ import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest; import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse; import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; import org.junit.AfterClass; @@ -41,7 +43,7 @@ * running integration tests. */ @RunWith(JUnit4.class) -public class EchoGRPCCallerWithSetupTeardownTestIT { +public class EchoGRPCCallerWithSetupTeardownIT { private static @MonotonicNonNull EchoITOptions options; private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client; @@ -50,11 +52,15 @@ public class EchoGRPCCallerWithSetupTeardownTestIT { @BeforeClass public static void setUp() throws UserCodeExecutionException { options = readIOTestPipelineOptions(EchoITOptions.class); - if (options.getgRPCEndpointAddress().isEmpty()) { + if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) { throw new RuntimeException( - "--gRPCEndpointAddress is missing. See " + EchoITOptions.class + "for details."); + "--" + + GRPC_ENDPOINT_ADDRESS_NAME + + " is missing. See " + + EchoITOptions.class + + "for details."); } - client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getgRPCEndpointAddress())); + client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress())); checkStateNotNull(client).setup(); EchoRequest request = createShouldExceedQuotaRequest(); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java similarity index 87% rename from sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java rename to sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java index fa0cb93781100..10b92b2610d92 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.io.requestresponse.EchoITOptions.HTTP_ENDPOINT_ADDRESS_NAME; import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.junit.Assert.assertEquals; @@ -28,6 +29,7 @@ import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest; import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse; import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; import org.junit.BeforeClass; @@ -36,12 +38,12 @@ import org.junit.runners.JUnit4; /** - * Tests for {@link EchoHTTPCallerTestIT} on a deployed {@link EchoServiceGrpc} instance's HTTP - * handler. See {@link EchoITOptions} for details on the required parameters and how to provide - * these for running integration tests. + * Tests for {@link EchoHTTPCallerIT} on a deployed {@link EchoServiceGrpc} instance's HTTP handler. + * See {@link EchoITOptions} for details on the required parameters and how to provide these for + * running integration tests. */ @RunWith(JUnit4.class) -public class EchoHTTPCallerTestIT { +public class EchoHTTPCallerIT { private static @MonotonicNonNull EchoITOptions options; private static @MonotonicNonNull EchoHTTPCaller client; @@ -50,9 +52,13 @@ public class EchoHTTPCallerTestIT { @BeforeClass public static void setUp() throws UserCodeExecutionException { options = readIOTestPipelineOptions(EchoITOptions.class); - if (options.getHttpEndpointAddress().isEmpty()) { + if (Strings.isNullOrEmpty(options.getHttpEndpointAddress())) { throw new RuntimeException( - "--httpEndpointAddress is missing. See " + EchoITOptions.class + "for details."); + "--" + + HTTP_ENDPOINT_ADDRESS_NAME + + " is missing. See " + + EchoITOptions.class + + "for details."); } client = EchoHTTPCaller.of(URI.create(options.getHttpEndpointAddress())); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java index a32f7a78e8265..dabec75089263 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java @@ -37,10 +37,13 @@ * */ public interface EchoITOptions extends PipelineOptions { + String GRPC_ENDPOINT_ADDRESS_NAME = "grpcEndpointAddress"; + String HTTP_ENDPOINT_ADDRESS_NAME = "httpEndpointAddress"; + @Description("The gRPC address of the Echo API endpoint, typically of the form :.") - String getgRPCEndpointAddress(); + String getGrpcEndpointAddress(); - void setgRPCEndpointAddress(String value); + void setGrpcEndpointAddress(String value); @Description("The HTTP address of the Echo API endpoint; must being with http(s)://") String getHttpEndpointAddress(); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java new file mode 100644 index 0000000000000..75cd49904cff8 --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest; +import org.checkerframework.checker.nullness.qual.NonNull; + +class EchoRequestCoder extends CustomCoder<@NonNull EchoRequest> { + + @Override + public void encode(@NonNull EchoRequest value, @NonNull OutputStream outStream) + throws CoderException, IOException { + value.writeTo(outStream); + } + + @Override + public @NonNull EchoRequest decode(@NonNull InputStream inStream) + throws CoderException, IOException { + return EchoRequest.parseFrom(inStream); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java similarity index 89% rename from sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java rename to sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java index 1fbb320a5f23c..939515836bff3 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java @@ -20,6 +20,8 @@ import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -48,7 +50,7 @@ /** Integration tests for {@link RedisClient}. */ @RunWith(JUnit4.class) -public class RedisClientTestIT { +public class RedisClientIT { private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine"; private static final Integer PORT = 6379; @@ -206,4 +208,24 @@ public void setThenDecrThenIncr_yieldsExpectedValue() throws UserCodeExecutionEx public void givenKeyNotExists_getLong_yieldsZero() throws UserCodeExecutionException { assertEquals(0L, externalClients.getActualClient().getLong(UUID.randomUUID().toString())); } + + @Test + public void givenKeyNotExists_getBytes_yieldsNull() throws UserCodeExecutionException { + assertNull( + externalClients + .getActualClient() + .getBytes(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))); + } + + @Test + public void givenKeyExists_getBytes_yieldsValue() throws UserCodeExecutionException { + byte[] keyBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + String expected = UUID.randomUUID().toString(); + byte[] expectedBytes = expected.getBytes(StandardCharsets.UTF_8); + externalClients.getValidatingClient().set(keyBytes, expectedBytes); + byte[] actualBytes = externalClients.getActualClient().getBytes(keyBytes); + assertNotNull(actualBytes); + String actual = new String(actualBytes, StandardCharsets.UTF_8); + assertEquals(expected, actual); + } } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java new file mode 100644 index 0000000000000..24db38f926eed --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME; +import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.protobuf.ByteString; +import java.net.URI; +import java.util.UUID; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Distinct; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest; +import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Integration tests for {@link ThrottleWithExternalResource}. See {@link EchoITOptions} for details + * on the required parameters and how to provide these for running integration tests. + */ +public class ThrottleWithExternalResourceIT { + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private static final String QUOTA_ID = "echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota"; + private static final Quota QUOTA = new Quota(1L, Duration.standardSeconds(1L)); + private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload"); + private static @MonotonicNonNull EchoITOptions options; + private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client; + private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine"; + private static final Integer PORT = 6379; + private static final EchoRequestCoder REQUEST_CODER = new EchoRequestCoder(); + private static final Coder RESPONSE_CODER = + SerializableCoder.of(TypeDescriptor.of(EchoResponse.class)); + + @Rule + public GenericContainer redis = + new GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT); + + @BeforeClass + public static void setUp() throws UserCodeExecutionException { + options = readIOTestPipelineOptions(EchoITOptions.class); + if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) { + throw new RuntimeException( + "--" + + GRPC_ENDPOINT_ADDRESS_NAME + + " is missing. See " + + EchoITOptions.class + + "for details."); + } + client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress())); + checkStateNotNull(client).setup(); + + try { + client.call(createRequest()); + } catch (UserCodeExecutionException e) { + if (e instanceof UserCodeQuotaException) { + throw new RuntimeException( + String.format( + "The quota: %s is set to refresh on an interval. Unless there are failures in this test, wait for a few seconds before running the test again.", + QUOTA_ID), + e); + } + throw e; + } + } + + @AfterClass + public static void tearDown() throws UserCodeExecutionException { + checkStateNotNull(client).teardown(); + } + + @Test + public void givenThrottleUsingRedis_preventsQuotaErrors() throws NonDeterministicException { + URI uri = + URI.create(String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort())); + pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false); + + Call.Result throttleResult = + createRequestStream() + .apply( + "throttle", + ThrottleWithExternalResource.usingRedis( + uri, QUOTA_ID, UUID.randomUUID().toString(), QUOTA, REQUEST_CODER)); + + // Assert that we are not getting any errors and successfully emitting throttled elements. + PAssert.that(throttleResult.getFailures()).empty(); + PAssert.thatSingleton( + throttleResult + .getResponses() + .apply( + "window throttled", Window.into(FixedWindows.of(Duration.standardSeconds(1L)))) + .apply( + "count throttled", + Combine.globally(Count.combineFn()).withoutDefaults())) + .notEqualTo(0L); + + // Assert that all the throttled data is not corrupted. + PAssert.that( + throttleResult + .getResponses() + .apply( + "window throttled before extraction", + Window.into(FixedWindows.of(Duration.standardSeconds(1L)))) + .apply( + "extract request id", + MapElements.into(strings()).via(input -> checkStateNotNull(input).getId())) + .apply("distinct", Distinct.create())) + .containsInAnyOrder(QUOTA_ID); + + // Call the Echo service with throttled requests. + Call.Result echoResult = + throttleResult + .getResponses() + .apply("call", Call.ofCallerAndSetupTeardown(client, RESPONSE_CODER)); + + // Assert that there were no errors. + PAssert.that(echoResult.getFailures()).empty(); + + // Assert that the responses match the requests. + PAssert.that( + echoResult + .getResponses() + .apply( + "window responses before extraction", + Window.into(FixedWindows.of(Duration.standardSeconds(1L)))) + .apply( + "extract response id", + MapElements.into(strings()).via(input -> checkStateNotNull(input).getId()))) + .containsInAnyOrder(QUOTA_ID); + + PipelineResult job = pipeline.run(); + job.waitUntilFinish(Duration.standardSeconds(3L)); + } + + private static EchoRequest createRequest() { + return EchoRequest.newBuilder().setId(QUOTA_ID).setPayload(PAYLOAD).build(); + } + + private PCollection createRequestStream() { + return pipeline + .apply("impulse", PeriodicImpulse.create().withInterval(Duration.millis(10L))) + .apply( + "requests", + MapElements.into(TypeDescriptor.of(EchoRequest.class)).via(ignored -> createRequest())); + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java new file mode 100644 index 0000000000000..591ba923201fc --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; + +import java.net.URI; +import org.apache.beam.io.requestresponse.CallTest.Request; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ThrottleWithExternalResource}. */ +@RunWith(JUnit4.class) +public class ThrottleWithExternalResourceTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void givenNonDeterministicCoder_usingRedis_throwsError() throws NonDeterministicException { + URI uri = URI.create("redis://localhost:6379"); + String quotaIdentifier = "quota"; + String queueKey = "queue"; + Quota quota = new Quota(10L, Duration.standardSeconds(1L)); + + assertThrows( + NonDeterministicException.class, + () -> + ThrottleWithExternalResource.usingRedis( + uri, quotaIdentifier, queueKey, quota, CallTest.NON_DETERMINISTIC_REQUEST_CODER)); + + ThrottleWithExternalResource.usingRedis( + uri, quotaIdentifier, queueKey, quota, CallTest.DETERMINISTIC_REQUEST_CODER); + } + + @Test + public void givenWrongRedisURI_throwsError() throws NonDeterministicException { + URI uri = URI.create("redis://1.2.3.4:6379"); + String quotaIdentifier = "quota"; + String queueKey = "queue"; + Quota quota = new Quota(10L, Duration.standardSeconds(1L)); + PCollection requests = + pipeline.apply(Create.of(new Request(""))).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER); + requests.apply( + ThrottleWithExternalResource.usingRedis( + uri, quotaIdentifier, queueKey, quota, requests.getCoder())); + + UncheckedExecutionException error = + assertThrows(UncheckedExecutionException.class, pipeline::run); + assertThat( + error.getCause().getMessage(), + containsString("Failed to connect to host: redis://1.2.3.4:6379")); + } +}