Skip to content

Commit

Permalink
feat: run all offline checks in a single job
Browse files Browse the repository at this point in the history
  • Loading branch information
etnoy committed Dec 2, 2024
1 parent ba71fd4 commit 8ecde3b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 53 deletions.
14 changes: 7 additions & 7 deletions server/src/interfaces/job.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export enum JobName {
LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files',
LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets',
LIBRARY_SYNC_FILE = 'library-sync-file',
LIBRARY_SYNC_ASSET = 'library-sync-asset',
LIBRARY_SYNC_ASSETS = 'library-sync-assets',
LIBRARY_DELETE = 'library-delete',
LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all',
LIBRARY_QUEUE_CLEANUP = 'library-queue-cleanup',
Expand Down Expand Up @@ -148,15 +148,15 @@ export interface ILibraryFileJob extends IEntityJob {
assetPath: string;
}

export interface ILibraryAssetJob extends IEntityJob {
importPaths: string[];
exclusionPatterns: string[];
}

export interface IBulkEntityJob extends IBaseJob {
ids: string[];
}

export interface ILibraryAssetsJob extends IBulkEntityJob {
importPaths: string[];
exclusionPatterns: string[];
}

export interface IDeleteFilesJob extends IBaseJob {
files: Array<string | null | undefined>;
}
Expand Down Expand Up @@ -287,7 +287,7 @@ export type JobItem =
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSET; data: ILibraryAssetJob }
| { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryAssetsJob }
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob }
| { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob }
Expand Down
56 changes: 28 additions & 28 deletions server/src/services/library.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ICronRepository } from 'src/interfaces/cron.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import {
IJobRepository,
ILibraryAssetJob,
ILibraryAssetsJob,
ILibraryFileJob,
JobName,
JOBS_LIBRARY_PAGINATION_SIZE,
Expand Down Expand Up @@ -231,7 +231,7 @@ describe(LibraryService.name, () => {

expect(jobMock.queueAll).toHaveBeenCalledWith([

Check failure on line 232 in server/src/services/library.service.spec.ts

View workflow job for this annotation

GitHub Actions / Test & Lint Server

src/services/library.service.spec.ts > LibraryService > handleQueueRemoveDeleted > should queue online check of existing assets

AssertionError: expected "spy" to be called with arguments: [ [ { …(2) } ] ] Received: Number of calls: 0 ❯ src/services/library.service.spec.ts:232:32
{
name: JobName.LIBRARY_SYNC_ASSET,
name: JobName.LIBRARY_SYNC_ASSETS,
data: {
id: assetStub.external.id,
importPaths: libraryStub.externalLibrary1.importPaths,
Expand All @@ -250,30 +250,30 @@ describe(LibraryService.name, () => {

describe('handleSyncAsset', () => {
it('should skip missing assets', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/'],
exclusionPatterns: [],
};

assetMock.getById.mockResolvedValue(null);

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SKIPPED);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SKIPPED);

Check failure on line 261 in server/src/services/library.service.spec.ts

View workflow job for this annotation

GitHub Actions / Test & Lint Server

src/services/library.service.spec.ts > LibraryService > handleSyncAsset > should skip missing assets

AssertionError: expected 'success' to be 'skipped' // Object.is equality Expected: "skipped" Received: "success" ❯ src/services/library.service.spec.ts:261:7

expect(assetMock.remove).not.toHaveBeenCalled();
});

it('should offline assets no longer on disk', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/'],
exclusionPatterns: [],
};

assetMock.getById.mockResolvedValue(assetStub.external);
storageMock.stat.mockRejectedValue(new Error('ENOENT, no such file or directory'));

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);

expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
isOffline: true,
Expand All @@ -282,32 +282,32 @@ describe(LibraryService.name, () => {
});

it('should offline assets matching an exclusion pattern', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/'],
exclusionPatterns: ['**/user1/**'],
};

assetMock.getById.mockResolvedValue(assetStub.external);

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
isOffline: true,
deletedAt: expect.any(Date),
});
});

it('should set assets outside of import paths as offline', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/data/user2'],
exclusionPatterns: [],
};

assetMock.getById.mockResolvedValue(assetStub.external);
storageMock.checkFileExists.mockResolvedValue(true);

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);

expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
isOffline: true,
Expand All @@ -316,31 +316,31 @@ describe(LibraryService.name, () => {
});

