Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update transport file logging #1376

Merged
merged 19 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/bitfield/script v0.19.0
github.com/blang/semver/v4 v4.0.0
github.com/go-chi/chi/v5 v5.0.8-0.20220103230436-7dbe9a0bd10f
github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25
github.com/ivanpirog/coloredcobra v1.0.0
github.com/james-barrow/golang-ipc v0.0.0-20210227130457-95e7cc81f5e2
github.com/jaypipes/ghw v0.9.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25 h1:wxgEEZvsnOTrDO2npSSKUMDx5IykfoGmro+/Vjc1BQ8=
github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
4 changes: 2 additions & 2 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {

for {
select {
case <-ctx.Done():
return context.Canceled
case <-publicServiceTicket.C:
// successfully established transports
tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic)
Expand Down Expand Up @@ -93,8 +95,6 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {
}
}
}
case <-ctx.Done():
return context.Canceled
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/skyenv/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bitfield/script"
"github.com/google/uuid"
"github.com/jaypipes/ghw"
"github.com/skycoin/dmsg/pkg/dmsg"

"github.com/skycoin/skywire-utilities/pkg/buildinfo"
"github.com/skycoin/skywire-utilities/pkg/cipher"
Expand All @@ -29,12 +30,12 @@ const (
// Dmsg port constants.
// TODO(evanlinjin): Define these properly. These are currently random.
const (
DmsgCtrlPort uint16 = 7 // Listening port for dmsgctrl protocol (similar to TCP Echo Protocol).
DmsgSetupPort uint16 = 36 // Listening port of a setup node.
DmsgHypervisorPort uint16 = 46 // Listening port of a hypervisor for incoming RPC visor connections over dmsg.
DmsgTransportSetupPort uint16 = 47 // Listening port for transport setup RPC over dmsg.
DmsgHTTPPort uint16 = 80 // Listening port for dmsghttp logserver.
DmsgAwaitSetupPort uint16 = 136 // Listening port of a visor for setup operations.
DmsgCtrlPort uint16 = 7 // Listening port for dmsgctrl protocol (similar to TCP Echo Protocol).
DmsgSetupPort uint16 = 36 // Listening port of a setup node.
DmsgHypervisorPort uint16 = 46 // Listening port of a hypervisor for incoming RPC visor connections over dmsg.
DmsgTransportSetupPort uint16 = 47 // Listening port for transport setup RPC over dmsg.
DmsgHTTPPort uint16 = dmsg.DefaultDmsgHTTPPort // Listening port for dmsghttp logserver.
DmsgAwaitSetupPort uint16 = 136 // Listening port of a visor for setup operations.
)

// Transport port constants.
Expand Down Expand Up @@ -93,7 +94,8 @@ const (

// Routing constants
const (
TpLogStore = "./transport_logs"
TpLogStore = "transport_logs"
Custom = "custom"
)

// Local constants
Expand Down
104 changes: 85 additions & 19 deletions pkg/transport/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,35 @@ package transport
import (
"bytes"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/gocarina/gocsv"
"github.com/google/uuid"

"github.com/skycoin/skywire-utilities/pkg/logging"
)

// CsvEntry represents a logging entry for csv for a given Transport.
type CsvEntry struct {
TpID uuid.UUID `csv:"tp_id"`
// atomic requires 64-bit alignment for struct field access
LogEntry
TimeStamp time.Time `csv:"time_stamp"` // TimeStamp should be time.RFC3339Nano formatted
}

// LogEntry represents a logging entry for a given Transport.
// The entry is updated every time a packet is received or sent.
type LogEntry struct {
// atomic requires 64-bit alignment for struct field access
RecvBytes uint64 `json:"recv"` // Total received bytes.
SentBytes uint64 `json:"sent"` // Total sent bytes.
RecvBytes uint64 `csv:"recv"` // Total received bytes.
SentBytes uint64 `csv:"sent"` // Total sent bytes.
}

// AddRecv records read.
Expand Down Expand Up @@ -118,46 +127,103 @@ type fileTransportLogStore struct {

// FileTransportLogStore implements file TransportLogStore.
func FileTransportLogStore(dir string) (LogStore, error) {
if err := os.MkdirAll(dir, 0707); err != nil {
if err := os.MkdirAll(dir, 0606); err != nil {
return nil, err
}
log := logging.MustGetLogger("transport")
return &fileTransportLogStore{dir, log}, nil
}

func (tls *fileTransportLogStore) Entry(id uuid.UUID) (*LogEntry, error) {
f, err := os.Open(filepath.Join(tls.dir, fmt.Sprintf("%s.log", id)))
func (tls *fileTransportLogStore) Entry(tpID uuid.UUID) (*LogEntry, error) {
entries, err := tls.readFromCSV(tls.today())
if err != nil {
return nil, err
}
for _, entry := range entries {
if entry.TpID == tpID {
return &entry.LogEntry, nil
}
}
return nil, nil
}

func (tls *fileTransportLogStore) Record(id uuid.UUID, entry *LogEntry) error {
cEntry := &CsvEntry{
TpID: id,
LogEntry: *entry,
TimeStamp: time.Now().UTC(),
}

return tls.writeToCSV(cEntry)
}

func (tls *fileTransportLogStore) writeToCSV(cEntry *CsvEntry) error {
f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprintf("%s.csv", tls.today())), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("open: %w", err)
return err
}

defer func() {
if err := f.Close(); err != nil {
tls.log.WithError(err).Warn("Failed to close file")
tls.log.WithError(err).Errorln("Failed to close hypervisor response body")
}
}()

entry := &LogEntry{}
if err := json.NewDecoder(f).Decode(entry); err != nil {
return nil, fmt.Errorf("json: %w", err)
readClients := []*CsvEntry{}
writeClients := []*CsvEntry{}

if err := gocsv.UnmarshalFile(f, &readClients); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { // Load clients from file
return err
}

return entry, nil
if len(readClients) == 0 {
writeClients = append(writeClients, cEntry)
}

for _, client := range readClients {
if client.TpID == cEntry.TpID {
writeClients = append(writeClients, cEntry)
continue
}
writeClients = append(writeClients, client)
}

if _, err := f.Seek(0, 0); err != nil { // Go to the start of the file
return err
}

_, err = gocsv.MarshalString(&writeClients) // Get all clients as CSV string
if err != nil {
return err
}

err = gocsv.MarshalFile(&writeClients, f) // Use this to save the CSV back to the file
if err != nil {
return err
}
return nil
}

func (tls *fileTransportLogStore) Record(id uuid.UUID, entry *LogEntry) error {
f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprintf("%s.log", id)), os.O_RDWR|os.O_CREATE, 0600)
func (tls *fileTransportLogStore) readFromCSV(fileName string) ([]*CsvEntry, error) {
f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprint(fileName)), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
return fmt.Errorf("open: %w", err)
return nil, err
}

defer func() {
if err := f.Close(); err != nil {
tls.log.WithError(err).Warn("Failed to close file")
tls.log.WithError(err).Errorln("Failed to close hypervisor response body")
}
}()

if err := json.NewEncoder(f).Encode(entry); err != nil {
return fmt.Errorf("json: %w", err)
readClients := []*CsvEntry{}

if err := gocsv.UnmarshalFile(f, &readClients); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { // Load clients from file
return nil, err
}
return readClients, nil
}

return nil
func (tls *fileTransportLogStore) today() string {
return time.Now().UTC().Format("2006-01-02")
}
33 changes: 25 additions & 8 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ccding/go-stun/stun"
"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/pkg/direct"
dmsgdisc "github.com/skycoin/dmsg/pkg/disc"
"github.com/skycoin/dmsg/pkg/dmsg"
Expand Down Expand Up @@ -47,6 +48,7 @@ import (
"github.com/skycoin/skywire/pkg/utclient"
"github.com/skycoin/skywire/pkg/util/osutil"
"github.com/skycoin/skywire/pkg/visor/dmsgtracker"
"github.com/skycoin/skywire/pkg/visor/logserver"
"github.com/skycoin/skywire/pkg/visor/visorconfig"
vinit "github.com/skycoin/skywire/pkg/visor/visorinit"
)
Expand Down Expand Up @@ -352,6 +354,16 @@ func initDmsgHTTPLogServer(ctx context.Context, v *Visor, log *logging.Logger) e
}
logger := v.MasterLogger().PackageLogger("dmsghttp_logserver")

tpLogPath := v.conf.LocalPath + "/" + skyenv.TpLogStore
customPath := v.conf.LocalPath + "/" + skyenv.Custom

var printLog bool
if v.MasterLogger().GetLevel() == logrus.DebugLevel || v.MasterLogger().GetLevel() == logrus.TraceLevel {
printLog = true
}

lsAPI := logserver.New(logger, tpLogPath, v.conf.LocalPath, customPath, printLog)

lis, err := dmsgC.Listen(skyenv.DmsgHTTPPort)
if err != nil {
return err
Expand All @@ -362,11 +374,13 @@ func initDmsgHTTPLogServer(ctx context.Context, v *Visor, log *logging.Logger) e
logger.WithError(err).Error()
}
}()

