Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cameraexporter]Rotate files in chronological order #420

Merged
merged 6 commits into from
Dec 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### New features

### Enhancements
- When using the file writer in `cameraexporter`, we rotate files in chronological order now and rotate half of files one time. ([#420](https://github.com/KindlingProject/kindling/pull/420))
- Support to identify the MySQL protocol with statements `commit` and `set`. ([#417](https://github.com/KindlingProject/kindling/pull/417))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strconv"

"github.com/Kindling-project/kindling/collector/pkg/component"
Expand Down Expand Up @@ -86,14 +87,20 @@ func (fw *fileWriter) writeFile(baseDir string, fileName string, group *model.Da
// Check whether the count of files is greater than MaxCount
err := fw.rotateFiles(baseDir)
if err != nil {
fw.logger.Infof("can't rotate files in %s: %v", baseDir, err)
fw.logger.Warnf("can't rotate files in %s: %v", baseDir, err)
}
filePath := filepath.Join(baseDir, fileName)
f, err := os.Create(filePath)
defer f.Close()
defer func(f *os.File) {
err := f.Close()
if err != nil {
fw.logger.Warnf("Failed to close the file %s", filePath)
}
}(f)
if err != nil {
return fmt.Errorf("can't create new file: %w", err)
}
fw.logger.Debugf("Create a trace file at [%s]", filePath)
bytes, err := json.Marshal(group)
if err != nil {
return fmt.Errorf("can't marshal DataGroup: %w", err)
Expand All @@ -108,33 +115,58 @@ func (fw *fileWriter) rotateFiles(baseDir string) error {
return nil
}
// Get all files path
toBeRotated, err := getFilesName(baseDir)
toBeRotated, err := getDirEntryInTimeOrder(baseDir)
if err != nil {
return fmt.Errorf("can't get files list: %w", err)
}
// No need to rotate
if len(toBeRotated) < fw.config.MaxFileCount {
return nil
}
// TODO Remove the older files
toBeRotated = toBeRotated[:len(toBeRotated)-fw.config.MaxFileCount+1]
// Remove the older files and remove half of them one time to decrease the frequency
// of deleting files. Note this is different from rotating log files. We could delete
// one file at a time for log files because the action "rotate" is in a low frequency
// in that case.
toBeRotated = toBeRotated[:len(toBeRotated)-fw.config.MaxFileCount/2+1]
// Remove the stale files asynchronously
go func() {
for _, file := range toBeRotated {
_ = os.Remove(filepath.Join(baseDir, file))
for _, dirEntry := range toBeRotated {
_ = os.Remove(filepath.Join(baseDir, dirEntry.Name()))
fw.logger.Infof("Rotate trace files [%s]", dirEntry.Name())
}
}()
return nil
}

func getFilesName(path string) ([]string, error) {
// getDirEntryInTimeOrder returns the directory entries slice in chronological order.
// The result files are sorted based on their modification time.
func getDirEntryInTimeOrder(path string) ([]os.DirEntry, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
files, err := f.Readdirnames(-1)
return files, err
defer func(f *os.File) {
err := f.Close()
if err != nil {
return
}
}(f)
dirs, err := f.ReadDir(-1)
// Sort the files based on their modification time. We don't sort them based on the
// timestamp in the file name because they are similar but the latter one costs more CPU
// considering that we have to split the file name first.
sort.Slice(dirs, func(i, j int) bool {
fileInfoA, err := dirs[i].Info()
if err != nil {
return false
}
fileInfoB, err := dirs[j].Info()
if err != nil {
return false
}
return fileInfoA.ModTime().Before(fileInfoB.ModTime())
})
return dirs, err
}

const dividingLine = "\n------\n"
Expand All @@ -146,22 +178,29 @@ func (fw *fileWriter) writeCpuEvents(group *model.DataGroup) {
fileName := getFileName(pathElements.Protocol, pathElements.ContentKey, pathElements.Timestamp, pathElements.IsServer)
filePath := filepath.Join(baseDir, fileName)
f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND, 0)
defer f.Close()
defer func(f *os.File) {
err := f.Close()
if err != nil {
fw.logger.Warnf("Failed to close the file %s", filePath)
}
}(f)
if err != nil {
// Just return if we can't find the exported file
fw.logger.Infof("Couldn't open the trace file %s when append CpuEvents: %v. "+
"Maybe the file has been rotated.", filePath, err)
return
}
_, err = f.Write([]byte(dividingLine))
if err != nil {
fw.logger.Infof("Failed to append CpuEvents to the file %s: %v", filePath, err)
fw.logger.Errorf("Failed to append CpuEvents to the file %s: %v", filePath, err)
return
}
eventsBytes, _ := json.Marshal(group)
_, err = f.Write(eventsBytes)
if err != nil {
fw.logger.Infof("Failed to append CpuEvents to the file %s: %v", filePath, err)
fw.logger.Errorf("Failed to append CpuEvents to the file %s: %v", filePath, err)
return
}
fw.logger.Debugf("Write CpuEvents to trace files [%s]", filePath)
}

func (fw *fileWriter) name() string {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package cameraexporter

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/filepathhelper"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
)

func TestWriteTrace(t *testing.T) {
Expand All @@ -33,9 +34,9 @@ func TestWriteTrace(t *testing.T) {
}
pathElements := filepathhelper.GetFilePathElements(traceData(int64(pid), uint64(timestamp)), uint64(timestamp))
filePath := writer.pidFilePath(pathElements.WorkloadName, pathElements.PodName, pathElements.ContainerName, pathElements.Pid)
filesName, err := getFilesName(filePath)
filesName, err := getDirEntryInTimeOrder(filePath)
assert.NoError(t, err)
assert.Equal(t, fileConfig.MaxFileCount, len(filesName))
assert.Equal(t, fileConfig.MaxFileCount/2+2, len(filesName))
}

func traceData(pid int64, timestamp uint64) *model.DataGroup {
Expand All @@ -62,25 +63,3 @@ func cpuEvent(startTime int64, extraLabels *model.AttributeMap) *model.DataGroup
labels.Merge(extraLabels)
return model.NewDataGroup(constnames.CameraEventGroupName, labels, uint64(startTime))
}

func triggerKey(data *model.DataGroup) string {
timestamp := data.Timestamp
isServer := data.Labels.GetBoolValue(constlabels.IsServer)
var podName string
if isServer {
podName = data.Labels.GetStringValue(constlabels.DstPod)
} else {
podName = data.Labels.GetStringValue(constlabels.SrcPod)
}
if len(podName) == 0 {
podName = "null"
}
var isServerString string
if isServer {
isServerString = "true"
} else {
isServerString = "false"
}
protocol := data.Labels.GetStringValue(constlabels.Protocol)
return podName + "_" + isServerString + "_" + protocol + "_" + strconv.FormatUint(timestamp, 10)
}