feature: Implement basic support for record fields #122

Merged · 2 commits · Feb 1, 2024
4 changes: 1 addition & 3 deletions codegen/parser.py
@@ -98,14 +98,12 @@ def get_type_hint(self, optional: bool = False) -> str:
hint = "f64"
case Primitive.string:
hint = "str"
case Primitive.bytes_:
case Primitive.bytes_ | Primitive.records:
hint = "bytes"
case Primitive.bool_:
hint = "bool"
case Primitive.uuid:
hint = "uuid.UUID | None"
case Primitive.records:
return "tuple[bytes | None, ...]"
case Primitive.error_code:
hint = "ErrorCode"
case Primitive.timedelta_i32:

5 changes: 5 additions & 0 deletions container/compose.yml
@@ -24,6 +24,11 @@ services:
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_MAX_MESSAGE_BYTES: 104857600
KAFKA_MESSAGE_MAX_BYTES: 104857600
KAFKA_BUFFER_MEMORY: 104857600
KAFKA_MAX_REQUEST_SIZE: 104857600
KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600
volumes:
- ./update-run.sh:/tmp/update-run.sh
tmpfs:
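
All five new broker settings use the same value; for reference, 104857600 bytes is exactly 100 MiB, presumably raised so the test environment can move realistically sized record batches. A quick sanity check of the arithmetic (not part of the change itself):

```python
# 104857600 == 100 * 1024 * 1024, i.e. a 100 MiB ceiling for message size,
# producer buffer memory, request size, and replica fetches alike.
assert 104_857_600 == 100 * 1024 * 1024
```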

2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v0/response.py
@@ -30,7 +30,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v1/response.py
@@ -31,7 +31,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v10/response.py
@@ -51,7 +51,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v11/response.py
@@ -56,7 +56,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v12/response.py
@@ -100,7 +100,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v13/response.py
@@ -101,7 +101,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v14/response.py
@@ -101,7 +101,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v15/response.py
@@ -101,7 +101,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v2/response.py
@@ -31,7 +31,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v3/response.py
@@ -31,7 +31,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v4/response.py
@@ -49,7 +49,7 @@ class PartitionData:
"""The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v5/response.py
@@ -51,7 +51,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v6/response.py
@@ -51,7 +51,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v7/response.py
@@ -51,7 +51,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v8/response.py
@@ -51,7 +51,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



2 changes: 1 addition & 1 deletion src/kio/schema/fetch/v9/response.py
@@ -51,7 +51,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...]
"""The aborted transactions."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data."""



4 changes: 1 addition & 3 deletions src/kio/schema/fetch_snapshot/v0/response.py
@@ -61,9 +61,7 @@ class PartitionSnapshot:
"""The total size of the snapshot."""
position: i64 = field(metadata={"kafka_type": "int64"})
"""The starting byte position within the snapshot included in the Bytes field."""
unaligned_records: tuple[bytes | None, ...] = field(
metadata={"kafka_type": "records"}
)
unaligned_records: bytes = field(metadata={"kafka_type": "records"})
"""Snapshot data in records format which may not be aligned on an offset boundary"""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v0/request.py
@@ -25,7 +25,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v1/request.py
@@ -25,7 +25,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v2/request.py
@@ -25,7 +25,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v3/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v4/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v5/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v6/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v7/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v8/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



2 changes: 1 addition & 1 deletion src/kio/schema/produce/v9/request.py
@@ -26,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: tuple[bytes | None, ...] = field(metadata={"kafka_type": "records"})
records: bytes | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""



10 changes: 4 additions & 6 deletions src/kio/serial/_parse.py
@@ -51,15 +51,13 @@ def get_reader(
return readers.read_legacy_string
case ("string", False, True):
return readers.read_nullable_legacy_string
case ("bytes", True, False):
case ("bytes" | "records", True, False):
return readers.read_compact_string_as_bytes
case ("bytes", True, True):
case ("bytes" | "records", True, True):
return readers.read_compact_string_as_bytes_nullable
case ("bytes", False, False):
case ("bytes" | "records", False, False):
return readers.read_legacy_bytes
case ("bytes", False, True):
return readers.read_nullable_legacy_bytes
case ("records", _, True):
case ("bytes" | "records", False, True):
return readers.read_nullable_legacy_bytes
case ("uuid", _, True):
return readers.read_uuid
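
The net effect of the new match arms is that a records field resolves to exactly the same reader as a bytes field for every (flexible, nullable) combination. A rough standalone equivalent of that dispatch, assuming readers is the kio.serial.readers module referenced above:

```python
from collections.abc import Callable

from kio.serial import readers  # assumed import path for the readers module


def records_reader(flexible: bool, nullable: bool) -> Callable:
    # Mirrors the ("bytes" | "records", ...) arms of get_reader above.
    if flexible:
        if nullable:
            return readers.read_compact_string_as_bytes_nullable
        return readers.read_compact_string_as_bytes
    if nullable:
        return readers.read_nullable_legacy_bytes
    return readers.read_legacy_bytes
```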

10 changes: 4 additions & 6 deletions src/kio/serial/_serialize.py
@@ -58,16 +58,14 @@ def get_writer(
return writers.write_legacy_string
case ("string", False, True):
return writers.write_nullable_legacy_string
case ("bytes", True, False):
case ("bytes" | "records", True, False):
return writers.write_compact_string
case ("bytes", True, True):
case ("bytes" | "records", True, True):
return writers.write_nullable_compact_string
case ("bytes", False, False):
case ("bytes" | "records", False, False):
return writers.write_legacy_bytes
case ("bytes", False, True):
case ("bytes" | "records", False, True):
return writers.write_nullable_legacy_bytes
case ("records", _, True):
return writers.write_nullable_legacy_string
case ("uuid", _, _):
return writers.write_uuid
case ("bool", _, False):

27 changes: 27 additions & 0 deletions tests/fixtures.py
@@ -0,0 +1,27 @@
# Ruthlessly stolen from python-kafka.
# This is real, live data captured from a Kafka 0.11 broker.
from typing import Final

record_batch_data_v2: Final = (
# First Batch value == "123"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03"
b"\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]"
b"\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00"
b"\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00",
# Second Batch value = "" and value = "". 2 records
b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8"
b"\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff"
b"|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00"
b"\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00"
b"\x00",
# Third batch value = "123"
b"\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b"
b"\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]"
b"\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
b"\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00"
# Fourth batch value = "hdr" with header hkey=hval
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00E\x00\x00\x00\x00\x02\\"
b"\xd8\xefR\x00\x00\x00\x00\x00\x00\x00\x00\x01e\x85\xb6\xf3\xc1\x00\x00"
b"\x01e\x85\xb6\xf3\xc1\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
b"\xff\xff\x00\x00\x00\x01&\x00\x00\x00\x01\x06hdr\x02\x08hkey\x08hval",
)
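
Since the schema fields above are now plain bytes, these captured batches can be concatenated into a single records blob for use in tests. A hypothetical usage; the import assumes the tests directory is importable as a package:

```python
from tests.fixtures import record_batch_data_v2

# A "records" field on the wire is just record batches laid out back to back.
records_blob: bytes = b"".join(record_batch_data_v2)
assert isinstance(records_blob, bytes)
```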
6 changes: 3 additions & 3 deletions tests/serial/test_parse.py
@@ -64,7 +64,9 @@ class TestGetReader:
("bytes", True, True, readers.read_compact_string_as_bytes_nullable),
("bytes", False, False, readers.read_legacy_bytes),
("bytes", False, True, readers.read_nullable_legacy_bytes),
("records", True, True, readers.read_nullable_legacy_bytes),
("records", True, False, readers.read_compact_string_as_bytes),
("records", True, True, readers.read_compact_string_as_bytes_nullable),
("records", False, False, readers.read_legacy_bytes),
("records", False, True, readers.read_nullable_legacy_bytes),
("uuid", False, True, readers.read_uuid),
("uuid", True, True, readers.read_uuid),
@@ -122,8 +124,6 @@ def test_can_match_kafka_type_with_reader(
("timedelta_i32", False, True),
("timedelta_i64", True, True),
("timedelta_i64", False, True),
("records", True, False),
("records", False, False),
),
)
def test_raises_not_implemented_error_for_invalid_combination(

8 changes: 4 additions & 4 deletions tests/serial/test_serialize.py
@@ -64,8 +64,10 @@ class TestGetWriter:
("bytes", False, True, writers.write_nullable_legacy_bytes),
("bytes", True, False, writers.write_compact_string),
("bytes", False, False, writers.write_legacy_bytes),
("records", True, True, writers.write_nullable_legacy_string),
("records", False, True, writers.write_nullable_legacy_string),
("records", True, True, writers.write_nullable_compact_string),
("records", False, True, writers.write_nullable_legacy_bytes),
("records", True, False, writers.write_compact_string),
("records", False, False, writers.write_legacy_bytes),
("uuid", False, False, writers.write_uuid),
("uuid", True, False, writers.write_uuid),
("uuid", False, True, writers.write_uuid),
@@ -116,8 +118,6 @@ def test_can_match_kafka_type_with_writer(
("float64", False, True),
("bool", False, True),
("bool", True, True),
("records", True, False),
("records", False, False),
("error_code", True, True),
("error_code", False, True),
("timedelta_i32", True, True),