Skip to content

Commit

Permalink
add flusher_local_file
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 committed Nov 26, 2024
1 parent c01bf8b commit c791fd1
Show file tree
Hide file tree
Showing 10 changed files with 469 additions and 59 deletions.
5 changes: 4 additions & 1 deletion core/monitor/MetricManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,14 @@ bool SelfMonitorMetricEvent::ShouldDelete() {
void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) {
// time
auto now = GetCurrentLogtailTime();
metricEventPtr->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
metricEventPtr->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
// __tag__
for (auto label = mLabels.begin(); label != mLabels.end(); label++) {
metricEventPtr->SetTag(label->first, label->second);
}
// name
metricEventPtr->SetName(mCategory);
// values
metricEventPtr->SetValue({});
for (auto counter = mCounters.begin(); counter != mCounters.end(); counter++) {
Expand Down
7 changes: 5 additions & 2 deletions core/pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "app_config/AppConfig.h"
#include "common/Flags.h"
#include "plugin/flusher/blackhole/FlusherBlackHole.h"
#include "plugin/flusher/local_file/FlusherLocalFile.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"
Expand All @@ -50,9 +51,9 @@
#include "plugin/processor/ProcessorParseJsonNative.h"
#include "plugin/processor/ProcessorParseRegexNative.h"
#include "plugin/processor/ProcessorParseTimestampNative.h"
#include "plugin/processor/inner/ProcessorPromParseMetricNative.h"
#include "plugin/processor/inner/ProcessorMergeMultilineLogNative.h"
#include "plugin/processor/inner/ProcessorParseContainerLogNative.h"
#include "plugin/processor/inner/ProcessorPromParseMetricNative.h"
#include "plugin/processor/inner/ProcessorPromRelabelMetricNative.h"
#include "plugin/processor/inner/ProcessorSplitLogStringNative.h"
#include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h"
Expand Down Expand Up @@ -158,6 +159,7 @@ void PluginRegistry::LoadStaticPlugins() {

RegisterFlusherCreator(new StaticFlusherCreator<FlusherSLS>());
RegisterFlusherCreator(new StaticFlusherCreator<FlusherBlackHole>());
RegisterFlusherCreator(new StaticFlusherCreator<FlusherLocalFile>());
}

void PluginRegistry::LoadDynamicPlugins(const set<string>& plugins) {
Expand Down Expand Up @@ -222,7 +224,8 @@ void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) {
mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr<PluginCreator>(creator));
}

unique_ptr<PluginInstance> PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) {
unique_ptr<PluginInstance>
PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) {
unique_ptr<PluginInstance> ins;
auto creatorEntry = mPluginDict.find(PluginKey(cat, name));
if (creatorEntry != mPluginDict.end()) {
Expand Down
162 changes: 162 additions & 0 deletions core/pipeline/serializer/JsonSerializer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/serializer/JsonSerializer.h"

#include "constants/SpanConstants.h"
#include "protobuf/sls/LogGroupSerializer.h"

using namespace std;

namespace logtail {

const string JSON_KEY_TIME = "__time__";
const string JSON_KEY_TIME_NANO = "__time_nano__";
const string JSON_KEY_TAGS = "tags";
const string JSON_KEY_CONTENTS = "contents";

bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
if (group.mEvents.empty()) {
errorMsg = "empty event group";
return false;
}

PipelineEvent::Type eventType = group.mEvents[0]->GetType();
if (eventType == PipelineEvent::Type::NONE) {
// should not happen
errorMsg = "unsupported event type in event group";
return false;
}

Json::Value groupTags;
for (const auto& tag : group.mTags.mInner) {
groupTags[tag.first.to_string()] = tag.second.to_string();
}

std::ostringstream oss;
switch (eventType) {
case PipelineEvent::Type::LOG:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<LogEvent>();
Json::Value eventJson;
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
// tags
eventJson[JSON_KEY_TAGS] = Json::Value();
eventJson[JSON_KEY_TAGS].copy(groupTags);
// contents
eventJson[JSON_KEY_CONTENTS] = Json::Value();
for (const auto& kv : e) {
eventJson[JSON_KEY_CONTENTS][kv.first.to_string()] = kv.second.to_string();
}
oss << JsonToString(eventJson) << "\n";
}
break;
case PipelineEvent::Type::METRIC:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<MetricEvent>();
if (e.Is<std::monostate>()) {
continue;
}
Json::Value eventJson;
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
// tags
eventJson[JSON_KEY_TAGS] = Json::Value();
eventJson[JSON_KEY_TAGS].copy(groupTags);
// contents
eventJson[JSON_KEY_CONTENTS] = Json::Value();
// __labels__
eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_LABELS] = Json::Value();
for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) {
eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_LABELS][tag->first.to_string()]
= tag->second.to_string();
}
// __name__
eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_NAME] = e.GetName().to_string();
// __value__
if (e.Is<UntypedSingleValue>()) {
eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_VALUE] = e.GetValue<UntypedSingleValue>()->mValue;
} else if (e.Is<UntypedMultiDoubleValues>()) {
eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_VALUE] = Json::Value();
for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValusBegin();
value != e.GetValue<UntypedMultiDoubleValues>()->ValusEnd();
value++) {
eventJson[JSON_KEY_CONTENTS][METRIC_RESERVED_KEY_VALUE][value->first.to_string()]
= value->second;
}
}
oss << JsonToString(eventJson) << "\n";
}
break;
case PipelineEvent::Type::SPAN:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<SpanEvent>();
Json::Value eventJson;
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
// tags
eventJson[JSON_KEY_TAGS] = Json::Value();
eventJson[JSON_KEY_TAGS].copy(groupTags);
// contents
eventJson[JSON_KEY_CONTENTS] = Json::Value();
// set trace_id span_id span_kind status etc
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_TRACE_ID] = e.GetTraceId().to_string();
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_SPAN_ID] = e.GetSpanId().to_string();
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_PARENT_ID] = e.GetParentSpanId().to_string();
// span_name
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_SPAN_NAME] = e.GetName().to_string();
// span_kind
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_SPAN_KIND] = GetKindString(e.GetKind());
// status_code
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_STATUS_CODE] = GetStatusString(e.GetStatus());
// trace state
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_TRACE_STATE] = e.GetTraceState().to_string();
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_ATTRIBUTES] = SerializeSpanTagsToString(e);
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_LINKS] = SerializeSpanLinksToString(e);
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_EVENTS] = SerializeSpanEventsToString(e);
// start_time
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_START_TIME_NANO] = std::to_string(e.GetStartTimeNs());
// end_time
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_END_TIME_NANO] = std::to_string(e.GetEndTimeNs());
// duration
eventJson[JSON_KEY_CONTENTS][DEFAULT_TRACE_TAG_DURATION]
= std::to_string(e.GetEndTimeNs() - e.GetStartTimeNs());

