Skip to content

Commit

Permalink
Set the 'require-osd-release' option on startup (#459)
Browse files Browse the repository at this point in the history
# Description

This patchset modifies the microcephd component so that it sets the
'require-osd-release' config option on startup, if needed. That is, it
does so when the new (current) Ceph version is ahead of what the OSDs are
currently running.

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)

## How Has This Been Tested?

Unit testing is provided for this change.

---------

Signed-off-by: Luciano Lo Giudice <luciano.logiudice@canonical.com>
Signed-off-by: Peter Sabaini <peter.sabaini@canonical.com>
Co-authored-by: Peter Sabaini <peter.sabaini@canonical.com>
  • Loading branch information
lmlg and sabaini authored Nov 11, 2024
1 parent c9f3f46 commit 95ad180
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 1 deletion.
143 changes: 142 additions & 1 deletion microceph/ceph/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,149 @@ package ceph
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

"github.com/canonical/lxd/shared/logger"

"github.com/canonical/microceph/microceph/database"
"github.com/canonical/microceph/microceph/interfaces"
)

// cephVersionElem is one section of the 'ceph versions' JSON output: it maps
// a full version string to the integer reported for it (presumably the number
// of daemons running that version — confirm against the Ceph documentation).
type cephVersionElem map[string]int32

// cephVersion is the decoding target for the JSON printed by 'ceph versions'.
// Each field holds the per-version map found under the matching JSON key
// ("mon", "mgr", "osd", "mds"), and Overall holds the map found under
// "overall". Keys that are absent from the JSON leave the field nil.
type cephVersion struct {
Mon cephVersionElem `json:"mon"`
Mgr cephVersionElem `json:"mgr"`
Osd cephVersionElem `json:"osd"`
Mds cephVersionElem `json:"mds"`
Overall cephVersionElem `json:"overall"`
}

// getCurrentVersion runs 'ceph -v' and returns the release codename found in
// its output (for example "squid").
//
// The output is expected to look like
// "ceph version 19.2.0 (e7ad534...) squid (stable)": at least six
// whitespace-separated fields, with the codename in the second-to-last one.
// An error is returned when the command fails or the output has fewer fields.
func getCurrentVersion() (string, error) {
	// Fewest fields a well-formed version line can have.
	const minFields = 6

	raw, runErr := processExec.RunCommand("ceph", "-v")
	if runErr != nil {
		return "", fmt.Errorf("failed to get ceph version: %w", runErr)
	}

	fields := strings.Fields(raw)
	count := len(fields)
	if count < minFields {
		return "", fmt.Errorf("invalid version string format: %s", raw)
	}

	// The last field is the "(stable)" style qualifier; the codename precedes it.
	codenameIdx := count - 2
	return fields[codenameIdx], nil
}

// checkVersions reports whether all Ceph services run the same version and at
// least one OSD version is reported.
//
// It queries 'ceph versions' up to maxRetries times. While more than one
// overall version is reported it sleeps for retryDelay and asks again, which
// gives upgrades that are performed concurrently time to complete.
//
// It returns (true, nil) when a single overall version and at least one OSD
// entry are reported, (false, nil) when versions still differ after the last
// attempt or no OSD entry exists, and a non-nil error when the command fails
// or its output cannot be decoded.
func checkVersions() (bool, error) {
	const (
		maxRetries = 3
		retryDelay = 5 * time.Second
	)

	for try := 1; try <= maxRetries; try++ {
		raw, runErr := processExec.RunCommand("ceph", "versions")
		if runErr != nil {
			return false, fmt.Errorf("failed to get Ceph versions: %w", runErr)
		}

		var report cephVersion
		if decodeErr := json.Unmarshal([]byte(raw), &report); decodeErr != nil {
			return false, fmt.Errorf("failed to unmarshal Ceph versions: %w", decodeErr)
		}

		mixed := len(report.Overall) > 1
		switch {
		case mixed && try == maxRetries:
			// Out of attempts: report "not equal" rather than an error.
			logger.Debug("not all upgrades have completed after retries")
			return false, nil
		case mixed:
			logger.Debugf("multiple versions detected (attempt %d/%d), waiting %v before retry",
				try, maxRetries, retryDelay)
			time.Sleep(retryDelay)
		case len(report.Osd) == 0:
			logger.Debug("no OSD versions found")
			return false, nil
		default:
			return true, nil
		}
	}

	// Not reachable: the last iteration always returns. Required by the compiler.
	return false, nil
}

