diff --git a/.eslintrc.json b/.eslintrc.json index e881d63..93b619d 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -29,6 +29,7 @@ ], "deprecation/deprecation": "error", "@typescript-eslint/no-empty-interface": "off", + "@typescript-eslint/no-empty-function": "off", "@typescript-eslint/no-explicit-any": "off", "@typescript-eslint/no-namespace": "off", "@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_" }], diff --git a/.vscode/settings.json b/.vscode/settings.json index 6827c6d..ca70865 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,6 @@ { "azureFunctions.showProjectWarning": false, - "editor.formatOnSave": true, + "editor.formatOnSave": false, "editor.codeActionsOnSave": ["source.fixAll"], "typescript.tsdk": "node_modules/typescript/lib", "typescript.preferences.importModuleSpecifier": "relative", diff --git a/app/v3/httpTriggerRandomDelay/function.json b/app/v3/httpTriggerRandomDelay/function.json new file mode 100644 index 0000000..0830e3c --- /dev/null +++ b/app/v3/httpTriggerRandomDelay/function.json @@ -0,0 +1,20 @@ +{ + "bindings": [ + { + "authLevel": "anonymous", + "type": "httpTrigger", + "direction": "in", + "name": "req", + "methods": [ + "get", + "post" + ] + }, + { + "type": "http", + "direction": "out", + "name": "res" + } + ], + "scriptFile": "../dist/httpTriggerRandomDelay/index.js" +} \ No newline at end of file diff --git a/app/v3/httpTriggerRandomDelay/index.ts b/app/v3/httpTriggerRandomDelay/index.ts new file mode 100644 index 0000000..cab6c34 --- /dev/null +++ b/app/v3/httpTriggerRandomDelay/index.ts @@ -0,0 +1,19 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { AzureFunction, Context, HttpRequest } from '@azure/functions'; +import { addRandomAsyncOrSyncDelay } from '../utils/getRandomTestData'; + +const httpTrigger: AzureFunction = async function (context: Context, req: HttpRequest): Promise { + context.log(`Http function processed request for url "${req.url}"`); + + await addRandomAsyncOrSyncDelay(); + + const name = req.query.name || req.body || 'world'; + + context.res = { + body: `Hello, ${name}!`, + }; +}; + +export default httpTrigger; diff --git a/app/v3/utils/delay.ts b/app/v3/utils/delay.ts new file mode 100644 index 0000000..7b7a884 --- /dev/null +++ b/app/v3/utils/delay.ts @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +export async function delay(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +} + +export function delaySync(ms: number): void { + const endTime = Date.now() + ms; + while (Date.now() < endTime) { + // wait + } +} diff --git a/app/v3/utils/getRandomTestData.ts b/app/v3/utils/getRandomTestData.ts new file mode 100644 index 0000000..618f46d --- /dev/null +++ b/app/v3/utils/getRandomTestData.ts @@ -0,0 +1,32 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import * as crypto from 'crypto'; +import { delay, delaySync } from './delay'; + +export function getRandomTestData(): string { + // This should start with non-numeric data to prevent this bug from causing a test failure + // https://github.com/Azure/azure-functions-nodejs-library/issues/90 + return `testData${getRandomHexString()}`; +} + +export function getRandomHexString(length = 10): string { + const buffer: Buffer = crypto.randomBytes(Math.ceil(length / 2)); + return buffer.toString('hex').slice(0, length); +} + +export function getRandomInt(min: number, max: number): number { + return Math.floor(Math.random() * (max - min) + min); +} + +export function getRandomBoolean(percentTrue: number): boolean { + return Math.random() * 100 > percentTrue; +} + +export async function addRandomAsyncOrSyncDelay(): Promise { + if (getRandomBoolean(95)) { + await delay(getRandomInt(0, 250)); + } else { + delaySync(getRandomInt(0, 10)); + } +} diff --git a/app/v4-oldConfig/src/setup.ts b/app/v4-oldConfig/src/setup.ts new file mode 100644 index 0000000..ed6a2ae --- /dev/null +++ b/app/v4-oldConfig/src/setup.ts @@ -0,0 +1,6 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { app } from '@azure/functions'; + +app.setup({ enableHttpStream: false }); diff --git a/app/v4/package-lock.json b/app/v4/package-lock.json index 9020d3b..3b8a7f3 100644 --- a/app/v4/package-lock.json +++ b/app/v4/package-lock.json @@ -8,7 +8,7 @@ "name": "v4", "version": "1.0.0", "dependencies": { - "@azure/functions": "^4.0.0" + "@azure/functions": "^4.2.0" }, "devDependencies": { "@types/long": "^4.0.0", @@ -18,9 +18,9 @@ } }, "node_modules/@azure/functions": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/@azure/functions/-/functions-4.0.1.tgz", - "integrity": "sha512-Ol38b4XOlu6IDkLnO91HaYeo2utMixG0LIA1NR9Qehu17U/cGjNx+bAcOEdNlSJWNYh5ChhzjxA/uFB5dQJtmg==", + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/@azure/functions/-/functions-4.2.0.tgz", + "integrity": "sha512-RSECLoje4jGVpsVRjEzkna9KvmQOVeB96cg8J5J2g41QQpMWCzD1QTPI5+yI0uvOidGRLYElV1zHZjdvsGf9Nw==", "dependencies": { "long": "^4.0.0", "undici": "^5.13.0" diff --git a/app/v4/package.json b/app/v4/package.json index b9f95ff..562364c 100644 --- a/app/v4/package.json +++ b/app/v4/package.json @@ -12,13 +12,13 @@ "test": "echo \"No tests yet...\"" }, "dependencies": { - "@azure/functions": "^4.0.0" + "@azure/functions": "^4.2.0" }, "devDependencies": { "@types/long": "^4.0.0", "@types/node": "^18.x", - "typescript": "^4.0.0", - "rimraf": "^5.0.0" + "rimraf": "^5.0.0", + "typescript": "^4.0.0" }, - "main": "dist/src/{index.js,functions/*.js}" -} \ No newline at end of file + "main": "dist/src/{index.js,setup.js,functions/*.js}" +} diff --git a/app/v4/src/functions/httpTriggerRandomDelay.ts b/app/v4/src/functions/httpTriggerRandomDelay.ts new file mode 100644 index 0000000..ae886a6 --- /dev/null +++ b/app/v4/src/functions/httpTriggerRandomDelay.ts @@ -0,0 +1,24 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { app, HttpRequest, HttpResponseInit, InvocationContext } from '@azure/functions'; +import { addRandomAsyncOrSyncDelay } from '../utils/getRandomTestData'; + +export async function httpTriggerRandomDelay( + request: HttpRequest, + context: InvocationContext +): Promise { + context.log(`Http function processed request for url "${request.url}"`); + + await addRandomAsyncOrSyncDelay(); + + const name = request.query.get('name') || (await request.text()) || 'world'; + + return { body: `Hello, ${name}!` }; +} + +app.http('httpTriggerRandomDelay', { + methods: ['GET', 'POST'], + authLevel: 'anonymous', + handler: httpTriggerRandomDelay, +}); diff --git a/app/v4/src/functions/httpTriggerReceiveStream.ts b/app/v4/src/functions/httpTriggerReceiveStream.ts new file mode 100644 index 0000000..43b55e1 --- /dev/null +++ b/app/v4/src/functions/httpTriggerReceiveStream.ts @@ -0,0 +1,22 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { app, HttpRequest, HttpResponseInit, InvocationContext } from '@azure/functions'; +import { receiveStreamWithProgress } from '../utils/streamHttp'; + +export async function httpTriggerReceiveStream( + request: HttpRequest, + context: InvocationContext +): Promise { + context.log(`Http function processed request for url "${request.url}"`); + + const bytesReceived = await receiveStreamWithProgress(request.body); + + return { body: `Bytes received: ${bytesReceived}` }; +} + +app.http('httpTriggerReceiveStream', { + methods: ['GET', 'POST'], + authLevel: 'anonymous', + handler: httpTriggerReceiveStream, +}); diff --git a/app/v4/src/functions/httpTriggerSendStream.ts b/app/v4/src/functions/httpTriggerSendStream.ts new file mode 100644 index 0000000..8da1c2f --- /dev/null +++ b/app/v4/src/functions/httpTriggerSendStream.ts @@ -0,0 +1,22 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { app, HttpRequest, HttpResponseInit, InvocationContext } from '@azure/functions'; +import { createRandomStream } from '../utils/streamHttp'; + +export async function httpTriggerSendStream( + request: HttpRequest, + context: InvocationContext +): Promise { + context.log(`Http function processed request for url "${request.url}"`); + + const lengthInMb = request.query.get('lengthInMb'); + const stream = createRandomStream(Number(lengthInMb)); + return { body: stream }; +} + +app.http('httpTriggerSendStream', { + methods: ['GET', 'POST'], + authLevel: 'anonymous', + handler: httpTriggerSendStream, +}); diff --git a/app/v4/src/setup.ts b/app/v4/src/setup.ts new file mode 100644 index 0000000..7671d82 --- /dev/null +++ b/app/v4/src/setup.ts @@ -0,0 +1,6 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { app } from '@azure/functions'; + +app.setup({ enableHttpStream: true }); diff --git a/app/v4/src/utils/delay.ts b/app/v4/src/utils/delay.ts new file mode 100644 index 0000000..7b7a884 --- /dev/null +++ b/app/v4/src/utils/delay.ts @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +export async function delay(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +} + +export function delaySync(ms: number): void { + const endTime = Date.now() + ms; + while (Date.now() < endTime) { + // wait + } +} diff --git a/app/v4/src/utils/getRandomTestData.ts b/app/v4/src/utils/getRandomTestData.ts new file mode 100644 index 0000000..618f46d --- /dev/null +++ b/app/v4/src/utils/getRandomTestData.ts @@ -0,0 +1,32 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import * as crypto from 'crypto'; +import { delay, delaySync } from './delay'; + +export function getRandomTestData(): string { + // This should start with non-numeric data to prevent this bug from causing a test failure + // https://github.com/Azure/azure-functions-nodejs-library/issues/90 + return `testData${getRandomHexString()}`; +} + +export function getRandomHexString(length = 10): string { + const buffer: Buffer = crypto.randomBytes(Math.ceil(length / 2)); + return buffer.toString('hex').slice(0, length); +} + +export function getRandomInt(min: number, max: number): number { + return Math.floor(Math.random() * (max - min) + min); +} + +export function getRandomBoolean(percentTrue: number): boolean { + return Math.random() * 100 > percentTrue; +} + +export async function addRandomAsyncOrSyncDelay(): Promise { + if (getRandomBoolean(95)) { + await delay(getRandomInt(0, 250)); + } else { + delaySync(getRandomInt(0, 10)); + } +} diff --git a/app/v4/src/utils/streamHttp.ts b/app/v4/src/utils/streamHttp.ts new file mode 100644 index 0000000..68760d8 --- /dev/null +++ b/app/v4/src/utils/streamHttp.ts @@ -0,0 +1,57 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import * as crypto from 'crypto'; +import { Readable } from 'stream'; +import { delay } from './delay'; + +const oneMb = 1024 * 1024; + +export function createRandomStream(lengthInMb: number): Readable { + const stream = new Readable(); + stream._read = () => {}; + setTimeout(() => { + void sendRandomData(stream, lengthInMb); + }, 5); + return stream; +} + +async function sendRandomData(stream: Readable, lengthInMb: number): Promise { + const maxChunkSize = oneMb; + let remainingBytes = convertMbToB(lengthInMb); + do { + if (stream.readableLength > maxChunkSize) { + await delay(5); + } else { + const chunkSize = Math.min(maxChunkSize, remainingBytes); + stream.push(crypto.randomBytes(chunkSize)); + remainingBytes -= chunkSize; + } + } while (remainingBytes > 0); + stream.push(null); +} + +export async function receiveStreamWithProgress(stream: { + [Symbol.asyncIterator](): AsyncIterableIterator; +}): Promise { + let bytesReceived = 0; + const logInterval = 500; + let nextLogTime = Date.now(); + for await (const chunk of stream) { + if (Date.now() > nextLogTime) { + nextLogTime = Date.now() + logInterval; + console.log(`Progress: ${convertBToMb(bytesReceived)}mb`); + } + + bytesReceived += chunk.length; + } + return bytesReceived; +} + +export function convertMbToB(mb: number): number { + return mb * oneMb; +} + +function convertBToMb(bytes: number) { + return Math.round(bytes / oneMb); +} diff --git a/package-lock.json b/package-lock.json index f2dc7c5..0a102a5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -32,6 +32,7 @@ "@types/uuid": "^9.0.4", "@typescript-eslint/eslint-plugin": "^5.12.1", "@typescript-eslint/parser": "^5.12.1", + "agentkeepalive": "^4.5.0", "chai": "^4.2.0", "chai-as-promised": "^7.1.1", "eslint": "^7.32.0", @@ -49,7 +50,7 @@ "mocha-junit-reporter": "^2.0.2", "mocha-multi-reporters": "^1.5.1", "mssql": "^9.1.0", - "node-fetch": "^2.0.0", + "node-fetch": "2.6.7", "p-retry": "^4.0.0", "prettier": "^2.4.1", "rimraf": "^5.0.0", @@ -1526,6 +1527,17 @@ "node": ">= 6.0.0" } }, + "node_modules/agentkeepalive": { + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/agentkeepalive/-/agentkeepalive-4.5.0.tgz", + "integrity": "sha512-5GG/5IbQQpC9FpkRGsSvZI5QYeSCzlJHdpBQntCsuTOxhKD8lqKhrleg2Yi7yvMIf82Ycmmqln9U8V9qwEiJew==", + "dependencies": { + "humanize-ms": "^1.2.1" + }, + "engines": { + "node": ">= 8.0.0" + } + }, "node_modules/ajv": { "version": "6.12.6", "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", @@ -3296,6 +3308,14 @@ "node": ">= 6" } }, + "node_modules/humanize-ms": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/humanize-ms/-/humanize-ms-1.2.1.tgz", + "integrity": "sha512-Fl70vYtsAFb/C06PTS9dZBo7ihau+Tu/DNCk/OyHhea07S+aeMWpFFkUaXRa8fI+ScZbEI8dfSxwY7gxZ9SAVQ==", + "dependencies": { + "ms": "^2.0.0" + } + }, "node_modules/iconv-lite": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", @@ -4325,9 +4345,9 @@ "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==" }, "node_modules/node-fetch": { - "version": "2.7.0", - "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz", - "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==", + "version": "2.6.7", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.7.tgz", + "integrity": "sha512-ZjMPFEfVx5j+y2yF35Kzx5sF7kDzxuDj6ziH4FFbOp87zKDZNx8yExJIb05OGF4Nlt9IHFIMBkRl41VdvcNdbQ==", "dependencies": { "whatwg-url": "^5.0.0" }, diff --git a/package.json b/package.json index 65ee8f4..d960777 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "@types/uuid": "^9.0.4", "@typescript-eslint/eslint-plugin": "^5.12.1", "@typescript-eslint/parser": "^5.12.1", + "agentkeepalive": "^4.5.0", "chai": "^4.2.0", "chai-as-promised": "^7.1.1", "eslint": "^7.32.0", @@ -76,7 +77,7 @@ "mocha-junit-reporter": "^2.0.2", "mocha-multi-reporters": "^1.5.1", "mssql": "^9.1.0", - "node-fetch": "^2.0.0", + "node-fetch": "2.6.7", "p-retry": "^4.0.0", "prettier": "^2.4.1", "rimraf": "^5.0.0", @@ -86,4 +87,4 @@ "typescript": "^4.5.5", "uuid": "^9.0.1" } -} \ No newline at end of file +} diff --git a/src/global.test.ts b/src/global.test.ts index f81723f..f8329c7 100644 --- a/src/global.test.ts +++ b/src/global.test.ts @@ -25,6 +25,12 @@ export let isOldConfig: boolean; let childProc: cp.ChildProcess | undefined; let testsDone = false; +interface FuncCliSettings { + hideOutput?: boolean; +} + +export const funcCliSettings: FuncCliSettings = {}; + before(async function (this: Mocha.Context): Promise { model = getModelArg(); if (model === 'v4' && semver.lt(process.versions.node, '18.0.0')) { @@ -108,6 +114,7 @@ async function startFuncProcess(appPath: string): Promise { [EnvVarNames.cosmosDB]: cosmosDBConnectionString, [EnvVarNames.serviceBus]: serviceBusConnectionString, [EnvVarNames.sql]: sqlConnectionString, + FUNCTIONS_REQUEST_BODY_SIZE_LIMIT: '4294967296', }, }, null, @@ -123,7 +130,9 @@ async function startFuncProcess(appPath: string): Promise { childProc.stdout?.on('data', (data: string | Buffer) => { data = data.toString(); - process.stdout.write(data); + if (!funcCliSettings.hideOutput) { + process.stdout.write(data); + } perTestFuncOutput += data; fullFuncOutput += data; }); diff --git a/src/http.test.ts b/src/http.test.ts index c9293bb..1a7c349 100644 --- a/src/http.test.ts +++ b/src/http.test.ts @@ -1,13 +1,17 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. +import Agent from 'agentkeepalive'; import { expect } from 'chai'; import { encode } from 'iconv-lite'; // Node.js core added support for fetch in v18, but while we're testing versions <18 we'll use "node-fetch" import { default as fetch, HeadersInit } from 'node-fetch'; +import { Readable } from 'stream'; import util from 'util'; import { getFuncUrl } from './constants'; -import { model } from './global.test'; +import { funcCliSettings, isOldConfig, model } from './global.test'; +import { addRandomAsyncOrSyncDelay, getRandomTestData } from './utils/getRandomTestData'; +import { convertMbToB, createRandomStream, receiveStreamWithProgress } from './utils/streamHttp'; const helloWorldUrl = getFuncUrl('helloWorld'); const httpRawBodyUrl = getFuncUrl('httpRawBody'); @@ -23,6 +27,10 @@ const multipartFormHeaders = getContentTypeHeaders('multipart/form'); const textPlainHeaders = getContentTypeHeaders('text/plain'); describe('http', () => { + afterEach(() => { + funcCliSettings.hideOutput = false; + }); + it('hello world', async () => { const response = await fetch(helloWorldUrl); const body = await response.text(); @@ -57,9 +65,108 @@ describe('http', () => { expect(body).to.equal(''); expect(response.status).to.equal(200); const cookies = response.headers.get('Set-Cookie'); - expect(cookies).to.equal( - 'mycookie=myvalue; max-age=200000; path=/, mycookie2=myvalue; max-age=200000; path=/, mycookie3-expires=myvalue3-expires; max-age=0; path=/, mycookie4-samesite-lax=myvalue; path=/; samesite=lax, mycookie5-samesite-strict=myvalue; path=/; samesite=strict' - ); + + // Cookie logic changed slightly with the http streaming feature, although it should be functionally the same + // The old logic adds the default "path=/" to every cookie and the new logic only adds the path if it's explicitly specified + if (isOldConfig || model === 'v3') { + expect(cookies).to.equal( + 'mycookie=myvalue; max-age=200000; path=/, mycookie2=myvalue; max-age=200000; path=/, mycookie3-expires=myvalue3-expires; max-age=0; path=/, mycookie4-samesite-lax=myvalue; path=/; samesite=lax, mycookie5-samesite-strict=myvalue; path=/; samesite=strict' + ); + } else { + expect(cookies?.toLowerCase()).to.equal( + 'mycookie=myvalue; max-age=200000, mycookie2=myvalue; max-age=200000; path=/, mycookie3-expires=myvalue3-expires; max-age=0, mycookie4-samesite-lax=myvalue; samesite=lax, mycookie5-samesite-strict=myvalue; samesite=strict' + ); + } + }); + + // Use a connection pool to avoid flaky test failures due to various connection limits (Mac in particular seems to have a low limit) + // NOTE: The node-fetch package has a bug starting in 2.6.8 related to keep alive agents, so we have to use 2.6.7 + // https://github.com/node-fetch/node-fetch/issues/1767 + const keepaliveAgent = new Agent({ maxSockets: 128, maxFreeSockets: 64 }); + + async function validateIndividualRequest(url: string): Promise { + const data = getRandomTestData(); + await addRandomAsyncOrSyncDelay(); + const response = await fetch(url, { + method: 'POST', + body: data, + timeout: 40 * 1000, + agent: keepaliveAgent, + }); + const body = await response.text(); + expect(body).to.equal(`Hello, ${data}!`); + } + + it('http trigger concurrent requests', async function (this: Mocha.Context) { + funcCliSettings.hideOutput = true; // because this test is too noisy + const url = getFuncUrl('httpTriggerRandomDelay'); + + const reqs: Promise[] = []; + const numReqs = 1024; + for (let i = 0; i < numReqs; i++) { + reqs.push(validateIndividualRequest(url)); + } + let countFailed = 0; + let countTimedOut = 0; + let countSucceeded = 0; + const results = await Promise.allSettled(reqs); + for (const result of results) { + if (result.status === 'rejected') { + if (typeof result.reason?.message === 'string' && /timeout/i.test(result.reason.message)) { + countTimedOut += 1; + } else { + console.error(result.reason); + countFailed += 1; + } + } else { + countSucceeded += 1; + } + } + if (countFailed > 0 || countTimedOut > 0) { + throw new Error( + `${countFailed} request(s) failed, ${countTimedOut} timed out, ${countSucceeded} succeeded` + ); + } + }); + + describe('stream', () => { + before(function (this: Mocha.Context) { + if (isOldConfig || model === 'v3') { + this.skip(); + } + }); + + it('hello world stream', async () => { + const body = new Readable(); + body._read = () => {}; + body.push('testName-chunked'); + body.push(null); + + const response = await fetch(helloWorldUrl, { method: 'POST', body }); + const resBody = await response.text(); + expect(resBody).to.equal('Hello, testName-chunked!'); + expect(response.status).to.equal(200); + }); + + for (const lengthInMb of [32, 128, 512, 2048]) { + it(`send stream ${lengthInMb}mb`, async () => { + const funcUrl = getFuncUrl('httpTriggerReceiveStream'); + const randomStream = createRandomStream(lengthInMb); + const response = await fetch(funcUrl, { method: 'POST', body: randomStream }); + const resBody = await response.text(); + expect(resBody).to.equal(`Bytes received: ${convertMbToB(lengthInMb)}`); + expect(response.status).to.equal(200); + }); + + it(`receive stream ${lengthInMb}mb`, async () => { + const funcUrl = getFuncUrl('httpTriggerSendStream'); + const response = await fetch(`${funcUrl}?lengthInMb=${lengthInMb}`, { method: 'GET' }); + + const bytesReceived = await receiveStreamWithProgress(response.body); + expect(bytesReceived).to.equal(convertMbToB(lengthInMb)); + expect(response.status).to.equal(200); + }); + } }); describe('v3 only', () => { diff --git a/src/utils/delay.ts b/src/utils/delay.ts index 297eb21..7b7a884 100644 --- a/src/utils/delay.ts +++ b/src/utils/delay.ts @@ -4,3 +4,10 @@ export async function delay(ms: number): Promise { await new Promise((resolve) => setTimeout(resolve, ms)); } + +export function delaySync(ms: number): void { + const endTime = Date.now() + ms; + while (Date.now() < endTime) { + // wait + } +} diff --git a/src/utils/getRandomTestData.ts b/src/utils/getRandomTestData.ts index 87170f5..618f46d 100644 --- a/src/utils/getRandomTestData.ts +++ b/src/utils/getRandomTestData.ts @@ -1,7 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. -import crypto from 'crypto'; +import * as crypto from 'crypto'; +import { delay, delaySync } from './delay'; export function getRandomTestData(): string { // This should start with non-numeric data to prevent this bug from causing a test failure @@ -13,3 +14,19 @@ export function getRandomHexString(length = 10): string { const buffer: Buffer = crypto.randomBytes(Math.ceil(length / 2)); return buffer.toString('hex').slice(0, length); } + +export function getRandomInt(min: number, max: number): number { + return Math.floor(Math.random() * (max - min) + min); +} + +export function getRandomBoolean(percentTrue: number): boolean { + return Math.random() * 100 > percentTrue; +} + +export async function addRandomAsyncOrSyncDelay(): Promise { + if (getRandomBoolean(95)) { + await delay(getRandomInt(0, 250)); + } else { + delaySync(getRandomInt(0, 10)); + } +} diff --git a/src/utils/streamHttp.ts b/src/utils/streamHttp.ts new file mode 100644 index 0000000..68760d8 --- /dev/null +++ b/src/utils/streamHttp.ts @@ -0,0 +1,57 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import * as crypto from 'crypto'; +import { Readable } from 'stream'; +import { delay } from './delay'; + +const oneMb = 1024 * 1024; + +export function createRandomStream(lengthInMb: number): Readable { + const stream = new Readable(); + stream._read = () => {}; + setTimeout(() => { + void sendRandomData(stream, lengthInMb); + }, 5); + return stream; +} + +async function sendRandomData(stream: Readable, lengthInMb: number): Promise { + const maxChunkSize = oneMb; + let remainingBytes = convertMbToB(lengthInMb); + do { + if (stream.readableLength > maxChunkSize) { + await delay(5); + } else { + const chunkSize = Math.min(maxChunkSize, remainingBytes); + stream.push(crypto.randomBytes(chunkSize)); + remainingBytes -= chunkSize; + } + } while (remainingBytes > 0); + stream.push(null); +} + +export async function receiveStreamWithProgress(stream: { + [Symbol.asyncIterator](): AsyncIterableIterator; +}): Promise { + let bytesReceived = 0; + const logInterval = 500; + let nextLogTime = Date.now(); + for await (const chunk of stream) { + if (Date.now() > nextLogTime) { + nextLogTime = Date.now() + logInterval; + console.log(`Progress: ${convertBToMb(bytesReceived)}mb`); + } + + bytesReceived += chunk.length; + } + return bytesReceived; +} + +export function convertMbToB(mb: number): number { + return mb * oneMb; +} + +function convertBToMb(bytes: number) { + return Math.round(bytes / oneMb); +}