Skip to content

Commit

Permalink
Add salesforce input plugin (#3075)
Browse files Browse the repository at this point in the history
  • Loading branch information
rody authored and danielnelson committed Aug 8, 2017
1 parent b9b5b74 commit 2ef93a1
Show file tree
Hide file tree
Showing 4 changed files with 809 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/salesforce"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
Expand Down
48 changes: 48 additions & 0 deletions plugins/inputs/salesforce/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Salesforce Input Plugin

The Salesforce plugin gathers metrics about the limits in your Salesforce organization and the remaining usage.
It fetches its data from the [limits endpoint](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_limits.htm) of Salesforce's REST API.

### Configuration:

```toml
# Gather Metrics about Salesforce limits and remaining usage
[[inputs.salesforce]]
username = "your_username"
password = "your_password"
## (Optional) security tokjen
security_token = "your_security_token"
## (Optional) environment type (sandbox or production)
## default is: production
# environment = "production"
## (Optional) API version (default: "39.0")
# version = "39.0"
```

### Measurements & Fields:

Salesforce provide one measurment named "salesforce".
Each entry is converted to snake\_case and 2 fields are created.

- \<key\>_max represents the limit threshold
- \<key\>_remaining represents the usage remaining before hitting the limit threshold

- salesforce
- \<key\>_max (int)
- \<key\>_remaining (int)
- (...)

### Tags:

- All measurements have the following tags:
- host
- organization_id (t18 char organisation ID)


### Example Output:

```
$./telegraf --config telegraf.conf --input-filter salesforce --test
salesforce,organization_id=XXXXXXXXXXXXXXXXXX,host=xxxxx.salesforce.com daily_workflow_emails_max=546000i,hourly_time_based_workflow_max=50i,daily_async_apex_executions_remaining=250000i,daily_durable_streaming_api_events_remaining=1000000i,streaming_api_concurrent_clients_remaining=2000i,daily_bulk_api_requests_remaining=10000i,hourly_sync_report_runs_remaining=500i,daily_api_requests_max=5000000i,data_storage_mb_remaining=1073i,file_storage_mb_remaining=1069i,daily_generic_streaming_api_events_remaining=10000i,hourly_async_report_runs_remaining=1200i,hourly_time_based_workflow_remaining=50i,daily_streaming_api_events_remaining=1000000i,single_email_max=5000i,hourly_dashboard_refreshes_remaining=200i,streaming_api_concurrent_clients_max=2000i,daily_durable_generic_streaming_api_events_remaining=1000000i,daily_api_requests_remaining=4999998i,hourly_dashboard_results_max=5000i,hourly_async_report_runs_max=1200i,daily_durable_generic_streaming_api_events_max=1000000i,hourly_dashboard_results_remaining=5000i,concurrent_sync_report_runs_max=20i,durable_streaming_api_concurrent_clients_remaining=2000i,daily_workflow_emails_remaining=546000i,hourly_dashboard_refreshes_max=200i,daily_streaming_api_events_max=1000000i,hourly_sync_report_runs_max=500i,hourly_o_data_callout_max=10000i,mass_email_max=5000i,mass_email_remaining=5000i,single_email_remaining=5000i,hourly_dashboard_statuses_max=999999999i,concurrent_async_get_report_instances_max=200i,daily_durable_streaming_api_events_max=1000000i,daily_generic_streaming_api_events_max=10000i,hourly_o_data_callout_remaining=10000i,concurrent_sync_report_runs_remaining=20i,daily_bulk_api_requests_max=10000i,data_storage_mb_max=1073i,hourly_dashboard_statuses_remaining=999999999i,concurrent_async_get_report_instances_remaining=200i,daily_async_apex_executions_max=250000i,durable_streaming_api_concurrent_clients_max=2000i,file_storage_mb_max=1073i 1501565661000000000
```
245 changes: 245 additions & 0 deletions plugins/inputs/salesforce/salesforce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package salesforce

import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

var sampleConfig = `
## specify your credentials
##
username = "your_username"
password = "your_password"
##
## (optional) security token
# security_token = "your_security_token"
##
## (optional) environment type (sandbox or production)
## default is: production
##
# environment = "production"
##
## (optional) API version (default: "39.0")
##
# version = "39.0"
`

type limit struct {
Max int
Remaining int
}

type limits map[string]limit

type Salesforce struct {
Username string
Password string
SecurityToken string
Environment string
SessionID string
ServerURL *url.URL
OrganizationID string
Version string

client *http.Client
}

const defaultVersion = "39.0"
const defaultEnvironment = "production"

// returns a new Salesforce plugin instance
func NewSalesforce() *Salesforce {
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(5 * time.Second),
}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(10 * time.Second),
}
return &Salesforce{
client: client,
Version: defaultVersion,
Environment: defaultEnvironment}
}

