Fix flow control bug + IndexOutOfBounds exception #568

Merged (5 commits, Nov 13, 2023)
Changes from 4 commits
@@ -21,8 +21,10 @@
import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.time.Instant.now;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
@@ -86,6 +88,9 @@
public final class KafkaCacheClientProduceFactory implements BindingHandler
{
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(), 0, 0);
private static final KafkaKeyFW EMPTY_KEY =
new OctetsFW().wrap(new UnsafeBuffer(ByteBuffer.wrap(new byte[] { 0x00 })), 0, 1)
.get(new KafkaKeyFW()::wrap);
private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION = ex -> {};
private static final Array32FW<KafkaHeaderFW> EMPTY_TRAILERS =
new Array32FW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW())
@@ -481,6 +486,7 @@ private void doReset(

final class KafkaCacheClientProduceFan
{
private static final int PRODUCE_FLUSH_SEQUENCE = -1;
private final KafkaCachePartition partition;
private final KafkaCacheCursor cursor;
private final KafkaOffsetType defaultOffset;
@@ -762,6 +768,47 @@ private void onClientInitialData(
creditor.credit(traceId, partitionIndex, reserved);
}

private void onClientInitialFlush(
KafkaCacheClientProduceStream stream,
FlushFW flush)
{
final long traceId = flush.traceId();
final int reserved = flush.reserved();

stream.segment = partition.newHeadIfNecessary(partitionOffset, EMPTY_KEY, 0, 0);

int error = NO_ERROR;
if (stream.segment != null)
{
final long nextOffset = partition.nextOffset(defaultOffset);
assert partitionOffset >= 0 && partitionOffset >= nextOffset
: String.format("%d >= 0 && %d >= %d", partitionOffset, partitionOffset, nextOffset);

final long keyHash = partition.computeKeyHash(EMPTY_KEY);
partition.writeProduceEntryStart(partitionOffset, stream.segment, stream.entryMark, stream.position,
now().toEpochMilli(), stream.initialId, PRODUCE_FLUSH_SEQUENCE,
KafkaAckMode.LEADER_ONLY, EMPTY_KEY, keyHash, 0, EMPTY_TRAILERS, trailersSizeMax);
stream.partitionOffset = partitionOffset;
partitionOffset++;

Array32FW<KafkaHeaderFW> trailers = EMPTY_TRAILERS;

partition.writeProduceEntryFin(stream.segment, stream.entryMark, stream.position, stream.initialSeq, trailers);
flushClientFanInitialIfNecessary(traceId);
}
else
{
error = ERROR_RECORD_LIST_TOO_LARGE;
}

if (error != NO_ERROR)
{
stream.cleanupClient(traceId, error);
onClientFanMemberClosed(traceId, stream);
}
creditor.credit(traceId, partitionIndex, reserved);
}

private void flushClientFanInitialIfNecessary(
long traceId)
{
@@ -1318,8 +1365,18 @@ private void onClientInitialFlush(

assert initialAck <= initialSeq;

if (initialSeq > initialAck + initialMax)
{
doClientInitialResetIfNecessary(traceId, EMPTY_OCTETS);
doClientReplyAbortIfNecessary(traceId);
fan.onClientFanMemberClosed(traceId, this);
}
else
{
fan.onClientInitialFlush(this, flush);
}
final int noAck = (int) (initialSeq - initialAck);
doClientInitialWindow(traceId, noAck, noAck + initialBudgetMax);
doClientInitialWindow(traceId, noAck, initialBudgetMax);
}

private void onClientInitialEnd(
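The hunk above (@@ -1318,8 +1365,18 @@) is the flow-control fix named in the PR title: a FLUSH that arrives beyond the advertised window now resets and aborts the stream instead of being forwarded, and the replenished window maximum is capped at initialBudgetMax rather than growing by the unacknowledged byte count. A minimal sketch of why the old maximum defeated flow control, assuming the usual Zilla budget arithmetic where available credit is acknowledge + maximum - sequence (the class and the concrete numbers below are illustrative, not part of the PR):

// Illustrative sketch: with maximum = noAck + budgetMax the sender always sees a
// full budget of fresh credit, so unacknowledged data can grow without bound;
// with maximum = budgetMax the total in flight is capped at the budget.
public final class WindowCreditSketch
{
    static long credit(long sequence, long acknowledge, int maximum)
    {
        return acknowledge + maximum - sequence;
    }

    public static void main(String[] args)
    {
        final int budgetMax = 8192;
        final long acknowledge = 0L;
        final long sequence = 24_576L; // 24 KiB sent, nothing acknowledged yet
        final int noAck = (int) (sequence - acknowledge);

        System.out.println(credit(sequence, acknowledge, noAck + budgetMax)); // 8192: credit never shrinks
        System.out.println(credit(sequence, acknowledge, budgetMax));         // -16384: sender must wait for acks
    }
}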
@@ -1325,6 +1325,8 @@ private void onMergedInitialFlush(
FlushFW flush)
{
final long traceId = flush.traceId();
final long sequence = flush.sequence();
final long acknowledge = flush.acknowledge();
final OctetsFW extension = flush.extension();
final int reserved = flush.reserved();
final ExtensionFW flushEx = extension.get(extensionRO::tryWrap);
@@ -1379,6 +1381,10 @@ private void onMergedInitialFlush(

final KafkaUnmergedProduceStream producer = findProducePartitionLeader(nextPartitionId);
assert producer != null;

initialSeq = sequence + reserved;
assert initialAck <= initialSeq;

producer.doProduceInitialFlush(traceId, reserved, kafkaMergedFlushEx);
}
}
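The two onMergedInitialFlush hunks above make the merged stream account for the FLUSH frame itself: the frame's sequence and acknowledge are now read, and initialSeq advances by the flush's reserved bytes before the flush is forwarded to the per-partition producer, so the next window computation no longer under-counts what the client has already sent. A simplified sketch of that accounting, assuming a FLUSH consumes reserved window credit the same way a DATA frame does (the type and budget value are placeholders, not the Zilla API):

// Simplified sketch of sequence accounting when a FLUSH frame is received.
final class MergedFlushAccounting
{
    private static final int INITIAL_MAX = 8192; // placeholder budget

    private long initialSeq;
    private long initialAck;

    void onInitialFlush(long sequence, long acknowledge, int reserved)
    {
        // the flush occupies 'reserved' bytes of the sender's window
        initialSeq = sequence + reserved;
        assert initialAck <= initialSeq;
    }

    long remainingCredit()
    {
        // credit left under the acknowledge + maximum - sequence rule
        return initialAck + INITIAL_MAX - initialSeq;
    }
}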
@@ -25,7 +25,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
@@ -126,7 +125,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
private static final int SIGNAL_CONNECT_WILL_STREAM = 2;
private static final int SIGNAL_EXPIRE_SESSION = 3;
private static final int SIZE_OF_UUID = 38;
private static final AtomicInteger CONTEXT_COUNTER = new AtomicInteger(0);
private static final int RETAIN_AVAILABLE_MASK = 1 << MqttServerCapabilities.RETAIN.value();
private static final int WILDCARD_AVAILABLE_MASK = 1 << MqttServerCapabilities.WILDCARD.value();
private static final int SUBSCRIPTION_IDS_AVAILABLE_MASK = 1 << MqttServerCapabilities.SUBSCRIPTION_IDS.value();
@@ -209,6 +207,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory

private String serverRef;
private int reconnectAttempt;
private int nextContextId;

public MqttKafkaSessionFactory(
MqttKafkaConfiguration config,
@@ -1220,7 +1219,7 @@ else if (type.equals(EXPIRY_SIGNAL_NAME_OCTETS) && sessionExpiryIds.containsKey(
case MqttSessionSignalFW.KIND_EXPIRY:
final MqttExpirySignalFW expirySignal = sessionSignal.expiry();
long expireAt = expirySignal.expireAt();
final String16FW expiryClientId = expirySignal.clientId();
final String16FW expiryClientId = new String16FW(expirySignal.clientId().asString());

if (expireAt == MqttTime.UNKNOWN.value())
{
@@ -1231,7 +1230,7 @@ else if (type.equals(EXPIRY_SIGNAL_NAME_OCTETS) && sessionExpiryIds.containsKey(
expireAt = supplyTime.getAsLong() + expirySignal.delay();
}

final int contextId = CONTEXT_COUNTER.incrementAndGet();
final int contextId = nextContextId++;
expiryClientIds.put(contextId, expiryClientId);

final long signalId =
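Two changes land in the MQTT-Kafka session factory above: the shared static AtomicInteger context counter becomes a plain per-factory nextContextId field, and the expiry signal's clientId is copied into a fresh String16FW instead of retaining the flyweight returned by expirySignal.clientId(). The copy matters because a flyweight is only a view over a shared, reused buffer; keeping it in expiryClientIds and reading it after the buffer has been rewritten can yield garbage or an out-of-range read, which is a plausible source of the IndexOutOfBounds mentioned in the title. A generic sketch of that hazard using an Agrona buffer (the buffer layout and strings here are invented for illustration):

import java.nio.charset.StandardCharsets;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

// Illustrative only: a flyweight-style view (offset + length over a shared buffer)
// goes stale once the buffer is reused, while an eager copy stays valid.
public final class FlyweightCopySketch
{
    public static void main(String[] args)
    {
        final MutableDirectBuffer shared = new UnsafeBuffer(new byte[64]);

        final byte[] first = "client-1".getBytes(StandardCharsets.UTF_8);
        shared.putBytes(0, first);

        // view semantics: remember only where the value lives, like keeping the flyweight
        final int viewOffset = 0;
        final int viewLength = first.length;

        // copy semantics: like new String16FW(expirySignal.clientId().asString())
        final String copied = shared.getStringWithoutLengthUtf8(viewOffset, viewLength);

        // the shared buffer is reused for the next signal
        shared.putBytes(0, "x".getBytes(StandardCharsets.UTF_8));

        System.out.println(shared.getStringWithoutLengthUtf8(viewOffset, viewLength)); // stale view: "xlient-1"
        System.out.println(copied);                                                    // still "client-1"
    }
}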
@@ -112,8 +112,8 @@ write advise zilla:flush ${kafka:flushEx()
.merged()
.fetch()
.partition(-1, -1)
.key("key9")
.capabilities("PRODUCE_ONLY")
.key("key9")
.build()
.build()}

@@ -141,6 +141,15 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #A1"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
@@ -153,6 +162,17 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #A2"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush


connect await PARTITION_COUNT_3
"zilla://streams/app1"
option zilla:window 8192
@@ -235,6 +255,16 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #C1"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
@@ -246,3 +276,12 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #C2"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush
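In the client script above, every produced record is now followed by an extra kafka produce data extension with .sequence(-1) and an empty payload (zilla:data.empty). That lines up with the PRODUCE_FLUSH_SEQUENCE = -1 constant introduced in KafkaCacheClientProduceFan: the empty frame appears to act as an explicit produce-flush marker rather than a real record, and the matching server-side script that follows expects the same frames via matchDataEx. A rough sketch of how such frames could be told apart (ProduceDataEx is a hypothetical stand-in for the dataEx fields, not the Zilla API):

// Sketch: distinguish a real produced record from the sequence(-1) flush marker.
public final class ProduceFrameSketch
{
    private static final int PRODUCE_FLUSH_SEQUENCE = -1;

    // hypothetical stand-in for the kafka produce dataEx fields used in the scripts
    record ProduceDataEx(int sequence, String ackMode, byte[] payload) { }

    static boolean isFlushMarker(ProduceDataEx dataEx)
    {
        return dataEx.sequence() == PRODUCE_FLUSH_SEQUENCE && dataEx.payload().length == 0;
    }

    public static void main(String[] args)
    {
        System.out.println(isFlushMarker(new ProduceDataEx(-1, "LEADER_ONLY", new byte[0])));                        // true
        System.out.println(isFlushMarker(new ProduceDataEx(0, "LEADER_ONLY", "Hello, world #A1".getBytes())));       // false
    }
}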
@@ -142,6 +142,15 @@ read "Hello, world #A1"

read option zilla:ack 16

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
read zilla:data.empty

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
@@ -154,6 +163,15 @@ read "Hello, world #A2"

read option zilla:ack 32

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
read zilla:data.empty


accepted

@@ -230,6 +248,15 @@ read "Hello, world #C1"

read option zilla:ack 16

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
read zilla:data.empty

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
@@ -241,3 +268,12 @@ read zilla:data.ext ${kafka:matchDataEx()
read "Hello, world #C2"

read option zilla:ack 32

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
read zilla:data.empty
@@ -140,6 +140,15 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #A1"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
@@ -151,6 +160,17 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #A2"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush


connect await PARTITION_COUNT_3
"zilla://streams/app1"
option zilla:window 8192
@@ -185,6 +205,16 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world #B1"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
@@ -230,6 +260,16 @@ write zilla:data.ext ${kafka:dataEx()
write "Hi, world #C1"
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.sequence(-1)
.ackMode("LEADER_ONLY")
.build()
.build()}
write zilla:data.empty
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()