diff --git a/core/monitor/MetricManager.cpp b/core/monitor/MetricManager.cpp index b2cc42761c..44d61e59b7 100644 --- a/core/monitor/MetricManager.cpp +++ b/core/monitor/MetricManager.cpp @@ -178,11 +178,14 @@ bool SelfMonitorMetricEvent::ShouldDelete() { void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) { // time auto now = GetCurrentLogtailTime(); - metricEventPtr->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec); + metricEventPtr->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() + : now.tv_sec); // __tag__ for (auto label = mLabels.begin(); label != mLabels.end(); label++) { metricEventPtr->SetTag(label->first, label->second); } + // name + metricEventPtr->SetName(mCategory); // values metricEventPtr->SetValue({}); for (auto counter = mCounters.begin(); counter != mCounters.end(); counter++) { diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index 7849d2d1fa..4b0f5c1c27 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -26,6 +26,7 @@ #include "app_config/AppConfig.h" #include "common/Flags.h" #include "plugin/flusher/blackhole/FlusherBlackHole.h" +#include "plugin/flusher/local_file/FlusherLocalFile.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/input/InputContainerStdio.h" #include "plugin/input/InputFile.h" @@ -50,9 +51,9 @@ #include "plugin/processor/ProcessorParseJsonNative.h" #include "plugin/processor/ProcessorParseRegexNative.h" #include "plugin/processor/ProcessorParseTimestampNative.h" -#include "plugin/processor/inner/ProcessorPromParseMetricNative.h" #include "plugin/processor/inner/ProcessorMergeMultilineLogNative.h" #include "plugin/processor/inner/ProcessorParseContainerLogNative.h" +#include "plugin/processor/inner/ProcessorPromParseMetricNative.h" #include "plugin/processor/inner/ProcessorPromRelabelMetricNative.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" @@ -158,6 +159,7 @@ void PluginRegistry::LoadStaticPlugins() { RegisterFlusherCreator(new StaticFlusherCreator()); RegisterFlusherCreator(new StaticFlusherCreator()); + RegisterFlusherCreator(new StaticFlusherCreator()); } void PluginRegistry::LoadDynamicPlugins(const set& plugins) { @@ -222,7 +224,8 @@ void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) { mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr(creator)); } -unique_ptr PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) { +unique_ptr +PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) { unique_ptr ins; auto creatorEntry = mPluginDict.find(PluginKey(cat, name)); if (creatorEntry != mPluginDict.end()) { diff --git a/core/pipeline/serializer/JsonSerializer.cpp b/core/pipeline/serializer/JsonSerializer.cpp new file mode 100644 index 0000000000..2cb7daa8aa --- /dev/null +++ b/core/pipeline/serializer/JsonSerializer.cpp @@ -0,0 +1,162 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "pipeline/serializer/JsonSerializer.h" + +#include "constants/SpanConstants.h" +#include "protobuf/sls/LogGroupSerializer.h" + +using namespace std; + +namespace logtail { + +const string JSON_KEY_TIME = "__time__"; +const string JSON_KEY_TIME_NANO = "__time_nano__"; +const string JSON_KEY_TAGS = "tags"; +const string JSON_KEY_CONTENTS = "contents"; + +bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) { + if (group.mEvents.empty()) { + errorMsg = "empty event group"; + return false; + } + + PipelineEvent::Type eventType = group.mEvents[0]->GetType(); + if (eventType == PipelineEvent::Type::NONE) { + // should not happen + errorMsg = "unsupported event type in event group"; + return false; + } + + Json::Value groupTags; + for (const auto& tag : group.mTags.mInner) { + groupTags[tag.first.to_string()] = tag.second.to_string(); + } + + std::ostringstream oss; + switch (eventType) { + case PipelineEvent::Type::LOG: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + Json::Value eventJson; + // time + eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + // tags + eventJson[JSON_KEY_TAGS] = Json::Value(); + eventJson[JSON_KEY_TAGS].copy(groupTags); + // contents + eventJson[JSON_KEY_CONTENTS] = Json::Value(); + for (const auto& kv : e) { + eventJson[JSON_KEY_CONTENTS][kv.first.to_string()] = kv.second.to_string(); + } + oss << JsonToString(eventJson) << "\n"; + } + break; + case PipelineEvent::Type::METRIC: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + if (e.Is()) { + continue; + } + Json::Value eventJson; + // time + eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + // tags + eventJson[JSON_KEY_TAGS] = Json::Value(); + eventJson[JSON_KEY_TAGS].copy(groupTags); + // contents + eventJson[JSON_KEY_CONTENTS] = Json::Value(); + // __labels__ + eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_LABELS] = Json::Value(); + for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) { + eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_LABELS][tag->first.to_string()] + = tag->second.to_string(); + } + // __name__ + eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_NAME] = e.GetName().to_string(); + // __value__ + if (e.Is()) { + eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_VALUE] = e.GetValue()->mValue; + } else if (e.Is()) { + eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_VALUE] = Json::Value(); + for (auto value = e.GetValue()->ValusBegin(); + value != e.GetValue()->ValusEnd(); + value++) { + eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_VALUE][value->first.to_string()] + = value->second; + } + } + oss << JsonToString(eventJson) << "\n"; + } + break; + case PipelineEvent::Type::SPAN: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + Json::Value eventJson; + // time + eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + // tags + eventJson[JSON_KEY_TAGS] = Json::Value(); + eventJson[JSON_KEY_TAGS].copy(groupTags); + // contents + eventJson[JSON_KEY_CONTENTS] = Json::Value(); + // set trace_id span_id span_kind status etc + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_TRACE_ID] = e.GetTraceId().to_string(); + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_SPAN_ID] = e.GetSpanId().to_string(); + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_PARENT_ID] = e.GetParentSpanId().to_string(); + // span_name + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_SPAN_NAME] = e.GetName().to_string(); + // span_kind + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_SPAN_KIND] = GetKindString(e.GetKind()); + // status_code + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_STATUS_CODE] = GetStatusString(e.GetStatus()); + // trace state + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_TRACE_STATE] = e.GetTraceState().to_string(); + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_ATTRIBUTES] = SerializeSpanTagsToString(e); + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_LINKS] = SerializeSpanLinksToString(e); + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_EVENTS] = SerializeSpanEventsToString(e); + // start_time + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_START_TIME_NANO] = std::to_string(e.GetStartTimeNs()); + // end_time + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_END_TIME_NANO] = std::to_string(e.GetEndTimeNs()); + // duration + eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_DURATION] + = std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs()); + + oss << JsonToString(eventJson) << "\n"; + } + break; + case PipelineEvent::Type::RAW: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + Json::Value eventJson; + // time + eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + // tags + eventJson[JSON_KEY_TAGS] = Json::Value(); + eventJson[JSON_KEY_TAGS].copy(groupTags); + // contents + eventJson[JSON_KEY_CONTENTS] = Json::Value(); + eventJson[JSON_KEY_CONTENTS][DEFAULT_CONTENT_KEY] = e.GetContent().to_string(); + oss << JsonToString(eventJson) << "\n"; + } + break; + default: + break; + } + res = oss.str(); + return true; +} + +} // namespace logtail \ No newline at end of file diff --git a/core/pipeline/serializer/JsonSerializer.h b/core/pipeline/serializer/JsonSerializer.h new file mode 100644 index 0000000000..7576af70fc --- /dev/null +++ b/core/pipeline/serializer/JsonSerializer.h @@ -0,0 +1,34 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "pipeline/serializer/Serializer.h" + +namespace logtail { + +class JsonEventGroupSerializer : public Serializer { +public: + JsonEventGroupSerializer(Flusher* f) : Serializer(f) {} + +private: + bool Serialize(BatchedEvents&& p, std::string& res, std::string& errorMsg) override; +}; + +} // namespace logtail diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index 6b9ec3888d..325d5d8d20 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -14,13 +14,15 @@ #include "pipeline/serializer/SLSSerializer.h" +#include + +#include + #include "common/Flags.h" -#include "constants/SpanConstants.h" #include "common/compression/CompressType.h" +#include "constants/SpanConstants.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "protobuf/sls/LogGroupSerializer.h" -#include -#include DECLARE_FLAG_INT32(max_send_log_group_size); @@ -28,29 +30,6 @@ using namespace std; namespace logtail { -std::string SerializeSpanLinksToString(const SpanEvent& event) { - if (event.GetLinks().empty()) { - return ""; - } - Json::Value jsonLinks(Json::arrayValue); - for (const auto& link : event.GetLinks()) { - jsonLinks.append(link.ToJson()); - } - Json::StreamWriterBuilder writer; - return Json::writeString(writer, jsonLinks); -} -std::string SerializeSpanEventsToString(const SpanEvent& event) { - if (event.GetEvents().empty()) { - return ""; - } - Json::Value jsonEvents(Json::arrayValue); - for (const auto& event : event.GetEvents()) { - jsonEvents.append(event.ToJson()); - } - Json::StreamWriterBuilder writer; - return Json::writeString(writer, jsonEvents); -} - template <> bool Serializer>::DoSerialize(vector&& p, std::string& output, @@ -97,7 +76,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri vector> spanEventContentCache(group.mEvents.size()); size_t logGroupSZ = 0; switch (eventType) { - case PipelineEvent::Type::LOG:{ + case PipelineEvent::Type::LOG: { for (size_t i = 0; i < group.mEvents.size(); ++i) { const auto& e = group.mEvents[i].Cast(); if (e.Empty()) { @@ -111,11 +90,19 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri } break; } - case PipelineEvent::Type::METRIC:{ + case PipelineEvent::Type::METRIC: { for (size_t i = 0; i < group.mEvents.size(); ++i) { const auto& e = group.mEvents[i].Cast(); if (e.Is()) { metricEventContentCache[i].first = to_string(e.GetValue()->mValue); + } else if (e.Is()) { + Json::Value metricValues; + for (auto value = e.GetValue()->ValusBegin(); + value != e.GetValue()->ValusEnd(); + value++) { + metricValues[value->first.to_string()] = value->second; + } + metricEventContentCache[i].first = JsonToString(metricValues); } else { // should not happen LOG_ERROR(sLogger, @@ -145,22 +132,14 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), GetKindString(e.GetKind()).size()); - contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size()); + contentSZ + += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_STATE.size(), e.GetTraceState().size()); - // + // set tags and scope tags - Json::Value jsonVal; - for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) { - jsonVal[it->first.to_string()] = it->second.to_string(); - } - for (auto it = e.ScopeTagsBegin(); it != e.ScopeTagsEnd(); ++it) { - jsonVal[it->first.to_string()] = it->second.to_string(); - } - Json::StreamWriterBuilder writer; - std::string attrString = Json::writeString(writer, jsonVal); + auto attrString = SerializeSpanTagsToString(e); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size()); spanEventContentCache[i][0] = std::move(attrString); - auto linkString = SerializeSpanLinksToString(e); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size()); spanEventContentCache[i][1] = std::move(linkString); @@ -265,7 +244,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_STATE, spanEvent.GetTraceState()); serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]); - + serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]); serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]); @@ -275,7 +254,6 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]); // duration serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]); - } break; case PipelineEvent::Type::RAW: diff --git a/core/pipeline/serializer/Serializer.cpp b/core/pipeline/serializer/Serializer.cpp new file mode 100644 index 0000000000..07e95c387a --- /dev/null +++ b/core/pipeline/serializer/Serializer.cpp @@ -0,0 +1,59 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Serializer.h" + +namespace logtail { + +std::string JsonToString(Json::Value& value) { + Json::StreamWriterBuilder writer; + return Json::writeString(writer, value); +} + +std::string SerializeSpanTagsToString(const SpanEvent& event) { + Json::Value jsonVal; + for (auto it = event.TagsBegin(); it != event.TagsEnd(); ++it) { + jsonVal[it->first.to_string()] = it->second.to_string(); + } + for (auto it = event.ScopeTagsBegin(); it != event.ScopeTagsEnd(); ++it) { + jsonVal[it->first.to_string()] = it->second.to_string(); + } + return JsonToString(jsonVal); +} + +std::string SerializeSpanLinksToString(const SpanEvent& event) { + if (event.GetLinks().empty()) { + return ""; + } + Json::Value jsonLinks(Json::arrayValue); + for (const auto& link : event.GetLinks()) { + jsonLinks.append(link.ToJson()); + } + return JsonToString(jsonLinks); +} + +std::string SerializeSpanEventsToString(const SpanEvent& event) { + if (event.GetEvents().empty()) { + return ""; + } + Json::Value jsonEvents(Json::arrayValue); + for (const auto& event : event.GetEvents()) { + jsonEvents.append(event.ToJson()); + } + return JsonToString(jsonEvents); +} + +} // namespace logtail \ No newline at end of file diff --git a/core/pipeline/serializer/Serializer.h b/core/pipeline/serializer/Serializer.h index 27cc2e0847..3f7b0ae8a3 100644 --- a/core/pipeline/serializer/Serializer.h +++ b/core/pipeline/serializer/Serializer.h @@ -108,4 +108,9 @@ using EventSerializer = Serializer; using EventGroupSerializer = Serializer; using EventGroupListSerializer = Serializer; +std::string JsonToString(Json::Value& value); +std::string SerializeSpanTagsToString(const SpanEvent& event); +std::string SerializeSpanLinksToString(const SpanEvent& event); +std::string SerializeSpanEventsToString(const SpanEvent& event); + } // namespace logtail diff --git a/core/plugin/flusher/local_file/FlusherLocalFile.cpp b/core/plugin/flusher/local_file/FlusherLocalFile.cpp new file mode 100644 index 0000000000..5b84cced81 --- /dev/null +++ b/core/plugin/flusher/local_file/FlusherLocalFile.cpp @@ -0,0 +1,117 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "plugin/flusher/local_file/FlusherLocalFile.h" + +#include +#include +#include + +#include "pipeline/queue/SenderQueueManager.h" + +using namespace std; + +namespace logtail { + +const string FlusherLocalFile::sName = "flusher_local_file"; + +bool FlusherLocalFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + string errorMsg; + // FileName + if (!GetMandatoryStringParam(config, "FileName", mFileName, errorMsg)) { + PARAM_ERROR_RETURN(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + // MaxFileSize + GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg); + // MaxFiles + GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg); + + // create file writer + auto file_sink = std::make_shared(mFileName, mMaxFileSize, mMaxFiles, true); + mFileWriter = std::make_shared( + sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); + mFileWriter->set_pattern("[%Y-%m-%d %H:%M:%S.%f] %v"); + + mGroupSerializer = make_unique(this); + mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); + return true; +} + +bool FlusherLocalFile::Send(PipelineEventGroup&& g) { + if (g.IsReplay()) { + return SerializeAndPush(std::move(g)); + } else { + vector res; + mBatcher.Add(std::move(g), res); + return SerializeAndPush(std::move(res)); + } +} + +bool FlusherLocalFile::Flush(size_t key) { + BatchedEventsList res; + mBatcher.FlushQueue(key, res); + return SerializeAndPush(std::move(res)); +} + +bool FlusherLocalFile::FlushAll() { + vector res; + mBatcher.FlushAll(res); + return SerializeAndPush(std::move(res)); +} + +bool FlusherLocalFile::SerializeAndPush(PipelineEventGroup&& group) { + string serializedData, errorMsg; + BatchedEvents g(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + mGroupSerializer->DoSerialize(move(g), serializedData, errorMsg); + if (errorMsg.empty()) { + mFileWriter->info(serializedData); + } else { + LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); + } + return true; +} + +bool FlusherLocalFile::SerializeAndPush(BatchedEventsList&& groupList) { + string serializedData; + for (auto& group : groupList) { + string errorMsg; + mGroupSerializer->DoSerialize(move(group), serializedData, errorMsg); + if (errorMsg.empty()) { + mFileWriter->info(serializedData); + } else { + LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); + } + } + return true; +} + +bool FlusherLocalFile::SerializeAndPush(vector&& groupLists) { + for (auto& groupList : groupLists) { + SerializeAndPush(std::move(groupList)); + } + return true; +} + +} // namespace logtail \ No newline at end of file diff --git a/core/plugin/flusher/local_file/FlusherLocalFile.h b/core/plugin/flusher/local_file/FlusherLocalFile.h new file mode 100644 index 0000000000..5cfd0d0283 --- /dev/null +++ b/core/plugin/flusher/local_file/FlusherLocalFile.h @@ -0,0 +1,54 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "pipeline/batch/Batcher.h" +#include "pipeline/plugin/interface/Flusher.h" +#include "pipeline/serializer/JsonSerializer.h" + +namespace logtail { + +class FlusherLocalFile : public Flusher { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Send(PipelineEventGroup&& g) override; + bool Flush(size_t key) override; + bool FlushAll() override; + +private: + bool SerializeAndPush(PipelineEventGroup&& group); + bool SerializeAndPush(BatchedEventsList&& groupList); + bool SerializeAndPush(std::vector&& groupLists); + + std::shared_ptr mFileWriter; + std::string mFileName; + uint32_t mMaxFileSize = 1024 * 1024 * 10; + uint32_t mMaxFiles = 10; + Batcher mBatcher; + std::unique_ptr mGroupSerializer; + + CounterPtr mSendCnt; +}; + +} // namespace logtail diff --git a/core/plugin/input/InputSelfMonitorMetric.cpp b/core/plugin/input/InputSelfMonitorMetric.cpp index b6095ceaa4..4939e093c8 100644 --- a/core/plugin/input/InputSelfMonitorMetric.cpp +++ b/core/plugin/input/InputSelfMonitorMetric.cpp @@ -20,19 +20,19 @@ namespace logtail { const std::string InputSelfMonitorMetric::sName = "input_self_monitor_metric"; -bool GetEnabled(Json::Value& rule) { +bool GetEnabled(const Json::Value& rule) { if (rule.isMember("Enable") && rule["Enable"].isBool()) return rule["Enable"].asBool(); return true; } -int GetInterval(Json::Value& rule) { +int GetInterval(const Json::Value& rule) { if (rule.isMember("Interval") && rule["Interval"].isInt()) return rule["Interval"].asInt(); return 10; } -void ParseSelfMonitorMetricRule(std::string&& ruleKey, Json::Value& ruleJson, SelfMonitorMetricRule& rule) { +void ParseSelfMonitorMetricRule(std::string&& ruleKey, const Json::Value& ruleJson, SelfMonitorMetricRule& rule) { if (ruleJson.isMember(ruleKey) && ruleJson[ruleKey].isObject()) { rule.mEnable = GetEnabled(ruleJson[ruleKey]); rule.mInterval = GetInterval(ruleJson[ruleKey]); @@ -40,17 +40,12 @@ void ParseSelfMonitorMetricRule(std::string&& ruleKey, Json::Value& ruleJson, Se } bool InputSelfMonitorMetric::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { - if (!config.isMember("Rules") && !config["Rules"].isObject()) { - LOG_ERROR(sLogger, ("init self-monitor metric input failed", "no rules found")); - return false; - } - Json::Value rules = config["Rules"]; - ParseSelfMonitorMetricRule("Agent", rules, mSelfMonitorMetricRules.mAgentMetricsRule); - ParseSelfMonitorMetricRule("Runner", rules, mSelfMonitorMetricRules.mRunnerMetricsRule); - ParseSelfMonitorMetricRule("Pipeline", rules, mSelfMonitorMetricRules.mPipelineMetricsRule); - ParseSelfMonitorMetricRule("PluginSource", rules, mSelfMonitorMetricRules.mPluginSourceMetricsRule); - ParseSelfMonitorMetricRule("Plugin", rules, mSelfMonitorMetricRules.mPluginMetricsRule); - ParseSelfMonitorMetricRule("Component", rules, mSelfMonitorMetricRules.mComponentMetricsRule); + ParseSelfMonitorMetricRule("Agent", config, mSelfMonitorMetricRules.mAgentMetricsRule); + ParseSelfMonitorMetricRule("Runner", config, mSelfMonitorMetricRules.mRunnerMetricsRule); + ParseSelfMonitorMetricRule("Pipeline", config, mSelfMonitorMetricRules.mPipelineMetricsRule); + ParseSelfMonitorMetricRule("PluginSource", config, mSelfMonitorMetricRules.mPluginSourceMetricsRule); + ParseSelfMonitorMetricRule("Plugin", config, mSelfMonitorMetricRules.mPluginMetricsRule); + ParseSelfMonitorMetricRule("Component", config, mSelfMonitorMetricRules.mComponentMetricsRule); return true; }