fix: merging slices of export to file
JakubMatejka committed Dec 6, 2019
1 parent e20f7e1 · commit f887650
Showing 3 changed files with 30 additions and 6 deletions.
package.json (2 additions, 1 deletion)
@@ -1,6 +1,6 @@
 {
   "name": "@keboola/storage-api-js-client",
-  "version": "2.6.0",
+  "version": "2.6.1",
   "description": "Javascript client for Keboola Storage API",
   "repository": "https://github.com/keboola/storage-api-js-client",
   "author": "Jakub Matejka <jakub@keboola.com>",
@@ -33,6 +33,7 @@
     "eslint-plugin-flowtype": "^4.5.2",
     "eslint-plugin-flowtype-errors": "^4.1.0",
     "eslint-plugin-import": "^2.14.0",
+    "fast-csv": "^3.4.0",
     "flow-bin": "^0.112.0",
     "flow-copy-source": "^2.0.2",
     "mocha": "^6.1.4",
src/Tables.js (11 additions, 1 deletion)
@@ -3,8 +3,10 @@ import _ from 'lodash';
 import aws from 'aws-sdk';
 import axios from 'axios';
 import axiosRetry from 'axios-retry';
+import { execSync } from 'child_process';
 import parse from 'csv-parse/lib/sync';
 import fs from 'fs';
+import os from 'os';
 import qs from 'qs';
 import Storage from './Storage';
 
@@ -138,12 +140,18 @@ export default class Tables {
       sessionToken: file.credentials.SessionToken,
     });
 
+    const tempDir = `${os.tmpdir()}/storage-${Date.now()}-${_.random(1000, 9999)}`;
+    fs.mkdirSync(tempDir);
+
+    let i = 0;
     await Promise.all(_.map(slices, (sliceUrl) => {
+      const current = i;
+      i += 1;
       const objectRequest = s3.getObject({
         Bucket: file.s3Path.bucket,
         Key: sliceUrl.substr(sliceUrl.indexOf('/', 5) + 1),
       });
-      const outStream = fs.createWriteStream(filePath, { flags: 'a' });
+      const outStream = fs.createWriteStream(`${tempDir}/${current}`);
       const readStream = objectRequest.createReadStream();
       readStream.on('error', (err) => {
         outStream.emit('S3 Download Error', err);
@@ -162,6 +170,8 @@
       });
     }));
 
+    execSync(`cat ${tempDir}/* > ${filePath}`);
+    execSync(`rm -rf ${tempDir}`);
     return Promise.resolve();
   }
 
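For context: the exportToFile change above downloads every export slice into its own numbered file inside a temporary directory and then concatenates the pieces into the requested output file via `cat`. Below is a minimal sketch of that merge step done with Node streams instead of a shell call; the function name and directory layout are illustrative only, not part of the commit.

// Sketch only: merge numbered slice files (0, 1, 2, ...) from a temp directory
// into one output file. Slices are sorted numerically so that "10" does not
// land before "2", then streamed sequentially into a single write stream.
const fs = require('fs');
const path = require('path');

async function mergeSlices(tempDir, filePath) {
  const slices = fs.readdirSync(tempDir).sort((a, b) => Number(a) - Number(b));
  const out = fs.createWriteStream(filePath);
  for (const name of slices) {
    // eslint-disable-next-line no-await-in-loop
    await new Promise((resolve, reject) => {
      const read = fs.createReadStream(path.join(tempDir, name));
      read.on('error', reject);
      read.on('end', resolve);
      read.pipe(out, { end: false }); // keep the output stream open between slices
    });
  }
  out.end();
}

Usage would be `await mergeSlices(tempDir, filePath)`; the commit itself uses the shell cat/rm approach shown in the diff above.
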
test/Tables.js (17 additions, 4 deletions)
@@ -2,10 +2,10 @@ import Storage from '../src/Storage';
 
 const _ = require('lodash');
 const aws = require('aws-sdk');
+const csv = require('fast-csv');
 const fs = require('fs');
 const expect = require('unexpected');
 const os = require('os');
-const { execSync } = require('child_process');
 const stream = require('stream');
 const util = require('util');
 
@@ -164,7 +164,7 @@ describe('Storage.Tables', () => {
     const f = new FileStream(inFilePath);
     await f.writeRow('"id","name","price","date","info","category"');
     const longString = 'a'.repeat(10000);
-    for (let i = 1; i <= 10000; i += 1) {
+    for (let i = 1; i <= 50000; i += 1) {
       // eslint-disable-next-line no-await-in-loop
       await f.writeRow(`"r${i}","Product ${i}","${i}","2016-04-02 12:00:12","${longString}","c2"`);
     }
@@ -176,8 +176,21 @@
     const outFilePath = `${tempFilePath}.out`;
     await storage.Tables.exportToFile(tableId, {}, outFilePath);
     expect(fs.existsSync(outFilePath), 'to be ok');
-    // Check proper number of rows
-    expect(execSync(`wc -l ${outFilePath} | awk '{print $1}'`, { encoding: 'utf-8' }).trim(), 'to be', '10000');
+
+    // Parse every row with csv parser to check that the file is well formed
+    // Check rows count
+    let rowsCount = 0;
+    await new Promise((res, rej) => {
+      fs.createReadStream(outFilePath)
+        .pipe(csv.parse({ headers: ['id', 'name', 'price', 'date', 'info', 'category'] }))
+        .on('data', (row) => {
+          rowsCount += 1;
+          expect(_.size(row), 'to be', 6);
+        })
+        .on('end', () => res())
+        .on('error', (err) => rej(err));
+    });
+    expect(rowsCount, 'to be', 50000);
   });
 
   it('delete', async () => {
