From 69f724b938be45887450b6d6f0226bbacb757f73 Mon Sep 17 00:00:00 2001 From: Adam Thornton Date: Mon, 30 Sep 2024 02:49:15 -0700 Subject: [PATCH] fix(parsers.avro): Add mutex to cache access (#15921) --- plugins/parsers/avro/schema_registry.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/plugins/parsers/avro/schema_registry.go b/plugins/parsers/avro/schema_registry.go index feda146c41cb4..bbb6596169b2d 100644 --- a/plugins/parsers/avro/schema_registry.go +++ b/plugins/parsers/avro/schema_registry.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "os" + "sync" "time" "github.com/linkedin/goavro/v2" @@ -25,6 +26,7 @@ type schemaRegistry struct { password string cache map[int]*schemaAndCodec client *http.Client + mu sync.RWMutex } const schemaByID = "%s/schemas/ids/%d" @@ -73,10 +75,22 @@ func newSchemaRegistry(addr, caCertPath string) (*schemaRegistry, error) { return registry, nil } -func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) { +// Helper function to make managing lock easier +func (sr *schemaRegistry) getSchemaAndCodecFromCache(id int) (*schemaAndCodec, error) { + // Read-lock the cache map before access. + sr.mu.RLock() + defer sr.mu.RUnlock() if v, ok := sr.cache[id]; ok { return v, nil } + return nil, fmt.Errorf("schema %d not in cache", id) +} + +func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) { + v, err := sr.getSchemaAndCodecFromCache(id) + if err == nil { + return v, nil + } req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil) if err != nil { @@ -112,6 +126,9 @@ func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) { return nil, err } retval := &schemaAndCodec{Schema: schemaValue, Codec: codec} + // Lock the cache map before update. + sr.mu.Lock() + defer sr.mu.Unlock() sr.cache[id] = retval return retval, nil }