Skip to content

Commit

Permalink
feat: better qos support (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudoshreyansh committed Jan 25, 2023
1 parent b10b6ee commit d07d74e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
50 changes: 32 additions & 18 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ interface IMQTTHeaders {
length: number;
}

const MQTT_UNSPECIFIED_ERROR_REASON = 0x80
const MQTT_SUCCESS_REASON = 0

class MqttAdapter extends Adapter {
private client: MqttClient
private firstConnect: boolean
Expand All @@ -29,9 +32,9 @@ class MqttAdapter extends Adapter {
async _connect(): Promise<this> { // NOSONAR
const mqttOptions = await this.resolveProtocolConfig('mqtt')
const subscribedChannels = this.getSubscribedChannels()
const serverBinding = this.AsyncAPIServer.binding('mqtt')
const securityRequirements = (this.AsyncAPIServer.security() || []).map(
(sec) => {
const mqttServerBinding = this.AsyncAPIServer.binding('mqtt')
const mqtt5ServerBinding = this.AsyncAPIServer.binding('mqtt5')
const securityRequirements = (this.AsyncAPIServer.security() || []).map(sec => {
const secName = Object.keys(sec.json())[0]
return this.parsedAsyncAPI.components().securityScheme(secName)
}
Expand All @@ -44,6 +47,9 @@ class MqttAdapter extends Adapter {
)
const url = new URL(this.AsyncAPIServer.url())

const protocolVersion = parseInt(this.AsyncAPIServer.protocolVersion() || '4')
const serverBinding = protocolVersion === 5 ? mqtt5ServerBinding : mqttServerBinding

this.client = mqtt.connect({
host: url.host,
port: url.port || (url.protocol === 'mqtt:' ? 1883 : 8883),
Expand All @@ -64,21 +70,10 @@ class MqttAdapter extends Adapter {
? mqttOptions?.authentication?.userPassword.password
: undefined,
ca: X509SecurityReq ? mqttOptions?.authentication?.cert : undefined,
protocolVersion,
customHandleAcks: this._customAckHandler.bind(this),
} as any)


this.client.on('message', (channel, message, mqttPacket) => {
const msg = this._createMessage(mqttPacket as IPublishPacket)
this.emit('message', msg, this.client)
})

this.client.on('reconnect', () => {
this.emit('reconnect', {
connection: this.client,
channels: this.channelNames,
})
})

this.client.on('close', () => {
this.emit('close', {
connection: this.client,
Expand All @@ -90,10 +85,19 @@ class MqttAdapter extends Adapter {
this.emit('error', error)
})

this.client.on('message', (channel, message, mqttPacket) => {
const qos = mqttPacket.qos
if ( protocolVersion === 5 && qos > 0 ) return // ignore higher qos messages. already processed

const msg = this._createMessage(mqttPacket as IPublishPacket)
this.emit('message', msg, this.client)
})

const connectClient = (): Promise<this> => {
return new Promise((resolve) => {
this.client.on('connect', () => {
this.client.on('connect', connAckPacket => {
const isSessionResume = connAckPacket.sessionPresent

if (!this.firstConnect) {
this.firstConnect = true
this.emit('connect', {
Expand All @@ -104,7 +108,7 @@ class MqttAdapter extends Adapter {
})
}

if (Array.isArray(subscribedChannels)) {
if (!isSessionResume && Array.isArray(subscribedChannels)) {
subscribedChannels.forEach((channel) => {
const operation = this.parsedAsyncAPI.channel(channel).publish()
const binding = operation.binding('mqtt')
Expand Down Expand Up @@ -163,6 +167,16 @@ class MqttAdapter extends Adapter {
channel: packet.topic,
})
}

_customAckHandler(channel, message, mqttPacket, done) {
const msg = this._createMessage(mqttPacket as IPublishPacket)
console.log('Hello World')

msg.on('processing:successful', () => done(MQTT_SUCCESS_REASON))
msg.on('processing:failed', () => done(MQTT_UNSPECIFIED_ERROR_REASON))

this.emit('message', msg, this.client)
}
}

export default MqttAdapter
3 changes: 3 additions & 0 deletions src/lib/glee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ export default class Glee extends EventEmitter {

async.seq(...mws)(message, (err: Error, msg: GleeMessage) => {
if (err) {
message.notifyFailedProcessing()
debug('Error encountered while processing middlewares.')
this._processError(errorMiddlewares, err, msg)
return
}
Expand All @@ -238,6 +240,7 @@ export default class Glee extends EventEmitter {
}
})
} else {
message.notifySuccessfulProcessing()
debug('Inbound pipeline finished.')
}
})
Expand Down
14 changes: 14 additions & 0 deletions src/lib/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ class GleeMessage extends EventEmitter {
send() {
this.emit('send', this)
}

/**
* Indicates successfully processed the message
*/
notifySuccessfulProcessing() {
this.emit('processing:successful')
}

/**
* Indicates failure in processing the message
*/
notifyFailedProcessing() {
this.emit('processing:failed')
}
}

export default GleeMessage

0 comments on commit d07d74e

Please sign in to comment.