Skip to content

Commit

Permalink
[CAPPL-40] Add Custom Compute Capability
Browse files Browse the repository at this point in the history
- Pass through binary + config through the workflow engine
  • Loading branch information
cedric-cordenier committed Sep 24, 2024
1 parent 0e32c07 commit fe7219a
Show file tree
Hide file tree
Showing 20 changed files with 773 additions and 45 deletions.
124 changes: 124 additions & 0 deletions core/capabilities/compute/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package compute

import (
"fmt"
"sync"
"time"

"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
)

// Prometheus metrics describing activity of the custom compute module cache.
var (
// moduleCacheHit counts cache lookups, partitioned by the "hit" label
// ("true" when the module was found, "false" when it was not).
moduleCacheHit = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "compute_module_cache_hit",
Help: "hit vs non-hits of the module cache for custom compute",
}, []string{"hit"})
// moduleCacheEviction counts entries removed from the cache by eviction.
moduleCacheEviction = promauto.NewCounter(prometheus.CounterOpts{
Name: "compute_module_cache_eviction",
Help: "evictions from the module cache",
})
// moduleCacheAddition counts calls that store a module in the cache.
moduleCacheAddition = promauto.NewCounter(prometheus.CounterOpts{
Name: "compute_module_cache_addition",
Help: "additions to the module cache",
})
)

// moduleCache stores instantiated modules keyed by ID and runs a background
// reaper that periodically evicts entries that have not been fetched within
// timeout.
type moduleCache struct {
// m maps a module ID to its cached entry; access is guarded by mu.
m map[string]*module
mu sync.RWMutex

// wg tracks the reaper goroutine; closing stopChan tells it to exit.
wg sync.WaitGroup
stopChan services.StopChan

// tickInterval is the period of the reaper's ticker; timeout is the age
// passed to evictOlderThan on every tick.
tickInterval time.Duration
timeout time.Duration

// clock provides the reaper's ticker and the current time used when
// fetching and evicting entries.
clock clockwork.Clock
// onReaper, when non-nil, receives a value after each reaper pass.
// newModuleCache leaves it nil.
onReaper chan struct{}
}

// newModuleCache builds an empty cache whose reaper ticks every tick and
// evicts entries idle for longer than timeout. The reaper does not run until
// start is called.
func newModuleCache(clock clockwork.Clock, tick, timeout time.Duration) *moduleCache {
	mc := new(moduleCache)
	mc.clock = clock
	mc.tickInterval = tick
	mc.timeout = timeout
	mc.m = make(map[string]*module)
	mc.stopChan = make(chan struct{})
	return mc
}

// start launches the reaper loop in a background goroutine. The goroutine is
// registered with wg before it is spawned so that close can wait for it.
func (mc *moduleCache) start() {
mc.wg.Add(1)
go func() {
defer mc.wg.Done()
mc.reapLoop()
}()
}

// close signals the reaper to stop by closing stopChan and blocks until the
// reaper goroutine has exited. It does not close the modules still held in
// the cache. It must be called at most once: a second call would close
// stopChan again, which panics.
func (mc *moduleCache) close() {
close(mc.stopChan)
mc.wg.Wait()
}

// reapLoop runs until stopChan is closed. On every tick of the cache clock it
// evicts entries older than mc.timeout and, when onReaper is set, notifies it
// that a pass has completed.
func (mc *moduleCache) reapLoop() {
	ticker := mc.clock.NewTicker(mc.tickInterval)
	// Release the ticker's resources once the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.Chan():
			mc.evictOlderThan(mc.timeout)
			if mc.onReaper != nil {
				// Nobody may be receiving on onReaper; also watch stopChan so
				// a pending notification can never prevent shutdown.
				select {
				case mc.onReaper <- struct{}{}:
				case <-mc.stopChan:
					return
				}
			}
		case <-mc.stopChan:
			return
		}
	}
}

