Skip to content

Commit

Permalink
feat: synchronize stac TDE-725 (#417)
Browse files Browse the repository at this point in the history
Add a file hash to the object metadata on AWS S3 to determine whether a file
has been modified.

This has been tested here:
https://github.com/linz/imagery-test/actions/runs/4909985617/jobs/8766726824
  • Loading branch information
paulfouquet authored and amfage committed May 17, 2023
1 parent 1a76511 commit 0402754
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,23 @@ import { commandCreateManifest } from './create-manifest/create-manifest.js';
import { commandLdsFetch } from './lds-cache/lds.cache.js';
import { commandList } from './list/list.js';
import { commandStacCatalog } from './stac-catalog/stac.catalog.js';
import { commandStacSync } from './stac-sync/stac.sync.js';
import { commandStacValidate } from './stac-validate/stac.validate.js';
import { commandTileSetValidate } from './tileset-validate/tileset.validate.js';

/**
 * Top-level `argo-tasks` CLI: maps every subcommand name to its handler.
 *
 * Each name is declared exactly once (an object literal must not repeat a
 * property name) and the entries are kept in alphabetical order.
 */
export const cmd = subcommands({
  name: 'argo-tasks',
  description: 'Utility tasks for argo',
  cmds: {
    copy: commandCopy,
    'create-manifest': commandCreateManifest,
    // `flatten` is registered against the same handler as `create-manifest`.
    flatten: commandCreateManifest,
    'lds-fetch-layer': commandLdsFetch,
    // `list` and `ls` share a single handler.
    list: commandList,
    ls: commandList,
    'stac-catalog': commandStacCatalog,
    'stac-sync': commandStacSync,
    'stac-validate': commandStacValidate,
    'tileset-validate': commandTileSetValidate,
  },
});
42 changes: 42 additions & 0 deletions src/commands/stac-sync/__test__/stac.sync.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import o from 'ospec';
import { FsMemory } from '@chunkd/source-memory';
import { fsa } from '@chunkd/fs';
import { HashKey, synchroniseFiles } from '../stac.sync.js';
import { createHash } from 'crypto';

// Exercises synchroniseFiles against an in-memory file system registered under the `m://` prefix.
o.spec('stacSync', () => {
  const memory = new FsMemory();

  o.beforeEach(() => {
    // Every test starts from an empty in-memory file system.
    memory.files.clear();
    fsa.register('m://', memory);
  });

  // Destination content differs and carries no hash metadata: the file must be copied.
  o('shouldUploadFile', async () => {
    const sourceBody = JSON.stringify({ title: 'Wellington Collection', description: 'abcd' });
    const destinationBody = JSON.stringify({ title: 'Wellington Collection', description: 'abc' });
    await memory.write('m://source/stac/wellington/collection.json', sourceBody);
    await memory.write('m://destination/stac/wellington/collection.json', destinationBody);

    const copied = await synchroniseFiles('m://source/stac/', new URL('m://destination/stac/'));
    o(copied).equals(1);
  });

  // Destination has the same content and a matching hash in its metadata: nothing is copied.
  o('shouldNotUploadFile', async () => {
    const body = JSON.stringify({ title: 'Wellington Collection', description: 'abcd' });
    await memory.write('m://source/stac/wellington/collection.json', body);

    const storedSource = await fsa.read('m://source/stac/wellington/collection.json');
    const hasher = createHash('sha256');
    hasher.update(storedSource);
    const expectedHash = '1220' + hasher.digest('hex');

    await memory.write(
      'm://destination/stac/wellington/collection.json',
      JSON.stringify({ title: 'Wellington Collection', description: 'abcd' }),
      { metadata: { [HashKey]: expectedHash } },
    );

    const copied = await synchroniseFiles('m://source/stac/', new URL('m://destination/stac/'));
    o(copied).equals(0);
  });
});
81 changes: 81 additions & 0 deletions src/commands/stac-sync/stac.sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { fsa } from '@chunkd/fs';
import { command, positional, string, Type } from 'cmd-ts';
import { logger } from '../../log.js';
import { config, registerCli, verbose } from '../common.js';
import { createHash } from 'crypto';
import { FileInfo } from '@chunkd/core';

/**
 * Command-line argument type for S3 locations.
 *
 * Converts the raw argument into a `URL`, rejecting any value that does not
 * start with `s3://`.
 */
const S3Path: Type<string, URL> = {
  from: async (value) => {
    const isS3 = value.startsWith('s3://');
    if (isS3) return new URL(value);
    throw new Error('Path is not S3');
  },
};

/**
 * `stac-sync` command: synchronises the files found under `sourcePath` to the
 * S3 `destinationPath` and logs how many files were copied.
 */
export const commandStacSync = command({
  name: 'stac-sync',
  description: 'Sync STAC files',
  args: {
    config,
    verbose,
    sourcePath: positional({
      type: string,
      description: 'Location of the source STAC to synchronise from',
    }),
    destinationPath: positional({
      type: S3Path,
      description: 'Location of the destination STAC in S3 to synchronise to',
    }),
  },

  async handler(args) {
    registerCli(args);

    const { sourcePath, destinationPath } = args;
    logger.info({ source: sourcePath, destination: destinationPath }, 'StacSync:Start');

    const copied = await synchroniseFiles(sourcePath, destinationPath);
    logger.info({ copied }, 'StacSync:Done');
  },
});

/**
 * Metadata key under which the hash of the source file is stored on the
 * destination object, and read back to detect whether the file has changed.
 *
 * Key concatenated to 'x-amz-meta-' (presumably by S3 when the object metadata
 * is written; confirm against the S3 file system implementation).
 */
export const HashKey = 'linz-hash';

/**
 * Synchronise STAC (JSON) files from one location to another.
 *
 * Every file listed under `sourcePath` whose path ends with `.json` is mapped
 * to the same relative location under `destinationPath` and handed to
 * `uploadFileToS3`, which only writes it when the destination differs.
 *
 * Both paths are treated as directory prefixes whether or not they end with
 * `/`: without this, relative URL resolution would drop the last segment of a
 * destination such as `s3://bucket/stac`, and a source prefix without a
 * trailing slash would produce a root-relative path that discards the whole
 * destination prefix.
 *
 * @param sourcePath where the source files are
 * @param destinationPath S3 path where the files need to be synchronised (not mutated)
 * @returns the number of files copied over
 */
export async function synchroniseFiles(sourcePath: string, destinationPath: URL): Promise<number> {
  // Work on a copy so the caller's URL is never mutated.
  const destinationBase = new URL(destinationPath.href);
  if (!destinationBase.pathname.endsWith('/')) destinationBase.pathname += '/';

  const sourceFilesInfo = await fsa.toArray(fsa.details(sourcePath));
  const jsonFilesInfo = sourceFilesInfo.filter((fileInfo) => fileInfo.path.endsWith('.json'));

  const uploaded = await Promise.all(
    jsonFilesInfo.map((fileInfo) => {
      // Strip leading slashes so the path resolves relative to the destination
      // prefix instead of replacing it.
      const relativePath = fileInfo.path.slice(sourcePath.length).replace(/^\/+/, '');

      // An empty relative path means `sourcePath` is the file itself: keep the
      // destination exactly as given. Otherwise prefix with './' so that a first
      // segment containing ':' is not mistaken for a URL scheme.
      const target =
        relativePath === '' ? new URL('', destinationPath) : new URL(`./${relativePath}`, destinationBase);

      return uploadFileToS3(fileInfo, target);
    }),
  );

  return uploaded.filter(Boolean).length;
}

/**
 * Upload a file to the destination if the same version (matched hash) does not exist.
 *
 * The destination is considered up to date when its size equals the source
 * size and the hash stored in its metadata under `HashKey` equals the hash of
 * the source content. Otherwise the source content is written to the
 * destination together with its hash.
 *
 * @param sourceFileInfo information (path and size) about the source file
 * @param path destination location to write the file to
 * @returns `true` if the file was written, `false` if the destination was already up to date
 */
export async function uploadFileToS3(sourceFileInfo: FileInfo, path: URL): Promise<boolean> {
  // The destination lookup and the source read are independent, so run them concurrently.
  const [destinationHead, sourceData] = await Promise.all([fsa.head(path.href), fsa.read(sourceFileInfo.path)]);

  // SHA-256 hex digest prefixed with '1220' (looks like the multihash prefix for
  // sha2-256 with a 32-byte digest; confirm against the consumers of this value).
  const sourceHash = '1220' + createHash('sha256').update(sourceData).digest('hex');

  const isUpToDate =
    destinationHead?.size === sourceFileInfo.size && destinationHead?.metadata?.[HashKey] === sourceHash;
  if (isUpToDate) return false;

  await fsa.write(path.href, sourceData, { metadata: { [HashKey]: sourceHash } });
  logger.debug({ path: path.href }, 'StacSync:FileUploaded');
  return true;
}

0 comments on commit 0402754

Please sign in to comment.