// osdReleaseRequired reports whether the 'require_osd_release' value in the
// output of 'ceph osd dump -f json' differs from version.
//
// It returns an error when the command fails, when the output is not valid
// JSON for an object, or when 'require_osd_release' is absent or not a string.
func osdReleaseRequired(version string) (bool, error) {
	dump, runErr := processExec.RunCommand("ceph", "osd", "dump", "-f", "json")
	if runErr != nil {
		return false, fmt.Errorf("failed to get OSD dump: %w", runErr)
	}

	var fields map[string]any
	if decodeErr := json.Unmarshal([]byte(dump), &fields); decodeErr != nil {
		return false, fmt.Errorf("failed to unmarshal OSD dump: %w", decodeErr)
	}

	// A missing key yields a nil interface, which falls through to default.
	switch required := fields["require_osd_release"].(type) {
	case string:
		return required != version, nil
	default:
		return false, fmt.Errorf("invalid or missing require_osd_release in OSD dump")
	}
}

// updateOSDRelease sets the cluster's required OSD release to version by
// running 'ceph osd require-osd-release <version> --yes-i-really-mean-it'.
// The command output is discarded; a failure is returned wrapped.
func updateOSDRelease(version string) error {
	_, runErr := processExec.RunCommand("ceph", "osd", "require-osd-release",
		version, "--yes-i-really-mean-it")
	if runErr == nil {
		return nil
	}
	return fmt.Errorf("failed to update OSD release version: %w", runErr)
}

// PostRefresh brings the cluster's required OSD release in line with the
// locally installed Ceph release, when that is needed.
//
// Steps, in order:
//  1. read the local release codename (getCurrentVersion);
//  2. check that all services report one version (checkVersions) and stop
//     without error if they do not;
//  3. compare the codename with the cluster's current setting
//     (osdReleaseRequired) and stop without error if no update is required;
//  4. apply the new release (updateOSDRelease).
//
// Any failing step aborts the sequence and its error is returned wrapped.
//
// NOTE(review): checkVersions only establishes that the daemons agree on a
// single version, not that this version matches the local codename, and
// updateOSDRelease passes --yes-i-really-mean-it — confirm that updating in
// that situation is intended.
func PostRefresh() error {
	codename, err := getCurrentVersion()
	if err != nil {
		return fmt.Errorf("version check failed: %w", err)
	}

	uniform, err := checkVersions()
	switch {
	case err != nil:
		return fmt.Errorf("version equality check failed: %w", err)
	case !uniform:
		logger.Info("versions not equal, skipping OSD release update")
		return nil
	}

	outdated, err := osdReleaseRequired(codename)
	switch {
	case err != nil:
		return fmt.Errorf("OSD release check failed: %w", err)
	case !outdated:
		logger.Debug("OSD release update not required")
		return nil
	}

	if updErr := updateOSDRelease(codename); updErr != nil {
		return fmt.Errorf("OSD release update failed: %w", updErr)
	}

	logger.Infof("successfully updated OSD release version: %s", codename)
	return nil
}

// Start is run on daemon startup.
func Start(ctx context.Context, s interfaces.StateInterface) error {
// Start background loop to refresh the config every minute if needed.
Expand Down Expand Up @@ -67,5 +201,12 @@ func Start(ctx context.Context, s interfaces.StateInterface) error {
}
}()

go func() {
err := PostRefresh()
if err != nil {
logger.Errorf("PostRefresh failed: %v", err)
}
}()

