Skip to content

Commit

Permalink
Merge pull request #9 from JoaquinBCh/feature/play-pause
Browse files Browse the repository at this point in the history
feature: play/pause and running promise modified
  • Loading branch information
englishm authored Dec 3, 2024
2 parents f6c7424 + f7907db commit e3f0461
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 13 deletions.
10 changes: 9 additions & 1 deletion lib/playback/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,15 @@ export default class Backend {
this.send({ config: msg }, msg.video.canvas)
}

async play() {
pause() {
this.send({ pause: true })
}

async mute() {
await this.#audio?.context.suspend()
}

async unmute() {
await this.#audio?.context.resume()
}

Expand Down
97 changes: 85 additions & 12 deletions lib/playback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ export class Player {
#tracksByName: Map<string, Catalog.Track>
#tracknum: number
#audioTrackName: string
#videoTrackName: string
#muted: boolean
#paused: boolean

// Running is a promise that resolves when the player is closed.
// #close is called with no error, while #abort is called with an error.
#running: Promise<void>
#close!: () => void
#abort!: (err: Error) => void
#trackTasks: Map<string, Promise<void>> = new Map()

private constructor(connection: Connection, catalog: Catalog.Root, backend: Backend, tracknum: number) {
this.#connection = connection
Expand All @@ -46,15 +49,22 @@ export class Player {
this.#backend = backend
this.#tracknum = tracknum
this.#audioTrackName = ""
this.#videoTrackName = ""
this.#muted = false
this.#paused = false

const abort = new Promise<void>((resolve, reject) => {
this.#close = resolve
this.#abort = reject
})

// Async work
this.#running = Promise.race([this.#run(), abort]).catch(this.#close)
this.#running = abort.catch(this.#close)

this.#run().catch((err) => {
console.error("Error in #run():", err)
this.#abort(err)
})
}

static async create(config: PlayerConfig, tracknum: number): Promise<Player> {
Expand Down Expand Up @@ -87,7 +97,9 @@ export class Player {
await Promise.all(Array.from(inits).map((init) => this.#runInit(...init)))

// Call #runTrack on each track
await Promise.all(tracks.map((track) => this.#runTrack(track)))
tracks.forEach((track) => {
this.#runTrack(track)
})
}

async #runInit(namespace: string, name: string) {
Expand All @@ -107,18 +119,29 @@ export class Player {
}
}

async #runTrack(track: Catalog.Track) {
async #trackTask(track: Catalog.Track) {
if (!track.namespace) throw new Error("track has no namespace")

if (this.#paused) return

const kind = Catalog.isVideoTrack(track) ? "video" : Catalog.isAudioTrack(track) ? "audio" : "unknown"
if (kind == "audio" && this.#muted) return

const sub = await this.#connection.subscribe(track.namespace, track.name)

if (kind == "audio") {
// Save ref to last audio track we subscribed to for unmuting
this.#audioTrackName = track.name
}

if (kind == "video") {
this.#videoTrackName = track.name
}

try {
for (;;) {
const segment = await Promise.race([sub.data(), this.#running])
if (!segment) break
if (!segment) continue

if (!(segment instanceof GroupReader)) {
throw new Error(`expected group reader for segment: ${track.name}`)
Expand All @@ -132,11 +155,6 @@ export class Player {
throw new Error(`no init track for segment: ${track.name}`)
}

if (kind == "audio") {
// Save ref to last audio track we subscribed to for unmuting
this.#audioTrackName = track.name
}

const [buffer, stream] = segment.stream.release()

this.#backend.segment({
Expand All @@ -154,6 +172,23 @@ export class Player {
}
}

#runTrack(track: Catalog.Track) {
if (this.#trackTasks.has(track.name)) {
console.warn(`Already exist a runTrack task for the track: ${track.name}`)
return
}

const task = (async () => this.#trackTask(track))()

this.#trackTasks.set(track.name, task)

task.catch((err) => {
console.error(`Error to subscribe to track ${track.name}`, err)
}).finally(() => {
this.#trackTasks.delete(track.name)
})
}

getCatalog() {
return this.#catalog
}
Expand All @@ -171,8 +206,16 @@ export class Player {
return this.#catalog.tracks.filter(Catalog.isVideoTrack).map((track) => track.name)
}

getAudioTracks() {
return this.#catalog.tracks.filter(Catalog.isAudioTrack).map((track) => track.name)
}

async switchTrack(trackname: string) {
const currentTrack = this.getCurrentTrack()
if (this.#paused) {
this.#videoTrackName = trackname
return
}
if (currentTrack) {
console.log(`Unsubscribing from track: ${currentTrack.name} and Subscribing to track: ${trackname}`)
await this.unsubscribeFromTrack(currentTrack.name)
Expand All @@ -185,18 +228,35 @@ export class Player {
}

async mute(isMuted: boolean) {
this.#muted = isMuted
if (isMuted) {
console.log("Unsubscribing from audio track: ", this.#audioTrackName)
await this.unsubscribeFromTrack(this.#audioTrackName)
await this.#backend.mute()
} else {
console.log("Subscribing to audio track: ", this.#audioTrackName)
const audioTrack = this.#tracksByName.get(this.#audioTrackName)
audioTrack && (await this.#runTrack(audioTrack))
this.subscribeFromTrackName(this.#audioTrackName)
await this.#backend.unmute()
}
}

async unsubscribeFromTrack(trackname: string) {
console.log(`Unsubscribing from track: ${trackname}`)
await this.#connection.unsubscribe(trackname)
const task = this.#trackTasks.get(trackname)
if (task) {
await task
}
}

subscribeFromTrackName(trackname: string) {
console.log(`Subscribing to track: ${trackname}`)
const track = this.#tracksByName.get(trackname)
if (track) {
this.#runTrack(track)
} else {
console.warn(`Track ${trackname} not in #tracksByName`)
}
}

#onMessage(msg: Message.FromWorker) {
Expand Down Expand Up @@ -232,7 +292,20 @@ export class Player {
*/

async play() {
await this.#backend.play()
if (this.#paused) {
this.#paused = false
this.subscribeFromTrackName(this.#videoTrackName)
if (!this.#muted) {
this.subscribeFromTrackName(this.#audioTrackName)
await this.#backend.unmute()
}
} else {
await this.unsubscribeFromTrack(this.#videoTrackName)
await this.unsubscribeFromTrack(this.#audioTrackName)
await this.#backend.mute()
this.#backend.pause()
this.#paused = true
}
}

/*
Expand Down
8 changes: 8 additions & 0 deletions lib/playback/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class Worker {
this.#onInit(msg.init)
} else if (msg.segment) {
this.#onSegment(msg.segment).catch(console.warn)
} else if (msg.pause) {
this.#onPause(msg.pause)
} else {
throw new Error(`unknown message: + ${JSON.stringify(msg)}`)
}
Expand Down Expand Up @@ -100,6 +102,12 @@ class Worker {
// We done.
await segment.close()
}

#onPause(pause: boolean) {
if (this.#video && pause) {
this.#video.pause()
}
}
}

// Pass all events to the worker
Expand Down
1 change: 1 addition & 0 deletions lib/playback/worker/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export interface ToWorker {
// Sent on each init/data stream
init?: Init
segment?: Segment
pause?: boolean

/*
// Sent to control playback
Expand Down
5 changes: 5 additions & 0 deletions lib/playback/worker/video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ export class Renderer {
this.#run().catch(console.error)
}

pause() {
console.log("pause")
this.#waitingForKeyframe = true
}

async #run() {
const reader = this.#timeline.frames.pipeThrough(this.#queue).getReader()
for (;;) {
Expand Down
12 changes: 12 additions & 0 deletions web/src/components/watch.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ export default function Watch(props: { name: string }) {
usePlayer()?.play().catch(setError)
}

const handlePlayPause = async () => {
const player = usePlayer();
if (!player) return;

try {
await player.play();
} catch (error) {
setError();
}
};

// The JSON catalog for debugging.
const catalog = createMemo(() => {
const player = usePlayer()
Expand Down Expand Up @@ -115,6 +126,7 @@ export default function Watch(props: { name: string }) {
<input type="checkbox" checked={mute()} onChange={handleMuteChange} />
<span>Mute</span>
</label>
<button onClick={handlePlayPause}>{"Play/Pause"}</button>
</div>
</div>
<h3>Debug</h3>
Expand Down

0 comments on commit e3f0461

Please sign in to comment.