Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: remove all traces of unused protocol version #11600

Merged
merged 2 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/11600.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: The unused protocol_version agent configuration value has been removed.
```
14 changes: 1 addition & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,18 +749,6 @@ func (c *Client) secretNodeID() string {
return c.config.Node.SecretID
}

// RPCMajorVersion returns the structs.ApiMajorVersion supported by the
// client.
func (c *Client) RPCMajorVersion() int {
return structs.ApiMajorVersion
}

// RPCMinorVersion returns the structs.ApiMinorVersion supported by the
// client.
func (c *Client) RPCMinorVersion() int {
return structs.ApiMinorVersion
}

// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
Expand Down Expand Up @@ -2773,7 +2761,7 @@ DISCOLOOP:
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
if err := c.connPool.RPC(region, addr, "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TRY:
}

// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
rpcErr := c.connPool.RPC(c.Region(), server.Addr, method, args, reply)

if rpcErr == nil {
c.fireRpcRetryWatcher()
Expand Down Expand Up @@ -427,7 +427,7 @@ func resolveServer(s string) (net.Addr, error) {
// a potential error.
func (c *Client) Ping(srv net.Addr) error {
var reply struct{}
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
err := c.connPool.RPC(c.Region(), srv, "Status.Ping", struct{}{}, &reply)
return err
}

Expand Down
3 changes: 0 additions & 3 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.DataDir != "" {
conf.DataDir = agentConfig.Server.DataDir
}
if agentConfig.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(agentConfig.Server.ProtocolVersion)
}
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
Expand Down
6 changes: 6 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
}
}

// ProtocolVersion has never been used. Warn if it is set as someone
// has probably made a mistake.
if config.Server.ProtocolVersion != 0 {
c.agent.logger.Warn("Please remove deprecated protocol_version field from config.")
}

return true
}

Expand Down
4 changes: 3 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ type ServerConfig struct {

// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `hcl:"protocol_version"`
//
// Deprecated: This has never been used and will emit a warning if nonzero.
ProtocolVersion int `hcl:"protocol_version" json:"-"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhancement idea. Should/could we introduce a deprecated tag that can be introspected to auto-generated deprecation warnings like you had to manually add to command/agent.command.go? I've had a lot of success with that kind of AOP approach on APIs in other projects.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, but this happens fairly rarely, and I think we may want to handle each deprecation on a case by case basis. For example nomad job run has a whole special Warnings slice where we include deprecation notifications: https://github.com/hashicorp/nomad/blob/v1.2.2/nomad/structs/structs.go#L6514


