Skip to content

Commit

Permalink
Create stream channel before performing master election (#27)
Browse files Browse the repository at this point in the history
* Create stream channel before performing master election

* fix
  • Loading branch information
adibrastegarnia authored Feb 17, 2023
1 parent 5463af0 commit 0ec4f4b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 26 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.8-dev
1.1.8
4 changes: 2 additions & 2 deletions pkg/p4rtclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (c *client) ClientConn() *grpc.ClientConn {
return c.grpcClient
}

func (c *client) PerformMasterArbitration(role *p4api.Role) (*p4api.StreamMessageResponse_Arbitration, error) {
resp, err := c.streamClient.PerformMasterArbitration(role)
func (c *client) PerformMasterArbitration(ctx context.Context, role *p4api.Role) (*p4api.StreamMessageResponse_Arbitration, error) {
resp, err := c.streamClient.PerformMasterArbitration(ctx, role)
return resp, errors.FromGRPC(err)
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/p4rtclient/conn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,6 @@ func (m *connManager) Connect(ctx context.Context, destination *Destination) (Cl
}

m.targets[targetID] = p4rtClient
streamChannel, err := p4rtClient.p4runtimeClient.StreamChannel(context.Background())
if err != nil {
log.Errorw("Cannot open a p4rt stream for connection", "targetID", destination.TargetID, "error", err)
return nil, err
}
p4rtClient.streamClient.streamChannel = streamChannel
go func() {
var conn Conn
state := clientConn.GetState()
Expand Down
4 changes: 2 additions & 2 deletions pkg/p4rtclient/conn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ func TestClient_SetMasterArbitration(t *testing.T) {
assert.NoError(t, err)

role := p4utils.NewStratumRole("test1", 0, []byte{}, false, true)
resp, err := conn1.PerformMasterArbitration(role)
resp, err := conn1.PerformMasterArbitration(ctx, role)
assert.NoError(t, err)
assert.Equal(t, "test1", resp.Arbitration.Role.Name)

role2 := p4utils.NewStratumRole("test2", 0, []byte{}, false, true)
resp, err = conn2.PerformMasterArbitration(role2)
resp, err = conn2.PerformMasterArbitration(ctx, role2)
assert.Equal(t, "test2", resp.Arbitration.Role.Name)

assert.NoError(t, err)
Expand Down
34 changes: 19 additions & 15 deletions pkg/p4rtclient/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@
package p4rtclient

import (
"github.com/onosproject/onos-lib-go/pkg/errors"
"context"
"github.com/onosproject/onos-net-lib/pkg/p4utils"
p4api "github.com/p4lang/p4runtime/go/p4/v1"
)

// StreamClient p4runtime master stream client
type StreamClient interface {
PerformMasterArbitration(role *p4api.Role) (*p4api.StreamMessageResponse_Arbitration, error)
PerformMasterArbitration(ctx context.Context, role *p4api.Role) (*p4api.StreamMessageResponse_Arbitration, error)
}

type streamClient struct {
p4runtimeClient p4api.P4RuntimeClient
streamChannel p4api.P4Runtime_StreamChannelClient
deviceID uint64
}

func (s *streamClient) PerformMasterArbitration(role *p4api.Role) (*p4api.StreamMessageResponse_Arbitration, error) {
func (s *streamClient) PerformMasterArbitration(ctx context.Context, role *p4api.Role) (*p4api.StreamMessageResponse_Arbitration, error) {
electionID := p4utils.TimeBasedElectionID()
channel, err := s.p4runtimeClient.StreamChannel(ctx)
if err != nil {
return nil, err
}

request := &p4api.StreamMessageRequest{
Update: &p4api.StreamMessageRequest_Arbitration{Arbitration: &p4api.MasterArbitrationUpdate{
DeviceId: s.deviceID,
Expand All @@ -31,26 +35,26 @@ func (s *streamClient) PerformMasterArbitration(role *p4api.Role) (*p4api.Stream
}},
}
log.Infow("Sending master arbitration request", "request", request)
err := s.streamChannel.Send(request)
err = channel.Send(request)
if err != nil {
return nil, err
}

in, err := s.streamChannel.Recv()
if err != nil {
return nil, err
}

switch v := in.Update.(type) {
case *p4api.StreamMessageResponse_Arbitration:
log.Infow("Received arbitration response", "response", v)
for {
in, err := channel.Recv()
if err != nil {
return nil, err
}
return v, nil

switch v := in.Update.(type) {
case *p4api.StreamMessageResponse_Arbitration:
log.Infow("Received arbitration response", "response", v)
if err != nil {
return nil, err
}
return v, nil
}
}
return nil, errors.NewNotSupported("not an arbitration response message")

}

Expand Down

0 comments on commit 0ec4f4b

Please sign in to comment.