Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: JavascriptException: Error: stream.push() after EOF, js engine: hermes #1914

Open
sanduluca opened this issue Aug 1, 2024 · 10 comments · Fixed by #1915 · May be fixed by WontonSam/Bekki13-cloud-node#2581 or #1932
Open

[Bug]: JavascriptException: Error: stream.push() after EOF, js engine: hermes #1914

sanduluca opened this issue Aug 1, 2024 · 10 comments · Fixed by #1915 · May be fixed by WontonSam/Bekki13-cloud-node#2581 or #1932
Labels

Comments

@sanduluca
Copy link
Contributor

sanduluca commented Aug 1, 2024

MQTTjs Version

5.8.0

Broker

AWS

Environment

React Native (engine: Hermes)

Description

After releasing a new version of out react native app with the mqtt feature we started receiving crashes in Firebase crashlytics.
image

This is the only issue I found related to this problem
nodejs/readable-stream#207

Code
// mqtt.tsx
import React, { createContext, useContext } from 'react';
import mqtt from 'mqtt';

import useMqttConnection, { MqttError, MqttStatus } from 'hooks/useMqttConnection';
import { useAppSelector } from 'store';

interface MqttContextValue {
	mqttClient: mqtt.MqttClient | null;
	mqttStatus: MqttStatus;
	mqttError: MqttError | null;
	subscribeToTopic: (topic: string, ops?: mqtt.IClientSubscribeOptions) => void;
}

// @ts-ignore
const MqttContext = createContext<MqttContextValue>({ mqttClient: {} });

export const MqttProvider = ({ children }: React.PropsWithChildren) => {
	const isAuthenticated = useAppSelector(state => state.auth.authenticated);

	const { mqttClient, mqttStatus, mqttError, setMqttError, setMqttStatus } =
		useMqttConnection(isAuthenticated);

	const subscribeToTopic = (topic: string, ops: mqtt.IClientSubscribeOptions = { qos: 1 }) => {
		if (!mqttClient) return;

		mqttClient.subscribe(topic, ops, error => {
			if (error) {
				setMqttStatus('Error');
				setMqttError({
					type: 'MqttTopic',
					msg: error.message,
				});
			}
		});
	};

	return (
		<MqttContext.Provider
			value={{
				mqttClient,
				mqttStatus,
				mqttError,
				subscribeToTopic,
			}}
		>
			{children}
		</MqttContext.Provider>
	);
};

export const useMqtt = () => useContext(MqttContext);
// useMqttConnection.ts
import { useState, useEffect, useRef, useCallback } from 'react';
import mqtt, { Timer } from 'mqtt';
import BackgroundTimer from 'react-native-background-timer';

import { instance } from 'api';

export type MqttStatus = 'Connected' | 'Disconnected' | 'Offline' | 'Reconnecting' | 'Error';
export type MqttError = { type: string; msg: string };

interface WssDetails {
	signedUrl: string;
	clientId: string;
	validitySeconds: number;
}
// dont directly assign methods to timer object otherwise this throws: Cannot set property 'NaN' of undefined
const timer: Timer = {
	clear: id => BackgroundTimer.clearInterval(id),
	// @ts-expect-error
	set: (func, time) => BackgroundTimer.setInterval(func, time),
};

const getWssDetails = () => instance.get<WssDetails>('/app/wss');

