Skip to content

Commit

Permalink
Finish missing sources from Datastream Stream (#7175) (#13646)
Browse files Browse the repository at this point in the history
Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician authored Feb 3, 2023
1 parent 427769e commit 2cf5bb3
Show file tree
Hide file tree
Showing 5 changed files with 5,229 additions and 1,394 deletions.
9 changes: 9 additions & 0 deletions .changelog/7175.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
```release-note:enhancement
datastream: Added `postgresql_source_config` & `oracle_source_config` in `google_datastream_stream`
```
```release-note:enhancement
datastream: Added support for `desired_state=RUNNING` in `google_datastream_stream`
```
```release-note:enhancement
datastream: Exposed validation errors in `google_datastream_stream`
```
63 changes: 54 additions & 9 deletions google/datastream_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"bytes"
"encoding/json"
"fmt"
datastream "google.golang.org/api/datastream/v1"
"time"

datastream "google.golang.org/api/datastream/v1"
)

type DatastreamOperationWaiter struct {
Config *Config
UserAgent string
Project string
Op datastream.Operation
CommonOperationWaiter
}

Expand All @@ -20,14 +22,22 @@ func (w *DatastreamOperationWaiter) QueryOp() (interface{}, error) {
return nil, fmt.Errorf("Cannot query operation, it's unset or nil.")
}
// Returns the proper get.
url := fmt.Sprintf("%s%s", w.Config.DatastreamBasePath, w.CommonOperationWaiter.Op.Name)
url := fmt.Sprintf("%s%s", w.Config.DatastreamBasePath, w.Op.Name)

return sendRequest(w.Config, "GET", w.Project, url, w.UserAgent, nil)
}

func (w *DatastreamOperationWaiter) Error() error {
if w != nil && w.Op.Error != nil {
return DatastreamError(*w.Op.Error)
return &DatastreamOperationError{Op: w.Op}
}
return nil
}

func (w *DatastreamOperationWaiter) SetOp(op interface{}) error {
w.CommonOperationWaiter.SetOp(op)
if err := Convert(op, &w.Op); err != nil {
return err
}
return nil
}
Expand All @@ -38,7 +48,7 @@ func createDatastreamWaiter(config *Config, op map[string]interface{}, project,
UserAgent: userAgent,
Project: project,
}
if err := w.CommonOperationWaiter.SetOp(op); err != nil {
if err := w.SetOp(op); err != nil {
return nil, err
}
return w, nil
Expand All @@ -53,7 +63,7 @@ func datastreamOperationWaitTimeWithResponse(config *Config, op map[string]inter
if err := OperationWait(w, activity, timeout, config.PollInterval); err != nil {
return err
}
return json.Unmarshal([]byte(w.CommonOperationWaiter.Op.Response), response)
return json.Unmarshal([]byte(w.Op.Response), response)
}

func datastreamOperationWaitTime(config *Config, op map[string]interface{}, project, activity, userAgent string, timeout time.Duration) error {
Expand All @@ -69,17 +79,52 @@ func datastreamOperationWaitTime(config *Config, op map[string]interface{}, proj
return OperationWait(w, activity, timeout, config.PollInterval)
}

// DatastreamError wraps datastream.Status and implements the
// DatastreamOperationError wraps datastream.Status and implements the
// error interface so it can be returned.
type DatastreamError datastream.Status
type DatastreamOperationError struct {
Op datastream.Operation
}

func (e DatastreamError) Error() string {
func (e DatastreamOperationError) Error() string {
var buf bytes.Buffer

for _, err := range e.Details {
for _, err := range e.Op.Error.Details {
buf.Write(err)
buf.WriteString("\n")
}
if validations := e.extractFailedValidationResult(); validations != nil {
buf.Write(validations)
buf.WriteString("\n")
}

return buf.String()
}

// extractFailedValidationResult extracts the internal failed validations
// if there are any.
func (e DatastreamOperationError) extractFailedValidationResult() []byte {
var metadata datastream.OperationMetadata
data, err := e.Op.Metadata.MarshalJSON()
if err != nil {
return nil
}
err = json.Unmarshal(data, &metadata)
if err != nil {
return nil
}
if metadata.ValidationResult == nil {
return nil
}
var res []byte
for _, v := range metadata.ValidationResult.Validations {
if v.State == "FAILED" {
data, err := v.MarshalJSON()
if err != nil {
return nil
}
res = append(res, data...)
res = append(res, []byte("\n")...)
}
}
return res
}
Loading

0 comments on commit 2cf5bb3

Please sign in to comment.