Skip to content

Commit

Permalink
[Automated] Merged refs/heads/k8s-sync-2025-01-13-0908-ae0c13b9300dea…
Browse files Browse the repository at this point in the history
…374a9c361e654164fdbb37ff7c into target main
  • Loading branch information
github-actions[bot] authored Jan 13, 2025
2 parents 3d877cd + 56e9f96 commit a635dff
Show file tree
Hide file tree
Showing 13 changed files with 327 additions and 42 deletions.
13 changes: 11 additions & 2 deletions commands/cmd_start_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package commands

import (
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -262,8 +263,16 @@ func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error {

transactionID, err := vcc.VReplicateDatabase(options)
if err != nil {
vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB.DBName)
return err
errMsg := err.Error()
// when no table matches include pattern, errMsg contains "ERROR 4089: [22023] No objects specified"
// when no table matches table-or-schema-name, errMsg contains "ERROR 11781: [22023] Unknown or unsupported table name"
if !strings.Contains(errMsg, "22023") {
vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB.DBName)
return err
}
vcc.DisplayWarning("No data is replicated to database %s: %s",
options.TargetDB.DBName, errMsg)
return nil
}

if options.Async {
Expand Down
1 change: 0 additions & 1 deletion commands/cmd_upgrade_license.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ Examples:

// require license file path
markFlagsRequired(cmd, licenseFileFlag)
markFlagsRequired(cmd, licenseHostFlag)

return cmd
}
Expand Down
10 changes: 5 additions & 5 deletions commands/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ func converErrorMessage(err error, logger vlog.Printer) string {
errMsg := err.Error()
logger.Error(err, "error to be converted into err msg")
if strings.Contains(errMsg, "down database") {
return "failed to vertify connection parameters. please check your db name and host list"
return "Failed to verify connection parameters. Please check your db name and host list"
} else if strings.Contains(errMsg, "Wrong password") {
return "failed to vertify connection parameters. please check your db username and password"
return "Failed to verify connection parameters. Please check your db username and password"
} else if strings.Contains(errMsg, "rather than database") {
return "failed to vertify connection parameters. please check your db name"
return "Failed to verify connection parameters. Please check your db name"
} else if strings.Contains(errMsg, "no such host") || strings.Contains(errMsg, "network is unreachable") ||
strings.Contains(errMsg, "fail to send request") || strings.Contains(errMsg, "server misbehaving") ||
strings.Contains(errMsg, "i/o timeout") {
return "failed to vertify connection parameters. please check your host list"
return "Failed to verify connection parameters. Please check your host list"
}
return "failed to vertify connection parameters: " + errMsg
return "Failed to verify connection parameters: " + errMsg
}

// this function calls ClusterCommand.FetchNodesDetails() for each input hosts and return both valid and invalid hosts
Expand Down
5 changes: 5 additions & 0 deletions rfc7807/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ var (
"Failed to create file",
http.StatusInternalServerError,
)
GenericDeleteFileError = newProblemID(
path.Join(errorEndpointsPrefix, "delete-file-failure-error"),
"Failed to delete file",
http.StatusInternalServerError,
)
MessageQueueFull = newProblemID(
path.Join(errorEndpointsPrefix, "message-queue-full"),
"Message queue is full",
Expand Down
18 changes: 17 additions & 1 deletion vclusterops/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
nmaSuccessfulReturnCode = 0
)

// produceTransferConfigOps generates instructions to transfert some config
// produceTransferConfigOps generates instructions to transfer some config
// files from a sourceConfig node to target nodes.
func produceTransferConfigOps(instructions *[]clusterOp, sourceConfigHost,
targetHosts []string, vdb *VCoordinationDatabase, sandbox *string) {
Expand All @@ -55,6 +55,22 @@ func produceTransferConfigOps(instructions *[]clusterOp, sourceConfigHost,
)
}

// produceTransferLicenseOps generates instructions to transfer a license
// file from a source node to a target node.
func produceTransferLicenseOps(instructions *[]clusterOp, sourceHost,
targetHost, sourceFilePath, targetFilePath string) {
var licenseContent string
nmaDownloadLicenseOp := makeNMADownloadLicenseOp(
sourceHost, sourceFilePath, &licenseContent)
nmaUploadLicenseOp := makeNMAUploadLicenseOp(
sourceHost, targetHost, targetFilePath, &licenseContent)

*instructions = append(*instructions,
&nmaDownloadLicenseOp,
&nmaUploadLicenseOp,
)
}

