feat(core): add crawler.exportData() helper
Retrieves all the data from the default crawler `Dataset` and exports it to the specified format.
Supported formats are currently 'json' and 'csv'; when no format is given explicitly, it is inferred from the `path` extension automatically.

```ts
const crawler = new BasicCrawler({ ... });
await crawler.pushData({ ... });

await crawler.exportData('./data.csv');
```
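
For completeness, here is a slightly fuller usage sketch based on the signature added in this commit (`exportData<Data>(path: string, format?: 'json' | 'csv'): Promise<Data[]>`). The record fields and file paths below are illustrative only:

```ts
import { BasicCrawler } from 'crawlee';

const crawler = new BasicCrawler({ /* ... */ });

// Illustrative record; any JSON-serializable object works.
await crawler.pushData({ url: 'https://example.com', title: 'Example' });

// Format inferred from the file extension:
await crawler.exportData('./storage/results.json');

// Format passed explicitly; the helper also resolves with the exported items.
const items = await crawler.exportData('./storage/results.csv', 'csv');
console.log(`Exported ${items.length} items`);
```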
B4nan committed Nov 6, 2023
1 parent b59ee50 commit 69c2931
Showing 4 changed files with 108 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-ci.yml
```diff
@@ -60,6 +60,8 @@ jobs:
 
       - name: Tests
         run: yarn test
+        env:
+          YARN_IGNORE_NODE: 1
 
   docs:
     name: Docs build
```
76 changes: 60 additions & 16 deletions packages/basic-crawler/src/internals/basic-crawler.ts
```diff
@@ -1,6 +1,6 @@
-import defaultLog, { LogLevel } from '@apify/log';
 import type { Log } from '@apify/log';
-import { addTimeoutToPromise, tryCancel, TimeoutError } from '@apify/timeout';
+import defaultLog, { LogLevel } from '@apify/log';
+import { addTimeoutToPromise, TimeoutError, tryCancel } from '@apify/timeout';
 import { cryptoRandomObjectId } from '@apify/utilities';
 import type {
     AddRequestsBatchedOptions,
@@ -24,28 +24,30 @@ import type {
     StatisticState,
 } from '@crawlee/core';
 import {
-    Dataset,
     AutoscaledPool,
     Configuration,
+    CriticalError,
+    Dataset,
+    enqueueLinks,
     EventType,
     KeyValueStore,
+    mergeCookies,
     NonRetryableError,
+    purgeDefaultStorages,
     RequestQueue,
     RequestQueueV2,
     RequestState,
     RetryRequestError,
     Router,
+    SessionError,
     SessionPool,
     Statistics,
-    enqueueLinks,
-    mergeCookies,
-    purgeDefaultStorages,
     validators,
-    SessionError,
-    CriticalError,
 } from '@crawlee/core';
-import type { Dictionary, Awaitable, BatchAddRequestsResult, SetStatusMessageOptions } from '@crawlee/types';
+import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types';
 import { ROTATE_PROXY_ERRORS } from '@crawlee/utils';
+import { stringify } from 'csv-stringify/sync';
+import { writeFile, writeJSON } from 'fs-extra';
 import type { Method, OptionsInit } from 'got-scraping';
 import { gotScraping } from 'got-scraping';
 import ow, { ArgumentError } from 'ow';
@@ -476,6 +478,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
     protected autoscaledPoolOptions: AutoscaledPoolOptions;
     protected events: EventManager;
     protected retryOnBlocked: boolean;
+    protected dataset!: Dataset;
     private _closeEvents?: boolean;
 
     private experiments: CrawlerExperiments;
@@ -935,23 +938,63 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
      * Pushes data to the default crawler {@apilink Dataset} by calling {@apilink Dataset.pushData}.
      */
     async pushData(...args: Parameters<Dataset['pushData']>): Promise<void> {
-        const dataset = await Dataset.open(undefined, { config: this.config });
-        return dataset.pushData(...args);
+        if (this.dataset == null) {
+            this.dataset = await Dataset.open(undefined, { config: this.config });
+        }
+
+        return this.dataset.pushData(...args);
     }
 
     /**
-     * Retrieves the default crawler {@apilink Dataset} by calling {@apilink Dataset.open}.
+     * Retrieves the default crawler {@apilink Dataset}.
      */
-    async getDataset(): Promise<Dataset> {
-        return Dataset.open(undefined, { config: this.config });
+    getDataset(): Dataset {
+        return this.dataset;
     }
 
     /**
      * Retrieves data from the default crawler {@apilink Dataset} by calling {@apilink Dataset.getData}.
      */
     async getData(...args: Parameters<Dataset['getData']>): ReturnType<Dataset['getData']> {
-        const dataset = await this.getDataset();
-        return dataset.getData(...args);
+        return this.getDataset().getData(...args);
     }
 
+    /**
+     * Retrieves all the data from the default crawler {@apilink Dataset} and exports them to the specified format.
+     * Supported formats are currently 'json' and 'csv', and will be inferred from the `path` automatically.
+     */
+    async exportData<Data>(path: string, format?: 'json' | 'csv'): Promise<Data[]> {
+        const supportedFormats = ['json', 'csv'];
+
+        if (!format && path.match(/\.(json|csv)$/i)) {
+            format = path.toLowerCase().match(/\.(json|csv)$/)![1] as 'json' | 'csv';
+        }
+
+        if (!format) {
+            throw new Error(`Failed to infer format from the path: '${path}'. Supported formats: ${supportedFormats.join(', ')}`);
+        }
+
+        if (!supportedFormats.includes(format)) {
+            throw new Error(`Unsupported format: '${format}'. Use one of ${supportedFormats.join(', ')}`);
+        }
+
+        const items = await this.getDataset().exportTo('', {}, 'object');
+
+        if (format === 'csv') {
+            const value = stringify([
+                Object.keys(items[0]),
+                ...items.map((item) => Object.values(item)),
+            ]);
+            await writeFile(path, value);
+            this.log.info(`Export to ${path} finished!`);
+        }
+
+        if (format === 'json') {
+            await writeJSON(path, items, { spaces: 4 });
+            this.log.info(`Export to ${path} finished!`);
+        }
+
+        return items;
+    }
+
     protected async _init(): Promise<void> {
@@ -972,6 +1015,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
         }
 
         await this._loadHandledRequestCount();
+        this.dataset = await Dataset.open(undefined, { config: this.config });
     }
 
     protected async _runRequestHandler(crawlingContext: Context): Promise<void> {
```
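
To make the CSV branch of `exportData()` above concrete: the header row comes from the keys of the first item and every item then contributes one row of values, so a uniform item shape is assumed. A small illustration (not part of the commit) of what the `csv-stringify` call produces:

```ts
import { stringify } from 'csv-stringify/sync';

const items = [
    { foo: 'bar', baz: 123 },
    { foo: 'qux', baz: 456 },
];

// Mirrors the call in exportData(): the first row holds the keys of the first item,
// the following rows hold the values of each item.
const csv = stringify([
    Object.keys(items[0]),
    ...items.map((item) => Object.values(item)),
]);

console.log(csv); // 'foo,baz\nbar,123\nqux,456\n'
```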
14 changes: 10 additions & 4 deletions packages/core/src/storages/dataset.ts
```diff
@@ -300,7 +300,7 @@ export class Dataset<Data extends Dictionary = Dictionary> {
      * @param [options] An optional options object where you can provide the dataset and target KVS name.
      * @param [contentType] Only JSON and CSV are supported currently, defaults to JSON.
      */
-    async exportTo(key: string, options?: ExportOptions, contentType?: string): Promise<void> {
+    async exportTo(key: string, options?: ExportOptions, contentType?: string): Promise<Data[]> {
         const kvStore = await KeyValueStore.open(options?.toKVS ?? null, { config: this.config });
         const items: Data[] = [];
 
@@ -326,14 +326,20 @@ export class Dataset<Data extends Dictionary = Dictionary> {
                 Object.keys(items[0]),
                 ...items.map((item) => Object.values(item)),
             ]);
-            return kvStore.setValue(key, value, { contentType });
+            await kvStore.setValue(key, value, { contentType });
+            return items;
         }
 
         if (contentType === 'application/json') {
-            return kvStore.setValue(key, items);
+            await kvStore.setValue(key, items);
+            return items;
         }
 
-        throw new Error(`Unsupported content type: ${contentType}`);
+        if (contentType !== 'object') {
+            throw new Error(`Unsupported content type: ${contentType}`);
+        }
+
+        return items;
     }
 
     /**
```
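
A side note on the `Dataset.exportTo()` change above: it now resolves with the collected items for every content type, and it additionally accepts a non-persisting `'object'` content type, which is the path `exportData()` uses internally. A minimal sketch; the dataset name and key-value store key are hypothetical:

```ts
import { Dataset } from '@crawlee/core';

const dataset = await Dataset.open('my-results'); // hypothetical named dataset

// JSON export: writes the items to the default key-value store under the given
// key and now also resolves with them.
const exported = await dataset.exportTo('OUTPUT', {}, 'application/json');

// The 'object' content type skips the key-value store entirely and only returns
// the items; this is the code path exportData() relies on.
const items = await dataset.exportTo('', {}, 'object');
console.log(exported.length === items.length); // true
```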
38 changes: 36 additions & 2 deletions test/core/crawlers/basic_crawler.test.ts
```diff
@@ -1,6 +1,7 @@
 import type { Server } from 'http';
 import http from 'http';
 import type { AddressInfo } from 'net';
+import { readFile, rm } from 'node:fs/promises';
 
 import log from '@apify/log';
 import type {
@@ -1383,8 +1384,8 @@ describe('BasicCrawler', () => {
     });
 
     describe('Dataset helpers, crawler paralellism', () => {
-        const payload: Dictionary<any>[] = [{ foo: 'bar' }];
-        const getPayload: (id: string) => Dictionary<any>[] = (id) => [{ foo: id }];
+        const payload: Dictionary[] = [{ foo: 'bar', baz: 123 }];
+        const getPayload: (id: string) => Dictionary[] = (id) => [{ foo: id }];
 
         test('should expose default Dataset methods', async () => {
             const crawler = new BasicCrawler();
@@ -1395,6 +1396,39 @@ describe('BasicCrawler', () => {
                 .toEqual(payload);
         });
 
+        test('export data', async () => {
+            const row: Dictionary = { foo: 'bar', baz: 123 };
+            const crawler = new BasicCrawler();
+
+            await crawler.pushData(row);
+            await crawler.pushData(row);
+            await crawler.pushData(row);
+
+            await crawler.exportData('./storage/result.csv');
+            await crawler.exportData('./storage/result.json');
+
+            const csv = await readFile('./storage/result.csv');
+            expect(csv.toString()).toBe('foo,baz\nbar,123\nbar,123\nbar,123\n');
+            const json = await readFile('./storage/result.json');
+            expect(json.toString()).toBe('[\n'
+                + '    {\n'
+                + '        "foo": "bar",\n'
+                + '        "baz": 123\n'
+                + '    },\n'
+                + '    {\n'
+                + '        "foo": "bar",\n'
+                + '        "baz": 123\n'
+                + '    },\n'
+                + '    {\n'
+                + '        "foo": "bar",\n'
+                + '        "baz": 123\n'
+                + '    }\n'
+                + ']\n');
+
+            await rm('./storage/result.csv');
+            await rm('./storage/result.json');
+        });
+
         test('should expose pushData helper', async () => {
             const crawler = new BasicCrawler({
                 requestHandler: async ({ pushData }) => pushData(payload),
```
