Skip to content

Commit

Permalink
add TimeoutSeconds config option for HttpDatasetSource
Browse files Browse the repository at this point in the history
  • Loading branch information
rompetroll committed Oct 23, 2024
1 parent c8ec2a9 commit 23a19f1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ It can be configured as follows:
"Type": "HttpDatasetSource",
"Url": "full url of change endpoint of the datalayer to read from",
"TokenProvider": "optional: name of token provider that allows access"
"TimoutSeconds": "optional: timeout in seconds for the http request. 5s default"
}
}
```
Expand Down
6 changes: 6 additions & 0 deletions internal/jobs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,12 @@ func (s *Scheduler) parseSource(jobConfig *JobConfiguration) (source.Source, err
src := &source.HTTPDatasetSource{}
src.Store = s.Store
src.Logger = s.Runner.logger.Named("HttpDatasetSource")
timeout, ok := sourceConfig["TimeoutSeconds"]
if ok {
src.Timeout = time.Duration(int(timeout.(float64))) * time.Second
} else {
src.Timeout = 5 * time.Second
}
endpoint, ok := sourceConfig["Url"]
if ok && endpoint != "" {
src.Endpoint = endpoint.(string)
Expand Down
21 changes: 12 additions & 9 deletions internal/jobs/source/http_dataset_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type HTTPDatasetSource struct {
TokenProvider security.Provider // for use in token auth
Store *server.Store
Logger *zap.SugaredLogger
Timeout time.Duration
}

func (httpDatasetSource *HTTPDatasetSource) StartFullSync() {
Expand Down Expand Up @@ -73,7 +74,7 @@ func (httpDatasetSource *HTTPDatasetSource) ReadEntities(ctx context.Context, si
ctx, cancel := context.WithCancel(ctx)
defer cancel()

netClient := httpClient()
netClient := httpClient(httpDatasetSource.Timeout)

// security
if httpDatasetSource.TokenProvider != nil {
Expand Down Expand Up @@ -119,7 +120,6 @@ func (httpDatasetSource *HTTPDatasetSource) ReadEntities(ctx context.Context, si
}
return nil
})

if err != nil {
return err
}
Expand All @@ -136,15 +136,18 @@ func (httpDatasetSource *HTTPDatasetSource) ReadEntities(ctx context.Context, si
return nil
}

var globalHttpClient *http.Client
var globalHttpClients map[time.Duration]*http.Client

// use common http client for all http sources, to reuse connections
func httpClient() *http.Client {
if globalHttpClient == nil {
func httpClient(timeout time.Duration) *http.Client {
if globalHttpClients == nil {
globalHttpClients = make(map[time.Duration]*http.Client)
}
if globalHttpClients[timeout] == nil {
// set up a transport with sane defaults, but with a default content timeout of 0 (infinite)
netTransport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
Timeout: timeout,
// keep connections alive a short time
KeepAliveConfig: net.KeepAliveConfig{
Enable: true,
Expand All @@ -153,16 +156,16 @@ func httpClient() *http.Client {
Count: 3,
},
}).DialContext,
TLSHandshakeTimeout: 5 * time.Second,
TLSHandshakeTimeout: timeout,
MaxIdleConnsPerHost: 1000,
MaxConnsPerHost: 1000,
}
netClient := &http.Client{
Transport: netTransport,
}
globalHttpClient = netClient
globalHttpClients[timeout] = netClient
}
return globalHttpClient
return globalHttpClients[timeout]
}

func (httpDatasetSource *HTTPDatasetSource) GetConfig() map[string]interface{} {
Expand Down

0 comments on commit 23a19f1

Please sign in to comment.