// Get catalog path after we have db information from /catalog/database endpoint
func updateCatalogPathMapFromCatalogEditor(hosts []string, nmaVDB *nmaVDatabase, catalogPathMap map[string]string) error {
if len(hosts) == 0 {
Expand Down
114 changes: 114 additions & 0 deletions vclusterops/nma_delete_file_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package vclusterops

import (
"encoding/json"
"errors"
"fmt"
)

const (
delFileOpName = "NMADeleteFileOp"
delFileOpDesc = "Delete file"
)

// this op is for deleting a single file on the specified host
type nmaDeleteFileOp struct {
opBase
filePath string
hostRequestBodyMap map[string]string
}

type deleteFileData struct {
FilePath string `json:"file_path"`
}

func makeNMADeleteFileOp(hosts []string, filePath string) nmaDeleteFileOp {
op := nmaDeleteFileOp{}
op.name = delFileOpName
op.description = delFileOpDesc
op.hosts = hosts
op.filePath = filePath

return op
}

// make https json data
func (op *nmaDeleteFileOp) setupRequestBody() (map[string]string, error) {
hostRequestBodyMap := make(map[string]string, len(op.hosts))
for _, host := range op.hosts {
requestData := deleteFileData{}
requestData.FilePath = op.filePath

dataBytes, err := json.Marshal(requestData)
if err != nil {
return nil, fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err)
}
hostRequestBodyMap[host] = string(dataBytes)
}

op.logger.Info("request data", "op name", op.name, "hostRequestBodyMap", op.hostRequestBodyMap)
return hostRequestBodyMap, nil
}

func (op *nmaDeleteFileOp) setupClusterHTTPRequest(hostRequestBodyMap map[string]string) error {
for host, requestBody := range hostRequestBodyMap {
httpRequest := hostHTTPRequest{}
httpRequest.Method = PostMethod
httpRequest.buildNMAEndpoint("files/delete")
httpRequest.RequestData = requestBody
op.clusterHTTPRequest.RequestCollection[host] = httpRequest
}

return nil
}

func (op *nmaDeleteFileOp) prepare(execContext *opEngineExecContext) error {
hostRequestBodyMap, err := op.setupRequestBody()
if err != nil {
return err
}

execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(hostRequestBodyMap)
}

func (op *nmaDeleteFileOp) execute(execContext *opEngineExecContext) error {
if err := op.runExecute(execContext); err != nil {
return err
}

return op.processResult(execContext)
}

func (op *nmaDeleteFileOp) finalize(_ *opEngineExecContext) error {
return nil
}

func (op *nmaDeleteFileOp) processResult(_ *opEngineExecContext) error {
var allErrs error

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if result.isPassing() {
// the response object will be a map e.g,.:
// {'/tmp/dummy_file.txt': 'deleted'}
responseObj, err := op.parseAndCheckMapResponse(host, result.content)
if err != nil {
allErrs = errors.Join(allErrs, err)
continue
}

_, ok := responseObj["delete_file_return_code"]
if !ok {
err = fmt.Errorf(`[%s] response does not contain field "delete_file_return_code"`, op.name)
allErrs = errors.Join(allErrs, err)
}
} else {
allErrs = errors.Join(allErrs, result.err)
}
}

return allErrs
}
33 changes: 33 additions & 0 deletions vclusterops/nma_download_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
const (
spreadConf = "config/spread"
verticaConf = "config/vertica"
licenseKey = "config/license"
)

