Skip to content

Commit

Permalink
Improve require-osd-release (#460)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Sabaini <peter.sabaini@canonical.com>
  • Loading branch information
sabaini authored Nov 11, 2024
1 parent c2ac588 commit 98578f7
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 44 deletions.
128 changes: 85 additions & 43 deletions microceph/ceph/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/canonical/lxd/shared/logger"

"github.com/canonical/microceph/microceph/database"
"github.com/canonical/microceph/microceph/interfaces"
)
Expand All @@ -25,87 +24,125 @@ type cephVersion struct {
Overall cephVersionElem `json:"overall"`
}

func checkVersions() (bool, error) {
out, err := processExec.RunCommand("ceph", "versions")
// getCurrentVersion extracts the version codename from the 'ceph -v' output
func getCurrentVersion() (string, error) {
output, err := processExec.RunCommand("ceph", "-v")
if err != nil {
return false, fmt.Errorf("Failed to get Ceph versions: %w", err)
return "", fmt.Errorf("failed to get ceph version: %w", err)
}

var cephVer cephVersion
err = json.Unmarshal([]byte(out), &cephVer)
if err != nil {
return false, fmt.Errorf("Failed to unmarshal Ceph versions: %w", err)
parts := strings.Fields(output)
if len(parts) < 6 { // need sth like "ceph version 19.2.0 (e7ad534...) squid (stable)"
return "", fmt.Errorf("invalid version string format: %s", output)
}

if len(cephVer.Overall) > 1 {
logger.Debug("Not all upgrades have completed")
return false, nil
}
return parts[len(parts)-2], nil // second to last is version code name
}

if len(cephVer.Osd) < 1 {
logger.Debug("No OSD versions found")
return false, nil
}
// checkVersions checks if all Ceph services are running the same version
// retry up to 3 times if multiple versions are detected to allow for upgrades to complete as they are performed
// concurrently
func checkVersions() (bool, error) {
const (
maxRetries = 3
retryDelay = 5 * time.Second
)

for attempt := 0; attempt < maxRetries; attempt++ {
out, err := processExec.RunCommand("ceph", "versions")
if err != nil {
return false, fmt.Errorf("failed to get Ceph versions: %w", err)
}

return true, nil
var cephVer cephVersion
err = json.Unmarshal([]byte(out), &cephVer)
if err != nil {
return false, fmt.Errorf("failed to unmarshal Ceph versions: %w", err)
}

if len(cephVer.Overall) > 1 {
if attempt < maxRetries-1 {
logger.Debugf("multiple versions detected (attempt %d/%d), waiting %v before retry",
attempt+1, maxRetries, retryDelay)
time.Sleep(retryDelay)
continue
}
logger.Debug("not all upgrades have completed after retries")
return false, nil
}

if len(cephVer.Osd) < 1 {
logger.Debug("no OSD versions found")
return false, nil
}

return true, nil
}
// this should never be reached
return false, nil
}

func osdReleaseRequired(version string) (bool, error) {
out, err := processExec.RunCommand("ceph", "osd", "dump", "-f", "json")
if err != nil {
return false, fmt.Errorf("Failed to get OSD dump: %w", err)
return false, fmt.Errorf("failed to get OSD dump: %w", err)
}

var result map[string]any
err = json.Unmarshal([]byte(out), &result)
if err != nil {
return false, fmt.Errorf("Failed to unmarshal OSD dump: %w", err)
return false, fmt.Errorf("failed to unmarshal OSD dump: %w", err)
}

return result["require_osd_release"].(string) != version, nil
releaseVersion, ok := result["require_osd_release"].(string)
if !ok {
return false, fmt.Errorf("invalid or missing require_osd_release in OSD dump")
}

return releaseVersion != version, nil
}

func PostRefresh() error {
currentVersion, err := processExec.RunCommand("ceph", "-v")
func updateOSDRelease(version string) error {
_, err := processExec.RunCommand("ceph", "osd", "require-osd-release",
version, "--yes-i-really-mean-it")
if err != nil {
return err
}

lastPos := strings.LastIndex(currentVersion, " ")
if lastPos < 0 {
return fmt.Errorf("invalid version string: %s", currentVersion)
return fmt.Errorf("failed to update OSD release version: %w", err)
}
return nil
}

currentVersion = currentVersion[0:lastPos]
lastPos = strings.LastIndex(currentVersion, " ")
if lastPos < 0 {
return fmt.Errorf("invalid version string: %s", currentVersion)
// PostRefresh handles version checking and OSD release updates
func PostRefresh() error {
currentVersion, err := getCurrentVersion()
if err != nil {
return fmt.Errorf("version check failed: %w", err)
}

currentVersion = currentVersion[lastPos+1 : len(currentVersion)]
allVersionsEqual, err := checkVersions()

if err != nil {
return err
return fmt.Errorf("version equality check failed: %w", err)
}

if !allVersionsEqual {
logger.Info("versions not equal, skipping OSD release update")
return nil
}

mustUpdate, err := osdReleaseRequired(currentVersion)
if err != nil {
return err
return fmt.Errorf("OSD release check failed: %w", err)
}

if mustUpdate {
_, err = processExec.RunCommand("ceph", "osd", "require-osd-release",
currentVersion, "--yes-i-really-mean-it")
if err != nil {
return err
}
if !mustUpdate {
logger.Debug("OSD release update not required")
return nil
}
err = updateOSDRelease(currentVersion)
if err != nil {
return fmt.Errorf("OSD release update failed: %w", err)
}

logger.Infof("successfully updated OSD release version: %s", currentVersion)
return nil
}

Expand Down Expand Up @@ -164,7 +201,12 @@ func Start(ctx context.Context, s interfaces.StateInterface) error {
}
}()

go PostRefresh()
go func() {
err := PostRefresh()
if err != nil {
logger.Errorf("PostRefresh failed: %v", err)
}
}()

return nil
}
126 changes: 125 additions & 1 deletion microceph/ceph/start_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ceph

import (
"errors"
"testing"

"github.com/canonical/microceph/microceph/mocks"
Expand Down Expand Up @@ -46,12 +47,135 @@ func addExpected(r *mocks.Runner) {
"squid", "--yes-i-really-mean-it").Return("ok", nil).Once()
}

func (s *startSuite) TestStart() {
func (s *startSuite) TestStartOSDReleaseUpdate() {
r := mocks.NewRunner(s.T())

addExpected(r)
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err)
r.AssertExpectations(s.T())
}

func (s *startSuite) TestInvalidVersionString() {
r := mocks.NewRunner(s.T())
// only expect the version command, others shouldnt be reached
r.On("RunCommand", "ceph", "-v").Return("invalid version", nil).Once()
processExec = r

err := PostRefresh()
assert.Error(s.T(), err)
assert.Contains(s.T(), err.Error(), "invalid version string")
r.AssertExpectations(s.T())
}

func (s *startSuite) TestMultipleVersionsPresent() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 18.2.4 reef (stable)": 1,
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 18.2.4 reef (stable)": 1,
"ceph version 19.2.0 squid (stable)": 1
}
}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Times(3)
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err)
r.AssertExpectations(s.T())
}

