Skip to content

Commit

Permalink
Update qryn+gigapipe docs (#1594)
Browse files Browse the repository at this point in the history
Updates for the qryn opensource and qryn/gigapipe integrations

---------

Co-authored-by: akvlad <akvlad90@gmail.com>
Co-authored-by: Amir Blum <amirgiraffe@gmail.com>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent 06f421a commit 41db3fd
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 110 deletions.
162 changes: 119 additions & 43 deletions common/config/qryn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,103 +3,179 @@ package config
import (
"errors"
"fmt"
"math/rand"
"net/url"
"strings"

"github.com/odigos-io/odigos/common"
)

const (
qrynHost = "QRYN_URL"
qrynAPIKey = "QRYN_API_KEY"
qrynAPISecret = "${QRYN_API_SECRET}"
qrynHost = "QRYN_URL"
qrynAPIKey = "QRYN_API_KEY"
qrynAddExporterName = "QRYN_ADD_EXPORTER_NAME"
resourceToTelemetryConversion = "QRYN_RESOURCE_TO_TELEMETRY_CONVERSION"
qrynSecretsOptional = "__QRYN_SECRETS_OPTIONAL__"
qrynPasswordFieldName = "__QRYN_PASSWORD_FIELD_NAME__"
)

type qrynConf struct {
host string
key string
addExporterName bool
resourceToTelemetryConversion bool
secretsOptional bool
passwordFieldName string
}

type Qryn struct{}

func (g *Qryn) DestType() common.DestinationType {
return common.QrynDestinationType
}

func (g *Qryn) ModifyConfig(dest ExporterConfigurer, currentConfig *Config) error {
if !g.requiredVarsExists(dest) {
return errors.New("Qryn config is missing required variables")
}
apiKey, apiSecret := g.authData(dest)
if apiKey == "" || apiSecret == "" {
return errors.New("Qryn API key or secret not set")
conf := g.getConfigs(dest)
err := g.checkConfigs(&conf)
if err != nil {
return err
}

baseURL, err := parseURL(dest.GetConfig()[qrynHost], apiKey, apiSecret)
passwordPlaceholder := "${QRYN_API_SECRET}"
if conf.passwordFieldName != "" {
passwordPlaceholder = "${" + conf.passwordFieldName + "}"
}
baseURL, err := parseURL(dest.GetConfig()[qrynHost], conf.key, passwordPlaceholder)
if err != nil {
return errors.New("Qryn API host is not a valid")
return errors.Join(err, errors.New("invalid qryn endpoint. gateway will not be configured with qryn"))
}

if isMetricsEnabled(dest) {
rwExporterName := "prometheusremotewrite/qryn-" + dest.GetID()
currentConfig.Exporters[rwExporterName] = GenericMap{
"endpoint": fmt.Sprintf("%s/api/v1/prom/remote/write", baseURL),
"resource_to_telemetry_conversion": GenericMap{
"enabled": dest.GetConfig()[resourceToTelemetryConversion] == "Yes",
},
}
metricsPipelineName := "metrics/qryn-" + dest.GetID()
currentConfig.Service.Pipelines[metricsPipelineName] = Pipeline{
ppl := Pipeline{
Exporters: []string{rwExporterName},
}
g.maybeAddExporterName(
&conf,
currentConfig,
"resource/qryn-metrics-name-"+dest.GetID(),
"odigos-qryn-metrics",
&ppl,
)
currentConfig.Service.Pipelines[metricsPipelineName] = ppl

}

otlpHttpExporterName := ""
otlpHttpExporter := GenericMap{}
if isTracingEnabled(dest) {
exporterName := "otlp/qryn-" + dest.GetID()
currentConfig.Exporters[exporterName] = GenericMap{
"endpoint": fmt.Sprintf("%s/tempo/spans", baseURL),
}
otlpHttpExporterName = "otlphttp/qryn-" + dest.GetID()
otlpHttpExporter["traces_endpoint"] = fmt.Sprintf("%s/v1/traces", baseURL)
otlpHttpExporter["encoding"] = "proto"
otlpHttpExporter["compression"] = "none"
tracesPipelineName := "traces/qryn-" + dest.GetID()
currentConfig.Service.Pipelines[tracesPipelineName] = Pipeline{
Exporters: []string{exporterName},
ppl := Pipeline{
Exporters: []string{otlpHttpExporterName},
}
g.maybeAddExporterName(
&conf,
currentConfig,
"resource/qryn-traces-name-"+dest.GetID(),
"odigos-qryn-traces",
&ppl,
)
currentConfig.Service.Pipelines[tracesPipelineName] = ppl

}

if isLoggingEnabled(dest) {
lokiExporterName := "loki/qryn-" + dest.GetID()
currentConfig.Exporters[lokiExporterName] = GenericMap{
"endpoint": fmt.Sprintf("%s/loki/api/v1/push", baseURL),
"labels": GenericMap{
"attributes": GenericMap{
"k8s.container.name": "k8s_container_name",
"k8s.pod.name": "k8s_pod_name",
"k8s.namespace.name": "k8s_namespace_name",
},
},
}
otlpHttpExporterName = "otlphttp/qryn-" + dest.GetID()
otlpHttpExporter["logs_endpoint"] = fmt.Sprintf("%s/v1/logs", baseURL)
logsPipelineName := "logs/qryn-" + dest.GetID()
currentConfig.Service.Pipelines[logsPipelineName] = Pipeline{
Exporters: []string{lokiExporterName},
otlpHttpExporter["encoding"] = "proto"
otlpHttpExporter["compression"] = "none"
ppl := Pipeline{
Exporters: []string{otlpHttpExporterName},
}
g.maybeAddExporterName(
&conf,
currentConfig,
"resource/qryn-logs-name-"+dest.GetID(),
"odigos-qryn-logs",
&ppl,
)
currentConfig.Service.Pipelines[logsPipelineName] = ppl

}

if otlpHttpExporterName != "" {
currentConfig.Exporters[otlpHttpExporterName] = otlpHttpExporter
}

return nil
}

func (g *Qryn) requiredVarsExists(dest ExporterConfigurer) bool {
if _, ok := dest.GetConfig()[qrynHost]; !ok {
return false
func (g *Qryn) getConfigs(dest ExporterConfigurer) qrynConf {
return qrynConf{
host: dest.GetConfig()[qrynHost],
key: dest.GetConfig()[qrynAPIKey],
addExporterName: dest.GetConfig()[qrynAddExporterName] == "Yes",
resourceToTelemetryConversion: dest.GetConfig()[resourceToTelemetryConversion] == "Yes",
secretsOptional: dest.GetConfig()[qrynSecretsOptional] == "1",
passwordFieldName: dest.GetConfig()[qrynPasswordFieldName],
}
return true
}

func (g *Qryn) authData(dest ExporterConfigurer) (string, string) {
var key string
if k, ok := dest.GetConfig()[qrynAPIKey]; ok {
key = k
func (g *Qryn) checkConfigs(conf *qrynConf) error {
if conf.host == "" {
return errors.New("missing URL")
}
return key, qrynAPISecret
if !conf.secretsOptional && conf.key == "" {
return errors.New("missing API key")
}
return nil
}

func parseURL(rawURL, apiKey, apiSecret string) (string, error) {
rawURL = strings.TrimSpace(rawURL)
if !strings.HasPrefix(rawURL, "http://") && !strings.HasPrefix(rawURL, "https://") {
rawURL = "https://" + rawURL
}
u, err := url.Parse(rawURL)
if err != nil {
return "", err
}
if u.Scheme == "" {
return parseURL(fmt.Sprintf("https://%s", rawURL), apiKey, apiSecret)
apiSecretPlaceholder := fmt.Sprintf("____%d_SECRET_PLACEHOLDER_%[1]d____", rand.Uint64())
if apiKey != "" {
u.User = url.UserPassword(apiKey, apiSecretPlaceholder)
}
res := u.String()
if apiKey != "" {
res = strings.ReplaceAll(res, ":"+apiSecretPlaceholder+"@", ":"+apiSecret+"@")
}
return res, nil
}

return fmt.Sprintf("https://%s:%s@%s", apiKey, apiSecret, u.Host), nil
func (g *Qryn) maybeAddExporterName(conf *qrynConf, currentConfig *Config, processorName string, name string,
pipeline *Pipeline) {
if !conf.addExporterName {
return
}
currentConfig.Processors[processorName] = GenericMap{
"attributes": []GenericMap{
{
"action": "upsert",
"key": "qryn_exporter",
"value": name,
},
},
}
pipeline.Processors = append(pipeline.Processors, processorName)
}
39 changes: 39 additions & 0 deletions common/config/qryn_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package config

import (
"github.com/odigos-io/odigos/common"
)

const (
qrynOssHost = "QRYN_OSS_URL"
qrynOssUsername = "QRYN_OSS_USERNAME"
qrynOssresourceToTelemetryConversion = "QRYN_OSS_RESOURCE_TO_TELEMETRY_CONVERSION"
qrynOssAddExporterName = "QRYN_OSS_ADD_EXPORTER_NAME"
)

type QrynOSS struct {
*Qryn
}

type QrynOssDest struct {
ExporterConfigurer
}

func (d QrynOssDest) GetConfig() map[string]string {
conf := d.ExporterConfigurer.GetConfig()
conf[qrynHost] = conf[qrynOssHost]
conf[qrynAPIKey] = conf[qrynOssUsername]
conf[resourceToTelemetryConversion] = conf[qrynOssresourceToTelemetryConversion]
conf[qrynSecretsOptional] = "1"
conf[qrynAddExporterName] = conf[qrynOssAddExporterName]
conf[qrynPasswordFieldName] = "QRYN_OSS_PASSWORD"
return conf
}

func (g *QrynOSS) DestType() common.DestinationType {
return common.QrynOSSDestinationType
}

func (g *QrynOSS) ModifyConfig(dest ExporterConfigurer, currentConfig *Config) error {
return g.Qryn.ModifyConfig(QrynOssDest{dest}, currentConfig)
}
66 changes: 33 additions & 33 deletions common/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var availableConfigers = []Configer{
&Tempo{}, &Loki{}, &Jaeger{}, &GenericOTLP{}, &OTLPHttp{}, &Elasticsearch{}, &Quickwit{}, &Signoz{}, &Qryn{},
&OpsVerse{}, &Splunk{}, &Lightstep{}, &GoogleCloud{}, &GoogleCloudStorage{}, &Sentry{}, &AzureBlobStorage{},
&AWSS3{}, &Dynatrace{}, &Chronosphere{}, &ElasticAPM{}, &Axiom{}, &SumoLogic{}, &Coralogix{}, &Clickhouse{},
&Causely{}, &Uptrace{}, &Debug{},
&Causely{}, &Uptrace{}, &Debug{}, &QrynOSS{},
}

type Configer interface {
Expand Down Expand Up @@ -144,46 +144,46 @@ func CalculateWithBase(currentConfig *Config, prefixProcessors []string, dests [
// In addition it returns prefix processors that should be added to beginning of each pipeline.
func getBasicConfig(memoryLimiterConfig GenericMap) (*Config, []string) {
return &Config{
Receivers: GenericMap{
"otlp": GenericMap{
"protocols": GenericMap{
"grpc": GenericMap{
// setting it to a large value to avoid dropping batches.
"max_recv_msg_size_mib": 128 * 1024 * 1024,
"endpoint": "0.0.0.0:4317",
},
// Node collectors send in gRPC, so this is probably not needed
"http": GenericMap{
"endpoint": "0.0.0.0:4318",
Receivers: GenericMap{
"otlp": GenericMap{
"protocols": GenericMap{
"grpc": GenericMap{
// setting it to a large value to avoid dropping batches.
"max_recv_msg_size_mib": 128 * 1024 * 1024,
"endpoint": "0.0.0.0:4317",
},
// Node collectors send in gRPC, so this is probably not needed
"http": GenericMap{
"endpoint": "0.0.0.0:4318",
},
},
},
},
},
Processors: GenericMap{
memoryLimiterProcessorName: memoryLimiterConfig,
"resource/odigos-version": GenericMap{
"attributes": []GenericMap{
{
"key": "odigos.version",
"value": "${ODIGOS_VERSION}",
"action": "upsert",
Processors: GenericMap{
memoryLimiterProcessorName: memoryLimiterConfig,
"resource/odigos-version": GenericMap{
"attributes": []GenericMap{
{
"key": "odigos.version",
"value": "${ODIGOS_VERSION}",
"action": "upsert",
},
},
},
},
},
Extensions: GenericMap{
"health_check": GenericMap{
"endpoint": "0.0.0.0:13133",
Extensions: GenericMap{
"health_check": GenericMap{
"endpoint": "0.0.0.0:13133",
},
},
Exporters: map[string]interface{}{},
Connectors: map[string]interface{}{},
Service: Service{
Pipelines: map[string]Pipeline{},
Extensions: []string{"health_check"},
},
},
Exporters: map[string]interface{}{},
Connectors: map[string]interface{}{},
Service: Service{
Pipelines: map[string]Pipeline{},
Extensions: []string{"health_check"},
},
},
[]string{memoryLimiterProcessorName, "resource/odigos-version"}
[]string{memoryLimiterProcessorName, "resource/odigos-version"}
}

func LoadConfigers() (map[common.DestinationType]Configer, error) {
Expand Down
1 change: 1 addition & 0 deletions common/dests.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
OtlpHttpDestinationType DestinationType = "otlphttp"
PrometheusDestinationType DestinationType = "prometheus"
QrynDestinationType DestinationType = "qryn"
QrynOSSDestinationType DestinationType = "qryn-oss"
QuickwitDestinationType DestinationType = "quickwit"
SentryDestinationType DestinationType = "sentry"
SignozDestinationType DestinationType = "signoz"
Expand Down
Loading

0 comments on commit 41db3fd

Please sign in to comment.