-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
new_delta_subscription_state.cc
407 lines (376 loc) · 19 KB
/
new_delta_subscription_state.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#include "source/common/config/new_delta_subscription_state.h"
#include "envoy/event/dispatcher.h"
#include "envoy/service/discovery/v3/discovery.pb.h"
#include "source/common/common/assert.h"
#include "source/common/common/hash.h"
#include "source/common/config/utility.h"
#include "source/common/runtime/runtime_features.h"
namespace Envoy {
namespace Config {
// Builds the delta xDS subscription state for a single resource type.
//
// @param type_url type URL of the tracked resources. Heartbeats are supported for every type
//        except "envoy.config.route.v3.VirtualHost".
// @param watch_map callbacks notified about config updates, including TTL-driven removals.
// @param local_info source of the node information stored for later use.
// @param dispatcher forwarded to ttl_ together with dispatcher.timeSource().
NewDeltaSubscriptionState::NewDeltaSubscriptionState(std::string type_url,
                                                     UntypedConfigUpdateCallbacks& watch_map,
                                                     const LocalInfo::LocalInfo& local_info,
                                                     Event::Dispatcher& dispatcher)
    // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
    // empty resources as updates.
    : supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
      ttl_(
          // Invoked with the names whose TTL ran out. Every name we still hold state for is
          // reported to the watch map as removed.
          [this](const auto& expired) {
            Protobuf::RepeatedPtrField<std::string> expired_names;
            for (const auto& name : expired) {
              auto requested = getRequestedResourceState(name);
              if (requested.has_value()) {
                // Explicitly requested: keep the entry, but forget the version we had.
                requested->setAsWaitingForServer();
                expired_names.Add(std::string(name));
                continue;
              }
              // Not explicitly requested: drop it from both the wildcard and ambiguous caches,
              // and only report it if it was actually present in one of them.
              const auto erased_from_wildcard = wildcard_resource_state_.erase(name);
              const auto erased_from_ambiguous = ambiguous_resource_state_.erase(name);
              if (erased_from_wildcard + erased_from_ambiguous > 0) {
                expired_names.Add(std::string(name));
              }
            }
            watch_map_.onConfigUpdate({}, expired_names, "");
          },
          dispatcher, dispatcher.timeSource()),
      type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info) {}
// Applies a change in subscription interest to the tracked state.
//
// @param cur_added names of resources we are newly interested in.
// @param cur_removed names of resources we are no longer interested in.
//
// Besides updating the requested/wildcard/ambiguous maps, this records the names in
// names_added_ / names_removed_ so that they can be placed into the next discovery request.
void NewDeltaSubscriptionState::updateSubscriptionInterest(
    const absl::flat_hash_set<std::string>& cur_added,
    const absl::flat_hash_set<std::string>& cur_removed) {
  for (const auto& added_name : cur_added) {
    // Subscribing to anything other than the wildcard ends the initial legacy wildcard mode.
    in_initial_legacy_wildcard_ = in_initial_legacy_wildcard_ && (added_name == Wildcard);

    // A resource we already know through the wildcard subscription (or one that was parked as
    // ambiguous) keeps its known version when it becomes explicitly requested. Anything else
    // starts out waiting for the server to provide a version.
    if (auto wildcard_it = wildcard_resource_state_.find(added_name);
        wildcard_it != wildcard_resource_state_.end()) {
      requested_resource_state_.insert_or_assign(added_name,
                                                 ResourceState::withVersion(wildcard_it->second));
      wildcard_resource_state_.erase(wildcard_it);
    } else if (auto ambiguous_it = ambiguous_resource_state_.find(added_name);
               ambiguous_it != ambiguous_resource_state_.end()) {
      requested_resource_state_.insert_or_assign(added_name,
                                                 ResourceState::withVersion(ambiguous_it->second));
      ambiguous_resource_state_.erase(ambiguous_it);
    } else {
      requested_resource_state_.insert_or_assign(added_name, ResourceState::waitingForServer());
    }
    ASSERT(requested_resource_state_.contains(added_name));
    ASSERT(!wildcard_resource_state_.contains(added_name));
    ASSERT(!ambiguous_resource_state_.contains(added_name));

    // A remove-then-add sequence that happens before any request could be sent has to be treated
    // as a brand new addition: the user may have discarded its copy of the resource after asking
    // us to remove it, so it needs to be told about the resource again.
    names_removed_.erase(added_name);
    names_added_.insert(added_name);
  }

  for (const auto& removed_name : cur_removed) {
    bool was_requested = false;
    if (!requested_resource_state_.contains(Wildcard)) {
      // No wildcard subscription, so there is no ambiguity: simply drop the resource.
      was_requested = (requested_resource_state_.erase(removed_name) != 0);
    } else if (auto it = requested_resource_state_.find(removed_name);
               it != requested_resource_state_.end()) {
      // With a wildcard subscription in place, the resource might also be delivered as a part of
      // it - we cannot tell at this point. Park its version in the ambiguous map and let a later
      // server update settle the matter. A resource without a version cannot be a wildcard
      // resource (those always carry one), and Wildcard itself never has a version, so neither
      // of those ends up in the ambiguous map.
      if (!it->second.isWaitingForServer()) {
        ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
      }
      requested_resource_state_.erase(it);
      was_requested = true;
    }
    ASSERT(!requested_resource_state_.contains(removed_name));

    // Ideally an add-then-remove between two requests would not produce a superfluous
    // "unsubscribe [resource that was never subscribed]" entry. However, remove-then-add *does*
    // have to show up in the request, and because "remove-add" is handled as a plain "add", it is
    // hard to tell remove-add-remove apart from add-remove.
    names_added_.erase(removed_name);
    if (was_requested) {
      names_removed_.insert(removed_name);
      in_initial_legacy_wildcard_ = false;
    }
  }

  // Unsubscribing from the wildcard invalidates everything that was cached because of it. The
  // ambiguous resources go as well: we were not interested in them anyway, and whether they
  // originated from the wildcard subscription no longer matters.
  if (cur_removed.contains(Wildcard)) {
    wildcard_resource_state_.clear();
    ambiguous_resource_state_.clear();
  }
}
// Tells whether a discovery request should be sent.
//
// Not having sent any request on the current stream counts as an "update pending": the whole
// interest has to be restated at the start of a stream, even when nothing has changed.
//
// @return true if there is something to tell the server, false otherwise.
bool NewDeltaSubscriptionState::subscriptionUpdatePending() const {
  // Any pending subscribe or unsubscribe has to be sent.
  const bool interest_changed = !(names_added_.empty() && names_removed_.empty());
  if (interest_changed) {
    return true;
  }

  if (any_request_sent_yet_in_current_stream_) {
    // Nothing changed and this is not the first request in the stream. An empty request sent at
    // this point cannot be interpreted as a legacy wildcard subscription, which can only happen
    // for the first request in the stream, so sending one when asked to is harmless.
    return must_send_discovery_request_;
  }

  // First request in the stream with no pending changes. If we are interested in some resources,
  // the server has to be told about them. Otherwise we are interested in nothing, which means
  // either that the legacy wildcard subscription should kick in (the initial wildcard mode is
  // still on, so a request is sent), or that we unsubscribed from everything (the mode is off, so
  // no request is sent).
  return !requested_resource_state_.empty() || in_initial_legacy_wildcard_;
}
// Processes a delta discovery response and produces the acknowledgement for it.
//
// @param message the response received from the server.
// @return an UpdateAck carrying the response nonce; handleBadResponse() fills in its error
//         detail when processing the response throws an EnvoyException.
UpdateAck NewDeltaSubscriptionState::handleResponse(
    const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
  // The nonce of the response is *always* echoed in the next request, including the case where
  // that request ends up being a NACK because error_detail got set.
  UpdateAck response_ack(message.nonce(), type_url_);
  TRY_ASSERT_MAIN_THREAD { handleGoodResponse(message); }
  END_TRY
  catch (const EnvoyException& rejection) {
    handleBadResponse(rejection, response_ack);
  }
  return response_ack;
}
// Tells whether a resource entry of a response is a heartbeat rather than an actual update.
//
// An entry is a heartbeat when heartbeats are supported for this type, the entry carries no
// resource payload, and its version equals the version we already track for that name (as a
// requested, wildcard or ambiguous resource).
//
// @param resource a single resource entry from a DeltaDiscoveryResponse.
// @return true if the entry is a heartbeat, false otherwise.
bool NewDeltaSubscriptionState::isHeartbeatResponse(
    const envoy::service::discovery::v3::Resource& resource) const {
  if (!supports_heartbeats_) {
    return false;
  }
  // A heartbeat never carries a payload.
  if (resource.has_resource()) {
    return false;
  }
  if (const auto maybe_resource = getRequestedResourceState(resource.name());
      maybe_resource.has_value()) {
    // A requested resource without a known version cannot be refreshed by a heartbeat.
    return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
  }
  if (const auto itr = wildcard_resource_state_.find(resource.name());
      itr != wildcard_resource_state_.end()) {
    return resource.version() == itr->second;
  }
  // The iterator must be compared against the end of the container it came from; comparing it
  // with the end of a different container is not a valid "not found" check.
  if (const auto itr = ambiguous_resource_state_.find(resource.name());
      itr != ambiguous_resource_state_.end()) {
    // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
    // getting heartbeat responses about resources that we are not interested in, but the server
    // could have sent this heartbeat before it learned about our lack of interest in the resource.
    return resource.version() == itr->second;
  }
  return false;
}
// Validates a delta discovery response, records the received versions, and forwards the update to
// the watch map.
//
// @param message the response received from the server.
// @throws EnvoyException if a resource name appears more than once across the added/updated and
//         removed resources, or if the type URL embedded in a resource differs from the
//         message-wide type URL.
void NewDeltaSubscriptionState::handleGoodResponse(
    const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
  absl::flat_hash_set<std::string> seen_names;
  Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> resources_to_deliver;
  for (const auto& resource : message.resources()) {
    const bool first_occurrence = seen_names.insert(resource.name()).second;
    if (!first_occurrence) {
      throw EnvoyException(
          fmt::format("duplicate name {} found among added/updated resources", resource.name()));
    }
    // Heartbeats only refresh TTLs; they are not passed on to the watch map.
    if (isHeartbeatResponse(resource)) {
      continue;
    }
    resources_to_deliver.Add()->CopyFrom(resource);
    // Entries for unresolved aliases carry no actual resource, so there is no embedded type URL
    // to verify.
    const bool unresolved_alias = !resource.has_resource() && resource.aliases_size() > 0;
    if (unresolved_alias) {
      continue;
    }
    if (message.type_url() != resource.resource().type_url()) {
      throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
                                       "the message-wide type URL {} in DeltaDiscoveryResponse {}",
                                       resource.resource().type_url(), message.type_url(),
                                       message.DebugString()));
    }
  }
  for (const auto& removed_name : message.removed_resources()) {
    const bool first_occurrence = seen_names.insert(removed_name).second;
    if (!first_occurrence) {
      throw EnvoyException(fmt::format(
          "duplicate name {} found in the union of added+removed resources", removed_name));
    }
  }

  {
    // Held while the resource states (and therefore the TTLs) are being updated; released at the
    // end of this scope, before the watch map gets notified.
    const auto ttl_update_scope = ttl_.scopedTtlUpdate();
    // With a wildcard subscription every received resource is recorded. Without one only the
    // explicitly requested resources are recorded and the rest is ignored.
    const bool subscribed_to_wildcard = requested_resource_state_.contains(Wildcard);
    for (const auto& resource : message.resources()) {
      if (subscribed_to_wildcard || requested_resource_state_.contains(resource.name())) {
        addResourceStateFromServer(resource);
      }
    }
  }

  watch_map_.onConfigUpdate(resources_to_deliver, message.removed_resources(),
                            message.system_version_info());

  // Once a resource is gone, there is no meaningful version of it left to report to the server
  // upon stream reconnect: either the resource keeps not existing, in which case saying nothing
  // is fine, or the server brings back something new, which we should receive regardless (this is
  // what not specifying a version gets us).
  //
  // So, for a resource we are still interested in, keep the entry but without a version. It stays
  // out of initial_resource_versions, yet reminds us to explicitly cancel the subscription when
  // we lose interest. Resources received through the wildcard subscription, and resources we have
  // already lost interest in, are simply dropped.
  for (const auto& removed_name : message.removed_resources()) {
    auto requested = getRequestedResourceState(removed_name);
    if (requested.has_value()) {
      requested->setAsWaitingForServer();
      continue;
    }
    const auto erased_from_ambiguous = ambiguous_resource_state_.erase(removed_name);
    if (erased_from_ambiguous == 0) {
      wildcard_resource_state_.erase(removed_name);
    }
  }
  ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
            message.resources().size(), message.removed_resources().size());
}
// Turns the given acknowledgement into a NACK and reports the rejection to the watch map.
//
// @param e the exception describing why the response was rejected.
// @param ack the acknowledgement whose error detail gets populated.
void NewDeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
  const char* const rejection_reason = e.what();
  // A populated error_detail is what marks a DeltaDiscoveryRequest as a NACK.
  auto& error_detail = ack.error_detail_;
  error_detail.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
  error_detail.set_message(Config::Utility::truncateGrpcStatusMessage(rejection_reason));
  ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, rejection_reason);
  watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
}
// Reports a connection failure to the watch map. No exception is associated with this kind of
// failure, hence the null exception pointer.
void NewDeltaSubscriptionState::handleEstablishmentFailure() {
  const auto failure_reason = Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure;
  watch_map_.onConfigUpdateFailed(failure_reason, nullptr);
}
// Builds the next discovery request without any acknowledgement information.
//
// The pending subscribe/unsubscribe names are moved into the request (and cleared from the
// state), and must_send_discovery_request_ is reset. The first request of a stream additionally
// restates the whole interest and lists the versions of all the resources we currently hold.
//
// @return the request to send to the server.
envoy::service::discovery::v3::DeltaDiscoveryRequest
NewDeltaSubscriptionState::getNextRequestAckless() {
  envoy::service::discovery::v3::DeltaDiscoveryRequest request;
  must_send_discovery_request_ = false;

  const bool first_request_in_stream = !any_request_sent_yet_in_current_stream_;
  if (first_request_in_stream) {
    any_request_sent_yet_in_current_stream_ = true;
    // Has to be evaluated before names_added_ gets filled below, as it inspects the pending
    // interest changes.
    const bool legacy_wildcard_request = isInitialRequestForLegacyWildcard();
    const auto remember_version = [&request](const auto& name, const auto& version) {
      (*request.mutable_initial_resource_versions())[name] = version;
    };

    // initial_resource_versions "must be populated for first request in a stream". Also, since
    // we may be talking to a new server, *all* of our subscription interest has to be stated
    // explicitly.
    for (const auto& [name, state] : requested_resource_state_) {
      // Only resources we actually hold a version of belong in initial_resource_versions. The
      // ones still waiting for the server are left out of it (but they are subscribed to!).
      if (!state.isWaitingForServer()) {
        remember_version(name, state.version());
      }
      // Everything in this map is something we are interested in, so it goes to
      // resource_names_subscribe.
      names_added_.insert(name);
    }
    for (const auto& [name, version] : wildcard_resource_state_) {
      remember_version(name, version);
    }
    for (const auto& [name, version] : ambiguous_resource_state_) {
      remember_version(name, version);
    }

    // A legacy wildcard request is recognized by its empty resource_names_subscribe.
    if (legacy_wildcard_request) {
      names_added_.clear();
    }
    names_removed_.clear();
  }

  std::copy(names_added_.begin(), names_added_.end(),
            Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_subscribe()));
  names_added_.clear();
  std::copy(names_removed_.begin(), names_removed_.end(),
            Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_unsubscribe()));
  names_removed_.clear();

  request.set_type_url(type_url_);
  request.mutable_node()->MergeFrom(local_info_.node());
  return request;
}
// Decides whether the first request of a stream should be sent as a legacy wildcard request.
//
// Note that this is not a pure query: while still in the initial legacy wildcard mode, it
// registers Wildcard as a requested resource.
//
// @return true if the request should be a legacy wildcard request, false otherwise.
bool NewDeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
  if (in_initial_legacy_wildcard_) {
    requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
    ASSERT(requested_resource_state_.contains(Wildcard));
    ASSERT(!wildcard_resource_state_.contains(Wildcard));
    ASSERT(!ambiguous_resource_state_.contains(Wildcard));
    return true;
  }

  // Getting here means the initial wildcard mode was lost, because we subscribed to something in
  // the past. It may still be the case that the wildcard is all we are subscribed to right now,
  // and then a legacy wildcard request is attempted anyway. Two conditions must hold for that:
  //
  // 1. No change in interest.
  // 2. The only requested resource is the Wildcard resource.
  //
  // This code relies on being executed only when subscriptionUpdatePending() returned true,
  // which in this situation is only possible with a non-empty requested resource state.
  ASSERT(!requested_resource_state_.empty());

  // First condition: the subscription interest did not change.
  const bool interest_unchanged = names_added_.empty() && names_removed_.empty();
  // Second condition: the wildcard is the one and only requested resource.
  const bool only_wildcard_requested =
      requested_resource_state_.size() == 1 && requested_resource_state_.contains(Wildcard);
  return interest_unchanged && only_wildcard_requested;
}
// Builds the next discovery request and attaches the given acknowledgement to it.
//
// @param ack the ACK/NACK for a previously received response.
// @return the request produced by getNextRequestAckless(), with the response nonce set and, in
//         case of a NACK, with the error detail copied in.
envoy::service::discovery::v3::DeltaDiscoveryRequest
NewDeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) {
  envoy::service::discovery::v3::DeltaDiscoveryRequest next_request = getNextRequestAckless();
  next_request.set_response_nonce(ack.nonce_);
  // An OK status leaves error_detail untouched, so that the field does not needlessly become
  // present-but-empty.
  const bool is_nack = ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok;
  if (is_nack) {
    next_request.mutable_error_detail()->CopyFrom(ack.error_detail_);
  }
  return next_request;
}
// Records the TTL and the version of a resource received from the server.
//
// @param resource a single resource entry from a DeltaDiscoveryResponse.
void NewDeltaSubscriptionState::addResourceStateFromServer(
    const envoy::service::discovery::v3::Resource& resource) {
  const auto& name = resource.name();

  // A resource without a TTL clears whatever TTL was registered for that name.
  if (!resource.has_ttl()) {
    ttl_.clear(name);
  } else {
    ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
             name);
  }

  auto requested = getRequestedResourceState(name);
  if (requested.has_value()) {
    // An explicitly requested resource: just remember its version.
    requested->setVersion(resource.version());
    ASSERT(requested_resource_state_.contains(name));
    ASSERT(!wildcard_resource_state_.contains(name));
    ASSERT(!ambiguous_resource_state_.contains(name));
    return;
  }

  // Not explicitly requested, so it belongs to the wildcard subscription. If it used to be
  // ambiguous, then that ambiguity has just been resolved.
  wildcard_resource_state_.insert_or_assign(name, resource.version());
  ambiguous_resource_state_.erase(name);
  ASSERT(!requested_resource_state_.contains(name));
  ASSERT(wildcard_resource_state_.contains(name));
  ASSERT(!ambiguous_resource_state_.contains(name));
}
// Looks up the mutable state of an explicitly requested resource.
//
// @param resource_name the name of the resource to look up.
// @return a reference to the state, or an empty OptRef if the resource is not requested.
OptRef<NewDeltaSubscriptionState::ResourceState>
NewDeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
  if (auto found = requested_resource_state_.find(resource_name);
      found != requested_resource_state_.end()) {
    return {found->second};
  }
  return {};
}
// Looks up the read-only state of an explicitly requested resource.
//
// @param resource_name the name of the resource to look up.
// @return a const reference to the state, or an empty OptRef if the resource is not requested.
OptRef<const NewDeltaSubscriptionState::ResourceState>
NewDeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
  if (auto found = requested_resource_state_.find(resource_name);
      found != requested_resource_state_.end()) {
    return {found->second};
  }
  return {};
}
} // namespace Config
} // namespace Envoy