From 809d879a7af9c66ef552f498a9effa2881944f12 Mon Sep 17 00:00:00 2001 From: Tomer Eskenazi Date: Mon, 18 May 2020 21:54:08 +0300 Subject: [PATCH] authorization - gateway - Implement local policy attachment caching for all resource repositories --- services/src/gateway.ts | 5 +- .../modules/resource-repository/composite.ts | 13 ++ .../modules/resource-repository/fs.spec.ts | 126 ++++++++++++ .../src/modules/resource-repository/fs.ts | 84 ++++++++ .../modules/resource-repository/s3.spec.ts | 181 ++++++++++++++++++ .../src/modules/resource-repository/s3.ts | 108 ++++++++++- .../src/modules/resource-repository/types.ts | 2 + 7 files changed, 516 insertions(+), 3 deletions(-) create mode 100644 services/src/modules/resource-repository/fs.spec.ts create mode 100644 services/src/modules/resource-repository/s3.spec.ts diff --git a/services/src/gateway.ts b/services/src/gateway.ts index bdfb74e6..9f555203 100644 --- a/services/src/gateway.ts +++ b/services/src/gateway.ts @@ -16,12 +16,15 @@ import { async function run() { logger.info('Stitch gateway booting up...'); + const resourceRepository = getResourceRepository(); + const {server, dispose} = createStitchGateway({ - resourceGroups: pollForUpdates(getResourceRepository(), config.resourceUpdateInterval), + resourceGroups: pollForUpdates(resourceRepository, config.resourceUpdateInterval), tracing: config.enableGraphQLTracing, playground: config.enableGraphQLPlayground, introspection: config.enableGraphQLIntrospection, }); + await resourceRepository.initializePolicyAttachments(); const app = fastify(); app.register(fastifyMetrics, {endpoint: '/metrics'}); diff --git a/services/src/modules/resource-repository/composite.ts b/services/src/modules/resource-repository/composite.ts index cd817478..4ed8065a 100644 --- a/services/src/modules/resource-repository/composite.ts +++ b/services/src/modules/resource-repository/composite.ts @@ -20,4 +20,17 @@ export class CompositeResourceRepository implements ResourceRepository { async writePolicyAttachment(): Promise { throw new Error('Multiplexed resource repository cannot handle updates'); } + + public getPolicyAttachment(filename: string): Buffer { + for (const repo of this.repositories) { + const policyAttachment = repo.getPolicyAttachment(filename); + if (policyAttachment) return policyAttachment; + } + + throw new Error(`Policy attachment with the filename ${filename} was not found`); + } + + public async initializePolicyAttachments() { + await Promise.all(this.repositories.map(repo => repo.initializePolicyAttachments())); + } } diff --git a/services/src/modules/resource-repository/fs.spec.ts b/services/src/modules/resource-repository/fs.spec.ts new file mode 100644 index 00000000..10de6a33 --- /dev/null +++ b/services/src/modules/resource-repository/fs.spec.ts @@ -0,0 +1,126 @@ +import {promises as fs} from 'fs'; +import * as path from 'path'; +import {FileSystemResourceRepository} from './fs'; + +let repo: FileSystemResourceRepository; +const pathToResourcesFile = '/var/stitch/resources.json'; +const policyAttachmentsFolderPath = '/var/stitch/policy-attachments'; +let fsMocks: jest.Mock[]; + +beforeEach(() => { + repo = new FileSystemResourceRepository(pathToResourcesFile, policyAttachmentsFolderPath); + fsMocks = []; +}); + +afterEach(() => { + for (const mock of fsMocks) (mock as any).__restore(); +}); + +describe('shouldRefreshPolicyAttachment', () => { + let shouldRefreshPolicyAttachment: Function; + + beforeEach(() => { + shouldRefreshPolicyAttachment = repo['shouldRefreshPolicyAttachment'].bind(repo); + }); + + it('returns true if the given file does not currently exist in memory', () => { + repo['policyAttachmentsRefreshedAt'] = new Date(); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: minutesAgo(5)}); + expect(result).toBe(true); + }); + + it('returns true if this is the first refresh for this process', () => { + repo['policyAttachments']['file'] = Buffer.from('content'); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: minutesAgo(5)}); + expect(result).toBe(true); + }); + + it('returns true if the attachment was last updated after the last refresh', () => { + repo['policyAttachments']['file'] = Buffer.from('content'); + repo['policyAttachmentsRefreshedAt'] = minutesAgo(5); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: new Date()}); + expect(result).toBe(true); + }); + + it('returns false if the attachment was last updated before the last refresh', () => { + repo['policyAttachments']['file'] = Buffer.from('content'); + repo['policyAttachmentsRefreshedAt'] = new Date(); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: minutesAgo(5)}); + expect(result).toBe(false); + }); +}); + +describe('refreshPolicyAttachments', () => { + let refreshPolicyAttachments: Function; + + beforeEach(() => { + refreshPolicyAttachments = repo['refreshPolicyAttachments'].bind(repo); + }); + + it('refreshes the local copy of all policy attachments that should be refreshed', async () => { + repo['policyAttachments'] = { + upToDateFile: Buffer.from('up to date'), + needsUpdating: Buffer.from('needs updating'), + }; + repo['policyAttachmentsRefreshedAt'] = minutesAgo(5); + + const upToDateFilePath = path.resolve(policyAttachmentsFolderPath, 'upToDateFile'); + const needsUpdatingPath = path.resolve(policyAttachmentsFolderPath, 'needsUpdating'); + const newFilePath = path.resolve(policyAttachmentsFolderPath, 'newFile'); + + const readdirMock = mockFsFunction('readdir'); + readdirMock.mockReturnValue(Promise.resolve(['upToDateFile', 'needsUpdating', 'newFile'])); + + const statParamBasedReturnValues = { + [upToDateFilePath]: {mtime: minutesAgo(10)}, + [needsUpdatingPath]: {mtime: minutesAgo(2)}, + [newFilePath]: {mtime: minutesAgo(3)}, + }; + const statMock = mockFsFunction('stat'); + statMock.mockImplementation(filePath => Promise.resolve(statParamBasedReturnValues[filePath])); + + const readFileParamBasedReturnValues = { + [needsUpdatingPath]: Buffer.from('needs updating - updated'), + [newFilePath]: Buffer.from('new file'), + }; + const readFileMock = mockFsFunction('readFile'); + readFileMock.mockImplementation(filePath => Promise.resolve(readFileParamBasedReturnValues[filePath])); + + await refreshPolicyAttachments(); + + expect(repo['policyAttachments']).toMatchObject({ + upToDateFile: Buffer.from('up to date'), + needsUpdating: Buffer.from('needs updating - updated'), + newFile: Buffer.from('new file'), + }); + expect(repo['policyAttachmentsRefreshedAt'].getTime()).toBeGreaterThan(minutesAgo(1).getTime()); + + expect(readdirMock).toHaveBeenCalledTimes(1); + expect(readdirMock).toHaveBeenCalledWith(policyAttachmentsFolderPath); + + expect(statMock).toHaveBeenCalledTimes(3); + expect(statMock).toHaveBeenCalledWith(upToDateFilePath); + expect(statMock).toHaveBeenCalledWith(needsUpdatingPath); + expect(statMock).toHaveBeenCalledWith(newFilePath); + + expect(readFileMock).toHaveBeenCalledTimes(2); + expect(readFileMock).toHaveBeenCalledWith(needsUpdatingPath); + expect(readFileMock).toHaveBeenCalledWith(newFilePath); + }); +}); + +function mockFsFunction(functionName: string): jest.Mock { + const realFunction = (fs as any)[functionName]; + const mock: any = jest.fn(); + mock.__restore = () => ((fs as any)[functionName] = realFunction); + (fs as any)[functionName] = mock; + + fsMocks.push(mock); + return mock; +} + +const minutesAgo = (m: number) => new Date(Date.now() - m * 60000); diff --git a/services/src/modules/resource-repository/fs.ts b/services/src/modules/resource-repository/fs.ts index 020b239b..ace703d8 100644 --- a/services/src/modules/resource-repository/fs.ts +++ b/services/src/modules/resource-repository/fs.ts @@ -2,11 +2,16 @@ import {ResourceGroup, ResourceRepository} from '.'; import * as envVar from 'env-var'; import {promises as fs} from 'fs'; import * as path from 'path'; +import pLimit from 'p-limit'; +import * as config from '../config'; +import logger from '../logger'; import {FetchLatestResult} from './types'; export class FileSystemResourceRepository implements ResourceRepository { protected current?: {mtime: number; rg: ResourceGroup}; protected policyAttachmentsDirInitialized = false; + protected policyAttachments: {[filename: string]: Buffer} = {}; + protected policyAttachmentsRefreshedAt?: Date; constructor(protected pathToFile: string, protected policyAttachmentsFolderPath: string) {} @@ -35,6 +40,85 @@ export class FileSystemResourceRepository implements ResourceRepository { await fs.writeFile(filePath, content); } + public getPolicyAttachment(filename: string): Buffer { + return this.policyAttachments[filename]; + } + + public async initializePolicyAttachments() { + try { + await this.refreshPolicyAttachments(); + } catch (err) { + logger.fatal({err}, 'Failed fetching fs policy attachments on startup'); + throw err; + } + + setInterval(async () => { + try { + await this.refreshPolicyAttachments(); + } catch (err) { + logger.error( + {err}, + `Failed refreshing fs policy attachments, last successful refresh was at ${this.policyAttachmentsRefreshedAt}` + ); + } + }, config.resourceUpdateInterval); + } + + private async refreshPolicyAttachments() { + const newRefreshedAt = new Date(); + + const allAttachments = await this.getPolicyAttachmentsList(); + const attachmentsToRefresh = allAttachments + .filter(a => this.shouldRefreshPolicyAttachment(a)) + .map(a => a.filename); + + if (attachmentsToRefresh.length > 0) { + const newAttachments = await this.getPolicyAttachments(attachmentsToRefresh); + newAttachments.forEach(a => (this.policyAttachments[a.filename] = a.content)); + } + + this.policyAttachmentsRefreshedAt = newRefreshedAt; + } + + private shouldRefreshPolicyAttachment({filename, updatedAt}: {filename: string; updatedAt: Date}) { + if (!this.policyAttachments[filename]) return true; + if (!this.policyAttachmentsRefreshedAt) return true; + + return updatedAt > this.policyAttachmentsRefreshedAt; + } + + private async getPolicyAttachmentsList(): Promise<{filename: string; updatedAt: Date}[]> { + const filenames = await fs.readdir(this.policyAttachmentsFolderPath); + const limit = pLimit(10); + + return Promise.all( + filenames.map(filename => { + return limit(async () => { + const filePath = path.resolve(this.policyAttachmentsFolderPath, filename); + const stats = await fs.stat(filePath); + + const updatedAt = stats.mtime; + return {filename, updatedAt}; + }); + }) + ); + } + + private async getPolicyAttachments(filenames: string[]): Promise<{filename: string; content: Buffer}[]> { + const limit = pLimit(10); + + return Promise.all( + filenames.map(filename => { + return limit(async () => { + const filePath = path.resolve(this.policyAttachmentsFolderPath, filename); + const content = await fs.readFile(filePath); + + return {filename, content}; + }); + }) + ); + } + private async initializePolicyAttachmentsDir() { if (this.policyAttachmentsDirInitialized) return; diff --git a/services/src/modules/resource-repository/s3.spec.ts b/services/src/modules/resource-repository/s3.spec.ts new file mode 100644 index 00000000..52c12556 --- /dev/null +++ b/services/src/modules/resource-repository/s3.spec.ts @@ -0,0 +1,181 @@ +import {S3ResourceRepository} from './s3'; + +let repo: S3ResourceRepository; +let s3Mock: any; +const policyAttachmentsKeyPrefix = 'policyAttachments/'; +const bucketName = 'bucket'; + +beforeEach(() => { + s3Mock = { + getObject: jest.fn(() => ({promise: () => Promise.resolve()})), + listObjectsV2: jest.fn(() => ({promise: () => Promise.resolve()})), + }; + + repo = new S3ResourceRepository({ + s3: s3Mock as any, + bucketName, + objectKey: 'key', + policyAttachmentsKeyPrefix, + }); +}); + +describe('shouldRefreshPolicyAttachment', () => { + let shouldRefreshPolicyAttachment: Function; + + beforeEach(() => { + shouldRefreshPolicyAttachment = repo['shouldRefreshPolicyAttachment'].bind(repo); + }); + + it('returns true if the given file does not currently exist in memory', () => { + repo['policyAttachmentsRefreshedAt'] = new Date(); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: minutesAgo(5)}); + expect(result).toBe(true); + }); + + it('returns true if this is the first refresh for this process', () => { + repo['policyAttachments']['file'] = Buffer.from('content'); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: minutesAgo(5)}); + expect(result).toBe(true); + }); + + it('returns true if the attachment was last updated after the last refresh', () => { + repo['policyAttachments']['file'] = Buffer.from('content'); + repo['policyAttachmentsRefreshedAt'] = minutesAgo(5); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: new Date()}); + expect(result).toBe(true); + }); + + it('returns false if the attachment was last updated before the last refresh', () => { + repo['policyAttachments']['file'] = Buffer.from('content'); + repo['policyAttachmentsRefreshedAt'] = new Date(); + + const result = shouldRefreshPolicyAttachment({filename: 'file', updatedAt: minutesAgo(5)}); + expect(result).toBe(false); + }); +}); + +describe('refreshPolicyAttachments', () => { + let refreshPolicyAttachments: Function; + + beforeEach(() => { + refreshPolicyAttachments = repo['refreshPolicyAttachments'].bind(repo); + }); + + it('refreshes the local copy of all policy attachments that should be refreshed', async () => { + repo['policyAttachments'] = { + upToDateFile: Buffer.from('up to date'), + needsUpdating: Buffer.from('needs updating'), + }; + repo['policyAttachmentsRefreshedAt'] = minutesAgo(5); + + s3Mock.listObjectsV2 = jest.fn().mockReturnValue({ + promise: () => + Promise.resolve({ + Contents: [ + {Key: `${policyAttachmentsKeyPrefix}upToDateFile`, LastModified: minutesAgo(10)}, + {Key: `${policyAttachmentsKeyPrefix}needsUpdating`, LastModified: minutesAgo(2)}, + {Key: `${policyAttachmentsKeyPrefix}newFile`, LastModified: minutesAgo(3)}, + ], + IsTruncated: false, + }), + }); + + const paramBasedReturnValues = { + [`${policyAttachmentsKeyPrefix}needsUpdating`]: {Body: Buffer.from('needs updating - updated')}, + [`${policyAttachmentsKeyPrefix}newFile`]: {Body: Buffer.from('new file')}, + }; + s3Mock.getObject = jest.fn(({Key}) => ({ + promise: () => { + return Promise.resolve(paramBasedReturnValues[Key]); + }, + })); + + await refreshPolicyAttachments(); + + expect(repo['policyAttachments']).toMatchObject({ + upToDateFile: Buffer.from('up to date'), + needsUpdating: Buffer.from('needs updating - updated'), + newFile: Buffer.from('new file'), + }); + expect(repo['policyAttachmentsRefreshedAt'].getTime()).toBeGreaterThan(minutesAgo(1).getTime()); + + expect(s3Mock.listObjectsV2).toHaveBeenCalledTimes(1); + expect(s3Mock.listObjectsV2).toHaveBeenCalledWith({ + Bucket: bucketName, + MaxKeys: 1000, + Prefix: policyAttachmentsKeyPrefix, + }); + + expect(s3Mock.getObject).toHaveBeenCalledTimes(2); + expect(s3Mock.getObject).toHaveBeenCalledWith({ + Bucket: bucketName, + Key: `${policyAttachmentsKeyPrefix}needsUpdating`, + }); + expect(s3Mock.getObject).toHaveBeenCalledWith({ + Bucket: bucketName, + Key: `${policyAttachmentsKeyPrefix}newFile`, + }); + }); +}); + +describe('getPolicyAttachmentsList', () => { + let getPolicyAttachmentsList: Function; + + beforeEach(() => { + getPolicyAttachmentsList = repo['getPolicyAttachmentsList'].bind(repo); + }); + + it('correctly handles truncated results with a continuation token', async () => { + const expectedResult = [ + {filename: 'f1', updatedAt: minutesAgo(10)}, + {filename: 'f2', updatedAt: minutesAgo(7)}, + {filename: 'f3', updatedAt: minutesAgo(5)}, + ]; + + const listMock = jest + .fn() + .mockReturnValueOnce({ + promise: () => + Promise.resolve({ + Contents: [{Key: `${policyAttachmentsKeyPrefix}f1`, LastModified: expectedResult[0].updatedAt}], + IsTruncated: true, + ContinuationToken: 'ct1', + }), + }) + .mockReturnValueOnce({ + promise: () => + Promise.resolve({ + Contents: [{Key: `${policyAttachmentsKeyPrefix}f2`, LastModified: expectedResult[1].updatedAt}], + IsTruncated: true, + ContinuationToken: 'ct2', + }), + }) + .mockReturnValueOnce({ + promise: () => + Promise.resolve({ + Contents: [{Key: `${policyAttachmentsKeyPrefix}f3`, LastModified: expectedResult[2].updatedAt}], + IsTruncated: false, + }), + }); + s3Mock.listObjectsV2 = listMock; + + const result = await getPolicyAttachmentsList(); + + expect(result).toEqual(expectedResult); + + const listParams: any = { + Bucket: bucketName, + MaxKeys: 1000, + Prefix: policyAttachmentsKeyPrefix, + }; + expect(listMock).toHaveBeenCalledTimes(3); + expect(listMock).toHaveBeenNthCalledWith(1, listParams); + expect(listMock).toHaveBeenNthCalledWith(2, {...listParams, ContinuationToken: 'ct1'}); + expect(listMock).toHaveBeenNthCalledWith(3, {...listParams, ContinuationToken: 'ct2'}); + }); +}); + +const minutesAgo = (m: number) => new Date(Date.now() - m * 60000); diff --git a/services/src/modules/resource-repository/s3.ts b/services/src/modules/resource-repository/s3.ts index eefab919..26488005 100644 --- a/services/src/modules/resource-repository/s3.ts +++ b/services/src/modules/resource-repository/s3.ts @@ -1,5 +1,8 @@ import * as AWS from 'aws-sdk'; import * as envVar from 'env-var'; +import pLimit from 'p-limit'; +import * as config from '../config'; +import logger from '../logger'; import {ResourceRepository, ResourceGroup, FetchLatestResult} from './types'; interface S3ResourceRepositoryConfig { @@ -10,6 +13,8 @@ interface S3ResourceRepositoryConfig { } export class S3ResourceRepository implements ResourceRepository { protected current?: {etag?: string; rg: ResourceGroup}; + protected policyAttachments: {[filename: string]: Buffer} = {}; + protected policyAttachmentsRefreshedAt?: Date; constructor(protected config: S3ResourceRepositoryConfig) {} @@ -69,10 +74,109 @@ export class S3ResourceRepository implements ResourceRepository { .promise(); } + public getPolicyAttachment(filename: string): Buffer { + return this.policyAttachments[filename]; + } + + public async initializePolicyAttachments() { + try { + await this.refreshPolicyAttachments(); + } catch (err) { + logger.fatal({err}, 'Failed fetching s3 policy attachments on startup'); + throw err; + } + + setInterval(async () => { + try { + await this.refreshPolicyAttachments(); + } catch (err) { + logger.error( + {err}, + `Failed refreshing s3 policy attachments, last successful refresh was at ${this.policyAttachmentsRefreshedAt}` + ); + } + }, config.resourceUpdateInterval); + } + + private async refreshPolicyAttachments() { + const newRefreshedAt = new Date(); + + const allAttachments = await this.getPolicyAttachmentsList(); + const attachmentsToRefresh = allAttachments + .filter(a => this.shouldRefreshPolicyAttachment(a)) + .map(a => a.filename); + + if (attachmentsToRefresh.length > 0) { + const newAttachments = await this.getPolicyAttachments(attachmentsToRefresh); + newAttachments.forEach(a => (this.policyAttachments[a.filename] = a.content)); + } + + this.policyAttachmentsRefreshedAt = newRefreshedAt; + } + + private shouldRefreshPolicyAttachment({filename, updatedAt}: {filename: string; updatedAt: Date}) { + if (!this.policyAttachments[filename]) return true; + if (!this.policyAttachmentsRefreshedAt) return true; + + return updatedAt > this.policyAttachmentsRefreshedAt; + } + + private async getPolicyAttachmentsList(): Promise<{filename: string; updatedAt: Date}[]> { + const attachments: {filename: string; updatedAt: Date}[] = []; + let isTruncated = true; + let continuationToken; + + while (isTruncated) { + const params: any = { + Bucket: this.config.bucketName, + MaxKeys: 1000, + Prefix: this.config.policyAttachmentsKeyPrefix, + }; + if (continuationToken) params['ContinuationToken'] = continuationToken; + + const listResult = await this.config.s3.listObjectsV2(params).promise(); + const keys = listResult.Contents || []; + const newAttachments = keys.map(k => ({ + filename: this.getPolicyAttachmentFilenameByKey(k.Key!), + updatedAt: k.LastModified!, + })); + attachments.push(...newAttachments); + + isTruncated = listResult.IsTruncated!; + if (isTruncated) continuationToken = listResult.ContinuationToken; + } + + return attachments; + } + + private async getPolicyAttachments(filenames: string[]): Promise<{filename: string; content: Buffer}[]> { + const limit = pLimit(10); + + return Promise.all( + filenames.map(filename => { + return limit(async () => { + const key = this.getPolicyAttachmentKey(filename); + const params = {Bucket: this.config.bucketName, Key: key}; + const res = await this.config.s3.getObject(params).promise(); + + return {filename, content: res.Body as Buffer}; + }); + }) + ); + } + private getPolicyAttachmentKey(filename: string) { + return `${this.getPolicyAttachmentPrefix()}${filename}`; + } + + private getPolicyAttachmentPrefix() { return this.config.policyAttachmentsKeyPrefix.endsWith('/') - ? `${this.config.policyAttachmentsKeyPrefix}${filename}` - : `${this.config.policyAttachmentsKeyPrefix}/${filename}`; + ? this.config.policyAttachmentsKeyPrefix + : `${this.config.policyAttachmentsKeyPrefix}/`; + } + + private getPolicyAttachmentFilenameByKey(key: string) { + return key.replace(this.getPolicyAttachmentPrefix(), ''); } static fromEnvironment() { diff --git a/services/src/modules/resource-repository/types.ts b/services/src/modules/resource-repository/types.ts index 55f6250b..59680602 100644 --- a/services/src/modules/resource-repository/types.ts +++ b/services/src/modules/resource-repository/types.ts @@ -74,6 +74,8 @@ export interface ResourceRepository { fetchLatest(): Promise; update(rg: ResourceGroup): Promise; writePolicyAttachment(filename: string, content: Buffer): Promise; + getPolicyAttachment(filename: string): Buffer; + initializePolicyAttachments(): Promise; } enum AuthType {