Skip to content

Commit

Permalink
refactor: simplify indexer package (#14)
Browse files Browse the repository at this point in the history
Simplified the indexer package by refactoring contained sub-services location and package names.

Signed-off-by: Luca Georges Francois <luca@quartz.technology>
  • Loading branch information
0xpanoramix authored Jan 24, 2024
1 parent 3149b0d commit 663aea3
Show file tree
Hide file tree
Showing 49 changed files with 473 additions and 469 deletions.
34 changes: 17 additions & 17 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"sync"

"github.com/quartz-technology/agate/indexer"
"github.com/quartz-technology/agate/indexer/data_aggregator"
"github.com/quartz-technology/agate/indexer/data_preprocessor"
"github.com/quartz-technology/agate/indexer/head_listener"
"github.com/quartz-technology/agate/indexer/storage_manager"
"github.com/quartz-technology/agate/indexer/storage_manager/store/dto"
"github.com/quartz-technology/agate/indexer/storage_manager/store/postgres"
"github.com/quartz-technology/agate/indexer/client"
"github.com/quartz-technology/agate/indexer/data"
"github.com/quartz-technology/agate/indexer/events"
"github.com/quartz-technology/agate/indexer/storage"
"github.com/quartz-technology/agate/indexer/storage/store/dto"
"github.com/quartz-technology/agate/indexer/storage/store/postgres"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -68,7 +68,7 @@ of relays. Once preprocessed, this data is stored in a database.
//nolint:wrapcheck,funlen
func run(ctx context.Context, configuration *indexer.Configuration) error {
// Performs the database migration.
migrator := storage_manager.NewDefaultDatabaseMigrator()
migrator := storage.NewDefaultDatabaseMigrator()

if err := migrator.Init(
configuration.DatabaseMigrationSourceURL,
Expand All @@ -88,22 +88,22 @@ func run(ctx context.Context, configuration *indexer.Configuration) error {
log.Info().Msg("database migrations applied!")

// Sets up beacon API client.
beaconAPIClient := head_listener.NewAgateBeaconAPIClient()
beaconAPIClient := client.NewDefaultBeaconAPI()
if err := beaconAPIClient.Init(ctx, configuration.BeaconAPIURL); err != nil {
// TODO: Wrap error.
return err
}

// Sets up head listener.
listener := head_listener.NewAgateHeadListener()
listener := events.NewDefaultHeadListener()
listener.Init(beaconAPIClient)

// Sets up relay API clients.
relayAPIClients := make([]data_aggregator.RelayAPIClient, 0)
relayAPIClients := make([]client.RelayAPI, 0)
relaysDTOs := make([]*dto.Relay, 0)

for _, relayAPIURL := range configuration.RelayAPIURLs {
relayAPIClient := data_aggregator.NewAgateRelayAPIClient(relayAPIURL)
relayAPIClient := client.NewDefaultRelayAPI(relayAPIURL)

if err := relayAPIClient.Init(); err != nil {
// TODO: Wrap error.
Expand All @@ -115,11 +115,11 @@ func run(ctx context.Context, configuration *indexer.Configuration) error {
}

// Sets up data aggregator.
aggregator := data_aggregator.NewAgateDataAggregator()
aggregator := data.NewDefaultAggregator()
aggregator.Init(relayAPIClients...)

// Creates data preprocessor.
preprocessor := data_preprocessor.NewDataPreprocessor()
preprocessor := data.NewPreprocessor()

// Sets up the database store.
store := postgres.NewDefaultStore()
Expand All @@ -129,16 +129,16 @@ func run(ctx context.Context, configuration *indexer.Configuration) error {
}

// Sets up storage manager and stores provided relays.
storage := storage_manager.NewDefaultStorageManager()
storage.Init(store)
storageManager := storage.NewDefaultManager()
storageManager.Init(store)

if err := storage.StoreRelays(ctx, relaysDTOs); err != nil {
if err := storageManager.StoreRelays(ctx, relaysDTOs); err != nil {
// TODO: Wrap error.
return err
}

// Sets up main indexer service.
service := indexer.NewIndexer(listener, aggregator, preprocessor, storage)
service := indexer.New(listener, aggregator, preprocessor, storageManager)

log.Info().Msg("indexer service starting..")
// Starts the indexer service. Blocks until context is done.
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ services:
- agate-net
volumes:
- ./agate-config-docker.yaml:/app/agate-config.yaml
stop_grace_period: "10m"

database:
hostname: database
Expand All @@ -34,6 +35,7 @@ services:
retries: 5
volumes:
- indexer-data:/var/lib/postgresql/data
stop_grace_period: "10m"

volumes:
indexer-data:
Expand Down
14 changes: 14 additions & 0 deletions indexer/client/beacon_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package client

import (
"context"

client "github.com/attestantio/go-eth2-client"
)

// BeaconAPI is used to interact with a beacon node.
type BeaconAPI interface {
// SubscribeToHeadEvents is used to create a subscription to new head events and will perform
// the handler's logic.
SubscribeToHeadEvents(ctx context.Context, handler client.EventHandlerFunc) error
}
54 changes: 54 additions & 0 deletions indexer/client/default_beacon_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package client

import (
"context"

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/http"
"github.com/rs/zerolog"
)

// DefaultBeaconAPI is the default implementation of the BeaconAPI for agate,
// using the go-eth2-client library to send queries to the beacon node.
type DefaultBeaconAPI struct {
clt *http.Service
}

// NewDefaultBeaconAPI creates a new and non-initialized DefaultBeaconAPI.
func NewDefaultBeaconAPI() *DefaultBeaconAPI {
return &DefaultBeaconAPI{
clt: nil,
}
}

// Init initializes an DefaultBeaconAPI using the beacon API URL to connect to the beacon node.
func (client *DefaultBeaconAPI) Init(ctx context.Context, beaconAPIURL string) error {
var ok bool

clt, err := http.New(
ctx,
http.WithAddress(beaconAPIURL),
http.WithLogLevel(zerolog.Disabled),
)
if err != nil {
return NewDefaultBeaconAPIServiceInitializationError(err)
}

client.clt, ok = clt.(*http.Service)
if !ok {
return ErrDefaultBeaconAPIServiceTypeAssertion
}

return nil
}

func (client *DefaultBeaconAPI) SubscribeToHeadEvents(
ctx context.Context,
handler client.EventHandlerFunc,
) error {
if err := client.clt.Events(ctx, []string{"head"}, handler); err != nil {
return NewDefaultBeaconAPIEventSubscriptionError(err)
}

return nil
}
20 changes: 20 additions & 0 deletions indexer/client/default_beacon_api_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package client

import (
"errors"
"fmt"
)

var ErrDefaultBeaconAPIServiceTypeAssertion = errors.New("failed to type assert beacon API HTTP service")

// NewDefaultBeaconAPIServiceInitializationError is raised if the client used to connect to the beacon
// node fails to initialize.
func NewDefaultBeaconAPIServiceInitializationError(err error) error {
return fmt.Errorf("failed to initialize default beacon API HTTP service: %w", err)
}

// NewDefaultBeaconAPIEventSubscriptionError is raised if the client connected to the node fails to create an event
// subscription.
func NewDefaultBeaconAPIEventSubscriptionError(err error) error {
return fmt.Errorf("default beacon API HTTP service failed to create event subscription: %w", err)
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package head_listener
package client

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewAgateBeaconAPIClient(t *testing.T) {
func TestNewDefaultBeaconAPI(t *testing.T) {
t.Parallel()

client := NewAgateBeaconAPIClient()
client := NewDefaultBeaconAPI()

require.NotNil(t, client)
require.Nil(t, client.clt)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package data_aggregator
package client

import (
"context"
Expand All @@ -10,35 +10,27 @@ import (
datav1 "github.com/quartz-technology/redax-go/sdk/data/v1"
)

// RelayAPIClient is used to collect data from a relay.
type RelayAPIClient interface {
// GetRelayDataForSlot retrieves the bids received and delivered to a relay for a specific slot.
GetRelayDataForSlot(ctx context.Context, slot phase0.Slot) (*common.RelayData, error)
// GetRelayAPIURL returns the API URL of the relay this client makes request to.
GetRelayAPIURL() string
}

// The AgateRelayAPIClient is an implementation of the RelayAPIClient using the Quartz Technology
// The DefaultRelayAPI is an implementation of the RelayAPI using the Quartz Technology
// <redax> SDK to interact with the relay.
type AgateRelayAPIClient struct {
type DefaultRelayAPI struct {
sdk *sdk.RelaySDK
relayAPIURL string
}

// NewAgateRelayAPIClient creates an empty and non-initialized AgateRelayAPIClient.
// NewDefaultRelayAPI creates an empty and non-initialized DefaultRelayAPI.
// It stores the relay API URL used to create the <redax> SDK.
func NewAgateRelayAPIClient(relayAPIURL string) *AgateRelayAPIClient {
return &AgateRelayAPIClient{
func NewDefaultRelayAPI(relayAPIURL string) *DefaultRelayAPI {
return &DefaultRelayAPI{
sdk: nil,
relayAPIURL: relayAPIURL,
}
}

// Init initializes the AgateRelayAPIClient using the previously stored relay API URL.
func (client *AgateRelayAPIClient) Init() error {
// Init initializes the DefaultRelayAPI using the previously stored relay API URL.
func (client *DefaultRelayAPI) Init() error {
clt, err := relay.NewClient(relay.WithAPIURL(client.relayAPIURL))
if err != nil {
return NewAgateRelayAPIClientSDKInitializationError(err)
return NewDefaultRelayAPIServiceInitializationError(err)
}

client.sdk = sdk.NewRelaySDK(clt)
Expand All @@ -47,7 +39,7 @@ func (client *AgateRelayAPIClient) Init() error {
}

// GetRelayDataForSlot implements RelayAPIClient.GetRelayDataForSlot.
func (client *AgateRelayAPIClient) GetRelayDataForSlot(
func (client *DefaultRelayAPI) GetRelayDataForSlot(
ctx context.Context,
slot phase0.Slot,
) (*common.RelayData, error) {
Expand All @@ -61,20 +53,20 @@ func (client *AgateRelayAPIClient) GetRelayDataForSlot(
datav1.NewGetBidsReceivedRequest().WithSlot(slot),
)
if err != nil {
return nil, NewAgateRelayAPIClientBidsReceivedRetrievalError(err)
return nil, NewDefaultRelayAPIBidsReceivedRetrievalError(err)
}

res.BidsDelivered, err = client.sdk.Data().V1().GetBidsDelivered(
ctx,
datav1.NewGetBidsDeliveredRequest().WithSlot(slot),
)
if err != nil {
return nil, NewAgateRelayAPIClientBidsDeliveredRetrievalError(err)
return nil, NewDefaultRelayAPIBidsDeliveredRetrievalError(err)
}

return res, nil
}

func (client *AgateRelayAPIClient) GetRelayAPIURL() string {
func (client *DefaultRelayAPI) GetRelayAPIURL() string {
return client.relayAPIURL
}
21 changes: 21 additions & 0 deletions indexer/client/default_relay_api_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import "fmt"

// NewDefaultRelayAPIServiceInitializationError is raised when the DefaultRelayAPI
// initialization has failed because the <redax> SDK could not be initialized.
func NewDefaultRelayAPIServiceInitializationError(err error) error {
return fmt.Errorf("failed to initialize default relay API SDK: %w", err)
}

// NewDefaultRelayAPIBidsReceivedRetrievalError is raised if the request made to the relay to
// get the bids received for a specific slot has failed.
func NewDefaultRelayAPIBidsReceivedRetrievalError(err error) error {
return fmt.Errorf("default relay API SDK failed to retrieve bids received by relay: %w", err)
}

// NewDefaultRelayAPIBidsDeliveredRetrievalError is raised if the request made to the relay to
// get the bids delivered for a specific slot has failed.
func NewDefaultRelayAPIBidsDeliveredRetrievalError(err error) error {
return fmt.Errorf("default relay API SDK failed to retrieve bids delivered by relay: %w", err)
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package data_aggregator
package client

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewAgateRelayAPIClient(t *testing.T) {
func TestNewDefaultRelayAPI(t *testing.T) {
t.Parallel()

client := NewAgateRelayAPIClient("https://example.com")
client := NewDefaultRelayAPI("https://example.com")

require.NotNil(t, client)
require.Nil(t, client.sdk)
}

func TestAgateRelayAPIClient_Init(t *testing.T) {
func TestDefaultRelayAPI_Init(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
Expand All @@ -38,7 +38,7 @@ func TestAgateRelayAPIClient_Init(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

client := NewAgateRelayAPIClient(tc.relayAPIURL)
client := NewDefaultRelayAPI(tc.relayAPIURL)
err := client.Init()

if tc.success {
Expand All @@ -52,11 +52,11 @@ func TestAgateRelayAPIClient_Init(t *testing.T) {
}
}

func TestAgateRelayAPIClient_GetRelayAPIURL(t *testing.T) {
func TestDefaultRelayAPI_GetRelayAPIURL(t *testing.T) {
t.Parallel()

relayAPIURL := "https://example.com"
client := NewAgateRelayAPIClient(relayAPIURL)
client := NewDefaultRelayAPI(relayAPIURL)

require.Equal(t, relayAPIURL, client.GetRelayAPIURL())
}
16 changes: 16 additions & 0 deletions indexer/client/relay_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package client

import (
"context"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/quartz-technology/agate/indexer/common"
)

// RelayAPI is used to collect data from a relay.
type RelayAPI interface {
// GetRelayDataForSlot retrieves the bids received and delivered to a relay for a specific slot.
GetRelayDataForSlot(ctx context.Context, slot phase0.Slot) (*common.RelayData, error)
// GetRelayAPIURL returns the API URL of the relay this client makes request to.
GetRelayAPIURL() string
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package data_preprocessor
package common

import "github.com/quartz-technology/agate/indexer/storage_manager/store/dto"
import "github.com/quartz-technology/agate/indexer/storage/store/dto"

// DataPreprocessorOutput is the default data structure produced as the output of the
// pre-process task.
Expand Down
Loading

0 comments on commit 663aea3

Please sign in to comment.