Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How can catch put and delete events with Watch? #853

Closed
cleverpig opened this issue Nov 16, 2020 · 14 comments
Closed

How can catch put and delete events with Watch? #853

cleverpig opened this issue Nov 16, 2020 · 14 comments

Comments

@cleverpig
Copy link

I made a watch with jetcd 0.5.0, and process event with LoggingWatchListener.
but It only call when any event complete, how can do like etcdctl:
image

Here is my code:

public void keepWatch(String key){
        if (watchMap.containsKey(key)==false) {
            Watch watch = etcdClient.getWatchClient();
            ByteSequence keyBS = ByteSequence.from(key, StandardCharsets.UTF_8);
            Watch.Watcher watcher=watch.watch(
                    keyBS,
                    WatchOption.DEFAULT,
                    new LoggingWatchListener());
            log.debug("watch on key[{}]",key);
        }
        else{
            log.debug("we had watched on key[{}]",key);
        }
    }
class LoggingWatchListener implements Watch.Listener {
        @Override
        public void onNext(WatchResponse response) {
            log.info("watch[listener] - nextEvent");
            for(WatchEvent event:response.getEvents()){
                log.debug("watch[listener] - nextEvent - type:{}, KV:{}, PreKV:{}",
                        event.getEventType(),event.getKeyValue(),event.getPrevKV()
                );
            }

        }

        @Override
        public void onError(Throwable throwable) {
            log.error("watch[listener] - errorEvent - something wrong: {}",throwable.getMessage(),throwable);
        }

        @Override
        public void onCompleted() {
            log.debug("watch[listener] - completedEvent");
        }
    }
@lburgazzoli
Copy link
Collaborator

What do you mean exactly ?

@cleverpig
Copy link
Author

What do you mean exactly ?

I mean How to monitor and process events with etcd watcher?
I tried the Listener to listen events, but can't get event type and more event detail.
So I tried watch.watch method with consumer parameter, but it doesn't work.

@lburgazzoli
Copy link
Collaborator

can you please try to update to the latest release ?
we do have some tests here you may want to check: https://github.com/etcd-io/jetcd/blob/master/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java#L50

@cleverpig
Copy link
Author

can you please try to update to the latest release ?
we do have some tests here you may want to check: https://github.com/etcd-io/jetcd/blob/master/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java#L50

Yes, I'd checked out the master and compile & install it.
But Listener doesn't fire "onNext" event still. I debuged my code, and found it never call "onNext" method in WatcherImpl class.
image

@lburgazzoli
Copy link
Collaborator

how do you set up the watcher ?

@lburgazzoli
Copy link
Collaborator

can you provide a reproducer ? a failing test would be very welcome

@cleverpig
Copy link
Author

Sure!
1, Maven dependencies:

<dependencies>
	<!-- etcd client -->
    <dependency>
      <groupId>io.etcd</groupId>
      <artifactId>jetcd-core</artifactId>
      <version>0.5.0</version>
    </dependency>
	
	<!-- test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
	
	<!-- logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5<</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.7.5<</version>
    </dependency>
	
	<!-- misc -->
	<dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.16</version>
      <scope>provided</scope>
    </dependency>
</dependencies>

2,etcdUtil and testCase source code

@lburgazzoli
Copy link
Collaborator

lburgazzoli commented Nov 23, 2020

I've create the following example:

@Grab(group = 'org.slf4j', module = 'slf4j-simple', version = '1.7.30')
@Grab(group = 'io.etcd', module = 'jetcd-core', version = '0.5.4')

import io.etcd.jetcd.ByteSequence
import io.etcd.jetcd.Client
import io.etcd.jetcd.Watch
import io.etcd.jetcd.options.WatchOption
import io.etcd.jetcd.watch.WatchResponse
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets

def log = LoggerFactory.getLogger('watcher')
def client = Client.builder().endpoints(args[0]).build()
def watch = client.getWatchClient()
def kv = client.getKVClient()
def key = ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8)

watch.watch(
    key,
    WatchOption.newBuilder().withProgressNotify(true).withPrevKV(true).build(),
    new Watch.Listener() {
        @Override
        void onNext(WatchResponse response) {
            log.info("onNext")
            response.events.each {
                log.info("""
                    nextEvent: 
                        type: ${it.getEventType()}
                        KV: ${it.keyValue.key.toString(StandardCharsets.UTF_8)} -> ${it.keyValue.value.toString(StandardCharsets.UTF_8)}
                        PreKV: ${it.prevKV.key.toString(StandardCharsets.UTF_8)} -> ${it.prevKV.value.toString(StandardCharsets.UTF_8)}
                """.stripIndent())
            }
        }

        @Override
        void onError(Throwable throwable) {
            log.error("onError - something wrong: {}", throwable.message, throwable);
        }

        @Override
        void onCompleted() {
            log.debug("onCompleted");
        }
    }
)

while (true) {
    kv.put(key, ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8))
    Thread.sleep(1000)
}

It requires groovy and if I run it with:

groovy watcher.groovy http://localhost:2379

I get something like:

