Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Updating to latest 0.18 (go get -v -d knative.dev/eventing@release-0.…
Browse files Browse the repository at this point in the history
…18 && ./hack/update-deps.sh) (#1653)

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
  • Loading branch information
matzew committed Oct 30, 2020
1 parent b9ce948 commit dd3a4ff
Show file tree
Hide file tree
Showing 40 changed files with 1,181 additions and 888 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
k8s.io/apimachinery v0.18.8
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451
knative.dev/eventing v0.18.0
knative.dev/eventing v0.18.4-0.20201028193234-25836253934e
knative.dev/pkg v0.0.0-20200922164940-4bf40ad82aab
knative.dev/test-infra v0.0.0-20200921012245-37f1a12adbd3
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1931,8 +1931,8 @@ k8s.io/utils v0.0.0-20200603063816-c1c6865ac451 h1:v8ud2Up6QK1lNOKFgiIVrZdMg7Mpm
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/caching v0.0.0-20190719140829-2032732871ff/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
knative.dev/eventing v0.18.0 h1:DKRDpIAYZtFt959m4m9oWHnNBBH97yMP+nekrqmjOF8=
knative.dev/eventing v0.18.0/go.mod h1:Rv5V1Sk/XeG6vdEpRu+zDhEUDg2SgbkOJWRNssUyt50=
knative.dev/eventing v0.18.4-0.20201028193234-25836253934e h1:G56dQeg8sig6qubcLkl2u1OpYPHF1n/zowaiGD7p+HA=
knative.dev/eventing v0.18.4-0.20201028193234-25836253934e/go.mod h1:Rv5V1Sk/XeG6vdEpRu+zDhEUDg2SgbkOJWRNssUyt50=
knative.dev/eventing-contrib v0.6.1-0.20190723221543-5ce18048c08b/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
knative.dev/eventing-contrib v0.6.1-0.20190723221543-5ce18048c08b/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
knative.dev/eventing-contrib v0.6.1-0.20190723221543-5ce18048c08b/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
Expand Down
41 changes: 39 additions & 2 deletions vendor/knative.dev/eventing/pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"context"
"errors"
"fmt"
nethttp "net/http"
"net/url"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
Expand All @@ -35,9 +37,18 @@ import (
// NewCloudEventsClient returns a client that will apply the ceOverrides to
// outbound events and report outbound event counts.
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) {
return NewCloudEventsClientCRStatus(target, ceOverrides, reporter, nil)
return newCloudEventsClientCRStatus(nil, target, ceOverrides, reporter, nil)
}
func NewCloudEventsClientCRStatus(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(env, "", nil, reporter, crStatusEventClient)
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {

if target == "" && env != nil {
target = env.GetSink()
}

pOpts := make([]http.Option, 0)
if len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
Expand All @@ -46,6 +57,19 @@ func NewCloudEventsClientCRStatus(target string, ceOverrides *duckv1.CloudEventO
Propagation: tracecontextb3.TraceContextEgress,
}))

if env != nil {
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
var err error
if ceOverrides == nil {
ceOverrides, err = env.GetCloudEventOverrides()
if err != nil {
return nil, err
}
}
}

p, err := cloudevents.NewHTTP(pOpts...)
if err != nil {
return nil, err
Expand All @@ -67,6 +91,19 @@ func NewCloudEventsClientCRStatus(target string, ceOverrides *duckv1.CloudEventO
}, nil
}

func setTimeOut(duration time.Duration) http.Option {
return func(p *http.Protocol) error {
if p == nil {
return fmt.Errorf("http target option can not set nil protocol")
}
if p.Client == nil {
p.Client = &nethttp.Client{}
}
p.Client.Timeout = duration
return nil
}
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
Expand Down
29 changes: 29 additions & 0 deletions vendor/knative.dev/eventing/pkg/adapter/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package adapter

import (
"encoding/json"
"os"
"strconv"
"time"

"go.uber.org/zap"
Expand All @@ -43,6 +45,7 @@ const (
EnvConfigLoggingConfig = "K_LOGGING_CONFIG"
EnvConfigTracingConfig = "K_TRACING_CONFIG"
EnvConfigLeaderElectionConfig = "K_LEADER_ELECTION_CONFIG"
EnvSinkTimeout = "K_SINK_TIMEOUT"
)

// EnvConfig is the minimal set of configuration parameters
Expand Down Expand Up @@ -85,6 +88,9 @@ type EnvConfig struct {

// LeaderElectionConfigJson is the leader election component configuration.
LeaderElectionConfigJson string `envconfig:"K_LEADER_ELECTION_CONFIG"`

// Time in seconds to wait for sink to respond
EnvSinkTimeout int `envconfig:"K_SINK_TIMEOUT"`
}

// EnvConfigAccessor defines accessors for the minimal
Expand Down Expand Up @@ -114,6 +120,9 @@ type EnvConfigAccessor interface {

// GetLeaderElectionConfig returns leader election configuration.
GetLeaderElectionConfig() (*kle.ComponentConfig, error)

// Get the name of the adapter.
GetSinktimeout() int
}

var _ EnvConfigAccessor = (*EnvConfig)(nil)
Expand Down Expand Up @@ -158,6 +167,10 @@ func (e *EnvConfig) GetName() string {
return e.Name
}

func (e *EnvConfig) GetSinktimeout() int {
return e.EnvSinkTimeout
}

func (e *EnvConfig) SetupTracing(logger *zap.SugaredLogger) error {
config, err := tracingconfig.JsonToTracingConfig(e.TracingConfigJson)
if err != nil {
Expand Down Expand Up @@ -209,3 +222,19 @@ func LeaderElectionComponentConfigToJson(cfg *kle.ComponentConfig) (string, erro
jsonCfg, err := json.Marshal(cfg)
return string(jsonCfg), err
}

func GetSinkTimeout(logger *zap.SugaredLogger) int {
str := os.Getenv(EnvSinkTimeout)
if str != "" {
var err error
duration, err := strconv.Atoi(str)
if err != nil || duration < 0 {
if logger != nil {
logger.Errorf("%s environment value is invalid. It must be a integer greater than zero. (got %s)", EnvSinkTimeout, str)
}
return -1
}
return duration
}
return -1
}
8 changes: 1 addition & 7 deletions vendor/knative.dev/eventing/pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"flag"
"fmt"

"log"
"net/http"
"strconv"
Expand Down Expand Up @@ -160,12 +159,7 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces
logger.Error("Error setting up trace publishing", zap.Error(err))
}

ceOverrides, err := env.GetCloudEventOverrides()
if err != nil {
logger.Error("Error loading cloudevents overrides", zap.Error(err))
}

eventsClient, err := NewCloudEventsClientCRStatus(env.GetSink(), ceOverrides, reporter, crStatusEventClient)
eventsClient, err := NewCloudEventsClientCRStatus(env, reporter, crStatusEventClient)
if err != nil {
logger.Fatal("Error building cloud event client", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (ts *TriggerStatus) MarkDependencyNotConfigured() {
"DependencyNotConfigured", "Dependency has not yet been reconciled.")
}

func (ts *TriggerStatus) PropagateDependencyStatus(ks *duckv1.KResource) {
func (ts *TriggerStatus) PropagateDependencyStatus(ks *duckv1.Source) {
kc := ks.Status.GetCondition(apis.ConditionReady)
if kc == nil {
ts.MarkDependencyNotConfigured()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@ import (
"context"

"knative.dev/pkg/apis"
"knative.dev/pkg/resolver"
)

// sinkURIKey is used as the key for associating information
// with a context.Context.
type sinkURIKey struct{}
type resolverKey struct{}

// WithSinkURI notes on the context for binding that the resolved SinkURI
// is the provided apis.URL.
func WithSinkURI(ctx context.Context, uri *apis.URL) context.Context {
return context.WithValue(ctx, sinkURIKey{}, uri)
}

func WithURIResolver(ctx context.Context, resolver *resolver.URIResolver) context.Context {
return context.WithValue(ctx, resolverKey{}, resolver)
}

// GetSinkURI accesses the apis.URL for the Sink URI that has been associated
// with this context.
func GetSinkURI(ctx context.Context) *apis.URL {
Expand All @@ -41,3 +47,11 @@ func GetSinkURI(ctx context.Context) *apis.URL {
}
return value.(*apis.URL)
}

func GetURIResolver(ctx context.Context) *resolver.URIResolver {
value := ctx.Value(resolverKey{})
if value == nil {
return nil
}
return value.(*resolver.URIResolver)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

Expand All @@ -33,7 +32,9 @@ import (
"knative.dev/pkg/tracker"
)

var sbCondSet = apis.NewLivingConditionSet()
var sbCondSet = apis.NewLivingConditionSet(
SinkBindingConditionSinkProvided,
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (*SinkBinding) GetConditionSet() apis.ConditionSet {
Expand Down Expand Up @@ -82,16 +83,31 @@ func (sbs *SinkBindingStatus) MarkBindingAvailable() {
sbCondSet.Manage(sbs).MarkTrue(SinkBindingConditionReady)
}

// MarkSink sets the condition that the source has a sink configured.
func (sbs *SinkBindingStatus) MarkSink(uri *apis.URL) {
sbs.SinkURI = uri
if uri != nil {
sbCondSet.Manage(sbs).MarkTrue(SinkBindingConditionSinkProvided)
} else {
sbCondSet.Manage(sbs).MarkFalse(SinkBindingConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
}
}

// Do implements psbinding.Bindable
func (sb *SinkBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
// First undo so that we can just unconditionally append below.
sb.Undo(ctx, ps)

uri := GetSinkURI(ctx)
if uri == nil {
logging.FromContext(ctx).Errorf("No sink URI associated with context for %+v", sb)
resolver := GetURIResolver(ctx)
if resolver == nil {
logging.FromContext(ctx).Errorf("No Resolver associated with context for sink: %+v", sb)
}
uri, err := resolver.URIFromDestinationV1(ctx, sb.Spec.Sink, sb)
if err != nil {
logging.FromContext(ctx).Errorw("URI could not be extracted from destination: ", zap.Error(err))
return
}
sb.Status.MarkSink(uri)

var ceOverrides string
if sb.Spec.CloudEventOverrides != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ const (
// SinkBindingConditionReady is configured to indicate whether the Binding
// has been configured for resources subject to its runtime contract.
SinkBindingConditionReady = apis.ConditionReady

// SinkBindingConditionSinkProvided is configured to indicate whether the
// sink has been properly extracted from the resolver.
SinkBindingConditionSinkProvided apis.ConditionType = "SinkProvided"
)

// SinkBindingStatus communicates the observed state of the SinkBinding (from the controller).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ func (s *HttpMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC
},
}

return retryableClient.Do(&retryablehttp.Request{Request: req})
retryableReq, err := retryablehttp.FromRequest(req)
if err != nil {
return nil, err
}

return retryableClient.Do(retryableReq)
}

func NoRetries() RetryConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"knative.dev/pkg/reconciler"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/test/lib/recordevents"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/duck"
Expand Down Expand Up @@ -82,8 +83,7 @@ func triggerV1Beta1BeforeBrokerHelper(triggerName string, client *testlib.Client
const etLogger = "logger"
const loggerPodName = "logger-pod"

logPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
_ = recordevents.DeployEventRecordOrFail(context.TODO(), client, loggerPodName)
client.WaitForAllTestResourcesReadyOrFail(context.Background()) // Can't do this for the trigger because it's not 'ready' yet
client.CreateTriggerOrFailV1Beta1(triggerName,
resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, etLogger, map[string]interface{}{}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,16 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper(
}

transformMsg := []byte(`{"msg":"Transformed!"}`)
transformPod := resources.EventTransformationPod(
recordevents.DeployEventRecordOrFail(
ctx,
client,
transformerName,
"reply-check-type",
"reply-check-source",
transformMsg,
recordevents.ReplyWithTransformedEvent(
"reply-check-type",
"reply-check-source",
string(transformMsg),
),
)
client.CreatePodOrFail(transformPod, testlib.WithService(transformerName))
client.WaitForAllTestResourcesReadyOrFail(ctx)

trigger := client.CreateTriggerOrFailV1Beta1(
triggerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"knative.dev/eventing/pkg/utils"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/sender"
)
Expand Down Expand Up @@ -81,8 +82,7 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes
)

// Create a logger (EventRecord) Pod and a K8s Service that points to it.
logPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
_ = recordevents.DeployEventRecordOrFail(ctx, client, loggerPodName)

// Create a Trigger that receives events (type=bar) and sends them to the logger Pod.
loggerTrigger := client.CreateTriggerOrFailV1Beta1(
Expand All @@ -92,15 +92,18 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerPodName),
)

// Create a transformer (EventTransfrmer) Pod that replies with the same event as the input,
// Create a transformer Pod (recordevents with transform reply) that replies with the same event as the input,
// except the reply's event's type is changed to bar.
eventTransformerPod := resources.EventTransformationPod(
eventTransformerPod := recordevents.DeployEventRecordOrFail(
ctx,
client,
"transformer",
etLogger,
senderName,
[]byte(eventBody),
recordevents.ReplyWithTransformedEvent(
etLogger,
senderName,
eventBody,
),
)
client.CreatePodOrFail(eventTransformerPod, testlib.WithService(eventTransformerPod.Name))

// Create a Trigger that receives events (type=foo) and sends them to the transformer Pod.
transformerTrigger := client.CreateTriggerOrFailV1Beta1(
Expand Down
Loading

0 comments on commit dd3a4ff

Please sign in to comment.