Skip to content

Commit

Permalink
diag: thread-safety step1 - json marshal under mutex (#11134)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Jul 13, 2024
1 parent e513e5f commit b1c60ad
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 23 deletions.
3 changes: 1 addition & 2 deletions diagnostics/bodies_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package diagnostics

import (
"encoding/json"
"net/http"

diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand All @@ -37,5 +36,5 @@ func SetupBodiesAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClient
}

func writeBodies(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetBodiesInfo())
diag.BodiesInfoJson(w)
}
3 changes: 1 addition & 2 deletions diagnostics/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package diagnostics

import (
"encoding/json"
"net/http"

diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand All @@ -36,5 +35,5 @@ func SetupHeadersAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClien
}

func writeHeaders(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetHeaders())
diag.HeadersJson(w)
}
13 changes: 6 additions & 7 deletions diagnostics/snapshot_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package diagnostics

import (
"encoding/json"
"net/http"

diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand Down Expand Up @@ -66,25 +65,25 @@ func SetupStagesAccess(metricsMux *http.ServeMux, diag *diaglib.DiagnosticClient
}

func writeNetworkSpeed(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetNetworkSpeed())
diag.NetworkSpeedJson(w)
}

func writeResourcesUsage(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetResourcesUsage())
diag.ResourcesUsageJson(w)
}

func writeStages(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.SyncStatistics())
diag.SyncStatsJson(w)
}

func writeFilesList(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.SnapshotFilesList())
diag.SnapshotFilesListJson(w)
}

func writeHardwareInfo(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.HardwareInfo())
diag.HardwareInfoJson(w)
}

func writeSyncStages(w http.ResponseWriter, diag *diaglib.DiagnosticClient) {
json.NewEncoder(w).Encode(diag.GetSyncStages())
diag.SyncStagesJson(w)
}
8 changes: 6 additions & 2 deletions erigon-lib/diagnostics/bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"context"
"encoding/json"
"io"

"github.com/ledgerwatch/erigon-lib/log/v3"
)
Expand Down Expand Up @@ -109,8 +111,10 @@ func (d *DiagnosticClient) runBodiesProcessingListener(rootCtx context.Context)
}()
}

func (d *DiagnosticClient) GetBodiesInfo() BodiesInfo {
func (d *DiagnosticClient) BodiesInfoJson(w io.Writer) {
d.bodiesMutex.Lock()
defer d.bodiesMutex.Unlock()
return d.bodies
if err := json.NewEncoder(w).Encode(d.bodies); err != nil {
log.Debug("[diagnostics] BodiesInfoJson", "err", err)
}
}
10 changes: 8 additions & 2 deletions erigon-lib/diagnostics/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"context"
"encoding/json"
"io"

"github.com/ledgerwatch/erigon-lib/log/v3"
)
Expand All @@ -29,8 +31,12 @@ func (d *DiagnosticClient) setupHeadersDiagnostics(rootCtx context.Context) {
d.runProcessedListener(rootCtx)
}

