Skip to content

Commit

Permalink
fix(parsers.avro): Add mutex to cache access (#15921)
Browse files Browse the repository at this point in the history
  • Loading branch information
athornton authored and jiangxianfu committed Sep 30, 2024
1 parent 8a079dc commit 69f724b
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion plugins/parsers/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"net/url"
"os"
"sync"
"time"

"github.com/linkedin/goavro/v2"
Expand All @@ -25,6 +26,7 @@ type schemaRegistry struct {
password string
cache map[int]*schemaAndCodec
client *http.Client
mu sync.RWMutex
}

const schemaByID = "%s/schemas/ids/%d"
Expand Down Expand Up @@ -73,10 +75,22 @@ func newSchemaRegistry(addr, caCertPath string) (*schemaRegistry, error) {
return registry, nil
}

func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
// Helper function to make managing lock easier
func (sr *schemaRegistry) getSchemaAndCodecFromCache(id int) (*schemaAndCodec, error) {
// Read-lock the cache map before access.
sr.mu.RLock()
defer sr.mu.RUnlock()
if v, ok := sr.cache[id]; ok {
return v, nil
}
return nil, fmt.Errorf("schema %d not in cache", id)
}

func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
v, err := sr.getSchemaAndCodecFromCache(id)
if err == nil {
return v, nil
}

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil)
if err != nil {
Expand Down Expand Up @@ -112,6 +126,9 @@ func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
return nil, err
}
retval := &schemaAndCodec{Schema: schemaValue, Codec: codec}
// Lock the cache map before update.
sr.mu.Lock()
defer sr.mu.Unlock()
sr.cache[id] = retval
return retval, nil
}

0 comments on commit 69f724b

Please sign in to comment.