Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Define the metrics data model and modify the plugin manager to support procress the metrics data #519

Merged
merged 50 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
55b3035
Define the metrics data model and modify the plugin manager to suppor…
liuhaoyang Nov 15, 2022
0e095fa
Merge branch 'main' into data-model
liuhaoyang Nov 15, 2022
4e07119
Fix build & lint error
liuhaoyang Nov 15, 2022
a5bb7f7
metric_mock,processor_default,flusher_stdout support metrics models
liuhaoyang Nov 15, 2022
2d0f7c0
update vscode ignore rule
liuhaoyang Nov 15, 2022
294a006
update vscode ignore rule
liuhaoyang Nov 15, 2022
c7e0c2d
Optimize the metric print of stdout flusher
liuhaoyang Nov 16, 2022
b0ddc98
Make SlsInput and PipelineInput compatible
liuhaoyang Nov 16, 2022
bf8d838
fix InputMock
liuhaoyang Nov 16, 2022
34163a8
reset winmeta Opcodes
liuhaoyang Nov 16, 2022
9884f63
reset LocalCollector
liuhaoyang Nov 16, 2022
b87ee0a
reset codes
liuhaoyang Nov 16, 2022
af0f342
make both input run
liuhaoyang Nov 16, 2022
863f22b
reser
liuhaoyang Nov 17, 2022
51d3977
Fix metric_docker_file rdb mysql plugin run error
liuhaoyang Nov 17, 2022
b7b848c
reset skywalking bp.go
liuhaoyang Nov 17, 2022
12615b9
reset skywalking pb.go
liuhaoyang Nov 17, 2022
9d464ed
rename sls function name
liuhaoyang Nov 21, 2022
f905fe9
reset
liuhaoyang Nov 21, 2022
82a2717
reset
liuhaoyang Nov 21, 2022
a2a0320
reset
liuhaoyang Nov 21, 2022
13fcc3d
reset self_metric_imp_test
liuhaoyang Nov 21, 2022
a1a9293
reset
liuhaoyang Nov 21, 2022
ee6e36c
reset
liuhaoyang Nov 21, 2022
a2f14a4
Merge branch 'main' into data-model
liuhaoyang Nov 21, 2022
d4c6fc0
reset aggregators
liuhaoyang Nov 22, 2022
2ac09d3
reset aggregators
liuhaoyang Nov 22, 2022
ccf4cac
reset aggregators
liuhaoyang Nov 22, 2022
7760bf8
refactor plugin runner
liuhaoyang Nov 25, 2022
e1da6eb
fix input mock
liuhaoyang Nov 25, 2022
58d26ad
Merge branch 'main' into data-model
liuhaoyang Nov 25, 2022
235a2ac
refactor: plugin runner
liuhaoyang Nov 25, 2022
e620198
add pkg/constraints IntUintFloat
liuhaoyang Nov 25, 2022
63b0dfd
Fix restart pipeline
liuhaoyang Nov 25, 2022
82b2f8b
Fix flusher stop
liuhaoyang Nov 28, 2022
a09638c
Fix always run config tests
liuhaoyang Nov 28, 2022
3ee442d
Add some ut
liuhaoyang Nov 29, 2022
5f44479
Add ut for plugin runner
liuhaoyang Nov 30, 2022
1bf26a1
Add math_helper_test
liuhaoyang Dec 2, 2022
db85f07
Merge branch 'main' into data-model
liuhaoyang Dec 2, 2022
7628112
Add go version log
liuhaoyang Dec 2, 2022
4820cf1
Update makefile
liuhaoyang Dec 2, 2022
c74a4b6
for test
liuhaoyang Dec 2, 2022
c33e18c
reset constraints.IntUintFloat
liuhaoyang Dec 3, 2022
a501238
Merge branch 'main' into data-model
liuhaoyang Dec 5, 2022
3b337e6
refactor: rename plugin1 to xxxV1 % refactor pluginrunner
liuhaoyang Dec 5, 2022
b3d43c3
fix: Correctly handle the case comparison of version
liuhaoyang Dec 5, 2022
951d8d7
refactor: Add Initialized check for pluginRunner
liuhaoyang Dec 5, 2022
975de32
fix: support FlushOutStore overwrite
liuhaoyang Dec 6, 2022
3a697be
refactor: Optimize unsend data merge timing
liuhaoyang Dec 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ e2e-test
# vscode
.vscode
!.vscode/settings.json
!.vscode/launch.json
output
dist

