Skip to content

Commit

Permalink
feat(source): implement write key validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Shazaei authored and alishazaee committed Dec 12, 2024
1 parent 834ce46 commit 835ed08
Show file tree
Hide file tree
Showing 22 changed files with 589 additions and 229 deletions.
38 changes: 2 additions & 36 deletions adapter/scylladb/scyllainitialize/init.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,9 @@
/*
Package initializedb provides functions for initializing a ScyllaDB connection and obtaining a database session.
Usage:
func main() {
// Create a new ScyllaDB connection instance
connection := initializedb.NewScyllaDBConnection(gocql.Quorum, "example_keyspace", "127.0.0.1")
// Get a ScyllaDB session using the created connection
session, err := initializedb.GetConnection(connection)
if err != nil {
log.Fatal("Failed to get ScyllaDB session:", err)
}
// Use the 'session' for database operations
// Close the session when done
defer session.Close()
}
This package includes functions for creating a new ScyllaDB connection and obtaining a ScyllaDB session.
It utilizes the gocql library for interacting with ScyllaDB.
Functions:
- NewScyllaDBConnection: Creates and returns a new instance of the 'scyllaDBConnection' type with the specified connection parameters.
func NewScyllaDBConnection(consistency gocql.Consistency, keyspace string, hosts ...string) *scyllaDBConnection
- GetConnection: Returns a ScyllaDB session using the provided 'scyllaDBConnection' instance.
It internally creates a ScyllaDB cluster configuration and session.
func GetConnection(conn *scyllaDBConnection) (scylladb.SessionxInterface, error)
*/
package scyllainitialize

import (
"github.com/gocql/gocql"
"github.com/ormushq/ormus/adapter/scylladb"
"github.com/ormushq/ormus/logger"
scyllaMigrate "github.com/ormushq/ormus/source/repository/scylladb/migrate"
)

func NewScyllaDBConnection(consistency gocql.Consistency, keyspace string, hosts ...string) *ScyllaDBConnection {
Expand Down Expand Up @@ -70,8 +36,8 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str
func RunMigrations(dbConn *ScyllaDBConnection, dir string) error {
logger.L().Debug("running migrations...")
for _, host := range dbConn.hosts {
manager := scyllaMigrate.New(dir, host, dbConn.keyspace)
err := manager.Run()
migration := New(dir, host, dbConn.keyspace)
err := migration.Run()
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package migrate
package scyllainitialize

import (
"errors"
Expand Down
27 changes: 19 additions & 8 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"fmt"
"github.com/ormushq/ormus/manager/delivery/grpcserver"
grpcsourcehandler "github.com/ormushq/ormus/manager/delivery/grpcserver/sourcehandler"
"log/slog"
"sync"

Expand Down Expand Up @@ -51,14 +53,19 @@ func main() {
logger.L().Debug(fmt.Sprintf("%+v", cfg))
logger.L().Debug(fmt.Sprintf("%+v", cfg.ScyllaDBConfig))

svcs := setupServices(wg, done, cfg)
httpSvcs, grpcSvcs := setupServices(wg, done, cfg)

server := httpserver.New(cfg, svcs)
go func() {
server := httpserver.New(cfg, httpSvcs)

server.Server()
server.Server()
}()

grpcServer := grpcserver.New(grpcSvcs, cfg)
grpcServer.Server()
}

func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) httpserver.SetupServices {
func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) (httpserver.SetupServices, grpcserver.SetupServices) {
internalBroker := simplechannel.New(done, wg)
err := internalBroker.NewChannel(managerparam.CreateDefaultProject, channel.BothMode,
cfg.InternalBrokerConfig.ChannelSize, cfg.InternalBrokerConfig.MaxRetryPolicy)
Expand All @@ -82,6 +89,8 @@ func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) htt
sourceSvc := sourceservice.New(sourceRepo, sourceValidator, projectSvc)
sourceHandler := sourcehandler.New(authSvc, sourceSvc)

writeKeyHandler := grpcsourcehandler.New(sourceSvc)

userRepo := scyllauser.New(scylla)
userValidator := uservalidator.New(userRepo)
userSvc := userservice.New(authSvc, userRepo, internalBroker, userValidator)
Expand All @@ -90,8 +99,10 @@ func setupServices(wg *sync.WaitGroup, done <-chan bool, cfg manager.Config) htt
workers.New(projectSvc, internalBroker).Run(done, wg)

return httpserver.SetupServices{
UserHandler: userHand,
ProjectHandler: projectHandler,
SourceHandler: sourceHandler,
}
UserHandler: userHand,
ProjectHandler: projectHandler,
SourceHandler: sourceHandler,
}, grpcserver.SetupServices{
WriteKeyValidationHandler: *writeKeyHandler,
}
}
10 changes: 7 additions & 3 deletions cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/ormushq/ormus/source/repository/scylladb"
eventrepo "github.com/ormushq/ormus/source/repository/scylladb/event"
eventsvc "github.com/ormushq/ormus/source/service/event"

"github.com/ormushq/ormus/source/adapter/manager"
"github.com/ormushq/ormus/source/service/writekey"
"github.com/ormushq/ormus/source/validator/eventvalidator/eventvalidator"
)
Expand Down Expand Up @@ -117,16 +119,18 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event
panic(err)
}

adapter, err := redis.New(cfg.Redis)
redisAdapter, err := redis.New(cfg.Redis)
if err != nil {
panic(err)
}

writeKeyRepo := writekeyrepo.New(adapter)
ManagerAdapter := manager.New(cfg.Source)

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.New(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Scylladb)
DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
panic(err)
}
Expand Down
15 changes: 13 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ type: yml

source:
http_server:
port: 8080
port: 8082
network: "tcp"
write_key_validation_address: "127.0.0.1:8081"
write_key_expiration: 120
new_source_event_name: "new-source-event"
buffersize: 100
Expand All @@ -15,6 +16,15 @@ source:
enable_metric_expose: true
metric_expose_port: 8081
metric_expose_path: "metrics"
scylla_db_config:
hosts:
- 127.0.0.1:9042
consistency: 4
keyspace: "source"
timeout_cluster: 5s
num_retries: 10
min_retry_delay: 1s
max_retry_delay: 10s
scylladb:
hosts:
- 127.0.0.1:9042
Expand Down Expand Up @@ -83,7 +93,8 @@ swagger:

manager:
application:
port: 8080
http_port: 8080
grpc_port: 8081


internal_broker_config:
Expand Down
Loading

0 comments on commit 835ed08

Please sign in to comment.