Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization of the self-monitoring metrics architecture for Go (Golang) plugin modules and unified output of metrics with C++ modules. #1290

Merged
merged 42 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0daec5e
add metric code
linrunqi08 Dec 27, 2023
9847679
add cpp call go
linrunqi08 Dec 27, 2023
30810be
update call func
linrunqi08 Dec 27, 2023
b8559c3
tmp debug
linrunqi08 Dec 27, 2023
f26e1a8
tmp save
linrunqi08 Dec 28, 2023
e436d75
tmp save
linrunqi08 Dec 28, 2023
7c38780
add flusher
linrunqi08 Dec 28, 2023
33ea6a8
add flusher
linrunqi08 Dec 28, 2023
9617fb7
fix cpp interface
linrunqi08 Dec 29, 2023
1c7da0f
refine code
linrunqi08 Dec 29, 2023
1ad21ef
refine code
linrunqi08 Dec 29, 2023
465b54c
refine code
linrunqi08 Dec 29, 2023
1ec3dd3
fix comments
linrunqi08 Jan 2, 2024
c1d90cf
fix comment
linrunqi08 Jan 2, 2024
d8c2263
fix comment
linrunqi08 Jan 2, 2024
77ffaaa
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 2, 2024
de51063
fix comments
linrunqi08 Jan 9, 2024
e19a9bd
fix build
linrunqi08 Jan 9, 2024
09136c2
add node id
linrunqi08 Jan 12, 2024
9ba425d
add child plugin id
linrunqi08 Jan 12, 2024
e104daa
fix metric
linrunqi08 Jan 15, 2024
96e8296
fix unittest
linrunqi08 Jan 15, 2024
2107d4f
fix go lint
linrunqi08 Jan 15, 2024
3b351c8
fix go lint
linrunqi08 Jan 15, 2024
75c0f4f
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 15, 2024
9bb08c6
merge 1.8
linrunqi08 Jan 15, 2024
81481bd
fix ut
linrunqi08 Jan 15, 2024
d70ad57
try fix e2e
linrunqi08 Jan 15, 2024
2ebca2f
fix comments
linrunqi08 Jan 15, 2024
b12a775
fix comments
linrunqi08 Jan 15, 2024
20480fe
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 16, 2024
d18453e
add comments
linrunqi08 Jan 22, 2024
0a2d671
fix comments
linrunqi08 Jan 22, 2024
192337b
fix comments
linrunqi08 Jan 22, 2024
3d7964b
fix comments
linrunqi08 Jan 22, 2024
2b401c1
fix comments
linrunqi08 Jan 22, 2024
c4ecf69
fix comments
linrunqi08 Jan 22, 2024
a46c523
fix lint
linrunqi08 Jan 22, 2024
d2aa7d3
fix ut
linrunqi08 Jan 22, 2024
e216f7e
fix c++ pluginID
linrunqi08 Jan 22, 2024
81e2942
fix comments
linrunqi08 Jan 23, 2024
fa9eea1
fix ut
linrunqi08 Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,27 +388,31 @@ bool LogtailPlugin::LoadPluginBase() {
return mPluginValid;
}
}

