Skip to content

Commit

Permalink
Support batched message using entry filter
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Jul 20, 2022
1 parent 3d15343 commit 9905b1a
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.plugin;


import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.common.api.proto.KeyValue;

public class EntryFilter3Test implements EntryFilter {
@Override
public FilterResult filterEntry(Entry entry, FilterContext context) {
if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
return FilterResult.ACCEPT;
}
List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
// filter by subscription properties
PersistentSubscription subscription = (PersistentSubscription) context.getSubscription();
if (!MapUtils.isEmpty(subscription.getSubscriptionProperties())) {
for (KeyValue keyValue : list) {
if(subscription.getSubscriptionProperties().containsKey(keyValue.getKey())){
return FilterResult.REJECT;
}
}
}
return FilterResult.ACCEPT;
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -51,6 +54,7 @@
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand Down Expand Up @@ -165,6 +169,120 @@ public void testFilter() throws Exception {

}

@DataProvider(name = "batchedFilterProperties")
public Object[][] batchedFilterProperties() {
return new Object[][]{new Object[] {Collections.singletonList("REJECT")},
{Lists.newArrayList("REJECT", "1", "2")},
{Collections.emptyList()}};
}

@Test(dataProvider = "batchedFilterProperties")
public void testFilterBatch(List<String> batchedFilterProperties) throws Exception {
Map<String, String> map = new HashMap<>();
map.put("1","1");
map.put("2","2");
String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
String subName = "sub";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionProperties(map)
.subscriptionName(subName).subscribe();
// mock entry filters
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
EntryFilter filter2 = new EntryFilter3Test();
EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
field.set(dispatcher, ImmutableList.of(loader1, loader2));

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true).batchedFilterProperties(batchedFilterProperties).topic(topic).create();
for (int i = 0; i < 10; i++) {
producer.send("test");
}

int counter = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
counter++;
consumer.acknowledge(message);
} else {
break;
}
}
// All normal messages can be received
assertEquals(10, counter);
MessageIdImpl lastMsgId = null;
for (int i = 0; i < 10; i++) {
lastMsgId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
}
counter = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
counter++;
consumer.acknowledge(message);
} else {
break;
}
}
// REJECT messages are filtered out
if (batchedFilterProperties.isEmpty()) { // Nothing in batched filter list, so filter will not take effect
assertEquals(10, counter);
} else {
assertEquals(0, counter);
}


// All messages should be acked, check the MarkDeletedPosition
assertNotNull(lastMsgId);
MessageIdImpl finalLastMsgId = lastMsgId;
Awaitility.await().untilAsserted(() -> {
PositionImpl position = (PositionImpl) subscription.getCursor().getMarkDeletedPosition();
assertEquals(position.getLedgerId(), finalLastMsgId.getLedgerId());
assertEquals(position.getEntryId(), finalLastMsgId.getEntryId());
});
consumer.close();

consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map)
.subscriptionName(subName).subscribe();
for (int i = 0; i < 10; i++) {
producer.newMessage().property(String.valueOf(i), String.valueOf(i)).value("1").send();
}
counter = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
counter++;
consumer.acknowledge(message);
} else {
break;
}
}

if (batchedFilterProperties.size() == 3) { // batched filter list contains `REJECT` `1` `2`
assertEquals(8, counter);
} else {
assertEquals(10, counter);
}

producer.close();
consumer.close();

BrokerService brokerService = pulsar.getBrokerService();
Field field1 = BrokerService.class.getDeclaredField("entryFilters");
field1.setAccessible(true);
field1.set(brokerService, ImmutableMap.of("1", loader1, "2", loader2));
cleanup();
verify(loader1, times(1)).close();
verify(loader2, times(1)).close();

}

@Test
public void testFilteredMsgCount() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -490,6 +491,15 @@ public interface ProducerBuilder<T> extends Cloneable {
*/
ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);

