Skip to content

Commit

Permalink
feat: add uploadFiles implementation for node and web
Browse files Browse the repository at this point in the history
  • Loading branch information
dtfiedler committed Aug 28, 2023
1 parent e553e47 commit 42c2ed9
Show file tree
Hide file tree
Showing 12 changed files with 1,167 additions and 158 deletions.
Empty file added examples/node/files/0_kb.txt
Empty file.
14 changes: 14 additions & 0 deletions examples/node/index.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Arweave from 'arweave';
import fs from 'fs';

import { TurboFactory } from '../../lib/index.js';

Expand Down Expand Up @@ -38,4 +39,17 @@ import { TurboFactory } from '../../lib/index.js';
currency: 'usd',
});
console.log('10 USD to winc:', estimatedWinc);

/**
* Post some local data items to the Turbo service.
*/
console.log('Posting data items to Turbo service...');
const files = [new URL('files/0_kb.txt', import.meta.url).pathname];
const fileStreamGenerator = files.map(
(dataItem) => () => fs.createReadStream(dataItem),
);
const uploadResult = await turboAuthClient.uploadFiles({
fileStreamGenerator: fileStreamGenerator,
});
console.log(JSON.stringify(uploadResult, null, 2));
})();
75 changes: 68 additions & 7 deletions examples/web/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,39 @@ <h1>Rates</h1>
<div>
<h1>Balance</h1>
<span id="balance">Fetching balance for generated wallet...</span>
</div>
<h1>Upload File</h1>

<form
id="uploadForm"
action="/upload"
method="post"
enctype="multipart/form-data"
>
<label for="file">Choose file to upload:</label>
<input type="file" id="file" name="file" />
<br /><br />
<input type="submit" value="Upload File" />
<br /><br />
<span id="uploadStatus"></span>
</form>
</div>
<script type="module">
import { TurboFactory } from './web.bundle.min.js';

// set up our factory
const turbo = TurboFactory.init();
/**
* Set up our authenticated client factory
*/
const arweave = new Arweave({
host: 'ar-io.dev',
port: 443,
protocol: 'https',
});
const jwk = await arweave.crypto.generateJWK();
const address = await arweave.wallets.jwkToAddress(jwk);
const turbo = TurboFactory.init({
privateKey: jwk,
});

// print the client to the console
console.log(turbo);
Expand All @@ -42,16 +69,50 @@ <h1>Balance</h1>
.join('\n');
document.getElementById('rates').innerText = ratesString;

const arweave = Arweave.init();
const jwk = await Arweave.crypto.generateJWK();
const address = await arweave.wallets.jwkToAddress(jwk);
const turboAuth = TurboFactory.init({ privateKey: jwk });
const balance = await turboAuth.getBalance();
/**
* Fetch wallet balance.
*/
const balance = await turbo.getBalance();
document.getElementById('balance').innerText = JSON.stringify(
balance,
null,
2,
);

