Skip to content

Commit

Permalink
Use the b64 log format to track offsets for pause/play support
Browse files Browse the repository at this point in the history
  • Loading branch information
DingoEatingFuzz committed Nov 21, 2017
1 parent 6447785 commit bc3f02b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
33 changes: 33 additions & 0 deletions ui/app/utils/classes/abstract-logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import Ember from 'ember';
import queryString from 'npm:query-string';

const { Mixin, computed, assign } = Ember;
const CHUNK_SIZE = 50000;

export default Mixin.create({
url: '',
params: computed(() => ({})),
logFetch() {
Ember.assert(
'Loggers need a logFetch method, which should have an interface like window.fetch'
);
},

endOffset: null,

offsetParams: computed('endOffset', function() {
const endOffset = this.get('endOffset');
return endOffset
? { origin: 'start', offset: endOffset }
: { origin: 'end', offset: CHUNK_SIZE };
}),

additionalParams: computed(() => ({})),

fullUrl: computed('url', 'params', 'offsetParams', 'additionalParams', function() {
const queryParams = queryString.stringify(
assign({}, this.get('params'), this.get('offsetParams'), this.get('additionalParams'))
);
return `${this.get('url')}?${queryParams}`;
}),
});
33 changes: 3 additions & 30 deletions ui/app/utils/classes/poll-logger.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,11 @@
import Ember from 'ember';
import queryString from 'npm:query-string';
import { task, timeout } from 'ember-concurrency';
import AbstractLogger from './abstract-logger';

const { Object: EmberObject, computed, assign } = Ember;
const { Object: EmberObject } = Ember;

export default EmberObject.extend({
url: '',
export default EmberObject.extend(AbstractLogger, {
interval: 1000,
params: computed(() => ({})),
logFetch() {
Ember.assert(
'Loggers need a logFetch method, which should have an interface like window.fetch'
);
},

endOffset: null,

fullUrl: computed('url', 'endOffset', 'params', function() {
const endOffset = this.get('endOffset');
let additionalParams;
if (endOffset) {
additionalParams = {
origin: 'start',
offset: this.get('endOffset'),
};
} else {
additionalParams = {
origin: 'end',
offset: 50000,
};
}
const queryParams = queryString.stringify(assign({}, this.get('params'), additionalParams));
return `${this.get('url')}?${queryParams}`;
}),

start() {
return this.get('poll').perform();
Expand Down
61 changes: 39 additions & 22 deletions ui/app/utils/classes/stream-logger.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import Ember from 'ember';
import queryString from 'npm:query-string';
import { task } from 'ember-concurrency';
import TextDecoder from 'nomad-ui/utils/classes/text-decoder';
import AbstractLogger from './abstract-logger';

const { Object: EmberObject, computed, assign } = Ember;

export default EmberObject.extend({
url: '',
params: computed(() => ({})),
logFetch() {
Ember.assert(
'Loggers need a logFetch method, which should have an interface like window.fetch'
);
},
const { Object: EmberObject, computed } = Ember;

export default EmberObject.extend(AbstractLogger, {
reader: null,

additionalParams: computed(() => ({
follow: true,
})),

start() {
return this.get('poll').perform();
},
Expand All @@ -29,26 +25,47 @@ export default EmberObject.extend({
},

poll: task(function*() {
const queryParams = queryString.stringify(
assign({}, this.get('params'), {
plain: true,
follow: true,
origin: 'end',
offset: 50000,
})
);
const url = `${this.get('url')}?${queryParams}`;
const url = this.get('fullUrl');
const logFetch = this.get('logFetch');

let streamClosed = false;
let buffer = '';

const decoder = new TextDecoder();
const reader = yield logFetch(url).then(res => res.body.getReader());

this.set('reader', reader);

const decoder = new TextDecoder();
while (!streamClosed) {
yield reader.read().then(({ value, done }) => {
streamClosed = done;
this.get('write')(decoder.decode(value, { stream: true }));

// There is no guarantee that value will be a complete JSON object,
// so it needs to be buffered.
buffer += decoder.decode(value, { stream: true });

// Only when the buffer contains a close bracket can we be sure the buffer
// is in a complete state
if (buffer.indexOf('}') !== -1) {
// The buffer can be one or more complete frames with additional text for the
// next frame
const [, chunk, newBuffer] = buffer.match(/(.*\})(.*)$/);

// Peel chunk off the front of the buffer (since it represents complete frames)
// and set the buffer to be the remainder
buffer = newBuffer;

// Assuming the logs endpoint never returns nested JSON (it shouldn't), at this
// point chunk is a series of valid JSON objects with no delimiter.
const lines = chunk.replace(/\}\{/g, '}\n{').split('\n');
const frames = lines.map(line => JSON.parse(line)).filter(frame => frame.Data);

if (frames.length) {
frames.forEach(frame => (frame.Data = window.atob(frame.Data)));
this.set('endOffset', frames[frames.length - 1].Offset);
this.get('write')(frames.mapBy('Data').join(''));
}
}
});
}
}),
Expand Down

0 comments on commit bc3f02b

Please sign in to comment.