Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pulsar client] Enable spotbugs in module pulsar-client. #9630

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,22 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
</configuration>
<executions>
<execution>
<id>spotbugs</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.google.common.annotations.VisibleForTesting;

public class BackoffBuilder {
private long backoffIntervalNanos;
private long maxBackoffIntervalNanos;
private long initial;
private TimeUnit unitInitial;
private long max;
Expand All @@ -40,8 +38,6 @@ public class BackoffBuilder {
this.max = 0;
this.mandatoryStop = 0;
this.clock = Clock.systemDefaultZone();
this.backoffIntervalNanos = 0;
this.maxBackoffIntervalNanos = 0;
}

public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
*/
public class BatchMessageIdImpl extends MessageIdImpl {

private static final long serialVersionUID = 1L;
private final static int NO_BATCH = -1;
private final int batchIndex;
private final int batchSize;
Expand Down Expand Up @@ -92,7 +94,7 @@ public int compareTo(MessageId o) {

@Override
public int hashCode() {
return (int) (31 * (ledgerId + 31 * entryId) + (31 * partitionIndex) + batchIndex);
return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long) partitionIndex) + batchIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
txn.registerAckOp(ackFuture);
} else {
ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
ackFuture = doAcknowledge(messageIdList, ackType, properties, null);
}
return ackFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePoli

@Override
public String toString() {
return conf != null ? conf.toString() : null;
return conf != null ? conf.toString() : "";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}

if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = reconsumetimes + 1;

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
private Consumer<?> consumer;
private final Consumer<?> consumer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData;

public class DefaultCryptoKeyReaderBuilder implements Cloneable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,16 +504,14 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}

