Skip to content

Commit

Permalink
Treat deprecations as errors (#253)
Browse files Browse the repository at this point in the history
* Treat deprecated usage as errors

* Update the review date

* do not use deprecated APIs in examples

* do not use deprecated APIs in examples, this time for real
  • Loading branch information
bsideup authored Jan 31, 2020
1 parent 168aafa commit f99da14
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.Wither;
import lombok.With;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -117,7 +117,7 @@ public <T> Envelope withValue(T rawValue, Function<T, ByteBuffer> valueEncoder)
}

@Value
@Wither
@With
class Record {

Envelope envelope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected void load(ApplicationContext context, Object[] sources) {
(GenericApplicationContext applicationContext) -> {
applicationContext.registerBean("health", RouterFunction.class, () -> {
return RouterFunctions.route()
.GET("/health", __ -> ServerResponse.ok().syncBody("OK"))
.GET("/health", __ -> ServerResponse.ok().bodyValue("OK"))
.build();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,12 @@ public Mono<Empty> ack(Mono<AckRequest> request) {
GroupId groupId;
int partition;

if (ack.hasAssignment()) {
var subscription = subscriptions.get(ack.getAssignment().getSessionId());
@SuppressWarnings("deprecation")
var hasAssignment = ack.hasAssignment();
if (hasAssignment) {
@SuppressWarnings("deprecation")
var assignment = ack.getAssignment();
var subscription = subscriptions.get(assignment.getSessionId());

if (subscription == null) {
log.warn("Subscription is null, returning empty Publisher. Request: {}", ack.toString().replace("\n", "\\n"));
Expand All @@ -231,7 +235,7 @@ public Mono<Empty> ack(Mono<AckRequest> request) {

topic = subscription.getTopic();
groupId = subscription.getGroupId();
partition = ack.getAssignment().getPartition();
partition = assignment.getPartition();
} else {
topic = ack.getTopic();
groupId = GroupId.of(ack.getGroup(), ack.getGroupVersion());
Expand Down
34 changes: 22 additions & 12 deletions app/src/test/java/com/github/bsideup/liiklus/AckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import com.google.protobuf.ByteString;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -103,11 +105,15 @@ public void testStatelessAck() throws Exception {
public void testAlwaysLatest() throws Exception {
Integer partition = stub.subscribe(subscribeRequest)
.map(SubscribeReply::getAssignment)
.delayUntil(assignment ->
stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(10).build())
.delayUntil(new Function<>() {
@Override
@SuppressWarnings("deprecation")
public Publisher<?> apply(Assignment assignment) {
return stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(10).build())
.then(stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(200).build()))
.then(stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(100).build()))
)
.then(stub.ack(AckRequest.newBuilder().setAssignment(assignment).setOffset(100).build()));
}
})
.take(1)
.map(Assignment::getPartition)
.blockFirst(Duration.ofSeconds(10));
Expand Down Expand Up @@ -150,14 +156,18 @@ public void testInterruption() throws Exception {
.receive(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build())
.map(ReceiveReply::getRecord)
.buffer(5)
.delayUntil(batch -> stub
.ack(
AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(batch.get(batch.size() - 1).getOffset())
.build()
)
)
.delayUntil(batch -> {
@SuppressWarnings("deprecation")
var builder = AckRequest.newBuilder()
.setAssignment(it.getAssignment());

return stub
.ack(
builder
.setOffset(batch.get(batch.size() - 1).getOffset())
.build()
);
})
)
.take(1)
)
Expand Down
16 changes: 10 additions & 6 deletions app/src/test/java/com/github/bsideup/liiklus/PositionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ public void testGetOffsets() throws Exception {
.flatMap(it -> stub.receive(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build())
.map(ReceiveReply::getRecord)
.filter(record -> key.equals(record.getKey().toStringUtf8()))
.delayUntil(record -> stub.ack(
AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(record.getOffset())
.build()
))
.delayUntil(record -> {
@SuppressWarnings("deprecation")
var builder = AckRequest.newBuilder()
.setAssignment(it.getAssignment());
return stub.ack(
builder
.setOffset(record.getOffset())
.build()
);
})
)
.blockFirst(Duration.ofSeconds(10));

Expand Down
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ configure(subprojects.findAll { !it.name.startsWith("examples/") }) {

sourceCompatibility = targetCompatibility = 11

tasks.withType(JavaCompile) {
options.compilerArgs = [
'-Xlint:deprecation',
'-Werror'
]
}

test {
useJUnitPlatform()

Expand Down
17 changes: 8 additions & 9 deletions examples/java/src/main/java/com/example/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static void main(String[] args) {

var channel = NettyChannelBuilder.forTarget(liiklusTarget)
.directExecutor()
.usePlaintext(true)
.usePlaintext()
.build();

var subscribeAction = SubscribeRequest.newBuilder()
Expand Down Expand Up @@ -74,7 +74,10 @@ public static void main(String[] args) {
log.info("ACKing partition {} offset {}", assignment.getPartition(), record.getOffset());
return stub.ack(
AckRequest.newBuilder()
.setAssignment(assignment)
.setTopic(subscribeAction.getTopic())
.setGroup(subscribeAction.getGroup())
.setGroupVersion(subscribeAction.getGroupVersion())
.setPartition(assignment.getPartition())
.setOffset(record.getOffset())
.build()
);
Expand All @@ -86,16 +89,12 @@ public static void main(String[] args) {
}

private static String getLiiklusTarget() {
var kafka = new KafkaContainer()
.withEnv("KAFKA_NUM_PARTITIONS", "4");

GenericContainer liiklus = new GenericContainer<>("bsideup/liiklus:0.1.8")
.withNetwork(kafka.getNetwork())
GenericContainer<?> liiklus = new GenericContainer<>("bsideup/liiklus:0.9.0")
.withExposedPorts(6565)
.withEnv("kafka_bootstrapServers", kafka.getNetworkAliases().get(0) + ":9093")
.withEnv("storage_records_type", "MEMORY")
.withEnv("storage_positions_type", "MEMORY"); // Fine for testing, NOT FINE I WARNED YOU for production :D

Stream.of(kafka, liiklus).parallel().forEach(GenericContainer::start);
liiklus.start();

log.info("Containers started");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,22 @@ public Publisher<Record> getPublisher() {
);
return Mono.defer(() -> Mono.fromCompletionStage(offsetsProvider.get()))
.defaultIfEmpty(Collections.emptyMap())
.flatMapMany(offsets -> storedPartition.getProcessor()
.compose(flux -> {
if (offsets.containsKey(partition)) {
return flux.skip(offsets.get(partition));
}
switch (autoOffsetReset.orElse("")) {
case "latest":
long nextOffset = storedPartition.getNextOffset().get();
if (nextOffset > 0) {
return flux.skip(nextOffset);
}
default:
return flux;
}
})
)
.flatMapMany(offsets -> {
return storedPartition.getProcessor().transform(flux -> {
if (offsets.containsKey(partition)) {
return flux.skip(offsets.get(partition));
}
switch (autoOffsetReset.orElse("")) {
case "latest":
long nextOffset = storedPartition.getNextOffset().get();
if (nextOffset > 0) {
return flux.skip(nextOffset);
}
default:
return flux;
}
});
})
.filter(it -> storedTopic.isAssigned(groupName, subscription, partition))
.map(it -> new Record(
new Envelope(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
});
}

@Override
public int hashCode() {
return System.identityHashCode(this);
}

@Override
public boolean equals(Object o) {
return this == o;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void initialize(GenericApplicationContext applicationContext) {
TextFormat.write004(writer, Collections.enumeration(metrics));
return ServerResponse.ok()
.contentType(MediaType.valueOf(TextFormat.CONTENT_TYPE_004))
.syncBody(writer.toString());
.bodyValue(writer.toString());
} catch (IOException e) {
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
.map(messageId -> Map.entry(partitionIndex, toOffset(messageId)));
},
consumer -> Mono.fromCompletionStage(consumer.closeAsync()),
(consumer, e) -> Mono.fromCompletionStage(consumer.closeAsync()),
consumer -> Mono.fromCompletionStage(consumer.closeAsync())
);
})
Expand Down Expand Up @@ -160,6 +161,11 @@ public Publisher<Stream<? extends PartitionSource>> getPublisher(
});
}

@Override
public int hashCode() {
return System.identityHashCode(this);
}

@Override
public boolean equals(Object o) {
return this == o;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public int getNumberOfPartitions() {

@Override
@Test
@DisabledUntil(value = "2020-01-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)")
@DisabledUntil(value = "2020-03-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)")
public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() {
RecordStorageTests.super.shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider();
}
Expand Down

0 comments on commit f99da14

Please sign in to comment.