Skip to content

Commit

Permalink
feat: 🎸 add unsubscribe packet
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Oct 4, 2020
1 parent 3c5a09f commit 495fbaa
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/MqttDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {PacketPubrel, parsePubrel} from './packets/pubrel';
import {PacketPubcomp, parsePubcomp} from './packets/pubcomp';
import {PacketSubscribe, parseSubscribe} from './packets/subscribe';
import {PacketSuback, parseSuback} from './packets/suback';
import {PacketUnsubscribe, parseUnsubscribe} from './packets/unsubscribe';

export class MqttDecoder {
public state: DECODER_STATE = DECODER_STATE.HEADER;
Expand Down Expand Up @@ -36,7 +37,7 @@ export class MqttDecoder {
this.list = new BufferList();
}

public parse(): null | PacketConnect | PacketConnack | PacketPublish | PacketPuback | PacketPubrec | PacketPubrel | PacketPubcomp | PacketSubscribe | PacketSuback {
public parse(): null | PacketConnect | PacketConnack | PacketPublish | PacketPuback | PacketPubrec | PacketPubrel | PacketPubcomp | PacketSubscribe | PacketSuback | PacketUnsubscribe {
this.parseFixedHeader();
const data = this.parseVariableData();
if (!data) return null;
Expand Down Expand Up @@ -79,6 +80,10 @@ export class MqttDecoder {
const packet = parseSuback(b, l, data, this.version);
return packet;
}
case PACKET_TYPE.UNSUBSCRIBE: {
const packet = parseUnsubscribe(b, l, data, this.version);
return packet;
}
default: {
return null;
}
Expand Down
51 changes: 51 additions & 0 deletions src/__tests__/MqttDecoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {PacketPuback} from '../packets/puback';
import {PacketPubrec} from '../packets/pubrec';
import {PacketSubscribe} from '../packets/subscribe';
import { PacketSuback } from '../packets/suback';
import { PacketUnsubscribe } from '../packets/unsubscribe';

it('can instantiate', () => {
const decoder = new MqttDecoder();
Expand Down Expand Up @@ -521,3 +522,53 @@ describe('SUBACK', () => {
expect(packet.s).toEqual([0, 1, 2, 1]);
});
});

describe('UNSUBSCRIBE', () => {
it('parses MQTT 3.1.1 packet', () => {
const decoder = new MqttDecoder();
decoder.push(Buffer.from([
162, 14,
0, 7, // Message ID (7)
0, 4, // Topic length
116, 102, 115, 116, // Topic (tfst)
0, 4, // Topic length,
116, 101, 115, 116 // Topic (test)
]));
const packet: PacketUnsubscribe = decoder.parse() as PacketUnsubscribe;
expect(packet.b).toBe(162);
expect(packet.l).toBe(14);
expect(packet.i).toBe(7);
expect(packet.s).toEqual([
'tfst',
'test',
]);
});

it('parses MQTT 5.0 packet', () => {
const decoder = new MqttDecoder();
decoder.version = 5;
decoder.push(Buffer.from([
162, 28,
0, 7, // Message ID (7)
13, // properties length
38, 0, 4, 116, 101, 115, 116, 0, 4, 116, 101, 115, 116, // userProperties
0, 4, // Topic length
116, 102, 115, 116, // Topic (tfst)
0, 4, // Topic length,
116, 101, 115, 116 // Topic (test)
]));
const packet: PacketUnsubscribe = decoder.parse() as PacketUnsubscribe;
expect(packet.b).toBe(162);
expect(packet.l).toBe(28);
expect(packet.i).toBe(7);
expect(packet.p).toEqual({
[PROPERTY.UserProperty]: [
['test', 'test'],
],
});
expect(packet.s).toEqual([
'tfst',
'test',
]);
});
});
44 changes: 44 additions & 0 deletions src/packets/unsubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import BufferList from 'bl';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {parseBinary, parseProps} from '../util/parse';

export interface PacketUnsubscribeData extends PacketHeaderData {
/** Packet Identifier. */
i: number;
/** Properties. */
p: Properties;
/** UNSUBSCRIBE Payload. */
s: string[];
}

export class PacketUnsubscribe extends Packet implements PacketUnsubscribeData {
constructor(
b: number,
l: number,
public i: number,
public p: Properties,
public s: string[],
) {
super(b, l);
}
}

export const parseUnsubscribe = (b: number, l: number, data: BufferList, version: number): PacketUnsubscribe => {
const i = data.readUInt16BE(0);
let offset = 2;
let p: Properties = {};
if (version === 5) {
const [props, size] = parseProps(data, offset);
p = props;
offset += size;
}
const len = data.length;
const s: string[] = [];
while (offset < len) {
const topic = parseBinary(data, offset);
s.push(topic.toString('utf8'));
offset += 2 + topic.byteLength;
}
return new PacketUnsubscribe(b, l, i, p, s);
};

0 comments on commit 495fbaa

Please sign in to comment.