Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: guide/epg now accurately reflects state of on-demand channels #978

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions server/src/api/debugApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ChannelLineupQuery } from '@tunarr/types/api';
import { ChannelLineupSchema } from '@tunarr/types/schemas';
import dayjs from 'dayjs';
import { jsonArrayFrom } from 'kysely/helpers/sqlite';
import { map, reject, some } from 'lodash-es';
import { isUndefined, map, reject, some } from 'lodash-es';
import os from 'node:os';
import z from 'zod';

Expand Down Expand Up @@ -41,30 +41,48 @@ export const debugApi: RouterPluginAsyncCallback = async (fastify) => {
});

fastify.get(
'/debug/helpers/current_program',
'/debug/helpers/playing_at',
{
schema: ChannelQuerySchema,
schema: {
querystring: ChannelQuerySchema.querystring.extend({
ts: z.coerce.number().optional(),
}),
},
},
async (req, res) => {
const channel = await req.serverCtx.channelDB.getChannelAndPrograms(
req.query.channelId,
);
const channelAndLineup =
await req.serverCtx.channelDB.loadChannelAndLineup(req.query.channelId);

if (!channel) {
if (!channelAndLineup) {
return res
.status(404)
.send({ error: 'No channel with ID ' + req.query.channelId });
}

const result = req.serverCtx
const { channel, lineup } = channelAndLineup;

if (
lineup.onDemandConfig?.state === 'paused' &&
isUndefined(req.query.ts)
) {
req.query.ts = channel.startTime + lineup.onDemandConfig.cursor;
}

const result = await req.serverCtx
.streamProgramCalculator()
.getCurrentLineupItem({
startTime: new Date().getTime(),
startTime: req.query.ts ?? +dayjs(),
channelId: req.query.channelId,
allowSkip: true,
});

return res.send(result);
return result
.map((lineupItem) => {
return res.send(lineupItem);
})
.getOrElse(() => {
return res.status(500).send();
});
},
);

Expand Down
30 changes: 27 additions & 3 deletions server/src/db/ChannelDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
partition,
reduce,
reject,
sum,
sumBy,
take,
uniq,
Expand Down Expand Up @@ -98,7 +99,10 @@ import {
} from './schema/Channel.ts';
import { programExternalIdString } from './schema/Program.ts';
import { ChannelTranscodingSettings } from './schema/base.ts';
import { ChannelWithPrograms as RawChannelWithPrograms } from './schema/derivedTypes.js';
import {
ChannelWithPrograms,
ChannelWithPrograms as RawChannelWithPrograms,
} from './schema/derivedTypes.js';

dayjs.extend(duration);

Expand Down Expand Up @@ -508,6 +512,24 @@ export class ChannelDB {
.executeTakeFirst();
}

async syncChannelDuration(id: string) {
const channelAndLineup = await this.loadChannelAndLineup(id);
if (!channelAndLineup) {
return false;
}
const { channel, lineup } = channelAndLineup;
const lineupDuration = sum(map(lineup.items, (item) => item.durationMs));
if (lineupDuration !== channel.duration) {
await getDatabase()
.updateTable('channel')
.where('channel.uuid', '=', id)
.set('duration', lineupDuration)
.executeTakeFirst();
return true;
}
return false;
}

async deleteChannel(
channelId: string,
blockOnLineupUpdates: boolean = false,
Expand Down Expand Up @@ -936,8 +958,10 @@ export class ChannelDB {
};
}

async loadDirectChannelAndLineup(channelId: string) {
const channel = await this.getChannel(channelId);
async loadChannelWithProgamsAndLineup(
channelId: string,
): Promise<{ channel: ChannelWithPrograms; lineup: Lineup } | null> {
const channel = await this.getChannelAndPrograms(channelId);
if (isNil(channel)) {
return null;
}
Expand Down
78 changes: 49 additions & 29 deletions server/src/services/OnDemandChannelService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { ChannelDB } from '@/db/ChannelDB.ts';
import { OnDemandChannelConfig } from '@/db/derived_types/Lineup.ts';
import { serverContext } from '@/serverContext.ts';
import { UpdateXmlTvTask } from '@/tasks/UpdateXmlTvTask.ts';
import { LoggerFactory } from '@/util/logging/LoggerFactory.js';
import { MutexMap } from '@/util/mutexMap.js';
import dayjs from 'dayjs';
import { isNull, isUndefined } from 'lodash-es';
import { GlobalScheduler } from './Scheduler.ts';

export class OnDemandChannelService {
#logger = LoggerFactory.child({ className: this.constructor.name });
Expand All @@ -22,22 +26,11 @@ export class OnDemandChannelService {
}

async pauseAllChannels() {
const allConfigs = await this.channelDB.loadAllLineupConfigs();
const now = dayjs().unix() * 1000;
for (const [channelId, { lineup }] of Object.entries(allConfigs)) {
if (isUndefined(lineup.onDemandConfig)) {
continue;
}
const channels = await this.channelDB.getAllChannels();
const now = +dayjs();

if (lineup.onDemandConfig.state === 'paused') {
continue;
}

await this.channelDB.updateLineupConfig(channelId, 'onDemandConfig', {
...lineup.onDemandConfig,
state: 'paused',
lastPaused: now,
});
for (const channel of channels) {
await this.pauseChannel(channel.uuid, now);
}
}

Expand Down Expand Up @@ -73,13 +66,20 @@ export class OnDemandChannelService {
: (lineup.onDemandConfig.cursor + elapsed - rewindMs) %
channel.duration;

return await this.channelDB
await this.channelDB
.updateLineupConfig(id, 'onDemandConfig', {
...(lineup.onDemandConfig ?? {}),
state: 'paused',
lastPaused: pauseTime,
cursor: nextCursor,
})
.then(() => {
GlobalScheduler.scheduleOneOffTask(
`Pause_Channel_Update_Guide_${id}`,
dayjs().add(1000),
UpdateXmlTvTask.create(serverContext(), id),
);
})
.finally(() => {
this.#logger.debug(
'Paused on-demand channel %s (at = %s)',
Expand Down Expand Up @@ -112,7 +112,7 @@ export class OnDemandChannelService {
// and skip it if it's a commercial.

const now = dayjs();
return await this.channelDB
await this.channelDB
.updateLineupConfig(id, 'onDemandConfig', {
...(lineup.onDemandConfig ?? {}),
state: 'playing',
Expand All @@ -125,10 +125,36 @@ export class OnDemandChannelService {
now.format(),
);
});

GlobalScheduler.scheduleOneOffTask(
`Resume_Channel_Update_Guide_${id}`,
dayjs().add(1000),
UpdateXmlTvTask.create(serverContext(), id),
);
});
}

async getLiveTimestamp(channelId: string, requestTime: number) {
getLiveTimestampForConfig(
onDemandConfig: OnDemandChannelConfig,
channelStartTime: number,
requestTime: number,
): number {
let sinceResume = dayjs(requestTime).diff(
dayjs(onDemandConfig.lastResumed),
);

// Don't skip milliseconds
if (sinceResume < 1_000) {
sinceResume = 0;
}

return channelStartTime + onDemandConfig.cursor + sinceResume;
}

async getLiveTimestamp(
channelId: string,
requestTime: number,
): Promise<number> {
const channelAndLineup = await this.loadOnDemandChannelLineup(channelId);

if (isUndefined(channelAndLineup)) {
Expand All @@ -141,21 +167,15 @@ export class OnDemandChannelService {
return requestTime;
}

let sinceResume = dayjs(requestTime).diff(
dayjs(lineup.onDemandConfig.lastResumed),
return this.getLiveTimestampForConfig(
lineup.onDemandConfig,
channel.startTime,
requestTime,
);

// Don't skip milliseconds
if (sinceResume < 1_000) {
sinceResume = 0;
}

return channel.startTime + lineup.onDemandConfig.cursor + sinceResume;
}

private async loadOnDemandChannelLineup(id: string) {
const channelAndLineup =
await this.channelDB.loadDirectChannelAndLineup(id);
const channelAndLineup = await this.channelDB.loadChannelAndLineup(id);
if (isNull(channelAndLineup)) {
return;
}
Expand Down
Loading
Loading