-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: fetch schema from schema-registry by schema string #308
base: main
Are you sure you want to change the base?
Conversation
rroesch1
commented
Oct 9, 2024
- enable users to lookup the schema id for a given schema string
Hey @rroesch1, Thanks for your contribution! Can you please add a JS script to the examples to test all these cases and add some Go tests? |
8ffee37
to
3c65d03
Compare
"schemaType": srclient.Avro, | ||
}, | ||
), | ||
}, | ||
}).Export().(*Schema) | ||
assert.Equal(t, "test-subject", newSchema.Subject) | ||
assert.Equal(t, 0, newSchema.Version) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi: the first version of a schema should have the version 1
(not 0
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we use the client.CreateSchema
function in our code, it internally calls the client.GetSchema
function from the srclient
library. This function interacts with the Schema Registry API endpoint GET /schemas/ids/{id}
. According to the API documentation, this endpoint returns the schema associated with a specific ID but does not include the version information in its response. As a result, the version defaults to 0
when you retrieve the schema using these functions.
To obtain the version information of the schema, you should use the getSchema
wrapper function like below. This function calls a different endpoint—such as GET /subjects/{subject}/versions/{version}
or GET /subjects/{subject}/versions/latest
—which provides both the schema and its version information. By using getSchema
, you ensure that you retrieve all the necessary details about the schema, including its version.
(We can add this as a comment here)
* enable users to lookup the schema id for a given schema string
3c65d03
to
f9dd1b9
Compare
Quality Gate passedIssues Measures |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rroesch1 Thank a lot for your contribution! 🙏
Please address the comments.
@@ -488,7 +488,8 @@ export class SchemaRegistry { | |||
constructor(schemaRegistryConfig: SchemaRegistryConfig); | |||
/** | |||
* @method | |||
* Get a schema from Schema Registry by version and subject. | |||
* Get latest schema from Schema Registry by subject. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You only need to update the api-docs/index.d.ts
file and then run these commands to generate the docs
in the same directory, considering that you haven't installed npm deps:
npm i
npm run validate-d-ts
npm run generate-docs
npm run prettify
@@ -137,8 +137,10 @@ Deserializes the given data and schema into its original form. | |||
|
|||
▸ **getSchema**(`schema`): [`Schema`](../interfaces/Schema.md) | |||
|
|||
**`method`** | |||
Get a schema from Schema Registry by version and subject. | |||
**`method`** Get a schema from Schema Registry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not change this manually. See my comment on the api-docs/index.d.ts
file above.
} | ||
} | ||
|
||
runtime := k.vu.Runtime() | ||
// The client always caches the schema. | ||
var schemaInfo *srclient.Schema | ||
var err error | ||
// Default version of the schema is the latest version. | ||
if schema.Version == 0 { | ||
var isLatestSchema = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var isLatestSchema = false; | |
var updateCache = false; |
schema.Subject, schema.Version) | ||
} else { // fetch latest schema for given subject | ||
schemaInfo, err = client.GetSchemaByVersion(schema.Subject, schema.Version) | ||
isLatestSchema = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isLatestSchema = true; | |
updateCache = true; |
@@ -306,7 +318,7 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) | |||
Subject: schema.Subject, | |||
} | |||
// If the Cache is set, cache the schema. | |||
if wrappedSchema.EnableCaching { | |||
if wrappedSchema.EnableCaching && isLatestSchema { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if wrappedSchema.EnableCaching && isLatestSchema { | |
if wrappedSchema.EnableCaching && updateCache { |
} else { | ||
schemaInfo, err = client.GetSchemaByVersion( | ||
schema.Subject, schema.Version) | ||
} else { // fetch latest schema for given subject |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} else { // fetch latest schema for given subject | |
} else { // fetch schema by version |
"schemaType": srclient.Avro, | ||
}, | ||
), | ||
}, | ||
}).Export().(*Schema) | ||
assert.Equal(t, "test-subject", newSchema.Subject) | ||
assert.Equal(t, 0, newSchema.Version) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we use the client.CreateSchema
function in our code, it internally calls the client.GetSchema
function from the srclient
library. This function interacts with the Schema Registry API endpoint GET /schemas/ids/{id}
. According to the API documentation, this endpoint returns the schema associated with a specific ID but does not include the version information in its response. As a result, the version defaults to 0
when you retrieve the schema using these functions.
To obtain the version information of the schema, you should use the getSchema
wrapper function like below. This function calls a different endpoint—such as GET /subjects/{subject}/versions/{version}
or GET /subjects/{subject}/versions/latest
—which provides both the schema and its version information. By using getSchema
, you ensure that you retrieve all the necessary details about the schema, including its version.
(We can add this as a comment here)
assert.Equal(t, "test-subject", newSchema.Subject) | ||
assert.Equal(t, 0, newSchema.Version) | ||
assert.Equal(t, "test-subject", newSchema1.Subject) | ||
assert.Equal(t, 1, newSchema1.Version) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert.Equal(t, 1, newSchema1.Version) | |
assert.Equal(t, 0, newSchema1.Version) |
}, | ||
}).Export().(*Schema) | ||
assert.Equal(t, "test-subject", newSchema2.Subject) | ||
assert.Equal(t, 2, newSchema2.Version) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert.Equal(t, 2, newSchema2.Version) | |
assert.Equal(t, 0, newSchema2.Version) |
@@ -271,7 +271,8 @@ func TestGetSubjectNameCanUseRecordNameStrategyWithNamespace(t *testing.T) { | |||
// TestSchemaRegistryClientClass tests the schema registry client class. | |||
func TestSchemaRegistryClientClass(t *testing.T) { | |||
test := getTestModuleInstance(t) | |||
avroSchema := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}` | |||
avroSchema1 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}` | |||
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}, {"name":"field2","type":"int"}]}` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested this locally and the space causes the tests to fail.
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}, {"name":"field2","type":"int"}]}` | |
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"},{"name":"field2","type":"int"}]}` |
Hey @rroesch1, Would you update this PR with the reviewed changes? |