Skip to content

Commit

Permalink
feat(state): add state synchronization service based on Raft consensu…
Browse files Browse the repository at this point in the history
…s algorithm
  • Loading branch information
ffa500 committed Jan 25, 2024
1 parent 00c099f commit 56862e4
Show file tree
Hide file tree
Showing 43 changed files with 6,205 additions and 204 deletions.
3 changes: 1 addition & 2 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ output: interleaved
silent: true

vars:
GO_TEST_FLAGS: -tags=testing -v -count=1 -timeout=1m
GO_TEST_FLAGS: -tags=testing -v -count=1 -timeout=10m

tasks:
default:
Expand Down Expand Up @@ -70,7 +70,6 @@ tasks:
desc: Run all testing packages with logging disabled
cmds:
- go test {{.GO_TEST_FLAGS}} ./apis/grpc/ ./services/... ./config/... ./dda/test/...

test-log:
desc: Run all testing packages with logging enabled
env:
Expand Down
91 changes: 90 additions & 1 deletion apis/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (

"github.com/coatyio/dda/apis"
"github.com/coatyio/dda/apis/grpc/stubs/golang/com"
"github.com/coatyio/dda/apis/grpc/stubs/golang/state"
"github.com/coatyio/dda/apis/grpc/stubs/golang/store"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services"
comapi "github.com/coatyio/dda/services/com/api"
stateapi "github.com/coatyio/dda/services/state/api"
storeapi "github.com/coatyio/dda/services/store/api"
"github.com/google/uuid"
rpc "google.golang.org/grpc"
Expand All @@ -39,6 +41,8 @@ type grpcServer struct {
comApi comapi.Api
store.UnimplementedStoreServiceServer
storeApi storeapi.Api
state.UnimplementedStateServiceServer
stateApi stateapi.Api
mu sync.RWMutex // protects following fields
srv *rpc.Server
grpcWebServer
Expand All @@ -49,10 +53,11 @@ type grpcServer struct {
// New returns an apis.ApiServer interface that implements an uninitialized gRPC
// server exposing the peripheral DDA services to gRPC and gRPC-Web clients. To
// start the returned server, invoke Open with a gRPC-enabled DDA configuration.
func New(com comapi.Api, store storeapi.Api) apis.ApiServer {
func New(com comapi.Api, store storeapi.Api, state stateapi.Api) apis.ApiServer {
return &grpcServer{
comApi: com,
storeApi: store,
stateApi: state,
actionCallbacks: make(map[string]comapi.ActionCallback),
queryCallbacks: make(map[string]comapi.QueryCallback),
}
Expand Down Expand Up @@ -93,6 +98,7 @@ func (s *grpcServer) Open(cfg *config.Config) error {
s.srv = rpc.NewServer(srvOpts...)
com.RegisterComServiceServer(s.srv, s)
store.RegisterStoreServiceServer(s.srv, s)
state.RegisterStateServiceServer(s.srv, s)

plog.Printf("Open gRPC server listening on address %s...\n", address)

Expand Down Expand Up @@ -645,6 +651,89 @@ func (s *grpcServer) ScanRange(r *store.Range, stream store.StoreService_ScanRan
return err
}

// State API

func (s *grpcServer) ProposeInput(ctx context.Context, in *state.Input) (*state.Ack, error) {
if s.stateApi == nil {
return nil, s.serviceDisabledError("state")
}
err := s.stateApi.ProposeInput(ctx, &stateapi.Input{
Op: stateapi.InputOp(in.GetOp()),
Key: in.GetKey(),
Value: in.GetValue(),
})
if err != nil {
err = status.Errorf(s.codeByError(err), "failed: %v", err)
plog.Println(err)
return nil, err
}
return &state.Ack{}, nil
}

func (s *grpcServer) ObserveStateChange(p *state.ObserveStateChangeParams, stream state.StateService_ObserveStateChangeServer) error {
if s.stateApi == nil {
return s.serviceDisabledError("state")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch, err := s.stateApi.ObserveStateChange(ctx)
if err != nil {
err = status.Errorf(s.codeByError(err), "failed observing state changes: %v", err)
plog.Println(err)
return err
}

for {
select {
case in, ok := <-ch:
if !ok {
// End stream if channel has been closed.
return nil
}
if err := stream.Send(&state.Input{Op: state.InputOperation(in.Op), Key: in.Key, Value: in.Value}); err != nil {
// Do not return err, but keep stream alive for further transmissions.
plog.Println(err)
}
case <-stream.Context().Done(): // server streaming call canceled by client
return stream.Context().Err()
}
}
}

func (s *grpcServer) ObserveMembershipChange(p *state.ObserveMembershipChangeParams, stream state.StateService_ObserveMembershipChangeServer) error {
if s.stateApi == nil {
return s.serviceDisabledError("state")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch, err := s.stateApi.ObserveMembershipChange(ctx)
if err != nil {
err = status.Errorf(s.codeByError(err), "failed observing membership changes: %v", err)
plog.Println(err)
return err
}

for {
select {
case mc, ok := <-ch:
if !ok {
// End stream if channel has been closed.
return nil
}
if err := stream.Send(&state.MembershipChange{Id: mc.Id, Joined: mc.Joined}); err != nil {
// Do not return err, but keep stream alive for further transmissions.
plog.Println(err)
}
case <-stream.Context().Done(): // server streaming call canceled by client
return stream.Context().Err()
}
}
}

// Utils

func (s *grpcServer) serviceDisabledError(srv string) error {
Expand Down
141 changes: 139 additions & 2 deletions apis/grpc/protos/state.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT

// Service definition of DDA distributed state management API.
// Service definition of DDA distributed state synchronization API.

syntax = "proto3";

Expand All @@ -10,3 +10,140 @@ package dda.state.v1;
option java_package = "io.dda.state.v1";
option java_multiple_files = true;
option java_outer_classname = "DdaStateProto";

// This gRPC service exposes the DDA distributed state synchronization API to
// gRPC clients.
//
// This service enables application components to share distributed state with
// each other by use of an underlying consensus protocol, like Raft or Paxos,
// that guarantees strong consistency.
//
// This service maintains a replicated state in a state synchronization group
// that is formed by all associated DDA agents in a DDA cluster that are
// configured as state group members. State changes can be observed and proposed
// by members of the group. Replicated state is represented as a key-value
// store. Whereas keys are strings, values can be any application-specific
// binary encoding represented by Protobuf scalar value type bytes. You may
// partition the overall state into multiple application-specific use cases by
// providing a unique key prefix for each one.
//
// Replicated state can be modified with method ProposeInput to propose new
// input that should be applied to the replicated state. Applied state changes
// can be observed with method ObserveStateChange.
//
// All members of a state synchronization group can monitor the lifecycle of
// each other by method ObserveMembershipChange emitting new membership change
// information whenever a member is joining or leaving the group.
service StateService {

// Proposes the given input to the state synchronization group. It tries to
// add the input to a log replicated by all members. If the input is accepted
// into the log it counts as committed and is applied to each member's
// key-value store. The resulting state of the key-value store after the
// proposed input has been applied, if it ever gets applied, can be observed
// with method ObserveStateChange.
//
// A majority of members need to give their consent before the given input is
// accepted into the log. This might take indefinitely if no majority can be
// reached, e.g. if too many members have crashed and cannot recover. In this
// case the call never returns unless you specify a deadline/timeout with the
// call.
//
// If the given input cannot be transmitted or if the operation fails due to a
// non-retryable error, a gRPC error with status code UNAVAILABLE (14) is
// signaled.
rpc ProposeInput(Input) returns (Ack);

// Emits new input that has been proposed and applied to the replicated
// key-value store as soon as the change becomes known to the local state
// group member. A state change is triggered whenever a new input of type
// INPUT_OPERATION_SET or INPUT_OPERATION_DELETE is committed. Upon
// invocation, synthetic INPUT_OPERATION_SET state changes are triggered to
// reproduce the current key-value entries of the replicated state.
//
// This server streaming call is automatically ended by this service once its
// local member leaves the group.
//
// To stop receiving state changes, a gRPC client should cancel this server
// streaming call or specify a deadline/timeout with this call from the very
// start.
//
// If the operation fails, a gRPC error with status code UNAVAILABLE (14) is
// signaled and the stream is aborted.
rpc ObserveStateChange(ObserveStateChangeParams) returns (stream Input);

// Emits state member information on every state membership change as soon as
// the update becomes known to the local member. State membership changes are
// triggered whenever a member joins or leaves the state synchronization
// group.
//
// This server streaming call is automatically ended by this service once its
// local member leaves the group.
//
// To stop receiving membership changes, a gRPC client should cancel this
// server streaming call or specify a deadline/timeout with this call from the
// very start.
//
// If the operation fails, a gRPC error with status code UNAVAILABLE (14) is
// signaled and the stream is aborted.
rpc ObserveMembershipChange(ObserveMembershipChangeParams)
returns (stream MembershipChange);
}

// Empty acknowledgement message.
message Ack {
}

// Input represents an operation proposed by method ProposeInput. Input is
// applied to the replicated state represented as a key-value store and finally
// emitted as a state change by method ObserveStateChange.
message Input {
// Operation applied on given key ([INPUT_OPERATION_DELETE]) or key-value pair
// ([INPUT_OPERATION_SET]) (required).
InputOperation op = 1;

// Key on which given operation is applied (required).
//
// If this field is not set, the default key "" is used.
string key = 2;

// Value that is set or changed (required for INPUT_OPERATION_SET operation
// only).
//
// Value is represented as any application-specific binary encoding. If this
// field is not specified but required, the value which is set represents
// empty bytes.
bytes value = 3;
}

// Defines all input operations available on replicated state.
enum InputOperation {
// No operation. This operation should never be specified and will be ignored.
INPUT_OPERATION_UNSPECIFIED = 0;

// Set or change the value of a key.
INPUT_OPERATION_SET = 1;

// Delete a key-value pair.
INPUT_OPERATION_DELETE = 2;
}

// Represents parameters of method ObserveStateChange.
message ObserveStateChangeParams {
}

// Represents parameters of method ObserveMembershipChangeParams.
message ObserveMembershipChangeParams {
}

// Represents information about a single member joining or leaving the state
// synchronization group.
message MembershipChange {
// Unique Id of member usually represented as a UUID v4 (required). This Id
// corresponds with the configured DDA identity id.
string id = 1;

// Determines whether the member has joined (true) or left (false) the state
// synchronization group.
bool joined = 2;
}
Loading

0 comments on commit 56862e4

Please sign in to comment.