type nmaDownloadConfigOp struct {
Expand Down Expand Up @@ -59,6 +60,23 @@ func makeNMADownloadConfigOp(
return op
}

func makeNMADownloadLicenseOp(
sourceHost, filePath string,
fileContent *string) nmaDownloadConfigOp {
op := nmaDownloadConfigOp{}
op.name = "NMADownloadLicenseKeyOp"
op.hosts = []string{sourceHost}
op.endpoint = licenseKey
op.description = "Get contents of license key"
op.fileContent = fileContent
op.catalogPathMap = make(map[string]string)
op.catalogPathMap[sourceHost] = filePath
op.vdb = nil
// upgrade license key can only be done on main cluster
op.sandbox = nil
return op
}

func (op *nmaDownloadConfigOp) setupClusterHTTPRequest(hosts []string) error {
for _, host := range hosts {
httpRequest := hostHTTPRequest{}
Expand All @@ -78,6 +96,21 @@ func (op *nmaDownloadConfigOp) setupClusterHTTPRequest(hosts []string) error {
}

func (op *nmaDownloadConfigOp) prepare(execContext *opEngineExecContext) error {
// shortcut for license key, as the op has to be done on the passed-in host
if op.endpoint == licenseKey {
return op.prepareForDownloadLicense(execContext)
}

return op.prepareForDownloadConfigs(execContext)
}

func (op *nmaDownloadConfigOp) prepareForDownloadLicense(execContext *opEngineExecContext) error {
execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
}

func (op *nmaDownloadConfigOp) prepareForDownloadConfigs(execContext *opEngineExecContext) error {
op.catalogPathMap = make(map[string]string)
// vdb is built by calling /cluster and /nodes endpoints of a running db.
// If nodes' info is not available in vdb, we will get the host from execContext.nmaVDatabase which is build by reading the catalog editor
Expand Down
43 changes: 36 additions & 7 deletions vclusterops/nma_upload_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func makeNMAUploadConfigOp(
} else if op.endpoint == spreadConf {
op.description = "Send contents of spread.conf to nodes"
}

op.fileContent = fileContent
op.catalogPathMap = make(map[string]string)
op.sourceConfigHost = sourceConfigHost
Expand All @@ -73,6 +74,26 @@ func makeNMAUploadConfigOp(
return op
}

func makeNMAUploadLicenseOp(
sourceHost, targetHost, tempLicensePath string,
fileContent *string,
) nmaUploadConfigOp {
op := nmaUploadConfigOp{}
op.name = "NMAUploadLicenseOp"
op.endpoint = licenseKey
op.description = "Send contents of license key to the target node"
op.fileContent = fileContent
// re-use the catalog_path as the path for writing the temp license file
op.catalogPathMap = make(map[string]string)
op.catalogPathMap[targetHost] = tempLicensePath
op.sourceConfigHost = []string{sourceHost}
op.destHosts = []string{targetHost}
op.hosts = op.destHosts
op.vdb = nil

return op
}

func (op *nmaUploadConfigOp) setupRequestBody(hosts []string) error {
op.hostRequestBodyMap = make(map[string]string)

Expand Down Expand Up @@ -104,7 +125,21 @@ func (op *nmaUploadConfigOp) setupClusterHTTPRequest(hosts []string) error {
return nil
}

func (op *nmaUploadConfigOp) completePrepare(execContext *opEngineExecContext) error {
err := op.setupRequestBody(op.hosts)
if err != nil {
return err
}
execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
}

func (op *nmaUploadConfigOp) prepare(execContext *opEngineExecContext) error {
// shortcut for transferring license, it only has to be on the target host
if op.endpoint == licenseKey {
return op.completePrepare(execContext)
}
op.catalogPathMap = make(map[string]string)
// If any node's info is available, we set catalogPathMap from node's info.
// This case is used for starting nodes operation.
Expand Down Expand Up @@ -156,13 +191,7 @@ func (op *nmaUploadConfigOp) prepare(execContext *opEngineExecContext) error {
}
}

err := op.setupRequestBody(op.hosts)
if err != nil {
return err
}
execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
return op.completePrepare(execContext)
}

func (op *nmaUploadConfigOp) execute(execContext *opEngineExecContext) error {
Expand Down
15 changes: 13 additions & 2 deletions vclusterops/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vclusterops
import (
"fmt"
"sort"
"strings"
"time"

"github.com/vertica/vcluster/vclusterops/util"
Expand Down Expand Up @@ -48,6 +49,8 @@ type ReplicationStatusResponse struct {
// 'started', 'failed', 'completed'
Status string `json:"status"`

ErrMsg string

// Node the current replication operation is on
NodeName string `json:"node_name"`

Expand Down Expand Up @@ -237,8 +240,7 @@ func getFinalReplicationStatus(replicationStatus []ReplicationStatusResponse) *R

// Get the rest of the status info from the current op (the last op in the sorted list)
currentOp := replicationStatus[len(replicationStatus)-1]

finalReplicationStatus.Status = currentOp.Status
setStatusAndErrMsg(&finalReplicationStatus, &currentOp)
finalReplicationStatus.EndTime = currentOp.EndTime
finalReplicationStatus.OpName = currentOp.OpName
finalReplicationStatus.SentBytes = currentOp.SentBytes
Expand All @@ -247,3 +249,12 @@ func getFinalReplicationStatus(replicationStatus []ReplicationStatusResponse) *R

return &finalReplicationStatus
}

func setStatusAndErrMsg(finalReplicationStatus, currentOp *ReplicationStatusResponse) {
if strings.HasPrefix(currentOp.Status, "failed:") {
finalReplicationStatus.Status = "failed"
finalReplicationStatus.ErrMsg = strings.TrimSpace(currentOp.Status[7:])
} else {
finalReplicationStatus.Status = currentOp.Status
}
}
Loading

0 comments on commit a635dff

Please sign in to comment.