log.WithField("dmsg_addr", fmt.Sprintf("dmsg://%v", lis.Addr().String())).
Debug("Serving...")
srv := &http.Server{
ReadHeaderTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: http.FileServer(http.Dir(v.conf.LocalPath)),
ReadHeaderTimeout: 2 * time.Second,
IdleTimeout: 30 * time.Second,
Handler: lsAPI,
}

wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -449,7 +463,10 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error {
return err
}

logS := transport.InMemoryTransportLogStore()
logS, err := transport.FileTransportLogStore(v.conf.LocalPath + "/" + skyenv.TpLogStore)
if err != nil {
return err
}

pTps, err := v.conf.GetPersistentTransports()
if err != nil {
Expand Down Expand Up @@ -579,8 +596,8 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro
trySUDPH := false

for _, trans := range transports {
ntype := network.Type(trans)
if ntype == network.STCPR {
nType := network.Type(trans)
if nType == network.STCPR {
trySTCPR = true
continue
}
Expand All @@ -589,7 +606,7 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro
<-v.stunReady

// skip if SUDPH is under symmetric NAT / under UDP firewall.
if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric ||
if nType == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric ||
v.stunClient.NATType == stun.NATSymmetricUDPFirewall) {
continue
}
Expand Down
Loading