From 2dee89f4e5ea46eae3156305f4c6e67ab327c3a3 Mon Sep 17 00:00:00 2001 From: Anton Agestam Date: Wed, 31 Jan 2024 15:14:36 +0100 Subject: [PATCH] feature: Implement basic support for record fields --- codegen/parser.py | 4 +- container/compose.yml | 5 + src/kio/schema/fetch/v0/response.py | 2 +- src/kio/schema/fetch/v1/response.py | 2 +- src/kio/schema/fetch/v10/response.py | 2 +- src/kio/schema/fetch/v11/response.py | 2 +- src/kio/schema/fetch/v12/response.py | 2 +- src/kio/schema/fetch/v13/response.py | 2 +- src/kio/schema/fetch/v14/response.py | 2 +- src/kio/schema/fetch/v15/response.py | 2 +- src/kio/schema/fetch/v2/response.py | 2 +- src/kio/schema/fetch/v3/response.py | 2 +- src/kio/schema/fetch/v4/response.py | 2 +- src/kio/schema/fetch/v5/response.py | 2 +- src/kio/schema/fetch/v6/response.py | 2 +- src/kio/schema/fetch/v7/response.py | 2 +- src/kio/schema/fetch/v8/response.py | 2 +- src/kio/schema/fetch/v9/response.py | 2 +- src/kio/schema/fetch_snapshot/v0/response.py | 4 +- src/kio/schema/produce/v0/request.py | 2 +- src/kio/schema/produce/v1/request.py | 2 +- src/kio/schema/produce/v2/request.py | 2 +- src/kio/schema/produce/v3/request.py | 2 +- src/kio/schema/produce/v4/request.py | 2 +- src/kio/schema/produce/v5/request.py | 2 +- src/kio/schema/produce/v6/request.py | 2 +- src/kio/schema/produce/v7/request.py | 2 +- src/kio/schema/produce/v8/request.py | 2 +- src/kio/schema/produce/v9/request.py | 2 +- src/kio/serial/_parse.py | 10 +- src/kio/serial/_serialize.py | 10 +- tests/fixtures.py | 27 ++++ tests/serial/test_parse.py | 6 +- tests/serial/test_serialize.py | 8 +- tests/test_integration.py | 137 ++++++++++++++++++- 35 files changed, 208 insertions(+), 55 deletions(-) create mode 100644 tests/fixtures.py diff --git a/codegen/parser.py b/codegen/parser.py index a6d9b67e..3ab4601e 100644 --- a/codegen/parser.py +++ b/codegen/parser.py @@ -96,14 +96,12 @@ def get_type_hint(self, optional: bool = False) -> str: hint = "f64" case Primitive.string: hint = "str" - case Primitive.bytes_: + case Primitive.bytes_ | Primitive.records: hint = "bytes" case Primitive.bool_: hint = "bool" case Primitive.uuid: hint = "uuid.UUID | None" - case Primitive.records: - return "tuple[bytes | None, ...]" case Primitive.error_code: hint = "ErrorCode" case Primitive.timedelta_i32: diff --git a/container/compose.yml b/container/compose.yml index 40d827ae..6bea7f07 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -24,6 +24,11 @@ services: KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + KAFKA_MAX_MESSAGE_BYTES: 104857600 + KAFKA_MESSAGE_MAX_BYTES: 104857600 + KAFKA_BUFFER_MEMORY: 104857600 + KAFKA_MAX_REQUEST_SIZE: 104857600 + KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600 volumes: - ./update-run.sh:/tmp/update-run.sh tmpfs: diff --git a/src/kio/schema/fetch/v0/response.py b/src/kio/schema/fetch/v0/response.py index 54919a63..88e2830a 100644 --- a/src/kio/schema/fetch/v0/response.py +++ b/src/kio/schema/fetch/v0/response.py @@ -30,7 +30,7 @@ class PartitionData: """The error code, or 0 if there was no fetch error.""" high_watermark: i64 = field(metadata={"kafka_type": "int64"}) """The current high water mark.""" - records: tuple[bytes | None, ...] 
= field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v1/response.py b/src/kio/schema/fetch/v1/response.py index 80666b73..1ad1f353 100644 --- a/src/kio/schema/fetch/v1/response.py +++ b/src/kio/schema/fetch/v1/response.py @@ -31,7 +31,7 @@ class PartitionData: """The error code, or 0 if there was no fetch error.""" high_watermark: i64 = field(metadata={"kafka_type": "int64"}) """The current high water mark.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v10/response.py b/src/kio/schema/fetch/v10/response.py index abc6683d..580d0e15 100644 --- a/src/kio/schema/fetch/v10/response.py +++ b/src/kio/schema/fetch/v10/response.py @@ -51,7 +51,7 @@ class PartitionData: """The current log start offset.""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v11/response.py b/src/kio/schema/fetch/v11/response.py index 177854d9..3f9a7e44 100644 --- a/src/kio/schema/fetch/v11/response.py +++ b/src/kio/schema/fetch/v11/response.py @@ -56,7 +56,7 @@ class PartitionData: metadata={"kafka_type": "int32"}, default=BrokerId(-1) ) """The preferred read replica for the consumer to use on its next fetch request""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v12/response.py b/src/kio/schema/fetch/v12/response.py index d8182941..049aee9e 100644 --- a/src/kio/schema/fetch/v12/response.py +++ b/src/kio/schema/fetch/v12/response.py @@ -100,7 +100,7 @@ class PartitionData: metadata={"kafka_type": "int32"}, default=BrokerId(-1) ) """The preferred read replica for the consumer to use on its next fetch request""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v13/response.py b/src/kio/schema/fetch/v13/response.py index ad5eaced..80aba126 100644 --- a/src/kio/schema/fetch/v13/response.py +++ b/src/kio/schema/fetch/v13/response.py @@ -100,7 +100,7 @@ class PartitionData: metadata={"kafka_type": "int32"}, default=BrokerId(-1) ) """The preferred read replica for the consumer to use on its next fetch request""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v14/response.py b/src/kio/schema/fetch/v14/response.py index 0285e74c..2e70d5ca 100644 --- a/src/kio/schema/fetch/v14/response.py +++ b/src/kio/schema/fetch/v14/response.py @@ -100,7 +100,7 @@ class PartitionData: metadata={"kafka_type": "int32"}, default=BrokerId(-1) ) """The preferred read replica for the consumer to use on its next fetch request""" - records: tuple[bytes | None, ...] 
= field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v15/response.py b/src/kio/schema/fetch/v15/response.py index 1f248258..6cbae8d1 100644 --- a/src/kio/schema/fetch/v15/response.py +++ b/src/kio/schema/fetch/v15/response.py @@ -100,7 +100,7 @@ class PartitionData: metadata={"kafka_type": "int32"}, default=BrokerId(-1) ) """The preferred read replica for the consumer to use on its next fetch request""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v2/response.py b/src/kio/schema/fetch/v2/response.py index cda95934..bcc8bc5f 100644 --- a/src/kio/schema/fetch/v2/response.py +++ b/src/kio/schema/fetch/v2/response.py @@ -31,7 +31,7 @@ class PartitionData: """The error code, or 0 if there was no fetch error.""" high_watermark: i64 = field(metadata={"kafka_type": "int64"}) """The current high water mark.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v3/response.py b/src/kio/schema/fetch/v3/response.py index 487c5cd6..83711e3c 100644 --- a/src/kio/schema/fetch/v3/response.py +++ b/src/kio/schema/fetch/v3/response.py @@ -31,7 +31,7 @@ class PartitionData: """The error code, or 0 if there was no fetch error.""" high_watermark: i64 = field(metadata={"kafka_type": "int64"}) """The current high water mark.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v4/response.py b/src/kio/schema/fetch/v4/response.py index 54753bab..93bcef42 100644 --- a/src/kio/schema/fetch/v4/response.py +++ b/src/kio/schema/fetch/v4/response.py @@ -49,7 +49,7 @@ class PartitionData: """The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v5/response.py b/src/kio/schema/fetch/v5/response.py index 8eb0b0d5..4eacafc9 100644 --- a/src/kio/schema/fetch/v5/response.py +++ b/src/kio/schema/fetch/v5/response.py @@ -51,7 +51,7 @@ class PartitionData: """The current log start offset.""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v6/response.py b/src/kio/schema/fetch/v6/response.py index 259743a0..a779f6f1 100644 --- a/src/kio/schema/fetch/v6/response.py +++ b/src/kio/schema/fetch/v6/response.py @@ -51,7 +51,7 @@ class PartitionData: """The current log start offset.""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] 
= field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v7/response.py b/src/kio/schema/fetch/v7/response.py index 5472e555..7f21055c 100644 --- a/src/kio/schema/fetch/v7/response.py +++ b/src/kio/schema/fetch/v7/response.py @@ -51,7 +51,7 @@ class PartitionData: """The current log start offset.""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v8/response.py b/src/kio/schema/fetch/v8/response.py index 35ac0b5e..90611fd5 100644 --- a/src/kio/schema/fetch/v8/response.py +++ b/src/kio/schema/fetch/v8/response.py @@ -51,7 +51,7 @@ class PartitionData: """The current log start offset.""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch/v9/response.py b/src/kio/schema/fetch/v9/response.py index a71caf68..d7633c30 100644 --- a/src/kio/schema/fetch/v9/response.py +++ b/src/kio/schema/fetch/v9/response.py @@ -51,7 +51,7 @@ class PartitionData: """The current log start offset.""" aborted_transactions: tuple[AbortedTransaction, ...] """The aborted transactions.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data.""" diff --git a/src/kio/schema/fetch_snapshot/v0/response.py b/src/kio/schema/fetch_snapshot/v0/response.py index 03b3dfb1..173e528c 100644 --- a/src/kio/schema/fetch_snapshot/v0/response.py +++ b/src/kio/schema/fetch_snapshot/v0/response.py @@ -61,9 +61,7 @@ class PartitionSnapshot: """The total size of the snapshot.""" position: i64 = field(metadata={"kafka_type": "int64"}) """The starting byte position within the snapshot included in the Bytes field.""" - unaligned_records: tuple[bytes | None, ...] = field( - metadata={"kafka_type": "records"} - ) + unaligned_records: bytes = field(metadata={"kafka_type": "records"}) """Snapshot data in records format which may not be aligned on an offset boundary""" diff --git a/src/kio/schema/produce/v0/request.py b/src/kio/schema/produce/v0/request.py index a9534f75..08c7bedd 100644 --- a/src/kio/schema/produce/v0/request.py +++ b/src/kio/schema/produce/v0/request.py @@ -25,7 +25,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v1/request.py b/src/kio/schema/produce/v1/request.py index c719f4bc..56af3146 100644 --- a/src/kio/schema/produce/v1/request.py +++ b/src/kio/schema/produce/v1/request.py @@ -25,7 +25,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] 
= field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v2/request.py b/src/kio/schema/produce/v2/request.py index 87ef59dc..5d87940e 100644 --- a/src/kio/schema/produce/v2/request.py +++ b/src/kio/schema/produce/v2/request.py @@ -25,7 +25,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v3/request.py b/src/kio/schema/produce/v3/request.py index 359d7eb2..6427877f 100644 --- a/src/kio/schema/produce/v3/request.py +++ b/src/kio/schema/produce/v3/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v4/request.py b/src/kio/schema/produce/v4/request.py index 53da2162..faafe96c 100644 --- a/src/kio/schema/produce/v4/request.py +++ b/src/kio/schema/produce/v4/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v5/request.py b/src/kio/schema/produce/v5/request.py index 8acb71ca..f24ba6b7 100644 --- a/src/kio/schema/produce/v5/request.py +++ b/src/kio/schema/produce/v5/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v6/request.py b/src/kio/schema/produce/v6/request.py index 440c33a6..3c2d1e2c 100644 --- a/src/kio/schema/produce/v6/request.py +++ b/src/kio/schema/produce/v6/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v7/request.py b/src/kio/schema/produce/v7/request.py index d26b6ca2..37286810 100644 --- a/src/kio/schema/produce/v7/request.py +++ b/src/kio/schema/produce/v7/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] 
= field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v8/request.py b/src/kio/schema/produce/v8/request.py index 2ec07269..bf2396c1 100644 --- a/src/kio/schema/produce/v8/request.py +++ b/src/kio/schema/produce/v8/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/schema/produce/v9/request.py b/src/kio/schema/produce/v9/request.py index c911e977..1d6ab267 100644 --- a/src/kio/schema/produce/v9/request.py +++ b/src/kio/schema/produce/v9/request.py @@ -26,7 +26,7 @@ class PartitionProduceData: __header_schema__: ClassVar[type[RequestHeader]] = RequestHeader index: i32 = field(metadata={"kafka_type": "int32"}) """The partition index.""" - records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"}) + records: bytes | None = field(metadata={"kafka_type": "records"}) """The record data to be produced.""" diff --git a/src/kio/serial/_parse.py b/src/kio/serial/_parse.py index ecc8494c..6d8d7b24 100644 --- a/src/kio/serial/_parse.py +++ b/src/kio/serial/_parse.py @@ -51,15 +51,13 @@ def get_reader( return readers.read_legacy_string case ("string", False, True): return readers.read_nullable_legacy_string - case ("bytes", True, False): + case ("bytes" | "records", True, False): return readers.read_compact_string_as_bytes - case ("bytes", True, True): + case ("bytes" | "records", True, True): return readers.read_compact_string_as_bytes_nullable - case ("bytes", False, False): + case ("bytes" | "records", False, False): return readers.read_legacy_bytes - case ("bytes", False, True): - return readers.read_nullable_legacy_bytes - case ("records", _, True): + case ("bytes" | "records", False, True): return readers.read_nullable_legacy_bytes case ("uuid", _, True): return readers.read_uuid diff --git a/src/kio/serial/_serialize.py b/src/kio/serial/_serialize.py index 239e7d70..69b9200f 100644 --- a/src/kio/serial/_serialize.py +++ b/src/kio/serial/_serialize.py @@ -57,16 +57,14 @@ def get_writer( return writers.write_legacy_string case ("string", False, True): return writers.write_nullable_legacy_string - case ("bytes", True, False): + case ("bytes" | "records", True, False): return writers.write_compact_string - case ("bytes", True, True): + case ("bytes" | "records", True, True): return writers.write_nullable_compact_string - case ("bytes", False, False): + case ("bytes" | "records", False, False): return writers.write_legacy_bytes - case ("bytes", False, True): + case ("bytes" | "records", False, True): return writers.write_nullable_legacy_bytes - case ("records", _, True): - return writers.write_nullable_legacy_string case ("uuid", _, _): return writers.write_uuid case ("bool", _, False): diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 00000000..d09eb84d --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,27 @@ +# Ruthlessly stolen from python-kafka. 
+# This is real live data from Kafka 11 broker +from typing import Final + +record_batch_data_v2: Final = ( + # First Batch value == "123" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03" + b"\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]" + b"\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00" + b"\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00", + # Second Batch value = "" and value = "". 2 records + b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8" + b"\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff" + b"|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00" + b"\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00" + b"\x00", + # Third batch value = "123" + b"\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b" + b"\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]" + b"\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff" + b"\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00" + # Fourth batch value = "hdr" with header hkey=hval + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00E\x00\x00\x00\x00\x02\\" + b"\xd8\xefR\x00\x00\x00\x00\x00\x00\x00\x00\x01e\x85\xb6\xf3\xc1\x00\x00" + b"\x01e\x85\xb6\xf3\xc1\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff" + b"\xff\xff\x00\x00\x00\x01&\x00\x00\x00\x01\x06hdr\x02\x08hkey\x08hval", +) diff --git a/tests/serial/test_parse.py b/tests/serial/test_parse.py index 5a7e5394..5b813e8b 100644 --- a/tests/serial/test_parse.py +++ b/tests/serial/test_parse.py @@ -63,7 +63,9 @@ class TestGetReader: ("bytes", True, True, readers.read_compact_string_as_bytes_nullable), ("bytes", False, False, readers.read_legacy_bytes), ("bytes", False, True, readers.read_nullable_legacy_bytes), - ("records", True, True, readers.read_nullable_legacy_bytes), + ("records", True, False, readers.read_compact_string_as_bytes), + ("records", True, True, readers.read_compact_string_as_bytes_nullable), + ("records", False, False, readers.read_legacy_bytes), ("records", False, True, readers.read_nullable_legacy_bytes), ("uuid", False, True, readers.read_uuid), ("uuid", True, True, readers.read_uuid), @@ -121,8 +123,6 @@ def test_can_match_kafka_type_with_reader( ("timedelta_i32", False, True), ("timedelta_i64", True, True), ("timedelta_i64", False, True), - ("records", True, False), - ("records", False, False), ), ) def test_raises_not_implemented_error_for_invalid_combination( diff --git a/tests/serial/test_serialize.py b/tests/serial/test_serialize.py index 40bea737..9f464a7d 100644 --- a/tests/serial/test_serialize.py +++ b/tests/serial/test_serialize.py @@ -63,8 +63,10 @@ class TestGetWriter: ("bytes", False, True, writers.write_nullable_legacy_bytes), ("bytes", True, False, writers.write_compact_string), ("bytes", False, False, writers.write_legacy_bytes), - ("records", True, True, writers.write_nullable_legacy_string), - ("records", False, True, writers.write_nullable_legacy_string), + ("records", True, True, writers.write_nullable_compact_string), + ("records", False, True, writers.write_nullable_legacy_bytes), + ("records", True, False, writers.write_compact_string), + ("records", False, False, writers.write_legacy_bytes), ("uuid", False, False, writers.write_uuid), ("uuid", True, False, writers.write_uuid), ("uuid", False, True, writers.write_uuid), @@ -115,8 +117,6 @@ def test_can_match_kafka_type_with_writer( ("float64", False, True), ("bool", False, True), ("bool", True, True), 
- ("records", True, False), - ("records", False, False), ("error_code", True, True), ("error_code", False, True), ("timedelta_i32", True, True), diff --git a/tests/test_integration.py b/tests/test_integration.py index 14dc97c4..3e095608 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -8,6 +8,7 @@ from typing import Any from typing import Final from typing import TypeVar +from typing import assert_type from unittest import mock import pytest @@ -29,12 +30,21 @@ from kio.schema.delete_topics.v6 import DeleteTopicsResponse from kio.schema.delete_topics.v6.request import DeleteTopicState from kio.schema.delete_topics.v6.response import DeletableTopicResult +from kio.schema.fetch.v13.request import FetchPartition +from kio.schema.fetch.v13.request import FetchRequest +from kio.schema.fetch.v13.request import FetchTopic +from kio.schema.fetch.v13.response import FetchResponse from kio.schema.metadata.v5 import request as metadata_v5_request from kio.schema.metadata.v5 import response as metadata_v5_response from kio.schema.metadata.v12 import request as metadata_v12_request from kio.schema.metadata.v12 import response as metadata_v12_response +from kio.schema.produce.v9.request import PartitionProduceData +from kio.schema.produce.v9.request import ProduceRequest +from kio.schema.produce.v9.request import TopicProduceData +from kio.schema.produce.v9.response import ProduceResponse from kio.schema.types import BrokerId from kio.schema.types import TopicName +from kio.schema.types import TransactionalId from kio.serial import entity_reader from kio.serial import entity_writer from kio.serial.readers import read_int32 @@ -42,12 +52,16 @@ from kio.serial.writers import write_int32 from kio.static.constants import ErrorCode from kio.static.constants import uuid_zero +from kio.static.primitive import i8 from kio.static.primitive import i16 from kio.static.primitive import i32 from kio.static.primitive import i32Timedelta +from kio.static.primitive import i64 from kio.static.protocol import Entity from kio.static.protocol import Payload +from . import fixtures + pytestmark = pytest.mark.integration timedelta_zero: Final = i32Timedelta.parse(datetime.timedelta()) @@ -158,7 +172,7 @@ async def make_request( # We use asynchronous facilities to send the request and read the raw response into # a BytesIO buffer. with closing(stream_writer): - async with asyncio.timeout(1): + async with asyncio.timeout(10): await send(stream_writer, request, correlation_id) response = await read_response_bytes(stream_reader) @@ -342,13 +356,13 @@ async def create_topic(topic_name: TopicName) -> CreateTopicsResponse: ) -async def metadata_v12() -> metadata_v12_response.MetadataResponse: +async def metadata_v12(topic: TopicName) -> metadata_v12_response.MetadataResponse: return await make_request( request=metadata_v12_request.MetadataRequest( topics=( metadata_v12_request.MetadataRequestTopic( topic_id=uuid_zero, - name=TopicName("le-topic"), + name=topic, ), ), allow_auto_topic_creation=True, @@ -407,7 +421,7 @@ async def test_topic_and_metadata_operations() -> None: (created_topic,) = create_topics_response.topics # The previous creation call should guarantee this call contains the topic. 
- response_v12 = await metadata_v12() + response_v12 = await metadata_v12(created_topic.name) assert response_v12 == metadata_v12_response.MetadataResponse( throttle_time=timedelta_zero, brokers=( @@ -450,3 +464,118 @@ async def test_topic_and_metadata_operations() -> None: controller_id=BrokerId(1), cluster_id=mock.ANY, ) + + +# Custom exception for ability to xfail narrowly. +class _IncompleteFetch(Exception): + ... + + +@pytest.mark.xfail( + raises=_IncompleteFetch, + reason=( + "This test is flaky. Intermittently, Kafka returns incomplete responses for " + "the fetch response. If this turns out to be expected behavior, we need to " + "adjust this test to retry fetching until it parses successfully." + ), +) +async def test_produce_consume() -> None: + topic_name = TopicName("le-topic-2") + + # Test delete topic which might or might not exist. + delete_topics_response = await delete_topic(topic_name) + assert delete_topics_response == DeleteTopicsResponse( + throttle_time=timedelta_zero, + responses=( + DeletableTopicResult( + name=topic_name, + topic_id=mock.ANY, + error_code=mock.ANY, + error_message=mock.ANY, + ), + ), + ) + + # The previous deletion call should guarantee this call succeeds. + create_topics_response = await create_topic(topic_name) + assert create_topics_response == CreateTopicsResponse( + throttle_time=timedelta_zero, + topics=( + CreatableTopicResult( + name=topic_name, + topic_id=mock.ANY, + error_code=ErrorCode.none, + error_message=None, + num_partitions=i32(3), + replication_factor=i16(1), + configs=mock.ANY, + ), + ), + ) + (created_topic,) = create_topics_response.topics + + metadata = await metadata_v12(topic_name) + (topic,) = metadata.topics + partition, _, _ = topic.partitions + + produce_request = ProduceRequest( + transactional_id=TransactionalId("foo"), + # Note that passing 0 here results in server sending no response at all. + acks=i16(1), + timeout=i32Timedelta.parse(datetime.timedelta(seconds=1)), + topic_data=( + TopicProduceData( + name=topic_name, + partition_data=( + PartitionProduceData( + index=partition.partition_index, + records=fixtures.record_batch_data_v2[0], + ), + ), + ), + ), + ) + produce_response = await make_request(produce_request, ProduceResponse) + + (topic_response,) = produce_response.responses + assert topic_response.name == topic_name + (partition_response,) = topic_response.partition_responses + assert partition_response.error_code is ErrorCode.none + + fetch_request = FetchRequest( + max_wait=i32Timedelta.parse(datetime.timedelta(seconds=10)), + min_bytes=i32(0), + isolation_level=i8(1), + topics=( + FetchTopic( + topic_id=topic.topic_id, + partitions=( + FetchPartition( + partition=partition.partition_index, + fetch_offset=i64(0), + partition_max_bytes=i32(104_857_600), + ), + ), + ), + ), + forgotten_topics_data=(), + ) + + try: + fetch_response = await make_request(fetch_request, FetchResponse) + except ValueError as exception: + # Re-raise as custom error in order to xfail as strictly as possible. 
+        if str(exception).startswith("not enough values to unpack"):
+            raise _IncompleteFetch from exception
+        raise
+
+    assert_type(fetch_response, FetchResponse)
+    assert fetch_response.error_code is ErrorCode.none
+    (fetch_topic_response,) = fetch_response.responses
+    assert fetch_topic_response.topic_id == topic.topic_id
+    (fetch_partition,) = fetch_topic_response.partitions
+    assert fetch_partition.partition_index == partition.partition_index
+    assert fetch_partition.error_code is ErrorCode.none
+    # Compare record batch excluding baseOffset, batchLength, and partitionLeaderEpoch.
+    assert fetch_partition.records is not None
+    assert fetch_partition.records[16:] == fixtures.record_batch_data_v2[0][16:]
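
For context, a minimal sketch of what the reworked field looks like from the caller's side: after this patch, `records` on produce requests and fetch responses carries a single pre-encoded record batch as `bytes | None` rather than `tuple[bytes | None, ...]`. The class and field names below are taken from the diff above; the top-level `tests.fixtures` import path and the topic/partition values are assumptions for illustration, not part of the patch.

```python
import datetime

from kio.schema.produce.v9.request import PartitionProduceData
from kio.schema.produce.v9.request import ProduceRequest
from kio.schema.produce.v9.request import TopicProduceData
from kio.schema.types import TopicName
from kio.schema.types import TransactionalId
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta

# Assumes the repository root is on sys.path so the fixtures module added
# in this patch is importable; any pre-encoded record batch works here.
from tests.fixtures import record_batch_data_v2

request = ProduceRequest(
    transactional_id=TransactionalId("example"),
    # acks=0 would make the broker send no response at all, per the test above.
    acks=i16(1),
    timeout=i32Timedelta.parse(datetime.timedelta(seconds=1)),
    topic_data=(
        TopicProduceData(
            name=TopicName("example-topic"),
            partition_data=(
                PartitionProduceData(
                    index=i32(0),
                    # The raw record batch is passed through verbatim as bytes.
                    records=record_batch_data_v2[0],
                ),
            ),
        ),
    ),
)
```

On the consuming side, the same blob comes back on `PartitionData.records` of the fetch response, so callers compare or decode the batch directly instead of iterating over a tuple of chunks.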