Skip to content

Commit

Permalink
Enable CheckStyle plugin in pulsar-client module (#13940)
Browse files Browse the repository at this point in the history
### Motivation

Enable CheckStyle plugin in pulsar-client module.

### Modifications

Fix the wrong code style in pulsar-client module.

### Verifying this change

The `maven-checkstyle-plugin` is added to `pulsar-client/pom.xml` so
that the code style will be checked in `mvn clean install`.
  • Loading branch information
BewareMyPower authored Jan 25, 2022
1 parent 496afa7 commit 04aa9e8
Show file tree
Hide file tree
Showing 166 changed files with 1,385 additions and 1,069 deletions.
14 changes: 14 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,20 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>checkstyle</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
*/
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;

import java.io.IOException;
import java.util.List;

/**
* Batch message container framework.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.MessageId;

import org.apache.pulsar.common.api.proto.CommandAck.AckType;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@
package org.apache.pulsar.client.impl;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
Expand Down Expand Up @@ -56,7 +53,7 @@ public static Map<String, String> configureFromPulsar1AuthParamString(String aut
}

/**
* Create an instance of the Authentication-Plugin
* Create an instance of the Authentication-Plugin.
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
Expand Down Expand Up @@ -89,7 +86,7 @@ public static final Authentication create(String authPluginClassName, String aut
}

/**
* Create an instance of the Authentication-Plugin
* Create an instance of the Authentication-Plugin.
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import lombok.Data;

import java.time.Clock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.Data;

// All variables are in TimeUnit millis by default
@Data
Expand Down Expand Up @@ -102,8 +101,8 @@ long getFirstBackoffTimeInMillis() {
}

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
long defaultInterval, long maxBackoffInterval) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
long defaultInterval, long maxBackoffInterval) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
long currentTime = System.nanoTime();
long interval = defaultInterval;
for (int i = 1; i < failedAttempts; i++) {
Expand All @@ -120,6 +119,6 @@ public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial,

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts,
DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,34 @@ public class BackoffBuilder {
private Clock clock;
private long mandatoryStop;
private TimeUnit unitMandatoryStop;

public BackoffBuilder() {
this.initial = 0;
this.max = 0;
this.mandatoryStop = 0;
this.clock = Clock.systemDefaultZone();
}

public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
this.unitInitial = unitInitial;
this.initial = initial;
return this;
this.unitInitial = unitInitial;
this.initial = initial;
return this;
}

public BackoffBuilder setMax(long max, TimeUnit unitMax) {
this.unitMax = unitMax;
this.max = max;
return this;
this.unitMax = unitMax;
this.max = max;
return this;
}

public BackoffBuilder setMandatoryStop(long mandatoryStop, TimeUnit unitMandatoryStop) {
this.mandatoryStop = mandatoryStop;
this.unitMandatoryStop = unitMandatoryStop;
return this;
this.mandatoryStop = mandatoryStop;
this.unitMandatoryStop = unitMandatoryStop;
return this;
}


public Backoff create() {
return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock);
return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public boolean isPrevBatchCumulativelyAcked() {

@Override
public String toString() {
return "BatchMessageAcker{" +
"batchSize=" + batchSize +
", bitSet=" + bitSet +
", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked +
'}';
return "BatchMessageAcker{"
+ "batchSize=" + batchSize
+ ", bitSet=" + bitSet
+ ", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;

import java.io.IOException;
import java.util.List;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;

public interface BatchMessageContainerBase extends BatchMessageContainer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand All @@ -36,7 +34,7 @@
import org.slf4j.LoggerFactory;

/**
* Default batch message container
* Default batch message container.
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
Expand Down Expand Up @@ -115,8 +113,8 @@ private ByteBuf getCompressedBatchMetadataAndPayload() {
MessageImpl<?> msg = messages.get(i);
msg.getDataBuffer().markReaderIndex();
try {
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msg.getMessageBuilder(),
msg.getDataBuffer(), batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(
msg.getMessageBuilder(), msg.getDataBuffer(), batchedMessageMetadataAndPayload);
} catch (Throwable th) {
// serializing batch message can corrupt the index of message and batch-message. Reset the index so,
// next iteration doesn't send corrupt message to broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int b
this(ledgerId, entryId, partitionIndex, batchIndex, 0, BatchMessageAckerDisabled.INSTANCE);
}

public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize, BatchMessageAcker acker) {
public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize,
BatchMessageAcker acker) {
super(ledgerId, entryId, partitionIndex);
this.batchIndex = batchIndex;
this.batchSize = batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.apache.pulsar.client.impl;

import com.google.common.collect.ComparisonChain;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CompressionType;
Expand All @@ -42,7 +39,7 @@
import org.slf4j.LoggerFactory;

/**
* Key based batch message container
* Key based batch message container.
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
Expand Down Expand Up @@ -119,7 +116,8 @@ public boolean isMultiBatches() {
}

private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata,
keyedBatch.getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
keyedBatch.discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
Expand All @@ -141,7 +139,8 @@ private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOE
ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
keyedBatch.messageMetadata, encryptedPayload);

ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);
ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(
keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);

op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
Expand Down Expand Up @@ -203,8 +202,8 @@ private static class KeyedBatch {

private ByteBuf getCompressedBatchMetadataAndPayload() {
for (MessageImpl<?> msg : messages) {
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msg.getMessageBuilder(),
msg.getDataBuffer(), batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(
msg.getMessageBuilder(), msg.getDataBuffer(), batchedMessageMetadataAndPayload);
}
int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.pulsar.client.impl;

import static java.lang.String.format;

import io.netty.buffer.ByteBuf;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
Expand All @@ -32,16 +30,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
Expand All @@ -56,12 +53,19 @@ public class BinaryProtoLookupService implements LookupService {
private final String listenerName;
private final int maxLookupRedirects;

public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor)
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService executor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
}

public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, ExecutorService executor)
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
Expand Down Expand Up @@ -129,7 +133,8 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
uri = new URI(serviceUrl);
}

InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
InetSocketAddress responseBrokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());

// (2) redirect to given address if response is: redirect
if (r.redirect) {
Expand Down Expand Up @@ -193,7 +198,8 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s, error message %s",
format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s,"
+ " error message %s",
r.redirect, topicName.toString(), r.partitions,
e.getMessage())));
}
Expand Down Expand Up @@ -275,7 +281,8 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
topicsFuture.completeExceptionally(t);
} else {
if (log.isDebugEnabled()) {
log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId);
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace.toString(), requestId);
}

// do not keep partition part of topic name
Expand All @@ -302,8 +309,8 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}

((ScheduledExecutorService) executor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in {} ms",
namespace, nextDelay);
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture, mode);
}, nextDelay, TimeUnit.MILLISECONDS);
Expand Down
Loading

0 comments on commit 04aa9e8

Please sign in to comment.