diff --git a/CHANGELOG.md b/CHANGELOG.md index 799db9ab7..775998bbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ### New features ### Enhancements +- When using the file writer in `cameraexporter`, we rotate files in chronological order now and rotate half of files one time. ([#420](https://github.com/KindlingProject/kindling/pull/420)) - Support to identify the MySQL protocol with statements `commit` and `set`. ([#417](https://github.com/KindlingProject/kindling/pull/417)) diff --git a/collector/pkg/component/consumer/exporter/cameraexporter/filewriter.go b/collector/pkg/component/consumer/exporter/cameraexporter/filewriter.go index bd4ebfffb..dbe798720 100644 --- a/collector/pkg/component/consumer/exporter/cameraexporter/filewriter.go +++ b/collector/pkg/component/consumer/exporter/cameraexporter/filewriter.go @@ -7,6 +7,7 @@ import ( "os" "path" "path/filepath" + "sort" "strconv" "github.com/Kindling-project/kindling/collector/pkg/component" @@ -86,14 +87,20 @@ func (fw *fileWriter) writeFile(baseDir string, fileName string, group *model.Da // Check whether the count of files is greater than MaxCount err := fw.rotateFiles(baseDir) if err != nil { - fw.logger.Infof("can't rotate files in %s: %v", baseDir, err) + fw.logger.Warnf("can't rotate files in %s: %v", baseDir, err) } filePath := filepath.Join(baseDir, fileName) f, err := os.Create(filePath) - defer f.Close() + defer func(f *os.File) { + err := f.Close() + if err != nil { + fw.logger.Warnf("Failed to close the file %s", filePath) + } + }(f) if err != nil { return fmt.Errorf("can't create new file: %w", err) } + fw.logger.Debugf("Create a trace file at [%s]", filePath) bytes, err := json.Marshal(group) if err != nil { return fmt.Errorf("can't marshal DataGroup: %w", err) @@ -108,7 +115,7 @@ func (fw *fileWriter) rotateFiles(baseDir string) error { return nil } // Get all files path - toBeRotated, err := getFilesName(baseDir) + toBeRotated, err := getDirEntryInTimeOrder(baseDir) if err != nil { return fmt.Errorf("can't get files list: %w", err) } @@ -116,25 +123,50 @@ func (fw *fileWriter) rotateFiles(baseDir string) error { if len(toBeRotated) < fw.config.MaxFileCount { return nil } - // TODO Remove the older files - toBeRotated = toBeRotated[:len(toBeRotated)-fw.config.MaxFileCount+1] + // Remove the older files and remove half of them one time to decrease the frequency + // of deleting files. Note this is different from rotating log files. We could delete + // one file at a time for log files because the action "rotate" is in a low frequency + // in that case. + toBeRotated = toBeRotated[:len(toBeRotated)-fw.config.MaxFileCount/2+1] // Remove the stale files asynchronously go func() { - for _, file := range toBeRotated { - _ = os.Remove(filepath.Join(baseDir, file)) + for _, dirEntry := range toBeRotated { + _ = os.Remove(filepath.Join(baseDir, dirEntry.Name())) + fw.logger.Infof("Rotate trace files [%s]", dirEntry.Name()) } }() return nil } -func getFilesName(path string) ([]string, error) { +// getDirEntryInTimeOrder returns the directory entries slice in chronological order. +// The result files are sorted based on their modification time. +func getDirEntryInTimeOrder(path string) ([]os.DirEntry, error) { f, err := os.Open(path) if err != nil { return nil, err } - defer f.Close() - files, err := f.Readdirnames(-1) - return files, err + defer func(f *os.File) { + err := f.Close() + if err != nil { + return + } + }(f) + dirs, err := f.ReadDir(-1) + // Sort the files based on their modification time. We don't sort them based on the + // timestamp in the file name because they are similar but the latter one costs more CPU + // considering that we have to split the file name first. + sort.Slice(dirs, func(i, j int) bool { + fileInfoA, err := dirs[i].Info() + if err != nil { + return false + } + fileInfoB, err := dirs[j].Info() + if err != nil { + return false + } + return fileInfoA.ModTime().Before(fileInfoB.ModTime()) + }) + return dirs, err } const dividingLine = "\n------\n" @@ -146,22 +178,29 @@ func (fw *fileWriter) writeCpuEvents(group *model.DataGroup) { fileName := getFileName(pathElements.Protocol, pathElements.ContentKey, pathElements.Timestamp, pathElements.IsServer) filePath := filepath.Join(baseDir, fileName) f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND, 0) - defer f.Close() + defer func(f *os.File) { + err := f.Close() + if err != nil { + fw.logger.Warnf("Failed to close the file %s", filePath) + } + }(f) if err != nil { - // Just return if we can't find the exported file + fw.logger.Infof("Couldn't open the trace file %s when append CpuEvents: %v. "+ + "Maybe the file has been rotated.", filePath, err) return } _, err = f.Write([]byte(dividingLine)) if err != nil { - fw.logger.Infof("Failed to append CpuEvents to the file %s: %v", filePath, err) + fw.logger.Errorf("Failed to append CpuEvents to the file %s: %v", filePath, err) return } eventsBytes, _ := json.Marshal(group) _, err = f.Write(eventsBytes) if err != nil { - fw.logger.Infof("Failed to append CpuEvents to the file %s: %v", filePath, err) + fw.logger.Errorf("Failed to append CpuEvents to the file %s: %v", filePath, err) return } + fw.logger.Debugf("Write CpuEvents to trace files [%s]", filePath) } func (fw *fileWriter) name() string { diff --git a/collector/pkg/component/consumer/exporter/cameraexporter/filewriter_test.go b/collector/pkg/component/consumer/exporter/cameraexporter/filewriter_test.go index 8781bf8bc..b806647f8 100644 --- a/collector/pkg/component/consumer/exporter/cameraexporter/filewriter_test.go +++ b/collector/pkg/component/consumer/exporter/cameraexporter/filewriter_test.go @@ -1,14 +1,15 @@ package cameraexporter import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/Kindling-project/kindling/collector/pkg/component" "github.com/Kindling-project/kindling/collector/pkg/filepathhelper" "github.com/Kindling-project/kindling/collector/pkg/model" "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" "github.com/Kindling-project/kindling/collector/pkg/model/constnames" - "github.com/stretchr/testify/assert" - "strconv" - "testing" ) func TestWriteTrace(t *testing.T) { @@ -33,9 +34,9 @@ func TestWriteTrace(t *testing.T) { } pathElements := filepathhelper.GetFilePathElements(traceData(int64(pid), uint64(timestamp)), uint64(timestamp)) filePath := writer.pidFilePath(pathElements.WorkloadName, pathElements.PodName, pathElements.ContainerName, pathElements.Pid) - filesName, err := getFilesName(filePath) + filesName, err := getDirEntryInTimeOrder(filePath) assert.NoError(t, err) - assert.Equal(t, fileConfig.MaxFileCount, len(filesName)) + assert.Equal(t, fileConfig.MaxFileCount/2+2, len(filesName)) } func traceData(pid int64, timestamp uint64) *model.DataGroup { @@ -62,25 +63,3 @@ func cpuEvent(startTime int64, extraLabels *model.AttributeMap) *model.DataGroup labels.Merge(extraLabels) return model.NewDataGroup(constnames.CameraEventGroupName, labels, uint64(startTime)) } - -func triggerKey(data *model.DataGroup) string { - timestamp := data.Timestamp - isServer := data.Labels.GetBoolValue(constlabels.IsServer) - var podName string - if isServer { - podName = data.Labels.GetStringValue(constlabels.DstPod) - } else { - podName = data.Labels.GetStringValue(constlabels.SrcPod) - } - if len(podName) == 0 { - podName = "null" - } - var isServerString string - if isServer { - isServerString = "true" - } else { - isServerString = "false" - } - protocol := data.Labels.GetStringValue(constlabels.Protocol) - return podName + "_" + isServerString + "_" + protocol + "_" + strconv.FormatUint(timestamp, 10) -}