Skip to content

Commit

Permalink
Support get metric for general pql that doesn't depend pod/node resource
Browse files Browse the repository at this point in the history
  • Loading branch information
nkwangleiGIT committed Oct 21, 2022
1 parent 7a91ca8 commit b5eeed9
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 17 deletions.
33 changes: 32 additions & 1 deletion observer-plugins/metric-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"
"net"
"os"
"os/signal"
"syscall"

//"github.com/smoky8/pkg/lib/go/obi"
"google.golang.org/grpc"
Expand All @@ -37,6 +40,9 @@ var (
endpoint = flag.String("endpoint", "/var/run/observer.sock", "unix socket domain for current server")
kubeconfig = flag.String("kubeconfig", "", "kubernetes auth config file")
)
var (
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

func main() {
klog.InitFlags(flag.CommandLine)
Expand All @@ -61,8 +67,11 @@ func main() {
if err != nil {
log.Fatalf("%s create metric client error: %s", server.PluginName, err)
}
metricServer := grpc.NewServer()

// Setup signal watcher to handle cleanup
SetupSignalHandler(*endpoint)

metricServer := grpc.NewServer()
obi.RegisterServerServer(metricServer, server.NewServer(clientSet))
listen, err := net.Listen("unix", *endpoint)
if err != nil {
Expand All @@ -72,3 +81,25 @@ func main() {

klog.Fatalln(metricServer.Serve(listen))
}

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler(socketFile string) {
c := make(chan os.Signal)
signal.Notify(c, shutdownSignals...)
go func() {
for s := range c {
switch s {
case os.Interrupt, syscall.SIGTERM:
klog.Infoln("Shutting down normally...")
if err := os.RemoveAll(socketFile); err != nil {
klog.Fatal(err)
}
os.Exit(1)
default:
klog.Infoln("Got signal", s)
}
}
}()
}
32 changes: 31 additions & 1 deletion observer-plugins/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"
"net"
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"
"k8s.io/client-go/kubernetes"
Expand All @@ -38,6 +41,9 @@ var (
stepSeconds = flag.Int64("step", 60, "query steps")
rangeMinute = flag.Int64("range", 2, "prometheus, the maximum time between two slices within the boundaries.")
)
var (
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

func main() {
klog.InitFlags(flag.CommandLine)
Expand All @@ -64,6 +70,8 @@ func main() {
if err != nil {
klog.Fatal(err)
}
// Setup signal watcher to handle cleanup
SetupSignalHandler(*endpoint)

server := grpc.NewServer()
obi.RegisterServerServer(server, prometheus.NewPrometheusServer(*address, conf, *stepSeconds, *rangeMinute))
Expand All @@ -72,6 +80,28 @@ func main() {
log.Fatal(err)
}

klog.Infof("%s starting work...", prometheus.PluginName)
klog.Infof("%s plugin started ...", prometheus.PluginName)
klog.Fatalln(server.Serve(listen))
}

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler(socketFile string) {
c := make(chan os.Signal)
signal.Notify(c, shutdownSignals...)
go func() {
for s := range c {
switch s {
case os.Interrupt, syscall.SIGTERM:
klog.Infoln("Shutting down normally...")
if err := os.RemoveAll(socketFile); err != nil {
klog.Fatal(err)
}
os.Exit(1)
default:
klog.Infoln("Got signal", s)
}
}
}()
}
30 changes: 21 additions & 9 deletions observer-plugins/prometheus/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package prometheus

import (
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -65,7 +66,7 @@ type CalculateAux struct {
Value float64
}

func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string) (DataSeries, error) {
func (p *prometheusServer) Query(startTime, endTime time.Time, kind, query, op string) (DataSeries, error) {
method := "prometheusServer.Query"
ans := DataSeries{Timestamp: endTime.UnixMilli()}
prometheusAPI, err := p.NewPrometheusAPI()
Expand All @@ -86,18 +87,29 @@ func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string)
klog.V(4).Infof("%s quer '%s' result with warnings %v\n", method, warnings)
}

data, err := formatRawValues(result, op)
if err != nil {
return ans, err
}

if f, ok := actionFuncs[op]; ok {
f(data, &ans)
// TODO: Use kind as the raw data query, may add a 'rawData: true' property for this?
if kind == "Pod" || kind == "Node" {
data, err := formatRawValues(result)
if err != nil {
return ans, err
}
if f, ok := actionFuncs[op]; ok {
f(data, &ans)
}
} else {
// Handle raw data if no aggregation defined, just return the json data
jsonValue, err := json.Marshal(result)
if err != nil {
klog.Errorf("failed to marshal result to json: %s", err)
ans.Value = fmt.Sprintf("failed to get json value: %s " + result.String())
} else {
ans.Value = string(jsonValue)
}
}
return ans, nil
}

func formatRawValues(rawValue model.Value, op string) ([]CalculateAux, error) {
func formatRawValues(rawValue model.Value) ([]CalculateAux, error) {
ans := make([]CalculateAux, 0)
switch rawValue.Type() {
case model.ValScalar:
Expand Down
16 changes: 10 additions & 6 deletions observer-plugins/prometheus/prometheus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
MaxAction = "max"
MinAction = "min"
AvgAction = "avg"
NoneAction = "none"
)

// impl obi interface
Expand Down Expand Up @@ -77,24 +78,27 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe

var err error
klog.V(4).Infof("prometheus query: %s\n", req.Query)
var resourceName string
if len(req.ResourceNames) > 0 {
resourceName = req.ResourceNames[0]
}
result := &obi.GetMetricsResponse{
ResourceName: req.ResourceNames[0],
ResourceName: resourceName,
Namespace: req.Namespace,
Unit: req.Unit,
Records: []*obi.GetMetricsResponseRecord{},
}

// use avgerage as the default aggregation action
op := AvgAction
if len(req.Aggregation) > 0 {
op = req.Aggregation[0]
}
klog.Infof("exec aggregation is: %s\n", op)
metricData, err := p.Query(startTime, endTime, req.Query, op)
metricData, err := p.Query(startTime, endTime, req.Kind, req.Query, op)
if err != nil {
klog.Errorf("%s query error: %s\n", method, err)
return result, err
}

// only return the latest record
result.Records = append(result.Records, &obi.GetMetricsResponseRecord{Timestamp: metricData.Timestamp, Value: metricData.Value})
/*
Expand All @@ -103,8 +107,8 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe
}
*/

klog.Infof("query by %s successfully", req.MetricName)
klog.V(5).Infof("%s query by %s result: %v\n", method, req.MetricName, metricData)
klog.Infof("query by metric '%s', query '%s' successfully", req.MetricName, req.Query)
klog.V(5).Infof("%s query by %s, %s result: %v\n", method, req.MetricName, req.Query, metricData)

return result, nil
}

0 comments on commit b5eeed9

Please sign in to comment.