Skip to content

Commit

Permalink
Merge pull request #125 from wanjunlei/master
Browse files Browse the repository at this point in the history
Add supports for Amazon ElasticSearch Service and Elastic's Elasticse…
  • Loading branch information
Benjamin Huo authored Aug 18, 2021
2 parents 57ca429 + 639d304 commit 5a4a5f6
Show file tree
Hide file tree
Showing 5 changed files with 1,221 additions and 3,305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// +kubebuilder:object:generate:=true

// The es output plugin, allows to ingest your records into a Elasticsearch database.
// Elasticsearch is the es output plugin, allows to ingest your records into an Elasticsearch database.
type Elasticsearch struct {
// IP address or hostname of the target Elasticsearch instance
Host string `json:"host,omitempty"`
Expand All @@ -28,11 +28,25 @@ type Elasticsearch struct {
// otherwise the value must be according to the Unit Size specification.
// +kubebuilder:validation:Pattern:="^\\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$"
BufferSize string `json:"bufferSize,omitempty"`
// Newer versions of Elasticsearch allows to setup filters called pipelines.
// This option allows to define which pipeline the database should use.
// For performance reasons is strongly suggested to do parsing
// Newer versions of Elasticsearch allows setting up filters called pipelines.
// This option allows defining which pipeline the database should use.
// For performance reasons is strongly suggested parsing
// and filtering on Fluent Bit side, avoid pipelines.
Pipeline string `json:"pipeline,omitempty"`
// Enable AWS Sigv4 Authentication for Amazon ElasticSearch Service.
AWSAuth string `json:"awsAuth,omitempty"`
// Specify the AWS region for Amazon ElasticSearch Service.
AWSRegion string `json:"awsRegion,omitempty"`
// Specify the custom sts endpoint to be used with STS API for Amazon ElasticSearch Service.
AWSSTSEndpoint string `json:"awsSTSEndpoint,omitempty"`
// AWS IAM Role to assume to put records to your Amazon ES cluster.
AWSRoleARN string `json:"awsRoleARN,omitempty"`
// External ID for the AWS IAM Role specified with aws_role_arn.
AWSExternalID string `json:"awsExternalID,omitempty"`
// If you are using Elastic's Elasticsearch Service you can specify the cloud_id of the cluster running.
CloudID string `json:"cloudID,omitempty"`
// Specify the credentials to use to connect to Elastic's Elasticsearch Service running on Elastic Cloud.
CloudAuth string `json:"cloudAuth,omitempty"`
// Optional username credential for Elastic X-Pack access
HTTPUser *plugins.Secret `json:"httpUser,omitempty"`
// Password for user defined in HTTP_User
Expand Down Expand Up @@ -62,6 +76,8 @@ type Elasticsearch struct {
// When enabled, generate _id for outgoing records.
// This prevents duplicate records when retrying ES.
GenerateID *bool `json:"generateID,omitempty"`
// If set, _id will be the value of the key from incoming record and Generate_ID option is ignored.
IdKey string `json:"idKey,omitempty"`
// When enabled, replace field name dots with underscore, required by Elasticsearch 2.0-2.3.
ReplaceDots *bool `json:"replaceDots,omitempty"`
// When enabled print the elasticsearch API calls to stdout (for diag only)
Expand All @@ -72,15 +88,17 @@ type Elasticsearch struct {
CurrentTimeIndex *bool `json:"currentTimeIndex,omitempty"`
// Prefix keys with this string
LogstashPrefixKey string `json:"logstashPrefixKey,omitempty"`
*plugins.TLS `json:"tls,omitempty"`
// When enabled, mapping types is removed and Type option is ignored. Types are deprecated in APIs in v7.0. This options is for v7.0 or later.
SuppressTypeName string `json:"suppressTypeName,omitempty"`
*plugins.TLS `json:"tls,omitempty"`
}

// implement Section() method
// Name implement Section() method
func (_ *Elasticsearch) Name() string {
return "es"
}

// implement Section() method
// Params implement Section() method
func (es *Elasticsearch) Params(sl plugins.SecretLoader) (*plugins.KVs, error) {
kvs := plugins.NewKVs()
if es.Host != "" {
Expand All @@ -98,6 +116,27 @@ func (es *Elasticsearch) Params(sl plugins.SecretLoader) (*plugins.KVs, error) {
if es.Pipeline != "" {
kvs.Insert("Pipeline", es.Pipeline)
}
if es.AWSAuth != "" {
kvs.Insert("AWS_Auth", es.AWSAuth)
}
if es.AWSRegion != "" {
kvs.Insert("AWS_Region", es.AWSRegion)
}
if es.AWSSTSEndpoint != "" {
kvs.Insert("AWS_STS_Endpoint", es.AWSSTSEndpoint)
}
if es.AWSRoleARN != "" {
kvs.Insert("AWS_Role_ARN", es.AWSRoleARN)
}
if es.CloudID != "" {
kvs.Insert("Cloud_ID", es.CloudID)
}
if es.CloudAuth != "" {
kvs.Insert("Cloud_Auth", es.CloudAuth)
}
if es.AWSExternalID != "" {
kvs.Insert("AWS_External_ID", es.AWSExternalID)
}
if es.HTTPUser != nil {
u, err := sl.LoadSecret(*es.HTTPUser)
if err != nil {
Expand Down Expand Up @@ -142,6 +181,9 @@ func (es *Elasticsearch) Params(sl plugins.SecretLoader) (*plugins.KVs, error) {
if es.GenerateID != nil {
kvs.Insert("Generate_ID", fmt.Sprint(*es.GenerateID))
}
if es.IdKey != "" {
kvs.Insert("ID_KEY", es.IdKey)
}
if es.ReplaceDots != nil {
kvs.Insert("Replace_Dots", fmt.Sprint(*es.ReplaceDots))
}
Expand All @@ -157,6 +199,9 @@ func (es *Elasticsearch) Params(sl plugins.SecretLoader) (*plugins.KVs, error) {
if es.LogstashPrefixKey != "" {
kvs.Insert("Logstash_Prefix_Key", es.LogstashPrefixKey)
}
if es.SuppressTypeName != "" {
kvs.Insert("Suppress_Type_Name", es.SuppressTypeName)
}
if es.TLS != nil {
tls, err := es.TLS.Params(sl)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/fluentbitoperator/v1alpha2/plugins/tls_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type TLS struct {

func (t *TLS) Params(sl SecretLoader) (*KVs, error) {
kvs := NewKVs()
kvs.Insert("tls", "true")
kvs.Insert("tls", "On")
if t.Verify != nil {
kvs.Insert("tls.verify", fmt.Sprint(*t.Verify))
}
Expand Down
45 changes: 40 additions & 5 deletions config/crd/bases/logging.kubesphere.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ spec:
es:
description: Elasticsearch defines Elasticsearch Output configuration.
properties:
awsAuth:
description: Enable AWS Sigv4 Authentication for Amazon ElasticSearch
Service.
type: string
awsExternalID:
description: External ID for the AWS IAM Role specified with aws_role_arn.
type: string
awsRegion:
description: Specify the AWS region for Amazon ElasticSearch Service.
type: string
awsRoleARN:
description: AWS IAM Role to assume to put records to your Amazon
ES cluster.
type: string
awsSTSEndpoint:
description: Specify the custom sts endpoint to be used with STS
API for Amazon ElasticSearch Service.
type: string
bufferSize:
description: Specify the buffer size used to read the response
from the Elasticsearch HTTP service. This option is useful for
Expand All @@ -53,6 +71,14 @@ spec:
Size specification.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
cloudAuth:
description: Specify the credentials to use to connect to Elastic's
Elasticsearch Service running on Elastic Cloud.
type: string
cloudID:
description: If you are using Elastic's Elasticsearch Service
you can specify the cloud_id of the cluster running.
type: string
currentTimeIndex:
description: Use current time for index generation instead of
message record
Expand Down Expand Up @@ -121,6 +147,10 @@ spec:
type: object
type: object
type: object
idKey:
description: If set, _id will be the value of the key from incoming
record and Generate_ID option is ignored.
type: string
includeTagKey:
description: When enabled, it append the Tag name to the record.
type: boolean
Expand Down Expand Up @@ -153,11 +183,11 @@ spec:
indexing HTTP POST URI.
type: string
pipeline:
description: Newer versions of Elasticsearch allows to setup filters
called pipelines. This option allows to define which pipeline
the database should use. For performance reasons is strongly
suggested to do parsing and filtering on Fluent Bit side, avoid
pipelines.
description: Newer versions of Elasticsearch allows setting up
filters called pipelines. This option allows defining which
pipeline the database should use. For performance reasons is
strongly suggested parsing and filtering on Fluent Bit side,
avoid pipelines.
type: string
port:
description: TCP port of the target Elasticsearch instance
Expand All @@ -169,6 +199,11 @@ spec:
description: When enabled, replace field name dots with underscore,
required by Elasticsearch 2.0-2.3.
type: boolean
suppressTypeName:
description: When enabled, mapping types is removed and Type option
is ignored. Types are deprecated in APIs in v7.0. This options
is for v7.0 or later.
type: string
tagKey:
description: When Include_Tag_Key is enabled, this property defines
the key name for the tag.
Expand Down
Loading

0 comments on commit 5a4a5f6

Please sign in to comment.