Skip to content

Commit

Permalink
remove deprecated Deno API's
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousme committed Mar 3, 2024
1 parent b146a3a commit 5949119
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 152 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,5 @@ $RECYCLE.BIN/

# Windows shortcuts
*.lnk
# generated NPM
npm/
Empty file removed client/client.test.ts
Empty file.
4 changes: 2 additions & 2 deletions client/dev_deps.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { assertEquals, Buffer } from "../utils/dev_deps.ts";
export { dummyConn, dummyReader } from "../utils/dev_utils.ts";
export { assertEquals } from "../utils/dev_deps.ts";
export { DummyConn } from "../utils/dev_utils.ts";
1 change: 1 addition & 0 deletions mqttConn/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ export {
} from "../mqttPacket/mod.ts";

export { getLengthDecoder } from "../mqttPacket/length.ts";
export type { LengthDecoderResult } from "../mqttPacket/length.ts";
export { assert, BufReader } from "../utils/deps.ts";
4 changes: 2 additions & 2 deletions mqttConn/dev_deps.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { assertEquals, Buffer } from "../utils/dev_deps.ts";
export { dummyConn, dummyReader } from "../utils/dev_utils.ts";
export { assertEquals } from "../utils/dev_deps.ts";
export { DummyConn } from "../utils/dev_utils.ts";
22 changes: 9 additions & 13 deletions mqttConn/mqttConn.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MqttConn, MqttConnError } from "./mqttConn.ts";
import { AnyPacket, encode } from "./deps.ts";
import { assertEquals, Buffer, dummyConn, dummyReader } from "./dev_deps.ts";
import { assertEquals, DummyConn } from "./dev_deps.ts";
import { PacketType } from "../mqttPacket/types.ts";

