diff --git a/cmd/indexer.go b/cmd/indexer.go index 62c5162..98987a8 100644 --- a/cmd/indexer.go +++ b/cmd/indexer.go @@ -7,12 +7,12 @@ import ( "sync" "github.com/quartz-technology/agate/indexer" - "github.com/quartz-technology/agate/indexer/data_aggregator" - "github.com/quartz-technology/agate/indexer/data_preprocessor" - "github.com/quartz-technology/agate/indexer/head_listener" - "github.com/quartz-technology/agate/indexer/storage_manager" - "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" - "github.com/quartz-technology/agate/indexer/storage_manager/store/postgres" + "github.com/quartz-technology/agate/indexer/client" + "github.com/quartz-technology/agate/indexer/data" + "github.com/quartz-technology/agate/indexer/events" + "github.com/quartz-technology/agate/indexer/storage" + "github.com/quartz-technology/agate/indexer/storage/store/dto" + "github.com/quartz-technology/agate/indexer/storage/store/postgres" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -68,7 +68,7 @@ of relays. Once preprocessed, this data is stored in a database. //nolint:wrapcheck,funlen func run(ctx context.Context, configuration *indexer.Configuration) error { // Performs the database migration. - migrator := storage_manager.NewDefaultDatabaseMigrator() + migrator := storage.NewDefaultDatabaseMigrator() if err := migrator.Init( configuration.DatabaseMigrationSourceURL, @@ -88,22 +88,22 @@ func run(ctx context.Context, configuration *indexer.Configuration) error { log.Info().Msg("database migrations applied!") // Sets up beacon API client. - beaconAPIClient := head_listener.NewAgateBeaconAPIClient() + beaconAPIClient := client.NewDefaultBeaconAPI() if err := beaconAPIClient.Init(ctx, configuration.BeaconAPIURL); err != nil { // TODO: Wrap error. return err } // Sets up head listener. - listener := head_listener.NewAgateHeadListener() + listener := events.NewDefaultHeadListener() listener.Init(beaconAPIClient) // Sets up relay API clients. - relayAPIClients := make([]data_aggregator.RelayAPIClient, 0) + relayAPIClients := make([]client.RelayAPI, 0) relaysDTOs := make([]*dto.Relay, 0) for _, relayAPIURL := range configuration.RelayAPIURLs { - relayAPIClient := data_aggregator.NewAgateRelayAPIClient(relayAPIURL) + relayAPIClient := client.NewDefaultRelayAPI(relayAPIURL) if err := relayAPIClient.Init(); err != nil { // TODO: Wrap error. @@ -115,11 +115,11 @@ func run(ctx context.Context, configuration *indexer.Configuration) error { } // Sets up data aggregator. - aggregator := data_aggregator.NewAgateDataAggregator() + aggregator := data.NewDefaultAggregator() aggregator.Init(relayAPIClients...) // Creates data preprocessor. - preprocessor := data_preprocessor.NewDataPreprocessor() + preprocessor := data.NewPreprocessor() // Sets up the database store. store := postgres.NewDefaultStore() @@ -129,16 +129,16 @@ func run(ctx context.Context, configuration *indexer.Configuration) error { } // Sets up storage manager and stores provided relays. - storage := storage_manager.NewDefaultStorageManager() - storage.Init(store) + storageManager := storage.NewDefaultManager() + storageManager.Init(store) - if err := storage.StoreRelays(ctx, relaysDTOs); err != nil { + if err := storageManager.StoreRelays(ctx, relaysDTOs); err != nil { // TODO: Wrap error. return err } // Sets up main indexer service. - service := indexer.NewIndexer(listener, aggregator, preprocessor, storage) + service := indexer.New(listener, aggregator, preprocessor, storageManager) log.Info().Msg("indexer service starting..") // Starts the indexer service. Blocks until context is done. diff --git a/docker-compose.yaml b/docker-compose.yaml index 06662ee..fb2fe63 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -15,6 +15,7 @@ services: - agate-net volumes: - ./agate-config-docker.yaml:/app/agate-config.yaml + stop_grace_period: "10m" database: hostname: database @@ -34,6 +35,7 @@ services: retries: 5 volumes: - indexer-data:/var/lib/postgresql/data + stop_grace_period: "10m" volumes: indexer-data: diff --git a/indexer/client/beacon_api.go b/indexer/client/beacon_api.go new file mode 100644 index 0000000..1ac0231 --- /dev/null +++ b/indexer/client/beacon_api.go @@ -0,0 +1,14 @@ +package client + +import ( + "context" + + client "github.com/attestantio/go-eth2-client" +) + +// BeaconAPI is used to interact with a beacon node. +type BeaconAPI interface { + // SubscribeToHeadEvents is used to create a subscription to new head events and will perform + // the handler's logic. + SubscribeToHeadEvents(ctx context.Context, handler client.EventHandlerFunc) error +} diff --git a/indexer/client/default_beacon_api.go b/indexer/client/default_beacon_api.go new file mode 100644 index 0000000..2175810 --- /dev/null +++ b/indexer/client/default_beacon_api.go @@ -0,0 +1,54 @@ +package client + +import ( + "context" + + client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/go-eth2-client/http" + "github.com/rs/zerolog" +) + +// DefaultBeaconAPI is the default implementation of the BeaconAPI for agate, +// using the go-eth2-client library to send queries to the beacon node. +type DefaultBeaconAPI struct { + clt *http.Service +} + +// NewDefaultBeaconAPI creates a new and non-initialized DefaultBeaconAPI. +func NewDefaultBeaconAPI() *DefaultBeaconAPI { + return &DefaultBeaconAPI{ + clt: nil, + } +} + +// Init initializes an DefaultBeaconAPI using the beacon API URL to connect to the beacon node. +func (client *DefaultBeaconAPI) Init(ctx context.Context, beaconAPIURL string) error { + var ok bool + + clt, err := http.New( + ctx, + http.WithAddress(beaconAPIURL), + http.WithLogLevel(zerolog.Disabled), + ) + if err != nil { + return NewDefaultBeaconAPIServiceInitializationError(err) + } + + client.clt, ok = clt.(*http.Service) + if !ok { + return ErrDefaultBeaconAPIServiceTypeAssertion + } + + return nil +} + +func (client *DefaultBeaconAPI) SubscribeToHeadEvents( + ctx context.Context, + handler client.EventHandlerFunc, +) error { + if err := client.clt.Events(ctx, []string{"head"}, handler); err != nil { + return NewDefaultBeaconAPIEventSubscriptionError(err) + } + + return nil +} diff --git a/indexer/client/default_beacon_api_errors.go b/indexer/client/default_beacon_api_errors.go new file mode 100644 index 0000000..53b02a0 --- /dev/null +++ b/indexer/client/default_beacon_api_errors.go @@ -0,0 +1,20 @@ +package client + +import ( + "errors" + "fmt" +) + +var ErrDefaultBeaconAPIServiceTypeAssertion = errors.New("failed to type assert beacon API HTTP service") + +// NewDefaultBeaconAPIServiceInitializationError is raised if the client used to connect to the beacon +// node fails to initialize. +func NewDefaultBeaconAPIServiceInitializationError(err error) error { + return fmt.Errorf("failed to initialize default beacon API HTTP service: %w", err) +} + +// NewDefaultBeaconAPIEventSubscriptionError is raised if the client connected to the node fails to create an event +// subscription. +func NewDefaultBeaconAPIEventSubscriptionError(err error) error { + return fmt.Errorf("default beacon API HTTP service failed to create event subscription: %w", err) +} diff --git a/indexer/head_listener/beacon_api_client_test.go b/indexer/client/default_beacon_api_test.go similarity index 55% rename from indexer/head_listener/beacon_api_client_test.go rename to indexer/client/default_beacon_api_test.go index 3b7e0f4..79856be 100644 --- a/indexer/head_listener/beacon_api_client_test.go +++ b/indexer/client/default_beacon_api_test.go @@ -1,4 +1,4 @@ -package head_listener +package client import ( "testing" @@ -6,10 +6,10 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewAgateBeaconAPIClient(t *testing.T) { +func TestNewDefaultBeaconAPI(t *testing.T) { t.Parallel() - client := NewAgateBeaconAPIClient() + client := NewDefaultBeaconAPI() require.NotNil(t, client) require.Nil(t, client.clt) diff --git a/indexer/data_aggregator/relay_api_client.go b/indexer/client/default_relay_api.go similarity index 50% rename from indexer/data_aggregator/relay_api_client.go rename to indexer/client/default_relay_api.go index 4b5178a..87916a4 100644 --- a/indexer/data_aggregator/relay_api_client.go +++ b/indexer/client/default_relay_api.go @@ -1,4 +1,4 @@ -package data_aggregator +package client import ( "context" @@ -10,35 +10,27 @@ import ( datav1 "github.com/quartz-technology/redax-go/sdk/data/v1" ) -// RelayAPIClient is used to collect data from a relay. -type RelayAPIClient interface { - // GetRelayDataForSlot retrieves the bids received and delivered to a relay for a specific slot. - GetRelayDataForSlot(ctx context.Context, slot phase0.Slot) (*common.RelayData, error) - // GetRelayAPIURL returns the API URL of the relay this client makes request to. - GetRelayAPIURL() string -} - -// The AgateRelayAPIClient is an implementation of the RelayAPIClient using the Quartz Technology +// The DefaultRelayAPI is an implementation of the RelayAPI using the Quartz Technology // SDK to interact with the relay. -type AgateRelayAPIClient struct { +type DefaultRelayAPI struct { sdk *sdk.RelaySDK relayAPIURL string } -// NewAgateRelayAPIClient creates an empty and non-initialized AgateRelayAPIClient. +// NewDefaultRelayAPI creates an empty and non-initialized DefaultRelayAPI. // It stores the relay API URL used to create the SDK. -func NewAgateRelayAPIClient(relayAPIURL string) *AgateRelayAPIClient { - return &AgateRelayAPIClient{ +func NewDefaultRelayAPI(relayAPIURL string) *DefaultRelayAPI { + return &DefaultRelayAPI{ sdk: nil, relayAPIURL: relayAPIURL, } } -// Init initializes the AgateRelayAPIClient using the previously stored relay API URL. -func (client *AgateRelayAPIClient) Init() error { +// Init initializes the DefaultRelayAPI using the previously stored relay API URL. +func (client *DefaultRelayAPI) Init() error { clt, err := relay.NewClient(relay.WithAPIURL(client.relayAPIURL)) if err != nil { - return NewAgateRelayAPIClientSDKInitializationError(err) + return NewDefaultRelayAPIServiceInitializationError(err) } client.sdk = sdk.NewRelaySDK(clt) @@ -47,7 +39,7 @@ func (client *AgateRelayAPIClient) Init() error { } // GetRelayDataForSlot implements RelayAPIClient.GetRelayDataForSlot. -func (client *AgateRelayAPIClient) GetRelayDataForSlot( +func (client *DefaultRelayAPI) GetRelayDataForSlot( ctx context.Context, slot phase0.Slot, ) (*common.RelayData, error) { @@ -61,7 +53,7 @@ func (client *AgateRelayAPIClient) GetRelayDataForSlot( datav1.NewGetBidsReceivedRequest().WithSlot(slot), ) if err != nil { - return nil, NewAgateRelayAPIClientBidsReceivedRetrievalError(err) + return nil, NewDefaultRelayAPIBidsReceivedRetrievalError(err) } res.BidsDelivered, err = client.sdk.Data().V1().GetBidsDelivered( @@ -69,12 +61,12 @@ func (client *AgateRelayAPIClient) GetRelayDataForSlot( datav1.NewGetBidsDeliveredRequest().WithSlot(slot), ) if err != nil { - return nil, NewAgateRelayAPIClientBidsDeliveredRetrievalError(err) + return nil, NewDefaultRelayAPIBidsDeliveredRetrievalError(err) } return res, nil } -func (client *AgateRelayAPIClient) GetRelayAPIURL() string { +func (client *DefaultRelayAPI) GetRelayAPIURL() string { return client.relayAPIURL } diff --git a/indexer/client/default_relay_api_errors.go b/indexer/client/default_relay_api_errors.go new file mode 100644 index 0000000..9159ff3 --- /dev/null +++ b/indexer/client/default_relay_api_errors.go @@ -0,0 +1,21 @@ +package client + +import "fmt" + +// NewDefaultRelayAPIServiceInitializationError is raised when the DefaultRelayAPI +// initialization has failed because the SDK could not be initialized. +func NewDefaultRelayAPIServiceInitializationError(err error) error { + return fmt.Errorf("failed to initialize default relay API SDK: %w", err) +} + +// NewDefaultRelayAPIBidsReceivedRetrievalError is raised if the request made to the relay to +// get the bids received for a specific slot has failed. +func NewDefaultRelayAPIBidsReceivedRetrievalError(err error) error { + return fmt.Errorf("default relay API SDK failed to retrieve bids received by relay: %w", err) +} + +// NewDefaultRelayAPIBidsDeliveredRetrievalError is raised if the request made to the relay to +// get the bids delivered for a specific slot has failed. +func NewDefaultRelayAPIBidsDeliveredRetrievalError(err error) error { + return fmt.Errorf("default relay API SDK failed to retrieve bids delivered by relay: %w", err) +} diff --git a/indexer/data_aggregator/relay_api_client_test.go b/indexer/client/default_relay_api_test.go similarity index 71% rename from indexer/data_aggregator/relay_api_client_test.go rename to indexer/client/default_relay_api_test.go index a69e645..edae75b 100644 --- a/indexer/data_aggregator/relay_api_client_test.go +++ b/indexer/client/default_relay_api_test.go @@ -1,4 +1,4 @@ -package data_aggregator +package client import ( "testing" @@ -6,16 +6,16 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewAgateRelayAPIClient(t *testing.T) { +func TestNewDefaultRelayAPI(t *testing.T) { t.Parallel() - client := NewAgateRelayAPIClient("https://example.com") + client := NewDefaultRelayAPI("https://example.com") require.NotNil(t, client) require.Nil(t, client.sdk) } -func TestAgateRelayAPIClient_Init(t *testing.T) { +func TestDefaultRelayAPI_Init(t *testing.T) { t.Parallel() testCases := map[string]struct { @@ -38,7 +38,7 @@ func TestAgateRelayAPIClient_Init(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - client := NewAgateRelayAPIClient(tc.relayAPIURL) + client := NewDefaultRelayAPI(tc.relayAPIURL) err := client.Init() if tc.success { @@ -52,11 +52,11 @@ func TestAgateRelayAPIClient_Init(t *testing.T) { } } -func TestAgateRelayAPIClient_GetRelayAPIURL(t *testing.T) { +func TestDefaultRelayAPI_GetRelayAPIURL(t *testing.T) { t.Parallel() relayAPIURL := "https://example.com" - client := NewAgateRelayAPIClient(relayAPIURL) + client := NewDefaultRelayAPI(relayAPIURL) require.Equal(t, relayAPIURL, client.GetRelayAPIURL()) } diff --git a/indexer/client/relay_api.go b/indexer/client/relay_api.go new file mode 100644 index 0000000..fee0e2e --- /dev/null +++ b/indexer/client/relay_api.go @@ -0,0 +1,16 @@ +package client + +import ( + "context" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/quartz-technology/agate/indexer/common" +) + +// RelayAPI is used to collect data from a relay. +type RelayAPI interface { + // GetRelayDataForSlot retrieves the bids received and delivered to a relay for a specific slot. + GetRelayDataForSlot(ctx context.Context, slot phase0.Slot) (*common.RelayData, error) + // GetRelayAPIURL returns the API URL of the relay this client makes request to. + GetRelayAPIURL() string +} diff --git a/indexer/data_preprocessor/preprocessed_data.go b/indexer/common/preprocessor_data.go similarity index 72% rename from indexer/data_preprocessor/preprocessed_data.go rename to indexer/common/preprocessor_data.go index edf9452..702e584 100644 --- a/indexer/data_preprocessor/preprocessed_data.go +++ b/indexer/common/preprocessor_data.go @@ -1,6 +1,6 @@ -package data_preprocessor +package common -import "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" +import "github.com/quartz-technology/agate/indexer/storage/store/dto" // DataPreprocessorOutput is the default data structure produced as the output of the // pre-process task. diff --git a/indexer/data/aggregator.go b/indexer/data/aggregator.go new file mode 100644 index 0000000..087f110 --- /dev/null +++ b/indexer/data/aggregator.go @@ -0,0 +1,17 @@ +package data + +import ( + "context" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/quartz-technology/agate/indexer/common" +) + +// Aggregator is used to aggregate data from multiple relays. +type Aggregator interface { + // AggregateDataForSlotFromRelays collects data from multiple relays for a specific slot. + AggregateDataForSlotFromRelays( + ctx context.Context, + slot phase0.Slot, + ) (*common.AggregatedRelayData, error) +} diff --git a/indexer/data_aggregator/data_aggregator.go b/indexer/data/default_aggregator.go similarity index 64% rename from indexer/data_aggregator/data_aggregator.go rename to indexer/data/default_aggregator.go index 251fec3..71ab88b 100644 --- a/indexer/data_aggregator/data_aggregator.go +++ b/indexer/data/default_aggregator.go @@ -1,26 +1,18 @@ -package data_aggregator +package data import ( "context" "sync" "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/quartz-technology/agate/indexer/client" "github.com/quartz-technology/agate/indexer/common" ) -// DataAggregator is used to aggregate data from multiple relays. -type DataAggregator interface { - // AggregateDataForSlotFromRelays collects data from multiple relays for a specific slot. - AggregateDataForSlotFromRelays( - ctx context.Context, - slot phase0.Slot, - ) (*common.AggregatedRelayData, error) -} - // relayResponse is used by the AgateDataAggregator during the map/reduce phase of the relay data // aggregation process. // -// As the AgateDataAggregator uses one goroutine per relay to query, it uses a channel of +// As the DefaultAggregator uses one goroutine per relay to query, it uses a channel of // relayResponse to get back the responses from all the goroutines. type relayResponse struct { // The API URL of the queried relay. @@ -31,26 +23,26 @@ type relayResponse struct { err error } -// The AgateDataAggregator is an implementation of the DataAggregator which uses multiple +// The DefaultAggregator is an implementation of the Aggregator which uses multiple // RelayAPIClient (one per relay) to aggregate relay data. -type AgateDataAggregator struct { - relayAPIClients []RelayAPIClient +type DefaultAggregator struct { + relayAPIClients []client.RelayAPI } -// NewAgateDataAggregator creates an empty and non-initialized AgateDataAggregator. -func NewAgateDataAggregator() *AgateDataAggregator { - return &AgateDataAggregator{ - relayAPIClients: make([]RelayAPIClient, 0), +// NewDefaultAggregator creates an empty and non-initialized DefaultAggregator. +func NewDefaultAggregator() *DefaultAggregator { + return &DefaultAggregator{ + relayAPIClients: make([]client.RelayAPI, 0), } } -// Init initializes an AgateDataAggregator service given multiple clients able to collect data +// Init initializes an DefaultAggregator service given multiple clients able to collect data // from relays. -func (aggregator *AgateDataAggregator) Init(relayAPIClients ...RelayAPIClient) { +func (aggregator *DefaultAggregator) Init(relayAPIClients ...client.RelayAPI) { aggregator.relayAPIClients = relayAPIClients } -// AggregateDataForSlotFromRelays implements DataAggregator.AggregateDataForSlotFromRelays. +// AggregateDataForSlotFromRelays implements Aggregator.AggregateDataForSlotFromRelays. // // It uses a map/reduce procedure to distribute each request made to the relays to distinct // workers (the map phase). @@ -59,7 +51,7 @@ func (aggregator *AgateDataAggregator) Init(relayAPIClients ...RelayAPIClient) { // // In the scenario where part of the data collection has failed, the aggregator still returns the // successfully collected data. -func (aggregator *AgateDataAggregator) AggregateDataForSlotFromRelays( +func (aggregator *DefaultAggregator) AggregateDataForSlotFromRelays( ctx context.Context, slot phase0.Slot, ) (*common.AggregatedRelayData, error) { @@ -72,7 +64,7 @@ func (aggregator *AgateDataAggregator) AggregateDataForSlotFromRelays( for _, relayAPIClient := range aggregator.relayAPIClients { wg.Add(1) - go func(relayAPIClient RelayAPIClient, slot phase0.Slot) { + go func(relayAPIClient client.RelayAPI, slot phase0.Slot) { var ( relayDataResult = new(common.RelayData) relayDataError error = nil @@ -95,7 +87,7 @@ func (aggregator *AgateDataAggregator) AggregateDataForSlotFromRelays( close(relayResponses) - err := NewAgateDataAggregationError() + err := NewDefaultDataAggregationError() for relayRes := range relayResponses { if relayRes.err != nil { diff --git a/indexer/data_aggregator/data_aggregator_errors.go b/indexer/data/default_aggregator_errors.go similarity index 61% rename from indexer/data_aggregator/data_aggregator_errors.go rename to indexer/data/default_aggregator_errors.go index 8cda3f7..2e6c06d 100644 --- a/indexer/data_aggregator/data_aggregator_errors.go +++ b/indexer/data/default_aggregator_errors.go @@ -1,4 +1,4 @@ -package data_aggregator +package data import ( "fmt" @@ -6,9 +6,9 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" ) -// AgateDataAggregationError is raised if at least one of the request made to collect data from a +// DefaultDataAggregationError is raised if at least one of the request made to collect data from a // relay has encountered an error. -type AgateDataAggregationError struct { +type DefaultDataAggregationError struct { // The slot used as a parameter in the request made to the relay. Slot phase0.Slot // A 1:1 mapping between a relay API URL and the error that has been encountered when making @@ -16,9 +16,9 @@ type AgateDataAggregationError struct { RelayErrors map[string]error } -// NewAgateDataAggregationError creates an empty and non-initialized AgateDataAggregationError. -func NewAgateDataAggregationError() *AgateDataAggregationError { - return &AgateDataAggregationError{ +// NewDefaultDataAggregationError creates an empty and non-initialized DefaultDataAggregationError. +func NewDefaultDataAggregationError() *DefaultDataAggregationError { + return &DefaultDataAggregationError{ Slot: 0, RelayErrors: make(map[string]error), } @@ -27,11 +27,11 @@ func NewAgateDataAggregationError() *AgateDataAggregationError { // RecordFailure is used during the reduce phase of the aggregation process of the // AgateDataAggregator service. // It saves a record of an error that has been encountered when collecting data from a relay. -func (err *AgateDataAggregationError) RecordFailure(relayAPIURL string, relayError error) { +func (err *DefaultDataAggregationError) RecordFailure(relayAPIURL string, relayError error) { err.RelayErrors[relayAPIURL] = relayError } -func (err *AgateDataAggregationError) Error() string { +func (err *DefaultDataAggregationError) Error() string { return fmt.Sprintf( "data aggregator encountered the following error(s) for slot %d: %v", err.Slot, diff --git a/indexer/data_aggregator/data_aggregator_test.go b/indexer/data/default_aggregator_test.go similarity index 86% rename from indexer/data_aggregator/data_aggregator_test.go rename to indexer/data/default_aggregator_test.go index a943359..5b8e03d 100644 --- a/indexer/data_aggregator/data_aggregator_test.go +++ b/indexer/data/default_aggregator_test.go @@ -1,4 +1,4 @@ -package data_aggregator +package data import ( "context" @@ -6,43 +6,44 @@ import ( "testing" "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/quartz-technology/agate/indexer/client" "github.com/quartz-technology/agate/indexer/common" "github.com/quartz-technology/agate/internal/mocks" datav1 "github.com/quartz-technology/redax-go/sdk/data/v1" "github.com/stretchr/testify/require" ) -func TestNewAgateDataAggregator(t *testing.T) { +func TestNewDefaultAggregator(t *testing.T) { t.Parallel() - aggregator := NewAgateDataAggregator() + aggregator := NewDefaultAggregator() require.NotNil(t, aggregator) require.NotNil(t, aggregator.relayAPIClients) require.Len(t, aggregator.relayAPIClients, 0) } -func TestAgateDataAggregator_Init(t *testing.T) { +func TestDefaultAggregator_Init(t *testing.T) { t.Parallel() - aggregator := NewAgateDataAggregator() + aggregator := NewDefaultAggregator() aggregator.Init() require.Nil(t, aggregator.relayAPIClients) require.Len(t, aggregator.relayAPIClients, 0) } -func TestAgateDataAggregator_AggregateDataForSlotFromRelays(t *testing.T) { +func TestDefaultAggregator_AggregateDataForSlotFromRelays(t *testing.T) { t.Parallel() testCases := map[string]struct { - relayAPIClient []RelayAPIClient + relayAPIClient []client.RelayAPI slot phase0.Slot expectedAggregatedData *common.AggregatedRelayData expectedAggregationError error }{ "should aggregate data with errors": { - relayAPIClient: []RelayAPIClient{ + relayAPIClient: []client.RelayAPI{ mocks.NewMockRelayAPIClient( func(ctx context.Context, slot phase0.Slot) (*common.RelayData, error) { return &common.RelayData{ @@ -70,7 +71,7 @@ func TestAgateDataAggregator_AggregateDataForSlotFromRelays(t *testing.T) { BidsDelivered: make([]*datav1.BidDelivered, 0), }, }, - expectedAggregationError: &AgateDataAggregationError{ + expectedAggregationError: &DefaultDataAggregationError{ Slot: 42, RelayErrors: map[string]error{ "https://mock.failing.relay.com": errors.New("mocked error"), @@ -78,7 +79,7 @@ func TestAgateDataAggregator_AggregateDataForSlotFromRelays(t *testing.T) { }, }, "should aggregate data without errors": { - relayAPIClient: []RelayAPIClient{ + relayAPIClient: []client.RelayAPI{ mocks.NewMockRelayAPIClient( func(ctx context.Context, slot phase0.Slot) (*common.RelayData, error) { return &common.RelayData{ @@ -122,7 +123,7 @@ func TestAgateDataAggregator_AggregateDataForSlotFromRelays(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - aggregator := NewAgateDataAggregator() + aggregator := NewDefaultAggregator() aggregator.Init(tc.relayAPIClient...) aggregatedRelayData, err := aggregator.AggregateDataForSlotFromRelays(context.Background(), tc.slot) diff --git a/indexer/data_preprocessor/data_preprocessor.go b/indexer/data/preprocessor.go similarity index 74% rename from indexer/data_preprocessor/data_preprocessor.go rename to indexer/data/preprocessor.go index 9387715..8abf866 100644 --- a/indexer/data_preprocessor/data_preprocessor.go +++ b/indexer/data/preprocessor.go @@ -1,27 +1,27 @@ -package data_preprocessor +package data import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/quartz-technology/agate/indexer/common" - "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" + "github.com/quartz-technology/agate/indexer/storage/store/dto" ) -// The DataPreprocessor is used to transform the raw data acquired by the data aggregator +// The Preprocessor is used to transform the raw data acquired by the data aggregator // service, which is then used by the storage manager to save it in a database. -type DataPreprocessor struct{} +type Preprocessor struct{} -// NewDataPreprocessor creates an empty and non-initialized DataPreprocessor. -func NewDataPreprocessor() *DataPreprocessor { - return &DataPreprocessor{} +// NewPreprocessor creates an empty and non-initialized Preprocessor. +func NewPreprocessor() *Preprocessor { + return &Preprocessor{} } -// Preprocess transforms the data_aggregator.DataAggregator's aggregation output into a data -// structure that the storage_manager.StorageManager service can store in the database. +// Preprocess transforms the Aggregator's aggregation output into a data +// structure that the storage.Manager service can store in the database. // //nolint:funlen -func (preprocessor *DataPreprocessor) Preprocess( +func (preprocessor *Preprocessor) Preprocess( data *common.AggregatedRelayData, -) *DataPreprocessorOutput { +) *common.DataPreprocessorOutput { // A mapping between each relay (represented by their URL) and the bids each one of them has // received. bidsPerRelay := make(map[string][]*dto.Bid) @@ -66,12 +66,12 @@ func (preprocessor *DataPreprocessor) Preprocess( } } - res := make([]*PreprocessedRelayData, 0) + res := make([]*common.PreprocessedRelayData, 0) // Uses the two mappings created previously to reduce the input mapping into an array. for _, bidDTOs := range bidsPerRelay { for _, bidDTO := range bidDTOs { - item := new(PreprocessedRelayData) + item := new(common.PreprocessedRelayData) item.Bid = bidDTO item.Submissions = submissionsPerBid[bidDTO] @@ -80,7 +80,7 @@ func (preprocessor *DataPreprocessor) Preprocess( } } - return &DataPreprocessorOutput{ + return &common.DataPreprocessorOutput{ Output: res, } } diff --git a/indexer/data_preprocessor/data_preprocessor_test.go b/indexer/data/preprocessor_test.go similarity index 78% rename from indexer/data_preprocessor/data_preprocessor_test.go rename to indexer/data/preprocessor_test.go index 83ae998..5158d82 100644 --- a/indexer/data_preprocessor/data_preprocessor_test.go +++ b/indexer/data/preprocessor_test.go @@ -1,30 +1,30 @@ //nolint:exhaustruct -package data_preprocessor +package data import ( "testing" "github.com/attestantio/go-builder-client/api/v1" "github.com/quartz-technology/agate/indexer/common" - "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" + "github.com/quartz-technology/agate/indexer/storage/store/dto" datav1 "github.com/quartz-technology/redax-go/sdk/data/v1" "github.com/stretchr/testify/require" ) -func TestNewDefaultDataPreprocessor(t *testing.T) { +func TestNewPreprocessor(t *testing.T) { t.Parallel() - preprocessor := NewDataPreprocessor() + preprocessor := NewPreprocessor() require.NotNil(t, preprocessor) } -func TestDefaultDataPreprocessor_Preprocess(t *testing.T) { +func TestPreprocessor_Preprocess(t *testing.T) { t.Parallel() testCases := map[string]struct { input *common.AggregatedRelayData - output *DataPreprocessorOutput + output *common.DataPreprocessorOutput }{ "should preprocess non-delivered bid": { input: &common.AggregatedRelayData{ @@ -38,8 +38,8 @@ func TestDefaultDataPreprocessor_Preprocess(t *testing.T) { BidsDelivered: []*datav1.BidDelivered{}, }, }, - output: &DataPreprocessorOutput{ - Output: []*PreprocessedRelayData{ + output: &common.DataPreprocessorOutput{ + Output: []*common.PreprocessedRelayData{ { Bid: &dto.Bid{}, Submissions: []*dto.Submission{ @@ -68,8 +68,8 @@ func TestDefaultDataPreprocessor_Preprocess(t *testing.T) { }, }, }, - output: &DataPreprocessorOutput{ - Output: []*PreprocessedRelayData{ + output: &common.DataPreprocessorOutput{ + Output: []*common.PreprocessedRelayData{ { Bid: &dto.Bid{}, Submissions: []*dto.Submission{ @@ -90,7 +90,7 @@ func TestDefaultDataPreprocessor_Preprocess(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - preprocessor := NewDataPreprocessor() + preprocessor := NewPreprocessor() data := preprocessor.Preprocess(tc.input) require.Equal(t, tc.output, data) diff --git a/indexer/data_aggregator/relay_api_client_errors.go b/indexer/data_aggregator/relay_api_client_errors.go deleted file mode 100644 index fcc3245..0000000 --- a/indexer/data_aggregator/relay_api_client_errors.go +++ /dev/null @@ -1,21 +0,0 @@ -package data_aggregator - -import "fmt" - -// NewAgateRelayAPIClientSDKInitializationError is raised when the AgateRelayAPIClient -// initialization has failed because the SDK could not be initialized. -func NewAgateRelayAPIClientSDKInitializationError(err error) error { - return fmt.Errorf("failed to initialize relay SDK: %w", err) -} - -// NewAgateRelayAPIClientBidsReceivedRetrievalError is raised if the request made to the relay to -// get the bids received for a specific slot has failed. -func NewAgateRelayAPIClientBidsReceivedRetrievalError(err error) error { - return fmt.Errorf("failed to retrieve bids received by relay: %w", err) -} - -// NewAgateRelayAPIClientBidsDeliveredRetrievalError is raised if the request made to the relay to -// get the bids delivered for a specific slot has failed. -func NewAgateRelayAPIClientBidsDeliveredRetrievalError(err error) error { - return fmt.Errorf("failed to retrieve bids delivered by relay: %w", err) -} diff --git a/indexer/events/default_head_listener.go b/indexer/events/default_head_listener.go new file mode 100644 index 0000000..1dbe9c7 --- /dev/null +++ b/indexer/events/default_head_listener.go @@ -0,0 +1,45 @@ +package events + +import ( + "context" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/quartz-technology/agate/indexer/client" +) + +// DefaultHeadListener is the implementation of the HeadListener for agate, +// using the client.BeaconAPI as a way to listen for new heads. +type DefaultHeadListener struct { + beaconAPIClient client.BeaconAPI +} + +// NewDefaultHeadListener creates an empty and non-initialized DefaultHeadListener. +func NewDefaultHeadListener() *DefaultHeadListener { + return &DefaultHeadListener{ + beaconAPIClient: nil, + } +} + +// Init initializes a new DefaultHeadListener using a client.BeaconAPI to listen to new head events. +func (listener *DefaultHeadListener) Init(beaconAPIClient client.BeaconAPI) { + listener.beaconAPIClient = beaconAPIClient +} + +func (listener *DefaultHeadListener) Listen( + ctx context.Context, + headEvents chan<- *v1.HeadEvent, +) error { + if err := listener.beaconAPIClient.SubscribeToHeadEvents(ctx, func(event *v1.Event) { + headEvent, ok := event.Data.(*v1.HeadEvent) + if !ok { + // TODO: Log error. + return + } + + headEvents <- headEvent + }); err != nil { + return NewDefaultHeadListenerSubscriptionError(err) + } + + return nil +} diff --git a/indexer/events/default_head_listener_errors.go b/indexer/events/default_head_listener_errors.go new file mode 100644 index 0000000..229ccc3 --- /dev/null +++ b/indexer/events/default_head_listener_errors.go @@ -0,0 +1,9 @@ +package events + +import "fmt" + +// NewDefaultHeadListenerSubscriptionError is raised when the DefaultHeadListener fails to subscribe +// to new head events via its client.BeaconAPI. +func NewDefaultHeadListenerSubscriptionError(err error) error { + return fmt.Errorf("default head listener failed to subscribe to head event: %w", err) +} diff --git a/indexer/head_listener/head_listener_test.go b/indexer/events/default_head_listener_test.go similarity index 88% rename from indexer/head_listener/head_listener_test.go rename to indexer/events/default_head_listener_test.go index 170f229..675e5ef 100644 --- a/indexer/head_listener/head_listener_test.go +++ b/indexer/events/default_head_listener_test.go @@ -1,5 +1,5 @@ //nolint:exhaustruct -package head_listener +package events import ( "context" @@ -11,19 +11,19 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewAgateHeadListener(t *testing.T) { +func TestNewDefaultHeadListener(t *testing.T) { t.Parallel() - listener := NewAgateHeadListener() + listener := NewDefaultHeadListener() require.NotNil(t, listener) require.Nil(t, listener.beaconAPIClient) } -func TestAgateHeadListener_Init(t *testing.T) { +func TestDefaultHeadListener_Init(t *testing.T) { t.Parallel() - listener := NewAgateHeadListener() + listener := NewDefaultHeadListener() mockBeaconAPIClient := mocks.NewMockBeaconAPIClient(nil).WithDefaultInternalImplementations() listener.Init(mockBeaconAPIClient) @@ -31,7 +31,7 @@ func TestAgateHeadListener_Init(t *testing.T) { require.NotNil(t, listener.beaconAPIClient) } -func TestAgateHeadListener_Listen(t *testing.T) { +func TestDefaultHeadListener_Listen(t *testing.T) { t.Parallel() testCases := map[string]struct { @@ -92,7 +92,7 @@ func TestAgateHeadListener_Listen(t *testing.T) { t.Parallel() ctx := context.Background() - listener := NewAgateHeadListener() + listener := NewDefaultHeadListener() mockBeaconAPIClient := mocks.NewMockBeaconAPIClient( func(ctx context.Context, handlerFunc client.EventHandlerFunc) error { for _, mockEvent := range tc.mockEvents { diff --git a/indexer/events/head_listener.go b/indexer/events/head_listener.go new file mode 100644 index 0000000..e41cd30 --- /dev/null +++ b/indexer/events/head_listener.go @@ -0,0 +1,14 @@ +package events + +import ( + "context" + + v1 "github.com/attestantio/go-eth2-client/api/v1" +) + +// HeadListener is used to listen to head events on the Ethereum beacon chain. +type HeadListener interface { + // Listen starts the listening process in the background and populates the headEvents channel + // with new head events as they arrive. + Listen(ctx context.Context, headEvents chan<- *v1.HeadEvent) error +} diff --git a/indexer/head_listener/beacon_api_client.go b/indexer/head_listener/beacon_api_client.go deleted file mode 100644 index efbc03c..0000000 --- a/indexer/head_listener/beacon_api_client.go +++ /dev/null @@ -1,61 +0,0 @@ -package head_listener - -import ( - "context" - - client "github.com/attestantio/go-eth2-client" - "github.com/attestantio/go-eth2-client/http" - "github.com/rs/zerolog" -) - -// BeaconAPIClient is used to interact with a beacon node. -type BeaconAPIClient interface { - // SubscribeToHeadEvents is used to create a subscription to new head events and will perform - // the handler's logic. - SubscribeToHeadEvents(ctx context.Context, handler client.EventHandlerFunc) error -} - -// AgateBeaconAPIClient is the implementation of the BeaconAPIClient for agate, -// using the go-eth2-client library to interact with the beacon node. -type AgateBeaconAPIClient struct { - clt *http.Service -} - -// NewAgateBeaconAPIClient creates a new and non-initialized AgateBeaconAPIClient. -func NewAgateBeaconAPIClient() *AgateBeaconAPIClient { - return &AgateBeaconAPIClient{ - clt: nil, - } -} - -// Init initializes an AgateBeaconAPIClient using the beacon API URL to connect to the beacon node. -func (client *AgateBeaconAPIClient) Init(ctx context.Context, beaconAPIURL string) error { - var ok bool - - clt, err := http.New( - ctx, - http.WithAddress(beaconAPIURL), - http.WithLogLevel(zerolog.Disabled), - ) - if err != nil { - return NewBeaconAPIClientInitializationError(err) - } - - client.clt, ok = clt.(*http.Service) - if !ok { - return ErrBeaconAPIClientTypeAssertionError - } - - return nil -} - -func (client *AgateBeaconAPIClient) SubscribeToHeadEvents( - ctx context.Context, - handler client.EventHandlerFunc, -) error { - if err := client.clt.Events(ctx, []string{"head"}, handler); err != nil { - return NewEventSubscriptionError(err) - } - - return nil -} diff --git a/indexer/head_listener/beacon_api_client_errors.go b/indexer/head_listener/beacon_api_client_errors.go deleted file mode 100644 index e7a5901..0000000 --- a/indexer/head_listener/beacon_api_client_errors.go +++ /dev/null @@ -1,20 +0,0 @@ -package head_listener - -import ( - "errors" - "fmt" -) - -var ErrBeaconAPIClientTypeAssertionError = errors.New("failed to type assert beacon API HTTP client") - -// NewBeaconAPIClientInitializationError is raised if the client used to connect to the beacon -// node fails to initialize. -func NewBeaconAPIClientInitializationError(err error) error { - return fmt.Errorf("failed to initialize beacon API client service: %w", err) -} - -// NewEventSubscriptionError is raised if the client connected to the node fails to create an event -// subscription. -func NewEventSubscriptionError(err error) error { - return fmt.Errorf("failed to create event subscription: %w", err) -} diff --git a/indexer/head_listener/head_listener.go b/indexer/head_listener/head_listener.go deleted file mode 100644 index b72be33..0000000 --- a/indexer/head_listener/head_listener.go +++ /dev/null @@ -1,51 +0,0 @@ -package head_listener - -import ( - "context" - - v1 "github.com/attestantio/go-eth2-client/api/v1" -) - -// HeadListener is used to listen to head events on the Ethereum beacon chain. -type HeadListener interface { - // Listen starts the listening process in the background and populates the headEvents channel - // with new head events as they arrive. - Listen(ctx context.Context, headEvents chan<- *v1.HeadEvent) error -} - -// AgateHeadListener is the implementation of the HeadListener for agate, -// using the BeaconAPIClient as a way to listen for new heads. -type AgateHeadListener struct { - beaconAPIClient BeaconAPIClient -} - -// NewAgateHeadListener creates an empty and non-initialized AgateHeadListener. -func NewAgateHeadListener() *AgateHeadListener { - return &AgateHeadListener{ - beaconAPIClient: nil, - } -} - -// Init initializes a new AgateHeadListener using a BeaconAPIClient to listen to new head events. -func (listener *AgateHeadListener) Init(beaconAPIClient BeaconAPIClient) { - listener.beaconAPIClient = beaconAPIClient -} - -func (listener *AgateHeadListener) Listen( - ctx context.Context, - headEvents chan<- *v1.HeadEvent, -) error { - if err := listener.beaconAPIClient.SubscribeToHeadEvents(ctx, func(event *v1.Event) { - headEvent, ok := event.Data.(*v1.HeadEvent) - if !ok { - // TODO: Log error. - return - } - - headEvents <- headEvent - }); err != nil { - return NewHeadEventSubscriptionError(err) - } - - return nil -} diff --git a/indexer/head_listener/head_listener_errors.go b/indexer/head_listener/head_listener_errors.go deleted file mode 100644 index e1ee5b3..0000000 --- a/indexer/head_listener/head_listener_errors.go +++ /dev/null @@ -1,9 +0,0 @@ -package head_listener - -import "fmt" - -// NewHeadEventSubscriptionError is raised when the HeadListener fails to subscribe to new head -// events via its BeaconAPIClient. -func NewHeadEventSubscriptionError(err error) error { - return fmt.Errorf("failed to subscribe to head event: %w", err) -} diff --git a/indexer/indexer.go b/indexer/indexer.go index 0aeb78f..c07b04a 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -4,30 +4,29 @@ import ( "context" v1 "github.com/attestantio/go-eth2-client/api/v1" - "github.com/quartz-technology/agate/indexer/data_aggregator" - "github.com/quartz-technology/agate/indexer/data_preprocessor" - "github.com/quartz-technology/agate/indexer/head_listener" - "github.com/quartz-technology/agate/indexer/storage_manager" + "github.com/quartz-technology/agate/indexer/data" + "github.com/quartz-technology/agate/indexer/events" + "github.com/quartz-technology/agate/indexer/storage" "github.com/rs/zerolog/log" ) type Indexer struct { - listener head_listener.HeadListener - aggregator data_aggregator.DataAggregator - preprocessor *data_preprocessor.DataPreprocessor - storage storage_manager.StorageManager + listener events.HeadListener + aggregator data.Aggregator + preprocessor *data.Preprocessor + storageManager storage.Manager } -func NewIndexer(listener head_listener.HeadListener, - aggregator data_aggregator.DataAggregator, - preprocessor *data_preprocessor.DataPreprocessor, - storage storage_manager.StorageManager, +func New(listener events.HeadListener, + aggregator data.Aggregator, + preprocessor *data.Preprocessor, + storage storage.Manager, ) *Indexer { return &Indexer{ - listener: listener, - aggregator: aggregator, - preprocessor: preprocessor, - storage: storage, + listener: listener, + aggregator: aggregator, + preprocessor: preprocessor, + storageManager: storage, } } @@ -58,7 +57,7 @@ func (indexer *Indexer) Start(ctx context.Context) error { log.Info().Msg("aggregated relay data successfully preprocessed") - if err := indexer.storage.StoreAggregatedRelayData( + if err := indexer.storageManager.StoreAggregatedRelayData( ctx, preprocessedAggregatedRelayData, ); err != nil { @@ -71,5 +70,5 @@ func (indexer *Indexer) Start(ctx context.Context) error { } func (indexer *Indexer) Stop() { - indexer.storage.Shutdown() + indexer.storageManager.Shutdown() } diff --git a/indexer/indexer_errors.go b/indexer/indexer_errors.go index b53f139..fa41e0f 100644 --- a/indexer/indexer_errors.go +++ b/indexer/indexer_errors.go @@ -3,5 +3,5 @@ package indexer import "fmt" func NewIndexerListenerError(err error) error { - return fmt.Errorf("failed to listen to head events: %w", err) + return fmt.Errorf("indexer failed to start listening to head events: %w", err) } diff --git a/indexer/storage_manager/database_migrator.go b/indexer/storage/database_migrator.go similarity index 90% rename from indexer/storage_manager/database_migrator.go rename to indexer/storage/database_migrator.go index 8306ea0..975a84c 100644 --- a/indexer/storage_manager/database_migrator.go +++ b/indexer/storage/database_migrator.go @@ -1,5 +1,5 @@ //nolint:stylecheck -package storage_manager +package storage // The DatabaseMigrator is used to apply the database migrations, ideally at the indexer's startup. type DatabaseMigrator interface { diff --git a/indexer/storage_manager/default_database_migrator.go b/indexer/storage/default_database_migrator.go similarity index 97% rename from indexer/storage_manager/default_database_migrator.go rename to indexer/storage/default_database_migrator.go index 9b26ed7..b89767e 100644 --- a/indexer/storage_manager/default_database_migrator.go +++ b/indexer/storage/default_database_migrator.go @@ -1,4 +1,4 @@ -package storage_manager +package storage import ( "errors" diff --git a/indexer/storage_manager/default_database_migrator_errors.go b/indexer/storage/default_database_migrator_errors.go similarity index 69% rename from indexer/storage_manager/default_database_migrator_errors.go rename to indexer/storage/default_database_migrator_errors.go index f400955..6716f33 100644 --- a/indexer/storage_manager/default_database_migrator_errors.go +++ b/indexer/storage/default_database_migrator_errors.go @@ -1,15 +1,15 @@ -package storage_manager +package storage import "fmt" // NewDefaultDatabaseMigratorInitializationError is raised if the DefaultDatabaseMigrator encounters // an error during the initialization process. func NewDefaultDatabaseMigratorInitializationError(err error) error { - return fmt.Errorf("failed to initialize agate database migrator: %w", err) + return fmt.Errorf("failed to initialize default agate database migrator: %w", err) } // NewDefaultDatabaseMigratorMigrationError is raised when the DefaultDatabaseMigrator fails to // apply the database migrations. func NewDefaultDatabaseMigratorMigrationError(err error) error { - return fmt.Errorf("agate database migrator failed to apply migrations: %w", err) + return fmt.Errorf("default agate database migrator failed to apply migrations: %w", err) } diff --git a/indexer/storage_manager/default_storage_manager.go b/indexer/storage/default_manager.go similarity index 53% rename from indexer/storage_manager/default_storage_manager.go rename to indexer/storage/default_manager.go index 56e1fa4..fbe39e6 100644 --- a/indexer/storage_manager/default_storage_manager.go +++ b/indexer/storage/default_manager.go @@ -1,33 +1,32 @@ -package storage_manager +package storage import ( "context" - "github.com/quartz-technology/agate/indexer/data_preprocessor" - "github.com/quartz-technology/agate/indexer/storage_manager/store" - "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" - "github.com/quartz-technology/agate/indexer/storage_manager/store/models" + "github.com/quartz-technology/agate/indexer/common" + "github.com/quartz-technology/agate/indexer/storage/store" + "github.com/quartz-technology/agate/indexer/storage/store/dto" + "github.com/quartz-technology/agate/indexer/storage/store/models" ) -// The DefaultStorageManager is an implementation of the StoreManager using the default -// preprocessed aggregated relay data format. -type DefaultStorageManager struct { +// The DefaultManager is an implementation of the Manager using the default store. +type DefaultManager struct { store store.Store } -// NewDefaultStorageManager creates an empty and non-initialized DefaultStorageManager. -func NewDefaultStorageManager() *DefaultStorageManager { - return &DefaultStorageManager{ +// NewDefaultManager creates an empty and non-initialized DefaultManager. +func NewDefaultManager() *DefaultManager { + return &DefaultManager{ store: nil, } } -// Init initializes a DefaultStorageManager given the store used to perform database operations. -func (manager *DefaultStorageManager) Init(store store.Store) { +// Init initializes a DefaultManager given the store used to perform database operations. +func (manager *DefaultManager) Init(store store.Store) { manager.store = store } -func (manager *DefaultStorageManager) StoreRelays( +func (manager *DefaultManager) StoreRelays( ctx context.Context, data []*dto.Relay, ) error { @@ -45,25 +44,25 @@ func (manager *DefaultStorageManager) StoreRelays( err := manager.store.ExecInTx(ctx, func(store store.Store) error { if err := store.CreateRelays(ctx, relays); err != nil { - return NewDefaultStorageManagerRelaysCreationError(err) + return NewDefaultManagerRelaysCreationError(err) } return nil }) if err != nil { - return NewDefaultStorageManagerTransactionedRelaysCreationError(err) + return NewDefaultManagerTransactionedRelaysCreationError(err) } return nil } -func (manager *DefaultStorageManager) StoreAggregatedRelayData( +func (manager *DefaultManager) StoreAggregatedRelayData( ctx context.Context, - data *data_preprocessor.DataPreprocessorOutput, + data *common.DataPreprocessorOutput, ) error { relays, err := manager.store.ListRelays(ctx) if err != nil { - return NewDefaultStorageManagerRelaysListingError(err) + return NewDefaultManagerRelaysListingError(err) } relayURLsToIDs := make(map[string]uint64) @@ -96,22 +95,22 @@ func (manager *DefaultStorageManager) StoreAggregatedRelayData( err = manager.store.ExecInTx(ctx, func(store store.Store) error { if err := store.CreateBids(ctx, bids); err != nil { - return NewDefaultStorageManagerBidsCreationError(err) + return NewDefaultManagerBidsCreationError(err) } if err := store.CreateSubmissions(ctx, submissions); err != nil { - return NewDefaultStorageManagerSubmissionsCreationError(err) + return NewDefaultManagerSubmissionsCreationError(err) } return nil }) if err != nil { - return NewDefaultStorageManagerTransactionedBidsSubmissionsCreationError(err) + return NewDefaultManagerTransactionedBidsSubmissionsCreationError(err) } return nil } -func (manager *DefaultStorageManager) Shutdown() { +func (manager *DefaultManager) Shutdown() { manager.store.Close() } diff --git a/indexer/storage/default_manager_errors.go b/indexer/storage/default_manager_errors.go new file mode 100644 index 0000000..80aca31 --- /dev/null +++ b/indexer/storage/default_manager_errors.go @@ -0,0 +1,52 @@ +package storage + +import "fmt" + +// NewDefaultManagerRelaysCreationError is raised if the DefaultManager encounters an +// error while creating a relay entity in the database. +func NewDefaultManagerRelaysCreationError(err error) error { + return fmt.Errorf("default agate storage manager failed to create relays: %w", err) +} + +// NewDefaultManagerTransactionedRelaysCreationError is raised if the DefaultManager +// encounters an error while executing the transaction responsible for storing multiple relays in +// the database. +func NewDefaultManagerTransactionedRelaysCreationError(err error) error { + return fmt.Errorf( + "default agate storage manager failed to execute transaction which stores relays: %w", + err, + ) +} + +// NewDefaultManagerBidsCreationError is raised if the DefaultManager encounters an +// error while creating a list of bid entity in the database. +func NewDefaultManagerBidsCreationError(err error) error { + return fmt.Errorf("default agate storage manager failed to create bids: %w", err) +} + +// NewDefaultManagerSubmissionsCreationError is raised if the DefaultManager encounters an +// error while creating a list of submission entity in the database. +func NewDefaultManagerSubmissionsCreationError(err error) error { + return fmt.Errorf( + "default agate storage manager failed to create bids submissions: %w", + err, + ) +} + +// NewDefaultManagerTransactionedBidsSubmissionsCreationError is raised if the +// DefaultManager encounters an error while executing the transaction responsible for storing +// multiple bids and their submissions in the database. +func NewDefaultManagerTransactionedBidsSubmissionsCreationError(err error) error { + return fmt.Errorf( + "default agate storage manager failed to execute transaction which stores bids and"+ + " their respective submissions: %w", + err, + ) +} + +func NewDefaultManagerRelaysListingError(err error) error { + return fmt.Errorf( + "default agate storage manager failed to list relays prior to storing aggregated relay data: %w", + err, + ) +} diff --git a/indexer/storage/default_manager_test.go b/indexer/storage/default_manager_test.go new file mode 100644 index 0000000..74a1ff3 --- /dev/null +++ b/indexer/storage/default_manager_test.go @@ -0,0 +1,31 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewDefaultManager(t *testing.T) { + t.Parallel() + + manager := NewDefaultManager() + + require.NotNil(t, manager) + require.Nil(t, manager.store) +} + +func TestDefaultManager_Init(t *testing.T) { + // TODO. + t.Parallel() +} + +func TestDefaultManager_StoreRelays(t *testing.T) { + // TODO. + t.Parallel() +} + +func TestDefaultManager_StoreAggregatedRelayData(t *testing.T) { + // TODO. + t.Parallel() +} diff --git a/indexer/storage/manager.go b/indexer/storage/manager.go new file mode 100644 index 0000000..2378d48 --- /dev/null +++ b/indexer/storage/manager.go @@ -0,0 +1,19 @@ +package storage + +import ( + "context" + + "github.com/quartz-technology/agate/indexer/common" + "github.com/quartz-technology/agate/indexer/storage/store/dto" +) + +// The Manager is used to store preprocessed aggregated relay data into a database. +type Manager interface { + // StoreRelays is used to store the relay entities at Agate's startup. + StoreRelays(ctx context.Context, relays []*dto.Relay) error + // StoreAggregatedRelayData is used to store the preprocessed aggregated relay data once + // received by the data aggregator. + StoreAggregatedRelayData(ctx context.Context, data *common.DataPreprocessorOutput) error + // Shutdown stops the manager sub-services. + Shutdown() +} diff --git a/indexer/storage_manager/store/dto/bid.go b/indexer/storage/store/dto/bid.go similarity index 100% rename from indexer/storage_manager/store/dto/bid.go rename to indexer/storage/store/dto/bid.go diff --git a/indexer/storage_manager/store/dto/relay.go b/indexer/storage/store/dto/relay.go similarity index 100% rename from indexer/storage_manager/store/dto/relay.go rename to indexer/storage/store/dto/relay.go diff --git a/indexer/storage_manager/store/dto/submission.go b/indexer/storage/store/dto/submission.go similarity index 100% rename from indexer/storage_manager/store/dto/submission.go rename to indexer/storage/store/dto/submission.go diff --git a/indexer/storage_manager/store/models/bid.go b/indexer/storage/store/models/bid.go similarity index 91% rename from indexer/storage_manager/store/models/bid.go rename to indexer/storage/store/models/bid.go index 7bb2562..8726740 100644 --- a/indexer/storage_manager/store/models/bid.go +++ b/indexer/storage/store/models/bid.go @@ -4,7 +4,7 @@ import ( "github.com/attestantio/go-eth2-client/spec/bellatrix" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/holiman/uint256" - "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" + "github.com/quartz-technology/agate/indexer/storage/store/dto" ) type Bid struct { diff --git a/indexer/storage_manager/store/models/relay.go b/indexer/storage/store/models/relay.go similarity index 100% rename from indexer/storage_manager/store/models/relay.go rename to indexer/storage/store/models/relay.go diff --git a/indexer/storage_manager/store/models/submission.go b/indexer/storage/store/models/submission.go similarity index 100% rename from indexer/storage_manager/store/models/submission.go rename to indexer/storage/store/models/submission.go diff --git a/indexer/storage_manager/store/postgres/store.go b/indexer/storage/store/postgres/store.go similarity index 97% rename from indexer/storage_manager/store/postgres/store.go rename to indexer/storage/store/postgres/store.go index 81b0d20..265eb9d 100644 --- a/indexer/storage_manager/store/postgres/store.go +++ b/indexer/storage/store/postgres/store.go @@ -7,8 +7,8 @@ import ( _ "github.com/golang-migrate/migrate/v4/source/file" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/quartz-technology/agate/indexer/storage_manager/store" - "github.com/quartz-technology/agate/indexer/storage_manager/store/models" + "github.com/quartz-technology/agate/indexer/storage/store" + "github.com/quartz-technology/agate/indexer/storage/store/models" ) // The DefaultStore is an implementation of the Store which connects to a Postgres database to diff --git a/indexer/storage_manager/store/postgres/store_errors.go b/indexer/storage/store/postgres/store_errors.go similarity index 100% rename from indexer/storage_manager/store/postgres/store_errors.go rename to indexer/storage/store/postgres/store_errors.go diff --git a/indexer/storage_manager/store/postgres/store_test.go b/indexer/storage/store/postgres/store_test.go similarity index 100% rename from indexer/storage_manager/store/postgres/store_test.go rename to indexer/storage/store/postgres/store_test.go diff --git a/indexer/storage_manager/store/store.go b/indexer/storage/store/store.go similarity index 94% rename from indexer/storage_manager/store/store.go rename to indexer/storage/store/store.go index 7254869..206adb5 100644 --- a/indexer/storage_manager/store/store.go +++ b/indexer/storage/store/store.go @@ -3,7 +3,7 @@ package store import ( "context" - "github.com/quartz-technology/agate/indexer/storage_manager/store/models" + "github.com/quartz-technology/agate/indexer/storage/store/models" ) // Store is used to perform storage operations with a database. diff --git a/indexer/storage_manager/default_database_migrator_test.go b/indexer/storage_manager/default_database_migrator_test.go deleted file mode 100644 index d9ed3ed..0000000 --- a/indexer/storage_manager/default_database_migrator_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package storage_manager - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestNewDefaultDatabaseMigrator(t *testing.T) { - t.Parallel() - - migrator := NewDefaultDatabaseMigrator() - - require.NotNil(t, migrator) - require.Nil(t, migrator.clt) -} - -func TestDefaultDatabaseMigrator_Init(t *testing.T) { - // TODO. - t.Parallel() -} - -func TestDefaultDatabaseMigrator_Migrate(t *testing.T) { - // TODO. - t.Parallel() -} diff --git a/indexer/storage_manager/default_storage_manager_errors.go b/indexer/storage_manager/default_storage_manager_errors.go deleted file mode 100644 index 2b68192..0000000 --- a/indexer/storage_manager/default_storage_manager_errors.go +++ /dev/null @@ -1,52 +0,0 @@ -package storage_manager - -import "fmt" - -// NewDefaultStorageManagerRelaysCreationError is raised if the DefaultStorageManager encounters an -// error while creating a relay entity in the database. -func NewDefaultStorageManagerRelaysCreationError(err error) error { - return fmt.Errorf("default agate storage manager failed to create relays: %w", err) -} - -// NewDefaultStorageManagerTransactionedRelaysCreationError is raised if the DefaultStorageManager -// encounters an error while executing the transaction responsible for storing multiple relays in -// the database. -func NewDefaultStorageManagerTransactionedRelaysCreationError(err error) error { - return fmt.Errorf( - "default agate storage manager failed to execute transaction which stores relays: %w", - err, - ) -} - -// NewDefaultStorageManagerBidsCreationError is raised if the DefaultStorageManager encounters an -// error while creating a list of bid entity in the database. -func NewDefaultStorageManagerBidsCreationError(err error) error { - return fmt.Errorf("default agate storage manager failed to create bids: %w", err) -} - -// NewDefaultStorageManagerSubmissionsCreationError is raised if the DefaultStorageManager encounters an -// error while creating a list of submission entity in the database. -func NewDefaultStorageManagerSubmissionsCreationError(err error) error { - return fmt.Errorf( - "default agate storage manager failed to create bids submissions: %w", - err, - ) -} - -// NewDefaultStorageManagerTransactionedBidsSubmissionsCreationError is raised if the -// DefaultStorageManager encounters an error while executing the transaction responsible for storing -// multiple bids and their submissions in the database. -func NewDefaultStorageManagerTransactionedBidsSubmissionsCreationError(err error) error { - return fmt.Errorf( - "default agate storage manager failed to execute transaction which stores bids and"+ - " their respective submissions: %w", - err, - ) -} - -func NewDefaultStorageManagerRelaysListingError(err error) error { - return fmt.Errorf( - "default agate store manager failed to list relays prior to storing aggregated relay data: %w", - err, - ) -} diff --git a/indexer/storage_manager/default_storage_manager_test.go b/indexer/storage_manager/default_storage_manager_test.go deleted file mode 100644 index bd70399..0000000 --- a/indexer/storage_manager/default_storage_manager_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package storage_manager - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestNewDefaultStorageManager(t *testing.T) { - t.Parallel() - - manager := NewDefaultStorageManager() - - require.NotNil(t, manager) - require.Nil(t, manager.store) -} - -func TestDefaultStorageManager_Init(t *testing.T) { - // TODO. - t.Parallel() -} - -func TestDefaultStorageManager_StoreRelays(t *testing.T) { - // TODO. - t.Parallel() -} - -func TestDefaultStorageManager_StoreAggregatedRelayData(t *testing.T) { - // TODO. - t.Parallel() -} diff --git a/indexer/storage_manager/storage_manager.go b/indexer/storage_manager/storage_manager.go deleted file mode 100644 index df26e7c..0000000 --- a/indexer/storage_manager/storage_manager.go +++ /dev/null @@ -1,22 +0,0 @@ -package storage_manager - -import ( - "context" - - "github.com/quartz-technology/agate/indexer/data_preprocessor" - "github.com/quartz-technology/agate/indexer/storage_manager/store/dto" -) - -// The StorageManager is used to store preprocessed aggregated relay data into a database. -type StorageManager interface { - // StoreRelays is used to store the relay entities at Agate's startup. - StoreRelays(ctx context.Context, relays []*dto.Relay) error - // StoreAggregatedRelayData is used to store the preprocessed aggregated relay data once - // received by the data aggregator. - StoreAggregatedRelayData( - ctx context.Context, - data *data_preprocessor.DataPreprocessorOutput, - ) error - // Shutdown stops the manager sub-services. - Shutdown() -}