it('should do nothing with online assets', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/'],
exclusionPatterns: [],
};

assetMock.getById.mockResolvedValue(assetStub.external);
storageMock.stat.mockResolvedValue({ mtime: assetStub.external.fileModifiedAt } as Stats);

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);

expect(assetMock.updateAll).not.toHaveBeenCalled();
});

it('should un-trash an asset previously marked as offline', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/'],
exclusionPatterns: [],
};

assetMock.getById.mockResolvedValue(assetStub.trashedOffline);
storageMock.stat.mockResolvedValue({ mtime: assetStub.trashedOffline.fileModifiedAt } as Stats);

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);

expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.trashedOffline.id], {
deletedAt: null,
Expand All @@ -353,8 +353,8 @@ describe(LibraryService.name, () => {
});

it('should update file when mtime has changed', async () => {
const mockAssetJob: ILibraryAssetJob = {
id: assetStub.external.id,
const mockAssetJob: ILibraryAssetsJob = {
ids: [assetStub.external.id],
importPaths: ['/'],
exclusionPatterns: [],
};
Expand All @@ -363,7 +363,7 @@ describe(LibraryService.name, () => {
assetMock.getById.mockResolvedValue(assetStub.external);
storageMock.stat.mockResolvedValue({ mtime: newMTime } as Stats);

await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);

expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
fileModifiedAt: newMTime,
Expand Down Expand Up @@ -969,7 +969,7 @@ describe(LibraryService.name, () => {
},
]);
expect(jobMock.queueAll).toHaveBeenCalledWith([

Check failure on line 971 in server/src/services/library.service.spec.ts

View workflow job for this annotation

GitHub Actions / Test & Lint Server

src/services/library.service.spec.ts > LibraryService > watchAll > watching enabled > should handle a new file event

AssertionError: expected "spy" to be called with arguments: [ [ { …(2) } ] ] Received: 1st spy call: Array [ Array [ Object { - "data": ObjectContaining { - "ids": Array [ - "asset-id", - ], + "data": Object { + "assetPath": "/foo/photo.jpg", + "id": "library-id-with-paths1", + "ownerId": "admin_id", }, - "name": "library-sync-assets", + "name": "library-sync-file", }, ], ] Number of calls: 1 ❯ src/services/library.service.spec.ts:971:34
{ name: JobName.LIBRARY_SYNC_ASSET, data: expect.objectContaining({ id: assetStub.image.id }) },
{ name: JobName.LIBRARY_SYNC_ASSETS, data: expect.objectContaining({ ids: [assetStub.image.id] }) },
]);
});

Expand All @@ -994,7 +994,7 @@ describe(LibraryService.name, () => {
},
]);
expect(jobMock.queueAll).toHaveBeenCalledWith([

Check failure on line 996 in server/src/services/library.service.spec.ts

View workflow job for this annotation

GitHub Actions / Test & Lint Server

src/services/library.service.spec.ts > LibraryService > watchAll > watching enabled > should handle a file change event

AssertionError: expected "spy" to be called with arguments: [ [ { …(2) } ] ] Received: 1st spy call: Array [ Array [ Object { - "data": ObjectContaining { - "id": Array [ - "asset-id", - ], + "data": Object { + "assetPath": "/foo/photo.jpg", + "id": "library-id-with-paths1", + "ownerId": "admin_id", }, - "name": "library-sync-assets", + "name": "library-sync-file", }, ], ] Number of calls: 1 ❯ src/services/library.service.spec.ts:996:34
{ name: JobName.LIBRARY_SYNC_ASSET, data: expect.objectContaining({ id: assetStub.image.id }) },
{ name: JobName.LIBRARY_SYNC_ASSETS, data: expect.objectContaining({ id: [assetStub.image.id] }) },
]);
});

Expand All @@ -1009,7 +1009,7 @@ describe(LibraryService.name, () => {
await sut.watchAll();

expect(jobMock.queueAll).toHaveBeenCalledWith([

Check failure on line 1011 in server/src/services/library.service.spec.ts

View workflow job for this annotation

GitHub Actions / Test & Lint Server

src/services/library.service.spec.ts > LibraryService > watchAll > watching enabled > should handle a file unlink event

AssertionError: expected "spy" to be called with arguments: [ [ { …(2) } ] ] Received: Number of calls: 0 ❯ src/services/library.service.spec.ts:1011:34
{ name: JobName.LIBRARY_SYNC_ASSET, data: expect.objectContaining({ id: assetStub.image.id }) },
{ name: JobName.LIBRARY_SYNC_ASSETS, data: expect.objectContaining({ ids: [assetStub.image.id] }) },
]);
});

Expand Down Expand Up @@ -1166,9 +1166,9 @@ describe(LibraryService.name, () => {

expect(jobMock.queueAll).toHaveBeenCalledWith([

Check failure on line 1167 in server/src/services/library.service.spec.ts

View workflow job for this annotation

GitHub Actions / Test & Lint Server

src/services/library.service.spec.ts > LibraryService > handleQueueAssetOfflineCheck > should queue removal jobs

AssertionError: expected "spy" to be called with arguments: [ [ { …(2) } ] ] Received: Number of calls: 0 ❯ src/services/library.service.spec.ts:1167:32
{
name: JobName.LIBRARY_SYNC_ASSET,
name: JobName.LIBRARY_SYNC_ASSETS,
data: {
id: assetStub.image1.id,
ids: [assetStub.image1.id],
importPaths: libraryStub.externalLibrary1.importPaths,
exclusionPatterns: libraryStub.externalLibrary1.exclusionPatterns,
},
Expand Down
44 changes: 26 additions & 18 deletions server/src/services/library.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,10 @@ export class LibraryService extends BaseService {
}

private async syncAssets({ importPaths, exclusionPatterns }: LibraryEntity, assetIds: string[]) {
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.LIBRARY_SYNC_ASSET,
data: { id: assetId, importPaths, exclusionPatterns },
})),
);
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_ASSETS,
data: { ids: assetIds, importPaths, exclusionPatterns },
});
}