// add stores mod under id, replacing any existing entry, and marks it as
// freshly fetched. The timestamp comes from the cache's clock (not the wall
// clock) so that it is comparable with the times used by get and
// evictOlderThan, including when a fake clock is injected.
func (mc *moduleCache) add(id string, mod *module) {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	mod.lastFetchedAt = mc.clock.Now()
	mc.m[id] = mod
	moduleCacheAddition.Inc()
}

// get looks up the module stored under id. On a hit it refreshes the entry's
// lastFetchedAt and returns it; on a miss it returns an error. Either outcome
// is recorded in the moduleCacheHit metric.
func (mc *moduleCache) get(id string) (*module, error) {
	// A write lock is taken because a hit mutates the entry's timestamp.
	mc.mu.Lock()
	defer mc.mu.Unlock()

	if cached, found := mc.m[id]; found {
		moduleCacheHit.WithLabelValues("true").Inc()
		cached.lastFetchedAt = mc.clock.Now()
		return cached, nil
	}

	moduleCacheHit.WithLabelValues("false").Inc()
	return nil, fmt.Errorf("could not find module for id %s", id)
}

// evictOlderThan removes and closes every cached module whose last fetch is
// more than duration in the past, then adds the number removed to the
// moduleCacheEviction metric.
func (mc *moduleCache) evictOlderThan(duration time.Duration) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	var removed float64
	for key, entry := range mc.m {
		idleFor := mc.clock.Now().Sub(entry.lastFetchedAt)
		if idleFor <= duration {
			continue
		}

		delete(mc.m, key)
		entry.module.Close()
		removed++
	}

	moduleCacheEviction.Add(removed)
}

// module is a cache entry: a host module together with the time it was last
// added to or fetched from the cache.
type module struct {
module *host.Module
// lastFetchedAt is compared against the cache clock to decide eviction.
lastFetchedAt time.Time
}
55 changes: 55 additions & 0 deletions core/capabilities/compute/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package compute

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest"

Check failure on line 13 in core/capabilities/compute/cache_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

no required module provides package github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest; to add it:

Check failure on line 13 in core/capabilities/compute/cache_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

no required module provides package github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest; to add it:

Check failure on line 13 in core/capabilities/compute/cache_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

no required module provides package github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest; to add it:

Check failure on line 13 in core/capabilities/compute/cache_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

no required module provides package github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest; to add it:
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// Arguments passed to wasmtest.CreateTestBinary to obtain the test WASM module.
const (
binaryLocation = "test/simple/cmd/testmodule.wasm"
binaryCmd = "core/capabilities/compute/test/simple/cmd"
)

// TestCache checks that a module added to the cache can be fetched back, and
// that it is evicted by the reaper once the fake clock has advanced past the
// timeout.
func TestCache(t *testing.T) {
	clock := clockwork.NewFakeClock()
	tick := 1 * time.Second
	timeout := 1 * time.Second

	cache := newModuleCache(clock, tick, timeout)
	cache.onReaper = make(chan struct{}, 1)
	cache.start()
	// Register shutdown immediately so the reaper goroutine is stopped even
	// when one of the require calls below aborts the test.
	defer cache.close()

	binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t)
	hmod, err := host.NewModule(&host.ModuleConfig{
		Logger:         logger.TestLogger(t),
		IsUncompressed: true,
	}, binary)
	require.NoError(t, err)

	id := uuid.New().String()
	mod := &module{
		module: hmod,
	}
	cache.add(id, mod)

	got, err := cache.get(id)
	require.NoError(t, err)

	// testify expects (expected, actual).
	assert.Equal(t, mod, got)

	// Move well past the timeout and wait for one reaper pass to finish.
	clock.Advance(15 * time.Second)
	<-cache.onReaper
	m, err := cache.get(id)
	assert.ErrorContains(t, err, "could not find module", m)
}
202 changes: 202 additions & 0 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package compute

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

const (
// CapabilityIDCompute is the ID reported by Compute.Info.
CapabilityIDCompute = "custom_compute@1.0.0"

// binaryKey and configKey are the request config entries that Execute
// removes and reads as bytes.
binaryKey = "binary"
configKey = "config"
)

