Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch schema from schema-registry by schema string #308

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions api-docs/docs/classes/SchemaRegistry.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ Deserializes the given data and schema into its original form.

▸ **getSchema**(`schema`): [`Schema`](../interfaces/Schema.md)

**`method`**
Get a schema from Schema Registry by version and subject.
**`method`** Get a schema from Schema Registry
Copy link
Owner

@mostafa mostafa Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not change this manually. See my comment on the api-docs/index.d.ts file above.

* if only `schema.subject` is set: returns the latest schema for the given subject
* if `schema.subject` and `schema.schema` is set: returns the schema for the given schema string
* if `schema.subject` and `schema.version` is set: returns the schema for the given version

#### Parameters

Expand Down
3 changes: 2 additions & 1 deletion api-docs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ export class SchemaRegistry {
constructor(schemaRegistryConfig: SchemaRegistryConfig);
/**
* @method
* Get a schema from Schema Registry by version and subject.
* Get latest schema from Schema Registry by subject.
Copy link
Owner

@mostafa mostafa Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only need to update the api-docs/index.d.ts file and then run these commands to generate the docs in the same directory, considering that you haven't installed npm deps:

npm i
npm run validate-d-ts
npm run generate-docs
npm run prettify

* Alternatively a specific schema version can be fetched by either specifing schema.version of schema.schema
* @param {Schema} schema - Schema configuration.
* @returns {Schema} - Schema.
*/
Expand Down
30 changes: 21 additions & 9 deletions schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,25 +274,37 @@ func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.Sch
return srClient
}

// getSchema returns the schema for the given subject and schema ID and version.
// getSchema returns either the latest schema for the given subject or a specific version (if given) or the schema for the given schema string (if given)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// getSchema returns either the latest schema for the given subject or a specific version (if given) or the schema for the given schema string (if given)
// getSchema returns the latest schema for the specified subject, a specific version if provided, or the schema matching the given schema string.