// RaftProtocol is the Raft protocol version to speak. This must be from [1-3].
RaftProtocol int `hcl:"raft_protocol"`
Expand Down
2 changes: 0 additions & 2 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ var basicConfig = &Config{
AuthoritativeRegion: "foobar",
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
RaftMultiplier: helper.IntToPtr(4),
NumSchedulers: helper.IntToPtr(2),
Expand Down Expand Up @@ -494,7 +493,6 @@ func TestConfig_Parse(t *testing.T) {
}
actual = oldDefault.Merge(actual)

//panic(fmt.Sprintf("first: %+v \n second: %+v", actual.TLSConfig, tc.Result.TLSConfig))
require.EqualValues(tc.Result, removeHelperAttributes(actual))
})
}
Expand Down
1 change: 0 additions & 1 deletion command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ server {
authoritative_region = "foobar"
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
Expand Down
1 change: 0 additions & 1 deletion command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@
"node_gc_threshold": "12h",
"non_voting_server": true,
"num_schedulers": 2,
"protocol_version": 3,
"raft_protocol": 3,
"raft_multiplier": 4,
"redundancy_zone": "foo",
Expand Down
22 changes: 10 additions & 12 deletions helper/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Conn struct {
addr net.Addr
session *yamux.Session
lastUsed time.Time
version int

pool *ConnPool

Expand Down Expand Up @@ -278,7 +277,7 @@ func (p *ConnPool) SetConnListener(l chan<- *Conn) {

// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) acquire(region string, addr net.Addr) (*Conn, error) {
// Check to see if there's a pooled connection available. This is up
// here since it should the vastly more common case than the rest
// of the code here.
Expand All @@ -305,7 +304,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
// If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it.
if isLeadThread {
c, err := p.getNewConn(region, addr, version)
c, err := p.getNewConn(region, addr)
p.Lock()
delete(p.limiter, addr.String())
close(wait)
Expand Down Expand Up @@ -349,7 +348,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
}

// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) getNewConn(region string, addr net.Addr) (*Conn, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
Expand Down Expand Up @@ -404,7 +403,6 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
session: session,
clients: list.New(),
lastUsed: time.Now(),
version: version,
pool: p,
}
return c, nil
Expand All @@ -429,12 +427,12 @@ func (p *ConnPool) clearConn(conn *Conn) {
}
}

// getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
// getClient is used to get a usable client for an address
func (p *ConnPool) getRPCClient(region string, addr net.Addr) (*Conn, *StreamClient, error) {
retries := 0
START:
// Try to get a conn first
conn, err := p.acquire(region, addr, version)
conn, err := p.acquire(region, addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
}
Expand All @@ -457,8 +455,8 @@ START:

// StreamingRPC is used to make an streaming RPC call. Callers must
// close the connection when done.
func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.Conn, error) {
conn, err := p.acquire(region, addr, version)
func (p *ConnPool) StreamingRPC(region string, addr net.Addr) (net.Conn, error) {
conn, err := p.acquire(region, addr)
if err != nil {
return nil, fmt.Errorf("failed to get conn: %v", err)
}
Expand All @@ -477,9 +475,9 @@ func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.
}

// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interface{}, reply interface{}) error {
// Get a usable client
conn, sc, err := p.getRPCClient(region, addr, version)
conn, sc, err := p.getRPCClient(region, addr)
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions helper/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -50,7 +49,7 @@ func TestConnPool_ConnListener(t *testing.T) {
pool.SetConnListener(c)

// Make an RPC
_, err = pool.acquire("test", addr, structs.ApiMajorVersion)
_, err = pool.acquire("test", addr)
require.Nil(err)

// Assert we get a connection.
Expand Down
3 changes: 1 addition & 2 deletions nomad/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error)

// Make the RPC
var resp structs.NodeConnQueryResponse
err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
"Status.HasNodeConn", &req, &resp)
err := s.connPool.RPC(s.config.Region, server.Addr, "Status.HasNodeConn", &req, &resp)
if err != nil {
multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
continue
Expand Down
35 changes: 0 additions & 35 deletions nomad/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nomad

import (
"fmt"
"io"
"net"
"os"
Expand All @@ -27,23 +26,6 @@ const (
DefaultSerfPort = 4648
)

// These are the protocol versions that Nomad can understand
const (
ProtocolVersionMin uint8 = 1
ProtocolVersionMax = 1
)

// ProtocolVersionMap is the mapping of Nomad protocol versions
// to Serf protocol versions. We mask the Serf protocols using
// our own protocol version.
var protocolVersionMap map[uint8]uint8

func init() {
protocolVersionMap = map[uint8]uint8{
1: 4,
}
}

func DefaultRPCAddr() *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 4647}
}
Expand Down Expand Up @@ -93,10 +75,6 @@ type Config struct {
// Logger is the logger used by the server.
Logger log.InterceptLogger

// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion uint8

// RPCAddr is the RPC address used by Nomad. This should be reachable
// by the other servers and clients
RPCAddr *net.TCPAddr
Expand Down Expand Up @@ -370,18 +348,6 @@ type Config struct {
DeploymentQueryRateLimit float64
}

// CheckVersion is used to check if the ProtocolVersion is valid
func (c *Config) CheckVersion() error {
if c.ProtocolVersion < ProtocolVersionMin {
return fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
} else if c.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
return nil
}

// DefaultConfig returns the default configuration. Only used as the basis for
// merging agent or test parameters.
func DefaultConfig() *Config {
Expand All @@ -396,7 +362,6 @@ func DefaultConfig() *Config {
Datacenter: DefaultDC,
NodeName: hostname,
NodeID: uuid.Generate(),
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
LogOutput: os.Stderr,
Expand Down
2 changes: 0 additions & 2 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
reply.Servers = append(reply.Servers,
&structs.NodeServerInfo{
RPCAdvertiseAddr: v.RPCAddr.String(),
RPCMajorVersion: int32(v.MajorVersion),
RPCMinorVersion: int32(v.MinorVersion),
Datacenter: v.Datacenter,
})
}
Expand Down
8 changes: 4 additions & 4 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (r *rpcHandler) forwardLeader(server *serverParts, method string, args inte
if server == nil {
return structs.ErrNoLeader
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}

// forwardServer is used to forward an RPC call to a particular server
Expand All @@ -653,7 +653,7 @@ func (r *rpcHandler) forwardServer(server *serverParts, method string, args inte
if server == nil {
return errors.New("must be given a valid server address")
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}

func (r *rpcHandler) findRegionServer(region string) (*serverParts, error) {
Expand All @@ -680,7 +680,7 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl

// Forward to remote Nomad
metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1)
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(region, server.Addr, method, args, reply)
}

func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
Expand Down Expand Up @@ -708,7 +708,7 @@ func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
func (r *rpcHandler) streamingRpc(server *serverParts, method string) (net.Conn, error) {
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr, server.MajorVersion)
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *Server) maybeBootstrap() {

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
if err := s.connPool.RPC(s.config.Region, server.Addr,
"Status.Peers", req, &peers); err != nil {
nextRetry := (1 << attempt) * peerRetryBase
s.logger.Error("failed to confirm peer status", "peer", server.Name, "error", err, "retry", nextRetry)
Expand Down
7 changes: 0 additions & 7 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,6 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
}

// Create an eval broker
evalBroker, err := NewEvalBroker(
Expand Down Expand Up @@ -1398,8 +1394,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["role"] = "nomad"
conf.Tags["region"] = s.config.Region
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["id"] = s.config.NodeID
Expand Down Expand Up @@ -1433,7 +1427,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return nil, err
}
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = true
// LeavePropagateDelay is used to make sure broadcasted leave intents propagate
// This value was tuned using https://www.serf.io/docs/internals/simulator.html to
Expand Down
2 changes: 1 addition & 1 deletion nomad/stats_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStatsFetcher(logger log.Logger, pool *pool.ConnPool, region string) *Sta
func (f *StatsFetcher) fetch(server *serverParts, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.region, server.Addr, server.MajorVersion, "Status.RaftStats", &args, &reply)
err := f.pool.RPC(f.region, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Warn("failed retrieving server health", "server", server.Name, "error", err)
} else {
Expand Down
Loading