-
Notifications
You must be signed in to change notification settings - Fork 45
/
log.go
163 lines (140 loc) · 3.84 KB
/
log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package transport
import (
"bytes"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"github.com/google/uuid"
"github.com/skycoin/skywire-utilities/pkg/logging"
)
// LogEntry represents a logging entry for a given Transport.
// The entry is updated every time a packet is received or sent.
//
// AddRecv and AddSent update the counters with sync/atomic, so the other
// methods of LogEntry read and write them atomically as well.
type LogEntry struct {
	// atomic requires 64-bit alignment for struct field access
	RecvBytes uint64 `json:"recv"` // Total received bytes.
	SentBytes uint64 `json:"sent"` // Total sent bytes.
}

// AddRecv atomically adds n to the received-bytes counter.
func (le *LogEntry) AddRecv(n uint64) {
	atomic.AddUint64(&le.RecvBytes, n)
}

// AddSent atomically adds n to the sent-bytes counter.
func (le *LogEntry) AddSent(n uint64) {
	atomic.AddUint64(&le.SentBytes, n)
}

// MarshalJSON implements json.Marshaler.
//
// The output has the form {"recv":<n>,"sent":<m>}. Each counter is loaded
// atomically; the error result is always nil.
func (le *LogEntry) MarshalJSON() ([]byte, error) {
	rb := strconv.FormatUint(atomic.LoadUint64(&le.RecvBytes), 10)
	sb := strconv.FormatUint(atomic.LoadUint64(&le.SentBytes), 10)
	return []byte(`{"recv":` + rb + `,"sent":` + sb + `}`), nil
}

// GobEncode implements gob.GobEncoder.
//
// It gob-encodes the received-bytes counter followed by the sent-bytes
// counter and returns the resulting bytes, or the first encoding error.
func (le *LogEntry) GobEncode() ([]byte, error) {
	// Load atomically: the counters may be updated concurrently through
	// AddRecv/AddSent, and a plain read would race with those writes.
	rb := atomic.LoadUint64(&le.RecvBytes)
	sb := atomic.LoadUint64(&le.SentBytes)

	var b bytes.Buffer
	enc := gob.NewEncoder(&b)
	if err := enc.Encode(rb); err != nil {
		return nil, err
	}
	if err := enc.Encode(sb); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

// GobDecode implements gob.GobDecoder.
//
// It expects the layout produced by GobEncode: the received-bytes counter
// followed by the sent-bytes counter. The entry is only modified once both
// values have been decoded successfully.
func (le *LogEntry) GobDecode(b []byte) error {
	dec := gob.NewDecoder(bytes.NewReader(b))

	var rb uint64
	if err := dec.Decode(&rb); err != nil {
		return err
	}
	var sb uint64
	if err := dec.Decode(&sb); err != nil {
		return err
	}

	atomic.StoreUint64(&le.RecvBytes, rb)
	atomic.StoreUint64(&le.SentBytes, sb)
	return nil
}
// LogStore stores transport log entries, each identified by a UUID.
type LogStore interface {
// Entry returns the log entry stored under id, or an error if it cannot
// be retrieved.
Entry(id uuid.UUID) (*LogEntry, error)
// Record stores entry under id.
Record(id uuid.UUID, entry *LogEntry) error
}
// inMemoryTransportLogStore is a LogStore that keeps its entries in a map.
type inMemoryTransportLogStore struct {
entries map[uuid.UUID]*LogEntry // Entries by ID; accessed with mu held.
mu sync.Mutex // Held by Entry and Record while they touch entries.
}
// InMemoryTransportLogStore returns a LogStore that keeps all entries in
// memory. The returned store starts out empty.
func InMemoryTransportLogStore() LogStore {
	store := new(inMemoryTransportLogStore)
	store.entries = map[uuid.UUID]*LogEntry{}
	return store
}
// Entry returns the entry recorded under id. If no entry has been recorded
// for id, it returns a nil entry and a "transport log entry not found" error.
func (tls *inMemoryTransportLogStore) Entry(id uuid.UUID) (*LogEntry, error) {
	tls.mu.Lock()
	defer tls.mu.Unlock()

	if found, exists := tls.entries[id]; exists {
		return found, nil
	}
	return nil, errors.New("transport log entry not found")
}
// Record stores entry under id, replacing any entry previously stored there.
// It always returns nil.
func (tls *inMemoryTransportLogStore) Record(id uuid.UUID, entry *LogEntry) error {
	tls.mu.Lock()
	defer tls.mu.Unlock()

	// Allocate the map on first use so a zero-value store also works.
	if tls.entries == nil {
		tls.entries = map[uuid.UUID]*LogEntry{}
	}
	tls.entries[id] = entry
	return nil
}
// fileTransportLogStore is a LogStore that keeps each entry in its own
// "<id>.log" file inside dir.
type fileTransportLogStore struct {
dir string // Directory that holds the per-entry log files.
log *logging.Logger // Used to report failures to close a log file.
}
// FileTransportLogStore returns a LogStore that keeps its entries as files
// inside dir. The directory (and any missing parents) is created first; if
// that fails, the error is returned and no store is created.
func FileTransportLogStore(dir string) (LogStore, error) {
	// NOTE(review): mode 0707 grants full access to "other" but none to the
	// group, which looks unintentional — confirm whether 0700 was meant.
	mkErr := os.MkdirAll(dir, 0707)
	if mkErr != nil {
		return nil, mkErr
	}

	store := &fileTransportLogStore{
		dir: dir,
		log: logging.MustGetLogger("transport"),
	}
	return store, nil
}
// Entry reads the entry recorded under id from the file "<id>.log" in the
// store's directory and decodes it as JSON.
//
// It returns an error prefixed with "open:" if the file cannot be opened and
// one prefixed with "json:" if its content cannot be decoded. A failure to
// close the file is only logged as a warning.
func (tls *fileTransportLogStore) Entry(id uuid.UUID) (*LogEntry, error) {
	logPath := filepath.Join(tls.dir, fmt.Sprintf("%s.log", id))

	file, err := os.Open(logPath)
	if err != nil {
		return nil, fmt.Errorf("open: %w", err)
	}
	defer func() {
		if closeErr := file.Close(); closeErr != nil {
			tls.log.WithError(closeErr).Warn("Failed to close file")
		}
	}()

	var decoded LogEntry
	if err := json.NewDecoder(file).Decode(&decoded); err != nil {
		return nil, fmt.Errorf("json: %w", err)
	}
	return &decoded, nil
}
// Record writes entry as JSON to the file "<id>.log" in the store's
// directory, creating the file with mode 0600 if it does not exist and
// replacing any previous content.
//
// It returns an error prefixed with "open:" if the file cannot be opened and
// one prefixed with "json:" if encoding or writing fails. A failure to close
// the file is only logged as a warning.
func (tls *fileTransportLogStore) Record(id uuid.UUID, entry *LogEntry) error {
	name := filepath.Join(tls.dir, fmt.Sprintf("%s.log", id))

	// O_TRUNC discards the previous content. Without it, writing a shorter
	// JSON document over a longer one would leave stale bytes at the end of
	// the file.
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return fmt.Errorf("open: %w", err)
	}
	defer func() {
		if err := f.Close(); err != nil {
			tls.log.WithError(err).Warn("Failed to close file")
		}
	}()

	if err := json.NewEncoder(f).Encode(entry); err != nil {
		return fmt.Errorf("json: %w", err)
	}
	return nil
}