
Validator Interface Update & Converter Changes #533

Merged
merged 53 commits into from
Dec 22, 2023

Conversation

@ankitk-me (Contributor) commented Oct 25, 2023

For previous review comments refer: #499

ankitk-me and others added 21 commits September 28, 2023 11:38
* feature/m1-docker-build-support

- separate jammy and alpine
- add zilla version as env var
- add the docker platform to properties
- don't need to use alpine for build

* docker image tagging options

separate alpine base image from the default image and add more tagging options

* set the version env var in the alpine build

* remove the suffix for local build

* make version tagging more explicit for each profile

* move the alpine specific builds into the docker-image module

* reduce the folder complexity and add child pom placeholders

* revert the docker-image pom to develop

* Use buildx for multi-arch images, build alpine image for release only

* Move inline assembly to descriptor file and reference from alpine image

---------

Co-authored-by: John Fallows <john.r.fallows@gmail.com>

Bumps alpine from 3.18.3 to 3.18.4.

---
updated-dependencies:
- dependency-name: alpine
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
@ankitk-me ankitk-me self-assigned this Oct 25, 2023
@ankitk-me ankitk-me marked this pull request as ready for review November 6, 2023 07:38
Comment on lines 81 to 92
int schemaId;
int progress = 0;
if (data.getByte(index) == MAGIC_BYTE)
{
progress += BitUtil.SIZE_OF_BYTE;
schemaId = data.getInt(index + progress);
progress += BitUtil.SIZE_OF_INT;
}
else
{
schemaId = catalog != null ? catalog.id : 0;
}
Contributor:

Let's define an int constant NO_SCHEMA_ID with value 0.

Suggested change
int schemaId;
int progress = 0;
if (data.getByte(index) == MAGIC_BYTE)
{
progress += BitUtil.SIZE_OF_BYTE;
schemaId = data.getInt(index + progress);
progress += BitUtil.SIZE_OF_INT;
}
else
{
schemaId = catalog != null ? catalog.id : 0;
}
int schemaId = NO_SCHEMA_ID;
int progress = 0;
if (data.getByte(index) == MAGIC_BYTE)
{
progress += BitUtil.SIZE_OF_BYTE;
schemaId = data.getInt(index + progress);
progress += BitUtil.SIZE_OF_INT;
}
else if (catalog.id != NO_SCHEMA_ID)
{
schemaId = catalog.id;
}

Schema schema = fetchSchema(schemaId);
if (schema != null)
{
if ("json".equals(format))
Contributor:

Let's define String constant FORMAT_JSON with value "json" and use it here.

return valLength;
}

private byte[] deserializeAvroRecord(
Contributor:

Rename to deserializeRecord since we are already in the AvroReadValidator.

int offset,
int length)
{
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Contributor:

Rename outputStream to encoded.
Can we pre-create this once in the constructor and call encoded.reset() here instead?

protected final String format;
protected DatumReader<GenericRecord> reader;
protected DatumWriter<GenericRecord> writer;
protected final DirectBuffer valueRO = new UnsafeBuffer();
Contributor:

Please move up just under statics as this is effectively a constant.

@@ -169,7 +169,7 @@ public KafkaCacheClientFetchFactory(
this.supplyDebitor = supplyDebitor;
this.supplyCache = supplyCache;
this.supplyCacheRoute = supplyCacheRoute;
this.cursorFactory = new KafkaCacheCursorFactory(context.writeBuffer());
this.cursorFactory = new KafkaCacheCursorFactory(context.writeBuffer().capacity());
Contributor:

Please confirm this change is still needed after we added convertedFile in cache segment.

{
this.writeBuffer = writeBuffer;
this.writeBuffer = new UnsafeBuffer(ByteBuffer.allocate(writeCapacity));
Contributor:

Please confirm this change is still needed after we added convertedFile in cache segment.

@@ -191,7 +192,7 @@ public KafkaCacheClientProduceFactory(
this.initialBudgetMax = bufferPool.slotCapacity();
this.localIndex = context.index();
this.cleanupDelay = config.cacheClientCleanupDelay();
this.cursorFactory = new KafkaCacheCursorFactory(context.writeBuffer());
this.cursorFactory = new KafkaCacheCursorFactory(context.writeBuffer().capacity());
Contributor:

Please confirm this change is still needed after we added convertedFile in cache segment.

Comment on lines 477 to 478
private ValueValidator validateKey;
private FragmentValidator validateValue;
Contributor:

final?

@@ -168,7 +168,7 @@ public KafkaCacheServerProduceFactory(
this.supplyBinding = supplyBinding;
this.supplyCache = supplyCache;
this.supplyCacheRoute = supplyCacheRoute;
this.cursorFactory = new KafkaCacheCursorFactory(writeBuffer);
this.cursorFactory = new KafkaCacheCursorFactory(writeBuffer.capacity());
Contributor:

Please confirm this change is still needed after we added convertedFile in cache segment.

}
}
return status;

return padding;
Contributor:

Please move the private supporting methods to the end of the file, so that methods used by other classes are at the top.

protected GenericDatumWriter<GenericRecord> supplyWriter(
int schemaId)
{
writer = null;
Contributor:

This should be locally defined instead of populating a member field as a side effect.

protected GenericDatumReader<GenericRecord> supplyReader(
int schemaId)
{
reader = null;
Contributor:

This should be locally defined instead of populating a member field as a side effect.

Comment on lines 61 to 63
protected GenericDatumReader<GenericRecord> reader;
protected GenericDatumWriter<GenericRecord> writer;
protected GenericRecord record;
Contributor:

Let's try to remove these transient fields and use the return value from the corresponding supply methods instead.
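The refactor suggested here, replacing transient member fields set as a side effect with supply methods backed by a per-schema cache, can be sketched generically. This is an assumption-laden illustration (zilla uses Agrona's Int2ObjectHashMap elsewhere; a plain HashMap stands in here):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Sketch: cache one reader per schema id and return it from the supply
// method, instead of mutating a shared member field each call.
final class ReaderCache<R>
{
    private final Map<Integer, R> readers = new HashMap<>();
    private final IntFunction<R> createReader;

    ReaderCache(IntFunction<R> createReader)
    {
        this.createReader = createReader;
    }

    R supplyReader(int schemaId)
    {
        // creates at most once per schema id, then returns the cached instance
        return readers.computeIfAbsent(schemaId, createReader::apply);
    }
}
```

Callers then use the return value directly (`GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);`), so no transient state leaks between calls.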

Comment on lines 64 to 65
protected ByteArrayOutputStream encoded;
protected DirectBufferInputStream in;
Contributor:

Should these be final?

String topic,
OctetsFW payload)
{
final ValueValidator contentValueValidator = supplyValidator.apply(topic);
Contributor:

Just validator.

OctetsFW payload)
{
final ValueValidator contentValueValidator = supplyValidator.apply(topic);
return contentValueValidator == null ||
Contributor:

Should we introduce a ValueValidator.NOP to return from supplyValidator so we can avoid these null checks?

Contributor (author):

We already have a ValueValidator no-op, so the null check here is redundant. Removed it.
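The no-op pattern agreed on above can be sketched as follows. This is an illustrative, simplified interface, not zilla's actual ValueValidator API: a NONE constant that accepts every value lets supplyValidator always return a non-null validator, so callers never need a null check.

```java
// Simplified sketch of the no-op validator pattern; names and the
// validate signature are assumptions, not zilla's real interface.
interface ValueValidator
{
    // returns the number of valid bytes, or -1 if validation fails
    int validate(byte[] data, int index, int length);

    // NONE accepts everything, so a lookup miss can return NONE instead of null
    ValueValidator NONE = (data, index, length) -> length;
}
```

With this, `supplyValidator.apply(topic)` can fall back to `ValueValidator.NONE` and the `validator == null ||` guard disappears.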

@@ -18,6 +18,11 @@
public interface CatalogHandler
{
int NO_SCHEMA_ID = 0;
String TEST = "test"; // Added for unit test & IT purpose
String SCHEMA_REGISTRY = "schema-registry";
String INLINE = "inline";
Contributor:

These cannot go here, caused by abstraction leak and this is a public API for engine.

int padding = 0;
if (appendPrefix)
{
padding = MAX_PADDING_LEN; // TODO: fetch this from catalog
Contributor:

Let's defer this to the catalog handler, if present.

.partition(0, 1, 2)
.build()
.build()}
write [0x00] [0x00 0x00 0x00 0x00 0x01] ${kafka:varint(3)} "id0" ${kafka:varint(8)} "positive"
Contributor:

I thought it was 1 byte (magic) followed by int32 (schemaId) then payload, no?
This has 6-byte prefix, not 5 bytes.

Contributor (author):

That's true, but I added this script to mimic the scenario where the prefix can be more than five bytes, so that the padding can be dynamic.

{
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
GenericDatumReader<GenericRecord> reader = readers.computeIfAbsent(schemaId, this::supplyReader);
Contributor:

Suggested change
GenericDatumReader<GenericRecord> reader = readers.computeIfAbsent(schemaId, this::supplyReader);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);

Schema schema = supplySchema(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
GenericRecord record = supplyRecord(schemaId);
Contributor:

Can the json decoder below be precreated and reused instead of creating afresh each time?

Contributor (author):

This doesn't seem possible, as there is no method for reusing a JsonDecoder.

Contributor (author):

But I was able to optimise the BinaryEncoder and reuse the same instance.

{
break;
}
}
bytesIndex += numBytes;
index += charByteCount;
Contributor:

We are calculating index + j more than once here and also j is not descriptive.

final int charByteLimit = index + charByteCount;
for (int charByteIndex = index + 1; charByteIndex < charByteLimit; charByteIndex++)
{
    if (charByteIndex >= limit || (data.getByte(charByteIndex) & 0b11000000) != 0b10000000)
    {
        break validate;
    }
}

I think the break above needs to break from the outer while loop, not just the for loop, so we'll need to add a validate: label to the while loop, agree?
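The labeled-break structure proposed above can be sketched in a self-contained form. This is a hedged illustration, not the PR's actual code: method and variable names (`validate`, `charByteCount`) are assumptions, and the check only verifies lead/continuation byte shapes, not overlong encodings.

```java
// Sketch of UTF-8 structural validation using a labeled outer loop,
// so the inner continuation-byte check can abort the whole scan.
final class Utf8Check
{
    static boolean validate(byte[] data, int index, int limit)
    {
        boolean valid = true;
        validate:
        while (index < limit)
        {
            int first = data[index] & 0xFF;
            int charByteCount =
                first < 0x80 ? 1 :              // ASCII
                (first & 0xE0) == 0xC0 ? 2 :    // 110xxxxx
                (first & 0xF0) == 0xE0 ? 3 :    // 1110xxxx
                (first & 0xF8) == 0xF0 ? 4 : 0; // 11110xxx
            if (charByteCount == 0)
            {
                valid = false;
                break;
            }
            final int charByteLimit = index + charByteCount;
            for (int charByteIndex = index + 1; charByteIndex < charByteLimit; charByteIndex++)
            {
                // every continuation byte must match 10xxxxxx
                if (charByteIndex >= limit || (data[charByteIndex] & 0b11000000) != 0b10000000)
                {
                    valid = false;
                    break validate; // exits the outer while loop, not just the for loop
                }
            }
            index = charByteLimit;
        }
        return valid;
    }
}
```

Without the `validate:` label, `break` would only leave the inner for loop and the outer loop would keep scanning past the invalid sequence.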

private JsonProvider supplyProvider(
int schemaId)
{
return providers.computeIfAbsent(schemaId, id -> createProvider(supplySchema(id)));
Contributor:

Suggest we simplify by making createProvider take schemaId and calling supplySchema from createProvider implementation.


default int maxPadding()
{
return ZERO_PADDING;
Contributor:

We can return 0 here instead of a constant.

{
prefixRO.putByte(0, MAGIC_BYTE);
prefixRO.putInt(1, schemaId, ByteOrder.BIG_ENDIAN);
next.accept(prefixRO, 0, 5);
Contributor:

Suggested change
next.accept(prefixRO, 0, 5);
next.accept(prefixRO, 0, PREFIX_LENGTH);
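The prefix being written here is the 5-byte layout discussed throughout this review: one magic byte followed by a big-endian int32 schema id. A minimal standalone sketch, with `MAGIC_BYTE` and `PREFIX_LENGTH` mirroring the PR's constants but the class itself purely illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative encoder/decoder for the 1-byte magic + int32 schema-id prefix.
final class SchemaPrefix
{
    static final byte MAGIC_BYTE = 0x00;
    static final int PREFIX_LENGTH = Byte.BYTES + Integer.BYTES; // 5

    static byte[] encode(int schemaId, byte[] payload)
    {
        return ByteBuffer.allocate(PREFIX_LENGTH + payload.length)
            .order(ByteOrder.BIG_ENDIAN)
            .put(MAGIC_BYTE)
            .putInt(schemaId)
            .put(payload)
            .array();
    }

    // Returns the embedded schema id, or -1 if the magic byte is absent.
    static int decodeSchemaId(byte[] data)
    {
        if (data.length < PREFIX_LENGTH || data[0] != MAGIC_BYTE)
        {
            return -1;
        }
        return ByteBuffer.wrap(data, 1, Integer.BYTES).order(ByteOrder.BIG_ENDIAN).getInt();
    }
}
```

Using the named `PREFIX_LENGTH` rather than the literal `5` keeps the encode and decode paths in agreement if the layout ever changes.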


public class TestCatalogHandler implements CatalogHandler
{
private static final int MAX_PADDING_LEN = 10;
Contributor:

Suggest renaming to MAX_PADDING_LENGTH for consistency of naming.

@Override
public int enrich(
int schemaId,
ValueConsumer next)
Contributor:

What is the intention of the return value of enrich?

It seems like it is trying to say how much of the input should be ignored, while also potentially writing something to next.

So in the case of embed where we are adding the prefix bytes, the PREFIX_LENGTH would already be passed to next, so perhaps this case should return 0?

And when we are doing exclude, meaning we are stripping off the PREFIX_LENGTH bytes, we would not call next so we need to return PREFIX_LENGTH to indicate how many bytes we skipped over.

The implementation below is returning PREFIX_LENGTH for both cases, is it intentional?
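The return-value semantics proposed in this comment can be sketched as two separate paths. This is a hypothetical illustration of the reviewer's suggestion, not the PR's implementation; all names are assumptions.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Sketch: embed writes the prefix to next and consumes no input bytes (returns 0);
// exclude skips the prefix without calling next (returns PREFIX_LENGTH).
final class EnrichSemantics
{
    static final int PREFIX_LENGTH = 5;

    // write path: emit the prefix to the consumer
    static int embed(int schemaId, Consumer<byte[]> next)
    {
        byte[] prefix = ByteBuffer.allocate(PREFIX_LENGTH)
            .put((byte) 0x00)
            .putInt(schemaId)
            .array();
        next.accept(prefix);
        return 0; // nothing skipped from the input
    }

    // read path: report how many prefix bytes to skip
    static int exclude(byte[] data)
    {
        return data.length >= PREFIX_LENGTH && data[0] == 0x00 ? PREFIX_LENGTH : 0;
    }
}
```

Under this reading, returning `PREFIX_LENGTH` from both cases, as the reviewer notes, would double-count the prefix on the embed path.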

{
length = ENRICHED_LENGTH;
}
return length;
Contributor:

Does it make sense to have booleans for embed and exclude, or is this more a function of whether the validator is read vs write?

If it is a property of the validator, then perhaps we need to pass this context from the validator to the catalog handler, either via a parameter or by having 2 methods on catalog handler?

public String type()
{
return SCHEMA_REGISTRY;
this.prefixRO = new UnsafeBuffer(new byte[5]);
Contributor:

We should probably have this structure generated from an internal idl instead for better readability, much like we do for the kafka cache entry descriptions and other protocol codec flyweights.

DirectBuffer data,
int payloadIndex,
int payloadLength);
}
Contributor:

Use simpler names for parameters, such as index length instead of payloadIndex payloadLength.


default int maxPadding()
{
return ZERO_PADDING;
return 0;
Contributor:

Let's rename to encodePadding to indicate this is needed only for encode case.

Also let's rename Validator.maxPadding to Validator.padding since padding concept is already an upper bound in zilla.

SchemaConfig catalog,
String subject)
{
int schemaId = 0;
Contributor:

int schemaId = NO_SCHEMA_ID;

Comment on lines 71 to 72
SchemaConfig catalog,
String subject,
Contributor:

Let's aim to remove these parameters from decode.

int ZERO_PADDING = 0;

@FunctionalInterface
interface Read
Contributor:

Suggest renaming to Decoder, with IDENTITY constant that defaults to next.accept(data, index, length).


GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
if (reader != null)
private int validatePayload(
Contributor:

Suggest renaming this to decodePayload

int length,
ValueConsumer next,
int schemaId)
{
Contributor:

Receive schemaId as NO_SCHEMA_ID if not found in catalog prefix, so can default here using catalog and subject if needed.

protected final String subject;
protected final String format;
protected final ByteArrayOutputStream encoded;
protected final ExpandableDirectBufferOutputStream encoded;
Contributor:

Suggest renaming this to expandable as it is used for both encode and decode.

schemaId = resolve(subject, catalog.version);
}
return schemaId;
}
Contributor:

Let's see if we can simplify this to not require embedding magic byte and schema id in payload.

}

@Test
public void shouldVerifyEnrichedData()
Contributor:

enriched -> encoded

@jfallows jfallows merged commit 0ffe279 into aklivity:feature/schema-registry Dec 22, 2023
3 checks passed
@ankitk-me ankitk-me deleted the convertor branch December 27, 2023 06:41
@jfallows jfallows linked an issue Dec 28, 2023 that may be closed by this pull request