Skip to content

Commit

Permalink
feat: new log option to set the log level
Browse files Browse the repository at this point in the history
  • Loading branch information
dreyacosta committed Jun 24, 2024
1 parent e6697b8 commit 694c278
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 31 deletions.
25 changes: 17 additions & 8 deletions lib/nixbus.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { getNixBusCrypto } from '@nixbus/crypto'

import type { LogLevel } from 'src/infrastructure/Logger'
import { NixEventBus } from 'src/domain/NixEventBus'
import { HttpNixEvents } from 'src/infrastructure/HttpNixEvents'
import { InMemoryNixEvents } from 'src/infrastructure/InMemoryNixEvents'
import { Logger } from 'src/infrastructure/Logger'
import { NixBusHttpClient } from 'src/infrastructure/NixBusHttpClient'

export type { NixEvents } from 'src/domain/NixEvents'
Expand All @@ -19,25 +21,31 @@ export { InMemoryNixEvents } from 'src/infrastructure/InMemoryNixEvents'
export { NixBusHttpClient } from 'src/infrastructure/NixBusHttpClient'
export { EventIdIsRequired } from 'src/domain/errors'

type InMemoryNixBusOptions = {
log?: LogLevel
}

let _inMemoryNixBus: NixEventBus | null = null
export function getInMemoryNixBus(): NixEventBus {
export function getInMemoryNixBus(options: InMemoryNixBusOptions = {}): NixEventBus {
if (_inMemoryNixBus) {
return _inMemoryNixBus
}
_inMemoryNixBus = createInMemoryNixBus()
_inMemoryNixBus = createInMemoryNixBus(options)
return _inMemoryNixBus
}

export function createInMemoryNixBus(): NixEventBus {
export function createInMemoryNixBus(options: InMemoryNixBusOptions = {}): NixEventBus {
const events = new InMemoryNixEvents()
return new NixEventBus({ events })
const logger = new Logger({ level: options.log?.level })
return new NixEventBus({ events, logger })
}

type HttpNixBusOptions = {
passphrase: string
token: string
clientEncryption?: boolean
baseUrl?: string
log?: LogLevel
}

let _httpNixBus: NixEventBus | null = null
Expand All @@ -50,14 +58,15 @@ export function getHttpNixBus(options: HttpNixBusOptions): NixEventBus {
}

export function createHttpNixBus(options: HttpNixBusOptions): NixEventBus {
const logger = new Logger({ level: options.log?.level })
const encrypted = options.clientEncryption ?? true
if (!encrypted) {
const client = new NixBusHttpClient(
{ crypto: null },
{ crypto: null, logger },
{ token: options.token, baseUrl: options.baseUrl },
)
const events = new HttpNixEvents({ client })
return new NixEventBus({ events })
return new NixEventBus({ events, logger })
}

const defaultPassphraseVersion = 'v1'
Expand All @@ -66,9 +75,9 @@ export function createHttpNixBus(options: HttpNixBusOptions): NixEventBus {
passphrases: [{ version: defaultPassphraseVersion, phrase: options.passphrase }],
})
const client = new NixBusHttpClient(
{ crypto: nixBusCrypto },
{ crypto: nixBusCrypto, logger },
{ token: options.token, baseUrl: options.baseUrl },
)
const events = new HttpNixEvents({ client })
return new NixEventBus({ events })
return new NixEventBus({ events, logger })
}
51 changes: 36 additions & 15 deletions src/domain/NixEventBus.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { NixEvent, NixNewEvent } from 'src/domain/NixEvent'
import type { NixEvents } from 'src/domain/NixEvents'
import type { NixSubscriber, NixSubscriberAction, NixSubscriberId } from 'src/domain/NixSubscriber'
import type { Logger } from 'src/infrastructure/Logger'
import { setAsyncInterval } from 'src/shared/interval'

type EventBusDeps = {
events: NixEvents
logger: Logger
}

export class NixEventBus {
Expand All @@ -21,20 +23,28 @@ export class NixEventBus {
subscriber: NixSubscriber & { action: NixSubscriberAction },
) {
this.subscribersActions[subscriber.id] = subscriber.action
await this.deps.events.subscribe(eventType, subscriber).catch(console.error)
await this.deps.events
.subscribe(eventType, subscriber)
.catch((error) => this.deps.logger.error('EventBus', 'subscribe', { error }))
}

public async unsubscribe(evenType: string, subscriberId: NixSubscriberId) {
await this.deps.events.unsubscribe(evenType, subscriberId).catch(console.error)
await this.deps.events
.unsubscribe(evenType, subscriberId)
.catch((error) => this.deps.logger.error('EventBus', 'unsubscribe', { error }))
}

public async unsubscribeAll() {
this.subscribersActions = {}
await this.deps.events.unsubscribeAll().catch(console.error)
await this.deps.events
.unsubscribeAll()
.catch((error) => this.deps.logger.error('EventBus', 'unsubscribeAll', { error }))
}

public async publish(event: NixNewEvent) {
await this.deps.events.put({ event }).catch(console.error)
await this.deps.events
.put({ event })
.catch((error) => this.deps.logger.error('EventBus', 'publish', { error }))
}

public async run() {
Expand All @@ -52,7 +62,9 @@ export class NixEventBus {
}),
)
} catch (error: any) {
console.error(`[EventBus] runScheduler error:`, error)
this.deps.logger.error('EventBus', 'runScheduler', {
error,
})
}
}

