Skip to content

Commit

Permalink
Adding context to log statements (#1260)
Browse files Browse the repository at this point in the history
* adding actionset labels in context as fields

* introducing new function to log with context

* drying out Log function and using LogWithCtx internally

* removing ActionsetName from field and passing it through ctx

* consider labels only with prefix `kanister.io/`

* removing code which adds fields in context

* intializing variable without init function

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
chaitanya-baraskar and mergify[bot] authored Mar 29, 2022
1 parent ecbf4eb commit a932e18
Show file tree
Hide file tree
Showing 20 changed files with 93 additions and 71 deletions.
1 change: 1 addition & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
GoogleCloudCredsFilePath = "/tmp/creds.txt"
LabelKeyCreatedBy = "createdBy"
LabelValueKanister = "kanister"
LabelPrefix = "kanister.io/"
)

// These names are used to query ActionSet API objects.
Expand Down
26 changes: 21 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"

customresource "github.com/kanisterio/kanister/pkg/customresource"
Expand Down Expand Up @@ -216,24 +217,32 @@ func (c *Controller) onAddBlueprint(bp *crv1alpha1.Blueprint) {

// nolint:unparam
func (c *Controller) onUpdateActionSet(oldAS, newAS *crv1alpha1.ActionSet) error {
ctx := field.Context(context.Background(), consts.ActionsetNameKey, newAS.GetName())
// adding labels with prefix "kanister.io/" in the context as field for better logging
for key, value := range newAS.GetLabels() {
if strings.HasPrefix(key, consts.LabelPrefix) {
ctx = field.Context(ctx, key, value)
}
}

if err := validate.ActionSet(newAS); err != nil {
log.Print("Updated ActionSet", field.M{"ActionSetName": newAS.Name})
log.WithContext(ctx).Print("Updated ActionSet")
return err
}
if newAS.Status == nil || newAS.Status.State != crv1alpha1.StateRunning {
if newAS.Status == nil {
log.Print("Updated ActionSet", field.M{"Actionset": newAS.Name, "Status": "nil"})
log.WithContext(ctx).Print("Updated ActionSet", field.M{"Status": "nil"})
} else if newAS.Status.State == crv1alpha1.StateComplete {
c.logAndSuccessEvent(context.TODO(), fmt.Sprintf("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State), "Update Complete", newAS)
c.logAndSuccessEvent(ctx, fmt.Sprintf("Updated ActionSet '%s' Status->%s", newAS.Name, newAS.Status.State), "Update Complete", newAS)
} else {
log.Print("Updated ActionSet", field.M{"Actionset": newAS.Name, "Status": newAS.Status.State})
log.WithContext(ctx).Print("Updated ActionSet", field.M{"Status": newAS.Status.State})
}
return nil
}
for _, as := range newAS.Status.Actions {
for _, p := range as.Phases {
if p.State != crv1alpha1.StateComplete {
log.Print("Updated ActionSet", field.M{"Actionset": newAS.Name, "Status": newAS.Status.State, "Phase": fmt.Sprintf("%s->%s", p.Name, p.State)})
log.WithContext(ctx).Print("Updated ActionSet", field.M{"Status": newAS.Status.State, "Phase": fmt.Sprintf("%s->%s", p.Name, p.State)})
return nil
}
}
Expand Down Expand Up @@ -350,6 +359,13 @@ func (c *Controller) handleActionSet(as *crv1alpha1.ActionSet) (err error) {
}
ctx := context.Background()
ctx = field.Context(ctx, consts.ActionsetNameKey, as.GetName())
// adding labels with prefix "kanister.io/" in the context as field for better logging
for key, value := range as.GetLabels() {
if strings.HasPrefix(key, consts.LabelPrefix) {
ctx = field.Context(ctx, key, value)
}
}

for i := range as.Status.Actions {
if err = c.runAction(ctx, as, i); err != nil {
// If runAction returns an error, it is a failure in the synchronous
Expand Down
25 changes: 19 additions & 6 deletions pkg/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package format

import (
"bufio"
"context"
"io"
"regexp"
"strings"
Expand All @@ -24,13 +25,10 @@ import (
"github.com/kanisterio/kanister/pkg/log"
)

var regex = regexp.MustCompile("[\r\n]")

func Log(podName string, containerName string, output string) {
if output != "" {
logs := regexp.MustCompile("[\r\n]").Split(output, -1)
for _, l := range logs {
info(podName, containerName, l)
}
}
LogWithCtx(context.Background(), podName, containerName, output)
}

func LogStream(podName string, containerName string, output io.ReadCloser) chan string {
Expand All @@ -55,3 +53,18 @@ func info(podName string, containerName string, l string) {
log.Print("Pod Update", field.M{"Pod": podName, "Container": containerName, "Out": l})
}
}

func LogWithCtx(ctx context.Context, podName string, containerName string, output string) {
if output != "" {
logs := regex.Split(output, -1)
for _, l := range logs {
infoWithCtx(ctx, podName, containerName, l)
}
}
}

func infoWithCtx(ctx context.Context, podName string, containerName string, l string) {
if strings.TrimSpace(l) != "" {
log.WithContext(ctx).Print("Pod Update", field.M{"Pod": podName, "Container": containerName, "Out": l})
}
}
4 changes: 2 additions & 2 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, c
return backupDataParsedOutput{}, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd, nil)
format.Log(pod, container, stdout)
format.Log(pod, container, stderr)
format.LogWithCtx(ctx, pod, container, stdout)
format.LogWithCtx(ctx, pod, container, stderr)
if err != nil {
return backupDataParsedOutput{}, errors.Wrapf(err, "Failed to create and upload backup")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func backupDataStatsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, n
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to get backup stats")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/function/create_rds_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func createRDSSnapshot(ctx context.Context, instanceID string, dbEngine RDSDBEng
// Create Snapshot
snapshotID := fmt.Sprintf("%s-%s", instanceID, rand.String(10))

log.Print("Creating RDS snapshot", field.M{"SnapshotID": snapshotID})
log.WithContext(ctx).Print("Creating RDS snapshot", field.M{"SnapshotID": snapshotID})
if !isAuroraCluster(string(dbEngine)) {
dbSnapshotOutput, err := rdsCli.CreateDBSnapshot(ctx, instanceID, snapshotID)
if err != nil {
return nil, errors.Wrap(err, "Failed to create snapshot")
}

// Wait until snapshot becomes available
log.Print("Waiting for RDS snapshot to be available", field.M{"SnapshotID": snapshotID})
log.WithContext(ctx).Print("Waiting for RDS snapshot to be available", field.M{"SnapshotID": snapshotID})
if err := rdsCli.WaitUntilDBSnapshotAvailable(ctx, snapshotID); err != nil {
return nil, errors.Wrap(err, "Error while waiting snapshot to be available")
}
Expand All @@ -107,7 +107,7 @@ func createRDSSnapshot(ctx context.Context, instanceID string, dbEngine RDSDBEng
return nil, errors.Wrap(err, "Failed to create cluster snapshot")
}

log.Print("Waiting for RDS Aurora snapshot to be available", field.M{"SnapshotID": snapshotID})
log.WithContext(ctx).Print("Waiting for RDS Aurora snapshot to be available", field.M{"SnapshotID": snapshotID})
if err := rdsCli.WaitUntilDBClusterSnapshotAvailable(ctx, snapshotID); err != nil {
return nil, errors.Wrap(err, "Error while waiting snapshot to be available")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/create_volume_from_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func createVolumeFromSnapshot(ctx context.Context, cli kubernetes.Interface, nam
if err != nil {
return nil, errors.Wrapf(err, "Unable to create PV for volume %v", *vol)
}
log.Print("Restore/Create volume from snapshot completed", field.M{"PVC": pvc, "Volume": pv})
log.WithContext(ctx).Print("Restore/Create volume from snapshot completed", field.M{"PVC": pvc, "Volume": pv})
providerList[pvcInfo.PVCName] = provider
}
return providerList, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data, could not get snapshotID from tag, Tag: %s", deleteTag)
}
Expand All @@ -117,8 +117,8 @@ func deleteDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, reclai
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to forget data")
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/function/delete_rds_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,41 @@ func deleteRDSSnapshot(ctx context.Context, snapshotID string, profile *param.Pr

if !isAuroraCluster(string(dbEngine)) {
// Delete Snapshot
log.Print("Deleting RDS snapshot", field.M{"SnapshotID": snapshotID})
log.WithContext(ctx).Print("Deleting RDS snapshot", field.M{"SnapshotID": snapshotID})
_, err := rdsCli.DeleteDBSnapshot(ctx, snapshotID)
if err != nil {
if err, ok := err.(awserr.Error); ok {
switch err.Code() {
case awsrds.ErrCodeDBSnapshotNotFoundFault:
log.Info().Print("Could not find matching RDS snapshot; might have been deleted previously", field.M{"SnapshotId": snapshotID})
log.WithContext(ctx).Print("Could not find matching RDS snapshot; might have been deleted previously", field.M{"SnapshotId": snapshotID})
return nil, nil
default:
return nil, errors.Wrap(err, "Failed to delete snapshot")
}
}
}
// Wait until snapshot is deleted
log.Print("Waiting for RDS snapshot to be deleted", field.M{"SnapshotID": snapshotID})
log.WithContext(ctx).Print("Waiting for RDS snapshot to be deleted", field.M{"SnapshotID": snapshotID})
err = rdsCli.WaitUntilDBSnapshotDeleted(ctx, snapshotID)
return nil, errors.Wrap(err, "Error while waiting for snapshot to be deleted")
}

// delete Aurora DB cluster snapshot
log.Print("Deleting Aurora DB cluster snapshot")
log.WithContext(ctx).Print("Deleting Aurora DB cluster snapshot")
_, err = rdsCli.DeleteDBClusterSnapshot(ctx, snapshotID)
if err != nil {
if err, ok := err.(awserr.Error); ok {
switch err.Code() {
case awsrds.ErrCodeDBClusterSnapshotNotFoundFault:
log.Info().Print("Could not find matching Aurora DB cluster snapshot; might have been deleted previously", field.M{"SnapshotId": snapshotID})
log.WithContext(ctx).Print("Could not find matching Aurora DB cluster snapshot; might have been deleted previously", field.M{"SnapshotId": snapshotID})
return nil, nil
default:
return nil, errors.Wrap(err, "Error deleting Aurora DB cluster snapshot")
}
}
}

log.Print("Waiting for Aurora DB cluster snapshot to be deleted")
log.WithContext(ctx).Print("Waiting for Aurora DB cluster snapshot to be deleted")
err = rdsCli.WaitUntilDBClusterDeleted(ctx, snapshotID)

return nil, errors.Wrap(err, "Error waiting for Aurora DB cluster snapshot to be deleted")
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/delete_volume_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func deleteVolumeSnapshot(ctx context.Context, cli kubernetes.Interface, namespa
snapshot, err := provider.SnapshotGet(ctx, pvcInfo.SnapshotID)
if err != nil {
if strings.Contains(err.Error(), SnapshotDoesNotExistError) {
log.Debug().Print("Snapshot already deleted", field.M{"SnapshotID": pvcInfo.SnapshotID})
log.WithContext(ctx).Print("Snapshot already deleted", field.M{"SnapshotID": pvcInfo.SnapshotID})
} else {
return nil, errors.Wrapf(err, "Failed to get Snapshot from Provider")
}
}
if err = provider.SnapshotDelete(ctx, snapshot); err != nil {
return nil, err
}
log.Print("Successfully deleted snapshot", field.M{"SnapshotID": pvcInfo.SnapshotID})
log.WithContext(ctx).Print("Successfully deleted snapshot", field.M{"SnapshotID": pvcInfo.SnapshotID})
providerList[pvcInfo.PVCName] = provider
}
return providerList, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/describe_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func describeBackupsPodFunc(cli kubernetes.Interface, tp param.TemplateParams, n
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to get backup stats")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/function/export_rds_snapshot_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func exportRDSSnapshotToLoc(ctx context.Context, namespace, instanceID, snapshot
}
}

log.Print("Spin up temporary RDS instance from the snapshot.", field.M{"SnapshotID": snapshotID, "InstanceID": tmpInstanceID})
log.WithContext(ctx).Print("Spin up temporary RDS instance from the snapshot.", field.M{"SnapshotID": snapshotID, "InstanceID": tmpInstanceID})
// Create tmp instance from snapshot
if err := restoreFromSnapshot(ctx, rdsCli, tmpInstanceID, snapshotID, sgIDs); err != nil {
return nil, errors.Wrapf(err, "Failed to restore snapshot. SnapshotID=%s", snapshotID)
Expand Down Expand Up @@ -320,13 +320,13 @@ func postgresBackupCommand(dbEndpoint, username, password string, dbList []strin

func cleanupRDSDB(ctx context.Context, rdsCli *rds.RDS, instanceID string) error {
// Deleting tmp instance
log.Print("Delete temporary RDS instance.", field.M{"InstanceID": instanceID})
log.WithContext(ctx).Print("Delete temporary RDS instance.", field.M{"InstanceID": instanceID})
if _, err := rdsCli.DeleteDBInstance(ctx, instanceID); err != nil {
return err
}

// Wait until instance is deleted
log.Print("Waiting for RDS DB instance to be deleted", field.M{"InstanceID": instanceID})
log.WithContext(ctx).Print("Waiting for RDS DB instance to be deleted", field.M{"InstanceID": instanceID})
return rdsCli.WaitUntilDBInstanceDeleted(ctx, instanceID)
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"github.com/pkg/errors"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/output"
Expand Down Expand Up @@ -93,11 +91,9 @@ func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args
if err = Arg(args, KubeExecCommandArg, &cmd); err != nil {
return nil, err
}
ctx = field.Context(ctx, consts.PodNameKey, pod)
_ = field.Context(ctx, consts.ContainerNameKey, container)
stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd, nil)
format.Log(pod, container, stdout)
format.Log(pod, container, stderr)
format.LogWithCtx(ctx, pod, container, stdout)
format.LogWithCtx(ctx, pod, container, stderr)
if err != nil {
return nil, err
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/function/kube_exec_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
Expand Down Expand Up @@ -102,11 +100,9 @@ func execAll(ctx context.Context, cli kubernetes.Interface, namespace string, ps
for _, p := range ps {
for _, c := range cs {
go func(p string, c string) {
ctx = field.Context(ctx, consts.PodNameKey, p)
ctx = field.Context(ctx, consts.ContainerNameKey, c)
stdout, stderr, err := kube.Exec(cli, namespace, p, c, cmd, nil)
format.Log(p, c, stdout)
format.Log(p, c, stderr)
format.LogWithCtx(ctx, p, c, stdout)
format.LogWithCtx(ctx, p, c, stderr)
errChan <- err
output = output + "\n" + stdout
}(p, c)
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/prepare_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, logs)
out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to parse phase output")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func restoreDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, names
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to restore backup")
}
Expand Down
Loading

0 comments on commit a932e18

Please sign in to comment.