func (d *DiagnosticClient) GetHeaders() Headers {
return d.headers
func (d *DiagnosticClient) HeadersJson(w io.Writer) {
d.headerMutex.Lock()
defer d.headerMutex.Unlock()
if err := json.NewEncoder(w).Encode(d.headers); err != nil {
log.Debug("[diagnostics] HeadersJson", "err", err)
}
}

func (d *DiagnosticClient) runHeadersWaitingListener(rootCtx context.Context) {
Expand Down
9 changes: 7 additions & 2 deletions erigon-lib/diagnostics/resources_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"context"
"encoding/json"
"io"

"github.com/ledgerwatch/erigon-lib/log/v3"
)
Expand All @@ -26,13 +28,16 @@ func (d *DiagnosticClient) setupResourcesUsageDiagnostics(rootCtx context.Contex
d.runMemoryStatsListener(rootCtx)
}

func (d *DiagnosticClient) GetResourcesUsage() ResourcesUsage {
func (d *DiagnosticClient) ResourcesUsageJson(w io.Writer) {
d.resourcesUsageMutex.Lock()
defer d.resourcesUsageMutex.Unlock()

returnObj := d.resourcesUsage
d.resourcesUsage = ResourcesUsage{}
return returnObj

if err := json.NewEncoder(w).Encode(returnObj); err != nil {
log.Debug("[diagnostics] ResourcesUsageJson", "err", err)
}
}

func (d *DiagnosticClient) runMemoryStatsListener(rootCtx context.Context) {
Expand Down
19 changes: 17 additions & 2 deletions erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package diagnostics

import (
"context"
"encoding/json"
"fmt"
"io"
"time"

"github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -424,12 +426,25 @@ func (d *DiagnosticClient) SetFillDBInfo(info SnapshotFillDBStage) {
}
}

// Deprecated - it's not thread-safe and used only in tests. Need introduce another method or add special methods for Tests.
func (d *DiagnosticClient) SyncStatistics() SyncStatistics {
return d.syncStats
}

func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList {
return d.snapshotFileList
func (d *DiagnosticClient) SyncStatsJson(w io.Writer) {
d.mu.Lock()
defer d.mu.Unlock()
if err := json.NewEncoder(w).Encode(d.syncStats); err != nil {
log.Debug("[diagnostics] SyncStatsJson", "err", err)
}
}

func (d *DiagnosticClient) SnapshotFilesListJson(w io.Writer) {
d.mu.Lock()
defer d.mu.Unlock()
if err := json.NewEncoder(w).Encode(d.snapshotFileList); err != nil {
log.Debug("[diagnostics] SnapshotFilesListJson", "err", err)
}
}

func SnapshotDownloadInfoFromTx(tx kv.Tx) ([]byte, error) {
Expand Down
11 changes: 9 additions & 2 deletions erigon-lib/diagnostics/speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package diagnostics

import (
"context"
"encoding/json"
"io"
"time"

"github.com/ledgerwatch/erigon-lib/log/v3"
"github.com/showwin/speedtest-go/speedtest"
"github.com/showwin/speedtest-go/speedtest/transport"
)
Expand Down Expand Up @@ -86,6 +89,10 @@ func (d *DiagnosticClient) runSpeedTest(rootCtx context.Context) NetworkSpeedTes
}
}

func (d *DiagnosticClient) GetNetworkSpeed() NetworkSpeedTestResult {
return d.networkSpeed
func (d *DiagnosticClient) NetworkSpeedJson(w io.Writer) {
d.networkSpeedMutex.Lock()
defer d.networkSpeedMutex.Unlock()
if err := json.NewEncoder(w).Encode(d.networkSpeed); err != nil {
log.Debug("[diagnostics] ResourcesUsageJson", "err", err)
}
}
16 changes: 16 additions & 0 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package diagnostics

import (
"context"
"encoding/json"
"fmt"
"io"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
Expand Down Expand Up @@ -283,7 +285,12 @@ func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) {
}
}

// Deprecated - used only in tests. Non-thread-safe.
func (d *DiagnosticClient) GetStageState(stageId string) (StageState, error) {
return d.getStageState(stageId)
}

func (d *DiagnosticClient) getStageState(stageId string) (StageState, error) {
for _, stage := range d.syncStages {
if stage.ID == stageId {
return stage.State, nil
Expand Down Expand Up @@ -311,6 +318,15 @@ func StagesListUpdater(info []SyncStage) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, StagesListKey, info)
}

// Deprecated - not thread-safe method. Used only in tests. Need introduce more thread-safe method or something special for tests.
func (d *DiagnosticClient) GetSyncStages() []SyncStage {
return d.syncStages
}

func (d *DiagnosticClient) SyncStagesJson(w io.Writer) {
d.mu.Lock()
defer d.mu.Unlock()
if err := json.NewEncoder(w).Encode(d.syncStages); err != nil {
log.Debug("[diagnostics] HardwareInfoJson", "err", err)
}
}
11 changes: 9 additions & 2 deletions erigon-lib/diagnostics/sys_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package diagnostics

import (
"encoding/json"
"io"

"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
Expand Down Expand Up @@ -59,8 +62,12 @@ func (d *DiagnosticClient) setupSysInfoDiagnostics() {
d.mu.Unlock()
}

func (d *DiagnosticClient) HardwareInfo() HardwareInfo {
return d.hardwareInfo
func (d *DiagnosticClient) HardwareInfoJson(w io.Writer) {
d.mu.Lock()
defer d.mu.Unlock()
if err := json.NewEncoder(w).Encode(d.hardwareInfo); err != nil {
log.Debug("[diagnostics] HardwareInfoJson", "err", err)
}
}

func findNodeDisk(dirPath string) string {
Expand Down

0 comments on commit b1c60ad

Please sign in to comment.