const connectPacket: AnyPacket = {
Expand Down Expand Up @@ -34,8 +34,7 @@ Deno.test("MqttConn should act as asyncIterator", async () => {
const publish = encode(publishPacket);
const disconnect = encode(disconnectPacket);

const reader = dummyReader([connect, publish, disconnect]);
const conn = dummyConn(reader, new Buffer());
const conn = new DummyConn([connect, publish, disconnect], new Uint8Array());
const mqttConn = new MqttConn({ conn });

const packets = [];
Expand All @@ -50,8 +49,7 @@ Deno.test("MqttConn should act as asyncIterator", async () => {
});

Deno.test("MqttConn should close on malformed length", async () => {
const reader = dummyReader([new Uint8Array([1, 175])]);
const conn = dummyConn(reader, new Buffer());
const conn = new DummyConn([new Uint8Array([1, 175])], new Uint8Array());
const mqttConn = new MqttConn({ conn });

const packets = [];
Expand All @@ -69,8 +67,7 @@ Deno.test("MqttConn should close on failed packets", async () => {
const publish = encode(publishPacket);
const brokenPublish = publish.slice(0, 7);

const reader = dummyReader([connect, brokenPublish]);
const conn = dummyConn(reader, new Buffer());
const conn = new DummyConn([connect, brokenPublish], new Uint8Array());
const mqttConn = new MqttConn({ conn });

const packets = [];
Expand All @@ -86,8 +83,8 @@ Deno.test("MqttConn should close on failed packets", async () => {

Deno.test("MqttConn should close on packets too large", async () => {
const connect = encode(connectPacket);
const reader = dummyReader([connect]);
const conn = dummyConn(reader, new Buffer());

const conn = new DummyConn([connect], new Uint8Array());
const mqttConn = new MqttConn({ conn, maxPacketSize: 20 });
const packets = [];
for await (const packet of mqttConn) {
Expand All @@ -101,10 +98,9 @@ Deno.test("MqttConn should close on packets too large", async () => {

Deno.test("MqttConn should be writable", async () => {
const connect = encode(connectPacket);
const reader = dummyReader([connect]);
const writer = new Buffer();
const conn = dummyConn(reader, writer);
const writer = new Uint8Array(24);
const conn = new DummyConn([connect], writer);
const mqttConn = new MqttConn({ conn });
await mqttConn.send(connectPacket);
assertEquals(writer.bytes(), connect);
assertEquals(writer, connect);
});
42 changes: 25 additions & 17 deletions mqttConn/mqttConn.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {
AnyPacket,
assert,
BufReader,
decodePayload,
encode,
getLengthDecoder,
LengthDecoderResult,
} from "./deps.ts";

export type SockConn = Deno.Conn;
Expand All @@ -19,49 +19,58 @@ export interface IMqttConn extends AsyncIterable<AnyPacket> {
readonly conn: SockConn;
readonly isClosed: boolean;
readonly reason: string | undefined;

[Symbol.asyncIterator](): AsyncIterableIterator<AnyPacket>;

send(data: AnyPacket): Promise<void>;

close(): void;
}

async function readByte(conn: SockConn): Promise<number> {
const buf = new Uint8Array(1);
const bytesRead = await conn.read(buf);
assert(bytesRead !== null, MqttConnError.UnexpectedEof);
assert(bytesRead !== 0, MqttConnError.UnexpectedEof);
return buf[0];
}

async function readFull(conn: SockConn, buf: Uint8Array): Promise<void> {
let bytesRead = 0;
while (bytesRead < buf.length) {
const read = await conn.read(buf.subarray(bytesRead));
assert(read !== null, MqttConnError.UnexpectedEof);
assert(read !== 0, MqttConnError.UnexpectedEof);
bytesRead += read;
}
}

/** Read MQTT packet from given BufReader
* @throws `Error` if packet is invalid
*/
export async function readPacket(
reader: BufReader,
conn: SockConn,
maxPacketSize: number,
): Promise<AnyPacket> {
// fixed header is 1 byte of type + flags
// + a maximum of 4 bytes to encode the remaining length
const firstByte = await reader.readByte();
assert(firstByte !== null, MqttConnError.UnexpectedEof);
const decodeLength = getLengthDecoder();
let byte, result;
const firstByte = await readByte(conn);
let result: LengthDecoderResult;
do {
byte = await reader.readByte();
assert(byte !== null, MqttConnError.UnexpectedEof);
const byte = await readByte(conn);
result = decodeLength(byte);
} while (!result.done);

const remainingLength = result.length;
assert(remainingLength < maxPacketSize - 1, MqttConnError.packetTooLarge);
const packetBuf = new Uint8Array(remainingLength);
// read the rest of the packet
assert(
(await reader.readFull(packetBuf)) !== null,
MqttConnError.UnexpectedEof,
);
await readFull(conn, packetBuf);
const packet = decodePayload(firstByte, packetBuf);
assert(packet !== null, MqttConnError.UnexpectedEof);
return packet;
}

export class MqttConn implements IMqttConn {
readonly conn: SockConn;
private readonly bufReader: BufReader;
private readonly maxPacketSize: number;
private _reason: string | undefined = undefined;
private _isClosed = false;
Expand All @@ -74,7 +83,6 @@ export class MqttConn implements IMqttConn {
maxPacketSize?: number;
}) {
this.conn = conn;
this.bufReader = new BufReader(conn);
this.maxPacketSize = maxPacketSize || 2 * 1024 * 1024;
}

Expand All @@ -85,7 +93,7 @@ export class MqttConn implements IMqttConn {
async *[Symbol.asyncIterator](): AsyncIterableIterator<AnyPacket> {
while (!this._isClosed) {
try {
yield await readPacket(this.bufReader, this.maxPacketSize);
yield await readPacket(this.conn, this.maxPacketSize);
} catch (err) {
if (err.name === "PartialReadError") {
err.message = MqttConnError.UnexpectedEof;
Expand Down
9 changes: 6 additions & 3 deletions mqttPacket/length.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export function encodeLength(x: number): number[] {
export function encodeLength(n: number): number[] {
const output = [];
let x = n;
do {
let encodedByte = x % 0x80;
x = Math.floor(x / 0x80);
Expand Down Expand Up @@ -29,11 +30,13 @@ export function decodeLength(
throw Error("length decoding failed");
}

export function getLengthDecoder(): (encodedByte: number) => {
export type LengthDecoderResult = {
done: boolean;
length: number;
numLengthBytes: number;
} {
};

export function getLengthDecoder(): (encodedByte: number) => LengthDecoderResult {
let numLengthBytes = 1;
let length = 0;
let multiplier = 1;
Expand Down
9 changes: 2 additions & 7 deletions server/test/dev_deps.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,2 @@
export { assertEquals, Buffer } from "../../utils/dev_deps.ts";
export {
dummyConn,
dummyQueueConn,
dummyReader,
dummyWriter,
} from "../../utils/dev_utils.ts";
export { assertEquals } from "../../utils/dev_deps.ts";
export { DummyConn, DummyQueueConn } from "../../utils/dev_utils.ts";
6 changes: 3 additions & 3 deletions server/test/handleConnect.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals, dummyQueueConn } from "./dev_deps.ts";
import { assertEquals, DummyQueueConn } from "./dev_deps.ts";
import { handlers } from "./test-handlers.ts";
import {
AnyPacket,
Expand Down Expand Up @@ -37,9 +37,9 @@ function startServer(): {
const reader = new AsyncQueue<Uint8Array>();
const writer = new AsyncQueue<Uint8Array>();

const outputConn = dummyQueueConn(writer, reader);
const outputConn = new DummyQueueConn(writer, reader);
const mqttConn = new MqttConn({ conn: outputConn });
const inputConn = dummyQueueConn(reader, writer, () => {
const inputConn = new DummyQueueConn(reader, writer, () => {
mqttConn.close();
});
mqttServer.serve(inputConn);
Expand Down
1 change: 0 additions & 1 deletion utils/dev_deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ export {
assertEquals,
assertThrows,
} from "https://deno.land/std@0.215.0/assert/mod.ts";
export { Buffer } from "https://deno.land/std@0.215.0/io/buffer.ts";
export { readerFromIterable } from "https://deno.land/std@0.215.0/streams/mod.ts";
Loading

0 comments on commit 5949119

Please sign in to comment.