return nil
}
181 changes: 181 additions & 0 deletions microceph/ceph/start_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package ceph

import (
"errors"
"testing"

"github.com/canonical/microceph/microceph/mocks"
"github.com/canonical/microceph/microceph/tests"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

// startSuite groups the unit tests in this file, which exercise PostRefresh
// with a mocked command runner. It embeds tests.BaseSuite, whose behavior is
// defined outside this file.
type startSuite struct {
tests.BaseSuite
}

// TestStart is the 'go test' entry point that runs every test method of
// startSuite.
func TestStart(t *testing.T) {
	suite.Run(t, &startSuite{})
}

// addExpected registers on r the four commands PostRefresh issues when an
// update is needed, each expected exactly once: 'ceph -v' reporting squid,
// 'ceph versions' reporting a single reef version for every service,
// 'ceph osd dump' reporting require_osd_release "reef", and the resulting
// 'ceph osd require-osd-release squid' call, which succeeds.
func addExpected(r *mocks.Runner) {
	const localVersion = `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
	const clusterVersions = `{
"mon": {
"ceph version 18.2.4 (e7ad5345525c7aa95470c26863873b581076945d) reef (stable)": 1
},
"mgr": {
"ceph version 18.2.4 (e7ad5345525c7aa95470c26863873b581076945d) reef (stable)": 1
},
"osd": {
"ceph version 18.2.4 (e7ad5345525c7aa95470c26863873b581076945d) reef (stable)": 4
},
"mds": {
"ceph version 18.2.4 (e7ad5345525c7aa95470c26863873b581076945d) reef (stable)": 1
},
"overall": {
"ceph version 18.2.4 (e7ad5345525c7aa95470c26863873b581076945d) reef (stable)": 7
}
}`
	const currentOSDDump = `{"require_osd_release": "reef"}`

	// Registered in the order PostRefresh issues the commands.
	r.On("RunCommand", "ceph", "-v").
		Return(localVersion, nil).Once()
	r.On("RunCommand", "ceph", "versions").
		Return(clusterVersions, nil).Once()
	r.On("RunCommand", "ceph", "osd", "dump", "-f", "json").
		Return(currentOSDDump, nil).Once()
	r.On("RunCommand", "ceph", "osd", "require-osd-release", "squid", "--yes-i-really-mean-it").
		Return("ok", nil).Once()
}

// TestStartOSDReleaseUpdate checks the full update path: with the
// expectations from addExpected in place, PostRefresh must succeed and issue
// every expected command.
func (s *startSuite) TestStartOSDReleaseUpdate() {
	t := s.T()

	runner := mocks.NewRunner(t)
	addExpected(runner)
	processExec = runner

	refreshErr := PostRefresh()

	assert.NoError(t, refreshErr)
	runner.AssertExpectations(t)
}

// TestInvalidVersionString checks that PostRefresh fails with an
// "invalid version string" error when 'ceph -v' prints too few fields.
func (s *startSuite) TestInvalidVersionString() {
	t := s.T()

	runner := mocks.NewRunner(t)
	// Only the version command is expected; the others should not be reached.
	runner.On("RunCommand", "ceph", "-v").
		Return("invalid version", nil).Once()
	processExec = runner

	refreshErr := PostRefresh()

	assert.Error(t, refreshErr)
	assert.Contains(t, refreshErr.Error(), "invalid version string")
	runner.AssertExpectations(t)
}

// TestMultipleVersionsPresent checks that PostRefresh returns no error and
// issues no further commands when 'ceph versions' keeps reporting two
// versions on all three attempts.
//
// checkVersions sleeps for its retry delay between attempts, so this test
// spends two such delays waiting.
func (s *startSuite) TestMultipleVersionsPresent() {
	t := s.T()

	const localVersion = `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
	const mixedVersions = `{
"mon": {
"ceph version 18.2.4 reef (stable)": 1,
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 18.2.4 reef (stable)": 1,
"ceph version 19.2.0 squid (stable)": 1
}
}`

	runner := mocks.NewRunner(t)
	runner.On("RunCommand", "ceph", "-v").
		Return(localVersion, nil).Once()
	// One query per attempt.
	runner.On("RunCommand", "ceph", "versions").
		Return(mixedVersions, nil).Times(3)
	processExec = runner

	refreshErr := PostRefresh()

	assert.NoError(t, refreshErr)
	runner.AssertExpectations(t)
}

// TestNoOSDVersions checks that PostRefresh returns no error and stops after
// 'ceph versions' when that output has no "osd" section.
func (s *startSuite) TestNoOSDVersions() {
	t := s.T()

	const localVersion = `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
	const versionsWithoutOSD = `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 1
}
}`

	runner := mocks.NewRunner(t)
	runner.On("RunCommand", "ceph", "-v").
		Return(localVersion, nil).Once()
	runner.On("RunCommand", "ceph", "versions").
		Return(versionsWithoutOSD, nil).Once()
	processExec = runner

	refreshErr := PostRefresh()

	// No OSD versions are reported, so no update is required.
	assert.NoError(t, refreshErr)
	runner.AssertExpectations(t)
}

// TestOSDReleaseUpToDate checks that PostRefresh returns no error and does
// not run 'ceph osd require-osd-release' when the OSD dump already reports
// the local codename.
func (s *startSuite) TestOSDReleaseUpToDate() {
	t := s.T()

	const localVersion = `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
	const clusterVersions = `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"osd": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 2
}
}`
	const currentOSDDump = `{"require_osd_release": "squid"}`

	runner := mocks.NewRunner(t)
	runner.On("RunCommand", "ceph", "-v").
		Return(localVersion, nil).Once()
	runner.On("RunCommand", "ceph", "versions").
		Return(clusterVersions, nil).Once()
	runner.On("RunCommand", "ceph", "osd", "dump", "-f", "json").
		Return(currentOSDDump, nil).Once()
	processExec = runner

	refreshErr := PostRefresh()

	assert.NoError(t, refreshErr)
	runner.AssertExpectations(t)
}

