Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisbenincasa committed Dec 21, 2024
1 parent 2f39648 commit f296e51
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 53 deletions.
31 changes: 25 additions & 6 deletions server/src/api/debug/debugStreamApi.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import { getDatabase } from '@/db/DBAccess.ts';
import { createOfflineStreamLineupItem } from '@/db/derived_types/StreamLineup.ts';
import {
ProgramStreamLineupItem,
createOfflineStreamLineupItem,
} from '@/db/derived_types/StreamLineup.ts';
import { AllChannelTableKeys, Channel } from '@/db/schema/Channel.ts';
import { MediaSourceType } from '@/db/schema/MediaSource.ts';
import { ProgramDao, ProgramType } from '@/db/schema/Program.ts';
import {
AllTranscodeConfigColumns,
TranscodeConfig,
} from '@/db/schema/TranscodeConfig.ts';
import { MpegTsOutputFormat } from '@/ffmpeg/builder/constants.ts';
import { serverContext } from '@/serverContext.ts';
import { OfflineProgramStream } from '@/stream/OfflinePlayer.ts';
import { PlayerContext } from '@/stream/PlayerStreamContext.ts';
import { ProgramStream } from '@/stream/ProgramStream.ts';
import { JellyfinProgramStream } from '@/stream/jellyfin/JellyfinProgramStream.ts';
import { PlexProgramStream } from '@/stream/plex/PlexProgramStream.ts';
import { TruthyQueryParam } from '@/types/schemas.ts';
import { RouterPluginAsyncCallback } from '@/types/serverType.ts';
import dayjs from '@/util/dayjs.ts';
import { jsonObjectFrom } from 'kysely/helpers/sqlite';
import { isNumber, isUndefined, nth, random } from 'lodash-es';
import { PassThrough } from 'stream';
Expand Down Expand Up @@ -267,10 +271,8 @@ export const debugStreamApiRouter: RouterPluginAsyncCallback = async (
startTime: number = 0,
useNewPipeline: boolean = false,
) {
const lineupItem = serverContext()
.streamProgramCalculator()
.createStreamItemFromProgram(program);
lineupItem.start = startTime;
const lineupItem = createStreamItemFromProgram(program);
lineupItem.startOffset = startTime;
const ctx = new PlayerContext(
lineupItem,
channel,
Expand Down Expand Up @@ -298,3 +300,20 @@ export const debugStreamApiRouter: RouterPluginAsyncCallback = async (
return out;
}
};

function createStreamItemFromProgram(
program: ProgramDao,
): ProgramStreamLineupItem {
return {
...program,
type: 'program',
programType: program.type,
programId: program.uuid,
id: program.uuid,
// HACK
externalSource: z.nativeEnum(MediaSourceType).parse(program.sourceType),
plexFilePath: program.plexFilePath ?? undefined,
filePath: program.filePath ?? undefined,
programBeginMs: +dayjs(),
};
}
7 changes: 5 additions & 2 deletions server/src/db/derived_types/StreamLineup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const baseStreamLineupItemSchema = z.object({
streamDuration: z.number().nonnegative().optional(),
beginningOffset: z.number().nonnegative().optional(),
title: z.string().optional(),
start: z.number().nonnegative().optional(),
startOffset: z.number().nonnegative().optional(),
programBeginMs: z.number().nonnegative(),
duration: z.number().nonnegative(),
});

Expand Down Expand Up @@ -139,10 +140,12 @@ export type EnrichedLineupItem = z.infer<typeof EnrichedLineupItemSchema>;

export function createOfflineStreamLineupItem(
duration: number,
programBeginMs: number,
): OfflineStreamLineupItem {
return {
duration,
start: 0,
startOffset: 0,
type: 'offline',
programBeginMs,
};
}
2 changes: 1 addition & 1 deletion server/src/ffmpeg/FfmpegStreamFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ export class FfmpegStreamFactory extends IFFMPEG {
videoBufferSize: playbackParams.videoBufferSize,
videoTrackTimescale: playbackParams.videoTrackTimeScale,
videoFormat: playbackParams.videoFormat,
// videoPreset: playbackParams.vid
// videoPreset: playbackParams.video
}),
);