Expand All @@ -57,4 +58,5 @@ dist
.coretestCoverage.txt
.testCoverage.txt
plugin_main/*.dll
plugin_main/*.so
plugin_main/*.so
plugin_main/*.json
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
}
}
]
}
}
24 changes: 18 additions & 6 deletions aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ilogtail

import (
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/protocol"
)

Expand All @@ -30,12 +31,23 @@ type Aggregator interface {
// Description returns a one-sentence description on the Input.
Description() string

// Add the metric to the aggregator.
Add(log *protocol.Log, ctx map[string]interface{}) error

// Flush pushes the current aggregates to the accumulator.
Flush() []*protocol.LogGroup

// Reset resets the aggregators caches and aggregates.
Reset()
}

type SlsAggregator interface {
Aggregator
// AddLogs the metric to the aggregator.
AddLogs(log *protocol.Log, ctx map[string]interface{}) error

// FlushLogs pushes the current aggregates to the accumulator.
FlushLogs() []*protocol.LogGroup
liuhaoyang marked this conversation as resolved.
Show resolved Hide resolved
}

type PipelineAggregator interface {
Aggregator
// Add the metric to the aggregator.
Add(*models.PipelineGroupEvents, PipelineContext) error
// Flush pushes the current aggregates to the accumulator.
Flush(PipelineContext) error
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type MetricInput interface {

// Collect takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
Collect(Collector) error
CollectLogs(Collector) error
}
```

Expand Down
2 changes: 1 addition & 1 deletion docs/en/guides/How-to-write-input-plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Description() string

// Collect takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
Collect(Collector) error
CollectLogs(Collector) error
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build windows
// +build windows

package wineventlog
Expand Down
22 changes: 17 additions & 5 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ilogtail

import (
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/protocol"
)

Expand All @@ -34,11 +35,6 @@ type Flusher interface {
// stopped gracefully.
IsReady(projectName string, logstoreName string, logstoreKey int64) bool

// Flush flushes data to destination, such as SLS, console, file, etc.
// It is expected to return no error at most time because IsReady will be called
// before it to make sure there is space for next data.
Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error

// SetUrgent indicates the flusher that it will be destroyed soon.
// @flag indicates if main program (Logtail mostly) will exit after calling this.
//
Expand All @@ -60,3 +56,19 @@ type Flusher interface {
// In a word, flusher should only have things that can be recycled by GC after this.
Stop() error
}

type SlsFlusher interface {
Flusher
// FlushLogs flushes data to destination, such as SLS, console, file, etc.
// It is expected to return no error at most time because IsReady will be called
// before it to make sure there is space for next data.
FlushLogs(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error
}

type PipelineFlusher interface {
Flusher
// FlushLogs flushes data to destination, such as SLS, console, file, etc.
liuhaoyang marked this conversation as resolved.
Show resolved Hide resolved
// It is expected to return no error at most time because IsReady will be called
// before it to make sure there is space for next data.
Flush([]*models.PipelineGroupEvents, PipelineContext) error
}
33 changes: 33 additions & 0 deletions helper/math_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

type IntUintFloat interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~float32 | ~float64
}

func Max[T IntUintFloat](x T, y T) T {
if x > y {
return x
}
return y
}

func Min[T IntUintFloat](x T, y T) T {
if x < y {
return x
}
return y
}
27 changes: 23 additions & 4 deletions input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ type MetricInput interface {

// Description returns a one-sentence description on the Input
Description() string
}

type SlsMetricInput interface {
MetricInput
// CollectLogs takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
CollectLogs(Collector) error
}

type PipelineMetricInput interface {
MetricInput
// Collect takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
Collect(Collector) error
Collect(PipelineContext) error
}

// ServiceInput ...
Expand All @@ -37,9 +47,18 @@ type ServiceInput interface {
// Description returns a one-sentence description on the Input
Description() string

// Start starts the ServiceInput's service, whatever that may be
Start(Collector) error

// Stop stops the services and closes any necessary channels and connections
Stop() error
}

type SlsServiceInput interface {
ServiceInput
// StartCollectLogs starts the ServiceInput's service, whatever that may be
StartCollectLogs(Collector) error
}

type PipelineServiceInput interface {
ServiceInput
// Start starts the ServiceInput's service, whatever that may be
Start(PipelineContext) error
}
128 changes: 128 additions & 0 deletions pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ilogtail

import (
"github.com/alibaba/ilogtail/pkg/models"
)

type PipelineContext interface {
Collector() PipelineCollector
}

type PipelineCollector interface {
liuhaoyang marked this conversation as resolved.
Show resolved Hide resolved
Collect(group *models.GroupInfo, events ...models.PipelineEvent)
liuhaoyang marked this conversation as resolved.
Show resolved Hide resolved

Dump() []*models.PipelineGroupEvents

Observe() chan *models.PipelineGroupEvents
}

type observePipeCollector struct {
groupChan chan *models.PipelineGroupEvents
}

func (p *observePipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
if len(events) == 0 {
return
}
p.groupChan <- &models.PipelineGroupEvents{
Group: group,
Events: events,
}
}

func (p *observePipeCollector) Dump() []*models.PipelineGroupEvents {
return nil
}

func (p *observePipeCollector) Observe() chan *models.PipelineGroupEvents {
return p.groupChan
}

type dumpPipeCollector struct {
groupEvents map[*models.GroupInfo][]models.PipelineEvent
}

func (p *dumpPipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
store, has := p.groupEvents[group]
if !has {
store = make([]models.PipelineEvent, 0)
}
p.groupEvents[group] = append(store, events...)
}

func (p *dumpPipeCollector) Dump() []*models.PipelineGroupEvents {
len, idx := len(p.groupEvents), 0
results := make([]*models.PipelineGroupEvents, len)
if len == 0 {
return results
}
for group, events := range p.groupEvents {
results[idx] = &models.PipelineGroupEvents{
Group: group,
Events: events,
}
idx++
}
p.groupEvents = make(map[*models.GroupInfo][]models.PipelineEvent)
return results
}

func (p *dumpPipeCollector) Observe() chan *models.PipelineGroupEvents {
return nil
}

type voidPipeCollector struct {
}

func (p *voidPipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
}

func (p *voidPipeCollector) Dump() []*models.PipelineGroupEvents {
return nil
}

func (p *voidPipeCollector) Observe() chan *models.PipelineGroupEvents {
return nil
}

type defaultPipelineContext struct {
collector PipelineCollector
}

func (p *defaultPipelineContext) Collector() PipelineCollector {
return p.collector
}

func NewObservePipelineConext(queueSize int) PipelineContext {
return newPipelineConext(&observePipeCollector{
groupChan: make(chan *models.PipelineGroupEvents, queueSize),
})
}

func NewDumpPipelineConext() PipelineContext {
return newPipelineConext(&dumpPipeCollector{
groupEvents: make(map[*models.GroupInfo][]models.PipelineEvent),
})
}

func NewVoidPipelineConext() PipelineContext {
return newPipelineConext(&voidPipeCollector{})
}

func newPipelineConext(collector PipelineCollector) PipelineContext {
return &defaultPipelineContext{collector: collector}
}
Loading