From ccb44a36d39fc069b6d1704efc8f30a80d020784 Mon Sep 17 00:00:00 2001 From: Dante Catalfamo <43040593+dantecatalfamo@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:28:45 -0500 Subject: [PATCH] Use `sync.Map` to prevent race on map access in `osquery-perf` (#24501) #24381 --- cmd/osquery-perf/agent.go | 63 +++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/cmd/osquery-perf/agent.go b/cmd/osquery-perf/agent.go index 6b306da0128f..0671766fc2ed 100644 --- a/cmd/osquery-perf/agent.go +++ b/cmd/osquery-perf/agent.go @@ -514,8 +514,7 @@ type agent struct { MDMCheckInInterval time.Duration DiskEncryptionEnabled bool - scheduledQueriesMu sync.Mutex // protects the below members - scheduledQueryData map[string]scheduledQuery + scheduledQueryData *sync.Map // bufferedResults contains result logs that are buffered when // /api/v1/osquery/log requests to the Fleet server fail. // @@ -668,6 +667,7 @@ func newAgent( disableFleetDesktop: disableFleetDesktop, loggerTLSMaxLines: loggerTLSMaxLines, bufferedResults: make(map[resultLog]int), + scheduledQueryData: new(sync.Map), } } @@ -777,9 +777,19 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) { // check if we have any scheduled queries that should be returning results var results []resultLog now := time.Now().Unix() - a.scheduledQueriesMu.Lock() prevCount := a.countBuffered() - for queryName, query := range a.scheduledQueryData { + + // NOTE The goroutine that pulls in new configurations + // MAY replace this map if it happens to run at the + // exact same time. The result would be. The result + // would be that the query lastRun does not get + // updated and cause the query to run more times than + // expected. + queryData := a.scheduledQueryData + queryData.Range(func(key, value any) bool { + queryName := key.(string) + query := value.(scheduledQuery) + if query.lastRun == 0 || now >= (query.lastRun+int64(query.ScheduleInterval)) { results = append(results, resultLog{ packName: query.packName, @@ -787,18 +797,18 @@ func (a *agent) runLoop(i int, onlyAlreadyEnrolled bool) { numRows: int(query.numRows), }) // Update lastRun - v := a.scheduledQueryData[queryName] - v.lastRun = now - a.scheduledQueryData[queryName] = v + query.lastRun = now + queryData.Store(queryName, query) } - } + + return true + }) if prevCount+len(results) < 1_000_000 { // osquery buffered_log_max is 1M a.addToBuffer(results) } a.sendLogsBatch() newBufferedCount := a.countBuffered() - prevCount a.stats.UpdateBufferedLogs(newBufferedCount) - a.scheduledQueriesMu.Unlock() } } @@ -1518,7 +1528,16 @@ func (a *agent) config() error { return fmt.Errorf("json parse at config: %w", err) } - scheduledQueryData := make(map[string]scheduledQuery) + existingLastRunData := make(map[string]int64) + + a.scheduledQueryData.Range(func(key, value any) bool { + existingLastRunData[key.(string)] = value.(scheduledQuery).lastRun + + return true + }) + + newScheduledQueryData := new(sync.Map) + for packName, pack := range parsedResp.Packs { for queryName, query := range pack.Queries { m, ok := query.(map[string]interface{}) @@ -1546,17 +1565,14 @@ func (a *agent) config() error { q.Query = m["query"].(string) scheduledQueryName := packName + "_" + queryName - if existingEntry, ok := a.scheduledQueryData[scheduledQueryName]; ok { - // Keep lastRun if the query is already scheduled. - q.lastRun = existingEntry.lastRun + if lastRun, ok := existingLastRunData[scheduledQueryName]; ok { + q.lastRun = lastRun } - scheduledQueryData[scheduledQueryName] = q + newScheduledQueryData.Store(scheduledQueryName, q) } } - a.scheduledQueriesMu.Lock() - a.scheduledQueryData = scheduledQueryData - a.scheduledQueriesMu.Unlock() + a.scheduledQueryData = newScheduledQueryData return nil } @@ -1852,13 +1868,12 @@ func (a *agent) runPolicy(query string) []map[string]string { } func (a *agent) randomQueryStats() []map[string]string { - a.scheduledQueriesMu.Lock() - defer a.scheduledQueriesMu.Unlock() - var stats []map[string]string - for scheduledQuery := range a.scheduledQueryData { + a.scheduledQueryData.Range(func(key, value any) bool { + queryName := key.(string) + stats = append(stats, map[string]string{ - "name": scheduledQuery, + "name": queryName, "delimiter": "_", "average_memory": fmt.Sprint(rand.Intn(200) + 10), "denylisted": "false", @@ -1871,7 +1886,9 @@ func (a *agent) randomQueryStats() []map[string]string { "wall_time": fmt.Sprint(rand.Intn(4) + 1), "wall_time_ms": fmt.Sprint(rand.Intn(4000) + 10), }) - } + + return true + }) return stats }