oss << JsonToString(eventJson) << "\n";
}
break;
case PipelineEvent::Type::RAW:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<RawEvent>();
Json::Value eventJson;
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
// tags
eventJson[JSON_KEY_TAGS] = Json::Value();
eventJson[JSON_KEY_TAGS].copy(groupTags);
// contents
eventJson[JSON_KEY_CONTENTS] = Json::Value();
eventJson[JSON_KEY_CONTENTS][DEFAULT_CONTENT_KEY] = e.GetContent().to_string();
oss << JsonToString(eventJson) << "\n";
}
break;
default:
break;
}
res = oss.str();
return true;
}

} // namespace logtail
34 changes: 34 additions & 0 deletions core/pipeline/serializer/JsonSerializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <vector>

#include "pipeline/serializer/Serializer.h"

namespace logtail {

class JsonEventGroupSerializer : public Serializer<BatchedEvents> {
public:
JsonEventGroupSerializer(Flusher* f) : Serializer<BatchedEvents>(f) {}

private:
bool Serialize(BatchedEvents&& p, std::string& res, std::string& errorMsg) override;
};

} // namespace logtail
62 changes: 20 additions & 42 deletions core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,22 @@

#include "pipeline/serializer/SLSSerializer.h"

#include <json/json.h>

#include <array>

