Skip to content

Commit

Permalink
In order to achieve a relatively unified interface for SPL and other …
Browse files Browse the repository at this point in the history
…plugins, the interface definition has been optimized. (#1254)

* refine interface

* refine code

* refine code

* refine code

* fix comments
  • Loading branch information
linrunqi08 authored Dec 5, 2023
1 parent c3d0a9a commit 500a83b
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 120 deletions.
5 changes: 2 additions & 3 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,10 @@ void Pipeline::Start() {
}
}

void Pipeline::Process(PipelineEventGroup&& logGroup, vector<PipelineEventGroup>& logGroupList) {
void Pipeline::Process(vector<PipelineEventGroup>& logGroupList) {
for (auto& p : mProcessorLine) {
p->Process(logGroup);
p->Process(logGroupList);
}
logGroupList.emplace_back(std::move(logGroup));
}

void Pipeline::Stop(bool isRemoving) {
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Pipeline {
public:
bool Init(Config&& config);
void Start();
void Process(PipelineEventGroup&& logGroup, std::vector<PipelineEventGroup>& logGroupList);
void Process(std::vector<PipelineEventGroup>& logGroupList);
void Stop(bool isRemoving);

const std::string& Name() const { return mName; }
Expand Down
21 changes: 13 additions & 8 deletions core/plugin/instance/ProcessorInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@ bool ProcessorInstance::Init(const Json::Value& config, PipelineContext& context
return true;
}

void ProcessorInstance::Process(PipelineEventGroup& logGroup) {
size_t inSize = logGroup.GetEvents().size();

mProcInRecordsTotal->Add(inSize);
void ProcessorInstance::Process(std::vector<PipelineEventGroup>& logGroupList) {
if (logGroupList.size() <= 0) {
return;
}
for (const auto& logGroup : logGroupList) {
mProcInRecordsTotal->Add(logGroup.GetEvents().size());
}

PipelineEventGroup& logGroup = logGroupList[0];

uint64_t startTime = GetCurrentTimeInMicroSeconds();
mPlugin->Process(logGroup);
mPlugin->Process(logGroupList);
uint64_t durationTime = GetCurrentTimeInMicroSeconds() - startTime;

mProcTimeMS->Add(durationTime);

size_t outSize = logGroup.GetEvents().size();
mProcOutRecordsTotal->Add(outSize);
LOG_DEBUG(mPlugin->GetContext().GetLogger(), ("Processor", Id())("InSize", inSize)("OutSize", outSize));
for (const auto& logGroup : logGroupList) {
mProcOutRecordsTotal->Add(logGroup.GetEvents().size());
}
}

} // namespace logtail
2 changes: 1 addition & 1 deletion core/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ProcessorInstance : public PluginInstance {
const std::string& Name() const override { return mPlugin->Name(); };

bool Init(const Json::Value& config, PipelineContext& context);
void Process(PipelineEventGroup& logGroup);
void Process(std::vector<PipelineEventGroup>& logGroupList);

private:
std::unique_ptr<Processor> mPlugin;
Expand Down
10 changes: 10 additions & 0 deletions core/plugin/interface/Processor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "plugin/interface/Processor.h"

namespace logtail {

void Processor::Process(std::vector<PipelineEventGroup>& logGroupList) {
for (auto& logGroup : logGroupList) {
Process(logGroup);
}
}
}
3 changes: 2 additions & 1 deletion core/plugin/interface/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ class Processor : public Plugin {
virtual ~Processor() {}

virtual bool Init(const Json::Value& config) = 0;
virtual void Process(PipelineEventGroup& logGroup) = 0;
virtual void Process(std::vector<PipelineEventGroup>& logGroupList);

protected:
virtual bool IsSupportedEvent(const PipelineEventPtr& e) const = 0;
virtual void Process(PipelineEventGroup& logGroup) = 0;
};

} // namespace logtail
1 change: 1 addition & 0 deletions core/processor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ target_link_libraries(${PROJECT_NAME} monitor)
target_link_libraries(${PROJECT_NAME} reader)
target_link_libraries(${PROJECT_NAME} config_manager)
target_link_libraries(${PROJECT_NAME} fuse)
target_link_libraries(${PROJECT_NAME} plugin_interface)
link_re2(${PROJECT_NAME})
7 changes: 4 additions & 3 deletions core/processor/daemon/LogProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ int LogProcess::ProcessBuffer(std::shared_ptr<LogBuffer>& logBuffer,
return -1;
}

std::vector<PipelineEventGroup> outputList;
std::vector<PipelineEventGroup> eventGroupList;
{
// construct a logGroup, it should be moved into input later
PipelineEventGroup eventGroup(logBuffer);
Expand All @@ -464,16 +464,17 @@ int LogProcess::ProcessBuffer(std::shared_ptr<LogBuffer>& logBuffer,
auto offsetStr = event->GetSourceBuffer()->CopyString(std::to_string(logBuffer->readOffset));
event->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size));
eventGroup.AddEvent(std::move(event));
eventGroupList.emplace_back(std::move(eventGroup));
// process logGroup
pipeline->Process(std::move(eventGroup), outputList);
pipeline->Process(eventGroupList);
}

// record profile
auto& processProfile = pipeline->GetContext().GetProcessProfile();
profile = processProfile;
processProfile.Reset();

for (auto& eventGroup : outputList) {
for (auto& eventGroup : eventGroupList) {
// fill protobuf
FillLogGroupLogs(eventGroup, resultGroup, pipeline->GetContext().GetGlobalConfig().mEnableTimestampNanosecond);
FillLogGroupTags(eventGroup, logFileReader, resultGroup);
Expand Down
Loading

0 comments on commit 500a83b

Please sign in to comment.