Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow configuring crawler statistics #2213

Merged
merged 13 commits into from
Dec 20, 2023
Merged
13 changes: 12 additions & 1 deletion packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type {
SessionPoolOptions,
Source,
StatisticState,
StatisticsOptions,
} from '@crawlee/core';
import {
AutoscaledPool,
Expand Down Expand Up @@ -338,6 +339,12 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
* WARNING: these options are not guaranteed to be stable and may change or be removed at any time.
*/
experiments?: CrawlerExperiments;

/**
* Customize the way statistics collecting works, such as logging interval or
* whether to output them to the Key-Value store.
*/
statisticsOptions?: StatisticsOptions;
}

/**
Expand Down Expand Up @@ -524,6 +531,8 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
// internal
log: ow.optional.object,
experiments: ow.optional.object,

statisticsOptions: ow.optional.object,
};

/**
Expand Down Expand Up @@ -569,6 +578,8 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

statusMessageLoggingInterval = 10,
statusMessageCallback,

statisticsOptions,
} = options;

this.requestList = requestList;
Expand Down Expand Up @@ -650,7 +661,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.sameDomainDelayMillis = sameDomainDelaySecs * 1000;
this.maxSessionRotations = maxSessionRotations;
this.handledRequestsCount = 0;
this.stats = new Statistics({ logMessage: `${log.getOptions().prefix} request statistics:`, config });
this.stats = new Statistics({ logMessage: `${log.getOptions().prefix} request statistics:`, config, ...statisticsOptions });
this.sessionPoolOptions = {
...sessionPoolOptions,
log,
Expand Down
69 changes: 63 additions & 6 deletions packages/core/src/crawlers/statistics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ const errorTrackerConfig = {
showFullMessage: false,
};

/**
* Persistence-related options to control how and when crawler's data gets persisted.
*
* Accepted through the `persistenceOptions` option of both `Statistics` and `SessionPool`.
*/
export interface PersistenceOptions {
/**
* Use this flag to disable or enable periodic persistence to key value store.
*
* NOTE(review): the consumers apply the `true` default only when the whole
* `persistenceOptions` object is omitted. An object passed without `enable`
* (e.g. `{}`) leaves the flag `undefined`, which the `!persistenceOptions.enable`
* checks treat the same as `false` — confirm whether this is intended.
* @default true
*/
enable?: boolean;
}

/**
* The statistics class provides an interface to collecting and logging run
* statistics for requests.
Expand Down Expand Up @@ -92,6 +103,7 @@ export class Statistics {
private instanceStart!: number;
private logInterval: unknown;
private events: EventManager;
private persistenceOptions: PersistenceOptions;

/**
* @internal
Expand All @@ -102,13 +114,17 @@ export class Statistics {
logMessage: ow.optional.string,
keyValueStore: ow.optional.object,
config: ow.optional.object,
persistenceOptions: ow.optional.object,
}));

const {
logIntervalSecs = 60,
logMessage = 'Statistics',
keyValueStore,
config = Configuration.getGlobalConfig(),
persistenceOptions = {
enable: true,
},
} = options;

this.logIntervalMillis = logIntervalSecs * 1000;
Expand All @@ -117,6 +133,7 @@ export class Statistics {
this.listener = this.persistState.bind(this);
this.events = config.getEventManager();
this.config = config;
this.persistenceOptions = persistenceOptions;

// initialize by "resetting"
this.reset();
Expand Down Expand Up @@ -155,7 +172,14 @@ export class Statistics {
this._teardown();
}

async resetStore() {
/**
* @param options - Override the persistence options provided in the constructor
*/
async resetStore(options?: PersistenceOptions) {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to keep one blank line after control statements like if/for/while/etc. This applies to other places in the PR too (e.g. the persistState method); take it as a rule of thumb.

Suggested change
}
}