/**
* Properties of message will be added to batched metadata if the {@param batchedFilterProperties} contains the key
* of the property. This is useful when producer publish batched message and using {@code EntryFilter} to filter
* messages in broker. <i>default: empty list</i>
* @param batchedFilterProperties keys of properties to added to batched metadata.
* @return the producer builder instance
*/
ProducerBuilder<T> batchedFilterProperties(List<String> batchedFilterProperties);

/**
* Set the baseline for the sequence ids for messages published by the producer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.CompressionType;
Expand All @@ -36,6 +37,7 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected String topicName;
protected String producerName;
protected ProducerImpl producer;
protected List<String> batchedFilterProperties = new ArrayList<>();

protected int maxNumMessagesInBatch;
protected int maxBytesInBatch;
Expand Down Expand Up @@ -96,6 +98,7 @@ public void setProducer(ProducerImpl<?> producer) {
this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages();
this.maxBytesInBatch = producer.getConfiguration().getBatchingMaxBytes();
this.batchedFilterProperties = producer.getConfiguration().getBatchedFilterProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
*/
boolean hasSameSchema(MessageImpl<?> msg);

boolean hasSameProperties(MessageImpl<?> msg);

/**
* Set producer of the message batch container.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -82,6 +85,10 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
// some properties are common amongst the different messages in the batch, hence we just pick it up from
// the first message
messageMetadata.setSequenceId(msg.getSequenceId());
List<KeyValue> filterProperties = getProperties(msg);
if (!filterProperties.isEmpty()) {
messageMetadata.addAllProperties(filterProperties);
}
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
Expand Down Expand Up @@ -235,5 +242,27 @@ public boolean hasSameSchema(MessageImpl<?> msg) {
return Arrays.equals(msg.getSchemaVersion(), messageMetadata.getSchemaVersion());
}

@Override
public boolean hasSameProperties(MessageImpl<?> msg) {
if (numMessagesInBatch == 0) {
return true;
}
if (!messageMetadata.getPropertiesList().isEmpty()) {
return getProperties(msg).isEmpty();
}
return getProperties(msg).equals(messageMetadata.getPropertiesList());
}

private List<KeyValue> getProperties(MessageImpl<?> msg) {
if (batchedFilterProperties.isEmpty() || msg.getMessageBuilder().getPropertiesList().isEmpty()) {
return Collections.emptyList();
}
return msg.getMessageBuilder().getPropertiesList()
.stream()
.filter(kv -> batchedFilterProperties.contains(kv.getKey()))
.sorted()
.collect(Collectors.toList());
}

private static final Logger log = LoggerFactory.getLogger(BatchMessageContainerImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ public boolean hasSameSchema(MessageImpl<?> msg) {
return batchMessageContainer == null || batchMessageContainer.hasSameSchema(msg);
}

@Override
public boolean hasSameProperties(MessageImpl<?> msg) {
String key = getKey(msg);
BatchMessageContainerImpl batchMessageContainer = batches.get(key);
return batchMessageContainer == null || batchMessageContainer.hasSameProperties(msg);
}

private String getKey(MessageImpl<?> msg) {
if (msg.hasOrderingKey()) {
return Base64.getEncoder().encodeToString(msg.getOrderingKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ public ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder) {
return this;
}

@Override
public ProducerBuilder<T> batchedFilterProperties(List<String> batchedFilterProperties) {
conf.setBatchedFilterProperties(batchedFilterProperties);
return this;
}

@Override
public ProducerBuilder<T> initialSequenceId(long initialSequenceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ private boolean canAddToBatch(MessageImpl<?> msg) {
private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
return batchMessageContainer.haveEnoughSpace(msg)
&& (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
&& batchMessageContainer.hasSameProperties(msg)
&& batchMessageContainer.hasSameTxn(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -73,6 +75,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private boolean batchingEnabled = true; // enabled by default
@JsonIgnore
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
private List<String> batchedFilterProperties = new ArrayList<>();
private boolean chunkingEnabled = false;
private int chunkMaxMessageSize = -1;

Expand Down

0 comments on commit 9905b1a

Please sign in to comment.