From f30ca88d06a88a14e811d2fbd5ea9d4034817f0e Mon Sep 17 00:00:00 2001 From: Vlad Frangu Date: Wed, 29 Mar 2023 18:07:16 +0300 Subject: [PATCH 1/3] fix(MemoryStorage): fix handling of readable streams --- .../src/resource-clients/key-value-store.ts | 9 +++++++- packages/memory-storage/src/utils.ts | 17 ++++---------- .../test/key-value-store-stream.test.ts | 22 +++++++++++++++++++ 3 files changed, 34 insertions(+), 14 deletions(-) create mode 100644 packages/memory-storage/test/key-value-store-stream.test.ts diff --git a/packages/memory-storage/src/resource-clients/key-value-store.ts b/packages/memory-storage/src/resource-clients/key-value-store.ts index 3e7eec4b2695..6c1733859af0 100644 --- a/packages/memory-storage/src/resource-clients/key-value-store.ts +++ b/packages/memory-storage/src/resource-clients/key-value-store.ts @@ -217,7 +217,14 @@ export class KeyValueStoreClient extends BaseClient { async setRecord(record: storage.KeyValueStoreRecord): Promise { s.object({ key: s.string.lengthGreaterThan(0), - value: s.union(s.null, s.string, s.number, s.instance(Buffer), s.object({}).passthrough), + value: s.union( + s.null, + s.string, + s.number, + s.instance(Buffer), + s.instance(ArrayBuffer), + s.typedArray(), + s.object({}).passthrough), contentType: s.string.lengthGreaterThan(0).optional, }).parse(record); diff --git a/packages/memory-storage/src/utils.ts b/packages/memory-storage/src/utils.ts index 455de00c1a00..f79c02bc9b3b 100644 --- a/packages/memory-storage/src/utils.ts +++ b/packages/memory-storage/src/utils.ts @@ -33,9 +33,9 @@ export function uniqueKeyToRequestId(uniqueKey: string): string { export function isBuffer(value: unknown): boolean { try { s.union( - s.typedArray(), - s.instance(ArrayBuffer), s.instance(Buffer), + s.instance(ArrayBuffer), + s.typedArray(), ).parse(value); return true; @@ -44,17 +44,8 @@ export function isBuffer(value: unknown): boolean { } } -export function isStream(value: unknown): boolean { - try { - s.object({ - on: s.any, - pipe: s.any, - }).passthrough.parse(value); - - return true; - } catch { - return false; - } +export function isStream(value: any): boolean { + return ['on', 'pipe'].every((key) => key in value && typeof value[key] === 'function'); } export const memoryStorageLog = defaultLog.child({ prefix: 'MemoryStorage' }); diff --git a/packages/memory-storage/test/key-value-store-stream.test.ts b/packages/memory-storage/test/key-value-store-stream.test.ts new file mode 100644 index 000000000000..5eb6ae51e854 --- /dev/null +++ b/packages/memory-storage/test/key-value-store-stream.test.ts @@ -0,0 +1,22 @@ +import { MemoryStorage } from '@crawlee/memory-storage'; +import { Readable } from 'node:stream'; + +describe('KeyValueStore should drain streams when setting records', () => { + const storage = new MemoryStorage({ + persistStorage: false, + }); + + const fsStream = Readable.from([Buffer.from('hello'), Buffer.from('world')]); + + test('should drain stream', async () => { + const defaultStoreInfo = await storage.keyValueStores().getOrCreate('default'); + const defaultStore = storage.keyValueStore(defaultStoreInfo.id); + + await defaultStore.setRecord({ key: 'streamz', value: fsStream, contentType: 'text/plain' }); + + expect(fsStream.destroyed).toBeTruthy(); + + const record = await defaultStore.getRecord('streamz'); + expect(record!.value.toString('utf8')).toEqual('helloworld'); + }); +}); From 1ddf82af68c23928f1bb59cdfa8b45941a6aa7f5 Mon Sep 17 00:00:00 2001 From: Vlad Frangu Date: Wed, 29 Mar 2023 18:18:21 +0300 Subject: [PATCH 2/3] chore: disable object constraint checking I'll PR an anyObject validator to shapeshift --- .../memory-storage/src/resource-clients/key-value-store.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/memory-storage/src/resource-clients/key-value-store.ts b/packages/memory-storage/src/resource-clients/key-value-store.ts index 6c1733859af0..10157a4698c4 100644 --- a/packages/memory-storage/src/resource-clients/key-value-store.ts +++ b/packages/memory-storage/src/resource-clients/key-value-store.ts @@ -224,7 +224,9 @@ export class KeyValueStoreClient extends BaseClient { s.instance(Buffer), s.instance(ArrayBuffer), s.typedArray(), - s.object({}).passthrough), + // disabling validation will make shapeshift only check the object given is an actual object, not null, nor array + s.object({}).setValidationEnabled(false), + ), contentType: s.string.lengthGreaterThan(0).optional, }).parse(record); From deaec088b0dd091ce165a682d8aa867e5c32f075 Mon Sep 17 00:00:00 2001 From: Vlad Frangu Date: Thu, 30 Mar 2023 10:48:20 +0300 Subject: [PATCH 3/3] fix: ensure value checked for isStream is object like --- packages/memory-storage/src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/memory-storage/src/utils.ts b/packages/memory-storage/src/utils.ts index f79c02bc9b3b..29baa9962fcb 100644 --- a/packages/memory-storage/src/utils.ts +++ b/packages/memory-storage/src/utils.ts @@ -45,7 +45,7 @@ export function isBuffer(value: unknown): boolean { } export function isStream(value: any): boolean { - return ['on', 'pipe'].every((key) => key in value && typeof value[key] === 'function'); + return typeof value === 'object' && value && ['on', 'pipe'].every((key) => key in value && typeof value[key] === 'function'); } export const memoryStorageLog = defaultLog.child({ prefix: 'MemoryStorage' });