Skip to content

Commit

Permalink
Merge pull request #58 from koopjs/b/10625-hub-feeds-error
Browse files Browse the repository at this point in the history
B/10625 hub feeds error
  • Loading branch information
sansth1010 authored Jun 5, 2024
2 parents ce57d73 + d931ecd commit 4793e7e
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 30 deletions.
30 changes: 30 additions & 0 deletions src/helpers/enrich-dataset.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,4 +515,34 @@ describe('enrichDataset function', () => {
expect(geojson.properties).toBeDefined();
expect(geojsonValidation.isFeature(geojson)).toBe(true);
});

// Verifies that a missing `created` timestamp on the hub dataset does not
// produce an issuedDateTime on the enriched feature.
it('should set issuedDateTime as undefined if hub dataset created field is undefined', () => {
  const datasetMissingCreated = {
    id: 'foo',
    access: 'public',
    size: 1,
    type: 'CSV',
    created: undefined
  };
  const siteDetails = { siteUrl: 'arcgis.com', portalUrl: 'portal.com', orgBaseUrl: 'qa.arcgis.com', orgTitle: "QA Premium Alpha Hub" };

  const { properties } = enrichDataset(datasetMissingCreated, siteDetails);

  expect(properties).toBeDefined()
  expect(properties.issuedDateTime).toBeUndefined()
});

// Verifies that an unparsable `created` value on the hub dataset yields no
// issuedDateTime on the enriched feature.
it('should set issuedDateTime as undefined if hub dataset created field contains invalid value', () => {
  const datasetWithBadCreated = {
    id: 'foo',
    access: 'public',
    size: 1,
    type: 'CSV',
    created: 'invalid-string'
  };
  const siteDetails = { siteUrl: 'arcgis.com', portalUrl: 'portal.com', orgBaseUrl: 'qa.arcgis.com', orgTitle: "QA Premium Alpha Hub" };

  const { properties } = enrichDataset(datasetWithBadCreated, siteDetails);

  expect(properties).toBeDefined()
  expect(properties.issuedDateTime).toBeUndefined()
});
})
16 changes: 13 additions & 3 deletions src/helpers/enrich-dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export function enrichDataset(dataset: HubDataset, hubsite: HubSite): Feature {
} as UserSession) + '?f=json',
language: _.get(dataset, 'metadata.metadata.dataIdInfo.dataLang.languageCode.@_value') || localeToLang(dataset.culture) || '',
keyword: getDatasetKeyword(dataset),
issuedDateTime: _.get(dataset, 'metadata.metadata.dataIdInfo.idCitation.date.pubDate') || new Date(dataset.created).toISOString(),
issuedDateTime: _.get(dataset, 'metadata.metadata.dataIdInfo.idCitation.date.pubDate') || timestampToIsoDate(dataset.created),
orgTitle,
provenance: _.get(dataset, 'metadata.metadata.dataIdInfo.idCredit', ''),
hubLandingPage: concatUrlAndPath(siteUrl, relative.slice(1)),
Expand All @@ -86,7 +86,7 @@ export function enrichDataset(dataset: HubDataset, hubsite: HubSite): Feature {
additionalFields.accessUrlKML = downloadLinkFor('kml');
additionalFields.durableUrlKML = generateDurableDownloadUrl(dataset.id, siteUrl, 'kml');
additionalFields.accessUrlShapeFile = downloadLinkFor('zip');
additionalFields.durableUrlShapeFile= generateDurableDownloadUrl(dataset.id, siteUrl, 'shapefile');
additionalFields.durableUrlShapeFile = generateDurableDownloadUrl(dataset.id, siteUrl, 'shapefile');
}
}

Expand Down Expand Up @@ -223,4 +223,14 @@ function objectWithoutKeys(obj, keys): Record<string, any> {
if (keys.indexOf(key) === -1) newObject[key] = obj[key];
return newObject;
}, {});
}
}

/**
 * Converts an epoch-millisecond timestamp (e.g. a Hub dataset's `created`
 * field) into an ISO 8601 date string.
 *
 * @param val - epoch milliseconds; may be null/undefined or a value that
 *              does not represent a valid date
 * @returns the ISO string, or undefined when `val` is nil or not a valid date
 */
function timestampToIsoDate(val: number | null | undefined): string | undefined {
  // `== null` deliberately matches both null and undefined (replaces _.isNil).
  if (val == null) return undefined;

  // `new Date` always returns a Date instance, so no `instanceof` check is
  // needed; an invalid date is detected by its NaN time value.
  const date = new Date(val);
  return Number.isNaN(date.valueOf()) ? undefined : date.toISOString();
}
104 changes: 91 additions & 13 deletions src/model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ describe('HubApiModel', () => {
}
});


