From 2b441efe83d041b614a029f1323e582d295a520e Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Mon, 17 Jun 2024 19:30:46 +0530 Subject: [PATCH 01/16] RPC sink for pgwatch3 --- src/sinks/multiwriter.go | 2 ++ src/sinks/rpc.go | 71 ++++++++++++++++++++++++++++++++++++++++ src/sinks/types.go | 12 +++++++ 3 files changed, 85 insertions(+) create mode 100644 src/sinks/rpc.go create mode 100644 src/sinks/types.go diff --git a/src/sinks/multiwriter.go b/src/sinks/multiwriter.go index e625b55ec3..86aad1d8bc 100644 --- a/src/sinks/multiwriter.go +++ b/src/sinks/multiwriter.go @@ -43,6 +43,8 @@ func NewMultiWriter(ctx context.Context, opts *config.Options, metricDefs *metri w, err = NewPostgresWriter(ctx, s, &opts.Measurements, metricDefs) case "prometheus": w, err = NewPrometheusWriter(ctx, path) + case "rpc": + w, err = NewRPCWriter(ctx, path) default: return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, s) } diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go new file mode 100644 index 0000000000..a0bfcf4814 --- /dev/null +++ b/src/sinks/rpc.go @@ -0,0 +1,71 @@ +/* +* +* RPC sink implementation for pgwatch3. +* Requires the address and port of the sink. +* + */ + +package sinks + +import ( + "context" + "net/rpc" + + "github.com/cybertec-postgresql/pgwatch3/metrics" +) + +type RPCWriter struct { + ctx context.Context + address string + client *rpc.Client +} + +func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error) { + + client, err := rpc.DialHTTP("tcp", address) + if err != nil { + return nil, err + } + + rw := &RPCWriter{ + ctx: ctx, + address: address, + client: client, + } + go rw.watchCtx() + return rw, nil +} + +// Sends Measurement Message to RPC Sink +func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { + if rw.ctx.Err() != nil { + return rw.ctx.Err() + } + if len(msgs) == 0 { + return nil + } + for _, msg := range msgs { + writeRequest := WriteRequest{ + PgwatchID: 2, + Msg: msg, + } + var status int + err := rw.client.Call("Receiver.UpdateMeasurements", &writeRequest, &status) + if err != nil { + return err + } + } + return nil +} + +func (rw *RPCWriter) SyncMetric(_, _, _ string) error { + if rw.ctx.Err() != nil { + return rw.ctx.Err() + } + return nil +} + +func (rw *RPCWriter) watchCtx() { + <-rw.ctx.Done() + rw.client.Close() +} diff --git a/src/sinks/types.go b/src/sinks/types.go new file mode 100644 index 0000000000..f746794cce --- /dev/null +++ b/src/sinks/types.go @@ -0,0 +1,12 @@ +package sinks + +import ( + "github.com/cybertec-postgresql/pgwatch3/metrics" +) + +type MeasurementMessage metrics.MeasurementMessage + +type WriteRequest struct { + PgwatchID int + Msg metrics.MeasurementMessage +} From 7b350c27438b5258d9caf563eef72a3eef2c1947 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Fri, 21 Jun 2024 22:53:13 +0530 Subject: [PATCH 02/16] removed unecessary characters --- src/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.go b/src/main.go index 142ab488df..651a1f1193 100644 --- a/src/main.go +++ b/src/main.go @@ -127,7 +127,6 @@ func main() { mainContext, cancel = context.WithCancel(context.Background()) SetupCloseHandler(cancel) defer cancel() - if opts, err = config.New(os.Stdout); err != nil { printVersion() fmt.Println(err) From 79939f999f64de05adda4510958eabbf41ae2bed Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sat, 29 Jun 2024 10:21:11 +0530 Subject: [PATCH 03/16] pgwatch id added to custom tags --- src/reaper/reaper.go | 1 + src/sinks/rpc.go | 15 ++++++++++----- src/sinks/types.go | 12 ------------ 3 files changed, 11 insertions(+), 17 deletions(-) delete mode 100644 src/sinks/types.go diff --git a/src/reaper/reaper.go b/src/reaper/reaper.go index 2f85ecba7d..392ae95551 100644 --- a/src/reaper/reaper.go +++ b/src/reaper/reaper.go @@ -108,6 +108,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { firstLoop = false // only used for failing when 1st config reading fails for _, monitoredDB := range monitoredDbs { + logger.Info("DB->", monitoredDB) logger.WithField("source", monitoredDB.DBUniqueName). WithField("metric", monitoredDB.Metrics). WithField("tags", monitoredDB.CustomTags). diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index a0bfcf4814..9bf45eea49 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -10,6 +10,8 @@ package sinks import ( "context" "net/rpc" + "os" + "strconv" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -45,12 +47,15 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { return nil } for _, msg := range msgs { - writeRequest := WriteRequest{ - PgwatchID: 2, - Msg: msg, - } var status int - err := rw.client.Call("Receiver.UpdateMeasurements", &writeRequest, &status) + pgwatch_id := os.Getenv("pgwatch_id") + msg.CustomTags = make(map[string]string) + if len(pgwatch_id) > 0{ + msg.CustomTags["pgwatch_id"] = pgwatch_id + }else{ + msg.CustomTags["pgwatch_id"] = strconv.Itoa(os.Getpid()) + "_pgwatch3"// Replaces with server PID + } + err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) if err != nil { return err } diff --git a/src/sinks/types.go b/src/sinks/types.go deleted file mode 100644 index f746794cce..0000000000 --- a/src/sinks/types.go +++ /dev/null @@ -1,12 +0,0 @@ -package sinks - -import ( - "github.com/cybertec-postgresql/pgwatch3/metrics" -) - -type MeasurementMessage metrics.MeasurementMessage - -type WriteRequest struct { - PgwatchID int - Msg metrics.MeasurementMessage -} From 6fcee7ce3f157fdbbe9dfa14eec1a6381bca94f3 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sat, 29 Jun 2024 13:05:41 +0530 Subject: [PATCH 04/16] pgwatch id added to custom tags --- src/sinks/rpc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 9bf45eea49..2406d78299 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -48,12 +48,12 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { } for _, msg := range msgs { var status int - pgwatch_id := os.Getenv("pgwatch_id") + pgwatchId := os.Getenv("pgwatch_id") msg.CustomTags = make(map[string]string) - if len(pgwatch_id) > 0{ - msg.CustomTags["pgwatch_id"] = pgwatch_id + if len(pgwatchId) > 0{ + msg.CustomTags["pgwatchId"] = pgwatchId }else{ - msg.CustomTags["pgwatch_id"] = strconv.Itoa(os.Getpid()) + "_pgwatch3"// Replaces with server PID + msg.CustomTags["pgwatchId"] = strconv.Itoa(os.Getpid()) + "_pgwatch3"// Replaces with server PID } err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) if err != nil { From da03035211e2a8f919aa1817000926554fd02c87 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sun, 30 Jun 2024 14:56:21 +0530 Subject: [PATCH 05/16] Added Tests for RPC Sink Writer --- src/sinks/rpc.go | 16 +++++----- src/sinks/rpc_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 src/sinks/rpc_test.go diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 2406d78299..e242abf2e9 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -11,7 +11,7 @@ import ( "context" "net/rpc" "os" - "strconv" + "strconv" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -48,13 +48,13 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { } for _, msg := range msgs { var status int - pgwatchId := os.Getenv("pgwatch_id") - msg.CustomTags = make(map[string]string) - if len(pgwatchId) > 0{ - msg.CustomTags["pgwatchId"] = pgwatchId - }else{ - msg.CustomTags["pgwatchId"] = strconv.Itoa(os.Getpid()) + "_pgwatch3"// Replaces with server PID - } + pgwatchID := os.Getenv("pgwatch_id") + msg.CustomTags = make(map[string]string) + if len(pgwatchID) > 0 { + msg.CustomTags["pgwatchId"] = pgwatchID + } else { + msg.CustomTags["pgwatchId"] = strconv.Itoa(os.Getpid()) + "_pgwatch3" // Replaces with PID to create a pgwatchid + } err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) if err != nil { return err diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go new file mode 100644 index 0000000000..0867ece45f --- /dev/null +++ b/src/sinks/rpc_test.go @@ -0,0 +1,68 @@ +package sinks + +import ( + "context" + "fmt" + "net" + "net/http" + "net/rpc" + "testing" + + "github.com/cybertec-postgresql/pgwatch3/metrics" +) + +type Receiver struct { +} + +var ctxt = context.Background() + +func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, status *int) error { + *status = 1 + return nil +} + +func setupServer(port int) error { + recv := new(Receiver) + rpc.Register(recv) + rpc.HandleHTTP() + + listener, err := net.Listen("tcp", "0.0.0.0:"+fmt.Sprint(port)) + if err != nil { + return err + } + go http.Serve(listener, nil) + return nil +} + +func TestNewRPCWriter(t *testing.T) { + port := 5050 + setupServer(port) + _, err := NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) + if err != nil { + t.Log("Unable to create new RPC client, Error: ", err) + t.Failed() + } +} + +func TestRPCWrite(t *testing.T) { + port := 5050 + setupServer(port) + rw, err := NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) + if err != nil { + t.Error("Unable to create new RPC client, Error: ", err) + } + + msgs := []metrics.MeasurementMessage{ + metrics.MeasurementMessage{ + DBName: "Db", + }, + metrics.MeasurementMessage{ + DBName: "Db2", + }, + } + + err = rw.Write(msgs) + if err != nil { + t.Error("Unable to Write Messages to sink, Error: ", err) + } +} From a349bd372508e7f3a1f4b0258394cfd9d2ac42a8 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 1 Jul 2024 15:29:00 +0200 Subject: [PATCH 06/16] Discard changes to src/main.go --- src/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.go b/src/main.go index 651a1f1193..142ab488df 100644 --- a/src/main.go +++ b/src/main.go @@ -127,6 +127,7 @@ func main() { mainContext, cancel = context.WithCancel(context.Background()) SetupCloseHandler(cancel) defer cancel() + if opts, err = config.New(os.Stdout); err != nil { printVersion() fmt.Println(err) From 91200b824ca532a794cbac51301d771aa32163a8 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 1 Jul 2024 15:29:10 +0200 Subject: [PATCH 07/16] Discard changes to src/reaper/reaper.go --- src/reaper/reaper.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/reaper/reaper.go b/src/reaper/reaper.go index 79d02295a8..b7b6d5893c 100644 --- a/src/reaper/reaper.go +++ b/src/reaper/reaper.go @@ -108,7 +108,6 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { firstLoop = false // only used for failing when 1st config reading fails for _, monitoredDB := range monitoredDbs { - logger.Info("DB->", monitoredDB) logger.WithField("source", monitoredDB.DBUniqueName). WithField("metric", monitoredDB.Metrics). WithField("tags", monitoredDB.CustomTags). From 1ebfb339f8e3d6807df74ca497624bd2400b3a2d Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 1 Jul 2024 16:27:52 +0200 Subject: [PATCH 08/16] fix rpc tests --- src/sinks/rpc_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index 0867ece45f..b0be3d9346 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -21,22 +21,20 @@ func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, st return nil } -func setupServer(port int) error { +func init() { recv := new(Receiver) rpc.Register(recv) rpc.HandleHTTP() - listener, err := net.Listen("tcp", "0.0.0.0:"+fmt.Sprint(port)) + listener, err := net.Listen("tcp", "0.0.0.0:5050") if err != nil { - return err + panic(err) } go http.Serve(listener, nil) - return nil } func TestNewRPCWriter(t *testing.T) { port := 5050 - setupServer(port) _, err := NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) if err != nil { t.Log("Unable to create new RPC client, Error: ", err) @@ -46,7 +44,6 @@ func TestNewRPCWriter(t *testing.T) { func TestRPCWrite(t *testing.T) { port := 5050 - setupServer(port) rw, err := NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) if err != nil { t.Error("Unable to create new RPC client, Error: ", err) From 9b960b973b5019034d5a6b26a236aad5333a4628 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 1 Jul 2024 16:56:39 +0200 Subject: [PATCH 09/16] remove `pgwatchId`, send `msgs` in RPC call, update tests --- src/sinks/rpc.go | 19 +++----------- src/sinks/rpc_test.go | 58 +++++++++++++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index e242abf2e9..1230d3d8f3 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -10,8 +10,6 @@ package sinks import ( "context" "net/rpc" - "os" - "strconv" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -46,19 +44,10 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { if len(msgs) == 0 { return nil } - for _, msg := range msgs { - var status int - pgwatchID := os.Getenv("pgwatch_id") - msg.CustomTags = make(map[string]string) - if len(pgwatchID) > 0 { - msg.CustomTags["pgwatchId"] = pgwatchID - } else { - msg.CustomTags["pgwatchId"] = strconv.Itoa(os.Getpid()) + "_pgwatch3" // Replaces with PID to create a pgwatchid - } - err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) - if err != nil { - return err - } + var status int + err := rw.client.Call("Receiver.UpdateMeasurements", msgs, &status) + if err != nil { + return err } return nil } diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index b0be3d9346..dff3f00ade 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -2,13 +2,14 @@ package sinks import ( "context" - "fmt" + "errors" "net" "net/http" "net/rpc" "testing" "github.com/cybertec-postgresql/pgwatch3/metrics" + "github.com/stretchr/testify/assert" ) type Receiver struct { @@ -16,7 +17,16 @@ type Receiver struct { var ctxt = context.Background() -func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, status *int) error { +func (receiver *Receiver) UpdateMeasurements(msgs []metrics.MeasurementMessage, status *int) error { + if msgs == nil { + return errors.New("msgs is nil") + } + if len(msgs) == 0 { + return nil + } + if msgs[0].DBName != "Db" { + return errors.New("invalid message") + } *status = 1 return nil } @@ -34,32 +44,42 @@ func init() { } func TestNewRPCWriter(t *testing.T) { - port := 5050 - _, err := NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) - if err != nil { - t.Log("Unable to create new RPC client, Error: ", err) - t.Failed() - } + a := assert.New(t) + _, err := NewRPCWriter(ctxt, "foo") + a.Error(err) } func TestRPCWrite(t *testing.T) { - port := 5050 - rw, err := NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) - if err != nil { - t.Error("Unable to create new RPC client, Error: ", err) - } + a := assert.New(t) + rw, err := NewRPCWriter(ctxt, "0.0.0.0:5050") + a.NoError(err) + // no error for valid messages msgs := []metrics.MeasurementMessage{ - metrics.MeasurementMessage{ + { DBName: "Db", }, - metrics.MeasurementMessage{ - DBName: "Db2", + } + err = rw.Write(msgs) + a.NoError(err) + + // error for invalid messages + msgs = []metrics.MeasurementMessage{ + { + DBName: "invalid", }, } + err = rw.Write(msgs) + a.Error(err) + // no error for empty messages + err = rw.Write([]metrics.MeasurementMessage{}) + a.NoError(err) + + // error for cancelled context + ctx, cancel := context.WithCancel(ctxt) + rw.ctx = ctx + cancel() err = rw.Write(msgs) - if err != nil { - t.Error("Unable to Write Messages to sink, Error: ", err) - } + a.Error(err) } From ca2df1cbdbc1467b8eaaa6bf644d83ca95a1b9ca Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 1 Jul 2024 17:02:25 +0200 Subject: [PATCH 10/16] rename `Receiver` to `RPCWriter` --- src/sinks/rpc.go | 2 +- src/sinks/rpc_test.go | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 1230d3d8f3..91b61fc529 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -45,7 +45,7 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { return nil } var status int - err := rw.client.Call("Receiver.UpdateMeasurements", msgs, &status) + err := rw.client.Call("RPCWriter.UpdateMeasurements", msgs, &status) if err != nil { return err } diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index dff3f00ade..a3b90aa5e0 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -1,4 +1,4 @@ -package sinks +package sinks_test import ( "context" @@ -9,15 +9,16 @@ import ( "testing" "github.com/cybertec-postgresql/pgwatch3/metrics" + "github.com/cybertec-postgresql/pgwatch3/sinks" "github.com/stretchr/testify/assert" ) -type Receiver struct { +type RPCWriter struct { } var ctxt = context.Background() -func (receiver *Receiver) UpdateMeasurements(msgs []metrics.MeasurementMessage, status *int) error { +func (receiver *RPCWriter) UpdateMeasurements(msgs []metrics.MeasurementMessage, status *int) error { if msgs == nil { return errors.New("msgs is nil") } @@ -32,7 +33,7 @@ func (receiver *Receiver) UpdateMeasurements(msgs []metrics.MeasurementMessage, } func init() { - recv := new(Receiver) + recv := new(RPCWriter) rpc.Register(recv) rpc.HandleHTTP() @@ -45,13 +46,13 @@ func init() { func TestNewRPCWriter(t *testing.T) { a := assert.New(t) - _, err := NewRPCWriter(ctxt, "foo") + _, err := sinks.NewRPCWriter(ctxt, "foo") a.Error(err) } func TestRPCWrite(t *testing.T) { a := assert.New(t) - rw, err := NewRPCWriter(ctxt, "0.0.0.0:5050") + rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:5050") a.NoError(err) // no error for valid messages @@ -78,7 +79,8 @@ func TestRPCWrite(t *testing.T) { // error for cancelled context ctx, cancel := context.WithCancel(ctxt) - rw.ctx = ctx + rw, err = sinks.NewRPCWriter(ctx, "0.0.0.0:5050") + a.NoError(err) cancel() err = rw.Write(msgs) a.Error(err) From 274d55b9494c967d52d8c1a03633e12ace6daebf Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 1 Jul 2024 17:15:42 +0200 Subject: [PATCH 11/16] make linter happy --- src/sinks/rpc_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index a3b90aa5e0..1d1e8e2e51 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -34,14 +34,17 @@ func (receiver *RPCWriter) UpdateMeasurements(msgs []metrics.MeasurementMessage, func init() { recv := new(RPCWriter) - rpc.Register(recv) + if err := rpc.Register(recv); err != nil { + panic(err) + } rpc.HandleHTTP() - - listener, err := net.Listen("tcp", "0.0.0.0:5050") - if err != nil { + if listener, err := net.Listen("tcp", "0.0.0.0:5050"); err == nil { + go func() { + _ = http.Serve(listener, nil) + }() + } else { panic(err) } - go http.Serve(listener, nil) } func TestNewRPCWriter(t *testing.T) { From 3f68f53f072574ce365d4d5ce9985c3a9d7ac702 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sun, 7 Jul 2024 15:43:54 +0530 Subject: [PATCH 12/16] Sync Metric Added; Also added tests for this --- src/sinks/rpc.go | 57 +++++++++++++++++++++++++++++++++++++------ src/sinks/rpc_test.go | 22 ++++++++++++++++- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 91b61fc529..839d442d20 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -9,7 +9,10 @@ package sinks import ( "context" + "log" "net/rpc" + "os" + "strconv" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -44,18 +47,58 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { if len(msgs) == 0 { return nil } - var status int - err := rw.client.Call("RPCWriter.UpdateMeasurements", msgs, &status) +// <<<<<<< HEAD +// var status int +// err := rw.client.Call("RPCWriter.UpdateMeasurements", msgs, &status) +// if err != nil { +// return err +// Can't use this, messes up with the RPC Function signature +// ======= + for _, msg := range msgs { + var status int + pgwatchID := os.Getenv("pgwatchID") + msg.CustomTags = make(map[string]string) + if len(pgwatchID) > 0 { + msg.CustomTags["pgwatchId"] = pgwatchID + } else { + msg.CustomTags["pgwatchId"] = strconv.Itoa(os.Getpid()) + "_pgwatch3" // Replaces with PID to create a pgwatchid + } + err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) + if err != nil { + return err + } + } +// >>>>>>> bc59692 (Sync Metric Added; Also added tests for this) +// } + return nil +} + +type SyncReq struct { + OPR string + DBName string + PgwatchID string + MetricName string +} + +func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error { + syncReq := new(SyncReq) + + syncReq.DBName = dbUnique + syncReq.OPR = op + syncReq.PgwatchID = os.Getenv("pgwatchID") + syncReq.MetricName = metricName + + var logMsg string + err := rw.client.Call("Receiver.SyncMetricSignal", &syncReq, &logMsg) + if err != nil { return err } - return nil -} -func (rw *RPCWriter) SyncMetric(_, _, _ string) error { - if rw.ctx.Err() != nil { - return rw.ctx.Err() + if len(logMsg) > 0 { + log.Println("[Remote SINK INFO]: ", logMsg) } + return nil } diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index 1d1e8e2e51..7ac8d85992 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/rpc" "testing" + "fmt" "github.com/cybertec-postgresql/pgwatch3/metrics" "github.com/cybertec-postgresql/pgwatch3/sinks" @@ -44,9 +45,15 @@ func init() { }() } else { panic(err) - } + } } +func (receiver *RPCWriter) SyncMetricSignal(syncReq *sinks.SyncReq, logMsg *string) error { + *logMsg = "Received>> DBName: " + syncReq.DBName + " OPR: " + syncReq.OPR + " ON: " + syncReq.PgwatchID + return nil +} + +// Test begin from here --------------------------------------------------------- func TestNewRPCWriter(t *testing.T) { a := assert.New(t) _, err := sinks.NewRPCWriter(ctxt, "foo") @@ -88,3 +95,16 @@ func TestRPCWrite(t *testing.T) { err = rw.Write(msgs) a.Error(err) } + +func TestRPCSyncMetric(t *testing.T) { + port := 5050 + rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) + if err != nil { + t.Error("Unable to send sync metric signal") + } + + err = rw.SyncMetric("Test-DB", "DB-Metric", "Add") + if err != nil { + t.Error("Test Failed: ", err) + } +} From 791e4942a835b6131b506f6f40c0dbdab9709d09 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sun, 7 Jul 2024 16:15:15 +0530 Subject: [PATCH 13/16] Added test for RPC Sync Metric Function. Also Updated the test function definitions to match the definitions of RPC Sink Receiver Definitions, will change them back once I figure out how to send the entire array over RPC --- src/sinks/rpc.go | 17 +++++------------ src/sinks/rpc_test.go | 24 ++++++++++-------------- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 839d442d20..8bd63d4836 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -11,8 +11,8 @@ import ( "context" "log" "net/rpc" - "os" - "strconv" + "os" + "strconv" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -47,13 +47,6 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { if len(msgs) == 0 { return nil } -// <<<<<<< HEAD -// var status int -// err := rw.client.Call("RPCWriter.UpdateMeasurements", msgs, &status) -// if err != nil { -// return err -// Can't use this, messes up with the RPC Function signature -// ======= for _, msg := range msgs { var status int pgwatchID := os.Getenv("pgwatchID") @@ -67,9 +60,9 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { if err != nil { return err } - } -// >>>>>>> bc59692 (Sync Metric Added; Also added tests for this) -// } + } + // >>>>>>> bc59692 (Sync Metric Added; Also added tests for this) + // } return nil } diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index 7ac8d85992..dd318688c5 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -3,30 +3,27 @@ package sinks_test import ( "context" "errors" + "fmt" "net" "net/http" "net/rpc" "testing" - "fmt" "github.com/cybertec-postgresql/pgwatch3/metrics" "github.com/cybertec-postgresql/pgwatch3/sinks" "github.com/stretchr/testify/assert" ) -type RPCWriter struct { +type Receiver struct { } var ctxt = context.Background() -func (receiver *RPCWriter) UpdateMeasurements(msgs []metrics.MeasurementMessage, status *int) error { - if msgs == nil { +func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, status *int) error { + if msg == nil { return errors.New("msgs is nil") } - if len(msgs) == 0 { - return nil - } - if msgs[0].DBName != "Db" { + if msg.DBName != "Db" { return errors.New("invalid message") } *status = 1 @@ -34,7 +31,7 @@ func (receiver *RPCWriter) UpdateMeasurements(msgs []metrics.MeasurementMessage, } func init() { - recv := new(RPCWriter) + recv := new(Receiver) if err := rpc.Register(recv); err != nil { panic(err) } @@ -45,10 +42,10 @@ func init() { }() } else { panic(err) - } + } } -func (receiver *RPCWriter) SyncMetricSignal(syncReq *sinks.SyncReq, logMsg *string) error { +func (receiver *Receiver) SyncMetricSignal(syncReq *sinks.SyncReq, logMsg *string) error { *logMsg = "Received>> DBName: " + syncReq.DBName + " OPR: " + syncReq.OPR + " ON: " + syncReq.PgwatchID return nil } @@ -98,13 +95,12 @@ func TestRPCWrite(t *testing.T) { func TestRPCSyncMetric(t *testing.T) { port := 5050 + a := assert.New(t) rw, err := sinks.NewRPCWriter(ctxt, "0.0.0.0:"+fmt.Sprint(port)) if err != nil { t.Error("Unable to send sync metric signal") } err = rw.SyncMetric("Test-DB", "DB-Metric", "Add") - if err != nil { - t.Error("Test Failed: ", err) - } + a.NoError(err) } From 2a5cb270bf890afe5c611cb95f22dab441e48228 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sun, 14 Jul 2024 14:13:48 +0530 Subject: [PATCH 14/16] Sync Metric Signals improved --- src/sinks/rpc.go | 3 +-- src/sinks/rpc_test.go | 10 +++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 8bd63d4836..cc07949716 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -60,9 +60,8 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { if err != nil { return err } + rw.SyncMetric("demo", "abc", "ADD") } - // >>>>>>> bc59692 (Sync Metric Added; Also added tests for this) - // } return nil } diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index dd318688c5..9916a67815 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -30,6 +30,11 @@ func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, st return nil } +func (receiver *Receiver) SyncMetricSignal(syncReq *sinks.SyncReq, logMsg *string) error { + *logMsg = "Received>> DBName: " + syncReq.DBName + " OPR: " + syncReq.OPR + " ON: " + syncReq.PgwatchID + return nil +} + func init() { recv := new(Receiver) if err := rpc.Register(recv); err != nil { @@ -45,11 +50,6 @@ func init() { } } -func (receiver *Receiver) SyncMetricSignal(syncReq *sinks.SyncReq, logMsg *string) error { - *logMsg = "Received>> DBName: " + syncReq.DBName + " OPR: " + syncReq.OPR + " ON: " + syncReq.PgwatchID - return nil -} - // Test begin from here --------------------------------------------------------- func TestNewRPCWriter(t *testing.T) { a := assert.New(t) From 41540ab24c475b36dbd0d9a8ca5c2d4dbfd521f8 Mon Sep 17 00:00:00 2001 From: Akshat Jaimini Date: Sun, 14 Jul 2024 17:01:07 +0530 Subject: [PATCH 15/16] Pgwatch3 id removed, test statements removed --- src/sinks/rpc.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index cc07949716..8fd68679f9 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -12,7 +12,6 @@ import ( "log" "net/rpc" "os" - "strconv" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -49,18 +48,10 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { } for _, msg := range msgs { var status int - pgwatchID := os.Getenv("pgwatchID") - msg.CustomTags = make(map[string]string) - if len(pgwatchID) > 0 { - msg.CustomTags["pgwatchId"] = pgwatchID - } else { - msg.CustomTags["pgwatchId"] = strconv.Itoa(os.Getpid()) + "_pgwatch3" // Replaces with PID to create a pgwatchid - } err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) if err != nil { return err } - rw.SyncMetric("demo", "abc", "ADD") } return nil } From 3b050f9cdfb1e244fe57a9773ade229393f4ff7f Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Sun, 14 Jul 2024 15:28:11 +0200 Subject: [PATCH 16/16] prettify the code: [-] remove the header comment [*] use "pgwatch3/log" instead of "log" [*] unify the second `rpc.Call` argument for all methods [*] fix naming [+] enhance tests --- src/sinks/rpc.go | 48 +++++++++++++++++-------------------------- src/sinks/rpc_test.go | 27 ++++++++++++++++++++---- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/src/sinks/rpc.go b/src/sinks/rpc.go index 8fd68679f9..ed057cfbfa 100644 --- a/src/sinks/rpc.go +++ b/src/sinks/rpc.go @@ -1,18 +1,10 @@ -/* -* -* RPC sink implementation for pgwatch3. -* Requires the address and port of the sink. -* - */ - package sinks import ( "context" - "log" "net/rpc" - "os" + "github.com/cybertec-postgresql/pgwatch3/log" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -23,7 +15,6 @@ type RPCWriter struct { } func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error) { - client, err := rpc.DialHTTP("tcp", address) if err != nil { return nil, err @@ -46,42 +37,41 @@ func (rw *RPCWriter) Write(msgs []metrics.MeasurementMessage) error { if len(msgs) == 0 { return nil } + l := log.GetLogger(rw.ctx). + WithField("sink", "rpc"). + WithField("address", rw.address) for _, msg := range msgs { - var status int - err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &status) - if err != nil { + var logMsg string + if err := rw.client.Call("Receiver.UpdateMeasurements", &msg, &logMsg); err != nil { return err } + if len(logMsg) > 0 { + l.Info(logMsg) + } } return nil } type SyncReq struct { - OPR string - DBName string - PgwatchID string + DbName string MetricName string + Operation string } func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error { - syncReq := new(SyncReq) - - syncReq.DBName = dbUnique - syncReq.OPR = op - syncReq.PgwatchID = os.Getenv("pgwatchID") - syncReq.MetricName = metricName - var logMsg string - err := rw.client.Call("Receiver.SyncMetricSignal", &syncReq, &logMsg) - - if err != nil { + if err := rw.client.Call("Receiver.SyncMetric", &SyncReq{ + Operation: op, + DbName: dbUnique, + MetricName: metricName, + }, &logMsg); err != nil { return err } - if len(logMsg) > 0 { - log.Println("[Remote SINK INFO]: ", logMsg) + log.GetLogger(rw.ctx). + WithField("sink", "rpc"). + WithField("address", rw.address).Info(logMsg) } - return nil } diff --git a/src/sinks/rpc_test.go b/src/sinks/rpc_test.go index 9916a67815..b6c2f89f80 100644 --- a/src/sinks/rpc_test.go +++ b/src/sinks/rpc_test.go @@ -19,19 +19,25 @@ type Receiver struct { var ctxt = context.Background() -func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, status *int) error { +func (receiver *Receiver) UpdateMeasurements(msg *metrics.MeasurementMessage, logMsg *string) error { if msg == nil { return errors.New("msgs is nil") } if msg.DBName != "Db" { return errors.New("invalid message") } - *status = 1 + *logMsg = fmt.Sprintf("Received: %+v", *msg) return nil } -func (receiver *Receiver) SyncMetricSignal(syncReq *sinks.SyncReq, logMsg *string) error { - *logMsg = "Received>> DBName: " + syncReq.DBName + " OPR: " + syncReq.OPR + " ON: " + syncReq.PgwatchID +func (receiver *Receiver) SyncMetric(syncReq *sinks.SyncReq, logMsg *string) error { + if syncReq == nil { + return errors.New("msgs is nil") + } + if syncReq.Operation == "invalid" { + return errors.New("invalid message") + } + *logMsg = fmt.Sprintf("Received: %+v", *syncReq) return nil } @@ -101,6 +107,19 @@ func TestRPCSyncMetric(t *testing.T) { t.Error("Unable to send sync metric signal") } + // no error for valid messages err = rw.SyncMetric("Test-DB", "DB-Metric", "Add") a.NoError(err) + + // error for invalid messages + err = rw.SyncMetric("", "", "invalid") + a.Error(err) + + // error for cancelled context + ctx, cancel := context.WithCancel(ctxt) + rw, err = sinks.NewRPCWriter(ctx, "0.0.0.0:5050") + a.NoError(err) + cancel() + err = rw.SyncMetric("Test-DB", "DB-Metric", "Add") + a.Error(err) }