func (s *startSuite) TestNoOSDVersions() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 1
}
}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Once()
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err) // no OSD versions, so no update required
r.AssertExpectations(s.T())
}

func (s *startSuite) TestOSDReleaseUpToDate() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"osd": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 2
}
}`
osdDump := `{"require_osd_release": "squid"}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Once()
r.On("RunCommand", "ceph", "osd", "dump", "-f", "json").Return(osdDump, nil).Once()
processExec = r

err := PostRefresh()
assert.NoError(s.T(), err)
r.AssertExpectations(s.T())
}

func (s *startSuite) TestOSDReleaseUpdateFails() {
r := mocks.NewRunner(s.T())
version := `ceph version 19.2.0 (e7ad5345525c7aa95470c26863873b581076945d) squid (stable)`
versionsJson := `{
"mon": {
"ceph version 19.2.0 squid (stable)": 1
},
"osd": {
"ceph version 19.2.0 squid (stable)": 1
},
"overall": {
"ceph version 19.2.0 squid (stable)": 2
}
}`
osdDump := `{"require_osd_release": "reef"}`

r.On("RunCommand", "ceph", "-v").Return(version, nil).Once()
r.On("RunCommand", "ceph", "versions").Return(versionsJson, nil).Once()
r.On("RunCommand", "ceph", "osd", "dump", "-f", "json").Return(osdDump, nil).Once()
r.On("RunCommand", "ceph", "osd", "require-osd-release", "squid", "--yes-i-really-mean-it").
Return("", errors.New("update failed")).Once()
processExec = r

err := PostRefresh()
assert.Error(s.T(), err)
assert.Contains(s.T(), err.Error(), "OSD release update failed")
r.AssertExpectations(s.T())
}

func (s *startSuite) TestCephVersionCommandFails() {
r := mocks.NewRunner(s.T())
r.On("RunCommand", "ceph", "-v").Return("", errors.New("command failed")).Once()
processExec = r

err := PostRefresh()
assert.Error(s.T(), err)
assert.Contains(s.T(), err.Error(), "version check failed")
r.AssertExpectations(s.T())
}

0 comments on commit 98578f7

Please sign in to comment.