Skip to content

Commit

Permalink
🎨 Automatic purge for local data repo #13091
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Nov 17, 2024
1 parent 02165bc commit 671333f
Showing 1 changed file with 7 additions and 19 deletions.
26 changes: 7 additions & 19 deletions kernel/model/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@ func autoPurgeRepo(cron bool) {

now := time.Now()

dateGroupedIndexes := map[string][]*dejavu.Log{} // 按照日期分组
dateGroupedIndexes := map[string][]*entity.Index{} // 按照日期分组
// 收集指定日期内需要保留的索引
var date string
page := 1
for {
indexLogs, pageCount, _, err := repo.GetIndexLogs(page, 512)
indexes, pageCount, _, err := repo.GetIndexes(page, 512)
if nil != err {
logging.LogErrorf("get data repo index logs failed: %s", err)
return
}
if 1 > len(indexLogs) {
if 1 > len(indexes) {
break
}

tooOld := false
for _, index := range indexLogs {
for _, index := range indexes {
if now.UnixMilli()-index.Created <= int64(Conf.Repo.IndexRetentionDays)*24*60*60*1000 {
date = time.UnixMilli(index.Created).Format("2006-01-02")
if _, ok := dateGroupedIndexes[date]; !ok {
dateGroupedIndexes[date] = []*dejavu.Log{}
dateGroupedIndexes[date] = []*entity.Index{}
}
dateGroupedIndexes[date] = append(dateGroupedIndexes[date], index)
} else {
Expand Down Expand Up @@ -148,7 +148,7 @@ func autoPurgeRepo(cron bool) {
}

for _, keepIndex := range keepIndexes.Values() {
retentionIndexIDs = append(retentionIndexIDs, keepIndex.(*dejavu.Log).ID)
retentionIndexIDs = append(retentionIndexIDs, keepIndex.(*entity.Index).ID)
}
}

Expand All @@ -158,16 +158,7 @@ func autoPurgeRepo(cron bool) {
return
}

stat, err := repo.Purge(retentionIndexIDs...)
if err != nil {
return
}

deletedIndexes := stat.Indexes
deletedObjects := stat.Objects
deletedSize := humanize.BytesCustomCeil(uint64(stat.Size), 2)
logging.LogInfof("purge data repo completed [ellapsed=%.2fs, indexes=%d, objects=%d, size=%s]",
time.Since(now).Seconds(), deletedIndexes, deletedObjects, deletedSize)
_, err = repo.Purge(retentionIndexIDs...)
}

func GetRepoFile(fileID string) (ret []byte, p string, err error) {
Expand Down Expand Up @@ -629,7 +620,6 @@ func PurgeRepo() (err error) {
return
}

now := time.Now()
stat, err := repo.Purge()
if err != nil {
return
Expand All @@ -638,8 +628,6 @@ func PurgeRepo() (err error) {
deletedIndexes := stat.Indexes
deletedObjects := stat.Objects
deletedSize := humanize.BytesCustomCeil(uint64(stat.Size), 2)
logging.LogInfof("purge data repo completed [ellapsed=%.2fs, indexes=%d, objects=%d, size=%s]",
time.Since(now).Seconds(), deletedIndexes, deletedObjects, deletedSize)
msg = fmt.Sprintf(Conf.Language(203), deletedIndexes, deletedObjects, deletedSize)
util.PushMsg(msg, 7000)
return
Expand Down

0 comments on commit 671333f

Please sign in to comment.