CRD watch stream starts processing very old deleted resources #693

Closed
max-rocket-internet opened this issue Nov 27, 2018 · 21 comments
Labels
lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale.

Comments

@max-rocket-internet

max-rocket-internet commented Nov 27, 2018

I have a basic controller that watches a CRD through CustomObjectsApi, but about 2 minutes after starting, it begins processing old resources that were deleted days ago.

Here's my code:

crds = client.CustomObjectsApi()
resource_version = ''

while True:
    logger.info('controller initializing')
    stream = watch.Watch().stream(crds.list_cluster_custom_object, 'postgresql.org', 'v1', 'pgdatabases', resource_version=resource_version)
    try:
        for event in stream:
            event_type = event["type"]
            obj = event["object"]
            metadata = obj.get('metadata')
            spec = obj.get('spec')
            code = obj.get('code')

            if code == 410:
                new_version = parse_too_old_failure(obj.get('message'))
                if new_version is None:
                    resource_version = ''
                    break
                else:
                    resource_version = new_version
                    logger.error('Updating resource version to {0} due to "too old" error'.format(new_version))

            if not metadata or not spec:
                logger.error('No metadata or spec in object, skipping: {0}'.format(json.dumps(obj, indent=1)))
                continue

            if metadata['resourceVersion'] is not None:
                resource_version = metadata['resourceVersion']
                logger.debug('resourceVersion now: {0}'.format(resource_version))

            logger.debug('{0}: {1} {2}'.format(event_type, resource_version, metadata.get('name')))

And the output is this:

{"time":"2018-11-27 11:50:15,570", "level":"INFO", "message":"controller initializing"}
{"time":"2018-11-27 11:50:16,163", "level":"DEBUG", "message":"resourceVersion now: 1147502"}
# This CRD resource (app-real) exists in the cluster
{"time":"2018-11-27 11:50:16,163", "level":"DEBUG", "message":"ADDED: 1147502 app-real"}
# Wait 2 mins...
{"time":"2018-11-27 11:51:37,431", "level":"DEBUG", "message":"resourceVersion now: 3070373"}
{"time":"2018-11-27 11:51:37,431", "level":"DEBUG", "message":"ADDED: 3070373 app1"}
{"time":"2018-11-27 11:51:37,432", "level":"DEBUG", "message":"resourceVersion now: 3070374"}
{"time":"2018-11-27 11:51:37,432", "level":"DEBUG", "message":"ADDED: 3070374 app2"}

The problem is that the resources 3070373 (app1) & 3070374 (app2) were deleted from the cluster a long time ago. They don't show up in kubectl get mycrd.

I have 2 controllers running watching the exact same CRD. One is running in the cluster and the one above is running on my workstation. Is the cause of this error the fact I'm running 2 controllers watching the same CRD? Or is there a mistake in my code?

Kubernetes 1.10.3
kubernetes-client/python: 8.0.0

@max-rocket-internet
Author

I have 2 controllers running watching the exact same CRD. One is running in the cluster and the one above is running on my workstation. Is the cause of this error the fact I'm running 2 controllers watching the same CRD?

It's not the cause. I removed the second controller and this problem still persists.

@max-rocket-internet
Author

@klarose is this the same problem as #609 ?

@klarose

klarose commented Nov 28, 2018

I'm not sure it's the same problem, though I think it can lead to that problem. As far as I can tell, Kubernetes watch is sending something like a compacted transaction log to the watcher. So, from the last point of compaction, you see all modifications to a given resource, in order. Since Kubernetes cares about eventual consistency, I suspect it assumes that any consumers of the watch will process the events in such a way that only the final state truly matters, not the intermediate states.

Do you see DELETED after the ADDED? Or does it finish with ADDED? If it doesn't delete them, then yeah, there's a serious problem, because your application's state is now out of sync.

@max-rocket-internet
Author

Thanks for the reply @klarose

Do you see DELETED after the ADDED? Or does it finish with ADDED?

For resources that are still present in k8s, the last event_type is ADDED. And for resources that have been removed from k8s, the last event_type is DELETED. So I guess that makes sense.

As far as I can tell, Kubernetes watch is sending something like a compacted transaction log to the watcher. So, from the last point of compaction, you see all modifications to a given resource, in order.

OK, I think I understand why, but how are we supposed to deal with this in the controller? If the controller is adding and removing something based on these events, e.g. a database or some configuration, then replaying the events (even in the correct order) could be detrimental.

In my example above, it's reprocessing events from days ago. If a resource was added with the name app1, then deleted, and then added again as app1, the controller could end up making destructive changes, right?

@klarose

klarose commented Nov 28, 2018

Hmmm. I'm not sure. Pretty fundamental to Kubernetes is the idea that you only care about eventual consistency. So if you're doing something non-idempotent with the results of the watch, you could be in for some pain. :)
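
To illustrate what "idempotent" could mean here, below is a minimal sketch, not taken from any real controller. The `(event_type, name, spec)` tuple shape and the `apply_event` and `replay` helpers are hypothetical. It only shows that upsert/remove handlers converge on the same final state when the history is replayed; real side effects such as dropping a database would still need their own safeguards.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical event shape for this sketch: (event_type, name, spec).
Event = Tuple[str, str, dict]


def apply_event(state: Dict[str, dict], event: Event) -> None:
    """Apply one event so that applying it again leaves the state unchanged."""
    event_type, name, spec = event
    if event_type in ("ADDED", "MODIFIED"):
        state[name] = spec      # upsert: same result however often it runs
    elif event_type == "DELETED":
        state.pop(name, None)   # removing an absent entry is a no-op


def replay(state: Dict[str, dict], events: Iterable[Event]) -> Dict[str, dict]:
    """Apply events in order and return the resulting state."""
    for event in events:
        apply_event(state, event)
    return state


history = [
    ("ADDED", "app1", {"size": 1}),
    ("DELETED", "app1", {"size": 1}),
    ("ADDED", "app-real", {"size": 2}),
]
state = replay({}, history)
# Replaying the whole history on top of the current state changes nothing.
state_after_replay = replay(dict(state), history)
```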

That said, how about this?

  1. When you start, do a list of all objects, with no watch. If that only gives the current objects in their most recent state, then the next steps will actually work. :) Note the time immediately before you run the list, in the timezone of the k8s cluster.
  2. Operate on the results of step 1 like you normally would. Track the maximum resource version.
  3. Start your watch from the maximum resource version from step 2.
  4. Discard any adds for resources whose creation timestamp is earlier than the time you noted in step 1.
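
The timestamp filter in the last step might look roughly like this. It is a sketch under assumptions: the event dictionaries follow the `{'type': ..., 'object': ...}` shape used in the first post, timestamps are UTC strings ending in `Z`, and `parse_k8s_time` and `is_stale_add` are hypothetical helper names.

```python
from datetime import datetime, timezone


def parse_k8s_time(value: str) -> datetime:
    """Parse a timestamp such as '2018-11-28T16:40:36Z' as UTC."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def is_stale_add(event: dict, started_at: datetime) -> bool:
    """True for ADDED events whose object was created before started_at."""
    if event["type"] != "ADDED":
        return False
    created = event["object"]["metadata"]["creationTimestamp"]
    return parse_k8s_time(created) < started_at


# Time noted immediately before the initial list (made-up value).
started_at = datetime(2018, 11, 28, 16, 30, 0, tzinfo=timezone.utc)
old_add = {"type": "ADDED",
           "object": {"metadata": {"creationTimestamp": "2018-11-28T16:23:27Z"}}}
new_add = {"type": "ADDED",
           "object": {"metadata": {"creationTimestamp": "2018-11-28T16:40:36Z"}}}
```

A watch loop would then skip any event for which `is_stale_add(event, started_at)` is true.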

This is pretty clunky. It'd be better if the initial list would also return the maximum resource version across all resources. You could then just start the watch from that point... Is there an API to do that?

@klarose

klarose commented Nov 28, 2018

Actually, maybe the API does do that?

kyle@khazad-dum:~/test/kubernetes_python$ kubectl apply -f test_def.yaml
kyle@khazad-dum:~/test/kubernetes_python$ kubectl apply -f hobbit.yaml
kyle@khazad-dum:~/test/kubernetes_python$ python3 test.py 
{'apiVersion': 'test.example/v1', 'kind': 'Hobbit', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"test.example/v1","kind":"Hobbit","metadata":{"annotations":{},"name":"bilbo-baggins","namespace":"default"},"spec":{"items":{"weapon":{"name":"Sting","type":"ShortSword"}}}}\n'}, 'clusterName': '', 'creationTimestamp': '2018-11-28T16:40:36Z', 'generation': 1, 'name': 'bilbo-baggins', 'namespace': 'default', 'resourceVersion': '8655440', 'selfLink': '/apis/test.example/v1/namespaces/default/hobbitses/bilbo-baggins', 'uid': '5502e41f-f32c-11e8-b388-0a58ac1f00cb'}, 'spec': {'items': {'weapon': {'name': 'Sting', 'type': 'ShortSword'}}}}
{'apiVersion': 'test.example/v1', 'kind': 'Hobbit', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"test.example/v1","kind":"Hobbit","metadata":{"annotations":{},"name":"frodo-baggins","namespace":"default"},"spec":{"items":{"weapon":{"name":"TheOneRing","type":"RingOfPower"}}}}\n'}, 'clusterName': '', 'creationTimestamp': '2018-11-28T16:23:27Z', 'generation': 1, 'name': 'frodo-baggins', 'namespace': 'default', 'resourceVersion': '8651977', 'selfLink': '/apis/test.example/v1/namespaces/default/hobbitses/frodo-baggins', 'uid': 'ef8a27db-f329-11e8-b388-0a58ac1f00cb'}, 'spec': {'items': {'weapon': {'name': 'TheOneRing', 'type': 'RingOfPower'}}}}
Resouce_version: 8655451
kyle@khazad-dum:~/test/kubernetes_python$ kubectl delete hobbit bilbo-baggins
hobbit.test.example "bilbo-baggins" deleted
kyle@khazad-dum:~/test/kubernetes_python$ python3 test.py 
{'apiVersion': 'test.example/v1', 'kind': 'Hobbit', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"test.example/v1","kind":"Hobbit","metadata":{"annotations":{},"name":"frodo-baggins","namespace":"default"},"spec":{"items":{"weapon":{"name":"TheOneRing","type":"RingOfPower"}}}}\n'}, 'clusterName': '', 'creationTimestamp': '2018-11-28T16:23:27Z', 'generation': 1, 'name': 'frodo-baggins', 'namespace': 'default', 'resourceVersion': '8651977', 'selfLink': '/apis/test.example/v1/namespaces/default/hobbitses/frodo-baggins', 'uid': 'ef8a27db-f329-11e8-b388-0a58ac1f00cb'}, 'spec': {'items': {'weapon': {'name': 'TheOneRing', 'type': 'RingOfPower'}}}}
Resouce_version: 8655521

Note how Resource_version is greater than before and greater than the resource_version of bilbo-baggins.

Unfortunately, the generated API docs aren't really clear on what is really returned here...
Anyway, here's the code and yaml if you want to look at it: test.tar.gz

@max-rocket-internet
Author

Actually, maybe the API does do that?

In your example above, did you wait 5 mins after starting the test.py again? Because I have the same output as you BUT later, after say 2-5 mins, then it starts reprocessing old events.

@max-rocket-internet
Author

That said, how about this? 1, 2, 3, 4

Cool but, as you said, that's quite clunky. And from the examples I read, they don't implement this behaviour. E.g. https://github.com/karmab/samplecontroller/blob/master/controller.py from @karmab

@klarose

klarose commented Nov 28, 2018

I'm not doing a watch. This is a simple list. So, the list is giving me the current state and the most recent resource version independent of the current state (i.e. it captures anything that has been deleted since the last update to an existing resource).

For example, imagine we had something like this:

Add A: resource version 1.
Add B: resource version 2.
Add C: resource version 3.

Delete C: resource version 4.

Now, if we list the resource, we see:

A: resource version 1.
B: resource version 2.
Max resource version: 5.

If we do a watch from 0, we see:

1: Add A
2: Add B
(some time)
3: Add C
4: Delete C

However, if we do this:

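from kubernetes import client, watch

crds = client.CustomObjectsApi()
# group, version and plural identify the CRD, as in the first post
current = crds.list_cluster_custom_object(group, version, plural)
for item in current['items']:
    process_resource("add", item)

# resourceVersion of the list itself, not of any single item (5 in the example above)
start_version = current['metadata']['resourceVersion']
stream = watch.Watch().stream(crds.list_cluster_custom_object, group, version, plural, resource_version=start_version)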

Then we will never see versions 3 and 4, because we will see 1 and 2 from the list, then skip 3 and 4 when we start watching. Further, any resources that were added after we did the list will have a version greater than or equal to 5, meaning that we'll see them in the watch.
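
The Add/Delete example above can also be modelled with a toy event log. This is only an illustration and does not talk to a cluster: the integer versions, `toy_list` and `toy_watch` are made up for the sketch, and the list's version is modelled simply as the version of the last logged event.

```python
# Toy event log: (resource_version, event_type, name).
LOG = [(1, "ADDED", "A"), (2, "ADDED", "B"), (3, "ADDED", "C"), (4, "DELETED", "C")]


def toy_list(log):
    """Return (live items mapped to their versions, version of the list itself)."""
    live = {}
    for version, event_type, name in log:
        if event_type == "DELETED":
            live.pop(name, None)
        else:
            live[name] = version
    list_version = log[-1][0] if log else 0
    return live, list_version


def toy_watch(log, resource_version):
    """Return the events recorded after the given resource version."""
    return [entry for entry in log if entry[0] > resource_version]


items, list_version = toy_list(LOG)       # current state plus the list's version
LOG.append((5, "ADDED", "D"))             # something changes after the list
seen = toy_watch(LOG, list_version)       # only the new event
replayed = toy_watch(LOG, 0)              # watching from 0 replays the add and delete of C
```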

@klarose

klarose commented Nov 28, 2018

Actually, maybe that is the fix for both the issues. :)

https://github.com/kubernetes-client/python-base/blob/master/watch/watch.py#L135

Perhaps here, rather than tracking the resource version of each object, maybe it should be parsing out the resourceVersion from the result of func()?

@max-rocket-internet
Author

@klarose

Perhaps here, rather than tracking the resource version of each object, maybe it should be parsing out the resourceVersion from the result of func()?

Isn't that what my code example is doing, though, with these lines?

            if metadata['resourceVersion'] is not None:
                resource_version = metadata['resourceVersion']

@klarose

klarose commented Nov 29, 2018

@max-rocket-internet That is the resourceVersion of the specific event, not the list itself.

In my example, I get the resourceVersion of the list itself, not the items within the list. It is different. I tried to decode what was being returned here from the kubernetes docs, but they don't seem to have a definition for this API. The python client docs only describe the return type as an "object", sadly.

Either way, here is what the result of the list returns right now for me:

{'apiVersion': 'test.example/v1',
 'items': [{'apiVersion': 'test.example/v1',
            'kind': 'Hobbit',
            'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"test.example/v1","kind":"Hobbit","metadata":{"annotations":{},"name":"frodo-baggins","namespace":"default"},"spec":{"items":{"weapon":{"name":"TheOneRing","type":"RingOfPower"}}}}\n'},
                         'clusterName': '',
                         'creationTimestamp': '2018-11-28T16:23:27Z',
                         'generation': 1,
                         'name': 'frodo-baggins',
                         'namespace': 'default',
                         'resourceVersion': '8651977',
                         'selfLink': '/apis/test.example/v1/namespaces/default/hobbitses/frodo-baggins',
                         'uid': 'ef8a27db-f329-11e8-b388-0a58ac1f00cb'},
            'spec': {'items': {'weapon': {'name': 'TheOneRing',
                                          'type': 'RingOfPower'}}}}],
 'kind': 'HobbitList',
 'metadata': {'continue': '',
              'resourceVersion': '8940412',
              'selfLink': '/apis/test.example/v1/namespaces//hobbitses'}}

Note how list.metadata.resourceVersion (8940412) is much newer than the only resource in that list (which has resourceVersion 8651977). This leads me to conclude that it is actually tracking a monotonically increasing global version which points to the latest event version across all resources. So, if we start the watch at that version, we should skip any intermediate adds/deletes that happened since the last operation on the objects in items[].

@klarose

klarose commented Nov 29, 2018

I suspect that this is the type we're seeing there: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#listmeta-v1-meta

It describes resourceVersion as:

String that identifies the server's internal version of this object that can be used by clients to determine when objects have changed. Value must be treated as opaque by clients and passed unmodified back to the server. Populated by the system. Read-only. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency

The link in there has much more info on resourceVersions. It explains some things I've noticed about the resourceVersion I wasn't sure about (namely, it being a global value). There is one very important tidbit at the end:

"Watch" operations specify resourceVersion using a query parameter. It is used to specify the point at which to begin watching the specified resources. This may be used to ensure that no mutations are missed between a GET of a resource (or list of resources) and a subsequent Watch, even if the current version of the resource is more recent. This is currently the main reason that list operations (GET on a collection) return resourceVersion.

I think this implies that my solution is the correct one:

Start the watch with a list, then follow up with a watch from the list's resource version.
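
A rough sketch of that list-then-watch loop follows. It is written against injected callables so it can be run without a cluster: `list_then_watch`, `list_func`, `make_stream` and `handle` are hypothetical names, and treating an event object with `code == 410` as "re-list and start over" is an assumption based on the code in the first post. Because a re-list hands every current object to `handle` again, the handler is assumed to be idempotent.

```python
def list_then_watch(list_func, make_stream, handle, max_relists=3):
    """List current objects, then watch from the list's own resourceVersion.

    list_func()          -> dict shaped like the list output shown above
    make_stream(version) -> iterable of {'type': ..., 'object': ...} events
    handle(kind, obj)    -> callback supplied by the controller (hypothetical)
    """
    for _ in range(max_relists):
        listing = list_func()
        for item in listing["items"]:
            handle("ADDED", item)
        # Version of the list itself, not of any item in it.
        version = listing["metadata"]["resourceVersion"]
        expired = False
        for event in make_stream(version):
            obj = event["object"]
            if obj.get("code") == 410:   # version too old: list again
                expired = True
                break
            handle(event["type"], obj)
        if not expired:
            return


# Stand-ins for the API so the sketch runs on its own (made-up data).
calls = []


def fake_list():
    return {"items": [{"metadata": {"name": "frodo-baggins"}}],
            "metadata": {"resourceVersion": "8940412"}}


def fake_stream(version):
    calls.append(("watch", version))
    yield {"type": "ADDED", "object": {"metadata": {"name": "samwise"}}}


def record(kind, obj):
    calls.append((kind, obj["metadata"]["name"]))


list_then_watch(fake_list, fake_stream, record)
```

With the real client, `list_func` could be `lambda: crds.list_cluster_custom_object(group, version, plural)` and `make_stream` could be `lambda rv: watch.Watch().stream(crds.list_cluster_custom_object, group, version, plural, resource_version=rv)`, reusing the calls from the first post.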

@max-rocket-internet
Author

Thanks for the replies @klarose

In my example, I get the resourceVersion of the list itself, not the items within the list. It is different.

OK.

Are there no examples of this, though? The existing guitar one and the comments I found don't use this method. It would be nice to have an example with a confirmed correct approach!

@klarose

klarose commented Nov 30, 2018

I'm not aware of any. Given that you can reproduce it, maybe you can be the one to give the example! :)

@juliantaylor
Contributor

I have posted a reproducing script in #700

@juliantaylor
Contributor

fix posted: kubernetes-client/python-base#109

@max-rocket-internet
Author

@juliantaylor so to be clear: when your PR is merged, then I will not have to worry about my watcher getting old resource versions? i.e. if I start watching NOW, I will not receive events for old resources?

@juliantaylor
Contributor

juliantaylor commented Dec 12, 2018

When the fix is merged, the watch object will respect the resource_version you give it, so it will not return objects with a lower resource version than the one you provided in the initial watch call. Before the fix, it always restarted at resource version zero, returning old (potentially already deleted) objects.

It does not handle potentially out-of-order objects, which could again lead to receiving old objects. I am not sure whether that can happen, and I cannot reproduce it (though it probably can: https://coreos.com/etcd/docs/latest/learning/api_guarantees.html).
That issue could be fixed either by just using max again or, more nicely, by placing the objects in a priority queue and returning them to the client in the correct order (PoC implemented here: https://github.com/juliantaylor/python-base/tree/unordered-events).
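
The priority-queue idea could be sketched as below. This is not the code from the linked branch: `in_version_order` and its `window` parameter are hypothetical, and comparing resource versions as integers is an assumption made only for illustration, since the API documentation quoted earlier says clients must treat the value as opaque.

```python
import heapq


def in_version_order(events, window=3):
    """Yield events ordered by resourceVersion within a small sliding window.

    Assumption: resourceVersion values are integer-like strings. Events that
    arrive more than `window` positions late are still yielded, just not in order.
    """
    heap = []
    for seq, event in enumerate(events):
        version = int(event["object"]["metadata"]["resourceVersion"])
        # seq breaks ties so that the event dicts are never compared directly.
        heapq.heappush(heap, (version, seq, event))
        if len(heap) > window:
            yield heapq.heappop(heap)[2]
    while heap:
        yield heapq.heappop(heap)[2]


def make_event(version):
    """Build a minimal made-up event carrying only a resourceVersion."""
    return {"type": "ADDED", "object": {"metadata": {"resourceVersion": str(version)}}}


shuffled = [make_event(v) for v in (3, 1, 2, 5, 4)]
ordered_versions = [
    event["object"]["metadata"]["resourceVersion"]
    for event in in_version_order(shuffled)
]
```

The trade-off is latency: an event is held back until `window` later events have arrived or the stream ends.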

@fejta-bot

Issues go stale after 90d of inactivity.
Mark the issue as fresh with /remove-lifecycle stale.
Stale issues rot after an additional 30d of inactivity and eventually close.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta.
/lifecycle stale

k8s-ci-robot added the lifecycle/stale label Apr 27, 2019
@max-rocket-internet
Author

This is resolved in latest releases for me. Thanks for the support!
