Skip to content

Commit

Permalink
feat(core): add create bridge core function logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shivanshkc committed Jun 22, 2022
1 parent 7f1883a commit d7cd8d5
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 6 deletions.
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ func main() {
// Prerequisites.
ctx, conf, log := context.Background(), configs.Get(), logger.Get()

// Providing the discovery address to core.
// Providing the discovery address to the core.
core.OwnDiscoveryAddr = conf.HTTPServer.DiscoveryAddr
// Providing the required dependencies to the core.
core.BridgeManager = nil
core.BridgeDatabase = nil

// Startup log.
log.Info(ctx, &logger.Entry{Payload: fmt.Sprintf("server listening at: %s", conf.HTTPServer.Addr)})
Expand Down
23 changes: 23 additions & 0 deletions src/core/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package core

// Message types.
const (
// messageIncomingReq is the type for an incoming message request.
// messageIncomingReq string = "INCOMING_MESSAGE_REQ"
// messageOutgoingReq is the type for an outgoing message request.
// messageOutgoingReq string = "OUTGOING_MESSAGE_REQ"
// messageOutgoingRes is the type for an outgoing message response.
// messageOutgoingRes string = "OUTGOING_MESSAGE_RES"
// messageErrorRes is the type for all error messages.
messageErrorRes string = "ERROR_RES"
)

// Persistence modes.
const (
// persistTrue always persists the message.
// persistTrue = "true"
// persistFalse never persists the message. If the receiver is offline, the message is lost forever.
// persistFalse = "false"
// persistIfError persists the message only if there's an error while sending the message.
// persistIfError = "if_error"
)
52 changes: 52 additions & 0 deletions src/core/dep_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"net/http"
)

// Bridge represents a connection between the client and a Rosenbridge node.
Expand All @@ -23,6 +24,28 @@ type Bridge interface {
SetErrorHandler(handler func(err error))
}

// bridgeManager provides CRUD operations on all bridges that this Rosenbridge node is keeping.
type bridgeManager interface {
// CreateBridge creates a new bridge and makes it available for other CRUD operations.
CreateBridge(ctx context.Context, params *BridgeManagerCreateParams) (Bridge, error)
// GetBridge fetches the bridge that matches the provided identity. It returns nil if the bridge is not found.
GetBridge(ctx context.Context, identity *BridgeIdentity) Bridge
// DeleteBridge disconnects and deletes the specified bridge.
DeleteBridge(ctx context.Context, identity *BridgeIdentity)
}

// bridgeDatabase provides access to the database of all bridges that the whole cluster is keeping.
type bridgeDatabase interface {
// InsertBridge inserts a new bridge document into the database.
InsertBridge(ctx context.Context, doc *BridgeDatabaseDoc) error
// GetBridgesForClients gets all bridges that belong to any of the provided clients.
GetBridgesForClients(ctx context.Context, clientIDs []string) ([]*BridgeDatabaseDoc, error)
// DeleteBridgeForNode deletes the specified bridge for the specified node.
DeleteBridgeForNode(ctx context.Context, bridge *BridgeIdentity, nodeAddr string) error
// DeleteBridgesForNode deletes all specified bridges for the specified node.
DeleteBridgesForNode(ctx context.Context, bridges []*BridgeIdentity, nodeAddr string) error
}

