test: improve the reset integration tests
Provide a trace for each step of the reset sequence taken, so that if one
of those steps fails, the integration test produces a meaningful message
instead of proceeding and failing somewhere else.

Also more cleanup/refactoring; the changes should be functionally equivalent.

Fixes #8635

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
smira committed Apr 24, 2024
1 parent 8cdf0f7 commit 05fd042
Showing 3 changed files with 173 additions and 226 deletions.
104 changes: 28 additions & 76 deletions internal/integration/api/etcd-recover.go
@@ -11,6 +11,7 @@ import (
"context"
"fmt"
"io"
"path/filepath"
"testing"
"time"

@@ -55,8 +56,6 @@ func (suite *EtcdRecoverSuite) TearDownTest() {
 }
 
 // TestSnapshotRecover snapshot etcd, wipes control plane nodes and recovers etcd from a snapshot.
-//
-//nolint:gocyclo
 func (suite *EtcdRecoverSuite) TestSnapshotRecover() {
 	if !suite.Capabilities().SupportsReboot {
 		suite.T().Skip("cluster doesn't support reboot")
@@ -83,96 +82,49 @@ func (suite *EtcdRecoverSuite) TestSnapshotRecover() {

 	suite.Require().NoError(suite.snapshotEtcd(snapshotNode, &snapshot))
 
-	// wipe ephemeral partition on all control plane nodes
-	preReset := map[string]string{}
+	// leave etcd on all nodes but one
+	for _, node := range controlPlaneNodes[1:] {
+		suite.T().Logf("leaving etcd on node %q", node)
 
-	for _, node := range controlPlaneNodes {
-		var err error
+		nodeCtx := client.WithNode(suite.ctx, node)
 
-		preReset[node], err = suite.HashKubeletCert(suite.ctx, node)
+		_, err := suite.Client.EtcdForfeitLeadership(nodeCtx, &machineapi.EtcdForfeitLeadershipRequest{})
 		suite.Require().NoError(err)
+
+		err = suite.Client.EtcdLeaveCluster(nodeCtx, &machineapi.EtcdLeaveClusterRequest{})
+		suite.Require().NoError(err)
 	}
 
 	suite.T().Logf("wiping control plane nodes %q", controlPlaneNodes)
 
-	errCh := make(chan error)
-
+	// wipe ephemeral partition on all control plane nodes, starting with the one that still has etcd running
 	for _, node := range controlPlaneNodes {
-		go func() {
-			errCh <- func() error {
-				nodeCtx := client.WithNodes(suite.ctx, node)
-
-				bootIDBefore, err := suite.ReadBootID(nodeCtx)
-				if err != nil {
-					return fmt.Errorf("error reading pre-reset boot ID: %w", err)
-				}
-
-				if err = base.IgnoreGRPCUnavailable(
-					suite.Client.ResetGeneric(
-						nodeCtx, &machineapi.ResetRequest{
-							Reboot:   true,
-							Graceful: false,
-							SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-								{
-									Label: constants.EphemeralPartitionLabel,
-									Wipe:  true,
-								},
-							},
-						},
-					),
-				); err != nil {
-					return fmt.Errorf("error resetting the node %q: %w", node, err)
-				}
-
-				var bootIDAfter string
-
-				return retry.Constant(5 * time.Minute).Retry(
-					func() error {
-						requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, 5*time.Second)
-						defer requestCtxCancel()
-
-						bootIDAfter, err = suite.ReadBootID(requestCtx)
-						if err != nil {
-							// API might be unresponsive during reboot
-							return retry.ExpectedError(err)
-						}
-
-						if bootIDAfter == bootIDBefore {
-							// bootID should be different after reboot
-							return retry.ExpectedErrorf(
-								"bootID didn't change for node %q: before %s, after %s",
-								node,
-								bootIDBefore,
-								bootIDAfter,
-							)
-						}
-
-						return nil
-					},
-				)
-			}()
-		}()
+		suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+			Reboot:   true,
+			Graceful: false,
+			SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+				{
+					Label: constants.EphemeralPartitionLabel,
+					Wipe:  true,
+				},
+			},
+		}, false)
 	}
 
-	for range controlPlaneNodes {
-		suite.Require().NoError(<-errCh)
+	// verify that etcd data directory doesn't exist on the nodes
+	for _, node := range controlPlaneNodes {
+		stream, err := suite.Client.MachineClient.List(client.WithNode(suite.ctx, node), &machineapi.ListRequest{Root: filepath.Join(constants.EtcdDataPath, "member")})
+		suite.Require().NoError(err)
+
+		_, err = stream.Recv()
+		suite.Require().Error(err)
+		suite.Require().Equal(client.StatusCode(err), codes.Unknown)
+		suite.Require().Contains(client.Status(err).Message(), "no such file or directory")
 	}
 
-	suite.ClearConnectionRefused(suite.ctx, controlPlaneNodes...)
-
 	suite.T().Logf("recovering etcd snapshot at node %q", recoverNode)
 
 	suite.Require().NoError(suite.recoverEtcd(recoverNode, bytes.NewReader(snapshot.Bytes())))
 
 	suite.AssertClusterHealthy(suite.ctx)
-
-	for _, node := range controlPlaneNodes {
-		postReset, err := suite.HashKubeletCert(suite.ctx, node)
-		suite.Require().NoError(err)
-
-		suite.Assert().NotEqual(postReset, preReset[node], "kubelet cert hasn't changed for node %q", node)
-	}
 }
 
 func (suite *EtcdRecoverSuite) snapshotEtcd(snapshotNode string, dest io.Writer) error {
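Note: the new List check above is the API-level equivalent of inspecting the wiped node by hand. Assuming constants.EtcdDataPath resolves to /var/lib/etcd (its usual value), the same verification could be done with "talosctl -n <node-ip> ls /var/lib/etcd/member", which on a freshly wiped node should fail with "no such file or directory" — the same gRPC error message the test asserts.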
199 changes: 49 additions & 150 deletions internal/integration/api/reset.go
@@ -13,7 +13,6 @@ import (
"time"

"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-retry/retry"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
@@ -68,48 +67,16 @@ func (suite *ResetSuite) TestResetNodeByNode() {
 		suite.T().Skip("skipping as talos is explicitly trusted booted")
 	}
 
-	initNodeAddress := ""
-
-	for _, node := range suite.Cluster.Info().Nodes {
-		if node.Type == machine.TypeInit {
-			initNodeAddress = node.IPs[0].String()
-
-			break
-		}
-	}
-
 	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)
 	suite.Require().NotEmpty(nodes)
 
 	sort.Strings(nodes)
 
 	for _, node := range nodes {
-		if node == initNodeAddress {
-			// due to the bug with etcd cluster build for the init node after Reset(), skip resetting first node
-			// there's no problem if bootstrap API was used, so this check only protects legacy init nodes
-			suite.T().Log("Skipping init node", node, "due to known issue with etcd")
-
-			continue
-		}
-
-		suite.T().Log("Resetting node", node)
-
-		preReset, err := suite.HashKubeletCert(suite.ctx, node)
-		suite.Require().NoError(err)
-
-		suite.AssertRebooted(
-			suite.ctx, node, func(nodeCtx context.Context) error {
-				// force reboot after reset, as this is the only mode we can test
-				return base.IgnoreGRPCUnavailable(suite.Client.Reset(nodeCtx, true, true))
-			}, 10*time.Minute,
-		)
-
-		suite.ClearConnectionRefused(suite.ctx, node)
-
-		postReset, err := suite.HashKubeletCert(suite.ctx, node)
-		suite.Require().NoError(err)
-
-		suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+		suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+			Reboot:   true,
+			Graceful: true,
+		}, true)
 	}
 }

@@ -121,24 +88,10 @@ func (suite *ResetSuite) testResetNoGraceful(nodeType machine.Type) {

 	node := suite.RandomDiscoveredNodeInternalIP(nodeType)
 
-	suite.T().Logf("Resetting %s node !graceful %s", nodeType, node)
-
-	preReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.AssertRebooted(
-		suite.ctx, node, func(nodeCtx context.Context) error {
-			// force reboot after reset, as this is the only mode we can test
-			return base.IgnoreGRPCUnavailable(suite.Client.Reset(nodeCtx, false, true))
-		}, 5*time.Minute,
-	)
-
-	suite.ClearConnectionRefused(suite.ctx, node)
-
-	postReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+		Reboot:   true,
+		Graceful: false,
+	}, true)
 }
 
 // TestResetNoGracefulWorker resets a worker in !graceful mode.
@@ -157,37 +110,16 @@ func (suite *ResetSuite) TestResetNoGracefulControlplane() {
 func (suite *ResetSuite) TestResetWithSpecEphemeral() {
 	node := suite.RandomDiscoveredNodeInternalIP()
 
-	suite.T().Log("Resetting node with spec=[EPHEMERAL]", node)
-
-	preReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.AssertRebooted(
-		suite.ctx, node, func(nodeCtx context.Context) error {
-			// force reboot after reset, as this is the only mode we can test
-			return base.IgnoreGRPCUnavailable(
-				suite.Client.ResetGeneric(
-					nodeCtx, &machineapi.ResetRequest{
-						Reboot:   true,
-						Graceful: true,
-						SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-							{
-								Label: constants.EphemeralPartitionLabel,
-								Wipe:  true,
-							},
-						},
-					},
-				),
-			)
-		}, 5*time.Minute,
-	)
-
-	suite.ClearConnectionRefused(suite.ctx, node)
-
-	postReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+		Reboot:   true,
+		Graceful: true,
+		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+			{
+				Label: constants.EphemeralPartitionLabel,
+				Wipe:  true,
+			},
+		},
+	}, true)
 }
 
 // TestResetWithSpecState resets only state partition on the node.
@@ -201,12 +133,7 @@ func (suite *ResetSuite) TestResetWithSpecState() {

 	node := suite.RandomDiscoveredNodeInternalIP()
 
-	suite.T().Log("Resetting node with spec=[STATE]", node)
-
-	preReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	disks, err := suite.Client.Disks(client.WithNodes(suite.ctx, node))
+	disks, err := suite.Client.Disks(client.WithNode(suite.ctx, node))
 	suite.Require().NoError(err)
 	suite.Require().NotEmpty(disks.Messages)

@@ -219,73 +146,45 @@
 		},
 	)

-	suite.AssertRebooted(
-		suite.ctx, node, func(nodeCtx context.Context) error {
-			// force reboot after reset, as this is the only mode we can test
-			return base.IgnoreGRPCUnavailable(
-				suite.Client.ResetGeneric(
-					nodeCtx, &machineapi.ResetRequest{
-						Reboot:   true,
-						Graceful: true,
-						SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-							{
-								Label: constants.StatePartitionLabel,
-								Wipe:  true,
-							},
-						},
-						UserDisksToWipe: userDisksToWipe,
-					},
-				),
-			)
-		}, 5*time.Minute,
-	)
-
-	suite.ClearConnectionRefused(suite.ctx, node)
-
-	postReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+		Reboot:   true,
+		Graceful: true,
+		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+			{
+				Label: constants.StatePartitionLabel,
+				Wipe:  true,
+			},
+		},
+		UserDisksToWipe: userDisksToWipe,
+	}, true)
 }

-// TestResetDuringBoot resets the node multiple times, second reset is done
-// before boot sequence is complete.
+// TestResetDuringBoot resets the node while it is in boot sequence.
 func (suite *ResetSuite) TestResetDuringBoot() {
 	node := suite.RandomDiscoveredNodeInternalIP()
 	nodeCtx := client.WithNodes(suite.ctx, node)
 
-	suite.T().Log("Resetting node", node)
-
-	for range 2 {
-		bootID := suite.ReadBootIDWithRetry(nodeCtx, time.Minute*5)
-
-		err := retry.Constant(5*time.Minute, retry.WithUnits(time.Millisecond*1000)).Retry(
-			func() error {
-				// force reboot after reset, as this is the only mode we can test
-				return retry.ExpectedError(
-					suite.Client.ResetGeneric(
-						client.WithNodes(suite.ctx, node), &machineapi.ResetRequest{
-							Reboot:   true,
-							Graceful: true,
-							SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-								{
-									Label: constants.EphemeralPartitionLabel,
-									Wipe:  true,
-								},
-							},
-						},
-					),
-				)
-			},
-		)
-		suite.Require().NoError(err)
-
-		suite.AssertBootIDChanged(nodeCtx, bootID, node, time.Minute*5)
-	}
+	suite.T().Log("rebooting node", node)
+
+	bootIDBefore, err := suite.ReadBootID(nodeCtx)
+	suite.Require().NoError(err)
+
+	suite.Require().NoError(suite.Client.Reboot(nodeCtx))
+
+	suite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)
 
 	suite.ClearConnectionRefused(suite.ctx, node)
 
-	suite.WaitForBootDone(suite.ctx)
-	suite.AssertClusterHealthy(suite.ctx)
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+		Reboot:   true,
+		Graceful: true,
+		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+			{
+				Label: constants.EphemeralPartitionLabel,
+				Wipe:  true,
+			},
+		},
+	}, true)
 }
 
 func init() {
[diff for the third changed file did not render]
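The remaining additions live in that third file, which is presumably where the suite.ResetNode helper called throughout both tests is defined. Since its diff did not render, the following is only a rough sketch of what such a helper could look like, inferred from the call sites and the commit message: the signature matches the calls above, while the body, names, and timeouts are assumptions.

// Sketch only — not the actual implementation from this commit. Presumed to
// live in the integration test base package alongside ReadBootID,
// AssertBootIDChanged, and IgnoreGRPCUnavailable.
func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec *machineapi.ResetRequest, runHealthChecks bool) {
	// trace the step being taken, so a failure points at the right spot
	apiSuite.T().Logf("resetting node %q, graceful %v, system partitions %v",
		node, resetSpec.Graceful, resetSpec.SystemPartitionsToWipe)

	nodeCtx := client.WithNode(ctx, node)

	// every reset in these tests reboots the node, so capture the boot ID first
	bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
	apiSuite.Require().NoError(err)

	// issue the reset; the gRPC connection is expected to drop as the node goes down
	apiSuite.Require().NoError(IgnoreGRPCUnavailable(apiSuite.Client.ResetGeneric(nodeCtx, resetSpec)))

	// a changed boot ID proves the reboot actually happened; failing here
	// yields a targeted message instead of a later, unrelated failure
	apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 5*time.Minute)

	apiSuite.ClearConnectionRefused(ctx, node)

	if runHealthChecks {
		apiSuite.WaitForBootDone(ctx)
		apiSuite.AssertClusterHealthy(ctx)
	}
}

The trailing boolean appears to map onto the call sites: the etcd-recovery test passes false because the cluster cannot be healthy in the middle of the wipe sequence, while the reset tests pass true to get full health verification after each reset.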
