ISSUE-432: regexp subscription is recreating deleted topics when it shouldn't #133

Open
sijie opened this issue Jan 4, 2021 · 0 comments

Original Issue: apache#432


Steps to reproduce

Initialize a standalone cluster and create a tenant and a namespace:

$ pulsarctl tenant create -c standalone testtenant
$ pulsarctl namespace create testtenant/logs

Create a client with the following code (I have it in a directory called rexconsumer):

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// TopicWildcardName returns a fully-qualified pulsar topic string
func TopicWildcardName(tenant, namespace string) string {
	return fmt.Sprintf("persistent://%s/%s/*", tenant, namespace)
}
func main() {
	log.SetFlags(0)
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	topicpattern := TopicWildcardName(os.Args[1], os.Args[2])
	log.Println(topicpattern)
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		TopicsPattern:               topicpattern,
		SubscriptionName:            "regexconsumer",
		Type:                        pulsar.Exclusive,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		AutoDiscoveryPeriod:         time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	ctx, cFunc := context.WithCancel(context.Background())
	sig := make(chan os.Signal, 1)
	// os.Kill (SIGKILL) cannot be caught, so only os.Interrupt is registered.
	signal.Notify(sig, os.Interrupt)
	go func() {
		<-sig
		cFunc()
	}()
MainLoop:
	for {
		select {
		case <-ctx.Done():
			log.Println("closing gracefully")
			consumer.Close()
			break MainLoop
		default:
			// get message
		}
		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Println("consumer receive error:", err)
			continue
		}
		log.Printf("Payload:\n%s", string(msg.Payload()))
		consumer.Ack(msg)
	}
}
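
A side note on the pattern built by TopicWildcardName. If the pattern is interpreted as a Go regular expression rather than a shell glob (an assumption here; the client's exact matching rules are not shown in this report), the trailing "/*" means "zero or more slashes", and "/.*" is the more explicit way to say "anything under this namespace". The sketch below uses only the standard regexp package with unanchored matching. The helper matchTopics and the logs2 namespace are hypothetical names used for illustration only.

```go
package main

import (
	"fmt"
	"log"
	"regexp"
)

// matchTopics is a hypothetical helper (not part of pulsar-client-go).
// It returns the topics that the pattern matches when the pattern is
// compiled as a Go regular expression and matched unanchored.
func matchTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, t := range topics {
		if re.MatchString(t) {
			matched = append(matched, t)
		}
	}
	return matched, nil
}

func main() {
	// The logs2 namespace is made up to show where the two patterns differ.
	topics := []string{
		"persistent://testtenant/logs/topic1",
		"persistent://testtenant/logs/topic2",
		"persistent://testtenant/logs2/topic1",
	}
	patterns := []string{
		"persistent://testtenant/logs/*",  // "/" repeated zero or more times
		"persistent://testtenant/logs/.*", // "/" followed by anything
	}
	for _, p := range patterns {
		matched, err := matchTopics(p, topics)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\n", p)
		for _, t := range matched {
			fmt.Printf("  matches %s\n", t)
		}
	}
}
```

Under these assumptions the first pattern also matches the logs2 topic, because zero slashes after "logs" is allowed, while the second pattern matches only topics under logs. This does not affect the bug reported here; both forms select topic1 and topic2.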

Start the consumer:

$ ./rexconsumer testtenant logs
persistent://testtenant/logs/*
INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established]                  local_addr="[::1]:63062" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready]

Create some topics and verify they are there:

$ pulsarctl topics create testtenant/logs/topic1 0
Create topic persistent://testtenant/logs/topic1 with 0 partitions successfully
$ pulsarctl topics create testtenant/logs/topic2 0
Create topic persistent://testtenant/logs/topic2 with 0 partitions successfully
$ pulsarctl topics list testtenant/logs
+-------------------------------------+---------------+
|             TOPIC NAME              | PARTITIONED ? |
+-------------------------------------+---------------+
| persistent://testtenant/logs/topic1 | N             |
| persistent://testtenant/logs/topic2 | N             |
+-------------------------------------+---------------+

Now force-delete one of the topics:

$ pulsarctl topics delete -f -n persistent://testtenant/logs/topic1
Delete topic persistent://testtenant/logs/topic1 successfully

In the consumer log you will see the broker close the consumer, followed by an immediate reconnect to the deleted topic:

INFO[0430] Broker notification of Closed consumer: [1]   local_addr="[::1]:63062" remote_addr="pulsar://localhost:6650"
INFO[0430] [Reconnecting to broker in  100ms]            consumerID=1 name=wfidv subscription=regexconsumer topic="persistent://testtenant/logs/topic1"
INFO[0430] [Connected consumer]                          consumerID=1 name=wfidv subscription=regexconsumer topic="persistent://testtenant/logs/topic1"
INFO[0430] [Reconnected consumer to broker]              consumerID=1 name=wfidv subscription=regexconsumer topic="persistent://testtenant/logs/topic1"

And the deleted topic is listed again:

$ pulsarctl topics list testtenant/logs
+-------------------------------------+---------------+
|             TOPIC NAME              | PARTITIONED ? |
+-------------------------------------+---------------+
| persistent://testtenant/logs/topic1 | N             |
| persistent://testtenant/logs/topic2 | N             |
+-------------------------------------+---------------+

Expected behavior

My understanding from #226 is that regex/pattern consumers should apply the flag discussed there so that they do not re-create a topic when they reconnect. After a force-delete, the topic should not be re-created.
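
One way to picture the expected outcome: on each auto-discovery pass, a pattern consumer compares the topics it currently holds consumers for with the topics the broker lists, and closes the consumers for topics that have disappeared instead of reconnecting to them. The following is a minimal sketch of that comparison only. The helper diffTopics is a hypothetical name and is not taken from the client's source; how the client actually reacts to a deleted topic is exactly what this issue is about.

```go
package main

import (
	"fmt"
	"sort"
)

// diffTopics is a hypothetical helper (not the client's actual code).
// It compares the topics a pattern consumer is subscribed to with the
// topics found by the latest discovery pass, and reports which topics
// are new (added) and which no longer exist (removed).
func diffTopics(subscribed, discovered []string) (added, removed []string) {
	inSubscribed := make(map[string]bool, len(subscribed))
	for _, t := range subscribed {
		inSubscribed[t] = true
	}
	inDiscovered := make(map[string]bool, len(discovered))
	for _, t := range discovered {
		inDiscovered[t] = true
	}
	for t := range inDiscovered {
		if !inSubscribed[t] {
			added = append(added, t)
		}
	}
	for t := range inSubscribed {
		if !inDiscovered[t] {
			removed = append(removed, t)
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	subscribed := []string{
		"persistent://testtenant/logs/topic1",
		"persistent://testtenant/logs/topic2",
	}
	// State after topic1 has been force-deleted.
	discovered := []string{
		"persistent://testtenant/logs/topic2",
	}
	added, removed := diffTopics(subscribed, discovered)
	fmt.Println("subscribe to:", added)
	fmt.Println("close consumers for:", removed)
}
```

In the scenario from the steps above, topic1 would land in the removed list, and the expectation is that its consumer is closed rather than reconnected.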

Actual behavior

As shown above, the force-deleted topic is re-created when the consumer reconnects.

System configuration

Pulsar version: 2.8.0-SNAPSHOT (head)
