Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Jan 9, 2024
1 parent 77ffaaa commit de51063
Show file tree
Hide file tree
Showing 45 changed files with 855 additions and 584 deletions.
13 changes: 6 additions & 7 deletions core/monitor/MetricExportor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "LogFileProfiler.h"
#include "MetricConstants.h"
#include "go_pipeline/LogtailPlugin.h"
#include "app_config/AppConfig.h"

using namespace sls_logs;
using namespace std;
Expand All @@ -29,8 +30,8 @@ void MetricExportor::PushGoPluginMetrics() {
break;
}
}
if (configName != "") {
auto search = tmpConfigNameToRegion.find("configName");
if (!configName.empty()) {
auto search = tmpConfigNameToRegion.find(configName);
if (search != tmpConfigNameToRegion.end()) {
region = search->second;
} else {
Expand All @@ -49,7 +50,6 @@ void MetricExportor::PushGoPluginMetrics() {
logPtr = logGroup->add_logs();
goLogGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(region, logGroup));
}

auto now = GetCurrentLogtailTime();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
Expand All @@ -59,8 +59,6 @@ void MetricExportor::PushGoPluginMetrics() {
contentPtr->set_value(pair.second);
}
}
LOG_INFO(sLogger, ("goLogGroupMap", goLogGroupMap.size()));

SendMetrics(goLogGroupMap);
}

Expand All @@ -86,8 +84,9 @@ void MetricExportor::PushMetrics(bool forceSend) {
return;
}

PushGoPluginMetrics();

if (LogtailPlugin::GetInstance()->IsPluginOpened()) {
PushGoPluginMetrics();
}

ReadMetrics::GetInstance()->UpdateMetrics();
std::map<std::string, sls_logs::LogGroup*> logGroupMap;
Expand Down
186 changes: 186 additions & 0 deletions pkg/helper/collector_imp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
)

// Observable pipeline data collector, which stores data based on channal and can be subscribed by multiple consumers
type observePipeCollector struct {
groupChan chan *models.PipelineGroupEvents
metricRecord *pipeline.MetricsRecord
procInRecordsTotal pipeline.CounterMetric
procOutRecordsTotal pipeline.CounterMetric
procTimeMS pipeline.CounterMetric
}


func (p *observePipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
if len(events) == 0 {
return
}
p.procInRecordsTotal.Add(int64(len(events)))
p.groupChan <- &models.PipelineGroupEvents{
Group: group,
Events: events,
}
p.procOutRecordsTotal.Add(int64(len(events)))
}

func (p *observePipeCollector) CollectList(groups ...*models.PipelineGroupEvents) {
if len(groups) == 0 {
return
}
for _, g := range groups {
p.procInRecordsTotal.Add(int64(len(g.Events)))
p.groupChan <- g
p.procOutRecordsTotal.Add(int64(len(g.Events)))
}
}

func (p *observePipeCollector) ToArray() []*models.PipelineGroupEvents {
totalCount := len(p.groupChan)
results := make([]*models.PipelineGroupEvents, totalCount)
for i := 0; i < totalCount; i++ {
results[i] = <-p.groupChan
}
return results
}

func (p *observePipeCollector) Observe() chan *models.PipelineGroupEvents {
return p.groupChan
}

func (p *observePipeCollector) Close() {
close(p.groupChan)
}

// groupedPipeCollector group the collected PipelineEvent by groupInfo.
// The limitation is that it cannot be subscribed as it always returns an empty chan.
// so it can only return all the grouped data at one time.
type groupedPipeCollector struct {
groupEvents map[*models.GroupInfo][]models.PipelineEvent
metricRecord *pipeline.MetricsRecord
procInRecordsTotal pipeline.CounterMetric
procOutRecordsTotal pipeline.CounterMetric
procTimeMS pipeline.CounterMetric
}


func (p *groupedPipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
if len(events) == 0 {
return
}
p.procInRecordsTotal.Add(int64(len(events)))
store, has := p.groupEvents[group]
if !has {
store = make([]models.PipelineEvent, 0)
}
p.groupEvents[group] = append(store, events...)
p.procOutRecordsTotal.Add(int64(len(events)))
}

func (p *groupedPipeCollector) CollectList(groups ...*models.PipelineGroupEvents) {
if len(groups) == 0 {
return
}
for _, g := range groups {
p.procInRecordsTotal.Add(int64(len(g.Events)))
p.Collect(g.Group, g.Events...)
p.procOutRecordsTotal.Add(int64(len(g.Events)))
}
}