func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema {
// If EnableCache is set, check if the schema is in the cache.
if schema.EnableCaching {
if schema, ok := k.schemaCache[schema.Subject]; ok {
return schema
if cachedSchema, ok := k.schemaCache[schema.Subject]; ok {
// the cache should contain the latest version of a schema for the given subject
// we must not return the cached schema if it does not match the requested version or schema string
if (schema.Version == 0 && schema.Schema != "") || schema.Version == cachedSchema.Version || schema.Schema == cachedSchema.Schema {
return cachedSchema;
}
}
}

runtime := k.vu.Runtime()
// The client always caches the schema.
var schemaInfo *srclient.Schema
var err error
// Default version of the schema is the latest version.
if schema.Version == 0 {
var isLatestSchema = false;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var isLatestSchema = false;
var updateCache = false;

if schema.Schema != "" { // fetch schema for given schema string
var schemaType srclient.SchemaType
if schema.SchemaType != nil {
schemaType = *schema.SchemaType
} else {
schemaType = srclient.Avro
}
schemaInfo, err = client.LookupSchema(schema.Subject, schema.Schema, schemaType, schema.References...)
} else if schema.Version == 0 { // fetch schema by version
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if schema.Version == 0 { // fetch schema by version
} else if schema.Version == 0 { // fetch latest schema for the given subject

schemaInfo, err = client.GetLatestSchema(schema.Subject)
} else {
schemaInfo, err = client.GetSchemaByVersion(
schema.Subject, schema.Version)
} else { // fetch latest schema for given subject
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else { // fetch latest schema for given subject
} else { // fetch schema by version

schemaInfo, err = client.GetSchemaByVersion(schema.Subject, schema.Version)
isLatestSchema = true;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
isLatestSchema = true;
updateCache = true;

}

if err == nil {
Expand All @@ -306,7 +318,7 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema)
Subject: schema.Subject,
}
// If the Cache is set, cache the schema.
if wrappedSchema.EnableCaching {
if wrappedSchema.EnableCaching && isLatestSchema {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if wrappedSchema.EnableCaching && isLatestSchema {
if wrappedSchema.EnableCaching && updateCache {

k.schemaCache[wrappedSchema.Subject] = wrappedSchema
}
return wrappedSchema
Expand Down
64 changes: 55 additions & 9 deletions schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func TestGetSubjectNameCanUseRecordNameStrategyWithNamespace(t *testing.T) {
// TestSchemaRegistryClientClass tests the schema registry client class.
func TestSchemaRegistryClientClass(t *testing.T) {
test := getTestModuleInstance(t)
avroSchema := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}`
avroSchema1 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}`
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}, {"name":"field2","type":"int"}]}`
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this locally and the space causes the tests to fail.

Suggested change
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}, {"name":"field2","type":"int"}]}`
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"},{"name":"field2","type":"int"}]}`


require.NoError(t, test.moveToVUCode())
assert.NotPanics(t, func() {
Expand All @@ -287,21 +288,36 @@ func TestSchemaRegistryClientClass(t *testing.T) {
})
assert.NotNil(t, client)

// Create a schema and send it to the registry.
// Create first schema and send it to the registry.
createSchema := client.Get("createSchema").Export().(func(sobek.FunctionCall) sobek.Value)
newSchema := createSchema(sobek.FunctionCall{
newSchema1 := createSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema,
"schema": avroSchema1,
"schemaType": srclient.Avro,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", newSchema.Subject)
assert.Equal(t, 0, newSchema.Version)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi: the first version of a schema should have the version 1 (not 0)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we use the client.CreateSchema function in our code, it internally calls the client.GetSchema function from the srclient library. This function interacts with the Schema Registry API endpoint GET /schemas/ids/{id}. According to the API documentation, this endpoint returns the schema associated with a specific ID but does not include the version information in its response. As a result, the version defaults to 0 when you retrieve the schema using these functions.

To obtain the version information of the schema, you should use the getSchema wrapper function like below. This function calls a different endpoint—such as GET /subjects/{subject}/versions/{version} or GET /subjects/{subject}/versions/latest—which provides both the schema and its version information. By using getSchema, you ensure that you retrieve all the necessary details about the schema, including its version.

(We can add this as a comment here)

assert.Equal(t, "test-subject", newSchema1.Subject)
assert.Equal(t, 1, newSchema1.Version)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.Equal(t, 1, newSchema1.Version)
assert.Equal(t, 0, newSchema1.Version)


// Create second schema and send it to the registry.
newSchema2 := createSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema2,
"schemaType": srclient.Avro,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", newSchema2.Subject)
assert.Equal(t, 2, newSchema2.Version)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.Equal(t, 2, newSchema2.Version)
assert.Equal(t, 0, newSchema2.Version)


// Get the latest version of the schema from the registry.
getSchema := client.Get("getSchema").Export().(func(sobek.FunctionCall) sobek.Value)
Expand All @@ -316,16 +332,46 @@ func TestSchemaRegistryClientClass(t *testing.T) {
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", currentSchema.Subject)
assert.Equal(t, 1, currentSchema.Version)
assert.Equal(t, avroSchema, currentSchema.Schema)
assert.Equal(t, 2, currentSchema.Version)
assert.Equal(t, avroSchema2, currentSchema.Schema)

// get schema by schema string
schemaByString := getSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema1,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", schemaByString.Subject)
assert.Equal(t, 1, schemaByString.Version)
assert.Equal(t, avroSchema1, schemaByString.Schema)

// get schema by version
schemaByVersion := getSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"version": 1,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", schemaByVersion.Subject)
assert.Equal(t, 1, schemaByVersion.Version)
assert.Equal(t, avroSchema1, schemaByVersion.Schema)

// Get the subject name based on the given subject name config.
getSubjectName := client.Get("getSubjectName").Export().(func(sobek.FunctionCall) sobek.Value)
subjectName := getSubjectName(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"schema": avroSchema,
"schema": avroSchema1,
"topic": "test-topic",
"subjectNameStrategy": TopicRecordNameStrategy,
"element": Value,
Expand Down
6 changes: 6 additions & 0 deletions scripts/test_avro_with_schema_registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ const valueSchemaObject = schemaRegistry.createSchema({
schemaType: SCHEMA_TYPE_AVRO,
});

// if you want use a schema which has already been created you can fetch it by the schema string
const valueSchemaObjectExisting = schemaRegistry.get({
subject: valueSubjectName,
schema: valueSchema,
});

export default function () {
for (let index = 0; index < 100; index++) {
let messages = [
Expand Down