Skip to content

Commit

Permalink
Update transport file logging (#1376)
Browse files Browse the repository at this point in the history
* User File log store instead of mem for transports

* Update fileTransportLogStore

This commit updates the fileTransportLogStore to store the logs into a csv instead of writing json to log file.

We save the logs in just one file per day. The filename now is not the transport ip but the date.

* Test gocsv lib to read and write csv

* Update and use TpLogStore

* Use api server instead of file server

* Add endpoint for privacy json

* Fix timeout on large files

This commit fixes the timeout error on large files by removing the ReadTimeout, ReadTimeout and ReadHeaderTimeout from the http server used for serving the API over dmsg.

* Use gocsv

This commit uses the library gocarina/gocsv for the csv instead of encoding/csv because of it's limited functionality and the additional code needed to use it. Using gocarina/gocsv simplifies the process.

It also makes it easy to update the entries in csv where as it was a bit tedious and difficult to do the same with encoding/csv.

* Move today to it's own method

* Minor change

* Minor changes

* Remove todo as it is not needed

* Serve custom folder for usage of the user

* Remove test log

* Add log level to dmsgserver logs

* Minor fix

The ctx done case is moved up to prevent printing 'Cannot fetch public services: context canceled' error twice on visor shutdown.

* Fox import linting

* Read ReadHeaderTimeout to httpserver to fix lint
  • Loading branch information
ersonp authored Oct 11, 2022
1 parent 79557a6 commit 6446a6e
Show file tree
Hide file tree
Showing 19 changed files with 2,495 additions and 36 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/bitfield/script v0.19.0
github.com/blang/semver/v4 v4.0.0
github.com/go-chi/chi/v5 v5.0.8-0.20220103230436-7dbe9a0bd10f
github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25
github.com/ivanpirog/coloredcobra v1.0.0
github.com/james-barrow/golang-ipc v0.0.0-20210227130457-95e7cc81f5e2
github.com/jaypipes/ghw v0.9.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25 h1:wxgEEZvsnOTrDO2npSSKUMDx5IykfoGmro+/Vjc1BQ8=
github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
4 changes: 2 additions & 2 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {

for {
select {
case <-ctx.Done():
return context.Canceled
case <-publicServiceTicket.C:
// successfully established transports
tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic)
Expand Down Expand Up @@ -93,8 +95,6 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {
}
}
}
case <-ctx.Done():
return context.Canceled
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/skyenv/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bitfield/script"
"github.com/google/uuid"
"github.com/jaypipes/ghw"
"github.com/skycoin/dmsg/pkg/dmsg"

"github.com/skycoin/skywire-utilities/pkg/buildinfo"
"github.com/skycoin/skywire-utilities/pkg/cipher"
Expand All @@ -29,12 +30,12 @@ const (
// Dmsg port constants.
// TODO(evanlinjin): Define these properly. These are currently random.
const (
DmsgCtrlPort uint16 = 7 // Listening port for dmsgctrl protocol (similar to TCP Echo Protocol).
DmsgSetupPort uint16 = 36 // Listening port of a setup node.
DmsgHypervisorPort uint16 = 46 // Listening port of a hypervisor for incoming RPC visor connections over dmsg.
DmsgTransportSetupPort uint16 = 47 // Listening port for transport setup RPC over dmsg.
DmsgHTTPPort uint16 = 80 // Listening port for dmsghttp logserver.
DmsgAwaitSetupPort uint16 = 136 // Listening port of a visor for setup operations.
DmsgCtrlPort uint16 = 7 // Listening port for dmsgctrl protocol (similar to TCP Echo Protocol).
DmsgSetupPort uint16 = 36 // Listening port of a setup node.
DmsgHypervisorPort uint16 = 46 // Listening port of a hypervisor for incoming RPC visor connections over dmsg.
DmsgTransportSetupPort uint16 = 47 // Listening port for transport setup RPC over dmsg.
DmsgHTTPPort uint16 = dmsg.DefaultDmsgHTTPPort // Listening port for dmsghttp logserver.
DmsgAwaitSetupPort uint16 = 136 // Listening port of a visor for setup operations.
)

// Transport port constants.
Expand Down Expand Up @@ -93,7 +94,8 @@ const (

// Routing constants
const (
TpLogStore = "./transport_logs"
TpLogStore = "transport_logs"
Custom = "custom"
)

// Local constants
Expand Down
104 changes: 85 additions & 19 deletions pkg/transport/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,35 @@ package transport
import (
"bytes"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/gocarina/gocsv"
"github.com/google/uuid"

"github.com/skycoin/skywire-utilities/pkg/logging"
)

// CsvEntry represents a logging entry for csv for a given Transport.
type CsvEntry struct {
TpID uuid.UUID `csv:"tp_id"`
// atomic requires 64-bit alignment for struct field access
LogEntry
TimeStamp time.Time `csv:"time_stamp"` // TimeStamp should be time.RFC3339Nano formatted
}

// LogEntry represents a logging entry for a given Transport.
// The entry is updated every time a packet is received or sent.
type LogEntry struct {
// atomic requires 64-bit alignment for struct field access
RecvBytes uint64 `json:"recv"` // Total received bytes.
SentBytes uint64 `json:"sent"` // Total sent bytes.
RecvBytes uint64 `csv:"recv"` // Total received bytes.
SentBytes uint64 `csv:"sent"` // Total sent bytes.
}

// AddRecv records read.
Expand Down Expand Up @@ -118,46 +127,103 @@ type fileTransportLogStore struct {

// FileTransportLogStore implements file TransportLogStore.
func FileTransportLogStore(dir string) (LogStore, error) {
if err := os.MkdirAll(dir, 0707); err != nil {
if err := os.MkdirAll(dir, 0606); err != nil {
return nil, err
}
log := logging.MustGetLogger("transport")
return &fileTransportLogStore{dir, log}, nil
}

func (tls *fileTransportLogStore) Entry(id uuid.UUID) (*LogEntry, error) {
f, err := os.Open(filepath.Join(tls.dir, fmt.Sprintf("%s.log", id)))
func (tls *fileTransportLogStore) Entry(tpID uuid.UUID) (*LogEntry, error) {
entries, err := tls.readFromCSV(tls.today())
if err != nil {
return nil, err
}
for _, entry := range entries {
if entry.TpID == tpID {
return &entry.LogEntry, nil
}
}
return nil, nil
}

func (tls *fileTransportLogStore) Record(id uuid.UUID, entry *LogEntry) error {
cEntry := &CsvEntry{
TpID: id,
LogEntry: *entry,
TimeStamp: time.Now().UTC(),
}

return tls.writeToCSV(cEntry)
}

func (tls *fileTransportLogStore) writeToCSV(cEntry *CsvEntry) error {
f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprintf("%s.csv", tls.today())), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("open: %w", err)
return err
}

defer func() {
if err := f.Close(); err != nil {
tls.log.WithError(err).Warn("Failed to close file")
tls.log.WithError(err).Errorln("Failed to close hypervisor response body")
}
}()

entry := &LogEntry{}
if err := json.NewDecoder(f).Decode(entry); err != nil {
return nil, fmt.Errorf("json: %w", err)
readClients := []*CsvEntry{}
writeClients := []*CsvEntry{}

if err := gocsv.UnmarshalFile(f, &readClients); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { // Load clients from file
return err
}

return entry, nil
if len(readClients) == 0 {
writeClients = append(writeClients, cEntry)
}

for _, client := range readClients {
if client.TpID == cEntry.TpID {
writeClients = append(writeClients, cEntry)
continue
}
writeClients = append(writeClients, client)
}

if _, err := f.Seek(0, 0); err != nil { // Go to the start of the file
return err
}

_, err = gocsv.MarshalString(&writeClients) // Get all clients as CSV string
if err != nil {
return err
}

err = gocsv.MarshalFile(&writeClients, f) // Use this to save the CSV back to the file
if err != nil {
return err
}
return nil
}

func (tls *fileTransportLogStore) Record(id uuid.UUID, entry *LogEntry) error {
f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprintf("%s.log", id)), os.O_RDWR|os.O_CREATE, 0600)
func (tls *fileTransportLogStore) readFromCSV(fileName string) ([]*CsvEntry, error) {
f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprint(fileName)), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
return fmt.Errorf("open: %w", err)
return nil, err
}

defer func() {
if err := f.Close(); err != nil {
tls.log.WithError(err).Warn("Failed to close file")
tls.log.WithError(err).Errorln("Failed to close hypervisor response body")
}
}()

if err := json.NewEncoder(f).Encode(entry); err != nil {
return fmt.Errorf("json: %w", err)
readClients := []*CsvEntry{}

if err := gocsv.UnmarshalFile(f, &readClients); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { // Load clients from file
return nil, err
}
return readClients, nil
}

return nil
func (tls *fileTransportLogStore) today() string {
return time.Now().UTC().Format("2006-01-02")
}
33 changes: 25 additions & 8 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ccding/go-stun/stun"
"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/pkg/direct"
dmsgdisc "github.com/skycoin/dmsg/pkg/disc"
"github.com/skycoin/dmsg/pkg/dmsg"
Expand Down Expand Up @@ -47,6 +48,7 @@ import (
"github.com/skycoin/skywire/pkg/utclient"
"github.com/skycoin/skywire/pkg/util/osutil"
"github.com/skycoin/skywire/pkg/visor/dmsgtracker"
"github.com/skycoin/skywire/pkg/visor/logserver"
"github.com/skycoin/skywire/pkg/visor/visorconfig"
vinit "github.com/skycoin/skywire/pkg/visor/visorinit"
)
Expand Down Expand Up @@ -352,6 +354,16 @@ func initDmsgHTTPLogServer(ctx context.Context, v *Visor, log *logging.Logger) e
}
logger := v.MasterLogger().PackageLogger("dmsghttp_logserver")

tpLogPath := v.conf.LocalPath + "/" + skyenv.TpLogStore
customPath := v.conf.LocalPath + "/" + skyenv.Custom

var printLog bool
if v.MasterLogger().GetLevel() == logrus.DebugLevel || v.MasterLogger().GetLevel() == logrus.TraceLevel {
printLog = true
}

lsAPI := logserver.New(logger, tpLogPath, v.conf.LocalPath, customPath, printLog)

lis, err := dmsgC.Listen(skyenv.DmsgHTTPPort)
if err != nil {
return err
Expand All @@ -362,11 +374,13 @@ func initDmsgHTTPLogServer(ctx context.Context, v *Visor, log *logging.Logger) e
logger.WithError(err).Error()
}
}()

log.WithField("dmsg_addr", fmt.Sprintf("dmsg://%v", lis.Addr().String())).
Debug("Serving...")
srv := &http.Server{
ReadHeaderTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: http.FileServer(http.Dir(v.conf.LocalPath)),
ReadHeaderTimeout: 2 * time.Second,
IdleTimeout: 30 * time.Second,
Handler: lsAPI,
}

wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -449,7 +463,10 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error {
return err
}

logS := transport.InMemoryTransportLogStore()
logS, err := transport.FileTransportLogStore(v.conf.LocalPath + "/" + skyenv.TpLogStore)
if err != nil {
return err
}

pTps, err := v.conf.GetPersistentTransports()
if err != nil {
Expand Down Expand Up @@ -579,8 +596,8 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro
trySUDPH := false

for _, trans := range transports {
ntype := network.Type(trans)
if ntype == network.STCPR {
nType := network.Type(trans)
if nType == network.STCPR {
trySTCPR = true
continue
}
Expand All @@ -589,7 +606,7 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro
<-v.stunReady

// skip if SUDPH is under symmetric NAT / under UDP firewall.
if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric ||
if nType == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric ||
v.stunClient.NATType == stun.NATSymmetricUDPFirewall) {
continue
}
Expand Down
Loading

0 comments on commit 6446a6e

Please sign in to comment.