// 加载全局配置,目前应该没有调用点
mLoadGlobalConfigFun = (LoadGlobalConfigFun)loader.LoadMethod("LoadGlobalConfig", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load LoadGlobalConfig error, Message", error));
return mPluginValid;
}
// 加载单个配置,目前应该是Resume的时候,全量加载一次
mLoadConfigFun = (LoadConfigFun)loader.LoadMethod("LoadConfig", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load LoadConfig error, Message", error));
return mPluginValid;
}
// 更新配置,目前应该没有调用点
mUnloadConfigFun = (UnloadConfigFun)loader.LoadMethod("UnloadConfig", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load UnloadConfig error, Message", error));
return mPluginValid;
}
// 插件暂停
mHoldOnFun = (HoldOnFun)loader.LoadMethod("HoldOn", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load HoldOn error, Message", error));
return mPluginValid;
}
// 插件恢复
mResumeFun = (ResumeFun)loader.LoadMethod("Resume", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load Resume error, Message", error));
Expand Down Expand Up @@ -538,32 +542,18 @@ void LogtailPlugin::GetPipelineMetrics(std::vector<std::map<std::string, std::st
for (int j = 0; j < innerpm->count; ++j) {
InnerKeyValue* innerkv = innerpm->keyValues[j];
if (innerkv != nullptr) {
item.insert(std::make_pair(std::string(innerkv->key), std::string(innerkv->value)));
item.insert(std::make_pair(std::string(innerkv->key), std::string(innerkv->value)));
free(innerkv->key);
free(innerkv->value);
free(innerkv);
}
}
free(innerpm->keyValues);
free(innerpm);
}
metircsList.emplace_back(item);
}
if (metrics->count > 0) {
for (int i = 0; i < metrics->count; ++i) {
InnerPluginMetric* innerpm = metrics->metrics[i];
if (innerpm != nullptr) {
if (innerpm->count > 0) {
for (int j = 0; j < innerpm->count; ++j) {
InnerKeyValue* innerkv = innerpm->keyValues[j];
if (innerkv != nullptr) {
free(innerkv->key);
free(innerkv->value);
free(innerkv);
}
}
free(innerpm->keyValues);
}
free(innerpm);
}
}
free(metrics->metrics);
}
free(metrics->metrics);
free(metrics);
}
}
Expand Down
6 changes: 4 additions & 2 deletions core/monitor/LogtaiMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,17 @@ void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName,
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
const std::string& childPluginID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels) {
labels.emplace_back(std::make_pair("project", projectName));
labels.emplace_back(std::make_pair("logstore", logstoreName));
labels.emplace_back(std::make_pair("region", region));
labels.emplace_back(std::make_pair("config_name", configName));
labels.emplace_back(std::make_pair("plugin_name", pluginName));
labels.emplace_back(std::make_pair("plugin_id", pluginID));
labels.emplace_back(std::make_pair("child_plugin_id", childPluginID));
labels.emplace_back(std::make_pair("node_id", nodeID));
labels.emplace_back(std::make_pair("child_node_id", childNodeID));
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels) {
Expand Down
3 changes: 2 additions & 1 deletion core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class WriteMetrics {
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
const std::string& childPluginID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels);
MetricsRecord* DoSnapshot();
Expand Down
10 changes: 4 additions & 6 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ namespace logtail {

void genPluginAndNodeID(int& pluginIndex, bool lastOne, PluginInstance::PluginMeta& pluginMeta) {
pluginIndex ++;
int childPluginID = pluginIndex;
int childNodeID = pluginIndex;
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
if (!lastOne) {
childPluginID = pluginIndex + 1;
childNodeID = pluginIndex + 1;
}
pluginMeta.pluginID = std::to_string(pluginIndex);
pluginMeta.childPluginID = std::to_string(childPluginID);
pluginMeta.nodeID = std::to_string(pluginIndex);
pluginMeta.childNodeID = std::to_string(childNodeID);
}

bool Pipeline::Init(const PipelineConfig& config) {
Expand All @@ -55,9 +56,6 @@ bool Pipeline::Init(const PipelineConfig& config) {
int pluginIndex = 0;

PluginInstance::PluginMeta pluginMeta;
// Input plugin
pluginIndex++;

if (config.mLogType == STREAM_LOG || config.mLogType == PLUGIN_LOG) {
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/instance/FlusherInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace logtail {
bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context) {
mPlugin->SetContext(context);
auto meta = Meta();
mPlugin->SetMetricsRecordRef(Name(), meta.pluginID, meta.childPluginID);
mPlugin->SetMetricsRecordRef(Name(), meta.pluginID, meta.nodeID, meta.childNodeID);
if (mPlugin->Init(config)) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/instance/InputInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace logtail {
bool InputInstance::Init(const Json::Value& config, PipelineContext& context) {
mPlugin->SetContext(context);
auto meta = Meta();
mPlugin->SetMetricsRecordRef(Name(), meta.pluginID, meta.childPluginID);
mPlugin->SetMetricsRecordRef(Name(), meta.pluginID, meta.nodeID, meta.childNodeID);
if (mPlugin->Init(config)) {
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion core/plugin/instance/PluginInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class PluginInstance {
public:
struct PluginMeta {
std::string pluginID;
std::string childPluginID;
std::string nodeID;
std::string childNodeID;
};
PluginInstance(const PluginMeta& pluginMeta) : mMeta(pluginMeta) {}
virtual ~PluginInstance() = default;
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/instance/ProcessorInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace logtail {
bool ProcessorInstance::Init(const ComponentConfig& config, PipelineContext& context) {
mPlugin->SetContext(context);
auto meta = Meta();
mPlugin->SetMetricsRecordRef(Name(), meta.pluginID, meta.childPluginID);
mPlugin->SetMetricsRecordRef(Name(), meta.pluginID, meta.nodeID, meta.childNodeID);
bool inited = mPlugin->Init(config);
if (!inited) {
return inited;
Expand Down
5 changes: 3 additions & 2 deletions core/plugin/interface/Plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ class Plugin {
PipelineContext& GetContext() { return *mContext; }
void SetContext(PipelineContext& context) { mContext = &context; }
MetricsRecordRef& GetMetricsRecordRef() { return mMetricsRecordRef; }
void SetMetricsRecordRef(const std::string& name, const std::string& id, const std::string& childPluginID) {
void SetMetricsRecordRef(const std::string& name, const std::string& id, const std::string& nodeID, const std::string& childNodeID) {
std::vector<std::pair<std::string, std::string>> labels;
WriteMetrics::GetInstance()->PreparePluginCommonLabels(GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion(),
GetContext().GetConfigName(),
name,
id,
childPluginID,
nodeID,
childNodeID,
labels);

WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
Expand Down
5 changes: 3 additions & 2 deletions core/unittest/plugin/PluginRegistryUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ void PluginRegistryUnittest::TestLoadStaticPlugins() {
void PluginRegistryUnittest::TestCreateProcessor() {
PluginRegistry::GetInstance()->LoadStaticPlugins();
auto processorParseRegexNative = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorParseRegexNative::sName, {"0", "1"});
ProcessorParseRegexNative::sName, {"0", "0", "1"});
APSARA_TEST_NOT_EQUAL_FATAL(nullptr, processorParseRegexNative.get());
APSARA_TEST_EQUAL_FATAL("0", processorParseRegexNative->Meta().pluginID);
APSARA_TEST_EQUAL_FATAL("1", processorParseRegexNative->Meta().childPluginID);
APSARA_TEST_EQUAL_FATAL("0", processorParseRegexNative->Meta().nodeID);
APSARA_TEST_EQUAL_FATAL("1", processorParseRegexNative->Meta().childNodeID);
}

} // namespace logtail
Expand Down
5 changes: 3 additions & 2 deletions core/unittest/plugin/StaticProcessorCreatorUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ void StaticProcessorCreatorUnittest::TestIsDynamic() {

void StaticProcessorCreatorUnittest::TestCreate() {
StaticProcessorCreator<ProcessorMock> creator;
auto processorMock = creator.Create({"0", "1"});
auto processorMock = creator.Create({"0", "0", "1"});
APSARA_TEST_NOT_EQUAL_FATAL(nullptr, processorMock.get());
APSARA_TEST_EQUAL_FATAL("0", processorMock->Meta().pluginID);
APSARA_TEST_EQUAL_FATAL("1", processorMock->Meta().childPluginID);
APSARA_TEST_EQUAL_FATAL("0", processorMock->Meta().nodeID);
APSARA_TEST_EQUAL_FATAL("1", processorMock->Meta().childNodeID);
}

} // namespace logtail
Expand Down
16 changes: 11 additions & 5 deletions pkg/helper/local_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p *LocalContext) GetExtension(name string, cfg any) (pipeline.Extension, e
return nil, nil
}

func (p *LocalContext) RegisterMetricRecord(labels map[string]string) *pipeline.MetricsRecord {
func (p *LocalContext) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord {
contextMutex.Lock()
defer contextMutex.Unlock()

Expand All @@ -89,7 +89,7 @@ func (p *LocalContext) RegisterMetricRecord(labels map[string]string) *pipeline.
return &metricRecord
}

func (p *LocalContext) RegisterLogstoreConfigMetricRecord(labels map[string]string) *pipeline.MetricsRecord {
func (p *LocalContext) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord {
counterMetrics := make([]pipeline.CounterMetric, 0)
stringMetrics := make([]pipeline.StringMetric, 0)
latencyMetric := make([]pipeline.LatencyMetric, 0)
Expand All @@ -114,11 +114,17 @@ func (p *LocalContext) ExportMetricRecords() (results []map[string]string) {
results = make([]map[string]string, 0)
for _, metricRecord := range p.MetricsRecords {
oneResult := make(map[string]string)
for key, value := range metricRecord.Labels {
oneResult[key] = value
for _, label := range metricRecord.Labels {
oneResult["label."+label.Key] = label.Value
}
for _, counterMetric := range metricRecord.CounterMetrics {
oneResult[counterMetric.Name()] = strconv.FormatInt(counterMetric.Get(), 10)
oneResult["value."+counterMetric.Name()] = strconv.FormatInt(counterMetric.GetAndReset(), 10)
}
for _, stringMetric := range metricRecord.StringMetrics {
oneResult["value."+stringMetric.Name()] = stringMetric.GetAndReset()
}
for _, latencyMetric := range metricRecord.LatencyMetrics {
oneResult["value."+latencyMetric.Name()] = strconv.FormatInt(latencyMetric.GetAndReset(), 10)
}
results = append(results, oneResult)
}
Expand Down
38 changes: 23 additions & 15 deletions pkg/pipeline/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ type CommonContext struct {
ConfigName string
}

type LabelPair struct {
Key string
Value string
}

type MetricsRecord struct {
Labels map[string]string
Labels []LabelPair

CounterMetrics []CounterMetric
StringMetrics []StringMetric
Expand All @@ -45,21 +50,24 @@ func (m *MetricsRecord) RegisterLatencyMetric(metric LatencyMetric) {
m.LatencyMetrics = append(m.LatencyMetrics, metric)
}

func GetCommonLabels(context Context, pluginName string, pluginID string, childPluginID string) map[string]string {
labels := make(map[string]string)
labels["project"] = context.GetProject()
labels["logstore"] = context.GetLogstore()
labels["config_name"] = context.GetConfigName()
if len(pluginID) > 0 {
labels["plugin_id"] = pluginID
func GetCommonLabels(context Context, pluginMeta *PluginMeta) []LabelPair {
labels := make([]LabelPair, 0)
labels = append(labels, LabelPair{Key: "project", Value: context.GetProject()})
labels = append(labels, LabelPair{Key: "logstore", Value: context.GetLogstore()})
labels = append(labels, LabelPair{Key: "config_name", Value: context.GetConfigName()})

if len(pluginMeta.PluginID) > 0 {
labels = append(labels, LabelPair{Key: "plugin_id", Value: pluginMeta.PluginID})
}
if len(childPluginID) > 0 {
labels["child_plugin_id"] = childPluginID
if len(pluginMeta.NodeID) > 0 {
labels = append(labels, LabelPair{Key: "node_id", Value: pluginMeta.NodeID})
}
if len(pluginName) > 0 {
labels["plugin_name"] = pluginName
if len(pluginMeta.ChildNodeID) > 0 {
labels = append(labels, LabelPair{Key: "child_node_id", Value: pluginMeta.ChildNodeID})
}
if len(pluginMeta.PluginType) > 0 {
labels = append(labels, LabelPair{Key: "plugin_name", Value: pluginMeta.PluginType})
}

return labels
}

Expand All @@ -74,9 +82,9 @@ type Context interface {
GetExtension(name string, cfg any) (Extension, error)

ExportMetricRecords() []map[string]string
RegisterMetricRecord(labels map[string]string) *MetricsRecord
RegisterMetricRecord(labels []LabelPair) *MetricsRecord

RegisterLogstoreConfigMetricRecord(labels map[string]string) *MetricsRecord
RegisterLogstoreConfigMetricRecord(labels []LabelPair) *MetricsRecord
GetLogstoreConfigMetricRecord() *MetricsRecord

GetMetricRecord() *MetricsRecord
Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type PluginContext struct {
MetricRecord *MetricsRecord
}

type PluginMeta struct {
PluginID string
NodeID string
ChildNodeID string
PluginType string
PluginTypeWithID string
}

type MetricCreator func() MetricInput

var MetricInputs = map[string]MetricCreator{}
Expand Down
6 changes: 3 additions & 3 deletions plugin_main/plugin_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
/*
#include <stdlib.h>
static char**makeCharArray(int size) {
return malloc(sizeof(char*)* size);
return malloc(sizeof(char*) * size);
}

static void setArrayString(char **a, char *s, int n) {
Expand Down Expand Up @@ -74,15 +74,15 @@ typedef struct {
} PluginMetrics;

static KeyValue** makeKeyValueArray(int size) {
return malloc(sizeof(KeyValue*)* size);
return malloc(sizeof(KeyValue*) * size);
}

static void setArrayKeyValue(KeyValue **a, KeyValue *s, int n) {
a[n] = s;
}

static PluginMetric** makePluginMetricArray(int size) {
return malloc(sizeof(KeyValue*)* size);
return malloc(sizeof(KeyValue*) * size);
}

static void setArrayPluginMetric(PluginMetric **a, PluginMetric *s, int n) {
Expand Down
4 changes: 2 additions & 2 deletions pluginmanager/aggregator_wrapper_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type AggregatorWrapperV1 struct {
procTimeMS pipeline.CounterMetric
}

func (p *AggregatorWrapperV1) Init(name string, pluginID string, childPluginID string) error {
labels := pipeline.GetCommonLabels(p.Config.Context, name, pluginID, childPluginID)
func (p *AggregatorWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error {
labels := pipeline.GetCommonLabels(p.Config.Context, pluginMeta)
p.MetricRecord = p.Config.Context.RegisterMetricRecord(labels)

p.procInRecordsTotal = helper.NewCounterMetric("proc_in_records_total")
Expand Down
4 changes: 2 additions & 2 deletions pluginmanager/aggregator_wrapper_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type AggregatorWrapperV2 struct {
procTimeMS pipeline.CounterMetric
}

func (p *AggregatorWrapperV2) Init(name string, pluginID string, childPluginID string) error {
labels := pipeline.GetCommonLabels(p.Config.Context, name, pluginID, childPluginID)
func (p *AggregatorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error {
labels := pipeline.GetCommonLabels(p.Config.Context, pluginMeta)
p.MetricRecord = p.Config.Context.RegisterMetricRecord(labels)

p.procInRecordsTotal = helper.NewCounterMetric("proc_in_records_total")
Expand Down
Loading
Loading