Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/traffic db name #4548

Merged
merged 2 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/pkg/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Account interface {
GetBillingCount(accountType common.Type, startTime, endTime time.Time) (count, amount int64, err error)
GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error)
InsertMonitor(ctx context.Context, monitors ...*resources.Monitor) error
GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error)
GetDistinctMonitorCombinations(startTime, endTime time.Time) ([]resources.Monitor, error)
DropMonitorCollectionsOlderThan(days int) error
Disconnect(ctx context.Context) error
Creator
Expand Down
37 changes: 23 additions & 14 deletions controllers/pkg/database/mongo/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

"github.com/labring/sealos/controllers/pkg/utils/env"

"github.com/labring/sealos/controllers/pkg/common"
"github.com/labring/sealos/controllers/pkg/database"

Expand All @@ -39,9 +41,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
EnvAccountDBName = "ACCOUNT_DB_NAME"
EnvTrafficDBName = "TRAFFIC_DB_NAME"
EnvTrafficConn = "TRAFFIC_CONN"
)

const (
DefaultAccountDBName = "sealos-resources"
DefaultTrafficDBName = "sealos-networkmanager-synchronizer"
DefaultTrafficDBName = "sealos-networkmanager"
DefaultAuthDBName = "sealos-auth"
DefaultMeteringConn = "metering"
DefaultMonitorConn = "monitor"
Expand Down Expand Up @@ -251,14 +259,13 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni
return err
}

func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) {
func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time) ([]resources.Monitor, error) {
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{
"time": bson.M{
"$gte": startTime.UTC(),
"$lt": endTime.UTC(),
},
"category": namespace,
}}},
{{Key: "$group", Value: bson.M{
"_id": bson.M{
Expand All @@ -267,21 +274,23 @@ func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, n
"type": "$type",
},
}}},
{{Key: "$project", Value: bson.M{
"_id": 0,
"category": "$_id.category",
"name": "$_id.name",
"type": "$_id.type",
}}},
}
cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate error: %v", err)
}
defer cursor.Close(context.Background())
var monitors []resources.Monitor
for cursor.Next(context.Background()) {
var result = make(map[string]resources.Monitor, 1)
if err := cursor.Decode(result); err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
monitors = append(monitors, result["_id"])
if !cursor.Next(context.Background()) {
return nil, nil
}
if err := cursor.Err(); err != nil {
var monitors []resources.Monitor
if err := cursor.All(context.Background(), &monitors); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}
return monitors, nil
Expand Down Expand Up @@ -941,15 +950,15 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err
err = client.Ping(ctx, nil)
return &mongoDB{
Client: client,
AccountDB: DefaultAccountDBName,
TrafficDB: DefaultTrafficDBName,
AccountDB: env.GetEnvWithDefault(EnvAccountDBName, DefaultAccountDBName),
TrafficDB: env.GetEnvWithDefault(EnvTrafficDBName, DefaultTrafficDBName),
AuthDB: DefaultAuthDBName,
UserConn: DefaultUserConn,
MeteringConn: DefaultMeteringConn,
MonitorConnPrefix: DefaultMonitorConn,
BillingConn: DefaultBillingConn,
PricesConn: DefaultPricesConn,
PropertiesConn: DefaultPropertiesConn,
TrafficConn: DefaultTrafficConn,
TrafficConn: env.GetEnvWithDefault(EnvTrafficConn, DefaultTrafficConn),
}, err
}
20 changes: 20 additions & 0 deletions controllers/pkg/database/mongo/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,23 @@ func TestMongoDB_SetPropertyTypeLS(t *testing.T) {
// t.Fatalf("failed to save property types: %v", err)
//}
}

func Test_mongoDB_GetDistinctMonitorCombinations(t *testing.T) {
dbCTX := context.Background()

m, err := NewMongoInterface(dbCTX, os.Getenv("MONGODB_URI"))
if err != nil {
t.Errorf("failed to connect mongo: error = %v", err)
}
defer func() {
if err = m.Disconnect(dbCTX); err != nil {
t.Errorf("failed to disconnect mongo: error = %v", err)
}
}()
queryTime := time.Now().UTC()
monitorCombinations, err := m.GetDistinctMonitorCombinations(queryTime.Add(-time.Hour), queryTime)
if err != nil {
t.Fatalf("failed to get distinct monitor combinations: %v", err)
}
t.Logf("monitorCombinations: %v", monitorCombinations)
}
75 changes: 42 additions & 33 deletions controllers/resources/controllers/monitor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/labring/sealos/controllers/pkg/utils/env"

"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -400,47 +402,54 @@ func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string]
}

func (r *MonitorReconciler) MonitorPodTrafficUsed(startTime, endTime time.Time) error {
namespaceList, err := r.getNamespaceList()
if err != nil {
return fmt.Errorf("failed to list namespaces")
}
logger.Info("start getPodTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339))
for _, namespace := range namespaceList.Items {
if err := r.monitorPodTrafficUsed(namespace, startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used", "namespace", namespace.Name)
}
execTime := time.Now().UTC()
if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
}
r.Logger.Info("success to monitor pod traffic used", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339), "execTime", time.Since(execTime).String())
return nil
}

func (r *MonitorReconciler) monitorPodTrafficUsed(namespace corev1.Namespace, startTime, endTime time.Time) error {
monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime, namespace.Name)
func (r *MonitorReconciler) monitorPodTrafficUsed(startTime, endTime time.Time) error {
monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime)
if err != nil {
return fmt.Errorf("failed to get distinct monitor combinations: %w", err)
}
for _, monitor := range monitors {
bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, namespace.Name, monitor.Type, monitor.Name)
if err != nil {
return fmt.Errorf("failed to get traffic sent bytes: %w", err)
}
unit := r.Properties.StringMap[resources.ResourceNetwork].Unit
used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue())))
if used == 0 {
continue
}
logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes)
ro := resources.Monitor{
Category: namespace.Name,
Name: monitor.Name,
Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used},
Time: endTime.Add(-1 * time.Minute),
Type: monitor.Type,
}
r.Logger.Info("monitor traffic used", "monitor", ro)
err = r.DBClient.InsertMonitor(context.Background(), &ro)
if err != nil {
return fmt.Errorf("failed to insert monitor: %w", err)
}
r.Logger.Info("distinct monitor combinations", "monitors len", len(monitors))
wg, _ := errgroup.WithContext(context.Background())
wg.SetLimit(100)
for i := range monitors {
monitor := monitors[i]
wg.Go(func() error {
return r.handlerTrafficUsed(startTime, endTime, monitor)
})
}
return wg.Wait()
}

func (r *MonitorReconciler) handlerTrafficUsed(startTime, endTime time.Time, monitor resources.Monitor) error {
bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, monitor.Category, monitor.Type, monitor.Name)
if err != nil {
return fmt.Errorf("failed to get traffic sent bytes: %w", err)
}
unit := r.Properties.StringMap[resources.ResourceNetwork].Unit
used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue())))
if used == 0 {
return nil
}
//logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes)
ro := resources.Monitor{
Category: monitor.Category,
Name: monitor.Name,
Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used},
Time: endTime.Add(-1 * time.Minute),
Type: monitor.Type,
}
r.Logger.Info("monitor traffic used", "monitor", ro)
err = r.DBClient.InsertMonitor(context.Background(), &ro)
if err != nil {
return fmt.Errorf("failed to insert monitor: %w", err)
}
return nil
}
Expand Down
Loading