Expand Down
72 changes: 30 additions & 42 deletions server/src/stream/StreamProgramCalculator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { ProgramDB } from '@/db/ProgramDB.ts';
import { ProgramExternalIdType } from '@/db/custom_types/ProgramExternalIdType.ts';
import { Channel } from '@/db/schema/Channel.ts';
import { MediaSourceType } from '@/db/schema/MediaSource.ts';
import { ProgramDao as RawProgram } from '@/db/schema/Program.ts';
import type { ProgramDaoWithRelations as RawProgramEntity } from '@/db/schema/derivedTypes.js';
import { FillerPicker } from '@/services/FillerPicker.js';
import { Result } from '@/types/result.js';
Expand All @@ -15,15 +14,13 @@ import constants from '@tunarr/shared/constants';
import dayjs from 'dayjs';
import { first, isEmpty, isNil, isNull, isUndefined, nth } from 'lodash-es';
import { StrictExclude } from 'ts-essentials';
import { z } from 'zod';
import {
Lineup,
isContentItem,
isOfflineItem,
} from '../db/derived_types/Lineup.ts';
import {
EnrichedLineupItem,
ProgramStreamLineupItem,
RedirectStreamLineupItem,
StreamLineupItem,
createOfflineStreamLineupItem,
Expand Down Expand Up @@ -110,10 +107,7 @@ export class StreamProgramCalculator {
lineup,
);

while (
!isUndefined(currentProgram) &&
currentProgram.program.type === 'redirect'
) {
while (currentProgram.program.type === 'redirect') {
redirectChannels.push(channelContext.uuid);
upperBounds.push(
currentProgram.program.duration - currentProgram.timeElapsed,
Expand All @@ -131,7 +125,8 @@ export class StreamProgramCalculator {
redirectChannels.join(', '),
duration: 60_000,
streamDuration: 60_000,
start: 0,
startOffset: 0,
programBeginMs: req.startTime,
},
);
}
Expand All @@ -145,7 +140,7 @@ export class StreamProgramCalculator {
this.logger.error(msg);
currentProgram = {
program: {
...createOfflineStreamLineupItem(60000),
...createOfflineStreamLineupItem(60000, req.startTime),
type: 'error',
error: msg,
},
Expand All @@ -161,7 +156,7 @@ export class StreamProgramCalculator {
req.startTime,
);

if (!isUndefined(lineupItem)) {
if (lineupItem) {
break;
} else {
currentProgram = await this.getCurrentProgramAndTimeElapsed(
Expand Down Expand Up @@ -193,6 +188,7 @@ export class StreamProgramCalculator {
//filler to play (if any)
currentProgram.program = createOfflineStreamLineupItem(
dayjs.duration({ years: 1 }).asMilliseconds(),
req.startTime,
);
} else if (
req.allowSkip &&
Expand All @@ -207,11 +203,10 @@ export class StreamProgramCalculator {
await this.channelCache.clearPlayback(redirectChannels[i]);
}

this.logger.info(
this.logger.debug(
'Too little time before the filler ends, skip to next slot',
);

// return await this.startStream(req, startTimestamp + dt + 1, false);
return await this.getCurrentLineupItem({
...req,
startTime: req.startTime + dt + 1,
Expand All @@ -235,7 +230,7 @@ export class StreamProgramCalculator {
);
}

if (!isUndefined(lineupItem)) {
if (lineupItem) {
let upperBound = Number.MAX_SAFE_INTEGER;
const beginningOffset = lineupItem?.beginningOffset ?? 0;

Expand Down Expand Up @@ -278,7 +273,8 @@ export class StreamProgramCalculator {
type: 'error',
error: 'Too many attempts, throttling',
duration: 60_000,
start: 0,
startOffset: 0,
programBeginMs: req.startTime,
};
}

Expand All @@ -296,7 +292,10 @@ export class StreamProgramCalculator {
'Channel start time is above the given date. Flex time is picked till that.',
);
return {
program: createOfflineStreamLineupItem(channel.startTime - timestamp),
program: createOfflineStreamLineupItem(
channel.startTime - timestamp,
timestamp,
),
timeElapsed: 0,
programIndex: -1,
};
Expand Down Expand Up @@ -375,9 +374,10 @@ export class StreamProgramCalculator {
program = {
duration: lineupItem.durationMs,
type: 'offline',
programBeginMs: timestamp - timeElapsed,
};

if (!isNil(backingItem)) {
if (backingItem) {
// Will play this item on the first found server... unsure if that is
// what we want
const externalInfo = backingItem.externalIds.find(
Expand All @@ -386,10 +386,7 @@ export class StreamProgramCalculator {
eid.sourceType === ProgramExternalIdType.JELLYFIN,
);

if (
!isUndefined(externalInfo) &&
isNonEmptyString(externalInfo.externalSourceId)
) {
if (externalInfo && isNonEmptyString(externalInfo.externalSourceId)) {
program = {
type: 'program',
externalSource:
Expand All @@ -405,19 +402,21 @@ export class StreamProgramCalculator {
title: backingItem.title,
id: backingItem.uuid,
programType: backingItem.type,
programBeginMs: timestamp - timeElapsed,
};
}
}
} else if (isOfflineItem(lineupItem)) {
program = {
duration: lineupItem.durationMs,
type: 'offline',
...createOfflineStreamLineupItem(lineupItem.durationMs, timestamp),
programBeginMs: timestamp - timeElapsed,
};
} else {
program = {
duration: lineupItem.durationMs,
channel: lineupItem.channel,
type: 'redirect',
programBeginMs: timestamp - timeElapsed,
};
}

Expand Down Expand Up @@ -451,8 +450,9 @@ export class StreamProgramCalculator {
error: activeProgram.error,
streamDuration: remaining,
duration: remaining,
start: 0,
beginningOffset: beginningOffset,
startOffset: 0,
beginningOffset,
programBeginMs: activeProgram.programBeginMs,
};
}

Expand Down Expand Up @@ -524,7 +524,7 @@ export class StreamProgramCalculator {
externalInfo.sourceType === ProgramExternalIdType.JELLYFIN
? MediaSourceType.Jellyfin
: MediaSourceType.Plex,
start: fillerstart,
startOffset: fillerstart,
streamDuration: Math.max(
1,
Math.min(filler.duration - fillerstart, remaining),
Expand All @@ -535,6 +535,7 @@ export class StreamProgramCalculator {
externalSourceId: externalInfo.externalSourceId!,
plexFilePath: nullToUndefined(externalInfo.externalFilePath),
programType: filler.type,
programBeginMs: activeProgram.programBeginMs,
};
}
}
Expand All @@ -550,7 +551,8 @@ export class StreamProgramCalculator {
streamDuration: remaining,
beginningOffset: beginningOffset,
duration: remaining,
start: 0,
startOffset: 0,
programBeginMs: activeProgram.programBeginMs,
};
}

Expand All @@ -563,24 +565,10 @@ export class StreamProgramCalculator {
return {
...activeProgram,
type: 'program',
start: timeElapsed,
startOffset: timeElapsed,
streamDuration: activeProgram.duration - timeElapsed,
beginningOffset: beginningOffset,
beginningOffset,
id: activeProgram.id,
};
}

createStreamItemFromProgram(program: RawProgram): ProgramStreamLineupItem {
return {
...program,
type: 'program',
programType: program.type,
programId: program.uuid,
id: program.uuid,
// HACK
externalSource: z.nativeEnum(MediaSourceType).parse(program.sourceType),
plexFilePath: program.plexFilePath ?? undefined,
filePath: program.filePath ?? undefined,
};
}
}
70 changes: 70 additions & 0 deletions server/src/stream/hls/HlsPlaylistCreator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { ChannelDB } from '@/db/ChannelDB.ts';
import { StreamLineupItem } from '@/db/derived_types/StreamLineup.ts';
import { StreamProgramCalculator } from '@/stream/StreamProgramCalculator.ts';
import dayjs from '@/util/dayjs.ts';
import NodeCache from 'node-cache';
import util from 'node:util';

const playlistFmtString = ```#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:%d
#EXT-X-DISCONTINUITY
#EXTINF:%d,
%s://%s/ffmpeg/stream/%s?index=%d%s
```;

type CachedCurrentIndex = {
startTime: number;
index: number;
};

export class HlsPlaylistCreator {
private static cache = new NodeCache({
stdTTL: +dayjs.duration({ days: 1 }),
});

constructor(
private channelDB: ChannelDB,
private streamProgramCalculator: StreamProgramCalculator,
) {}

async createPlaylist(channelId: string, now: dayjs.Dayjs) {
const lineupItemResult =
await this.streamProgramCalculator.getCurrentLineupItem({
channelId,
startTime: +now,
allowSkip: false,
});

if (lineupItemResult.isFailure()) {
throw lineupItemResult.error;
}

const { lineupItem } = lineupItemResult.get();

const currentIndex = this.getLineupItemIndex(channelId, lineupItem);

return util.format(playlistFmtString, currentIndex);
}

private getLineupItemIndex(channelId: string, lineupItem: StreamLineupItem) {
const cachedValue =
HlsPlaylistCreator.cache.get<CachedCurrentIndex>(channelId);
if (cachedValue && cachedValue.startTime === lineupItem.programBeginMs) {
return cachedValue.index;
} else if (cachedValue) {
HlsPlaylistCreator.cache.set<CachedCurrentIndex>(channelId, {
startTime: lineupItem.programBeginMs,
index: cachedValue.index + 1,
});
return cachedValue.index + 1;
} else {
HlsPlaylistCreator.cache.set<CachedCurrentIndex>(channelId, {
startTime: lineupItem.programBeginMs,
index: 1,
});
return 1;
}
}
}
Loading

0 comments on commit f296e51

Please sign in to comment.