func (p *groupedPipeCollector) ToArray() []*models.PipelineGroupEvents {
len, idx := len(p.groupEvents), 0
results := make([]*models.PipelineGroupEvents, len)
if len == 0 {
return results
}
for group, events := range p.groupEvents {
results[idx] = &models.PipelineGroupEvents{
Group: group,
Events: events,
}
idx++
}
p.groupEvents = make(map[*models.GroupInfo][]models.PipelineEvent)
return results
}

func (p *groupedPipeCollector) Observe() chan *models.PipelineGroupEvents {
return nil
}

func (p *groupedPipeCollector) Close() {
for k := range p.groupEvents {
delete(p.groupEvents, k)
}
}

// noopPipeCollector is an empty collector implementation.
type noopPipeCollector struct {
}

func (p *noopPipeCollector) Prepare(metricRecord *pipeline.MetricsRecord) {

}

func (p *noopPipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
}

func (p *noopPipeCollector) CollectList(groups ...*models.PipelineGroupEvents) {
}

func (p *noopPipeCollector) ToArray() []*models.PipelineGroupEvents {
return nil
}

func (p *noopPipeCollector) Observe() chan *models.PipelineGroupEvents {
return nil
}

func (p *noopPipeCollector) Close() {
}

type defaultPipelineContext struct {
collector pipeline.PipelineCollector
}

func (p *defaultPipelineContext) Collector() pipeline.PipelineCollector {
return p.collector
}

func NewObservePipelineConext(queueSize int) pipeline.PipelineContext {
return newPipelineConext(&observePipeCollector{
groupChan: make(chan *models.PipelineGroupEvents, queueSize),
})
}

func NewGroupedPipelineConext() pipeline.PipelineContext {
return newPipelineConext(&groupedPipeCollector{
groupEvents: make(map[*models.GroupInfo][]models.PipelineEvent),
})
}

func NewNoopPipelineConext() pipeline.PipelineContext {
return newPipelineConext(&noopPipeCollector{})
}

func newPipelineConext(collector pipeline.PipelineCollector) pipeline.PipelineContext {
return &defaultPipelineContext{collector: collector}
}
2 changes: 1 addition & 1 deletion pkg/helper/local_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (p *LocalContext) RegisterMetricRecord(labels map[string]string) *pipeline.
latencyMetric := make([]pipeline.LatencyMetric, 0)

metricRecord := pipeline.MetricsRecord{
Labels: labels,
Labels: labels,
CounterMetrics: counterMetrics,
StringMetrics: stringMetrics,
LatencyMetrics: latencyMetric,
Expand Down
42 changes: 42 additions & 0 deletions pkg/helper/self_metric_imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ func (s *StrMetric) Get() string {
return v
}

func (s *StrMetric) GetAndReset() string {
mu.Lock()
v := s.value
s.value = ""
mu.Unlock()
return v
}

type NormalMetric struct {
name string
value int64
Expand All @@ -63,6 +71,10 @@ func (s *NormalMetric) Get() int64 {
return atomic.LoadInt64(&s.value)
}

func (s *NormalMetric) GetAndReset() int64 {
return atomic.SwapInt64(&s.value, 0)
}

func (s *NormalMetric) Name() string {
return s.name
}
Expand Down Expand Up @@ -93,6 +105,23 @@ func (s *AvgMetric) Get() int64 {
return int64(s.GetAvg())
}

func (s *AvgMetric) GetAndReset() int64 {
var avg float64
mu.Lock()
if s.count > 0 {
s.prevAvg, avg = float64(s.value)/float64(s.count), float64(s.value)/float64(s.count)
s.value = 0
s.count = 0
} else {
avg = s.prevAvg
}
s.value = 0
s.count = 0
s.prevAvg = 0.0
mu.Unlock()
return int64(avg)
}

func (s *AvgMetric) GetAvg() float64 {
var avg float64
mu.Lock()
Expand Down Expand Up @@ -154,6 +183,19 @@ func (s *LatMetric) Get() int64 {
return v
}

func (s *LatMetric) GetAndReset() int64 {
mu.Lock()
v := int64(0)
if s.count != 0 {
v = int64(s.latencySum) / int64(s.count)
}
s.count = 0
s.latencySum = 0
s.t = time.Unix(0, 0)
mu.Unlock()
return v
}

func NewCounterMetric(n string) pipeline.CounterMetric {
return &NormalMetric{name: n}
}
Expand Down
Loading

0 comments on commit de51063

Please sign in to comment.