Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: New Output Plugin Microsoft Fabric #5

Closed
wants to merge 9 commits into from
5 changes: 5 additions & 0 deletions plugins/outputs/all/microsoft_fabric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.microsoft_fabric

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/microsoft_fabric" // register plugin
114 changes: 114 additions & 0 deletions plugins/outputs/microsoft_fabric/microsoft_fabric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//go:generate ../../../tools/readme_config_includer/generator
package microsoft_fabric

import (
_ "embed"
"errors"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
ADX "github.com/influxdata/telegraf/plugins/outputs/azure_data_explorer"
EH "github.com/influxdata/telegraf/plugins/outputs/event_hubs"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

//go:embed sample.conf
var sampleConfig string

type MicrosoftFabric struct {
ConnectionString string `toml:"connection_string"`
Log telegraf.Logger `toml:"-"`
ADXConf *ADX.AzureDataExplorer `toml:"adx_conf"`
EHConf *EH.EventHubs `toml:"eh_conf"`
FabricSinkService telegraf.Output
}

// Close implements telegraf.Output.
func (m *MicrosoftFabric) Close() error {
return m.FabricSinkService.Close()
}

// Connect implements telegraf.Output.
func (m *MicrosoftFabric) Connect() error {
return m.FabricSinkService.Connect()
}

// SampleConfig implements telegraf.Output.
func (m *MicrosoftFabric) SampleConfig() string {
return sampleConfig
}

// Write implements telegraf.Output.
func (m *MicrosoftFabric) Write(metrics []telegraf.Metric) error {
return m.FabricSinkService.Write(metrics)
}

func (m *MicrosoftFabric) Init() error {
ConnectionString := m.ConnectionString

if ConnectionString == "" {
return errors.New("endpoint must not be empty. For Kusto refer : https://learn.microsoft.com/kusto/api/connection-strings/kusto?view=microsoft-fabric for EventHouse refer : https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/add-manage-eventstream-sources?pivots=enhanced-capabilities")
}

if strings.HasPrefix(ConnectionString, "Endpoint=sb") {
m.Log.Info("Detected EventHouse endpoint, using EventHouse output plugin")

//Need discussion on it
serializer := &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond),
TimestampFormat: time.RFC3339Nano,
}
m.EHConf.ConnectionString = ConnectionString
m.EHConf.Log = m.Log
m.EHConf.SetSerializer(serializer)
m.EHConf.Init()
m.FabricSinkService = m.EHConf
} else if isKustoEndpoint(strings.ToLower(ConnectionString)) {
m.Log.Info("Detected Kusto endpoint, using Kusto output plugin")
//Setting up the AzureDataExplorer plugin initial properties
m.ADXConf.Endpoint = ConnectionString
m.ADXConf.Log = m.Log
m.ADXConf.Init()
m.FabricSinkService = m.ADXConf
} else {
return errors.New("invalid connection string. For Kusto refer : https://learn.microsoft.com/kusto/api/connection-strings/kusto?view=microsoft-fabric for EventHouse refer : https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/add-manage-eventstream-sources?pivots=enhanced-capabilities")
}
return nil
}

func isKustoEndpoint(endpoint string) bool {
prefixes := []string{
"data source=",
"addr=",
"address=",
"network address=",
"server=",
}

for _, prefix := range prefixes {
if strings.HasPrefix(endpoint, prefix) {
return true
}
}
return false
}

func init() {

outputs.Add("microsoft_fabric", func() telegraf.Output {
return &MicrosoftFabric{
ADXConf: &ADX.AzureDataExplorer{
Timeout: config.Duration(20 * time.Second),
CreateTables: true,
asaharn marked this conversation as resolved.
Show resolved Hide resolved
AppName: "Fabric.Telegraf",
},
EHConf: &EH.EventHubs{
Hub: &eventHub{},
Timeout: config.Duration(30 * time.Second),
},
}
})
}
28 changes: 28 additions & 0 deletions plugins/outputs/microsoft_fabric/microsoft_fabric_eh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package microsoft_fabric

import (
"context"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

type eventHub struct {
hub *eventhub.Hub
}

func (e *eventHub) GetHub(connectionString string) error {
hub, err := eventhub.NewHubFromConnectionString(connectionString)
if err != nil {
return err
}
e.hub = hub
return nil
}

func (e *eventHub) Close(ctx context.Context) error {
return e.hub.Close(ctx)
}

func (e *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
return e.hub.SendBatch(ctx, iterator, opts...)
}
19 changes: 19 additions & 0 deletions plugins/outputs/microsoft_fabric/microsoft_fabric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package microsoft_fabric

import (
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestMicrosoftFabric_Init_EmptyConnectionString(t *testing.T) {
mf := &MicrosoftFabric{
ConnectionString: "",
Log: testutil.Logger{},
}
err := mf.Init()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for ConnectionString for SB and EventHouse too

require.Error(t, err)
assert.Equal(t, "endpoint must not be empty", err.Error())
}
60 changes: 60 additions & 0 deletions plugins/outputs/microsoft_fabric/sample.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Sends metrics to Microsoft Fabric
[[outputs.microsoft_fabric]]
## The URI property of the Azure Data Explorer resource on Azure
## ex: connection_string = https://myadxresource.australiasoutheast.kusto.windows.net
connection_string = ""


[outputs.microsoft_fabric.adx_conf]
## The Azure Data Explorer database that the metrics will be ingested into.
## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion.
## ex: "exampledatabase"
database = ""

## Timeout for Azure Data Explorer operations
# timeout = "20s"

## Type of metrics grouping used when pushing to Azure Data Explorer.
## Default is "TablePerMetric" for one table per different metric.
## For more information, please check the plugin README.
# metrics_grouping_type = "TablePerMetric"

## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
# table_name = ""

## Creates tables and relevant mapping if set to true(default).
## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role.
# create_tables = true

## Ingestion method to use.
## Available options are
## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below
## - queued -- queue up metrics data and process sequentially
# ingestion_type = "queued"

[outputs.microsoft_fabric.eh_conf]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.

## Client timeout (defaults to 30s)
# timeout = "30s"

## Partition key
## Metric tag or field name to use for the event partition key. The value of
## this tag or field is set as the key for events if it exists. If both, tag
## and field, exist the tag is preferred.
# partition_key = ""

## Set the maximum batch message size in bytes
## The allowable size depends on the Event Hub tier
## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes)
# max_message_size = 1000000

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"


Loading