func (s *Salesforce) SampleConfig() string {
return sampleConfig
}

func (s *Salesforce) Description() string {
return "Read API usage and limits for a Salesforce organisation"
}

// Reads limits values from Salesforce API
func (s *Salesforce) Gather(acc telegraf.Accumulator) error {
limits, err := s.fetchLimits()
if err != nil {
return err
}

tags := map[string]string{
"organization_id": s.OrganizationID,
"host": s.ServerURL.Host,
}

fields := make(map[string]interface{})
for k, v := range limits {
key := internal.SnakeCase(k)
fields[key+"_max"] = v.Max
fields[key+"_remaining"] = v.Remaining
}

acc.AddFields("salesforce", fields, tags)
return nil
}

// query the limits endpoint
func (s *Salesforce) queryLimits() (*http.Response, error) {
endpoint := fmt.Sprintf("%s://%s/services/data/v%s/limits", s.ServerURL.Scheme, s.ServerURL.Host, s.Version)
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "encoding/json")
req.Header.Add("Authorization", "Bearer "+s.SessionID)
return s.client.Do(req)
}

func (s *Salesforce) isAuthenticated() bool {
return s.SessionID != ""
}

func (s *Salesforce) fetchLimits() (limits, error) {
var l limits
if !s.isAuthenticated() {
if err := s.login(); err != nil {
return l, err
}
}

resp, err := s.queryLimits()
if err != nil {
return l, err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusUnauthorized {
if err = s.login(); err != nil {
return l, err
}
resp, err = s.queryLimits()
if err != nil {
return l, err
}
defer resp.Body.Close()
}

if resp.StatusCode != http.StatusOK {
return l, fmt.Errorf("Salesforce responded with unexpected status code %d", resp.StatusCode)
}

l = limits{}
err = json.NewDecoder(resp.Body).Decode(&l)
return l, err
}

func (s *Salesforce) getLoginEndpoint() (string, error) {
switch s.Environment {
case "sandbox":
return fmt.Sprintf("https://test.salesforce.com/services/Soap/c/%s/", s.Version), nil
case "production":
return fmt.Sprintf("https://login.salesforce.com/services/Soap/c/%s/", s.Version), nil
default:
return "", fmt.Errorf("unknown environment type: %s", s.Environment)
}
}

// Authenticate with Salesfroce
func (s *Salesforce) login() error {
if s.Username == "" || s.Password == "" {
return errors.New("missing username or password")
}

body := fmt.Sprintf(`<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:urn="urn:enterprise.soap.sforce.com">
<soapenv:Body>
<urn:login>
<urn:username>%s</urn:username>
<urn:password>%s%s</urn:password>
</urn:login>
</soapenv:Body>
</soapenv:Envelope>`,
s.Username, s.Password, s.SecurityToken)

loginEndpoint, err := s.getLoginEndpoint()
if err != nil {
return err
}

req, err := http.NewRequest(http.MethodPost, loginEndpoint, strings.NewReader(body))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/xml")
req.Header.Add("SOAPAction", "login")

resp, err := s.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

soapFault := struct {
Code string `xml:"Body>Fault>faultcode"`
Message string `xml:"Body>Fault>faultstring"`
}{}

err = xml.Unmarshal(respBody, &soapFault)
if err != nil {
return err
}

if soapFault.Code != "" {
return fmt.Errorf("login failed: %s", soapFault.Message)
}

loginResult := struct {
ServerURL string `xml:"Body>loginResponse>result>serverUrl"`
SessionID string `xml:"Body>loginResponse>result>sessionId"`
OrganizationID string `xml:"Body>loginResponse>result>userInfo>organizationId"`
}{}

err = xml.Unmarshal(respBody, &loginResult)
if err != nil {
return err
}

s.SessionID = loginResult.SessionID
s.OrganizationID = loginResult.OrganizationID
s.ServerURL, err = url.Parse(loginResult.ServerURL)

return err
}

func init() {
inputs.Add("salesforce", func() telegraf.Input {
return NewSalesforce()
})
}
Loading

0 comments on commit 2ef93a1

Please sign in to comment.