private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
Expand Down Expand Up @@ -472,27 +470,35 @@ export class LibraryService extends BaseService {
return JobStatus.SUCCESS;
}

@OnJob({ name: JobName.LIBRARY_SYNC_ASSET, queue: QueueName.LIBRARY })
async handleSyncAsset(job: JobOf<JobName.LIBRARY_SYNC_ASSET>): Promise<JobStatus> {
const asset = await this.assetRepository.getById(job.id);
@OnJob({ name: JobName.LIBRARY_SYNC_ASSETS, queue: QueueName.LIBRARY })
async handleSyncAssets(job: JobOf<JobName.LIBRARY_SYNC_ASSETS>): Promise<JobStatus> {
for (const id of job.ids) {
await this.handleSyncAsset(id, job.importPaths, job.exclusionPatterns);
}

return JobStatus.SUCCESS;
}

private async handleSyncAsset(id: string, importPaths: string[], exclusionPatterns: string[]): Promise<JobStatus> {
const asset = await this.assetRepository.getById(id);
if (!asset) {
return JobStatus.SKIPPED;
}

const markOffline = async (explanation: string) => {
if (!asset.isOffline) {
this.logger.debug(`${explanation}, removing: ${asset.originalPath}`);
this.logger.debug(`${explanation}, moving to trash: ${asset.originalPath}`);
await this.assetRepository.updateAll([asset.id], { isOffline: true, deletedAt: new Date() });
}
};

const isInPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
const isInPath = importPaths.find((path) => asset.originalPath.startsWith(path));
if (!isInPath) {
await markOffline('Asset is no longer in an import path');
return JobStatus.SUCCESS;
}

const isExcluded = job.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
const isExcluded = exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
if (isExcluded) {
await markOffline('Asset is covered by an exclusion pattern');
return JobStatus.SUCCESS;
Expand Down Expand Up @@ -597,12 +603,14 @@ export class LibraryService extends BaseService {
for await (const assets of onlineAssets) {
assetCount += assets.length;
this.logger.debug(`Discovered ${assetCount} asset(s) in library ${library.id}...`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.LIBRARY_SYNC_ASSET,
data: { id: asset.id, importPaths: library.importPaths, exclusionPatterns: library.exclusionPatterns },
})),
);
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_ASSETS,
data: {
ids: assets.map((asset) => asset.id),
importPaths: library.importPaths,
exclusionPatterns: library.exclusionPatterns,
},
});
this.logger.debug(`Queued check of ${assets.length} asset(s) in library ${library.id}...`);
}

Expand Down

0 comments on commit 8ecde3b

Please sign in to comment.