// BridgeIdentity is the information required to uniquely identify a bridge.
type BridgeIdentity struct {
// ClientID is the ID of the client to which the bridge belongs.
Expand All @@ -47,3 +70,32 @@ type BridgeStatus struct {
*BridgeIdentity
*CodeAndReason
}

// BridgeManagerCreateParams are the params required by the CreateBridge method of the bridgeManager.
type BridgeManagerCreateParams struct {
*BridgeIdentity

// Writer is required to upgrade the connection to websocket (if the websocket protocol is being used).
Writer http.ResponseWriter
// Request is required to upgrade the connection to websocket (if the websocket protocol is being used).
Request *http.Request

// BridgeLimitTotal is the max number of bridges allowed. It is optional.
BridgeLimitTotal *int
// BridgeLimitPerClient is the max number of bridges allowed per client. It is optional.
BridgeLimitPerClient *int
}

// BridgeDatabaseDoc is the schema for the document of the bridge in the database.
type BridgeDatabaseDoc struct {
// ClientID is the ID of the client to which the bridge belongs.
ClientID string `json:"client_id" bson:"client_id"`
// BridgeID is unique for all bridges for a given client.
// But two bridges, belonging to two different clients may have the same BridgeID.
BridgeID string `json:"bridge_id" bson:"bridge_id"`

// NodeAddr is the address of the node hosting the connection.
NodeAddr string `json:"node_addr" bson:"node_addr"`
// ConnectedAt is the time at which connection was established.
ConnectedAt int64 `json:"connected_at" bson:"connected_at"`
}
87 changes: 86 additions & 1 deletion src/core/func_create_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package core

import (
"context"
"fmt"
"net/http"
"time"

"github.com/shivanshkc/rosenbridge/src/utils/errutils"

"github.com/google/uuid"
)

// OwnDiscoveryAddr is the address of this node that other nodes in the cluster can use to reach it.
var OwnDiscoveryAddr string

var (
BridgeManager bridgeManager
BridgeDatabase bridgeDatabase
)

// CreateBridgeParams are the params required by the CreateBridge function.
type CreateBridgeParams struct {
// ClientID is the ID of the client who is requesting a new bridge.
Expand All @@ -17,9 +28,83 @@ type CreateBridgeParams struct {
Writer http.ResponseWriter
// Request is required to upgrade the connection to websocket (if the websocket protocol is being used).
Request *http.Request

// BridgeLimitTotal is the max number of bridges allowed. It is optional.
BridgeLimitTotal *int
// BridgeLimitPerClient is the max number of bridges allowed per client. It is optional.
BridgeLimitPerClient *int
}

// CreateBridge is the core functionality to create a new bridge.
func CreateBridge(ctx context.Context, params *CreateBridgeParams) (Bridge, error) {
panic("implement me")
// Generating a new bridge identity.
bridgeIdentity := &BridgeIdentity{ClientID: params.ClientID, BridgeID: uuid.NewString()}

// This bridge doc will be stored in the database.
bridgeDoc := &BridgeDatabaseDoc{
ClientID: bridgeIdentity.ClientID,
BridgeID: bridgeIdentity.BridgeID,
NodeAddr: OwnDiscoveryAddr,
ConnectedAt: time.Now().Unix(),
}

// Inserting the bridge into the database.
if err := BridgeDatabase.InsertBridge(ctx, bridgeDoc); err != nil {
return nil, fmt.Errorf("error in BridgeDatabase.InsertBridge call: %w", err)
}

// Notice that we put the bridge document in the database before actually creating the bridge.
// That's because the system is designed to handle dangling database entries, but not dangling bridges.
//
// In other words, if a bridge does not exist, but its database entry does, then the system will identify
// and clean it up automatically, but on the other hand, if a bridge exists but its database entry does not,
// then that is a fatal situation.

// This input will be required to create a new bridge.
bridgeCreateInput := &BridgeManagerCreateParams{
BridgeIdentity: bridgeIdentity,
Writer: params.Writer,
Request: params.Request,
BridgeLimitTotal: params.BridgeLimitTotal,
BridgeLimitPerClient: params.BridgeLimitPerClient,
}

// Creating a new bridge.
bridge, err := BridgeManager.CreateBridge(ctx, bridgeCreateInput)
if err != nil {
// If the bridge creation fails, we asynchronously attempt to remove the earlier created db record.
// Even if this request fails, the system will eventually identify the stale record and remove it.
go func() { _ = BridgeDatabase.DeleteBridgeForNode(ctx, bridgeIdentity, OwnDiscoveryAddr) }()

return nil, fmt.Errorf("error in BridgeManager.CreateBridge call: %w", err)
}

// It is the core's responsibility to handle bridge closures.
bridge.SetCloseHandler(func(err error) {
ctx := context.Background()
// Removing the bridge from the bridge manager.
BridgeManager.DeleteBridge(ctx, bridgeIdentity)
// Removing the bridge entry from the database.
// TODO: Log the error without importing the src/logger dependency.
_ = BridgeDatabase.DeleteBridgeForNode(ctx, bridgeIdentity, OwnDiscoveryAddr)
})

// It is the core's responsibility to handle bridge errors.
bridge.SetErrorHandler(func(err error) {
ctx := context.Background()
// Converting the error to HTTP error to get the code and reason.
errHTTP := errutils.ToHTTPError(err)
// Forming the bridge message to before sending to client.
bridgeMessage := &BridgeMessage{
Type: messageErrorRes,
// RequestID is not known.
RequestID: "",
Body: &CodeAndReason{errHTTP.Code, errHTTP.Reason},
}
// Letting the client know of the error.
// TODO: Log the error without importing the src/logger dependency.
_ = bridge.SendMessage(ctx, bridgeMessage)
})

return bridge, nil
}
4 changes: 0 additions & 4 deletions src/core/func_create_bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func TestCreateBridge_BridgeCloseHandler(t *testing.T) {
t.Parallel()
}

func TestCreateBridge_BridgeDatabase_DeleteBridgeError(t *testing.T) {
t.Parallel()
}

func TestCreateBridge_BridgeErrorHandler(t *testing.T) {
t.Parallel()
}

0 comments on commit d7cd8d5

Please sign in to comment.