Expand All @@ -62,12 +74,20 @@ export class NixEventBus {

Promise.all(
events.map((event) => {
console.log(`[EventBus] runSubscriber: ${subscriber.id} ${event.id}...`)
this.deps.logger.info('EventBus', 'runSubscriber', {
event_id: event.id,
event_type: event.type,
subscriber_id: subscriber.id,
})
return this.runSubscriberAction(event, subscriber)
}),
)
} catch (error: any) {
console.error(`[EventBus] runSubscriber error:`, error)
this.deps.logger.error('EventBus', 'runSubscriber', {
method: 'runSubscriber',
subscriber_id: subscriber.id,
error,
})
}
}

Expand All @@ -79,20 +99,21 @@ export class NixEventBus {
event,
subscriber,
})
console.log(
`[EventBus] runSubscriberAction processed.`,
JSON.stringify({ eventId: event.id, subscriberId: subscriber.id }),
)
this.deps.logger.info('EventBus', 'runSubscriberAction', {
event_id: event.id,
event_type: event.type,
subscriber_id: subscriber.id,
})
} catch (error: any) {
await this.deps.events.markAsFailed({
event,
subscriber,
})
console.error(
`[EventBus] runSubscriberAction failed.`,
JSON.stringify({ eventId: event.id, subscriberId: subscriber.id }),
this.deps.logger.error('EventBus', 'runSubscriberAction', {
event_id: event.id,
subscriber_id: subscriber.id,
error,
)
})
}
}
}
56 changes: 56 additions & 0 deletions src/infrastructure/Logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
export type LogLevel = {
level?: 'info' | 'error' | 'debug'
}

const LOG_LEVELS = {
debug: 0,
info: 1,
error: 2,
}

export class Logger {
private readonly level: number

constructor(options: LogLevel = {}) {
this.level = this.toLogLevel(options.level)
}

public info(namespace: string, name: string, data: Record<any, any>) {
if (LOG_LEVELS.info >= this.level) {
console.info(this.formatPrefix(namespace, name), this.serialize(data))
}
}

public error(namespace: string, name: string, data: Record<any, any>) {
if (LOG_LEVELS.error >= this.level) {
console.error(this.formatPrefix(namespace, name), this.serialize(data))
}
}

public debug(namespace: string, name: string, data: Record<any, any>) {
if (LOG_LEVELS.debug >= this.level) {
console.debug(this.formatPrefix(namespace, name), this.serialize(data))
}
}

private toLogLevel(level?: 'info' | 'error' | 'debug'): number {
if (level === 'info') {
return LOG_LEVELS.info
}
if (level === 'error') {
return LOG_LEVELS.error
}
if (level === 'debug') {
return LOG_LEVELS.debug
}
return 100
}

private formatPrefix(namespace: string, name: string) {
return `[${namespace}][${name}]`
}

private serialize(data: Record<any, any>) {
return JSON.stringify(data)
}
}
12 changes: 5 additions & 7 deletions src/infrastructure/NixBusHttpClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { NixBusCrypto } from '@nixbus/crypto'

import type { NixSubscriberId } from 'src/domain/NixSubscriber'
import type { Logger } from 'src/infrastructure/Logger'
import { fetchJson } from 'src/shared/fetch'

export type EventsResponse = {
Expand Down Expand Up @@ -88,6 +89,7 @@ export type RemoveSubscriberRequest = {

type Deps = {
crypto: NixBusCrypto | null
logger: Logger
}
export type NixBusHttpClientOptions = {
token: string
Expand Down Expand Up @@ -141,7 +143,7 @@ export class NixBusHttpClient {
delete this.findNextEventsTimeout[subscriberId]
}

const e = await Promise.all(data.events.map((i) => this.serialize(subscriberId, i)))
const e = await Promise.all(data.events.map((i) => this.deserialize(subscriberId, i)))
const events = e.filter((i) => i !== null) as FindEventResponse[]
return {
events,
Expand Down Expand Up @@ -271,7 +273,7 @@ export class NixBusHttpClient {
})
}

private async serialize(
private async deserialize(
subscriberId: string,
i: EventResponse,
): Promise<FindEventResponse | null> {
Expand All @@ -285,11 +287,7 @@ export class NixBusHttpClient {
updated_at: new Date(i.updated_at),
}
} catch (error) {
console.log(
`[NixBusHttpClient] Decryption failed for event ${i.id}. Marking as failed.`,
i.payload,
error,
)
this.deps.logger.error('NixBusHttpClient', 'deserialize', { event_id: i.id, error })
await this.markEventsAsFailed({
events: [
{
Expand Down
3 changes: 2 additions & 1 deletion tests/domain/NixEventBus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import { expect, test } from 'playwright/test'
import type { NixEvents } from 'src/domain/NixEvents'
import { NixEventBus } from 'src/domain/NixEventBus'
import { InMemoryNixEvents } from 'src/infrastructure/InMemoryNixEvents'
import { Logger } from 'src/infrastructure/Logger'

test.describe('NixEventBus', () => {
let events: NixEvents
let eventBus: NixEventBus

test.beforeEach(() => {
events = new InMemoryNixEvents()
eventBus = new NixEventBus({ events })
eventBus = new NixEventBus({ events, logger: new Logger() })
})

test('subscribe to events', async () => {
Expand Down

0 comments on commit 694c278

Please sign in to comment.