Skip to content

Commit

Permalink
Migrate to socket.io
Browse files Browse the repository at this point in the history
  • Loading branch information
lucafaggianelli committed Nov 3, 2023
1 parent 06c4176 commit aa912c2
Show file tree
Hide file tree
Showing 21 changed files with 189 additions and 234 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Improve task data visualization (#257)
- Check data file paths before accessing them

### Changed
- Migrated plain websocket to SocketIO for improved communication stability

## [0.4.1] - 2023-10-11

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-router-dom": "^6.17.0",
"react-use-websocket": "^4.3.1"
"socket.io-client": "^4.7.2"
},
"devDependencies": {
"@types/json-schema": "^7.0.11",
Expand Down
13 changes: 8 additions & 5 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import '@/globals.css'
import Router from './Router'
import { AuthProvider } from './contexts/AuthContext'
import { BrowserRouter } from 'react-router-dom'
import WebSocketContext from './contexts/WebSocketContext'

const queryClient = new QueryClient({
defaultOptions: {
Expand All @@ -16,11 +17,13 @@ const queryClient = new QueryClient({
function App() {
return (
<QueryClientProvider client={queryClient}>
<BrowserRouter>
<AuthProvider>
<Router />
</AuthProvider>
</BrowserRouter>
<WebSocketContext>
<BrowserRouter>
<AuthProvider>
<Router />
</AuthProvider>
</BrowserRouter>
</WebSocketContext>
</QueryClientProvider>
)
}
Expand Down
27 changes: 10 additions & 17 deletions frontend/src/components/LogViewer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,14 @@ import {
TableRow,
Text,
} from '@tremor/react'
import { BarsArrowDownIcon } from '@heroicons/react/24/outline'
import { createRef, useCallback, useEffect, useState } from 'react'

import { getLogs } from '@/repository'
import { useSocket } from '@/socket'
import {
LogEntry,
LogLevel,
Pipeline,
PipelineRun,
WebSocketMessage,
} from '@/types'
import { socket } from '@/socket'
import { LogEntry, LogLevel, Pipeline, PipelineRun } from '@/types'
import { formatNumber, formatTime, getTasksColors } from '@/utils'
import TracebackInfoDialog from './TracebackInfoDialog'
import { BarsArrowDownIcon } from '@heroicons/react/24/outline'

interface Props {
pipeline: Pipeline
Expand Down Expand Up @@ -57,19 +51,16 @@ interface FilterType {
const LogViewer: React.FC<Props> = ({ pipeline, run }) => {
const [filter, setFilter] = useState<FilterType>({ levels: [], tasks: [] })
const [scrollToBottom, setScrollToBottom] = useState(true)
const { lastMessage } = useSocket(`logs.${run.id}`)
const queryClient = useQueryClient()

const tableRef = createRef<HTMLTableElement>()

const query = useQuery(getLogs(run.id))

const onWsMessage = useCallback(
(message: WebSocketMessage) => {
const { data } = message

(message: string) => {
queryClient.setQueryData<LogEntry[]>(['logs', run.id], (oldLogs = []) => {
const log: LogEntry = JSON.parse(data)
const log: LogEntry = JSON.parse(message)
log.id = oldLogs.length
log.timestamp = new Date(log.timestamp)
return [...oldLogs, log]
Expand Down Expand Up @@ -122,10 +113,12 @@ const LogViewer: React.FC<Props> = ({ pipeline, run }) => {
})

useEffect(() => {
if (lastMessage) {
onWsMessage(lastMessage)
socket.on(`logs.${run.id}`, onWsMessage)

return () => {
socket.off(`logs.${run.id}`, onWsMessage)
}
}, [lastMessage])
}, [])

const onFilterChange = useCallback((newFilter: Partial<FilterType>) => {
setFilter((currentFilter) => ({ ...currentFilter, ...newFilter }))
Expand Down
17 changes: 8 additions & 9 deletions frontend/src/components/RunsList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import { useCallback, useEffect, useState } from 'react'
import { Link, useNavigate } from 'react-router-dom'
import { HTTPError } from 'ky'

import { useSocket } from '@/socket'
import { PipelineRun, WebSocketMessage } from '@/types'
import { socket } from '@/socket'
import { PipelineRun } from '@/types'
import { formatDateTime } from '@/utils'
import StatusBadge from './StatusBadge'
import Timer from './Timer'
Expand All @@ -33,12 +33,9 @@ const RunsList: React.FC<Props> = ({ pipelineId, query, triggerId }) => {
const [runs, setRuns] = useState<PipelineRun[]>(query.data || [])
const queryClient = useQueryClient()
const navigate = useNavigate()
const { lastMessage } = useSocket('run-update')

const onWsMessage = useCallback(
(message: WebSocketMessage) => {
const { data, type } = message

(data: any) => {
data.run.start_time = new Date(data.run.start_time)
data.run.trigger_id = data.trigger

Expand All @@ -65,10 +62,12 @@ const RunsList: React.FC<Props> = ({ pipelineId, query, triggerId }) => {
)

useEffect(() => {
if (lastMessage) {
onWsMessage(lastMessage)
socket.on('run-update', onWsMessage)

return () => {
socket.off('run-update', onWsMessage)
}
}, [lastMessage])
}, [pipelineId])

useEffect(() => {
if (query.data?.length) {
Expand Down
31 changes: 31 additions & 0 deletions frontend/src/contexts/WebSocketContext.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { PropsWithChildren, useEffect, useState } from 'react'

import { socket } from '@/socket'

interface Props extends PropsWithChildren {}

const WebSocketContext: React.FC<Props> = ({ children }) => {
const [_, setIsConnected] = useState(socket.connected)

useEffect(() => {
function onConnect() {
setIsConnected(true)
}

function onDisconnect() {
setIsConnected(false)
}

socket.on('connect', onConnect)
socket.on('disconnect', onDisconnect)

return () => {
socket.off('connect', onConnect)
socket.off('disconnect', onDisconnect)
}
}, [])

return children
}

export default WebSocketContext
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,29 @@ import RunsTasksList from '@/components/Tasks'
import Timer from '@/components/Timer'
import { MANUAL_TRIGGER } from '@/constants'
import { getPipeline, getRun } from '@/repository'
import { useSocket } from '@/socket'
import { socket } from '@/socket'
import { Trigger } from '@/types'
import { TASKS_COLORS, formatDate, formatDateTime, formatTime } from '@/utils'

const RunViewPage = () => {
const { lastMessage } = useSocket('run-update')
const queryClient = useQueryClient()
const urlParams = useParams()
const pipelineId = urlParams.pipelineId as string
const triggerId = urlParams.triggerId as string
const runId = parseInt(urlParams.runId as string)

useEffect(() => {
if (lastMessage) {
const onRunUpdate = () => {
queryClient.invalidateQueries({
queryKey: getRun(pipelineId, triggerId, runId).queryKey,
})
}
}, [lastMessage, pipelineId])
socket.on('run-update', onRunUpdate)

return () => {
socket.off('run-update', onRunUpdate)
}
}, [pipelineId])

const pipelineQuery = useQuery(getPipeline(pipelineId))
const runQuery = useQuery(getRun(pipelineId, triggerId, runId))
Expand Down
3 changes: 1 addition & 2 deletions frontend/src/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ const post = async <ResponseType = any>(

export const getWebsocketUrl = () => {
const url = new URL(BASE_URL)
url.protocol = url.protocol === 'http:' ? 'ws' : 'wss'
url.pathname += '/ws/'
url.pathname = url.pathname.replace(/api$/, '')
return url
}

Expand Down
62 changes: 4 additions & 58 deletions frontend/src/socket.ts
Original file line number Diff line number Diff line change
@@ -1,61 +1,7 @@
import { useEffect } from 'react'
import useWebSocket from 'react-use-websocket'
import { io } from 'socket.io-client'

import { getWebsocketUrl } from './repository'
import { WebSocketMessage } from './types'

/**
* Hook to access the websocket
*
* This is a wrapper over react-use-websocket that enforces
* the application protocol implemented on the backend
*
* @param topic
* @returns
*/
export const useSocket = (topic?: string) => {
const filter = (msg: MessageEvent) => {
const { type } = JSON.parse(msg.data)

return type === topic
}

const hook = useWebSocket(getWebsocketUrl().toString(), {
share: true,
filter: topic ? filter : undefined,
})

const lastMessage = hook.lastJsonMessage as unknown as WebSocketMessage

const send = (type: string, data: any) => {
hook.sendJsonMessage({
type,
data,
})
}

const subscribe = (topic: string) => {
send('subscribe', topic)
}

const unsubscribe = (topic: string) => {
send('unsubscribe', topic)
}

useEffect(() => {
if (topic) {
subscribe(topic)

return () => {
unsubscribe(topic)
}
}
}, [])

return {
lastMessage,
send,
subscribe,
unsubscribe,
}
}
export const socket = io(getWebsocketUrl().toString(), {
transports: ['websocket'],
})
5 changes: 0 additions & 5 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ export interface PipelineRun {
tasks_run: TaskRun[]
}

export interface WebSocketMessage {
type: 'logs' | 'run-update'
data: any
}

export interface WhoamiResponse {
user: any
is_authentication_enabled: boolean
Expand Down
Loading

0 comments on commit aa912c2

Please sign in to comment.