diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java index 0d74183d79..6523ee8814 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -36,8 +37,8 @@ * Create Super Stream Topology {@link Declarable}s. * * @author Gary Russell + * @author Sergei Kurenchuk * @since 3.0 - * */ public class SuperStream extends Declarables { @@ -47,9 +48,22 @@ public class SuperStream extends Declarables { * @param partitions the number of partitions. */ public SuperStream(String name, int partitions) { + this(name, partitions, Map.of()); + } + + /** + * Create a Super Stream with the provided parameters. + * @param name the stream name. + * @param partitions the number of partitions. + * @param arguments the stream arguments + * @since 3.1 + */ + public SuperStream(String name, int partitions, Map arguments) { this(name, partitions, (q, i) -> IntStream.range(0, i) .mapToObj(String::valueOf) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + arguments + ); } /** @@ -61,19 +75,37 @@ public SuperStream(String name, int partitions) { * partitions, the returned list must have a size equal to the partitions. */ public SuperStream(String name, int partitions, BiFunction> routingKeyStrategy) { - super(declarables(name, partitions, routingKeyStrategy)); + this(name, partitions, routingKeyStrategy, Map.of()); + } + + /** + * Create a Super Stream with the provided parameters. + * @param name the stream name. + * @param partitions the number of partitions. + * @param routingKeyStrategy a strategy to determine routing keys to use for the + * partitions. The first parameter is the queue name, the second the number of + * partitions, the returned list must have a size equal to the partitions. + * @param arguments the stream arguments + * @since 3.1 + */ + public SuperStream(String name, int partitions, BiFunction> routingKeyStrategy, Map arguments) { + super(declarables(name, partitions, routingKeyStrategy, arguments)); } private static Collection declarables(String name, int partitions, - BiFunction> routingKeyStrategy) { + BiFunction> routingKeyStrategy, + Map arguments) { List declarables = new ArrayList<>(); List rks = routingKeyStrategy.apply(name, partitions); Assert.state(rks.size() == partitions, () -> "Expected " + partitions + " routing keys, not " + rks.size()); declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true))); + + Map argumentsCopy = new HashMap<>(arguments); + argumentsCopy.put("x-queue-type", "stream"); for (int i = 0; i < partitions; i++) { String rk = rks.get(i); - Queue q = new Queue(name + "-" + i, true, false, false, Map.of("x-queue-type", "stream")); + Queue q = new Queue(name + "-" + i, true, false, false, argumentsCopy); declarables.add(q); declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk, Map.of("x-stream-partition-order", i))); diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java new file mode 100644 index 0000000000..b2a4b574a7 --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java @@ -0,0 +1,165 @@ +/* + * Copyright 2021-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.config; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import org.springframework.util.StringUtils; + +/** + * Builds a Spring AMQP Super Stream using a fluent API. + * Based on Streams documentation + * + * @author Sergei Kurenchuk + * @since 3.1 + */ +public class SuperStreamBuilder { + private final Map arguments = new HashMap<>(); + private String name; + private int partitions = -1; + + private BiFunction> routingKeyStrategy; + + /** + * Creates a builder for Super Stream. + * @param name stream name + * @return the builder + */ + public static SuperStreamBuilder superStream(String name) { + SuperStreamBuilder builder = new SuperStreamBuilder(); + builder.name(name); + return builder; + } + + /** + * Creates a builder for Super Stream. + * @param name stream name + * @param partitions partitions number + * @return the builder + */ + public static SuperStreamBuilder superStream(String name, int partitions) { + return superStream(name).partitions(partitions); + } + + /** + * Set the maximum age retention per stream, which will remove the oldest data. + * @param maxAge valid units: Y, M, D, h, m, s. For example: "7D" for a week + * @return the builder + */ + public SuperStreamBuilder maxAge(String maxAge) { + return withArgument("x-max-age", maxAge); + } + + /** + * Set the maximum log size as the retention configuration for each stream, + * which will truncate the log based on the data size. + * @param bytes the max total size in bytes + * @return the builder + */ + public SuperStreamBuilder maxLength(int bytes) { + return withArgument("max-length-bytes", bytes); + } + + /** + * Set the maximum size limit for segment file. + * @param bytes the max segments size in bytes + * @return the builder + */ + public SuperStreamBuilder maxSegmentSize(int bytes) { + return withArgument("x-stream-max-segment-size-bytes", bytes); + } + + /** + * Set initial replication factor for each partition. + * @param count number of nodes per partition + * @return the builder + */ + public SuperStreamBuilder initialClusterSize(int count) { + return withArgument("x-initial-cluster-size", count); + } + + /** + * Set extra argument which is not covered by builder's methods. + * @param key argument name + * @param value argument value + * @return the builder + */ + public SuperStreamBuilder withArgument(String key, Object value) { + if ("x-queue-type".equals(key) && !"stream".equals(value)) { + throw new IllegalArgumentException("Changing x-queue-type argument is not permitted"); + } + this.arguments.put(key, value); + return this; + } + + /** + * Set the stream name. + * @param name the stream name. + * @return the builder + */ + public SuperStreamBuilder name(String name) { + this.name = name; + return this; + } + + /** + * Set the partitions number. + * @param partitions the partitions number + * @return the builder + */ + public SuperStreamBuilder partitions(int partitions) { + this.partitions = partitions; + return this; + } + + /** + * Set a strategy to determine routing keys to use for the + * partitions. The first parameter is the queue name, the second the number of + * partitions, the returned list must have a size equal to the partitions. + * @param routingKeyStrategy the strategy + * @return the builder + */ + public SuperStreamBuilder routingKeyStrategy(BiFunction> routingKeyStrategy) { + this.routingKeyStrategy = routingKeyStrategy; + return this; + } + + /** + * Builds a final Super Stream. + * @return the Super Stream instance + */ + public SuperStream build() { + if (!StringUtils.hasText(this.name)) { + throw new IllegalArgumentException("Stream name can't be empty"); + } + + if (this.partitions <= 0) { + throw new IllegalArgumentException( + String.format("Partitions number should be great then zero. Current value; %d", this.partitions) + ); + } + + if (this.routingKeyStrategy == null) { + return new SuperStream(this.name, this.partitions, this.arguments); + } + + return new SuperStream(this.name, this.partitions, this.routingKeyStrategy, this.arguments); + } +} diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamConfigurationTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamConfigurationTests.java new file mode 100644 index 0000000000..781f1fa97d --- /dev/null +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamConfigurationTests.java @@ -0,0 +1,166 @@ +/* + * Copyright 2021-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.config; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.Queue; + +/** + * @author Sergei Kurenchuk + * @since 3.1 + */ +public class SuperStreamConfigurationTests { + + @Test + void argumentsShouldBeAppliedToAllPartitions() { + int partitions = 3; + var argKey = "x-max-age"; + var argValue = 10_000; + + Map testArguments = Map.of(argKey, argValue); + SuperStream superStream = new SuperStream("stream", partitions, testArguments); + + List streams = superStream.getDeclarablesByType(Queue.class); + Assertions.assertEquals(partitions, streams.size()); + + streams.forEach( + it -> { + Object value = it.getArguments().get(argKey); + Assertions.assertNotNull(value, "Arg value should be present"); + Assertions.assertEquals(argValue, value, "Value should be the same"); + } + ); + } + + @Test + void testCustomPartitionsRoutingStrategy() { + var streamName = "test-super-stream-name"; + var partitions = 3; + var names = List.of("test.stream.1", "test.stream.2", "test.stream.3"); + + SuperStream superStream = SuperStreamBuilder.superStream(streamName, partitions) + .routingKeyStrategy((name, partition) -> names) + .build(); + + List bindings = superStream.getDeclarablesByType(Binding.class); + Set routingKeys = bindings.stream().map(Binding::getRoutingKey).collect(Collectors.toSet()); + Assertions.assertTrue(routingKeys.containsAll(names)); + } + + @Test + void builderMustSetupNameAndPartitionsNumber() { + var name = "test-super-stream-name"; + var partitions = 3; + SuperStream superStream = SuperStreamBuilder.superStream(name, partitions).build(); + List streams = superStream.getDeclarablesByType(Queue.class); + Assertions.assertEquals(partitions, streams.size()); + + streams.forEach(it -> Assertions.assertTrue(it.getName().startsWith(name))); + } + + @Test + void builderMustSetupArguments() { + var finalPartitionsNumber = 4; + var finalName = "test-name"; + var maxAge = "1D"; + var maxLength = 10_000_000; + var maxSegmentsSize = 100_000; + var initialClusterSize = 5; + + var testArgName = "test-key"; + var testArgValue = "test-value"; + + SuperStream superStream = SuperStreamBuilder.superStream("name", 3) + .partitions(finalPartitionsNumber) + .maxAge(maxAge) + .maxLength(maxLength) + .maxSegmentSize(maxSegmentsSize) + .initialClusterSize(initialClusterSize) + .name(finalName) + .withArgument(testArgName, testArgValue) + .build(); + + List streams = superStream.getDeclarablesByType(Queue.class); + + Assertions.assertEquals(finalPartitionsNumber, streams.size()); + streams.forEach( + it -> { + Assertions.assertTrue(it.getName().startsWith(finalName)); + Assertions.assertEquals(maxAge, it.getArguments().get("x-max-age")); + Assertions.assertEquals(maxLength, it.getArguments().get("max-length-bytes")); + Assertions.assertEquals(initialClusterSize, it.getArguments().get("x-initial-cluster-size")); + Assertions.assertEquals(maxSegmentsSize, it.getArguments().get("x-stream-max-segment-size-bytes")); + Assertions.assertEquals(testArgValue, it.getArguments().get(testArgName)); + } + ); + } + + @Test + void builderShouldForbidInternalArgumentsChanges() { + SuperStreamBuilder builder = SuperStreamBuilder.superStream("name", 3); + + Assertions.assertThrows(IllegalArgumentException.class, () -> builder.withArgument("x-queue-type", "quorum")); + } + + @Test + void nameCantBeEmpty() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("", 3).build() + ); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", 3).name("").build() + ); + + Assertions.assertDoesNotThrow( + () -> SuperStreamBuilder.superStream("testName", 3).build() + ); + } + + @Test + void partitionsNumberShouldBeGreatThenZero() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", 0).build() + ); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", -1).build() + ); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", 1).partitions(0).build() + ); + + Assertions.assertDoesNotThrow( + () -> SuperStreamBuilder.superStream("testName", 1).build() + ); + } + +}