Skip to content

Commit

Permalink
Complete most of the endpoint watcher implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Jul 7, 2020
1 parent 81fff18 commit 5767f7d
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 5 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"types": "build/src/index.d.ts",
"license": "Apache-2.0",
"devDependencies": {
"@grpc/proto-loader": "^0.6.0-pre3",
"@grpc/proto-loader": "^0.6.0-pre5",
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/lodash": "^4.14.108",
Expand Down
243 changes: 239 additions & 4 deletions packages/grpc-js/src/xds-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,26 @@ import * as fs from 'fs';
import * as protoLoader from '@grpc/proto-loader';
import { loadPackageDefinition } from './make-client';
import * as adsTypes from './generated/ads';
import { ChannelCredentials } from './channel-credentials';
import * as edsTypes from './generated/endpoint';
import { ChannelCredentials, createGoogleDefaultCredentials } from './channel-credentials';
import { loadBootstrapInfo } from './xds-bootstrap';
import { ClientDuplexStream, ServiceError } from './call';
import { StatusObject } from './call-stream';
import { isIPv4, isIPv6 } from 'net';
import { Status } from './constants';
import { Metadata } from './metadata';

const packageDefinition = protoLoader.loadSync([
const clientVersion = require('../../package.json').version;

const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';

let loadedProtos: Promise<adsTypes.ProtoGrpcType> | null = null;

function loadAdsProtos(): Promise<adsTypes.ProtoGrpcType> {
if (loadedProtos !== null) {
return loadedProtos;
}
loadedProtos = protoLoader.load([
'envoy/service/discovery/v2/ads.proto',
'envoy/api/v2/listener.proto',
'envoy/api/v2/route.proto',
Expand All @@ -40,10 +57,228 @@ const packageDefinition = protoLoader.loadSync([
'deps/googleapis/',
'deps/protoc-gen-validate/'
]
});
}).then(packageDefinition => loadPackageDefinition(packageDefinition) as unknown as adsTypes.ProtoGrpcType);
return loadedProtos;
}

const loadedDefinition = loadPackageDefinition(packageDefinition) as unknown as adsTypes.ProtoGrpcType;
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}

export class XdsClient {
private node: adsTypes.messages.envoy.api.v2.core.Node | null = null;
private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<adsTypes.messages.envoy.api.v2.DiscoveryRequest, adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output> | null = null;

private endpointWatchers: Map<string, Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]> = new Map<string, Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]>();
private lastEdsVersionInfo: string = '';
private lastEdsNonce: string = '';

constructor() {
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(([bootstrapInfo, protoDefinitions]) => {
this.node = {
...bootstrapInfo.node,
build_version: `gRPC Node Pure JS ${clientVersion}`,
user_agent_name: 'gRPC Node Pure JS'
}
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials());
this.maybeStartAdsStream();
}, (error) => {
// Bubble this error up to any listeners
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError({
code: Status.INTERNAL,
details: `Failed to initialize xDS Client. ${error.message}`,
metadata: new Metadata()
})
}
}
});
}

/**
* Start the ADS stream if the client exists and there is not already an
* existing stream.
*/
private maybeStartAdsStream() {
if (this.client === null) {
return;
}
if (this.adsCall !== null) {
return;
}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall.on('data', (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL:
const edsResponses: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output[] = [];
for (const resource of message.resources) {
if (protoLoader.isAnyExtension(resource) && resource['@type'] === EDS_TYPE_URL) {
const resp = resource as protoLoader.AnyExtension & edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
} else {
this.nackEds(`Invalid resource type ${protoLoader.isAnyExtension(resource) ? resource['@type'] : resource.type_url}`);
return;
}
}
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.ackEds();
break;
default:
this.nackUnknown(message.type_url, message.version_info, message.nonce);
}
});
this.adsCall.on('error', (error: ServiceError) => {
this.adsCall = null;
this.reportStreamError(error);
this.maybeStartAdsStream();
});
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames
});
}
}

private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node,
type_url: typeUrl,
version_info: versionInfo,
response_nonce: nonce,
error_detail: {
message: `Unknown type_url ${typeUrl}`
}
});
}

/**
* Acknowledge an EDS update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
private ackEds() {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo
});
}

/**
* Reject an EDS update. This should be called without updating the local
* nonce and version info.
*/
private nackEds(message: string) {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
error_detail: {
message
}
});
}

private validateEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output): boolean {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
if (!lb.endpoint) {
return false;
}
if (!lb.endpoint.address) {
return false;
}
if (!lb.endpoint.address.socket_address) {
return false;
}
const socketAddress = lb.endpoint.address.socket_address;
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}

private handleEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output) {
const watchers = this.endpointWatchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}

private updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo
});
}
}

private reportStreamError(status: StatusObject) {
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
// Also do the same for other types of watchers when those are implemented
}

addEndpointWatcher(edsServiceName: string, watcher: Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>) {
let watchersEntry = this.endpointWatchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.endpointWatchers.set(edsServiceName, watchersEntry);
}
watchersEntry.push(watcher);
if (addedServiceName) {
this.updateEdsNames();
}
}

removeEndpointWatcher(edsServiceName: string, watcher: Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>) {
const watchersEntry = this.endpointWatchers.get(edsServiceName);
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
}
}
}

0 comments on commit 5767f7d

Please sign in to comment.