function useMqttConnection(isAuthenticated: boolean) {
	const [mqttStatus, setMqttStatus] = useState<MqttStatus>('Disconnected');
	const [mqttError, setMqttError] = useState<MqttError | null>(null);
	const [mqttClient, setMqttClient] = useState<mqtt.MqttClient | null>(null);
	const [wssDetails, setWssDetails] = useState<WssDetails | null>(null);
	const wssDetailsRef = useRef<WssDetails | null>(wssDetails);
	const isFetchingWssDetails = useRef(false);
	wssDetailsRef.current = wssDetails;
	const hasWssDetails = !!wssDetailsRef.current;
	const doMqttConnection = isAuthenticated;

	const fetchWssDetails = useCallback(() => {
		isFetchingWssDetails.current = true;
		return getWssDetails()
			.then(r => {
				setWssDetails(r.data);
			})
			.catch(() => {
				// in case we could not fetch initial details, retry every 25 seconds
				setTimeout(() => {
					if (!wssDetailsRef.current) {
						fetchWssDetails();
					}
				}, 25000);
			})
			.finally(() => {
				isFetchingWssDetails.current = false;
			});
	}, []);

	useEffect(() => {
		if (!doMqttConnection) return;
		fetchWssDetails();
	}, [doMqttConnection, fetchWssDetails]);

	useEffect(() => {
		if (!doMqttConnection || !hasWssDetails) return;

		const transformWsUrl = (
			url: string,
			options: mqtt.IClientOptions,
			currentClient: mqtt.MqttClient,
		) => {
			if (!isFetchingWssDetails.current) {
				fetchWssDetails();
			}

			currentClient.options.clientId = wssDetailsRef.current!.clientId;
			return wssDetailsRef.current!.signedUrl;
		};

		const client = mqtt
			.connect(wssDetailsRef.current!.signedUrl, {
				clientId: wssDetailsRef.current!.clientId,
				reconnectPeriod: 5000,
				queueQoSZero: true,
				resubscribe: true,
				clean: true,
				keepalive: 60,
				protocolVersion: 5,
				properties: {
					sessionExpiryInterval: 600,
				},
				timerVariant: timer,
				transformWsUrl,
			})
			.on('connect', () => {
				setMqttStatus('Connected');
			})
			.on('error', error => {
				setMqttError({ type: 'MqttGeneral', msg: error.message });
			})
			.on('disconnect', () => {
				setMqttStatus('Disconnected');
			})
			.on('offline', () => {
				setMqttStatus('Offline');
			})
			.on('reconnect', () => {
				setMqttStatus('Reconnecting');
			})
			.on('close', () => {
				setMqttStatus('Disconnected');
			});

		setMqttClient(client);

		return () => {
			client.end();
		};
	}, [doMqttConnection, fetchWssDetails, hasWssDetails]);

	return {
		mqttClient,
		mqttStatus,
		mqttError,
		setMqttStatus,
		setMqttError,
	};
}

export default useMqttConnection;

Minimal Reproduction

Unfortunately we cannot reproduce this locally. We are only recording crashes in Firebase Crashlytics

Debug logs

Fatal Exception: com.facebook.react.common.JavascriptException: Error: stream.push() after EOF, js engine: hermes, stack:
anonymous@3372:233
c@3372:1426
S@3375:2499
anonymous@3375:3938
_@3478:2534
dispatchEvent@135:5649
anonymous@154:3328
value@52:779
value@49:935
value@37:3897
anonymous@37:693
value@37:2528
value@37:664

       at com.facebook.react.modules.core.ExceptionsManagerModule.reportException(ExceptionsManagerModule.java:65)
       at java.lang.reflect.Method.invoke(Method.java)
       at com.facebook.react.bridge.JavaMethodWrapper.invoke(JavaMethodWrapper.java:372)
       at com.facebook.react.bridge.JavaModuleWrapper.invoke(JavaModuleWrapper.java:146)
       at com.facebook.jni.NativeRunnable.run(NativeRunnable.java)
       at android.os.Handler.handleCallback(Handler.java:996)
       at android.os.Handler.dispatchMessage(Handler.java:110)
       at com.facebook.react.bridge.queue.MessageQueueThreadHandler.dispatchMessage(MessageQueueThreadHandler.java:27)
       at android.os.Looper.loopOnce(Looper.java:210)
       at android.os.Looper.loop(Looper.java:302)
       at com.facebook.react.bridge.queue.MessageQueueThreadImpl$4.run(MessageQueueThreadImpl.java:233)
       at java.lang.Thread.run(Thread.java:1012)
@sanduluca sanduluca added the bug label Aug 1, 2024
@robertsLando
Copy link
Member

robertsLando commented Aug 1, 2024

@sanduluca what was the previous working version you were using? What are the steps to reproduce the issue? Does it happens randomly?

@sanduluca
Copy link
Contributor Author

sanduluca commented Aug 1, 2024

We were not using the mqtt before this.
We could not reproduce this.
Yes. It seems to happen randomly. We have 2-5 crashes a day on a 1000+ users daily

@robertsLando
Copy link
Member

It's hard based on that log to say what could trigger that, could you try to at least provide me a valid stack trace? Something that points me to the line that is triggering the issue.

IMO giving that we treat react native as a browser and it is using wss the error cold be in this two places:

proxy.push(data)

this.push(chunk)

I dunno if adding a check before the push could fix the issue

@mcollina any hint?

@robertsLando
Copy link
Member

Try using version 5.9.1

@sanduluca
Copy link
Contributor Author

Ok, after releasing a few new updates to our users with new mqtt version we still receive JavascriptException: Error: stream.push() after EOF

