Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): implement write key validation #146

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 2 additions & 36 deletions adapter/scylladb/scyllainitialize/init.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,9 @@
/*
Package initializedb provides functions for initializing a ScyllaDB connection and obtaining a database session.

Usage:

func main() {
// Create a new ScyllaDB connection instance
connection := initializedb.NewScyllaDBConnection(gocql.Quorum, "example_keyspace", "127.0.0.1")

// Get a ScyllaDB session using the created connection
session, err := initializedb.GetConnection(connection)
if err != nil {
log.Fatal("Failed to get ScyllaDB session:", err)
}

// Use the 'session' for database operations

// Close the session when done
defer session.Close()
}

This package includes functions for creating a new ScyllaDB connection and obtaining a ScyllaDB session.
It utilizes the gocql library for interacting with ScyllaDB.

Functions:

- NewScyllaDBConnection: Creates and returns a new instance of the 'scyllaDBConnection' type with the specified connection parameters.
func NewScyllaDBConnection(consistency gocql.Consistency, keyspace string, hosts ...string) *scyllaDBConnection

- GetConnection: Returns a ScyllaDB session using the provided 'scyllaDBConnection' instance.
It internally creates a ScyllaDB cluster configuration and session.
func GetConnection(conn *scyllaDBConnection) (scylladb.SessionxInterface, error)
*/
package scyllainitialize

import (
"github.com/gocql/gocql"
"github.com/ormushq/ormus/adapter/scylladb"
"github.com/ormushq/ormus/logger"
scyllaMigrate "github.com/ormushq/ormus/source/repository/scylladb/migrate"
)

func NewScyllaDBConnection(consistency gocql.Consistency, keyspace string, hosts ...string) *ScyllaDBConnection {
Expand Down Expand Up @@ -70,8 +36,8 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str
func RunMigrations(dbConn *ScyllaDBConnection, dir string) error {
logger.L().Debug("running migrations...")
for _, host := range dbConn.hosts {
manager := scyllaMigrate.New(dir, host, dbConn.keyspace)
err := manager.Run()
migration := New(dir, host, dbConn.keyspace)
err := migration.Run()
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package migrate
package scyllainitialize

import (
"errors"
Expand Down
27 changes: 19 additions & 8 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/manager"
"github.com/ormushq/ormus/manager/delivery/grpcserver"
grpcsourcehandler "github.com/ormushq/ormus/manager/delivery/grpcserver/sourcehandler"
"github.com/ormushq/ormus/manager/delivery/httpserver"
"github.com/ormushq/ormus/manager/delivery/httpserver/projecthandler"
"github.com/ormushq/ormus/manager/delivery/httpserver/sourcehandler"
Expand Down Expand Up @@ -51,14 +53,19 @@ func main() {
logger.L().Debug(fmt.Sprintf("%+v", cfg))
logger.L().Debug(fmt.Sprintf("%+v", cfg.ScyllaDBConfig))

svcs := setupServices(wg, done, cfg)
httpSvcs, grpcSvcs := setupServices(wg, done, cfg)

server := httpserver.New(cfg, svcs)
go func() {
server := httpserver.New(cfg, httpSvcs)

server.Server()
server.Server()
}()

grpcServer := grpcserver.New(grpcSvcs, cfg)
grpcServer.Server()
}

func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) httpserver.SetupServices {
func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) (httpHandler httpserver.SetupServices, grpcHandler grpcserver.SetupServices) {
internalBroker := simplechannel.New(done, wg)
err := internalBroker.NewChannel(managerparam.CreateDefaultProject, channel.BothMode,
cfg.InternalBrokerConfig.ChannelSize, cfg.InternalBrokerConfig.MaxRetryPolicy)
Expand All @@ -82,6 +89,8 @@ func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) htt
sourceSvc := sourceservice.New(sourceRepo, sourceValidator, projectSvc)
sourceHandler := sourcehandler.New(authSvc, sourceSvc)

writeKeyHandler := grpcsourcehandler.New(sourceSvc)

userRepo := scyllauser.New(scylla)
userValidator := uservalidator.New(userRepo)
userSvc := userservice.New(authSvc, userRepo, internalBroker, userValidator)
Expand All @@ -90,8 +99,10 @@ func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) htt
workers.New(projectSvc, internalBroker).Run(done, wg)

return httpserver.SetupServices{
UserHandler: userHand,
ProjectHandler: projectHandler,
SourceHandler: sourceHandler,
}
UserHandler: userHand,
ProjectHandler: projectHandler,
SourceHandler: sourceHandler,
}, grpcserver.SetupServices{
WriteKeyValidationHandler: *writeKeyHandler,
}
}
11 changes: 7 additions & 4 deletions cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/pkg/channel"
"github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel"
"github.com/ormushq/ormus/source/adapter/manager"
"github.com/ormushq/ormus/source/delivery/httpserver"
"github.com/ormushq/ormus/source/delivery/httpserver/eventhandler"
"github.com/ormushq/ormus/source/delivery/httpserver/statushandler"
Expand Down Expand Up @@ -117,23 +118,25 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event
panic(err)
}

adapter, err := redis.New(cfg.Redis)
redisAdapter, err := redis.New(cfg.Redis)
if err != nil {
panic(err)
}

writeKeyRepo := writekeyrepo.New(adapter)
ManagerAdapter := manager.New(cfg.Source)

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.New(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Scylladb)
DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
panic(err)
}
eventRepo := eventrepo.New(DB)
eventSvc = *eventsvc.New(eventRepo)

eventValidator = eventvalidator.New(&writeKeyRepo)
eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source)

return writeKeySvc, eventHandler, eventSvc, eventValidator
}
15 changes: 13 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ type: yml

source:
http_server:
port: 8080
port: 8082
network: "tcp"
write_key_validation_address: "127.0.0.1:8081"
write_key_expiration: 120
new_source_event_name: "new-source-event"
buffersize: 100
Expand All @@ -15,6 +16,15 @@ source:
enable_metric_expose: true
metric_expose_port: 8081
metric_expose_path: "metrics"
scylla_db_config:
hosts:
- 127.0.0.1:9042
consistency: 4
keyspace: "source"
timeout_cluster: 5s
num_retries: 10
min_retry_delay: 1s
max_retry_delay: 10s
scylladb:
hosts:
- 127.0.0.1:9042
Expand Down Expand Up @@ -83,7 +93,8 @@ swagger:

manager:
application:
port: 8080
http_port: 8080
grpc_port: 8081


internal_broker_config:
Expand Down
Loading
Loading