Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of fix: consume ignored entries in CE downgrade via Ent snapshot into release/1.18.x #20978

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20977.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: fix Ent snapshot restore on CE when CE downgrade is enabled
```
7 changes: 4 additions & 3 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
return nil
}
if structs.CEDowngrade && msgType >= 64 {
c.logger.Warn("ignoring enterprise message, for downgrading to oss", "type", msgType)
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msgType)
return nil
}
panic(fmt.Errorf("failed to apply request: %#v", buf))
Expand Down Expand Up @@ -268,8 +268,9 @@ func (c *FSM) Restore(old io.ReadCloser) error {
}
default:
if structs.CEDowngrade && msg >= 64 {
c.logger.Warn("ignoring enterprise message , for downgrading to oss", "type", msg)
return nil
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msg)
var ignore interface{}
return dec.Decode(&ignore)
} else if msg >= 64 {
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul CE cannot restore it", msg)
} else {
Expand Down
99 changes: 99 additions & 0 deletions agent/consul/fsm/snapshot_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/sdk/testutil"
)

Expand Down Expand Up @@ -60,3 +61,101 @@ func TestRestoreFromEnterprise(t *testing.T) {
require.EqualError(t, fsm.Restore(sink), "msg type <65> is a Consul Enterprise log entry. Consul CE cannot restore it")
sink.Cancel()
}

func TestRestoreFromEnterprise_CEDowngrade(t *testing.T) {
logger := testutil.Logger(t)

handle := &testRaftHandle{}
storageBackend := newStorageBackend(t, handle)
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }

fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: storageBackend,
})

// To verify if a proper message is displayed when Consul CE tries to
// unsuccessfully restore entries from a Consul Ent snapshot.
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}

type EntMock struct {
ID int
Type string
}

entMockEntry := EntMock{
ID: 65,
Type: "A Consul Ent Log Type",
}

// Create one entry to exercise the Go struct marshaller, and one to exercise the
// Binary Marshaller interface. This verifies that regardless of whether the struct gets
// encoded as a msgpack byte string (binary marshaller) or msgpack map (other struct),
// it will still be skipped over correctly.
registerEntry := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
}
proxyDefaultsEntry := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Entry: &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "bar",
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
}

// Write the header and records.
header := SnapshotHeader{
LastIndex: 0,
}
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
encoder.Encode(&header)
sink.Write([]byte{byte(structs.MessageType(entMockEntry.ID))})
encoder.Encode(entMockEntry)
sink.Write([]byte{byte(structs.RegisterRequestType)})
encoder.Encode(registerEntry)
sink.Write([]byte{byte(structs.ConfigEntryRequestType)})
encoder.Encode(proxyDefaultsEntry)

defer func() {
structs.CEDowngrade = false
}()
structs.CEDowngrade = true

require.NoError(t, fsm.Restore(sink), "failed to decode Ent snapshot to CE")

// Verify the register request
_, nodes, err := fsm.state.Nodes(nil, nil, "")
require.NoError(t, err)
require.Len(t, nodes, 1, "incorrect number of nodes: %v", nodes)
require.Equal(t, "foo", nodes[0].Node)
require.Equal(t, "dc1", nodes[0].Datacenter)
require.Equal(t, "127.0.0.1", nodes[0].Address)
_, fooSrv, err := fsm.state.NodeServices(nil, "foo", nil, "")
require.NoError(t, err)
require.Len(t, fooSrv.Services, 1)
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
require.Equal(t, 8000, fooSrv.Services["db"].Port)

// Verify the proxy defaults request
_, configEntry, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global", structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
configEntry.SetHash(proxyDefaultsEntry.Entry.GetHash())
require.Equal(t, proxyDefaultsEntry.Entry, configEntry)
}
Loading