Skip to content

Commit

Permalink
Pipeline processor (#2)
Browse files Browse the repository at this point in the history
upgrade processor to v2
  • Loading branch information
quzard authored Nov 30, 2023
1 parent 7781d0a commit 566a118
Show file tree
Hide file tree
Showing 43 changed files with 2,647 additions and 2,415 deletions.
4 changes: 2 additions & 2 deletions core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

#pragma once

#include <json/json.h>

#include <cstdint>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "json/json.h"

#include "common/StringTools.h"
#include "logger/Logger.h"

Expand Down
46 changes: 43 additions & 3 deletions core/common/TimeUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
// limitations under the License.

#include "TimeUtil.h"

#include <memory.h>
#include <chrono>
#include <limits>

#include <atomic>
#include <chrono>
#include <cmath>
#include <limits>
#if defined(__linux__)
#include <sys/sysinfo.h>
#include <utmp.h>
#endif
#include "logger/Logger.h"
#include "common/LogtailCommonFlags.h"
#include "common/ParamExtractor.h"
#include "common/StringTools.h"
#include "common/Strptime.h"
#include "logger/Logger.h"
#include "pipeline/PipelineContext.h"

namespace logtail {

Expand Down Expand Up @@ -368,4 +373,39 @@ uint64_t GetCurrentTimeInNanoSeconds() {
.count();
}

// Parses a time zone string of the exact form "GMT+HH:MM" or "GMT-HH:MM" into a
// signed offset in seconds.
//
// @param logTZ        time zone text, e.g. "GMT+08:00".
// @param logTZSecond  receives the offset in seconds (negative for "GMT-..."); it is
//                     left untouched when parsing fails.
// @return true when logTZ is well formed, false otherwise.
bool ParseTimeZoneOffsetSecond(const std::string& logTZ, int& logTZSecond) {
    // sizeof counts the trailing '\0', hence the -1.
    constexpr size_t kExpectedSize = sizeof("GMT+08:00") - 1;
    if (logTZ.size() != kExpectedSize || logTZ.compare(0, 3, "GMT") != 0) {
        return false;
    }
    const char sign = logTZ[3];
    if ((sign != '+' && sign != '-') || logTZ[6] != ':') {
        return false;
    }

    // All four hour/minute positions must be decimal digits. Without this check,
    // inputs such as "GMT+ab:cd" or "GMT++8:00" pass the shape test above and reach
    // the numeric conversion with non-numeric or signed text.
    const auto isDigit = [](char c) { return c >= '0' && c <= '9'; };
    if (!isDigit(logTZ[4]) || !isDigit(logTZ[5]) || !isDigit(logTZ[7]) || !isDigit(logTZ[8])) {
        return false;
    }

    // Hour and minute ranges are intentionally not restricted, so every all-digit
    // value that was accepted before (e.g. "GMT+14:00") keeps parsing the same way.
    const int hours = (logTZ[4] - '0') * 10 + (logTZ[5] - '0');
    const int minutes = (logTZ[7] - '0') * 10 + (logTZ[8] - '0');
    const int offsetSecond = hours * 3600 + minutes * 60;
    logTZSecond = (sign == '-') ? -offsetSecond : offsetSecond;
    return true;
}

// Converts the configured log time zone string into an offset in seconds.
//
// @param logTimeZoneOffsetSecond  receives the resulting offset; written only on success.
// @param logTZ                    time zone text handed to ParseTimeZoneOffsetSecond().
// @param errorMsg                 receives a description of the problem; written only on failure.
// @param isAdjustmentNeeded       when true, the value of GetLocalTimeZoneOffsetSecond() is
//                                 subtracted from the parsed offset; when false the parsed
//                                 offset is stored unchanged.
// @return true if logTZ could be parsed, false otherwise.
bool ParseLogTimeZoneOffsetSecond(int& logTimeZoneOffsetSecond,
                                  const std::string& logTZ,
                                  std::string& errorMsg,
                                  bool isAdjustmentNeeded) {
    int parsedOffset = 0;
    const bool parsed = ParseTimeZoneOffsetSecond(logTZ, parsedOffset);
    if (!parsed) {
        errorMsg
            = "invalid log time zone specified, will parse log time without time zone adjusted, time zone: " + logTZ;
        return false;
    }

    if (!isAdjustmentNeeded) {
        logTimeZoneOffsetSecond = parsedOffset;
        return true;
    }
    logTimeZoneOffsetSecond = parsedOffset - GetLocalTimeZoneOffsetSecond();
    return true;
}

} // namespace logtail
13 changes: 11 additions & 2 deletions core/common/TimeUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/

#pragma once
#include <string>
#include <ctime>
#include <string>
#include <thread>
#include "log_pb/sls_logs.pb.h"

#include "common/Strptime.h"
#include "log_pb/sls_logs.pb.h"
#include "pipeline/PipelineContext.h"

// Time and timestamp utility.
namespace logtail {
Expand Down Expand Up @@ -88,4 +90,11 @@ LogtailTime GetCurrentLogtailTime();
uint64_t GetPreciseTimestamp(uint64_t secondTimestamp,
const char* preciseTimeSuffix,
const PreciseTimestampConfig& preciseTimestampConfig);
// Parses a "GMT+HH:MM" / "GMT-HH:MM" string into a signed offset in seconds
// (negative for '-'). Returns false when logTZ does not have that 9-character shape.
bool ParseTimeZoneOffsetSecond(const std::string& logTZ, int& logTZSecond);

// Parses logTZ with ParseTimeZoneOffsetSecond() and stores the result in
// logTimeZoneOffsetSecond. When isAdjustmentNeeded is true the local time zone offset
// (GetLocalTimeZoneOffsetSecond()) is subtracted from the parsed value first.
// On failure returns false and fills errorMsg; logTimeZoneOffsetSecond is not written.
bool ParseLogTimeZoneOffsetSecond(int& logTimeZoneOffsetSecond,
const std::string& logTZ,
std::string& errorMsg,
bool isAdjustmentNeeded);

} // namespace logtail
7 changes: 3 additions & 4 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once
#include <memory>
#include <string>

#include "common/Constants.h"
#include "models/PipelineEventPtr.h"
#include "reader/SourceBuffer.h"
Expand Down Expand Up @@ -66,7 +67,7 @@ class PipelineEventGroup {
PipelineEventGroup& operator=(const PipelineEventGroup&) = delete;
PipelineEventGroup(PipelineEventGroup&&) noexcept = default;
PipelineEventGroup& operator=(PipelineEventGroup&&) noexcept = default;


const EventsContainer& GetEvents() const { return mEvents; }
EventsContainer& MutableEvents() { return mEvents; }
Expand All @@ -86,9 +87,7 @@ class PipelineEventGroup {
void DelMetadata(EventGroupMetaKey key);
GroupMetadata& MutableAllMetadata() { return mMetadata; };
void SwapAllMetadata(GroupMetadata& other) { mMetadata.swap(other); }
void SetAllMetadata(GroupMetadata& other) {
mMetadata = other;
}
void SetAllMetadata(GroupMetadata& other) { mMetadata = other; }

void SetTag(const StringView& key, const StringView& val);
void SetTag(const std::string& key, const std::string& val);
Expand Down
11 changes: 6 additions & 5 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
#include "flusher/FlusherSLS.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/PluginRegistry.h"
#include "processor/daemon/LogProcess.h"
#include "processor/ProcessorParseApsaraNative.h"
#include "processor/ProcessorSplitLogStringNative.h"
#include "processor/ProcessorSplitRegexNative.h"
#include "processor/ProcessorParseApsaraNative.h"
#include "processor/ProcessorTagNative.h"
#include "processor/daemon/LogProcess.h"

// for special treatment
#include "input/InputFile.h"
#include "file_server/MultilineOptions.h"
#include "input/InputFile.h"

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -84,8 +84,8 @@ bool Pipeline::Init(Config&& config) {

if (config.IsProcessRunnerInvolved()) {
Json::Value detail;
unique_ptr<ProcessorInstance> processor = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorTagNative::sName, to_string(++pluginIndex));
unique_ptr<ProcessorInstance> processor
= PluginRegistry::GetInstance()->CreateProcessor(ProcessorTagNative::sName, to_string(++pluginIndex));
if (!processor->Init(detail, mContext)) {
// should not happen
return false;
Expand All @@ -106,6 +106,7 @@ bool Pipeline::Init(Config&& config) {
} else if (inputFile->mMultiline.IsMultiline()) {
processor = PluginRegistry::GetInstance()->CreateProcessor(ProcessorSplitRegexNative::sName,
to_string(++pluginIndex));
detail["Mode"] = Json::Value("custom");
detail["StartPattern"] = Json::Value(inputFile->mMultiline.mStartPattern);
detail["ContinuePattern"] = Json::Value(inputFile->mMultiline.mContinuePattern);
detail["EndPattern"] = Json::Value(inputFile->mMultiline.mEndPattern);
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Pipeline {
friend class PipelineMock;
friend class PipelineUnittest;
friend class InputFileUnittest;
friend class ProcessorTagNativeUnittest;
#endif
};

Expand Down
6 changes: 3 additions & 3 deletions core/pipeline/PipelineContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@

#pragma once

#include <json/json.h>

#include <cstdint>
#include <string>

#include "json/json.h"

#include "common/LogstoreFeedbackKey.h"
#include "logger/Logger.h"
#include "models/PipelineEventGroup.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "pipeline/GlobalConfig.h"

namespace logtail {
Expand Down
54 changes: 0 additions & 54 deletions core/processor/BaseFilterNode.h

This file was deleted.

62 changes: 0 additions & 62 deletions core/processor/BinaryFilterOperatorNode.h

This file was deleted.

61 changes: 61 additions & 0 deletions core/processor/CommonParserOptions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "processor/CommonParserOptions.h"

#include "common/ParamExtractor.h"

namespace logtail {

// Loads the optional parser settings "KeepingSourceWhenParseFail",
// "KeepingSourceWhenParseSucceed", "RenamedSourceKey" and "CopingRawLog" from `config`
// into the corresponding members.
//
// A setting that cannot be read is reported through PARAM_WARNING_DEFAULT together with
// the member's current value (presumably its default - confirm against the class
// definition); it never aborts initialization, so this function always returns true.
//
// @param config      JSON object holding the plugin configuration.
// @param ctx         pipeline context supplying the logger and config name for warnings.
// @param pluginName  plugin name reported in warnings.
bool CommonParserOptions::Init(const Json::Value& config, const PipelineContext& ctx, const std::string& pluginName) {
    std::string reason;

    const bool keepOnFailRead
        = GetOptionalBoolParam(config, "KeepingSourceWhenParseFail", mKeepingSourceWhenParseFail, reason);
    if (!keepOnFailRead) {
        PARAM_WARNING_DEFAULT(ctx.GetLogger(), reason, mKeepingSourceWhenParseFail, pluginName, ctx.GetConfigName());
    }

    const bool keepOnSuccessRead
        = GetOptionalBoolParam(config, "KeepingSourceWhenParseSucceed", mKeepingSourceWhenParseSucceed, reason);
    if (!keepOnSuccessRead) {
        PARAM_WARNING_DEFAULT(ctx.GetLogger(), reason, mKeepingSourceWhenParseSucceed, pluginName, ctx.GetConfigName());
    }

    const bool renamedKeyRead = GetOptionalStringParam(config, "RenamedSourceKey", mRenamedSourceKey, reason);
    if (!renamedKeyRead) {
        PARAM_WARNING_DEFAULT(ctx.GetLogger(), reason, mRenamedSourceKey, pluginName, ctx.GetConfigName());
    }

    const bool copyRawRead = GetOptionalBoolParam(config, "CopingRawLog", mCopingRawLog, reason);
    if (!copyRawRead) {
        PARAM_WARNING_DEFAULT(ctx.GetLogger(), reason, mCopingRawLog, pluginName, ctx.GetConfigName());
    }

    return true;
}
// Returns true only when parsing failed and both mKeepingSourceWhenParseFail and
// mCopingRawLog are set.
bool CommonParserOptions::ShouldAddUnmatchLog(bool parseSuccess) {
    if (parseSuccess) {
        return false;
    }
    return mKeepingSourceWhenParseFail && mCopingRawLog;
}

// Returns true when the source should be kept for the given parse outcome: on success
// this is governed by mKeepingSourceWhenParseSucceed, on failure by
// mKeepingSourceWhenParseFail.
bool CommonParserOptions::ShouldAddRenamedSourceLog(bool parseSuccess) {
    if (parseSuccess) {
        return mKeepingSourceWhenParseSucceed;
    }
    return mKeepingSourceWhenParseFail;
}
// Returns true when parsing failed, the source is not kept on failure, and the event
// has no contents or its only content key is LOG_RESERVED_KEY_FILE_OFFSET.
// In every other case returns false.
bool CommonParserOptions::ShouldEraseEvent(bool parseSuccess, const LogEvent& sourceEvent) {
    // Only a failed parse whose source is not kept can lead to erasure.
    if (parseSuccess || mKeepingSourceWhenParseFail) {
        return false;
    }

    const auto& fields = sourceEvent.GetContents();
    if (fields.empty()) {
        return true;
    }
    // "__file_offset__"
    return fields.size() == 1 && fields.begin()->first == LOG_RESERVED_KEY_FILE_OFFSET;
}
} // namespace logtail
Loading

0 comments on commit 566a118

Please sign in to comment.