/**
* Handle file uploads
*/
document
.getElementById('uploadForm')
.addEventListener('submit', function (e) {
e.preventDefault(); // Stop the form from submitting

const fileInput = document.getElementById('file');
const selectedFiles = fileInput.files;

if (selectedFiles.length) {
const blobFileGenerators = Object.values(selectedFiles).map(
(file) => () => file.stream(),
);
const response = turbo
.uploadFiles({
fileStreamGenerator: blobFileGenerators,
})
.then((res) => {
console.log('Successfully uploaded files!', res);
document.getElementById(
'uploadStatus',
).innerText = `Successfully uploaded files! ${JSON.stringify(
res,
null,
2,
)}`;
})
.catch((err) => console.log('Error uploading file!', err));
} else {
console.log('No file selected');
}
});
</script>
</body>
</html>
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"example:web": "yarn build:web && cp -r bundles/* examples/web && http-server --port 8080 --host -o examples/web"
},
"dependencies": {
"arbundles": "^0.9.9",
"arweave": "^1.14.4",
"axios": "^1.4.0",
"retry-axios": "^3.0.0",
Expand Down
16 changes: 16 additions & 0 deletions src/common/turbo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { Readable } from 'stream';

import { TurboNodeUploadService } from '../node/upload.js';
import { JWKInterface, TurboBalanceResponse } from '../types/index.js';
import {
Expand All @@ -26,6 +28,7 @@ import {
TurboPaymentService,
TurboPriceResponse,
TurboRatesResponse,
TurboUploadDataItemsResponse,
TurboUploadService,
} from '../types/index.js';
import { TurboDefaultPaymentService } from './payment.js';
Expand Down Expand Up @@ -114,4 +117,17 @@ export class TurboClient implements Turbo {
async getBalance(): Promise<TurboBalanceResponse> {
return this.paymentService.getBalance();
}

/**
* Signs and uploads data to the upload service.
*/
async uploadFiles({
fileStreamGenerator,
bundle = false,
}: {
fileStreamGenerator: (() => Readable)[] | (() => ReadableStream)[];
bundle?: boolean;
}): Promise<TurboUploadDataItemsResponse> {
return this.uploadService.uploadFiles({ fileStreamGenerator, bundle });
}
}
112 changes: 108 additions & 4 deletions src/node/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,117 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { ArweaveSigner, streamSigner } from 'arbundles';
import { AxiosInstance, AxiosResponse } from 'axios';
import { Readable } from 'stream';

import { JWKInterface } from '../types/arweave.js';
import {
TransactionId,
TurboUploadDataItemResponse,
TurboUploadDataItemsResponse,
TurboUploadService,
TurboUploadServiceConfiguration,
} from '../types/index.js';
} from '../types/turbo.js';
import { createAxiosInstance } from '../utils/axiosClient.js';
import { jwkToPublicArweaveAddress } from '../utils/base64.js';
import { UnauthenticatedRequestError } from '../utils/errors.js';

/* eslint-disable */
export class TurboNodeUploadService implements TurboUploadService {
// @ts-ignore
constructor(config: TurboUploadServiceConfiguration) {}
protected axios: AxiosInstance;
protected privateKey: JWKInterface | undefined;

constructor({
url = 'https://upload.ardrive.dev',
privateKey,
retryConfig,
}: TurboUploadServiceConfiguration) {
this.axios = createAxiosInstance({
axiosConfig: {
baseURL: `${url}/v1`,
},
retryConfig,
});
this.privateKey = privateKey;
}

async uploadFiles({
fileStreamGenerator,
bundle = false, // TODO: add bundle param to allow for creating BDI of data items
}: {
fileStreamGenerator: (() => Readable)[];
bundle?: boolean;
}): Promise<TurboUploadDataItemsResponse> {
if (!this.privateKey) {
throw new UnauthenticatedRequestError();
}

if (bundle) {
console.log('Data items will be bundled.', fileStreamGenerator);
}

const signer = new ArweaveSigner(this.privateKey);

const signedDataItemPromises = fileStreamGenerator.map(
(fileStreamGenerator) => {
const [stream1, stream2] = [
fileStreamGenerator(),
fileStreamGenerator(),
];
// TODO: this will not work with BDIs as is, we may need to add an additional stream signer
return streamSigner(stream1, stream2, signer);
},
);

// TODO: we probably don't want to Promise.all, do .allSettled and only return successful signed data items
const signedDataItems = await Promise.all(signedDataItemPromises);

// TODO: add p-limit constraint
const uploadPromises = signedDataItems.map((signedDataItem) => {
return this.axios.post<TurboUploadDataItemResponse>(
`/tx`,
signedDataItem,
{
headers: {
'content-type': 'application/octet-stream',
},
},
);
});

const dataItemResponses = await Promise.allSettled(uploadPromises);
const postedDataItems = dataItemResponses.reduce(
(
postedDataItemsMap: Record<
TransactionId,
Omit<TurboUploadDataItemResponse, 'id'>
>,
dataItemResponse:
| PromiseFulfilledResult<
AxiosResponse<TurboUploadDataItemResponse, 'id'>
>
| PromiseRejectedResult,
) => {
// NOTE: with validateStatus set to true on the axios config we could use Promise.all and remove this check
if (dataItemResponse.status === 'rejected') {
return postedDataItemsMap;
}
// handle the fulfilled response
const { status, data } = dataItemResponse.value;
if (![200, 202].includes(status)) {
// TODO: add to failed data items array
return postedDataItemsMap;
}
const { id, ...dataItemCache } = data;
postedDataItemsMap[id] = dataItemCache;
return postedDataItemsMap;
},
{},
);

return {
ownerAddress: jwkToPublicArweaveAddress(this.privateKey),
dataItems: postedDataItems,
};
}
}
23 changes: 21 additions & 2 deletions src/types/turbo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { RetryConfig } from 'retry-axios';
import { Readable } from 'stream';
import winston from 'winston';

import { JWKInterface } from './arweave.js';
Expand Down Expand Up @@ -51,6 +52,17 @@ export type TurboCurrenciesResponse = {
supportedCurrencies: Currency[];
limits: Record<Currency, CurrencyLimit>;
};
export type TurboUploadDataItemResponse = {
byteCount: number;
dataCaches: string[];
fastFinalityIndexes: string[];
id: TransactionId;
};

export type TurboUploadDataItemsResponse = {
ownerAddress: UserAddress;
dataItems: Record<string, Omit<TurboUploadDataItemResponse, 'id'>>;
};

export type TurboSignedRequestHeaders = {
'x-public-key': string;
Expand Down Expand Up @@ -108,7 +120,14 @@ export interface TurboPaymentService
extends AuthenticatedTurboPaymentService,
UnauthenticatedTurboPaymentService {}

// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface TurboUploadService {}
export interface TurboUploadService {
uploadFiles({
fileStreamGenerator,
bundle,
}: {
fileStreamGenerator: (() => Readable)[] | (() => ReadableStream)[];
bundle?: boolean;
}): Promise<TurboUploadDataItemsResponse>;
}

export interface Turbo extends TurboPaymentService, TurboUploadService {}
35 changes: 35 additions & 0 deletions src/utils/readableStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
export async function readableStreamToBuffer({
stream,
}: {
stream: ReadableStream;
}): Promise<Buffer> {
const reader = stream.getReader();
const chunks: any[] = [];

let done = false;
while (!done) {
const { done: streamDone, value } = await reader.read();
done = streamDone;
if (!done) {
chunks.push(value);
}
}

return Buffer.concat(chunks);
}
Loading

0 comments on commit 42c2ed9

Please sign in to comment.