Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify indexer package #14

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"sync"

"github.com/quartz-technology/agate/indexer"
"github.com/quartz-technology/agate/indexer/data_aggregator"
"github.com/quartz-technology/agate/indexer/data_preprocessor"
"github.com/quartz-technology/agate/indexer/head_listener"
"github.com/quartz-technology/agate/indexer/storage_manager"
"github.com/quartz-technology/agate/indexer/storage_manager/store/dto"
"github.com/quartz-technology/agate/indexer/storage_manager/store/postgres"
"github.com/quartz-technology/agate/indexer/client"
"github.com/quartz-technology/agate/indexer/data"
"github.com/quartz-technology/agate/indexer/events"
"github.com/quartz-technology/agate/indexer/storage"
"github.com/quartz-technology/agate/indexer/storage/store/dto"
"github.com/quartz-technology/agate/indexer/storage/store/postgres"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -68,7 +68,7 @@ of relays. Once preprocessed, this data is stored in a database.
//nolint:wrapcheck,funlen
func run(ctx context.Context, configuration *indexer.Configuration) error {
// Performs the database migration.
migrator := storage_manager.NewDefaultDatabaseMigrator()
migrator := storage.NewDefaultDatabaseMigrator()

if err := migrator.Init(
configuration.DatabaseMigrationSourceURL,
Expand All @@ -88,22 +88,22 @@ func run(ctx context.Context, configuration *indexer.Configuration) error {
log.Info().Msg("database migrations applied!")

// Sets up beacon API client.
beaconAPIClient := head_listener.NewAgateBeaconAPIClient()
beaconAPIClient := client.NewDefaultBeaconAPI()
if err := beaconAPIClient.Init(ctx, configuration.BeaconAPIURL); err != nil {
// TODO: Wrap error.
return err
}

// Sets up head listener.
listener := head_listener.NewAgateHeadListener()
listener := events.NewDefaultHeadListener()
listener.Init(beaconAPIClient)

// Sets up relay API clients.
relayAPIClients := make([]data_aggregator.RelayAPIClient, 0)
relayAPIClients := make([]client.RelayAPI, 0)
relaysDTOs := make([]*dto.Relay, 0)

for _, relayAPIURL := range configuration.RelayAPIURLs {
relayAPIClient := data_aggregator.NewAgateRelayAPIClient(relayAPIURL)
relayAPIClient := client.NewDefaultRelayAPI(relayAPIURL)

if err := relayAPIClient.Init(); err != nil {
// TODO: Wrap error.
Expand All @@ -115,11 +115,11 @@ func run(ctx context.Context, configuration *indexer.Configuration) error {
}

// Sets up data aggregator.
aggregator := data_aggregator.NewAgateDataAggregator()
aggregator := data.NewDefaultAggregator()
aggregator.Init(relayAPIClients...)

// Creates data preprocessor.
preprocessor := data_preprocessor.NewDataPreprocessor()
preprocessor := data.NewPreprocessor()

// Sets up the database store.
store := postgres.NewDefaultStore()
Expand All @@ -129,16 +129,16 @@ func run(ctx context.Context, configuration *indexer.Configuration) error {
}

// Sets up storage manager and stores provided relays.
storage := storage_manager.NewDefaultStorageManager()
storage.Init(store)
storageManager := storage.NewDefaultManager()
storageManager.Init(store)

if err := storage.StoreRelays(ctx, relaysDTOs); err != nil {
if err := storageManager.StoreRelays(ctx, relaysDTOs); err != nil {
// TODO: Wrap error.
return err
}

// Sets up main indexer service.
service := indexer.NewIndexer(listener, aggregator, preprocessor, storage)
service := indexer.New(listener, aggregator, preprocessor, storageManager)

log.Info().Msg("indexer service starting..")
// Starts the indexer service. Blocks until context is done.
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ services:
- agate-net
volumes:
- ./agate-config-docker.yaml:/app/agate-config.yaml
stop_grace_period: "10m"

database:
hostname: database
Expand All @@ -34,6 +35,7 @@ services:
retries: 5
volumes:
- indexer-data:/var/lib/postgresql/data
stop_grace_period: "10m"

volumes:
indexer-data:
Expand Down
14 changes: 14 additions & 0 deletions indexer/client/beacon_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package client

import (
"context"

client "github.com/attestantio/go-eth2-client"
)

// BeaconAPI is used to interact with a beacon node.
type BeaconAPI interface {
// SubscribeToHeadEvents is used to create a subscription to new head events and will perform
// the handler's logic.
SubscribeToHeadEvents(ctx context.Context, handler client.EventHandlerFunc) error
}
54 changes: 54 additions & 0 deletions indexer/client/default_beacon_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package client

import (
"context"

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/http"
"github.com/rs/zerolog"
)

// DefaultBeaconAPI is the default implementation of the BeaconAPI for agate,
// using the go-eth2-client library to send queries to the beacon node.
type DefaultBeaconAPI struct {
clt *http.Service
}

// NewDefaultBeaconAPI creates a new and non-initialized DefaultBeaconAPI.
func NewDefaultBeaconAPI() *DefaultBeaconAPI {
return &DefaultBeaconAPI{
clt: nil,
}
}

// Init initializes an DefaultBeaconAPI using the beacon API URL to connect to the beacon node.
func (client *DefaultBeaconAPI) Init(ctx context.Context, beaconAPIURL string) error {
var ok bool

clt, err := http.New(
ctx,
http.WithAddress(beaconAPIURL),
http.WithLogLevel(zerolog.Disabled),
)
if err != nil {
return NewDefaultBeaconAPIServiceInitializationError(err)
}

client.clt, ok = clt.(*http.Service)
if !ok {
return ErrDefaultBeaconAPIServiceTypeAssertion
}

return nil
}

func (client *DefaultBeaconAPI) SubscribeToHeadEvents(
ctx context.Context,
handler client.EventHandlerFunc,
) error {
if err := client.clt.Events(ctx, []string{"head"}, handler); err != nil {
return NewDefaultBeaconAPIEventSubscriptionError(err)
}

return nil
}
20 changes: 20 additions & 0 deletions indexer/client/default_beacon_api_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package client

import (
"errors"
"fmt"
)

var ErrDefaultBeaconAPIServiceTypeAssertion = errors.New("failed to type assert beacon API HTTP service")

// NewDefaultBeaconAPIServiceInitializationError is raised if the client used to connect to the beacon
// node fails to initialize.
func NewDefaultBeaconAPIServiceInitializationError(err error) error {
return fmt.Errorf("failed to initialize default beacon API HTTP service: %w", err)
}

// NewDefaultBeaconAPIEventSubscriptionError is raised if the client connected to the node fails to create an event
// subscription.
func NewDefaultBeaconAPIEventSubscriptionError(err error) error {
return fmt.Errorf("default beacon API HTTP service failed to create event subscription: %w", err)
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package head_listener
package client

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewAgateBeaconAPIClient(t *testing.T) {
func TestNewDefaultBeaconAPI(t *testing.T) {
t.Parallel()

client := NewAgateBeaconAPIClient()
client := NewDefaultBeaconAPI()

require.NotNil(t, client)
require.Nil(t, client.clt)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package data_aggregator
package client

import (
"context"
Expand All @@ -10,35 +10,27 @@ import (
datav1 "github.com/quartz-technology/redax-go/sdk/data/v1"
)

// RelayAPIClient is used to collect data from a relay.
type RelayAPIClient interface {
// GetRelayDataForSlot retrieves the bids received and delivered to a relay for a specific slot.
GetRelayDataForSlot(ctx context.Context, slot phase0.Slot) (*common.RelayData, error)
// GetRelayAPIURL returns the API URL of the relay this client makes request to.
GetRelayAPIURL() string
}

// The AgateRelayAPIClient is an implementation of the RelayAPIClient using the Quartz Technology
// The DefaultRelayAPI is an implementation of the RelayAPI using the Quartz Technology
// <redax> SDK to interact with the relay.
type AgateRelayAPIClient struct {
type DefaultRelayAPI struct {
sdk *sdk.RelaySDK
relayAPIURL string
}

// NewAgateRelayAPIClient creates an empty and non-initialized AgateRelayAPIClient.
// NewDefaultRelayAPI creates an empty and non-initialized DefaultRelayAPI.
// It stores the relay API URL used to create the <redax> SDK.
func NewAgateRelayAPIClient(relayAPIURL string) *AgateRelayAPIClient {
return &AgateRelayAPIClient{
func NewDefaultRelayAPI(relayAPIURL string) *DefaultRelayAPI {
return &DefaultRelayAPI{
sdk: nil,
relayAPIURL: relayAPIURL,
}
}

// Init initializes the AgateRelayAPIClient using the previously stored relay API URL.
func (client *AgateRelayAPIClient) Init() error {
// Init initializes the DefaultRelayAPI using the previously stored relay API URL.
func (client *DefaultRelayAPI) Init() error {
clt, err := relay.NewClient(relay.WithAPIURL(client.relayAPIURL))
if err != nil {
return NewAgateRelayAPIClientSDKInitializationError(err)
return NewDefaultRelayAPIServiceInitializationError(err)
}

client.sdk = sdk.NewRelaySDK(clt)
Expand All @@ -47,7 +39,7 @@ func (client *AgateRelayAPIClient) Init() error {
}

// GetRelayDataForSlot implements RelayAPIClient.GetRelayDataForSlot.
func (client *AgateRelayAPIClient) GetRelayDataForSlot(
func (client *DefaultRelayAPI) GetRelayDataForSlot(
ctx context.Context,
slot phase0.Slot,
) (*common.RelayData, error) {
Expand All @@ -61,20 +53,20 @@ func (client *AgateRelayAPIClient) GetRelayDataForSlot(
datav1.NewGetBidsReceivedRequest().WithSlot(slot),
)
if err != nil {
return nil, NewAgateRelayAPIClientBidsReceivedRetrievalError(err)
return nil, NewDefaultRelayAPIBidsReceivedRetrievalError(err)
}

res.BidsDelivered, err = client.sdk.Data().V1().GetBidsDelivered(
ctx,
datav1.NewGetBidsDeliveredRequest().WithSlot(slot),
)
if err != nil {
return nil, NewAgateRelayAPIClientBidsDeliveredRetrievalError(err)
return nil, NewDefaultRelayAPIBidsDeliveredRetrievalError(err)
}

return res, nil
}

func (client *AgateRelayAPIClient) GetRelayAPIURL() string {
func (client *DefaultRelayAPI) GetRelayAPIURL() string {
return client.relayAPIURL
}
21 changes: 21 additions & 0 deletions indexer/client/default_relay_api_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import "fmt"

// NewDefaultRelayAPIServiceInitializationError is raised when the DefaultRelayAPI
// initialization has failed because the <redax> SDK could not be initialized.
func NewDefaultRelayAPIServiceInitializationError(err error) error {
return fmt.Errorf("failed to initialize default relay API SDK: %w", err)
}

// NewDefaultRelayAPIBidsReceivedRetrievalError is raised if the request made to the relay to
// get the bids received for a specific slot has failed.
func NewDefaultRelayAPIBidsReceivedRetrievalError(err error) error {
return fmt.Errorf("default relay API SDK failed to retrieve bids received by relay: %w", err)
}

// NewDefaultRelayAPIBidsDeliveredRetrievalError is raised if the request made to the relay to
// get the bids delivered for a specific slot has failed.
func NewDefaultRelayAPIBidsDeliveredRetrievalError(err error) error {
return fmt.Errorf("default relay API SDK failed to retrieve bids delivered by relay: %w", err)
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package data_aggregator
package client

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewAgateRelayAPIClient(t *testing.T) {
func TestNewDefaultRelayAPI(t *testing.T) {
t.Parallel()

client := NewAgateRelayAPIClient("https://example.com")
client := NewDefaultRelayAPI("https://example.com")

require.NotNil(t, client)
require.Nil(t, client.sdk)
}

func TestAgateRelayAPIClient_Init(t *testing.T) {
func TestDefaultRelayAPI_Init(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
Expand All @@ -38,7 +38,7 @@ func TestAgateRelayAPIClient_Init(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

client := NewAgateRelayAPIClient(tc.relayAPIURL)
client := NewDefaultRelayAPI(tc.relayAPIURL)
err := client.Init()

if tc.success {
Expand All @@ -52,11 +52,11 @@ func TestAgateRelayAPIClient_Init(t *testing.T) {
}
}

func TestAgateRelayAPIClient_GetRelayAPIURL(t *testing.T) {
func TestDefaultRelayAPI_GetRelayAPIURL(t *testing.T) {
t.Parallel()

relayAPIURL := "https://example.com"
client := NewAgateRelayAPIClient(relayAPIURL)
client := NewDefaultRelayAPI(relayAPIURL)

require.Equal(t, relayAPIURL, client.GetRelayAPIURL())
}
16 changes: 16 additions & 0 deletions indexer/client/relay_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package client

import (
"context"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/quartz-technology/agate/indexer/common"
)

// RelayAPI is used to collect data from a relay.
type RelayAPI interface {
// GetRelayDataForSlot retrieves the bids received and delivered to a relay for a specific slot.
GetRelayDataForSlot(ctx context.Context, slot phase0.Slot) (*common.RelayData, error)
// GetRelayAPIURL returns the API URL of the relay this client makes request to.
GetRelayAPIURL() string
}
Comment on lines +10 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package data_preprocessor
package common

import "github.com/quartz-technology/agate/indexer/storage_manager/store/dto"
import "github.com/quartz-technology/agate/indexer/storage/store/dto"

// DataPreprocessorOutput is the default data structure produced as the output of the
// pre-process task.
Expand Down
Loading