Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better qos support #323

Merged
merged 6 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ interface IMQTTHeaders {
length: number;
}

const MQTT_UNSPECIFIED_ERROR_REASON = 0x80
const MQTT_SUCCESS_REASON = 0

class MqttAdapter extends Adapter {
private client: MqttClient
private firstConnect: boolean
Expand All @@ -29,9 +32,9 @@ class MqttAdapter extends Adapter {
async _connect(): Promise<this> { // NOSONAR
const mqttOptions = await this.resolveProtocolConfig('mqtt')
const subscribedChannels = this.getSubscribedChannels()
const serverBinding = this.AsyncAPIServer.binding('mqtt')
const securityRequirements = (this.AsyncAPIServer.security() || []).map(
(sec) => {
const mqttServerBinding = this.AsyncAPIServer.binding('mqtt')
const mqtt5ServerBinding = this.AsyncAPIServer.binding('mqtt5')
const securityRequirements = (this.AsyncAPIServer.security() || []).map(sec => {
const secName = Object.keys(sec.json())[0]
return this.parsedAsyncAPI.components().securityScheme(secName)
}
Expand All @@ -44,6 +47,9 @@ class MqttAdapter extends Adapter {
)
const url = new URL(this.AsyncAPIServer.url())

const protocolVersion = parseInt(this.AsyncAPIServer.protocolVersion() || '4')
const serverBinding = protocolVersion === 5 ? mqtt5ServerBinding : mqttServerBinding

this.client = mqtt.connect({
host: url.host,
port: url.port || (url.protocol === 'mqtt:' ? 1883 : 8883),
Expand All @@ -64,21 +70,10 @@ class MqttAdapter extends Adapter {
? mqttOptions?.authentication?.userPassword.password
: undefined,
ca: X509SecurityReq ? mqttOptions?.authentication?.cert : undefined,
protocolVersion,
customHandleAcks: this._customAckHandler.bind(this),
} as any)


this.client.on('message', (channel, message, mqttPacket) => {
const msg = this._createMessage(mqttPacket as IPublishPacket)
this.emit('message', msg, this.client)
})

this.client.on('reconnect', () => {
this.emit('reconnect', {
connection: this.client,
channels: this.channelNames,
})
})

this.client.on('close', () => {
this.emit('close', {
connection: this.client,
Expand All @@ -90,10 +85,19 @@ class MqttAdapter extends Adapter {
this.emit('error', error)
})

this.client.on('message', (channel, message, mqttPacket) => {
const qos = mqttPacket.qos
if ( protocolVersion === 5 && qos > 0 ) return // ignore higher qos messages. already processed

const msg = this._createMessage(mqttPacket as IPublishPacket)
this.emit('message', msg, this.client)
})

const connectClient = (): Promise<this> => {
return new Promise((resolve) => {
this.client.on('connect', () => {
this.client.on('connect', connAckPacket => {
const isSessionResume = connAckPacket.sessionPresent

if (!this.firstConnect) {
this.firstConnect = true
this.emit('connect', {
Expand All @@ -104,7 +108,7 @@ class MqttAdapter extends Adapter {
})
}

if (Array.isArray(subscribedChannels)) {
if (!isSessionResume && Array.isArray(subscribedChannels)) {
subscribedChannels.forEach((channel) => {
const operation = this.parsedAsyncAPI.channel(channel).publish()
const binding = operation.binding('mqtt')
Expand Down Expand Up @@ -163,6 +167,16 @@ class MqttAdapter extends Adapter {
channel: packet.topic,
})
}

_customAckHandler(channel, message, mqttPacket, done) {
const msg = this._createMessage(mqttPacket as IPublishPacket)
console.log('Hello World')

msg.on('processing:successful', () => done(MQTT_SUCCESS_REASON))
msg.on('processing:failed', () => done(MQTT_UNSPECIFIED_ERROR_REASON))

this.emit('message', msg, this.client)
}
}

export default MqttAdapter
3 changes: 3 additions & 0 deletions src/lib/glee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ export default class Glee extends EventEmitter {

async.seq(...mws)(message, (err: Error, msg: GleeMessage) => {
if (err) {
message.notifyFailedProcessing()
debug('Error encountered while processing middlewares.')
this._processError(errorMiddlewares, err, msg)
return
}
Expand All @@ -238,6 +240,7 @@ export default class Glee extends EventEmitter {
}
})
} else {
message.notifySuccessfulProcessing()
debug('Inbound pipeline finished.')
}
})
Expand Down
14 changes: 14 additions & 0 deletions src/lib/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ class GleeMessage extends EventEmitter {
send() {
this.emit('send', this)
}

/**
* Indicates successfully processed the message
*/
notifySuccessfulProcessing() {
this.emit('processing:successful')
}

/**
* Indicates failure in processing the message
*/
notifyFailedProcessing() {
this.emit('processing:failed')
}
}

export default GleeMessage