// TestOSDReleaseUpdateFails checks that PostRefresh returns an
// "OSD release update failed" error when the 'ceph osd require-osd-release'
// command itself fails.
func (s *startSuite) TestOSDReleaseUpdateFails() {
	t := s.T()

	const localVersion = `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
	const clusterVersions = `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"osd": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 2
}
}`
	const currentOSDDump = `{"require_osd_release": "reef"}`

	runner := mocks.NewRunner(t)
	runner.On("RunCommand", "ceph", "-v").
		Return(localVersion, nil).Once()
	runner.On("RunCommand", "ceph", "versions").
		Return(clusterVersions, nil).Once()
	runner.On("RunCommand", "ceph", "osd", "dump", "-f", "json").
		Return(currentOSDDump, nil).Once()
	// The update command is the one that fails.
	runner.On("RunCommand", "ceph", "osd", "require-osd-release", "squid", "--yes-i-really-mean-it").
		Return("", errors.New("update failed")).Once()
	processExec = runner

	refreshErr := PostRefresh()

	assert.Error(t, refreshErr)
	assert.Contains(t, refreshErr.Error(), "OSD release update failed")
	runner.AssertExpectations(t)
}

// TestCephVersionCommandFails checks that PostRefresh returns a
// "version check failed" error, and runs nothing else, when 'ceph -v' fails.
func (s *startSuite) TestCephVersionCommandFails() {
	t := s.T()

	runner := mocks.NewRunner(t)
	runner.On("RunCommand", "ceph", "-v").
		Return("", errors.New("command failed")).Once()
	processExec = runner

	refreshErr := PostRefresh()

	assert.Error(t, refreshErr)
	assert.Contains(t, refreshErr.Error(), "version check failed")
	runner.AssertExpectations(t)
}

0 comments on commit 95ad180

Please sign in to comment.