Skip to content

Commit

Permalink
fix: consume ignored entries in CE downgrade via Ent snapshot
Browse files Browse the repository at this point in the history
This operation would previously fail due to unconsumed bytes in the
decoder buffer when reading the Ent snapshot (the first byte of the
record would be misinterpreted as a type indicator, and the remaining
bytes would fail to be deserialized or read as invalid data).

Ensure restore succeeds by decoding the ignored record as an
interface{}, which will consume the record bytes without requiring a
concrete target struct, then moving on to the next record.
  • Loading branch information
zalimeni committed Apr 11, 2024
1 parent e231f0e commit 24f226e
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .changelog/20977.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: fix Ent snapshot restore on CE when CE downgrade is enabled
```
7 changes: 4 additions & 3 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
return nil
}
if structs.CEDowngrade && msgType >= 64 {
c.logger.Warn("ignoring enterprise message, for downgrading to oss", "type", msgType)
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msgType)
return nil
}
panic(fmt.Errorf("failed to apply request: %#v", buf))
Expand Down Expand Up @@ -268,8 +268,9 @@ func (c *FSM) Restore(old io.ReadCloser) error {
}
default:
if structs.CEDowngrade && msg >= 64 {
c.logger.Warn("ignoring enterprise message , for downgrading to oss", "type", msg)
return nil
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msg)
var ignore interface{}
return dec.Decode(&ignore)
} else if msg >= 64 {
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul CE cannot restore it", msg)
} else {
Expand Down
99 changes: 99 additions & 0 deletions agent/consul/fsm/snapshot_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/sdk/testutil"
)

Expand Down Expand Up @@ -60,3 +61,101 @@ func TestRestoreFromEnterprise(t *testing.T) {
require.EqualError(t, fsm.Restore(sink), "msg type <65> is a Consul Enterprise log entry. Consul CE cannot restore it")
sink.Cancel()
}

func TestRestoreFromEnterprise_CEDowngrade(t *testing.T) {
logger := testutil.Logger(t)

handle := &testRaftHandle{}
storageBackend := newStorageBackend(t, handle)
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }

fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: storageBackend,
})

// To verify if a proper message is displayed when Consul CE tries to
// unsuccessfully restore entries from a Consul Ent snapshot.
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}

type EntMock struct {
ID int
Type string
}

entMockEntry := EntMock{
ID: 65,
Type: "A Consul Ent Log Type",
}

// Create one entry to exercise the Go struct marshaller, and one to exercise the
// Binary Marshaller interface. This verifies that regardless of whether the struct gets
// encoded as a msgpack byte string (binary marshaller) or msgpack map (other struct),
// it will still be skipped over correctly.
registerEntry := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
}
proxyDefaultsEntry := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Entry: &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "bar",
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
}

// Write the header and records.
header := SnapshotHeader{
LastIndex: 0,
}
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
encoder.Encode(&header)
sink.Write([]byte{byte(structs.MessageType(entMockEntry.ID))})
encoder.Encode(entMockEntry)
sink.Write([]byte{byte(structs.RegisterRequestType)})
encoder.Encode(registerEntry)
sink.Write([]byte{byte(structs.ConfigEntryRequestType)})
encoder.Encode(proxyDefaultsEntry)

defer func() {
structs.CEDowngrade = false
}()
structs.CEDowngrade = true

require.NoError(t, fsm.Restore(sink), "failed to decode Ent snapshot to CE")

// Verify the register request
_, nodes, err := fsm.state.Nodes(nil, nil, "")
require.NoError(t, err)
require.Len(t, nodes, 1, "incorrect number of nodes: %v", nodes)
require.Equal(t, "foo", nodes[0].Node)
require.Equal(t, "dc1", nodes[0].Datacenter)
require.Equal(t, "127.0.0.1", nodes[0].Address)
_, fooSrv, err := fsm.state.NodeServices(nil, "foo", nil, "")
require.NoError(t, err)
require.Len(t, fooSrv.Services, 1)
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
require.Equal(t, 8000, fooSrv.Services["db"].Port)

// Verify the proxy defaults request
_, configEntry, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global", structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
configEntry.SetHash(proxyDefaultsEntry.Entry.GetHash())
require.Equal(t, proxyDefaultsEntry.Entry, configEntry)
}

0 comments on commit 24f226e

Please sign in to comment.