Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

specgen: add more tests, rename connector to specification #206

Merged
merged 13 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
.PHONY: test
test:
go test $(GOTEST_FLAGS) -race ./...
echo
echo "Running integration tests..."
echo
cd specgen/specgen/testdata/ && go test $(GOTEST_FLAGS) -race ./...


.PHONY: fmt
fmt:
Expand Down
2 changes: 2 additions & 0 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

//nolint:dupword // the tests are commented out while specgen is WIP
// todo acceptance tests will be re-enabled in https://github.com/ConduitIO/conduit-connector-sdk/issues/210
/*

import (
Expand Down
5 changes: 4 additions & 1 deletion destination_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (c *DefaultDestinationMiddleware) Validate(ctx context.Context) error {
f := valType.Field(i)
if f.Type.Implements(validatableInterface) {
// This is a DestinationConfig struct, validate it.
//nolint:forcetypeassert // type checked above with f.Type.Implements()
errs = append(errs, val.Field(i).Interface().(Validatable).Validate(ctx))
}
}
Expand All @@ -87,6 +88,7 @@ func DestinationWithMiddleware(d Destination) Destination {
}
cfgVal = cfgVal.Elem()

// Collect all middlewares from the config and wrap the destination with them
var mw []DestinationMiddleware
for i := range cfgVal.NumField() {
field := cfgVal.Field(i)
Expand All @@ -98,6 +100,7 @@ func DestinationWithMiddleware(d Destination) Destination {
}
if field.Type().Implements(destinationMiddlewareType) {
// This is a middleware config, store it.
//nolint:forcetypeassert // type checked above with field.Type().Implements()
mw = append(mw, field.Interface().(DestinationMiddleware))
}
}
Expand Down Expand Up @@ -188,7 +191,7 @@ func (*destinationWithBatch) getBatchConfig(ctx context.Context) DestinationWith
type DestinationWithRateLimit struct {
UnimplementedDestinationConfig

// Maximum umber of records written per second (0 means no rate limit).
// Maximum number of records written per second (0 means no rate limit).
RatePerSecond float64 `json:"sdk.rate.perSecond" default:"0" validate:"gt=-1"`
// Allow bursts of at most X records (0 or less means that bursts are not
// limited). Only takes effect if a rate limit per second is set. Note that
Expand Down
6 changes: 4 additions & 2 deletions source_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"sync"
"time"

"github.com/conduitio/conduit-connector-sdk/internal"

"github.com/conduitio/conduit-commons/cchan"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-sdk/internal"
"github.com/conduitio/conduit-connector-sdk/schema"
"github.com/jpillora/backoff"
)
Expand Down Expand Up @@ -65,6 +64,7 @@ func (c *DefaultSourceMiddleware) Validate(ctx context.Context) error {
f := valType.Field(i)
if f.Type.Implements(validatableInterface) {
// This is a DestinationConfig struct, validate it.
//nolint:forcetypeassert // type checked above with f.Type.Implements()
errs = append(errs, val.Field(i).Interface().(Validatable).Validate(ctx))
}
}
Expand All @@ -83,6 +83,7 @@ func SourceWithMiddleware(s Source) Source {
}
cfgVal = cfgVal.Elem()

// Collect all middlewares from the config and wrap the source with them
var mw []SourceMiddleware
for i := range cfgVal.NumField() {
field := cfgVal.Field(i)
Expand All @@ -94,6 +95,7 @@ func SourceWithMiddleware(s Source) Source {
}
if field.Type().Implements(sourceMiddlewareType) {
// This is a middleware config, store it.
//nolint:forcetypeassert // type checked above with field.Type().Implements()
mw = append(mw, field.Interface().(SourceMiddleware))
}
}
Expand Down
2 changes: 1 addition & 1 deletion specgen/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Specgen

TBD
TBD
24 changes: 15 additions & 9 deletions specgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func parseFlags() Args {
return args
}

// extractYAMLSpecification extracts the YAML specification from the connector
// found in `path`.
// `path` is the directory path to the package that contains the `Connector` variable.
func extractYAMLSpecification(ctx context.Context, path string) ([]byte, error) {
// Get the import path of the package.
// Get the import path of the package (that contains the connector code).
// It's needed in extractYAMLProgram to get the `Connector` variable.
out, err := specgen.Run(ctx, path, "go", "list", "-m")
if err != nil {
return nil, err
Expand All @@ -97,12 +101,14 @@ func extractYAMLSpecification(ctx context.Context, path string) ([]byte, error)
return runInDir(ctx, program, path)
}

func writeProgram(importPath string) ([]byte, error) {
// writeProgram writes the program that extracts the YAML specs
// from the connector that can be found at the connectorImportPath.
func writeProgram(connectorImportPath string) ([]byte, error) {
var program bytes.Buffer
data := reflectData{
ImportPath: importPath,
ImportPath: connectorImportPath,
}
if err := reflectProgram.Execute(&program, &data); err != nil {
if err := extractYAMLProgram.Execute(&program, &data); err != nil {
return nil, err
}
return program.Bytes(), nil
Expand All @@ -122,13 +128,13 @@ func runInDir(ctx context.Context, program []byte, dir string) ([]byte, error) {
}
}()
const progSource = "prog.go"
var progBinary = "prog.bin"
progBinary := "prog.bin"
if runtime.GOOS == "windows" {
// Windows won't execute a program unless it has a ".exe" suffix.
progBinary += ".exe"
}

if err := os.WriteFile(filepath.Join(tmpDir, progSource), program, 0600); err != nil {
if err := os.WriteFile(filepath.Join(tmpDir, progSource), program, 0o600); err != nil {
return nil, err
}

Expand All @@ -155,9 +161,9 @@ type reflectData struct {
ImportPath string
}

// This program uses reflection to traverse the connector configuration and
// prints the extracted specification to standard output.
var reflectProgram = template.Must(template.New("program").Parse(`
// extractYAMLProgram extract the YAML specs from a connector and writes it
// to standard output. This program uses reflection to traverse the connector configuration.
var extractYAMLProgram = template.Must(template.New("program").Parse(`
package main

import (
Expand Down
50 changes: 25 additions & 25 deletions specgen/specgen/model/v1/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,27 @@ var Changelog = evolviconf.Changelog{
}

type Specification struct {
Version string `json:"version"`
Connector Connector `json:"connector"`
Version string `json:"version" yaml:"version"`
ConnectorSpecification ConnectorSpecification `json:"specification" yaml:"specification"`
}

type Connector struct {
Name string `json:"name"`
Summary string `json:"summary"`
Description string `json:"description"`
Version string `json:"version"`
Author string `json:"author"`
type ConnectorSpecification struct {
Name string `json:"name" yaml:"name"`
Summary string `json:"summary" yaml:"summary"`
Description string `json:"description" yaml:"description"`
Version string `json:"version" yaml:"version"`
Author string `json:"author" yaml:"author"`

Source Source `json:"source,omitempty"`
Destination Destination `json:"destination,omitempty"`
Source Source `json:"source,omitempty" yaml:"source,omitempty"`
Destination Destination `json:"destination,omitempty" yaml:"destination,omitempty"`
}

type Source struct {
Parameters Parameters `json:"parameters,omitempty"`
Parameters Parameters `json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

type Destination struct {
Parameters Parameters `json:"parameters,omitempty"`
Parameters Parameters `json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

type Parameters []Parameter
Expand All @@ -83,10 +83,10 @@ type Validation struct {

// ToConfig implements evolviconf.VersionedConfig.
func (s Specification) ToConfig() (pconnector.Specification, error) {
return s.Connector.ToConfig()
return s.ConnectorSpecification.ToConfig()
}

func (c Connector) ToConfig() (pconnector.Specification, error) {
func (c ConnectorSpecification) ToConfig() (pconnector.Specification, error) {
sourceParams, err := c.Source.ToConfig()
if err != nil {
return pconnector.Specification{}, err
Expand Down Expand Up @@ -220,12 +220,12 @@ func (v Validation) ToConfig() (config.Validation, error) {

func (s Specification) FromConfig(spec pconnector.Specification) Specification {
return Specification{
Version: LatestVersion,
Connector: Connector{}.FromConfig(spec),
Version: LatestVersion,
ConnectorSpecification: ConnectorSpecification{}.FromConfig(spec),
}
}

func (c Connector) FromConfig(spec pconnector.Specification) Connector {
func (c ConnectorSpecification) FromConfig(spec pconnector.Specification) ConnectorSpecification {
c.Name = spec.Name
c.Summary = spec.Summary
c.Description = spec.Description
Expand All @@ -238,18 +238,18 @@ func (c Connector) FromConfig(spec pconnector.Specification) Connector {
}

func (Parameters) FromConfig(params config.Parameters) Parameters {
var p Parameters
p := make(Parameters, len(params))

names := make([]string, 0, len(params))
for k := range maps.Keys(params) {
names = append(names, k)
}
slices.Sort(names)

for _, name := range names {
for i, name := range names {
paramOut := Parameter{}.FromConfig(params[name])
paramOut.Name = name
p = append(p, paramOut)
p[i] = paramOut
}
return p
}
Expand Down Expand Up @@ -284,9 +284,9 @@ func (ParameterType) FromConfig(t config.ParameterType) ParameterType {
}

func (Validations) FromConfig(v []config.Validation) Validations {
var validations Validations
for _, validation := range v {
validations = append(validations, Validation{}.FromConfig(validation))
validations := make(Validations, len(v))
for i, validation := range v {
validations[i] = Validation{}.FromConfig(validation)
}
return validations
}
Expand All @@ -300,12 +300,12 @@ func (v Validation) FromConfig(validation config.Validation) Validation {
case config.ValidationGreaterThan:
return Validation{
Type: config.ValidationTypeGreaterThan.String(),
Value: fmt.Sprintf("%f", val.V),
Value: fmt.Sprintf("%g", val.V),
}
case config.ValidationLessThan:
return Validation{
Type: config.ValidationTypeLessThan.String(),
Value: fmt.Sprintf("%f", val.V),
Value: fmt.Sprintf("%g", val.V),
}
case config.ValidationInclusion:
return Validation{
Expand Down
24 changes: 14 additions & 10 deletions specgen/specgen/specgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ func SpecificationToYaml(spec pconnector.Specification) ([]byte, error) {
return yamlMarshal(v1.Specification{}.FromConfig(spec))
}

// WriteAndCombine combines the user-provided, custom information from an existing YAML file
// with the connector spec.
// `path` is the path to the existing YAML file.
// `yamlBytes` contains the connector's YAML spec.
func WriteAndCombine(yamlBytes []byte, path string) error {
// Read the existing YAML file.
existingRaw, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
// If the file doesn't exist, just write the new YAML directly
if err := os.WriteFile(path, yamlBytes, 0600); err != nil {
if err := os.WriteFile(path, yamlBytes, 0o600); err != nil {
return fmt.Errorf("failed to write YAML to file: %w", err)
}
return nil
Expand All @@ -79,11 +83,11 @@ func WriteAndCombine(yamlBytes []byte, path string) error {
}

out := struct {
Version string `yaml:"version"`
Connector struct {
v1.Connector `yaml:",inline"`
UnknownFields map[string]any `yaml:",inline"`
} `yaml:"connector"`
Version string `yaml:"version"`
Specification struct {
v1.ConnectorSpecification `yaml:",inline"`
UnknownFields map[string]any `yaml:",inline"`
} `yaml:"specification"`
UnknownFields map[string]any `yaml:",inline"`
}{}

Expand All @@ -102,7 +106,7 @@ func WriteAndCombine(yamlBytes []byte, path string) error {

// Merge the new map into the existing map, preserving existing fields
connectorUnknownFields, _ := unknownFields["connector"].(map[string]any)
connTyp := reflect.TypeFor[v1.Connector]()
connTyp := reflect.TypeFor[v1.ConnectorSpecification]()
for i := range connTyp.NumField() {
f := connTyp.Field(i)
fieldName := getYAMLFieldName(f)
Expand All @@ -113,7 +117,7 @@ func WriteAndCombine(yamlBytes []byte, path string) error {
delete(unknownFields, "connector")

out.UnknownFields = unknownFields
out.Connector.UnknownFields = connectorUnknownFields
out.Specification.UnknownFields = connectorUnknownFields

// Marshal the merged map back to YAML bytes
mergedYAML, err := yamlMarshal(out)
Expand All @@ -122,7 +126,7 @@ func WriteAndCombine(yamlBytes []byte, path string) error {
}

// Write the merged YAML to the file
if err := os.WriteFile(path, mergedYAML, 0600); err != nil {
if err := os.WriteFile(path, mergedYAML, 0o600); err != nil {
return fmt.Errorf("failed to write merged YAML to file: %w", err)
}

Expand Down Expand Up @@ -168,7 +172,7 @@ func parseParameters(ctx context.Context, cfg any) (config.Parameters, error) {
// from the cfg struct and updates the default value if that field is set to
// anything other than the zero value. It ignores fields that are not found.
func overwriteDefaults(params config.Parameters, cfg any) {
traverseFields(cfg, func(path string, field reflect.StructField, value reflect.Value) {
traverseFields(cfg, func(path string, _ reflect.StructField, value reflect.Value) {
param, ok := params[path]
if !ok {
// This shouldn't happen if the parameters were extracted from the
Expand Down
Loading