if (!this.keyValueStore) {
return;
}
Expand Down Expand Up @@ -247,13 +271,14 @@ export class Statistics {
async startCapturing() {
this.keyValueStore ??= await KeyValueStore.open(null, { config: this.config });

await this._maybeLoadStatistics();

if (this.state.crawlerStartedAt === null) {
this.state.crawlerStartedAt = new Date();
}

this.events.on(EventType.PERSIST_STATE, this.listener);
if (this.persistenceOptions.enable) {
await this._maybeLoadStatistics();
this.events.on(EventType.PERSIST_STATE, this.listener);
}

this.logInterval = setInterval(() => {
this.log.info(this.logMessage, {
Expand Down Expand Up @@ -284,8 +309,13 @@ export class Statistics {

/**
* Persist internal state to the key value store
* @param options - Override the persistence options provided in the constructor
*/
async persistState() {
async persistState(options?: PersistenceOptions) {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

// this might be called before startCapturing was called without using await, should not crash
if (!this.keyValueStore) {
return;
Expand Down Expand Up @@ -382,11 +412,38 @@ export class Statistics {
}
}

interface StatisticsOptions {
/**
* Configuration for the {@apilink Statistics} instance used by the crawler.
*
* All fields are optional; the `Statistics` constructor fills in the defaults
* documented on each field.
*/
export interface StatisticsOptions {
/**
* Interval in seconds to log the current statistics.
* Converted to milliseconds by the `Statistics` constructor.
* @default 60
*/
logIntervalSecs?: number;

/**
* Message to log with the current statistics
* @default 'Statistics'
*/
logMessage?: string;

/**
* Key value store instance to persist the statistics.
* If not provided, the default one will be used when capturing starts
* (opened via `KeyValueStore.open(null, { config })` in `startCapturing`).
*/
keyValueStore?: KeyValueStore;

/**
* Configuration instance to use.
* Its event manager is the one the statistics use for persist-state events.
* @default Configuration.getGlobalConfig()
*/
config?: Configuration;

/**
* Control how and when to persist the statistics.
* @default { enable: true }
*/
persistenceOptions?: PersistenceOptions;
}

/**
Expand Down
40 changes: 37 additions & 3 deletions packages/core/src/session_pool/session_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ow from 'ow';
import type { SessionOptions } from './session';
import { Session } from './session';
import { Configuration } from '../configuration';
import type { PersistenceOptions } from '../crawlers/statistics';
import type { EventManager } from '../events/event_manager';
import { EventType } from '../events/event_manager';
import { log as defaultLog } from '../log';
Expand Down Expand Up @@ -58,6 +59,11 @@ export interface SessionPoolOptions {

/** @internal */
log?: Log;

/**
* Control how and when to persist the state of the session pool.
*/
persistenceOptions?: PersistenceOptions;
}

/**
Expand Down Expand Up @@ -138,6 +144,8 @@ export class SessionPool extends EventEmitter {
protected _listener!: () => Promise<void>;
protected events: EventManager;
protected readonly blockedStatusCodes: number[];
protected persistenceOptions: PersistenceOptions;
protected isInitialized = false;

/**
* @internal
Expand All @@ -153,6 +161,7 @@ export class SessionPool extends EventEmitter {
sessionOptions: ow.optional.object,
blockedStatusCodes: ow.optional.array.ofType(ow.number),
log: ow.optional.object,
persistenceOptions: ow.optional.object,
}));

const {
Expand All @@ -163,12 +172,16 @@ export class SessionPool extends EventEmitter {
sessionOptions = {},
blockedStatusCodes = [401, 403, 429],
log = defaultLog,
persistenceOptions = {
enable: true,
},
} = options;

this.config = config;
this.blockedStatusCodes = blockedStatusCodes;
this.events = config.getEventManager();
this.log = log.child({ prefix: 'SessionPool' });
this.persistenceOptions = persistenceOptions;

// Pool Configuration
this.maxPoolSize = maxPoolSize;
Expand Down Expand Up @@ -206,7 +219,15 @@ export class SessionPool extends EventEmitter {
* It is called automatically by the {@apilink SessionPool.open} function.
*/
async initialize(): Promise<void> {
if (this.isInitialized) {
return;
}

this.keyValueStore = await KeyValueStore.open(this.persistStateKeyValueStoreId, { config: this.config });
if (!this.persistenceOptions.enable) {
this.isInitialized = true;
return;
}

if (!this.persistStateKeyValueStoreId) {
// eslint-disable-next-line max-len
Expand All @@ -219,6 +240,7 @@ export class SessionPool extends EventEmitter {
this._listener = this.persistState.bind(this);

this.events.on(EventType.PERSIST_STATE, this._listener);
this.isInitialized = true;
}

/**
Expand Down Expand Up @@ -290,7 +312,14 @@ export class SessionPool extends EventEmitter {
return this._createSession();
}

async resetStore() {
/**
* Clears the persisted session pool state by writing `null` under `persistStateKey`
* to the key-value store (skipped silently when `keyValueStore` is not set).
*
* Does nothing when persistence is disabled via the constructor's `persistenceOptions`,
* unless `options.enable` is truthy. Note that `options` can only force the reset on:
* passing `enable: false` has no effect when persistence was enabled in the constructor.
* @param options - Override the persistence options provided in the constructor
*/
async resetStore(options?: PersistenceOptions) {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

await this.keyValueStore?.setValue(this.persistStateKey, null);
}

Expand All @@ -309,8 +338,13 @@ export class SessionPool extends EventEmitter {
/**
* Persists the current state of the `SessionPool` into the default {@apilink KeyValueStore}.
* The state is persisted automatically in regular intervals.
* @param options - Override the persistence options provided in the constructor
*/
async persistState(): Promise<void> {
async persistState(options?: PersistenceOptions): Promise<void> {
if (!this.persistenceOptions.enable && !options?.enable) {
return;
}

this.log.debug('Persisting state', {
persistStateKeyValueStoreId: this.persistStateKeyValueStoreId,
persistStateKey: this.persistStateKey,
Expand All @@ -331,7 +365,7 @@ export class SessionPool extends EventEmitter {
* SessionPool should not work before initialization.
*/
protected _throwIfNotInitialized() {
if (!this._listener) throw new Error('SessionPool is not initialized.');
if (!this.isInitialized) throw new Error('SessionPool is not initialized.');
}

/**
Expand Down
Loading