if (ackType == AckType.Cumulative) {
Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
Consumer<?> individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit)
.thenRun(() ->unAckedMessageTracker.remove(topicMessageId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,6 @@ private void setMessageRoutingMode() throws PulsarClientException {

@Override
public String toString() {
return conf != null ? conf.toString() : null;
return conf != null ? conf.toString() : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,7 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
}
return true;
} else {
log.warn("[{}] Failed while casting {} into ByteBufPair", producerName,
(op.cmd == null ? null : op.cmd.getClass().getName()));
log.warn("[{}] Failed while casting empty ByteBufPair, ", producerName);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {

private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
private ProducerImpl<?> producer;
private PulsarClientImpl pulsarClient;
private transient TimerTask stat;
private transient Timeout statTimeout;
private transient ProducerImpl<?> producer;
private transient PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
private final LongAdder numMsgsSent;
Expand All @@ -55,7 +55,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private final LongAdder totalAcksReceived;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
private final DoublesSketch ds;
private transient final DoublesSketch ds;

private volatile double sendMsgsRate;
private volatile double sendBytesRate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {

private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);

private final ProducerBase<?> producer;
private final MessageMetadata msgMetadata = new MessageMetadata();
private final Schema<T> schema;
private ByteBuffer content;
private final TransactionImpl txn;
private transient final ProducerBase<?> producer;
private transient final MessageMetadata msgMetadata = new MessageMetadata();
private transient final Schema<T> schema;
private transient ByteBuffer content;
private transient final TransactionImpl txn;

public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) {
this(producer, schema, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

@Slf4j
public class AuthenticationDataKeyStoreTls implements AuthenticationDataProvider {
private final KeyStoreParams keyStoreParams;

private static final long serialVersionUID = 1L;
private transient final KeyStoreParams keyStoreParams;

public AuthenticationDataKeyStoreTls(KeyStoreParams keyStoreParams) throws Exception {
this.keyStoreParams = keyStoreParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import org.slf4j.LoggerFactory;

public class AuthenticationDataTls implements AuthenticationDataProvider {
private static final long serialVersionUID = 1L;
protected X509Certificate[] tlsCertificates;
protected PrivateKey tlsPrivateKey;
private FileModifiedTimeUpdater certFile, keyFile;
private transient FileModifiedTimeUpdater certFile, keyFile;
// key and cert using stream
private InputStream certStream, keyStream;
private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
private transient InputStream certStream, keyStream;
private transient Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;

public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
if (certFilePath == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AuthenticationKeyStoreTls implements Authentication, EncodedAuthent
public final static String KEYSTORE_PW = "keyStorePassword";
private final static String DEFAULT_KEYSTORE_TYPE = "JKS";

private KeyStoreParams keyStoreParams;
private transient KeyStoreParams keyStoreParams;

public AuthenticationKeyStoreTls() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP

private String certFilePath;
private String keyFilePath;
private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
private transient Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;

public AuthenticationTls() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport {

private static final long serialVersionUID = 1L;
private transient Supplier<String> tokenSupplier;
private transient Supplier<String> tokenSupplier = null;

public AuthenticationToken() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private transient ServiceUrlProvider serviceUrlProvider;

@JsonIgnore
private Authentication authentication = AuthenticationDisabled.INSTANCE;
private Authentication authentication;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to remove the default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it has no any meaning.

    public Authentication getAuthentication() {
        if (authentication == null) {
            this.authentication = new AuthenticationDisabled();
        }
        return authentication;
    }

private String authPluginClassName;

@Secret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader = null;

@JsonIgnore
private MessageCrypto messageCrypto = null;
private transient MessageCrypto messageCrypto = null;

private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

Expand All @@ -110,7 +110,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;

private DeadLetterPolicy deadLetterPolicy;
private transient DeadLetterPolicy deadLetterPolicy;

private boolean retryEnable = false;

Expand All @@ -125,7 +125,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private boolean resetIncludeHead = false;

private KeySharedPolicy keySharedPolicy;
private transient KeySharedPolicy keySharedPolicy;

private boolean batchIndexAckEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader;

@JsonIgnore
private MessageCrypto messageCrypto = null;
private transient MessageCrypto messageCrypto = null;

@JsonIgnore
private Set<String> encryptionKeys = new TreeSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@Data
public class ReaderConfigurationData<T> implements Serializable, Cloneable {

private static final long serialVersionUID = 1L;

private String topicName;

@JsonIgnore
Expand All @@ -55,7 +57,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private boolean readCompacted = false;
private boolean resetIncludeHead = false;

private List<Range> keyHashRanges;
private transient List<Range> keyHashRanges;

@SuppressWarnings("unchecked")
public ReaderConfigurationData<T> clone() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -76,7 +77,8 @@ private static void serializeFileDescriptor(Descriptors.FileDescriptor fileDescr
if (unResolvedFileDescriptNames.length == 0) {
fileDescriptorCache.put(fileDescriptor.getFullName(), fileDescriptor.toProto());
} else {
throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" + unResolvedFileDescriptNames + "'.");
throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" +
Arrays.toString(unResolvedFileDescriptNames) + "'.");
}
}

Expand All @@ -96,7 +98,7 @@ public static Descriptors.Descriptor deserialize(byte[] schemaDataBytes) {
//extract root fileDescriptor
Descriptors.FileDescriptor fileDescriptor = fileDescriptorCache.get(schemaData.getRootFileDescriptorName());
//trim package
String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst(".", "").split("\\.");
String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst("\\.", "").split("\\.");
//extract root message
descriptor = fileDescriptor.findMessageTypeByName(paths[0]);
//extract nested message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
private final List<Field> fields;
private final Schema schema;
private final byte[] schemaVersion;
private int offset;
private final int offset;

public GenericAvroReader(Schema schema) {
this(null, schema, null);
Expand All @@ -66,7 +66,7 @@ public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schema
this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
}
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use null will avoid reusing the encode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't make any sense, the encoder in this time is always null. same below.


if (schema.getObjectProp(GenericAvroSchema.OFFSET_PROP) != null) {
this.offset = Integer.parseInt(schema.getObjectProp(GenericAvroSchema.OFFSET_PROP).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class GenericAvroWriter implements SchemaWriter<GenericRecord> {
public GenericAvroWriter(Schema schema) {
this.writer = new GenericDatumWriter<>(schema);
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@

public class GenericProtobufNativeReader implements SchemaReader<GenericRecord> {

private Descriptors.Descriptor descriptor;
private byte[] schemaVersion;
private List<Field> fields;
private final Descriptors.Descriptor descriptor;
private final byte[] schemaVersion;
private final List<Field> fields;

public GenericProtobufNativeReader(Descriptors.Descriptor descriptor) {
this(descriptor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

import java.util.List;

public class GenericProtobufNativeRecord<T extends DynamicMessage> extends VersionedGenericRecord {
public class GenericProtobufNativeRecord extends VersionedGenericRecord {

private DynamicMessage record;
private Descriptors.Descriptor msgDesc;
private final DynamicMessage record;
private final Descriptors.Descriptor msgDesc;

protected GenericProtobufNativeRecord(byte[] schemaVersion, Descriptors.Descriptor msgDesc, List<Field> fields, DynamicMessage record) {
super(schemaVersion, fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public AvroWriter(Schema schema) {

public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment

ReflectData reflectData = new ReflectData();
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
Expand Down
Loading