Skip to content

Commit

Permalink
implement diagnostic events in common JS package (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-darkly authored Dec 11, 2019
1 parent 58ee382 commit 50bc6aa
Show file tree
Hide file tree
Showing 16 changed files with 1,074 additions and 125 deletions.
24 changes: 21 additions & 3 deletions src/EventProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import * as errors from './errors';
import * as messages from './messages';
import * as utils from './utils';

export default function EventProcessor(platform, options, environmentId, emitter = null, sender = null) {
export default function EventProcessor(
platform,
options,
environmentId,
diagnosticsAccumulator = null,
emitter = null,
sender = null
) {
const processor = {};
const eventSender = sender || EventSender(platform, options.eventsUrl, environmentId);
const eventSender = sender || EventSender(platform, environmentId);
const mainEventsUrl = options.eventsUrl + '/events/bulk/' + environmentId;
const summarizer = EventSummarizer();
const userFilter = UserFilter(options);
const inlineUsers = options.inlineUsersInEvents;
Expand Down Expand Up @@ -62,6 +70,10 @@ export default function EventProcessor(platform, options, environmentId, emitter
exceededCapacity = true;
logger.warn(messages.eventCapacityExceeded());
}
if (diagnosticsAccumulator) {
// For diagnostic events, we track how many times we had to drop an event due to exceeding the capacity.
diagnosticsAccumulator.incrementDroppedEvents();
}
}
}

Expand Down Expand Up @@ -109,12 +121,18 @@ export default function EventProcessor(platform, options, environmentId, emitter
summary.kind = 'summary';
eventsToSend.push(summary);
}
if (diagnosticsAccumulator) {
// For diagnostic events, we record how many events were in the queue at the last flush (since "how
// many events happened to be in the queue at the moment we decided to send a diagnostic event" would
// not be a very useful statistic).
diagnosticsAccumulator.setEventsInLastBatch(eventsToSend.length);
}
if (eventsToSend.length === 0) {
return Promise.resolve();
}
queue = [];
logger.debug(messages.debugPostingEvents(eventsToSend.length));
return eventSender.sendEvents(eventsToSend).then(responseInfo => {
return eventSender.sendEvents(eventsToSend, mainEventsUrl).then(responseInfo => {
if (responseInfo) {
if (responseInfo.serverTime) {
lastKnownPastTime = responseInfo.serverTime;
Expand Down
19 changes: 9 additions & 10 deletions src/EventSender.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ import * as utils from './utils';

const MAX_URL_LENGTH = 2000;

export default function EventSender(platform, eventsUrl, environmentId, imageCreator) {
const postUrl = eventsUrl + '/events/bulk/' + environmentId;
const imageUrl = eventsUrl + '/a/' + environmentId + '.gif';
export default function EventSender(platform, environmentId, imageCreator) {
const imageUrlPath = '/a/' + environmentId + '.gif';
const sender = {};

function loadUrlUsingImage(src) {
Expand All @@ -25,7 +24,7 @@ export default function EventSender(platform, eventsUrl, environmentId, imageCre
return ret;
}

function sendChunk(events, usePost) {
sender.sendChunk = (events, url, usePost) => {
const createImage = imageCreator || loadUrlUsingImage;
const jsonBody = JSON.stringify(events);

Expand All @@ -38,7 +37,7 @@ export default function EventSender(platform, eventsUrl, environmentId, imageCre
utils.getLDHeaders(platform)
);
return platform
.httpRequest('POST', postUrl, headers, jsonBody)
.httpRequest('POST', url, headers, jsonBody)
.promise.then(result => {
if (!result) {
// This was a response from a fire-and-forget request, so we won't have a status.
Expand All @@ -61,15 +60,15 @@ export default function EventSender(platform, eventsUrl, environmentId, imageCre
if (usePost) {
return doPostRequest(true).catch(() => {});
} else {
const src = imageUrl + '?d=' + utils.base64URLEncode(jsonBody);
const src = url + imageUrlPath + '?d=' + utils.base64URLEncode(jsonBody);
createImage(src);
return Promise.resolve();
// We do not specify an onload handler for the image because we don't want the client to wait around
// for the image to load - it won't provide a server response, there's nothing to be done.
}
}
};

sender.sendEvents = function(events) {
sender.sendEvents = function(events, url) {
if (!platform.httpRequest) {
return Promise.resolve();
}
Expand All @@ -79,11 +78,11 @@ export default function EventSender(platform, eventsUrl, environmentId, imageCre
// no need to break up events into chunks if we can send a POST
chunks = [events];
} else {
chunks = utils.chunkUserEventsForUrl(MAX_URL_LENGTH - eventsUrl.length, events);
chunks = utils.chunkUserEventsForUrl(MAX_URL_LENGTH - url.length, events);
}
const results = [];
for (let i = 0; i < chunks.length; i++) {
results.push(sendChunk(chunks[i], canPost));
results.push(sender.sendChunk(chunks[i], url, canPost));
}
return Promise.all(results);
};
Expand Down
19 changes: 18 additions & 1 deletion src/Stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ import { base64URLEncode } from './utils';
// it is in an active state (connected or connecting).
// eventSourceAllowsReport: true if REPORT is supported.

export default function Stream(platform, config, environment, hash) {
export default function Stream(platform, config, environment, diagnosticsAccumulator, hash) {
const baseUrl = config.streamUrl;
const logger = config.logger;
const stream = {};
const evalUrlPrefix = baseUrl + '/eval/' + environment;
const useReport = config.useReport;
const withReasons = config.evaluationReasons;
const streamReconnectDelay = config.streamReconnectDelay;
const diagAcc = diagnosticsAccumulator;
let firstConnectionErrorLogged = false;
let es = null;
let reconnectTimeoutReference = null;
let connectionAttemptStartTime;
let user = null;
let handlers = null;

Expand All @@ -34,6 +36,7 @@ export default function Stream(platform, config, environment, hash) {
// We will decorate *all* handlers to do this to keep this abstraction agnostic
// for different stream implementations.
firstConnectionErrorLogged = false;
logConnectionResult(true);
newHandlers[key] && newHandlers[key](e);
};
}
Expand All @@ -55,6 +58,7 @@ export default function Stream(platform, config, environment, hash) {
logger.warn(messages.streamError(err, streamReconnectDelay));
firstConnectionErrorLogged = true;
}
logConnectionResult(false);
closeConnection();
tryConnect(streamReconnectDelay);
}
Expand Down Expand Up @@ -99,6 +103,8 @@ export default function Stream(platform, config, environment, hash) {

closeConnection();
logger.info(messages.streamConnecting(url));
logConnectionStarted();

es = platform.eventSourceFactory(url, options);
for (const key in handlers) {
if (handlers.hasOwnProperty(key)) {
Expand All @@ -118,5 +124,16 @@ export default function Stream(platform, config, environment, hash) {
}
}

function logConnectionStarted() {
connectionAttemptStartTime = new Date().getTime();
}

function logConnectionResult(success) {
if (connectionAttemptStartTime && diagAcc) {
diagAcc.recordStreamInit(connectionAttemptStartTime, !success, new Date().getTime() - connectionAttemptStartTime);
}
connectionAttemptStartTime = null;
}

return stream;
}
Loading

0 comments on commit 50bc6aa

Please sign in to comment.