// package.json
"mqtt": "^5.9.1",
// yarn.lock
mqtt-packet@^9.0.0:
  version "9.0.0"
  resolved "https://registry.yarnpkg.com/mqtt-packet/-/mqtt-packet-9.0.0.tgz#fd841854d8c0f1f5211b00de388c4ced45b59216"
  integrity sha512-8v+HkX+fwbodsWAZIZTI074XIoxVBOmPeggQuDFCGg1SqNcC+uoRMWu7J6QlJPqIUIJXmjNYYHxBBLr1Y/Df4w==
  dependencies:
    bl "^6.0.8"
    debug "^4.3.4"
    process-nextick-args "^2.0.1"

mqtt@^5.2.0, mqtt@^5.9.1:
  version "5.9.1"
  resolved "https://registry.yarnpkg.com/mqtt/-/mqtt-5.9.1.tgz#422ded61d432b995d931ae4d9c470684f33e3289"
  integrity sha512-FMENfSUMfCSUCnkuUVAL4U01795SUEfrX0NZ53HNr1r2VNpwKhR5Au9viq9WCFGtgrDAmsll4fkloqFCFgStYA==
  dependencies:
    "@types/readable-stream" "^4.0.5"
    "@types/ws" "^8.5.9"
    commist "^3.2.0"
    concat-stream "^2.0.0"
    debug "^4.3.4"
    help-me "^5.0.0"
    lru-cache "^10.0.1"
    minimist "^1.2.8"
    mqtt "^5.2.0"
    mqtt-packet "^9.0.0"
    number-allocator "^1.0.14"
    readable-stream "^4.4.2"
    reinterval "^1.1.0"
    rfdc "^1.3.0"
    split2 "^4.2.0"
    worker-timers "^7.1.4"
    ws "^8.17.1"

The error seems to come from onMessage

Fatal Exception: com.facebook.react.common.JavascriptException
Error: stream.push() after EOF, js engine: hermes, stack: anonymous@1:3472651 NodeError@1:3473631 readableAddChunk@1:3484528 anonymous@1:3488655 onMessage@1:3684762 dispatchEvent@1:265253 anonymous@1:287806 emit@1:123298 emit@1:121941 __callFunction@1:115290 anonymous@1:113595 __guard@1:114551 callFunctionReturnFlushedQueue@1:113553

Here:

proxy.push(data)

image
image

@robertsLando robertsLando reopened this Aug 28, 2024
@robertsLando
Copy link
Member

@sanduluca I'm sorry but I'm not able to reproduce this issue anyway, could you try to fix this yourself and submit a PR? For what I know that issue happens when writing to a stream that is destroyed and the check is there now, no clue whatever could cause that :(

there is also another check here

if (!this.destroyed) {

As the proxy could be a buffered duplex so there are 2 sreams to handle

@sanduluca
Copy link
Contributor Author

I am not able to reproduce it constantly either. I caught it myself for the first time yesterday. Here is a screenshot:
cloud-photo-size-2-5373186271675023670-y

Anyway, I was searching what we can do and found a possible solution. Opened a PR for this

@Virendra121998
Copy link

I am also planning to integrate mqtt with react native. We operate at a high scale. Should this crash be concerning in that case? Also if we keep a variable to establish whether the connection is open or not and receive message only if connection is established, will that help?

Example of the implementation
function createSocketClient({ username, token }) {
let connectionClosed = false; // Flag to track the connection state

const client = createMqttClient({
    url
    username
    password
});

// Connection event handlers
client.on('connect', () => {
    console.log('Connected to MQTT');
    connectionClosed = false; // Reset the flag when connected
});

client.on('error', (error) => {
    console.error('Error in connection to MQTT:', error);
    // Handle error and ensure the connection is cleaned up
    client.end(true); // Forcefully close the connection and clean up
    connectionClosed = true; // Mark connection as closed
});

client.on('close', () => {
    console.log('MQTT connection closed');
    connectionClosed = true; // Mark connection as closed
});

client.on('end', () => {
    console.log('MQTT connection ended');
    connectionClosed = true; // Mark connection as closed
});

// Ensure no data is sent when connection is closed
function sendMessage(topic, message) {
    if (!connectionClosed) {
        client.publish(topic, message); // Send only if the connection is open
    } else {
        console.warn('Attempted to send a message after connection was closed');
    }
}

return client;

}

@Virendra121998
Copy link

Also currently my use case is only to read messages on frontend and not publish. Will this crash occur on reads as well?

@robertsLando
Copy link
Member

@Virendra121998 I tried to reproduce this on my side without success, I sincerly have no clue what's the reason and I'm not sure if #1932 could be a fix

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment