-
Notifications
You must be signed in to change notification settings - Fork 0
/
init.go
65 lines (54 loc) · 1.81 KB
/
init.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package gopubsub
import (
"context"
"fmt"
"net/url"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/batcher"
)
func init() {
opener := new(URLOpener)
pubsub.DefaultURLMux().RegisterTopic(Scheme, opener)
pubsub.DefaultURLMux().RegisterSubscription(Scheme, opener)
}
// Scheme is the URL scheme that kafkapubsub registers its URLOpeners under on pubsub.DefaultMux.
const Scheme = "local"
// URLOpener opens Kafka URLs like "kafka://mytopic" for topics and
// "kafka://group?topic=mytopic" for subscriptions.
//
// For topics, the URL's host+path is used as the topic name.
//
// For subscriptions, the URL's host+path is used as the group name,
// and the "topic" query parameter(s) are used as the set of topics to
// subscribe to.
type URLOpener struct{}
var sendBatcherOpts = &batcher.Options{
MaxBatchSize: 100,
MaxHandlers: 2,
}
// OpenTopicURL opens a pubsub.Topic.
func (o *URLOpener) OpenTopic(ctx context.Context, topicName string) (*pubsub.Topic, error) {
t, err := openTopic(ctx, topicName)
if err != nil {
return nil, fmt.Errorf("cannot open local topic: %w", err)
}
return pubsub.NewTopic(t, sendBatcherOpts), nil
}
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
return o.OpenTopic(ctx, u.Host)
}
var recvBatcherOpts = &batcher.Options{
MaxBatchSize: 1,
MaxHandlers: 1,
}
// OpenSubscriptionURL opens a pubsub.Subscription.
func (o *URLOpener) OpenSubscription(ctx context.Context, topicName string) (*pubsub.Subscription, error) {
s, err := OpenSubscription(ctx, topicName)
if err != nil {
return nil, fmt.Errorf("cannot open local subscription: %w", err)
}
return pubsub.NewSubscription(s, recvBatcherOpts, nil), nil
}
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
return o.OpenSubscription(ctx, u.Host)
}