Skip to content

Commit

Permalink
change to save data
Browse files Browse the repository at this point in the history
  • Loading branch information
viktree committed Apr 1, 2024
1 parent 6d6a500 commit a9dfc31
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 39 deletions.
23 changes: 10 additions & 13 deletions src/microservice/gcloud-pub-sub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ import { PubSub, Subscription, Message } from '@google-cloud/pubsub'
import { Server, CustomTransportStrategy } from '@nestjs/microservices'

import { MESSAGE, ERROR, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants'
import { GCloudPubSubServerOptions } from '../interfaces/gcloud-pub-sub.interface'
import type { GCloudPubSubServerOptions } from '../interfaces/gcloud-pub-sub.interface'

const RETRY_INTERVAL = 5000

export class GCloudPubSubServer
extends Server
implements CustomTransportStrategy
{
export class GCloudPubSubServer extends Server implements CustomTransportStrategy {
public client: PubSub = null
public subscriptions: Subscription[] = []
public isShuttingDown: boolean = false
Expand All @@ -21,28 +18,28 @@ export class GCloudPubSubServer
public listen(callback: () => void) {
this.isShuttingDown = false
this.client = new PubSub(this.options.authOptions)
this.options.subscriptionIds.forEach((subcriptionName) => {
this.options.subscriptionIds.forEach((subscriptionName) => {
const subscription = this.client.subscription(
subcriptionName,
subscriptionName,
this.options.subscriberOptions || {}
)
const handleMessage = this.handleMessageFactory(subcriptionName)
const handleError = this.handleErrorFactory(subscription, subcriptionName)
const handleMessage = this.handleMessageFactory(subscriptionName)
const handleError = this.handleErrorFactory(subscription, subscriptionName)
subscription.on(MESSAGE, handleMessage.bind(this))
subscription.on(ERROR, handleError)
this.subscriptions.push(subscription)
})
callback()
}

public handleErrorFactory(subscription: Subscription, subcriptionName: string) {
return (error): void => {
public handleErrorFactory(subscription: Subscription, subscriptionName: string) {
return (error: any): void => {
this.handleError(error)
if (!this.isShuttingDown && PUB_SUB_DEFAULT_RETRY_CODES.includes(error.code)) {
this.logger.warn(`Closing subscription: ${subcriptionName}`)
this.logger.warn(`Closing subscription: ${subscriptionName}`)
subscription.close()
setTimeout(() => {
this.logger.warn(`Opening subscription: ${subcriptionName}`)
this.logger.warn(`Opening subscription: ${subscriptionName}`)
subscription.open()
}, RETRY_INTERVAL)
}
Expand Down
42 changes: 21 additions & 21 deletions src/module/gcloud-pub-sub.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,99 +38,99 @@ describe('GcloudPubSubService', () => {
const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try'
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles Buffer as data', () => {
const topic = 'Homer'
const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try'
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, Buffer.from(data))
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles an array of numbers as data', () => {
const topic = 'Homer'
const data = [10, 20, 30, 40, 50]
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles an ArrayBuffer as data', () => {
const topic = 'Homer'
const data = new ArrayBuffer(1)
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles a SharedArrayBuffer as data', () => {
const topic = 'Homer'
const data = new SharedArrayBuffer(1)
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles a Uint8Array as data', () => {
const topic = 'Homer'
const data = new Uint8Array([1, 2, 3])
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles a string and binary encoding', () => {
const topic = 'Homer'
const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try'
const encoding = 'binary'
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn(({ data }) => {
expect(data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data, {}, encoding)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
})
})
Expand Down
14 changes: 9 additions & 5 deletions src/module/gcloud-pub-sub.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Injectable } from '@nestjs/common'
import { PubSub } from '@google-cloud/pubsub'
import { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface'
import { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import type { PublishOptions, Topic } from '@google-cloud/pubsub/build/src/topic'

import type { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface'

@Injectable()
export class GcloudPubSubService {
Expand All @@ -11,11 +12,11 @@ export class GcloudPubSubService {
private readonly googleAuthOptions: GoogleAuthOptions,
private readonly publishOptions: PublishOptions
) {
this.gcloudPubSubLib = new PubSub(googleAuthOptions)
this.gcloudPubSubLib = new PubSub(this.googleAuthOptions)
}

public publishMessage(
topic: string,
topicName: string,
data: string | Uint8Array | number[] | ArrayBuffer | SharedArrayBuffer,
attributes: { [key: string]: string } = {},
encoding?: BufferEncoding
Expand All @@ -34,6 +35,9 @@ export class GcloudPubSubService {
} else {
dataBuffer = Buffer.from(data as string)
}
return this.gcloudPubSubLib.topic(topic, this.publishOptions).publish(dataBuffer, attributes)

const topic: Topic = this.gcloudPubSubLib.topic(topicName)
// const topic = this.gcloudPubSubLib.topic(topicName, this.publishOptions)
return topic.publishMessage({ data: dataBuffer, attributes })
}
}

0 comments on commit a9dfc31

Please sign in to comment.