Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[be1] add federation result handling #1841

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 161 additions & 26 deletions be1-go/channel/federation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"golang.org/x/xerrors"
"popstellar/channel"
"popstellar/channel/registry"
"popstellar/crypto"
"popstellar/inbox"
jsonrpc "popstellar/message"
"popstellar/message/answer"
Expand All @@ -22,6 +21,10 @@ import (
"strconv"
"sync"
"time"

"github.com/rs/zerolog"
"go.dedis.ch/kyber/v3/sign/schnorr"
"golang.org/x/xerrors"
)

const (
Expand All @@ -47,8 +50,9 @@ type remoteOrganization struct {
// store the pop tokens of the other lao
// popTokens map[string]struct{}

challenge messagedata.Challenge
socket socket.Socket
challenge messagedata.FederationChallenge
challengeMsg message.Message
socket socket.Socket

state State
sync.Mutex
Expand All @@ -69,7 +73,7 @@ type Channel struct {
remoteOrganizations map[string]*remoteOrganization

// list of challenge requested but not used yet
challenges map[messagedata.Challenge]struct{}
challenges map[messagedata.FederationChallenge]struct{}

sync.Mutex
}
Expand All @@ -88,7 +92,7 @@ func NewChannel(channelID string, hub channel.HubFunctionalities,
log: log,
localOrganizerPk: organizerPk,
remoteOrganizations: make(map[string]*remoteOrganization),
challenges: make(map[messagedata.Challenge]struct{}),
challenges: make(map[messagedata.FederationChallenge]struct{}),
}

newChannel.registry = newChannel.NewFederationRegistry()
Expand Down Expand Up @@ -155,6 +159,7 @@ func (c *Channel) NewFederationRegistry() registry.MessageRegistry {
fedRegistry.Register(messagedata.FederationExpect{}, c.processFederationExpect)
fedRegistry.Register(messagedata.FederationInit{}, c.processFederationInit)
fedRegistry.Register(messagedata.FederationChallenge{}, c.processFederationChallenge)
fedRegistry.Register(messagedata.FederationResult{}, c.processFederationResult)

return fedRegistry
}
Expand Down Expand Up @@ -230,10 +235,7 @@ func (c *Channel) processFederationInit(msg message.Message,
remoteOrg.organizerPk = federationInit.PublicKey
remoteOrg.laoId = federationInit.LaoId
remoteOrg.fedChannel = fmt.Sprintf("/root/%s/federation", federationInit.LaoId)
remoteOrg.challenge = messagedata.Challenge{
Value: federationChallenge.Value,
ValidUntil: federationChallenge.Timestamp,
}
remoteOrg.challenge = federationChallenge

remoteOrg.socket, err = c.hub.ConnectToServerAsClient(federationInit.ServerAddress)
if err != nil {
Expand Down Expand Up @@ -268,6 +270,7 @@ func (c *Channel) processFederationInit(msg message.Message,
}

remoteOrg.socket.Send(buf)
remoteOrg.state = WaitResult

return nil
}
Expand All @@ -293,31 +296,36 @@ func (c *Channel) processFederationExpect(msg message.Message,
if err != nil {
return xerrors.Errorf("failed to unmarshal federationExpect data: %v", err)
}

remoteOrg := c.getRemoteOrganization(federationExpect.PublicKey)
remoteOrg.Lock()
defer remoteOrg.Unlock()

if remoteOrg.state != None {
return answer.NewInternalServerError(invalidStateError, remoteOrg.state, msg)
}
var federationChallenge messagedata.FederationChallenge
err = federationExpect.ChallengeMsg.UnmarshalData(&federationChallenge)
if err != nil {
return xerrors.Errorf("failed to unmarshal federationChallenge data: %v", err)
}

c.Lock()
_, ok = c.challenges[federationExpect.Challenge]
_, ok = c.challenges[federationChallenge]
// always remove the challenge, if present, to avoid challenge reuse
delete(c.challenges, federationExpect.Challenge)
delete(c.challenges, federationChallenge)
c.Unlock()

if !ok {
return answer.NewAccessDeniedError("Invalid challenge %v",
federationExpect.Challenge)
federationChallenge)
}

remoteOrg.state = ExpectConnect
remoteOrg.challenge = federationExpect.Challenge
remoteOrg.challenge = federationChallenge
remoteOrg.organizerPk = federationExpect.PublicKey
remoteOrg.laoId = federationExpect.LaoId
remoteOrg.fedChannel = fmt.Sprintf("/root/%s/federation", federationExpect.LaoId)
remoteOrg.challengeMsg = federationExpect.ChallengeMsg

return nil
}
Expand Down Expand Up @@ -369,8 +377,63 @@ func (c *Channel) processFederationChallenge(msg message.Message,

remoteOrg.state = WaitResult
remoteOrg.socket = s
// Send Federation result to S1
// c.remoteServer.Send(...)

federationResultData := messagedata.FederationResult{
Object: messagedata.FederationObject,
Action: messagedata.FederationActionResult,
Status: "success",
PublicKey: remoteOrg.organizerPk,
ChallengeMsg: remoteOrg.challengeMsg,
}

dataBytes, err := json.Marshal(federationResultData)
if err != nil {
return xerrors.Errorf("failed to marshal federation result message data: %v", err)
}

dataBase64 := base64.URLEncoding.EncodeToString(dataBytes)
signatureBytes, err := c.hub.Sign(dataBytes)
if err != nil {
return xerrors.Errorf("failed to sign federation result message: %v", err)
}
signatureBase64 := base64.URLEncoding.EncodeToString(signatureBytes)

serverPubKey, err := c.hub.GetPubKeyServ().MarshalBinary()
if err != nil {
return xerrors.Errorf("failed to marshal public key of server: %v", err)

}

federationResultMsg := message.Message{
Data: dataBase64,
Sender: base64.URLEncoding.EncodeToString(serverPubKey),
Signature: signatureBase64,
MessageID: messagedata.Hash(dataBase64, signatureBase64),
WitnessSignatures: []message.WitnessSignature{},
}

rpcMessage := method.Publish{
Base: query.Base{
JSONRPCBase: jsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: "publish",
},

Params: struct {
Channel string `json:"channel"`
Message message.Message `json:"message"`
}{
Channel: remoteOrg.fedChannel,
Message: federationResultMsg,
},
}
buf, err := json.Marshal(&rpcMessage)
if err != nil {
return xerrors.Errorf("failed to marshal publish query: %v", err)
}

remoteOrg.socket.Send(buf)
1florentin marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand Down Expand Up @@ -405,22 +468,17 @@ func (c *Channel) processChallengeRequest(msg message.Message,
}
challengeValue := hex.EncodeToString(randomBytes)
expirationTime := time.Now().Add(time.Minute * 5).Unix()
challenge := messagedata.Challenge{
federationChallenge := messagedata.FederationChallenge{
Object: messagedata.FederationObject,
Action: messagedata.FederationActionChallenge,
Value: challengeValue,
ValidUntil: expirationTime,
}

c.Lock()
c.challenges[challenge] = struct{}{}
c.challenges[federationChallenge] = struct{}{}
c.Unlock()

federationChallenge := messagedata.FederationChallenge{
Object: messagedata.FederationObject,
Action: messagedata.FederationActionChallenge,
Value: challengeValue,
Timestamp: expirationTime,
}

challengeData, err := json.Marshal(federationChallenge)
if err != nil {
return xerrors.Errorf(
Expand Down Expand Up @@ -475,6 +533,83 @@ func (c *Channel) processChallengeRequest(msg message.Message,
return nil
}

func (c *Channel) processFederationResult(msg message.Message,
msgData interface{}, s socket.Socket) error {
_, ok := msgData.(*messagedata.FederationResult)
if !ok {
return xerrors.Errorf("message %v is not a federation#result message",
msgData)
}

var federationResult messagedata.FederationResult

err := msg.UnmarshalData(&federationResult)
if err != nil {
return xerrors.Errorf("failed to unmarshal FederationResult data: %v", err)
}

if federationResult.Status != "success" {
if len(federationResult.Reason) > 0 {
return xerrors.Errorf("failed to establish federated connection: %v", federationResult.Reason)
}
return xerrors.Errorf("failed to establish federated connection")
}

var federationChallenge messagedata.FederationChallenge
err = federationResult.ChallengeMsg.UnmarshalData(&federationChallenge)
if err != nil {
return xerrors.Errorf("failed to unmarshal challenge from FederationResult data: %v", err)
}

challengeDataBytes, err := base64.URLEncoding.DecodeString(federationResult.ChallengeMsg.Data)
if err != nil {
return xerrors.Errorf("failed to decode challenge data in FederationResult: %v", err)

}

challengeSignatureBytes, err := base64.URLEncoding.DecodeString(federationResult.ChallengeMsg.Signature)
if err != nil {
return xerrors.Errorf("failed to decode challenge signature in FederationResult: %v", err)

}

remoteOrg := c.getRemoteOrganization(federationResult.ChallengeMsg.Sender)
remoteOrg.Lock()
defer remoteOrg.Unlock()

if remoteOrg.state != WaitResult {
return answer.NewInternalServerError(invalidStateError, remoteOrg.state, msg)

}

pkBytes, err := base64.URLEncoding.DecodeString(remoteOrg.organizerPk)
if err != nil {
return xerrors.Errorf("failed to decode remote organizers public key: %v", err)

}

remotePk := crypto.Suite.Point()

err = remotePk.UnmarshalBinary(pkBytes)
if err != nil {
return xerrors.Errorf("failed to decode remote organizers public key: %v", err)

}
err = schnorr.Verify(crypto.Suite, remotePk, challengeDataBytes, challengeSignatureBytes)
if err != nil {
return xerrors.Errorf("failed to verify signature on challenge in FederationResult message: %v", err)

}

if c.localOrganizerPk != federationResult.PublicKey {
return xerrors.Errorf("invalid public key contained in FederationResult message")
}

remoteOrg.state = Connected

return nil
}

// getRemoteOrganization get the remoteOrganization for the given organizerPk
// or return a new empty one.
func (c *Channel) getRemoteOrganization(organizerPk string) *remoteOrganization {
Expand Down
Loading
Loading