// Prometheus histograms timing WASM module initialization and execution.
// Bucket bounds are time.Duration values converted with float64(), i.e. they
// are expressed in nanoseconds; observations are recorded the same way.
var (
// computeWASMInit records how long initModule takes, labelled by workflow
// ID and step reference.
computeWASMInit = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "compute_wasm_module_init",
Help: "how long it takes to initialize a WASM module",
Buckets: []float64{
float64(50 * time.Millisecond),
float64(100 * time.Millisecond),
float64(200 * time.Millisecond),
float64(500 * time.Millisecond),
float64(1 * time.Second),
float64(2 * time.Second),
float64(4 * time.Second),
float64(8 * time.Second),
},
}, []string{"workflowID", "stepRef"})
// computeWASMExec records how long a successful executeWithModule call
// takes, labelled by workflow ID and step reference.
computeWASMExec = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "compute_wasm_module_exec",
Help: "how long it takes to execute a request from a WASM module",
Buckets: []float64{
float64(50 * time.Millisecond),
float64(100 * time.Millisecond),
float64(200 * time.Millisecond),
float64(500 * time.Millisecond),
float64(1 * time.Second),
float64(2 * time.Second),
float64(4 * time.Second),
float64(8 * time.Second),
},
}, []string{"workflowID", "stepRef"})
)

// Compile-time check that *Compute satisfies capabilities.ActionCapability.
var _ capabilities.ActionCapability = (*Compute)(nil)

// Compute is the custom compute capability: it runs a WASM binary supplied in
// the request config and returns the module's response.
type Compute struct {
log logger.Logger
// registry is where the capability adds itself in Start.
registry coretypes.CapabilitiesRegistry
// modules caches instantiated modules keyed by the binary's ID.
modules *moduleCache
}

// RegisterToWorkflow is a no-op; it always returns nil.
func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
return nil
}

// UnregisterFromWorkflow is a no-op; it always returns nil.
func (c *Compute) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
return nil
}

// generateID returns the lowercase hex encoding of the SHA-256 digest of
// binary. It is used as the module cache key.
func generateID(binary []byte) string {
	return fmt.Sprintf("%x", sha256.Sum256(binary))
}

// Execute runs the WASM binary carried in the request config.
//
// The "binary" and "config" entries are removed from request.Config and read
// as bytes. The binary's hash identifies the module: a cached module is
// reused when present, otherwise a new one is initialized and cached. The
// request is then handed to the module and its response returned.
func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
	var noResponse capabilities.CapabilityResponse

	wasmBinary, err := c.popBytesValue(request.Config, binaryKey)
	if err != nil {
		return noResponse, fmt.Errorf("invalid request: %w", err)
	}

	moduleConfig, err := c.popBytesValue(request.Config, configKey)
	if err != nil {
		return noResponse, fmt.Errorf("invalid request: %w", err)
	}

	moduleID := generateID(wasmBinary)

	cached, err := c.modules.get(moduleID)
	if err != nil {
		// Cache miss: instantiate the module and store it for later requests.
		cached, err = c.initModule(moduleID, wasmBinary, request.Metadata.WorkflowID, request.Metadata.ReferenceID)
		if err != nil {
			return noResponse, err
		}
	}

	return c.executeWithModule(cached.module, moduleConfig, request)
}

// initModule instantiates and starts a host module from binary, records the
// elapsed time in computeWASMInit under the given workflow and step labels,
// and stores the result in the module cache under id.
func (c *Compute) initModule(id string, binary []byte, workflowID, referenceID string) (*module, error) {
	startedAt := time.Now()

	hostModule, err := host.NewModule(&host.ModuleConfig{Logger: c.log}, binary)
	if err != nil {
		return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
	}
	hostModule.Start()

	elapsed := time.Since(startedAt)
	computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(elapsed))

	entry := &module{module: hostModule}
	c.modules.add(id, entry)
	return entry, nil
}