[grpc-default-executor-1] INFO watcher - onNext
[grpc-default-executor-1] INFO watcher - 
nextEvent: 
    type: PUT
    KV: 3d3cfdca-432f-47c8-af5a-f95d6111e47a -> 3e77bcdd-0f43-400c-a8c5-cc90d12641be
    PreKV:  -> 

[grpc-default-executor-1] INFO watcher - onNext
[grpc-default-executor-1] INFO watcher - 
nextEvent: 
    type: PUT
    KV: 3d3cfdca-432f-47c8-af5a-f95d6111e47a -> ad0b437b-41be-4cca-aa0c-3ff942c82174
    PreKV: 3d3cfdca-432f-47c8-af5a-f95d6111e47a -> 3e77bcdd-0f43-400c-a8c5-cc90d12641be

So to me it appears to work, am I missing something ?

@cleverpig
Copy link
Author

cleverpig commented Nov 24, 2020

I translated groove to java class, this doesn't work for me.

public void testKeepWatch2() throws InterruptedException {
        Client client = Client.builder().endpoints("http://localhost:2379").build();
        Watch watch = client.getWatchClient();
        KV kv = client.getKVClient();
        ByteSequence key = ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8);

        watch.watch(
                key,
                WatchOption.newBuilder().withProgressNotify(true).withPrevKV(true).build(),
                new Watch.Listener() {
                    @Override
                    public void onNext(WatchResponse response) {
                        log.info("onNext");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.error("onError - something wrong: {}", throwable.getMessage(), throwable);
                    }

                    @Override
                    public void onCompleted() {
                        log.debug("onCompleted");
                    }
                }
        );

        for(int i=0;i<99;i++){
            kv.put(key, ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8));
            log.debug("put [{}]times",i+1);
            Thread.sleep(1000);
        }
    }

logging output without any listener's event:

2020-11-24 11:22:50,669 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [1]times>
2020-11-24 11:22:51,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [2]times>
2020-11-24 11:22:52,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [3]times>
2020-11-24 11:22:53,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [4]times>
2020-11-24 11:22:54,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [5]times>
2020-11-24 11:22:55,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [6]times>
2020-11-24 11:22:56,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [7]times>
2020-11-24 11:22:57,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [8]times>
2020-11-24 11:22:58,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [9]times>
....

etcd server output:

2020-11-24 11:22:51.602118 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:51.6021187 +0800 CST m=+1093.828563401, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:51.722125 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:51.677123 +0800 CST m=+1093.903567701, time spent = 45.0026ms, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:52.678180 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:52.6781803 +0800 CST m=+1094.904625001, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:53.677237 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:53.6762373 +0800 CST m=+1095.902682001, time spent = 1.0001ms, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:54.678294 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:54.6782947 +0800 CST m=+1096.904739401, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:55.677351 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:55.6773518 +0800 CST m=+1097.903796501, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:56.678409 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:56.6784091 +0800 CST m=+1098.904853801, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36

Btw, which etcdServer version are your? I tried 3.3.25 and 3.4.13.

@lburgazzoli
Copy link
Collaborator

I've tested with both 3.3 and 3.4, command line to run it is:

 docker run --rm -ti --name etcd \
    --publish 4001:4001 \
    --publish 2380:2380 \
    --publish 2379:2379 \
    gcr.io/etcd-development/etcd:v3.4.13 \
        etcd \
            -debug \
            -name etcdv3 \
            -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:4001 \
            -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
            -initial-advertise-peer-urls http://127.0.0.1:2380

Then you can try my example with jbang using a gist:

jbang https://gist.github.com/lburgazzoli/4398aef11cc2880daf4ad05a4dbe8f58

And it does work for me, I suspect you have some issue in how you start etcd

@cleverpig
Copy link
Author

I've tested with both 3.3 and 3.4, command line to run it is:

 docker run --rm -ti --name etcd \
    --publish 4001:4001 \
    --publish 2380:2380 \
    --publish 2379:2379 \
    gcr.io/etcd-development/etcd:v3.4.13 \
        etcd \
            -debug \
            -name etcdv3 \
            -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:4001 \
            -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
            -initial-advertise-peer-urls http://127.0.0.1:2380

Then you can try my example with jbang using a gist:

jbang https://gist.github.com/lburgazzoli/4398aef11cc2880daf4ad05a4dbe8f58

And it does work for me, I suspect you have some issue in how you start etcd

Jbang is ok.
But I change to the maven build with your code and etcd config, it doesn't work too...

@lburgazzoli
Copy link
Collaborator

So you have something wrong on your set-up.

Did you try to use the exact same dependencies I've used for jbang/groovy ?
I've noticed that you are using some old version of jetcd and slf4j.

@github-actions
Copy link

This issue is stale because it has been open 60 days with no activity.
Remove stale label or comment or this will be closed in 7 days.

@SadykovR
Copy link

SadykovR commented Jun 10, 2022

I have the same issue. I tried to put and get values and it works well, but Watch doesnt works - Watch.Listener doesn't fires when I put a value.
I use 0.5.0 version.
P.S. Also tried to add Watch and test it using etcdctl tool on the same server - works fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

No branches or pull requests

3 participants