Skip to content

Commit

Permalink
Setup rudimentary 'data fixer' framework + add first fixer for missing
Browse files Browse the repository at this point in the history
season numbers. Fixes #131
  • Loading branch information
chrisbenincasa committed Feb 26, 2024
1 parent 66bcae6 commit b5c768e
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 12 deletions.
6 changes: 4 additions & 2 deletions server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { UpdateXmlTvTask } from './tasks/updateXmlTvTask.js';
import { ServerOptions } from './types.js';
import { wait } from './util.js';
import { videoRouter } from './video.js';
import { runFixers } from './tasks/fixers/index.js';

const logger = createLogger(import.meta);

Expand All @@ -53,7 +54,7 @@ const __dirname = dirname(__filename);

console.log(
` \\
dizqueTV ${constants.VERSION_NAME}
Tunarr ${constants.VERSION_NAME}
.------------.
|:::///### o |
|:::///### |
Expand Down Expand Up @@ -117,8 +118,9 @@ export async function initServer(opts: ServerOptions) {
}

scheduleJobs(ctx);
await runFixers();

const updateXMLPromise = scheduledJobsById[UpdateXmlTvTask.ID]?.runNow();
const updateXMLPromise = scheduledJobsById[UpdateXmlTvTask.ID]!.runNow();

const app = fastify({ logger: false, bodyLimit: 50 * 1024 * 1024 });
await app
Expand Down
18 changes: 8 additions & 10 deletions server/services/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@ class ScheduledTask<Data> {
// If background=true, this function will not return the underlying
// Promise generated by the running job and all errors will be swallowed.
async runNow(background: boolean = true) {
return withDb(async () => {
this.#scheduledJob.cancelNext(false);
const rescheduleCb = () => this.#scheduledJob.reschedule(this.#schedule);
if (background) {
await this.jobInternal().finally(rescheduleCb);
return;
} else {
return this.jobInternal(true).finally(rescheduleCb);
}
});
this.#scheduledJob.cancelNext(false);
const rescheduleCb = () => this.#scheduledJob.reschedule(this.#schedule);
if (background) {
await this.jobInternal().finally(rescheduleCb);
return;
} else {
return this.jobInternal(true).finally(rescheduleCb);
}
}

cancel() {
Expand Down
9 changes: 9 additions & 0 deletions server/tasks/fixers/fixer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { EntityManager, withDb } from '../../dao/dataSource.js';

export default abstract class Fixer {
async run() {
return withDb((em) => this.runInternal(em));
}

protected abstract runInternal(em: EntityManager): Promise<void>;
}
24 changes: 24 additions & 0 deletions server/tasks/fixers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import createLogger from '../../logger.js';
import Fixer from './fixer.js';
import { MissingSeasonNumbersFixer } from './missingSeasonNumbersFixer.js';

const logger = createLogger(import.meta);

// Run all fixers one-off, swallowing all errors.
// Fixers currently do not keep any state and we will
// just run them at each server start. As such, they
// should be idempotent.
// Maybe one day we'll import these all dynamically and run
// them, but not today.
export const runFixers = async () => {
const allFixers: Fixer[] = [new MissingSeasonNumbersFixer()];

for (const fixer of allFixers) {
try {
logger.debug('Running fixer %s', fixer.constructor.name);
await fixer.run();
} catch (e) {
logger.error('Fixer %s failed to run %O', fixer.constructor.name, e);
}
}
};
148 changes: 148 additions & 0 deletions server/tasks/fixers/missingSeasonNumbersFixer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { Cursor } from '@mikro-orm/core';
import { PlexEpisodeView, PlexSeasonView } from '@tunarr/types/plex';
import { first, forEach, groupBy, mapValues, pickBy } from 'lodash-es';
import { EntityManager } from '../../dao/dataSource.js';
import { PlexServerSettings } from '../../dao/entities/PlexServerSettings.js';
import { Program, ProgramType } from '../../dao/entities/Program.js';
import { logger } from '../../dao/legacyDbMigration.js';
import { Plex } from '../../plex.js';
import { Maybe } from '../../types.js';
import { groupByUniqAndMap, wait } from '../../util.js';
import Fixer from './fixer.js';

export class MissingSeasonNumbersFixer extends Fixer {
async runInternal(em: EntityManager): Promise<void> {
const allPlexServers = await em.findAll(PlexServerSettings);

if (allPlexServers.length === 0) {
return;
}

const plexByName = groupByUniqAndMap(
allPlexServers,
'name',
(server) => new Plex(server),
);

let cursor: Maybe<Cursor<Program>> = undefined;
do {
cursor = await em.findByCursor(
Program,
{ season: null, type: ProgramType.Episode },
{
first: 25,
orderBy: { uuid: 'desc' },
after: cursor,
},
);
const programsByPlexServer = groupBy(cursor.items, 'externalSourceId');
const goodProgramsByServer = pickBy(programsByPlexServer, (_, key) => {
const hasKey = !!plexByName[key];
if (!hasKey) {
logger.warn('No configured server called "%s"', key);
}
return hasKey;
});

const programsByServerAndParent: Record<
string,
Record<string, Program[]>
> = mapValues(goodProgramsByServer, (programs) => {
return groupBy(programs, (p) => p.parentExternalKey ?? 'unset');
});

for (const server in programsByServerAndParent) {
for (const parentId in programsByServerAndParent[server]) {
const programs = programsByServerAndParent[server][parentId];

if (parentId === 'unset') {
for (const program of programs) {
if (!program.plexRatingKey) {
logger.warn(
`Uh-oh, we're missing a plex rating key for %s`,
program.uuid,
);
continue;
}

const seasonNum = await this.findSeasonNumberUsingEpisode(
program.plexRatingKey,
plexByName[server],
);

await wait(100);

if (seasonNum) {
program.season = seasonNum;
em.persist(program);
}
}
} else {
const seasonNum = await this.findSeasonNumberUsingParent(
parentId,
plexByName[server],
);
await wait(100);

if (seasonNum) {
forEach(programs, (program) => {
program.season = seasonNum;
em.persist(program);
});
} else {
for (const program of programs) {
if (!program.plexRatingKey) {
logger.warn(
`Uh-oh, we're missing a plex rating key for %s`,
program.uuid,
);
continue;
}

const seasonNum = await this.findSeasonNumberUsingEpisode(
program.plexRatingKey,
plexByName[server],
);

await wait(100);

if (seasonNum) {
program.season = seasonNum;
em.persist(program);
}
}
}
}
}
}

await em.flush();
} while (cursor.hasNextPage);
}

private async findSeasonNumberUsingEpisode(episodeId: string, plex: Plex) {
try {
const episode = await plex.Get<PlexEpisodeView>(
`/library/metadata/${episodeId}`,
);
return episode?.parentIndex;
} catch (e) {
logger.warn('Error grabbing episode %s from plex: %O', episodeId, e);
return;
}
}

private async findSeasonNumberUsingParent(seasonId: string, plex: Plex) {
// We get the parent because we're dealing with an episode and we want the
// season index.
try {
const season = await plex.Get<PlexSeasonView>(
`/library/metadata/${seasonId}`,
);
return first(season?.Metadata ?? [])?.index;
} catch (e) {
logger.warn('Error grabbing season from plex: %O', e);
return;
}
}
}

0 comments on commit b5c768e

Please sign in to comment.