-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CAPPL-40] Add Custom Compute Capability
- Pass through binary + config through the workflow engine
- Loading branch information
1 parent
0e32c07
commit 1c73289
Showing
21 changed files
with
773 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package compute | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/jonboulle/clockwork" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/services" | ||
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" | ||
) | ||
|
||
var ( | ||
moduleCacheHit = promauto.NewCounterVec(prometheus.CounterOpts{ | ||
Name: "compute_module_cache_hit", | ||
Help: "hit vs non-hits of the module cache for custom compute", | ||
}, []string{"hit"}) | ||
moduleCacheEviction = promauto.NewCounter(prometheus.CounterOpts{ | ||
Name: "compute_module_cache_eviction", | ||
Help: "evictions from the module cache", | ||
}) | ||
moduleCacheAddition = promauto.NewCounter(prometheus.CounterOpts{ | ||
Name: "compute_module_cache_addition", | ||
Help: "additions to the module cache", | ||
}) | ||
) | ||
|
||
type moduleCache struct { | ||
m map[string]*module | ||
mu sync.RWMutex | ||
|
||
wg sync.WaitGroup | ||
stopChan services.StopChan | ||
|
||
tickInterval time.Duration | ||
timeout time.Duration | ||
|
||
clock clockwork.Clock | ||
onReaper chan struct{} | ||
} | ||
|
||
func newModuleCache(clock clockwork.Clock, tick, timeout time.Duration) *moduleCache { | ||
return &moduleCache{ | ||
m: map[string]*module{}, | ||
tickInterval: tick, | ||
timeout: timeout, | ||
clock: clock, | ||
stopChan: make(chan struct{}), | ||
} | ||
} | ||
|
||
func (m *moduleCache) start() { | ||
m.wg.Add(1) | ||
go func() { | ||
defer m.wg.Done() | ||
m.reapLoop() | ||
}() | ||
} | ||
|
||
func (m *moduleCache) close() { | ||
close(m.stopChan) | ||
m.wg.Wait() | ||
} | ||
|
||
func (m *moduleCache) reapLoop() { | ||
ticker := m.clock.NewTicker(m.tickInterval) | ||
for { | ||
select { | ||
case <-ticker.Chan(): | ||
m.evictOlderThan(m.timeout) | ||
if m.onReaper != nil { | ||
m.onReaper <- struct{}{} | ||
} | ||
case <-m.stopChan: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (mr *moduleCache) add(id string, mod *module) { | ||
mr.mu.Lock() | ||
defer mr.mu.Unlock() | ||
mod.lastFetchedAt = time.Now() | ||
mr.m[id] = mod | ||
moduleCacheAddition.Inc() | ||
} | ||
|
||
func (mr *moduleCache) get(id string) (*module, error) { | ||
mr.mu.Lock() | ||
defer mr.mu.Unlock() | ||
gotModule, ok := mr.m[id] | ||
if !ok { | ||
moduleCacheHit.WithLabelValues("false").Inc() | ||
return nil, fmt.Errorf("could not find module for id %s", id) | ||
} | ||
|
||
moduleCacheHit.WithLabelValues("true").Inc() | ||
gotModule.lastFetchedAt = mr.clock.Now() | ||
return gotModule, nil | ||
} | ||
|
||
func (mr *moduleCache) evictOlderThan(duration time.Duration) { | ||
mr.mu.Lock() | ||
defer mr.mu.Unlock() | ||
|
||
evicted := 0 | ||
for id, m := range mr.m { | ||
if mr.clock.Now().Sub(m.lastFetchedAt) > duration { | ||
delete(mr.m, id) | ||
m.module.Close() | ||
evicted++ | ||
} | ||
} | ||
|
||
moduleCacheEviction.Add(float64(evicted)) | ||
} | ||
|
||
type module struct { | ||
module *host.Module | ||
lastFetchedAt time.Time | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package compute | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/jonboulle/clockwork" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest" | ||
Check failure on line 13 in core/capabilities/compute/cache_test.go GitHub Actions / Core Tests (go_core_tests)
Check failure on line 13 in core/capabilities/compute/cache_test.go GitHub Actions / Core Tests (go_core_tests)
Check failure on line 13 in core/capabilities/compute/cache_test.go GitHub Actions / Core Tests (go_core_race_tests)
|
||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
const ( | ||
binaryLocation = "test/simple/cmd/testmodule.wasm" | ||
binaryCmd = "core/capabilities/compute/test/simple/cmd" | ||
) | ||
|
||
func TestCache(t *testing.T) { | ||
clock := clockwork.NewFakeClock() | ||
tick := 1 * time.Second | ||
timeout := 1 * time.Second | ||
|
||
cache := newModuleCache(clock, tick, timeout) | ||
cache.onReaper = make(chan struct{}, 1) | ||
cache.start() | ||
|
||
var binary []byte | ||
binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t) | ||
hmod, err := host.NewModule(&host.ModuleConfig{ | ||
Logger: logger.TestLogger(t), | ||
IsUncompressed: true, | ||
}, binary) | ||
require.NoError(t, err) | ||
|
||
id := uuid.New().String() | ||
mod := &module{ | ||
module: hmod, | ||
} | ||
cache.add(id, mod) | ||
|
||
got, err := cache.get(id) | ||
require.NoError(t, err) | ||
|
||
assert.Equal(t, got, mod) | ||
|
||
clock.Advance(15 * time.Second) | ||
<-cache.onReaper | ||
m, err := cache.get(id) | ||
assert.ErrorContains(t, err, "could not find module", m) | ||
defer cache.close() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
package compute | ||
|
||
import ( | ||
"context" | ||
"crypto/sha256" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/jonboulle/clockwork" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" | ||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core" | ||
"github.com/smartcontractkit/chainlink-common/pkg/values" | ||
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" | ||
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb" | ||
) | ||
|
||
const ( | ||
CapabilityIDCompute = "custom_compute@1.0.0" | ||
|
||
binaryKey = "binary" | ||
configKey = "config" | ||
) | ||
|
||
var ( | ||
computeWASMInit = promauto.NewHistogramVec(prometheus.HistogramOpts{ | ||
Name: "compute_wasm_module_init", | ||
Help: "how long it takes to initialize a WASM module", | ||
Buckets: []float64{ | ||
float64(50 * time.Millisecond), | ||
float64(100 * time.Millisecond), | ||
float64(200 * time.Millisecond), | ||
float64(500 * time.Millisecond), | ||
float64(1 * time.Second), | ||
float64(2 * time.Second), | ||
float64(4 * time.Second), | ||
float64(8 * time.Second), | ||
}, | ||
}, []string{"workflowID", "stepRef"}) | ||
computeWASMExec = promauto.NewHistogramVec(prometheus.HistogramOpts{ | ||
Name: "compute_wasm_module_exec", | ||
Help: "how long it takes to execute a request from a WASM module", | ||
Buckets: []float64{ | ||
float64(50 * time.Millisecond), | ||
float64(100 * time.Millisecond), | ||
float64(200 * time.Millisecond), | ||
float64(500 * time.Millisecond), | ||
float64(1 * time.Second), | ||
float64(2 * time.Second), | ||
float64(4 * time.Second), | ||
float64(8 * time.Second), | ||
}, | ||
}, []string{"workflowID", "stepRef"}) | ||
) | ||
|
||
var _ capabilities.ActionCapability = (*Compute)(nil) | ||
|
||
type Compute struct { | ||
log logger.Logger | ||
registry coretypes.CapabilitiesRegistry | ||
modules *moduleCache | ||
} | ||
|
||
func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { | ||
return nil | ||
} | ||
|
||
func (c *Compute) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { | ||
return nil | ||
} | ||
|
||
func generateID(binary []byte) string { | ||
id := sha256.Sum256(binary) | ||
return fmt.Sprintf("%x", id) | ||
} | ||
|
||
func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { | ||
binary, err := c.popBytesValue(request.Config, binaryKey) | ||
if err != nil { | ||
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: %w", err) | ||
} | ||
|
||
config, err := c.popBytesValue(request.Config, configKey) | ||
if err != nil { | ||
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: %w", err) | ||
} | ||
|
||
id := generateID(binary) | ||
|
||
m, err := c.modules.get(id) | ||
if err != nil { | ||
mod, err := c.initModule(id, binary, request.Metadata.WorkflowID, request.Metadata.ReferenceID) | ||
if err != nil { | ||
return capabilities.CapabilityResponse{}, err | ||
} | ||
|
||
m = mod | ||
} | ||
|
||
return c.executeWithModule(m.module, config, request) | ||
} | ||
|
||
func (c *Compute) initModule(id string, binary []byte, workflowID, referenceID string) (*module, error) { | ||
initStart := time.Now() | ||
mod, err := host.NewModule(&host.ModuleConfig{Logger: c.log}, binary) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err) | ||
} | ||
|
||
mod.Start() | ||
|
||
initDuration := time.Now().Sub(initStart) | ||
computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration)) | ||
|
||
m := &module{module: mod} | ||
c.modules.add(id, m) | ||
return m, nil | ||
} | ||
|
||
func (c *Compute) popBytesValue(m *values.Map, key string) ([]byte, error) { | ||
v, ok := m.Underlying[key] | ||
if !ok { | ||
return nil, fmt.Errorf("could not find %s in map", key) | ||
} | ||
|
||
vb, ok := v.(*values.Bytes) | ||
if !ok { | ||
return nil, fmt.Errorf("value is not bytes: %s", key) | ||
} | ||
|
||
delete(m.Underlying, key) | ||
return vb.Underlying, nil | ||
} | ||
|
||
func (c *Compute) executeWithModule(module *host.Module, config []byte, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { | ||
executeStart := time.Now() | ||
capReq := capabilitiespb.CapabilityRequestToProto(req) | ||
|
||
wasmReq := &wasmpb.Request{ | ||
Id: uuid.New().String(), | ||
Config: config, | ||
Message: &wasmpb.Request_ComputeRequest{ | ||
ComputeRequest: &wasmpb.ComputeRequest{ | ||
Request: capReq, | ||
}, | ||
}, | ||
} | ||
resp, err := module.Run(wasmReq) | ||
if err != nil { | ||
return capabilities.CapabilityResponse{}, err | ||
} | ||
|
||
cresppb := resp.GetComputeResponse().GetResponse() | ||
if cresppb == nil { | ||
return capabilities.CapabilityResponse{}, errors.New("got nil compute response") | ||
} | ||
|
||
cresp, err := capabilitiespb.CapabilityResponseFromProto(cresppb) | ||
if err != nil { | ||
return capabilities.CapabilityResponse{}, errors.New("could not convert response proto into response") | ||
} | ||
|
||
computeWASMExec.WithLabelValues( | ||
req.Metadata.WorkflowID, | ||
req.Metadata.ReferenceID, | ||
).Observe(float64(time.Now().Sub(executeStart))) | ||
|
||
return cresp, nil | ||
} | ||
|
||
func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { | ||
return capabilities.NewCapabilityInfo( | ||
CapabilityIDCompute, | ||
capabilities.CapabilityTypeAction, | ||
"WASM custom compute capability", | ||
) | ||
} | ||
|
||
func (c *Compute) Start(ctx context.Context) error { | ||
c.modules.start() | ||
return c.registry.Add(ctx, c) | ||
} | ||
|
||
func (c *Compute) Close() error { | ||
c.modules.close() | ||
return nil | ||
} | ||
|
||
func NewAction(log logger.Logger, registry coretypes.CapabilitiesRegistry) *Compute { | ||
compute := &Compute{ | ||
log: log, | ||
registry: registry, | ||
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute), | ||
} | ||
return compute | ||
} |
Oops, something went wrong.