Skip to content

Commit

Permalink
specgen: add more tests, rename connector to specification (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Nov 18, 2024
1 parent 07483f5 commit f51b771
Show file tree
Hide file tree
Showing 51 changed files with 8,790 additions and 343 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
.PHONY: test
test:
go test $(GOTEST_FLAGS) -race ./...
echo
echo "Running integration tests..."
echo
cd specgen/specgen/testdata/ && go test $(GOTEST_FLAGS) -race ./...


.PHONY: fmt
fmt:
Expand Down
2 changes: 2 additions & 0 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

//nolint:dupword // the tests are commented out while specgen is WIP
// todo acceptance tests will be re-enabled in https://github.com/ConduitIO/conduit-connector-sdk/issues/210
/*
import (
Expand Down
5 changes: 4 additions & 1 deletion destination_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (c *DefaultDestinationMiddleware) Validate(ctx context.Context) error {
f := valType.Field(i)
if f.Type.Implements(validatableInterface) {
// This is a DestinationConfig struct, validate it.
//nolint:forcetypeassert // type checked above with f.Type.Implements()
errs = append(errs, val.Field(i).Interface().(Validatable).Validate(ctx))
}
}
Expand All @@ -87,6 +88,7 @@ func DestinationWithMiddleware(d Destination) Destination {
}
cfgVal = cfgVal.Elem()

// Collect all middlewares from the config and wrap the destination with them
var mw []DestinationMiddleware
for i := range cfgVal.NumField() {
field := cfgVal.Field(i)
Expand All @@ -98,6 +100,7 @@ func DestinationWithMiddleware(d Destination) Destination {
}
if field.Type().Implements(destinationMiddlewareType) {
// This is a middleware config, store it.
//nolint:forcetypeassert // type checked above with field.Type().Implements()
mw = append(mw, field.Interface().(DestinationMiddleware))
}
}
Expand Down Expand Up @@ -188,7 +191,7 @@ func (*destinationWithBatch) getBatchConfig(ctx context.Context) DestinationWith
type DestinationWithRateLimit struct {
UnimplementedDestinationConfig

// Maximum umber of records written per second (0 means no rate limit).
// Maximum number of records written per second (0 means no rate limit).
RatePerSecond float64 `json:"sdk.rate.perSecond" default:"0" validate:"gt=-1"`
// Allow bursts of at most X records (0 or less means that bursts are not
// limited). Only takes effect if a rate limit per second is set. Note that
Expand Down
6 changes: 4 additions & 2 deletions source_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"sync"
"time"

"github.com/conduitio/conduit-connector-sdk/internal"

"github.com/conduitio/conduit-commons/cchan"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-sdk/internal"
"github.com/conduitio/conduit-connector-sdk/schema"
"github.com/jpillora/backoff"
)
Expand Down Expand Up @@ -65,6 +64,7 @@ func (c *DefaultSourceMiddleware) Validate(ctx context.Context) error {
f := valType.Field(i)
if f.Type.Implements(validatableInterface) {
// This is a DestinationConfig struct, validate it.
//nolint:forcetypeassert // type checked above with f.Type.Implements()
errs = append(errs, val.Field(i).Interface().(Validatable).Validate(ctx))
}
}
Expand All @@ -83,6 +83,7 @@ func SourceWithMiddleware(s Source) Source {
}
cfgVal = cfgVal.Elem()

// Collect all middlewares from the config and wrap the source with them
var mw []SourceMiddleware
for i := range cfgVal.NumField() {
field := cfgVal.Field(i)
Expand All @@ -94,6 +95,7 @@ func SourceWithMiddleware(s Source) Source {
}
if field.Type().Implements(sourceMiddlewareType) {
// This is a middleware config, store it.
//nolint:forcetypeassert // type checked above with field.Type().Implements()
mw = append(mw, field.Interface().(SourceMiddleware))
}
}
Expand Down
2 changes: 1 addition & 1 deletion specgen/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Specgen

TBD
TBD
24 changes: 15 additions & 9 deletions specgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func parseFlags() Args {
return args
}

// extractYAMLSpecification extracts the YAML specification from the connector
// found in `path`.
// `path` is the directory path to the package that contains the `Connector` variable.
func extractYAMLSpecification(ctx context.Context, path string) ([]byte, error) {
// Get the import path of the package.
// Get the import path of the package (that contains the connector code).
// It's needed in extractYAMLProgram to get the `Connector` variable.
out, err := specgen.Run(ctx, path, "go", "list", "-m")
if err != nil {
return nil, err
Expand All @@ -97,12 +101,14 @@ func extractYAMLSpecification(ctx context.Context, path string) ([]byte, error)
return runInDir(ctx, program, path)
}

func writeProgram(importPath string) ([]byte, error) {
// writeProgram writes the program that extracts the YAML specs
// from the connector that can be found at the connectorImportPath.
func writeProgram(connectorImportPath string) ([]byte, error) {
var program bytes.Buffer
data := reflectData{
ImportPath: importPath,
ImportPath: connectorImportPath,
}
if err := reflectProgram.Execute(&program, &data); err != nil {
if err := extractYAMLProgram.Execute(&program, &data); err != nil {
return nil, err
}
return program.Bytes(), nil
Expand All @@ -122,13 +128,13 @@ func runInDir(ctx context.Context, program []byte, dir string) ([]byte, error) {
}
}()
const progSource = "prog.go"
var progBinary = "prog.bin"
progBinary := "prog.bin"
if runtime.GOOS == "windows" {
// Windows won't execute a program unless it has a ".exe" suffix.
progBinary += ".exe"
}

if err := os.WriteFile(filepath.Join(tmpDir, progSource), program, 0600); err != nil {
if err := os.WriteFile(filepath.Join(tmpDir, progSource), program, 0o600); err != nil {
return nil, err
}

Expand All @@ -155,9 +161,9 @@ type reflectData struct {
ImportPath string
}

// This program uses reflection to traverse the connector configuration and
// prints the extracted specification to standard output.
var reflectProgram = template.Must(template.New("program").Parse(`
// extractYAMLProgram extract the YAML specs from a connector and writes it
// to standard output. This program uses reflection to traverse the connector configuration.
var extractYAMLProgram = template.Must(template.New("program").Parse(`
package main
import (
Expand Down
50 changes: 25 additions & 25 deletions specgen/specgen/model/v1/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,27 @@ var Changelog = evolviconf.Changelog{
}

type Specification struct {
Version string `json:"version"`
Connector Connector `json:"connector"`
Version string `json:"version" yaml:"version"`
ConnectorSpecification ConnectorSpecification `json:"specification" yaml:"specification"`
}

type Connector struct {
Name string `json:"name"`
Summary string `json:"summary"`
Description string `json:"description"`
Version string `json:"version"`
Author string `json:"author"`
type ConnectorSpecification struct {
Name string `json:"name" yaml:"name"`
Summary string `json:"summary" yaml:"summary"`
Description string `json:"description" yaml:"description"`
Version string `json:"version" yaml:"version"`
Author string `json:"author" yaml:"author"`

Source Source `json:"source,omitempty"`
Destination Destination `json:"destination,omitempty"`
Source Source `json:"source,omitempty" yaml:"source,omitempty"`
Destination Destination `json:"destination,omitempty" yaml:"destination,omitempty"`
}

type Source struct {
Parameters Parameters `json:"parameters,omitempty"`
Parameters Parameters `json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

type Destination struct {
Parameters Parameters `json:"parameters,omitempty"`
Parameters Parameters `json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

type Parameters []Parameter
Expand All @@ -83,10 +83,10 @@ type Validation struct {

// ToConfig implements evolviconf.VersionedConfig.
func (s Specification) ToConfig() (pconnector.Specification, error) {
return s.Connector.ToConfig()
return s.ConnectorSpecification.ToConfig()
}

func (c Connector) ToConfig() (pconnector.Specification, error) {
func (c ConnectorSpecification) ToConfig() (pconnector.Specification, error) {
sourceParams, err := c.Source.ToConfig()
if err != nil {
return pconnector.Specification{}, err
Expand Down Expand Up @@ -220,12 +220,12 @@ func (v Validation) ToConfig() (config.Validation, error) {

func (s Specification) FromConfig(spec pconnector.Specification) Specification {
return Specification{
Version: LatestVersion,
Connector: Connector{}.FromConfig(spec),
Version: LatestVersion,
ConnectorSpecification: ConnectorSpecification{}.FromConfig(spec),
}
}

func (c Connector) FromConfig(spec pconnector.Specification) Connector {
func (c ConnectorSpecification) FromConfig(spec pconnector.Specification) ConnectorSpecification {
c.Name = spec.Name
c.Summary = spec.Summary
c.Description = spec.Description
Expand All @@ -238,18 +238,18 @@ func (c Connector) FromConfig(spec pconnector.Specification) Connector {
}

func (Parameters) FromConfig(params config.Parameters) Parameters {
var p Parameters
p := make(Parameters, len(params))

names := make([]string, 0, len(params))
for k := range maps.Keys(params) {
names = append(names, k)
}
slices.Sort(names)

for _, name := range names {
for i, name := range names {
paramOut := Parameter{}.FromConfig(params[name])
paramOut.Name = name
p = append(p, paramOut)
p[i] = paramOut
}
return p
}
Expand Down Expand Up @@ -284,9 +284,9 @@ func (ParameterType) FromConfig(t config.ParameterType) ParameterType {
}

func (Validations) FromConfig(v []config.Validation) Validations {
var validations Validations
for _, validation := range v {
validations = append(validations, Validation{}.FromConfig(validation))
validations := make(Validations, len(v))
for i, validation := range v {
validations[i] = Validation{}.FromConfig(validation)
}
return validations
}
Expand All @@ -300,12 +300,12 @@ func (v Validation) FromConfig(validation config.Validation) Validation {
case config.ValidationGreaterThan:
return Validation{
Type: config.ValidationTypeGreaterThan.String(),
Value: fmt.Sprintf("%f", val.V),
Value: fmt.Sprintf("%g", val.V),
}
case config.ValidationLessThan:
return Validation{
Type: config.ValidationTypeLessThan.String(),
Value: fmt.Sprintf("%f", val.V),
Value: fmt.Sprintf("%g", val.V),
}
case config.ValidationInclusion:
return Validation{
Expand Down
24 changes: 14 additions & 10 deletions specgen/specgen/specgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ func SpecificationToYaml(spec pconnector.Specification) ([]byte, error) {
return yamlMarshal(v1.Specification{}.FromConfig(spec))
}

// WriteAndCombine combines the user-provided, custom information from an existing YAML file
// with the connector spec.
// `path` is the path to the existing YAML file.
// `yamlBytes` contains the connector's YAML spec.
func WriteAndCombine(yamlBytes []byte, path string) error {
// Read the existing YAML file.
existingRaw, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
// If the file doesn't exist, just write the new YAML directly
if err := os.WriteFile(path, yamlBytes, 0600); err != nil {
if err := os.WriteFile(path, yamlBytes, 0o600); err != nil {
return fmt.Errorf("failed to write YAML to file: %w", err)
}
return nil
Expand All @@ -79,11 +83,11 @@ func WriteAndCombine(yamlBytes []byte, path string) error {
}

out := struct {
Version string `yaml:"version"`
Connector struct {
v1.Connector `yaml:",inline"`
UnknownFields map[string]any `yaml:",inline"`
} `yaml:"connector"`
Version string `yaml:"version"`
Specification struct {
v1.ConnectorSpecification `yaml:",inline"`
UnknownFields map[string]any `yaml:",inline"`
} `yaml:"specification"`
UnknownFields map[string]any `yaml:",inline"`
}{}

Expand All @@ -102,7 +106,7 @@ func WriteAndCombine(yamlBytes []byte, path string) error {

// Merge the new map into the existing map, preserving existing fields
connectorUnknownFields, _ := unknownFields["connector"].(map[string]any)
connTyp := reflect.TypeFor[v1.Connector]()
connTyp := reflect.TypeFor[v1.ConnectorSpecification]()
for i := range connTyp.NumField() {
f := connTyp.Field(i)
fieldName := getYAMLFieldName(f)
Expand All @@ -113,7 +117,7 @@ func WriteAndCombine(yamlBytes []byte, path string) error {
delete(unknownFields, "connector")

out.UnknownFields = unknownFields
out.Connector.UnknownFields = connectorUnknownFields
out.Specification.UnknownFields = connectorUnknownFields

// Marshal the merged map back to YAML bytes
mergedYAML, err := yamlMarshal(out)
Expand All @@ -122,7 +126,7 @@ func WriteAndCombine(yamlBytes []byte, path string) error {
}

// Write the merged YAML to the file
if err := os.WriteFile(path, mergedYAML, 0600); err != nil {
if err := os.WriteFile(path, mergedYAML, 0o600); err != nil {
return fmt.Errorf("failed to write merged YAML to file: %w", err)
}

Expand Down Expand Up @@ -168,7 +172,7 @@ func parseParameters(ctx context.Context, cfg any) (config.Parameters, error) {
// from the cfg struct and updates the default value if that field is set to
// anything other than the zero value. It ignores fields that are not found.
func overwriteDefaults(params config.Parameters, cfg any) {
traverseFields(cfg, func(path string, field reflect.StructField, value reflect.Value) {
traverseFields(cfg, func(path string, _ reflect.StructField, value reflect.Value) {
param, ok := params[path]
if !ok {
// This shouldn't happen if the parameters were extracted from the
Expand Down
Loading

0 comments on commit f51b771

Please sign in to comment.