frontend: Add websocket multiplexer #2563
base: main
Conversation
Backend Code coverage changed from 61.1% to 61.2%. Change: 0.1% 😃. Coverage report

Force-pushed a3b5f1f to e23dd95 (Compare)
Backend Code coverage changed from 61.1% to 61.1%. Change: 0% 😃. Coverage report

Force-pushed e23dd95 to af50857 (Compare)
Backend Code coverage changed from 61.0% to 61.2%. Change: 0.2% 😃. Coverage report
Hi, I've tried the test case that you described and it doesn't work for me. It enters some kind of loop and keeps rerendering (pod-creation-loop.mp4). Another thing I tried is setting allowed namespaces (default and store) in settings, then going to the Pods page: it connects to the first namespace twice, but it should connect to both default and store.
So a websocket connection can be of 4 types.
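From the message samples later in this thread, the four types appear to be REQUEST, DATA, CLOSE, and COMPLETE (REQUEST/DATA/CLOSE appear in the JSON samples; COMPLETE is inferred from the `sendCompleteMessage` function in the coverage listing). A hypothetical TypeScript sketch of the wrapper-message shape, with field names taken from those samples:

```typescript
// Hypothetical sketch of the multiplexer wrapper message, inferred from the
// JSON samples in this thread; not the PR's actual type definition.
type MessageType = 'REQUEST' | 'DATA' | 'CLOSE' | 'COMPLETE';

interface WrapperMessage {
  clusterId: string;
  path: string;
  query: string;
  userId: string;
  type: MessageType;
  data?: string; // present only on DATA messages, per the samples
}

// Hypothetical helper to build a REQUEST message for a watch.
function makeRequestMessage(
  clusterId: string,
  path: string,
  query: string,
  userId: string
): WrapperMessage {
  return { clusterId, path, query, userId, type: 'REQUEST' };
}
```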
Force-pushed af50857 to 8484f79 (Compare)
Backend Code coverage changed from 61.1% to 61.2%. Change: 0.1% 😃. Coverage report

Force-pushed 8484f79 to 146c4db (Compare)
Backend Code coverage changed from 61.0% to 61.1%. Change: 0.1% 😃. Coverage report

Force-pushed 146c4db to 6055579 (Compare)
Backend Code coverage changed from 61.0% to 61.2%. Change: 0.2% 😃. Coverage report

Force-pushed 6055579 to 091b60a (Compare)
Backend Code coverage changed from 61.0% to 61.1%. Change: 0.1% 😃. Coverage report
I've tried the "k run nginx" case again and it worked, but I noticed that it recreates the connection on every update.

First it receives the new pod:

```json
{
  "clusterId": "olek-test-cluster",
  "path": "/api/v1/namespaces/default/pods",
  "query": "watch=1&resourceVersion=34249370",
  "userId": "4bd01bb98296b9e2",
  "data": "{\"type\":\"ADDED...",
  "type": "DATA"
}
```

Then the frontend sends a close signal:

```json
{
  "clusterId": "olek-test-cluster",
  "path": "/api/v1/namespaces/default/pods",
  "query": "watch=1&resourceVersion=34249370",
  "userId": "4bd01bb98296b9e2",
  "type": "CLOSE"
}
```

And right after that it requests a new connection with a new resource version:

```json
{
  "clusterId": "olek-test-cluster",
  "path": "/api/v1/namespaces/default/pods",
  "query": "watch=1&resourceVersion=34249403",
  "userId": "4bd01bb98296b9e2",
  "type": "REQUEST"
}
```
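One way to avoid the CLOSE/REQUEST churn described above would be to key connections by cluster, path, and user only, so a changed `resourceVersion` reuses the existing socket instead of closing and reopening it. A hypothetical sketch (the names and the registry class are illustrative, not from the PR):

```typescript
// Hypothetical connection registry that excludes resourceVersion from the
// key, so a message with a new resourceVersion reuses the existing
// connection rather than triggering a CLOSE followed by a fresh REQUEST.
function connectionKey(clusterId: string, path: string, userId: string): string {
  return `${clusterId}:${path}:${userId}`;
}

class ConnectionRegistry<T> {
  private connections = new Map<string, T>();

  // Returns the existing connection for this key, creating one if needed.
  getOrCreate(clusterId: string, path: string, userId: string, create: () => T): T {
    const key = connectionKey(clusterId, path, userId);
    let conn = this.connections.get(key);
    if (conn === undefined) {
      conn = create();
      this.connections.set(key, conn);
    }
    return conn;
  }
}
```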
Note: Olek did some more tests for useWatchKubeObjectLists, which have been merged into main now.
Force-pushed 091b60a to ceca8d2 (Compare)

Backend Code coverage changed from 61.0% to 61.1%. Change: 0.1% 😃. Coverage report
Force-pushed ceca8d2 to b96d1ad (Compare)

Backend Code coverage changed from 61.0% to 61.1%. Change: 0.1% 😃. Coverage report
looks good!
Nice work.
I left some notes, and left some ideas for testing in the TODO section of the PR description.
```ts
// Wait for existing connection attempt if in progress
if (this.connecting) {
  return new Promise(resolve => {
    const checkConnection = setInterval(() => {
```
I'm not sure about this.
It runs every 100ms until the connection is open.
What happens if it never opens? Or if it opens and then closes within that 100ms? Could it run forever, or never return at all?
The other issue with polling every 100ms is that it could mean up to 100ms of waiting, when it might be possible to wait 1ms. Is there a way to do this without polling?
If this can't be changed (or easily changed), could you please leave a comment to describe why it's doing this in some detail?
Yes, there are some issues with this polling approach. But making this change now won't be easy, so I added detailed comments about the tradeoffs.
I wonder if this has a measurable impact on app performance? It would be good to compare a before and after using the performance tab in developer tools. This is my main concern.
Adding a max 100ms delay isn't as big a deal IMHO.
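For reference, one common way to avoid the 100ms polling is to memoize a single in-flight promise and hand it to every caller, so waiters resolve the moment the socket opens (or fails) rather than on the next poll tick. A sketch under the assumption that the class owns an async `openSocket()`; the names are illustrative, not the PR's actual API:

```typescript
// Hypothetical sketch: share one in-flight connection promise instead of
// polling with setInterval. All concurrent callers await the same promise,
// which settles as soon as the socket opens or the attempt fails.
class MultiplexerClient {
  private connectPromise: Promise<void> | null = null;

  constructor(private openSocket: () => Promise<void>) {}

  connect(): Promise<void> {
    if (this.connectPromise === null) {
      this.connectPromise = this.openSocket().catch(err => {
        // Clear the memo on failure so a later call can retry.
        this.connectPromise = null;
        throw err;
      });
    }
    return this.connectPromise;
  }
}
```

Callers that previously polled would just `await client.connect()`; a failed attempt rejects every waiter immediately instead of leaving an interval spinning.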
```go
@@ -225,7 +225,7 @@ func TestHandleClusterMessages(t *testing.T) {
		t.Fatal("Test timed out")
	}
```
I think it's worth adding some more tests for the functions with no test coverage.
```
multiplexer.go:102: NewMultiplexer                 100.0%
multiplexer.go:115: updateStatus                   82.4%
multiplexer.go:158: establishClusterConnection     0.0%
multiplexer.go:203: getClusterConfigWithFallback   100.0%
multiplexer.go:220: createConnection               100.0%
multiplexer.go:242: dialWebSocket                  50.0%
multiplexer.go:270: monitorConnection              54.5%
multiplexer.go:295: reconnect                      0.0%
multiplexer.go:321: HandleClientWebSocket          34.8%
multiplexer.go:370: readClientMessage              50.0%
multiplexer.go:391: getOrCreateConnection          0.0%
multiplexer.go:420: handleConnectionError          75.0%
multiplexer.go:442: writeMessageToCluster          100.0%
multiplexer.go:460: handleClusterMessages          87.5%
multiplexer.go:479: processClusterMessage          83.3%
multiplexer.go:503: createWrapperMessage           100.0%
multiplexer.go:536: handleReadError                100.0%
multiplexer.go:547: handleWriteError               0.0%
multiplexer.go:558: cleanupConnections             100.0%
multiplexer.go:575: getClusterConfig               85.7%
multiplexer.go:590: CloseConnection                92.3%
multiplexer.go:626: createWebSocketURL             0.0%
```
I think these ones are worth increasing test coverage for...
```
multiplexer.go:121: updateStatus            72.0%
multiplexer.go:266: dialWebSocket           50.0%
multiplexer.go:294: monitorConnection       54.5%
multiplexer.go:349: HandleClientWebSocket   34.8%
multiplexer.go:398: readClientMessage       50.0%
multiplexer.go:544: checkResourceVersion    55.6%
multiplexer.go:571: sendCompleteMessage     0.0%
multiplexer.go:681: CloseConnection         72.7%
```
backend/cmd/multiplexer.go
Outdated
```go
@@ -586,37 +660,42 @@ func (m *Multiplexer) getClusterConfig(clusterID string) (*rest.Config, error) {
}

// CloseConnection closes a specific connection based on its identifier.
//
//nolint:unparam
```
Can you please either fix this lint issue or add a comment about why it is disabled?
I don't understand this.
It says arguments are not used in the function body, but I see on that connKey line they are?
```go
//nolint:unparam // The parameters are required for the function signature but are not used in the body.
func (m *Multiplexer) CloseConnection(clusterID, path, userID string) error {
	connKey := fmt.Sprintf("%s:%s:%s", clusterID, path, userID)
```
You are right, let me fix the comment.
backend/cmd/multiplexer.go
Outdated
```go
	return err

// checkResourceVersion checks and handles resource version changes.
```
It would be good to document the return and inputs.
How about this name which mentions that it sends the message?
```go
if err := m.sendIfNewResourceVersion(message, conn, clientConn, lastResourceVersion); err != nil {
	return err
}
```

```go
// sendIfNewResourceVersion checks the version of a resource from an incoming message
// and sends a complete message to the client if the resource version has changed.
//
// This function is used to ensure that the client is always aware of the latest version
// of a resource. When a new message is received, it extracts the resource version from
// the message metadata. If the resource version has changed since the last known version,
// it sends a complete message to the client to update them with the latest resource state.
//
// Parameters:
//   - message: The JSON-encoded message containing resource information.
//   - conn: The connection object representing the current connection.
//   - clientConn: The WebSocket connection to the client.
//   - lastResourceVersion: A pointer to the last known resource version string.
//
// Returns:
//   - An error if any issues occur while processing the message, or nil if successful.
func (m *Multiplexer) sendIfNewResourceVersion(
	message []byte,
	conn *Connection,
	clientConn *websocket.Conn,
	lastResourceVersion *string,
) error {
```
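As a rough model of the logic the doc comment describes (a TypeScript sketch for illustration, not the Go implementation): parse the message, compare the metadata `resourceVersion` against the last known one, and report whether a complete message should be sent. The function and type names here are hypothetical:

```typescript
// Hypothetical model of the resource-version check described above: returns
// true when the incoming message carries a new resourceVersion, updating the
// tracker in place (mirroring the *string pointer in the Go signature).
interface VersionTracker {
  last: string;
}

function isNewResourceVersion(message: string, tracker: VersionTracker): boolean {
  let version: string | undefined;
  try {
    const parsed = JSON.parse(message) as { metadata?: { resourceVersion?: string } };
    version = parsed.metadata?.resourceVersion;
  } catch {
    return false; // not JSON; nothing to compare
  }
  if (version === undefined || version === tracker.last) {
    return false;
  }
  tracker.last = version;
  return true;
}
```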
Force-pushed 0e9e654 to 94bf2e0 (Compare)

Backend Code coverage changed from 61.1% to 62.9%. Change: 1.8% 😃. Coverage report
Force-pushed 94bf2e0 to d242957 (Compare)

Backend Code coverage changed from 61.0% to 62.8%. Change: 1.8% 😃. Coverage report
Force-pushed d242957 to a48b943 (Compare)

Backend Code coverage changed from 61.0% to 62.8%. Change: 1.8% 😃. Coverage report
@yolossn Would you mind reviewing this? (Especially the backend parts for correctness and security.)
Good work @knrt10. I think these ones could probably do with some more tests.
```go
	return server
}

func createTestWebSocketConnection() (*websocket.Conn, *httptest.Server) {
```
This one needs documentation.
```go
@@ -424,3 +577,74 @@ func TestWriteMessageToCluster(t *testing.T) {
	assert.Error(t, err)
	assert.Equal(t, StateError, conn.Status.State)
}

func createMockKubeAPIServer() *httptest.Server {
```
This one needs documentation.
frontend/src/helpers/index.ts
Outdated
```ts
 * defaults to false. This is a feature flag to enable the websocket multiplexer.
 */
export function getWebsocketMultiplexerEnabled(): boolean {
  return import.meta.env.REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER === 'true';
```
Cool. Let's enable this by default in the PR now.
```diff
@@ -1,11 +1,398 @@
-import { useEffect, useMemo } from 'react';
+import { useCallback, useEffect, useMemo } from 'react';
 import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../../stateless';
```
Weird... Somehow the test coverage went down?
| File | % Stmts | % Branch | % Funcs | % Lines | Uncovered Line #s |
| --- | --- | --- | --- | --- | --- |
| webSocket.ts | 32.94 | 21.34 | 41.66 | 33.74 | 122-123,136-145,167-176,206-286,313,325-342,410-423,439,455,501-624 |
By the way... Can you please ask the comment author (or another reviewer) to resolve the discussion items? Especially with a large PR it's hard to keep track of the discussions otherwise.
Sure, makes sense.
Force-pushed a48b943 to 839bd41 (Compare)
This adds a single websocket connection from the frontend to the backend and sends lists of messages over it. Using these messages, the backend creates multiple websocket connections to the k8s API and returns the data. This solves the issue of the frontend's websocket connection limit in a multi-cluster setup. Signed-off-by: Kautilya Tripathi <ktripathi@microsoft.com>
Now the websocket has a clear type that it needs to send. This also fixes websocket panics in various edge cases. Signed-off-by: Kautilya Tripathi <ktripathi@microsoft.com>
This adds a new way to run the websocket multiplexer. The default is the legacy way, which creates multiple websocket connections. This adds a new flag `REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER` to enable the new API. Signed-off-by: Kautilya Tripathi <ktripathi@microsoft.com>
Signed-off-by: Kautilya Tripathi <ktripathi@microsoft.com>
Force-pushed 839bd41 to fff39d1 (Compare)
Addressed the changes, please review.
Backend Code coverage changed from 61.1% to 63.6%. Change: 2.5% 😃. Coverage report
Backend Code coverage changed from 61.1% to 63.5%. Change: 2.4% 😃. Coverage report
This adds a single websocket connection from the frontend to the backend and sends lists of messages over it. Using these messages, the backend creates multiple websocket connections to the k8s API and returns the data.
This solves the issue of the frontend's websocket connection limit in a multi-cluster setup.
Fixes: #2515
Testing

```shell
cd frontend && npm run start  # default is multiplexer enabled
cd backend && go run ./cmd -dev -enable-dynamic-clusters
kubectl run nginx --image=nginx
```

TODO

```shell
cd backend && go test -race -v -p 1 ./...
```