Skip to content

Commit

Permalink
[bfetch] Pass compress flag in query instead of headers (#113929)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dosant authored Oct 11, 2021
1 parent 53109bd commit f944389
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 150 deletions.
1 change: 1 addition & 0 deletions src/plugins/bfetch/common/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@

export * from './normalize_error';
export * from './remove_leading_slash';
export * from './query_params';
12 changes: 12 additions & 0 deletions src/plugins/bfetch/common/util/query_params.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export const appendQueryParam = (url: string, key: string, value: string): string => {
const separator = url.includes('?') ? '&' : '?';
return `${url}${separator}${key}=${value}`;
};
12 changes: 6 additions & 6 deletions src/plugins/bfetch/public/streaming/fetch_streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { map, share } from 'rxjs/operators';
import { inflateResponse } from '.';
import { fromStreamingXhr } from './from_streaming_xhr';
import { split } from './split';
import { appendQueryParam } from '../../common';

export interface FetchStreamingParams {
url: string;
Expand All @@ -34,16 +35,15 @@ export function fetchStreaming({
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();

// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;

const isCompressionDisabled = getIsCompressionDisabled();

if (!isCompressionDisabled) {
headers['X-Chunk-Encoding'] = 'deflate';
url = appendQueryParam(url, 'compress', 'true');
}

// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;

// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));

Expand Down
1 change: 0 additions & 1 deletion src/plugins/bfetch/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { PluginInitializerContext } from '../../../core/server';
import { BfetchServerPlugin } from './plugin';

export { BfetchServerSetup, BfetchServerStart, BatchProcessingRouteParams } from './plugin';
export { StreamingRequestHandler } from './types';

export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchServerPlugin(initializerContext);
Expand Down
1 change: 0 additions & 1 deletion src/plugins/bfetch/server/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const createSetupContract = (): Setup => {
const setupContract: Setup = {
addBatchProcessingRoute: jest.fn(),
addStreamingResponseRoute: jest.fn(),
createStreamingRequestHandler: jest.fn(),
};
return setupContract;
};
Expand Down
78 changes: 3 additions & 75 deletions src/plugins/bfetch/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import type {
Plugin,
Logger,
KibanaRequest,
RouteMethod,
RequestHandler,
RequestHandlerContext,
StartServicesAccessor,
} from 'src/core/server';
import { schema } from '@kbn/config-schema';
Expand All @@ -28,7 +25,6 @@ import {
removeLeadingSlash,
normalizeError,
} from '../common';
import { StreamingRequestHandler } from './types';
import { createStream } from './streaming';
import { getUiSettings } from './ui_settings';

Expand All @@ -52,44 +48,6 @@ export interface BfetchServerSetup {
path: string,
params: (request: KibanaRequest) => StreamingResponseHandler<Payload, Response>
) => void;
/**
* Create a streaming request handler to be able to use an Observable to return chunked content to the client.
* This is meant to be used with the `fetchStreaming` API of the `bfetch` client-side plugin.
*
* @example
* ```ts
* setup({ http }: CoreStart, { bfetch }: SetupDeps) {
* const router = http.createRouter();
* router.post(
* {
* path: '/api/my-plugin/stream-endpoint,
* validate: {
* body: schema.object({
* term: schema.string(),
* }),
* }
* },
* bfetch.createStreamingResponseHandler(async (ctx, req) => {
* const { term } = req.body;
* const results$ = await myApi.getResults$(term);
* return results$;
* })
* )}
*
* ```
*
* @param streamHandler
*/
createStreamingRequestHandler: <
Response,
P,
Q,
B,
Context extends RequestHandlerContext = RequestHandlerContext,
Method extends RouteMethod = any
>(
streamHandler: StreamingRequestHandler<Response, P, Q, B, Method>
) => RequestHandler<P, Q, B, Context, Method>;
}

// eslint-disable-next-line
Expand Down Expand Up @@ -124,15 +82,10 @@ export class BfetchServerPlugin
logger,
});
const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute);
const createStreamingRequestHandler = this.createStreamingRequestHandler({
getStartServices: core.getStartServices,
logger,
});

return {
addBatchProcessingRoute,
addStreamingResponseRoute,
createStreamingRequestHandler,
};
}

Expand All @@ -142,10 +95,6 @@ export class BfetchServerPlugin

public stop() {}

private getCompressionDisabled(request: KibanaRequest) {
return request.headers['x-chunk-encoding'] !== 'deflate';
}

private addStreamingResponseRoute =
({
getStartServices,
Expand All @@ -162,42 +111,21 @@ export class BfetchServerPlugin
path: `/${removeLeadingSlash(path)}`,
validate: {
body: schema.any(),
query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
},
},
async (context, request, response) => {
const handlerInstance = handler(request);
const data = request.body;
const compressionDisabled = this.getCompressionDisabled(request);
const compress = request.query.compress;
return response.ok({
headers: streamingHeaders,
body: createStream(
handlerInstance.getResponseStream(data),
logger,
compressionDisabled
),
body: createStream(handlerInstance.getResponseStream(data), logger, compress),
});
}
);
};

private createStreamingRequestHandler =
({
logger,
getStartServices,
}: {
logger: Logger;
getStartServices: StartServicesAccessor;
}): BfetchServerSetup['createStreamingRequestHandler'] =>
(streamHandler) =>
async (context, request, response) => {
const response$ = await streamHandler(context, request);
const compressionDisabled = this.getCompressionDisabled(request);
return response.ok({
headers: streamingHeaders,
body: createStream(response$, logger, compressionDisabled),
});
};

private addBatchProcessingRoute =
(
addStreamingResponseRoute: BfetchServerSetup['addStreamingResponseRoute']
Expand Down
8 changes: 4 additions & 4 deletions src/plugins/bfetch/server/streaming/create_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import { createNDJSONStream } from './create_ndjson_stream';
export function createStream<Payload, Response>(
response$: Observable<Response>,
logger: Logger,
compressionDisabled: boolean
compress: boolean
): Stream {
return compressionDisabled
? createNDJSONStream(response$, logger)
: createCompressedStream(response$, logger);
return compress
? createCompressedStream(response$, logger)
: createNDJSONStream(response$, logger);
}
27 changes: 0 additions & 27 deletions src/plugins/bfetch/server/types.ts

This file was deleted.

66 changes: 30 additions & 36 deletions test/api_integration/apis/search/bsearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,25 @@ export default function ({ getService }: FtrProviderContext) {
describe('bsearch', () => {
describe('post', () => {
it('should return 200 a single response', async () => {
const resp = await supertest
.post(`/internal/bsearch`)
.set({ 'X-Chunk-Encoding': '' })
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
options: {
strategy: 'es',
},
},
],
});
options: {
strategy: 'es',
},
},
],
});

const jsonBody = parseBfetchResponse(resp);

Expand All @@ -62,28 +59,25 @@ export default function ({ getService }: FtrProviderContext) {
});

it('should return 200 a single response from compressed', async () => {
const resp = await supertest
.post(`/internal/bsearch`)
.set({ 'X-Chunk-Encoding': 'deflate' })
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
const resp = await supertest.post(`/internal/bsearch?compress=true`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
options: {
strategy: 'es',
},
},
],
});
options: {
strategy: 'es',
},
},
],
});

const jsonBody = parseBfetchResponse(resp, true);

Expand Down

0 comments on commit f944389

Please sign in to comment.