// popBytesValue removes key from m and returns its value as raw bytes.
//
// It returns an error when m is nil, when key is absent, or when the stored
// value is not a non-nil *values.Bytes. The entry is only deleted on success.
func (c *Compute) popBytesValue(m *values.Map, key string) ([]byte, error) {
	// A request without any config carries a nil map; report it as a missing
	// key instead of panicking on the dereference below.
	if m == nil {
		return nil, fmt.Errorf("could not find %s in map", key)
	}

	v, ok := m.Underlying[key]
	if !ok {
		return nil, fmt.Errorf("could not find %s in map", key)
	}

	// A typed nil pointer passes the type assertion, so check it explicitly.
	vb, ok := v.(*values.Bytes)
	if !ok || vb == nil {
		return nil, fmt.Errorf("value is not bytes: %s", key)
	}

	delete(m.Underlying, key)
	return vb.Underlying, nil
}

// executeWithModule wraps req in a WASM compute request with a fresh request
// ID and the given config, runs it on module, and converts the module's
// compute response back into a CapabilityResponse.
//
// It returns an error when the module run fails, when the response carries no
// compute response, or when the response proto cannot be converted. The
// execution time is recorded in computeWASMExec only on success.
func (c *Compute) executeWithModule(module *host.Module, config []byte, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
	executeStart := time.Now()
	capReq := capabilitiespb.CapabilityRequestToProto(req)

	wasmReq := &wasmpb.Request{
		Id:     uuid.New().String(),
		Config: config,
		Message: &wasmpb.Request_ComputeRequest{
			ComputeRequest: &wasmpb.ComputeRequest{
				Request: capReq,
			},
		},
	}
	resp, err := module.Run(wasmReq)
	if err != nil {
		return capabilities.CapabilityResponse{}, err
	}

	cresppb := resp.GetComputeResponse().GetResponse()
	if cresppb == nil {
		return capabilities.CapabilityResponse{}, errors.New("got nil compute response")
	}

	cresp, err := capabilitiespb.CapabilityResponseFromProto(cresppb)
	if err != nil {
		// Keep the underlying cause so callers can see why conversion failed.
		return capabilities.CapabilityResponse{}, fmt.Errorf("could not convert response proto into response: %w", err)
	}

	computeWASMExec.WithLabelValues(
		req.Metadata.WorkflowID,
		req.Metadata.ReferenceID,
	).Observe(float64(time.Since(executeStart)))

	return cresp, nil
}

// Info describes this capability: its ID is CapabilityIDCompute and its type
// is capabilities.CapabilityTypeAction.
func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilities.NewCapabilityInfo(
CapabilityIDCompute,
capabilities.CapabilityTypeAction,
"WASM custom compute capability",
)
}

// Start launches the module cache reaper and then adds this capability to the
// registry, returning the registry's error if any.
// NOTE(review): when Add fails the reaper keeps running until Close is
// called — confirm callers invoke Close after a failed Start.
func (c *Compute) Start(ctx context.Context) error {
c.modules.start()
return c.registry.Add(ctx, c)
}

// Close stops the module cache reaper and waits for it to exit. It always
// returns nil.
func (c *Compute) Close() error {
c.modules.close()
return nil
}

// NewAction builds a Compute capability backed by a module cache that uses
// the real clock, runs its reaper every minute, and evicts modules that have
// not been fetched for ten minutes. Call Start to begin reaping and to
// register the capability.
func NewAction(log logger.Logger, registry coretypes.CapabilitiesRegistry) *Compute {
	const (
		reapInterval = 1 * time.Minute
		idleTimeout  = 10 * time.Minute
	)

	cache := newModuleCache(clockwork.NewRealClock(), reapInterval, idleTimeout)
	return &Compute{
		log:      log,
		registry: registry,
		modules:  cache,
	}
}
Loading

0 comments on commit fe7219a

Please sign in to comment.