#include "common/Flags.h"
#include "constants/SpanConstants.h"
#include "common/compression/CompressType.h"
#include "constants/SpanConstants.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "protobuf/sls/LogGroupSerializer.h"
#include <json/json.h>
#include <array>

DECLARE_FLAG_INT32(max_send_log_group_size);

using namespace std;

namespace logtail {

std::string SerializeSpanLinksToString(const SpanEvent& event) {
if (event.GetLinks().empty()) {
return "";
}
Json::Value jsonLinks(Json::arrayValue);
for (const auto& link : event.GetLinks()) {
jsonLinks.append(link.ToJson());
}
Json::StreamWriterBuilder writer;
return Json::writeString(writer, jsonLinks);
}
std::string SerializeSpanEventsToString(const SpanEvent& event) {
if (event.GetEvents().empty()) {
return "";
}
Json::Value jsonEvents(Json::arrayValue);
for (const auto& event : event.GetEvents()) {
jsonEvents.append(event.ToJson());
}
Json::StreamWriterBuilder writer;
return Json::writeString(writer, jsonEvents);
}

template <>
bool Serializer<vector<CompressedLogGroup>>::DoSerialize(vector<CompressedLogGroup>&& p,
std::string& output,
Expand Down Expand Up @@ -97,7 +76,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
vector<array<string, 6>> spanEventContentCache(group.mEvents.size());
size_t logGroupSZ = 0;
switch (eventType) {
case PipelineEvent::Type::LOG:{
case PipelineEvent::Type::LOG: {
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<LogEvent>();
if (e.Empty()) {
Expand All @@ -111,11 +90,19 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
}
break;
}
case PipelineEvent::Type::METRIC:{
case PipelineEvent::Type::METRIC: {
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<MetricEvent>();
if (e.Is<UntypedSingleValue>()) {
metricEventContentCache[i].first = to_string(e.GetValue<UntypedSingleValue>()->mValue);
} else if (e.Is<UntypedMultiDoubleValues>()) {
Json::Value metricValues;
for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValusBegin();
value != e.GetValue<UntypedMultiDoubleValues>()->ValusEnd();
value++) {
metricValues[value->first.to_string()] = value->second;
}
metricEventContentCache[i].first = JsonToString(metricValues);
} else {
// should not happen
LOG_ERROR(sLogger,
Expand Down Expand Up @@ -145,22 +132,14 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), GetKindString(e.GetKind()).size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size());
contentSZ
+= GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size());
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_STATE.size(), e.GetTraceState().size());
//

// set tags and scope tags
Json::Value jsonVal;
for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) {
jsonVal[it->first.to_string()] = it->second.to_string();
}
for (auto it = e.ScopeTagsBegin(); it != e.ScopeTagsEnd(); ++it) {
jsonVal[it->first.to_string()] = it->second.to_string();
}
Json::StreamWriterBuilder writer;
std::string attrString = Json::writeString(writer, jsonVal);
auto attrString = SerializeSpanTagsToString(e);
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size());
spanEventContentCache[i][0] = std::move(attrString);

auto linkString = SerializeSpanLinksToString(e);
contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size());
spanEventContentCache[i][1] = std::move(linkString);
Expand Down Expand Up @@ -265,7 +244,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_STATE, spanEvent.GetTraceState());

serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]);

serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]);
serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]);

Expand All @@ -275,7 +254,6 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]);
// duration
serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]);

}
break;
case PipelineEvent::Type::RAW:
Expand Down
Loading

0 comments on commit c791fd1

Please sign in to comment.