Skip to content

Commit

Permalink
Merge pull request #323 from mimiro-io/feat/proxy-timeout
Browse files Browse the repository at this point in the history
allow configuring proxy dataset timeout
  • Loading branch information
rompetroll authored Sep 4, 2024
2 parents 256a583 + 6c78112 commit 0dcadd0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 8 deletions.
18 changes: 15 additions & 3 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,23 @@ By setting up proxy datasets for the database layer, data hub can make the data
without having to load the data.

Proxy datasets need the base Url of the proxied remote dataset as configuration. Optionally a security provider can
be referenced if the remote dataset requires authentication. See (security providers)[#Working_with_security_providers]
be referenced, if the remote dataset requires authentication. See [security providers](#Working_with_security_providers)
It is also possible to set a request timeout for the proxy dataset; if omitted, the timeout is unlimited.

To create a proxy dataset, use the following command:
```shell
> curl -XPOST \
--header "Content-Type: application/json" \
--data '{"ProxyDatasetConfig": {"remoteUrl": "https://dsurl", "proxyAuthProvider": "providerName", "timeoutSeconds": 10}}' \
http://datahub/datasets/dataset-name
```
> mim dataset create test.people --proxy=true --proxyRemoteUrl=https://url --proxyAuthProvider=authProviderName

or using the `mim` CLI:
```shell
> mim dataset create test.people \
--proxy=true \
--proxyRemoteUrl=https://remote/datasets/name \
--proxyAuthProvider=authProviderName

SUCCESS Dataset has been created
```
Expand Down Expand Up @@ -306,7 +319,6 @@ curl -XPOST \
--header "Content-Type: application/json" \
--data '{"virtualDatasetConfig": { "transform": "BASE64-ENCODED-TRANSFORMATION-SCRIPT" } }' \
http://datahub/datasets/dataset-name

```


Expand Down
1 change: 1 addition & 0 deletions internal/server/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ProxyDatasetConfig struct {
UpstreamTransform string `json:"upstreamTransform"`
DownstreamTransform string `json:"downstreamTransform"`
AuthProviderName string `json:"authProviderName"`
TimeoutSeconds int `json:"timeoutSeconds"`
}

type VirtualDatasetConfig struct {
Expand Down
1 change: 1 addition & 0 deletions internal/server/dsmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (dsm *DsManager) NewDatasetEntity(
entity.Properties[prefix+":authProviderName"] = proxyDatasetConfig.AuthProviderName
entity.Properties[prefix+":downstreamTransform"] = proxyDatasetConfig.DownstreamTransform
entity.Properties[prefix+":upstreamTransform"] = proxyDatasetConfig.UpstreamTransform
entity.Properties[prefix+":timeoutSeconds"] = proxyDatasetConfig.TimeoutSeconds
}

if virtualDatasetConfig != nil && virtualDatasetConfig.Transform != "" {
Expand Down
21 changes: 16 additions & 5 deletions internal/server/proxydataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (d *ProxyDataset) StreamEntitiesRaw(
}
uri.RawQuery = q.Encode()
fullURI := uri.String()
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
ctx, cancel := d.newHttpContext()
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fullURI, nil)
if err != nil {
Expand Down Expand Up @@ -126,6 +126,17 @@ func (d *ProxyDataset) StreamEntitiesRaw(
return cont.Properties["token"].(string), nil
}

// newHttpContext returns a context for outgoing proxy HTTP requests.
// If the dataset is configured with a positive TimeoutSeconds, the context
// carries a deadline that many seconds in the future; otherwise the context
// is only cancellable and never times out (unlimited request duration).
// The caller must invoke the returned CancelFunc to release resources,
// typically via defer.
func (d *ProxyDataset) newHttpContext() (context.Context, context.CancelFunc) {
	if d.TimeoutSeconds > 0 {
		return context.WithTimeout(context.Background(), time.Duration(d.TimeoutSeconds)*time.Second)
	}
	// no timeout configured: unlimited, but still cancellable
	return context.WithCancel(context.Background())
}

func (d *ProxyDataset) StreamEntities(from string, limit int, f func(*Entity) error, preStream func() error) (string, error) {
uri, err := url.Parse(d.RemoteEntitiesURL)
if err != nil {
Expand All @@ -140,7 +151,7 @@ func (d *ProxyDataset) StreamEntities(from string, limit int, f func(*Entity) er
}
uri.RawQuery = q.Encode()
fullUri := uri.String()
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
ctx, cancel := d.newHttpContext()
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fullUri, nil)
if err != nil {
Expand Down Expand Up @@ -216,7 +227,7 @@ func (d *ProxyDataset) StreamChangesRaw(

uri.RawQuery = q.Encode()
fullURI := uri.String()
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
ctx, cancel := d.newHttpContext()
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fullURI, nil)
if err != nil {
Expand Down Expand Up @@ -284,7 +295,7 @@ func (d *ProxyDataset) StreamChanges(since string, limit int, latestOnly bool, r

uri.RawQuery = q.Encode()
fullUri := uri.String()
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
ctx, cancel := d.newHttpContext()
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fullUri, nil)
if err != nil {
Expand Down Expand Up @@ -325,7 +336,7 @@ func (d *ProxyDataset) StreamChanges(since string, limit int, latestOnly bool, r
}

func (d *ProxyDataset) ForwardEntities(sourceBody io.ReadCloser, sourceHeader http.Header) error {
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
ctx, cancel := d.newHttpContext()
defer cancel()
req, _ := http.NewRequestWithContext(ctx, "POST", d.RemoteEntitiesURL, sourceBody)
for k, v := range sourceHeader {
Expand Down

0 comments on commit 0dcadd0

Please sign in to comment.