Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Build schema on upstream version 3.7.0 #165

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codegen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

# Note: the kafka-clients dependency of the Java tester also needs updating when
# this is bumped (in java_tester/build.gradle).
build_tag: Final = "3.6.0"
build_tag: Final = "3.7.0"
2 changes: 1 addition & 1 deletion codegen/generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def format_array_field_call(
field_kwargs = {}
if metadata:
field_kwargs["metadata"] = repr(metadata)
if tag is not None and field.ignorable:
if tag is not None:
field_kwargs["default"] = "()"

if not field_kwargs:
Expand Down
1 change: 1 addition & 0 deletions codegen/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def is_nullable_for_version(self, value: int) -> bool:
"RenewPeriodMs",
"RetentionTimeMs",
"HeartbeatIntervalMs",
"PushIntervalMs",
}
)
datetime_names: Final = frozenset(
Expand Down
2 changes: 1 addition & 1 deletion java_tester/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repositories {
}

dependencies {
implementation 'org.apache.kafka:kafka-clients:3.6.0'
implementation 'org.apache.kafka:kafka-clients:3.7.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
implementation 'org.apache.commons:commons-text:1.9'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ protected static String getString(JsonNode fieldValue, String fieldName) throws

protected static Uuid getUuid(JsonNode fieldValue, String fieldName) throws Exception {
String str = getString(fieldValue, fieldName);
UUID tmpUuid;
if (str == null) {
return null;
tmpUuid = new UUID(0, 0);
} else {
tmpUuid = UUID.fromString(str);
}
UUID tmpUuid = UUID.fromString(str);
return new Uuid(tmpUuid.getMostSignificantBits(), tmpUuid.getLeastSignificantBits());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down Expand Up @@ -69,9 +70,13 @@ List<?> createList() throws Exception {
case "int32" -> Integer.class;
case "int64" -> Long.class;
case "string" -> String.class;
default -> rootMessageInfo.rootClazz.declaredClasses()
case "uuid" -> UUID.class;
default -> rootMessageInfo
.rootClazz
.declaredClasses()
.filter(c -> c.getName().endsWith("$" + elementTypeInSchema))
.findFirst().get();
.findFirst()
.get();
};

List<Object> list = new ArrayList<>();
Expand Down Expand Up @@ -100,6 +105,8 @@ private void fillCollectionFromChildren(
elementObj = getLong(elementValue, fieldName);
} else if (elementClazz.equals(String.class)) {
elementObj = getString(elementValue, fieldName);
} else if (elementClazz.equals(UUID.class)) {
elementObj = getUuid(elementValue, fieldName);
} else {
elementObj = new ObjectCreator<>(rootMessageInfo, new EntityClass<>(elementClazz), fieldSchema)
.create(elementValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,14 @@ private static String test(String caseStr) {
return failureResponse(message);
} else {
byte[] serializedInJava = writer.buffer().array();
String serializedInJavaB64 = Base64.getEncoder().encodeToString(serializedInJava);

if (!Arrays.equals(serializedFromPython, serializedInJava)) {
String message = "Message serialized in Java is not equal to message serialized in Python\n"
+ "Input: " + caseStr + "\n"
+ "Deserialized: " + messageDeserializedFromPython + "\n"
+ "Constructed: " + constructedMessage;
+ "Constructed: " + constructedMessage + "\n"
+ "Serialized: " + serializedInJavaB64;
return failureResponse(message);
} else {
return SUCCESS_RESPONSE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private static String kafkaesqueFieldName(String fieldName, List<String> knownFi
case "max_timestamp" -> fieldName = "max_timestamp_ms";
case "transaction_start_time" -> fieldName = "transaction_start_time_ms";
case "log_append_time" -> fieldName = "log_append_time_ms";
case "push_interval" -> fieldName = "push_interval_ms";
}

fieldName = CaseUtils.toCamelCase(fieldName, true, '_');
Expand All @@ -131,6 +132,7 @@ private static String kafkaesqueFieldName(String fieldName, List<String> knownFi
case "IssueTimestampMs" -> fieldName = "IssueTimestamp";
case "ExpiryTimestampMs" -> fieldName = "ExpiryTimestamp";
case "MaxTimestampMs" -> fieldName = "MaxTimestamp";
case "PushIntervalMs" -> fieldName = "PushInterval";
}
}

Expand Down
Empty file.
7 changes: 7 additions & 0 deletions src/kio/schema/assign_replicas_to_dirs/v0/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .request import AssignReplicasToDirsRequest
from .response import AssignReplicasToDirsResponse

__all__ = (
"AssignReplicasToDirsRequest",
"AssignReplicasToDirsResponse",
)
65 changes: 65 additions & 0 deletions src/kio/schema/assign_replicas_to_dirs/v0/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Generated from ``clients/src/main/resources/common/message/AssignReplicasToDirsRequest.json``.
"""

import uuid

from dataclasses import dataclass
from dataclasses import field
from typing import ClassVar

from kio.schema.request_header.v2.header import RequestHeader
from kio.schema.types import BrokerId
from kio.static.constants import EntityType
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i64


@dataclass(frozen=True, slots=True, kw_only=True)
class PartitionData:
__type__: ClassVar = EntityType.nested
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
partition_index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index"""


@dataclass(frozen=True, slots=True, kw_only=True)
class TopicData:
__type__: ClassVar = EntityType.nested
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
topic_id: uuid.UUID | None = field(metadata={"kafka_type": "uuid"})
"""The ID of the assigned topic"""
partitions: tuple[PartitionData, ...]


@dataclass(frozen=True, slots=True, kw_only=True)
class DirectoryData:
__type__: ClassVar = EntityType.nested
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
id_: uuid.UUID | None = field(metadata={"kafka_type": "uuid"})
"""The ID of the directory"""
topics: tuple[TopicData, ...]


@dataclass(frozen=True, slots=True, kw_only=True)
class AssignReplicasToDirsRequest:
__type__: ClassVar = EntityType.request
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
broker_id: BrokerId = field(metadata={"kafka_type": "int32"})
"""The ID of the requesting broker"""
broker_epoch: i64 = field(metadata={"kafka_type": "int64"}, default=i64(-1))
"""The epoch of the requesting broker"""
directories: tuple[DirectoryData, ...]
67 changes: 67 additions & 0 deletions src/kio/schema/assign_replicas_to_dirs/v0/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Generated from ``clients/src/main/resources/common/message/AssignReplicasToDirsResponse.json``.
"""

import uuid

from dataclasses import dataclass
from dataclasses import field
from typing import ClassVar

from kio.schema.errors import ErrorCode
from kio.schema.response_header.v1.header import ResponseHeader
from kio.static.constants import EntityType
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta


@dataclass(frozen=True, slots=True, kw_only=True)
class PartitionData:
__type__: ClassVar = EntityType.nested
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[ResponseHeader]] = ResponseHeader
partition_index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index"""
error_code: ErrorCode = field(metadata={"kafka_type": "error_code"})
"""The partition level error code"""


@dataclass(frozen=True, slots=True, kw_only=True)
class TopicData:
__type__: ClassVar = EntityType.nested
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[ResponseHeader]] = ResponseHeader
topic_id: uuid.UUID | None = field(metadata={"kafka_type": "uuid"})
"""The ID of the assigned topic"""
partitions: tuple[PartitionData, ...]


@dataclass(frozen=True, slots=True, kw_only=True)
class DirectoryData:
__type__: ClassVar = EntityType.nested
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[ResponseHeader]] = ResponseHeader
id_: uuid.UUID | None = field(metadata={"kafka_type": "uuid"})
"""The ID of the directory"""
topics: tuple[TopicData, ...]


@dataclass(frozen=True, slots=True, kw_only=True)
class AssignReplicasToDirsResponse:
__type__: ClassVar = EntityType.response
__version__: ClassVar[i16] = i16(0)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(73)
__header_schema__: ClassVar[type[ResponseHeader]] = ResponseHeader
throttle_time: i32Timedelta = field(metadata={"kafka_type": "timedelta_i32"})
"""The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."""
error_code: ErrorCode = field(metadata={"kafka_type": "error_code"})
"""The top level response error code"""
directories: tuple[DirectoryData, ...]
7 changes: 7 additions & 0 deletions src/kio/schema/broker_heartbeat/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .request import BrokerHeartbeatRequest
from .response import BrokerHeartbeatResponse

__all__ = (
"BrokerHeartbeatRequest",
"BrokerHeartbeatResponse",
)
38 changes: 38 additions & 0 deletions src/kio/schema/broker_heartbeat/v1/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
Generated from ``clients/src/main/resources/common/message/BrokerHeartbeatRequest.json``.
"""

import uuid

from dataclasses import dataclass
from dataclasses import field
from typing import ClassVar

from kio.schema.request_header.v2.header import RequestHeader
from kio.schema.types import BrokerId
from kio.static.constants import EntityType
from kio.static.primitive import i16
from kio.static.primitive import i64


@dataclass(frozen=True, slots=True, kw_only=True)
class BrokerHeartbeatRequest:
__type__: ClassVar = EntityType.request
__version__: ClassVar[i16] = i16(1)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(63)
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
broker_id: BrokerId = field(metadata={"kafka_type": "int32"})
"""The broker ID."""
broker_epoch: i64 = field(metadata={"kafka_type": "int64"}, default=i64(-1))
"""The broker epoch."""
current_metadata_offset: i64 = field(metadata={"kafka_type": "int64"})
"""The highest metadata offset which the broker has reached."""
want_fence: bool = field(metadata={"kafka_type": "bool"})
"""True if the broker wants to be fenced, false otherwise."""
want_shut_down: bool = field(metadata={"kafka_type": "bool"})
"""True if the broker wants to be shut down, false otherwise."""
offline_log_dirs: tuple[uuid.UUID | None, ...] = field(
metadata={"kafka_type": "uuid", "tag": 0}, default=()
)
"""Log directories that failed and went offline."""
32 changes: 32 additions & 0 deletions src/kio/schema/broker_heartbeat/v1/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Generated from ``clients/src/main/resources/common/message/BrokerHeartbeatResponse.json``.
"""

from dataclasses import dataclass
from dataclasses import field
from typing import ClassVar

from kio.schema.errors import ErrorCode
from kio.schema.response_header.v1.header import ResponseHeader
from kio.static.constants import EntityType
from kio.static.primitive import i16
from kio.static.primitive import i32Timedelta


@dataclass(frozen=True, slots=True, kw_only=True)
class BrokerHeartbeatResponse:
__type__: ClassVar = EntityType.response
__version__: ClassVar[i16] = i16(1)
__flexible__: ClassVar[bool] = True
__api_key__: ClassVar[i16] = i16(63)
__header_schema__: ClassVar[type[ResponseHeader]] = ResponseHeader
throttle_time: i32Timedelta = field(metadata={"kafka_type": "timedelta_i32"})
"""Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."""
error_code: ErrorCode = field(metadata={"kafka_type": "error_code"})
"""The error code, or 0 if there was no error."""
is_caught_up: bool = field(metadata={"kafka_type": "bool"}, default=False)
"""True if the broker has approximately caught up with the latest metadata."""
is_fenced: bool = field(metadata={"kafka_type": "bool"}, default=True)
"""True if the broker is fenced."""
should_shut_down: bool = field(metadata={"kafka_type": "bool"})
"""True if the broker should proceed with its shutdown."""
7 changes: 7 additions & 0 deletions src/kio/schema/broker_registration/v2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .request import BrokerRegistrationRequest
from .response import BrokerRegistrationResponse

__all__ = (
"BrokerRegistrationRequest",
"BrokerRegistrationResponse",
)
Loading