it('should generate orgBaseUrl if qa portal url is supplied', async () => {
// Setup
const terms = faker.random.words();
Expand Down Expand Up @@ -761,7 +760,6 @@ describe('HubApiModel', () => {
}
});


it('can handle a request with a valid group', async () => {
// Setup
const terms = faker.random.words();
Expand Down Expand Up @@ -1143,7 +1141,6 @@ describe('HubApiModel', () => {
expect(err.status).toEqual(404);
}
});


it('should throw 400 error if fetchSiteModel fails because site is private', async () => {
// Setup
Expand Down Expand Up @@ -1249,8 +1246,6 @@ describe('HubApiModel', () => {
}
});



it('can handle a request with a valid orgid IContentFilter', async () => {
// Setup
const terms = faker.random.words();
Expand Down Expand Up @@ -1317,8 +1312,6 @@ describe('HubApiModel', () => {
}
});



it('throws error with an empty request', async () => {
// Setup
const model = new HubApiModel();
Expand Down Expand Up @@ -1942,7 +1935,6 @@ describe('HubApiModel', () => {
expect(actualResponses).toHaveLength(batches * pagesPerBatch * resultsPerPage);
});


it('does not fetch the provided site\'s catalog if group and orgid are explicitly provided', async () => {
// Setup
const terms = faker.random.words()
Expand Down Expand Up @@ -2009,9 +2001,7 @@ describe('HubApiModel', () => {
expect(actualResponses).toHaveLength(batches * pagesPerBatch * resultsPerPage);
});



it('stops streaming and throws error if underlying paging stream throws error', async () => {
it('stops non-sequential stream and throws error if underlying paging stream throws error', async () => {
// Setup
const terms = faker.random.words();
const id = faker.datatype.uuid();
Expand Down Expand Up @@ -2105,8 +2095,6 @@ describe('HubApiModel', () => {
}
});



it('getData function does nothing', () => {
// Setup
const model = new HubApiModel();
Expand All @@ -2117,5 +2105,95 @@ describe('HubApiModel', () => {
expect(data).toBeUndefined();
});

// Verifies that when streams are merged sequentially, an error thrown by an
// underlying PagingStream is re-emitted on the destination and aborts the
// pipeline instead of being swallowed.
it('stops sequential stream and emits error if underlying paging stream throws error', async () => {
  // Setup
  const terms = faker.random.words();
  const id = faker.datatype.uuid();
  const model = new HubApiModel();

  const searchRequestBody: IContentSearchRequest = {
    filter: {
      terms,
      id
    },
    options: {
      portal: 'https://qaext.arcgis.com',
      // NOTE(review): sortField is set, which presumably routes getStream
      // through the sequential (ordered) merge path — confirm against model.ts
      sortField: 'Date Created|created|modified',
    }
  };
  const req = {
    app: { locals: { arcgisPortal: 'https://devext.arcgis.com' } },
    res: {
      locals: {
        searchRequestBody
      }
    },
    query: {}
  } as unknown as Request;

  // Mock: 3 batches x 2 pages x 3 results, each batch backed by a PagingStream
  const batches = 3;
  const pagesPerBatch = 2;
  const resultsPerPage = 3

  const mockedResponses = new Array(batches).fill(null).map(() => {
    return new Array(pagesPerBatch).fill(null).map(() => {
      return new Array(resultsPerPage).fill(null).map(() => ({
        id: faker.datatype.uuid()
      }));
    });
  });

  const mockedPagingStreams = mockedResponses.map((batchPages: any[], index: number) => {
    let currPage = 0;
    return new PagingStream({
      firstPageParams: {},
      getNextPageParams: () => {
        if (currPage >= batchPages.length) {
          return null
        } else {
          return () => batchPages[currPage++];
        }
      },
      loadPage: async (params) => {
        // The very first load of the first batch rejects, so the error
        // originates inside an underlying paging stream.
        if (index === 0 && currPage === 0) {
          throw new Error('Error fetching data!')
        } else if (typeof params === 'function') {
          return params()
        } else {
          return batchPages[currPage++]
        }
      },
      streamPage: (response, push) => {
        response.forEach(result => push(result));
      }
    })
  });

  mockGetBatchStreams.mockResolvedValueOnce(mockedPagingStreams);

  const actualResponses = [];

  // Test and Assert
  try {
    const stream = await model.getStream(req);
    const pass = new PassThrough({ objectMode: true });
    pass.on('data', data => {
      actualResponses.push(data);
    });

    // The destination should receive the underlying stream's error.
    pass.on('error', err => {
      expect(err.message).toEqual('Error fetching data!');
    });
    const pipe = promisify(pipeline);

    await pipe(stream, pass);

    // pipeline must reject; reaching here means the error was swallowed.
    fail('Should never reach here')
  } catch (err) {
    expect(err.message).toEqual('Error fetching data!');
    expect(mockGetBatchStreams).toHaveBeenCalledTimes(1);
  }
});

});
6 changes: 4 additions & 2 deletions src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,12 @@ export class HubApiModel {
}

for (const stream of sources) {
await new Promise((resolve, reject) => {
await new Promise((resolve) => {
stream.pipe(destination, { end: false });
stream.on('end', resolve);
stream.on('error', reject);
stream.on('error', (err) => {
destination.emit('error', err);
});
});
}
destination.emit('end');
Expand Down
63 changes: 62 additions & 1 deletion src/paging-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ describe('paging stream', () => {
}));
});

it('destroys stream if error occurs', () => {
it('destroys stream if error occurs when loading a page results', () => {
const requestError = new Error('REQUEST FAILED');
loadPageSpy.mockRejectedValue(requestError);
streamPageSpy.mockImplementation((response, push) => response.data.forEach(push));
Expand Down Expand Up @@ -209,4 +209,65 @@ describe('paging stream', () => {
}
}));
});

// The stream must be destroyed — and the original error surfaced — when an
// error is thrown while a page's results are being pushed downstream.
it('destroys stream if error occurs streaming page', () => {
  const streamError = new Error('STREAM FAILED');
  const page = { data: { itemid: '123s' }, links: { next: 'https://hub.arcgis.com/next' } };
  loadPageSpy.mockResolvedValue(page);
  streamPageSpy.mockImplementation((_response, _push) => { throw streamError; });
  getNextPageParamsSpy.mockImplementation((response) => response.links.next);

  const stream = new PagingStream({
    firstPageParams: null,
    loadPage: loadPageSpy,
    streamPage: streamPageSpy,
    getNextPageParams: getNextPageParamsSpy
  });

  stream.on('data', () => {
    throw Error('Stream should not emit data after erroring!');
  });

  return new Promise((resolve, reject) => {
    stream.on('error', (err) => {
      try {
        expect(err).toEqual(streamError);
        expect(streamPageSpy).toHaveBeenCalled();
        expect(getNextPageParamsSpy).toHaveBeenCalled();
        resolve('Test Complete');
      } catch (assertionFailure) {
        reject(assertionFailure);
      }
    });
  });
});

// The stream must be destroyed — before any results are streamed — when
// computing the next page's params throws.
it('destroys stream if error occurs when getting next page params', () => {
  const nextPageError = new Error('PAGING FAILED');
  const page = { data: { itemid: '123s' }, links: { next: 'https://hub.arcgis.com/next' } };
  loadPageSpy.mockResolvedValue(page);
  streamPageSpy.mockImplementation((response, push) => response.data.forEach(push));

  getNextPageParamsSpy.mockImplementation((_response) => { throw nextPageError; });

  const stream = new PagingStream({
    firstPageParams: null,
    loadPage: loadPageSpy,
    streamPage: streamPageSpy,
    getNextPageParams: getNextPageParamsSpy
  });

  stream.on('data', () => {
    throw Error('Stream should not emit data after erroring!');
  });

  return new Promise((resolve, reject) => {
    stream.on('error', (err) => {
      try {
        expect(err).toEqual(nextPageError);
        // Paging params are computed before the page is streamed, so the
        // stream-page callback must never have run.
        expect(streamPageSpy).not.toHaveBeenCalled();
        expect(getNextPageParamsSpy).toHaveBeenCalled();
        resolve('Test Complete');
      } catch (assertionFailure) {
        reject(assertionFailure);
      }
    });
  });
});
});
21 changes: 10 additions & 11 deletions src/paging-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,21 @@ export class PagingStream extends Readable {
this._pageLimit = pageLimit;
}

/**
 * Readable._read implementation: loads the next page, computes the params for
 * the page after it, pushes the page's results downstream, and ends the
 * stream (push(null)) once paging is exhausted or the page limit is reached.
 *
 * The scraped diff interleaved the pre- and post-commit bodies (duplicate
 * statements, stray `let response: any`); this is the post-commit version,
 * where the try/catch wraps the WHOLE body so errors thrown by
 * _getNextPageParams or _streamPage — not just _loadPage — destroy the
 * stream and surface as an 'error' event.
 */
async _read() {
  try {
    const response = await this._loadPage(this._nextPageParams);
    this._currPage++;

    this._nextPageParams = this._getNextPageParams(response);

    this._streamPage(response, this.push.bind(this));

    if (!this._nextPageParams || this._currPage >= this._pageLimit) {
      this.push(null);
    }
  } catch (err) {
    this.destroy(err);
    return;
  }
}
}

0 comments on commit 4793e7e

Please sign in to comment.