docs: add samples for JSONWriter with Default and Committed streams
alvarowolfx committed Aug 17, 2023
1 parent 07794c1 commit 96a6cf4
Showing 3 changed files with 316 additions and 0 deletions.
127 changes: 127 additions & 0 deletions samples/append_rows_json_writer_commited.js
@@ -0,0 +1,127 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

function main(
projectId = 'my_project',
datasetId = 'my_dataset',
tableId = 'my_table'
) {
// [START bigquerystorage_jsonstreamwriter_commited]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendJSONRowsCommitedStream() {
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// projectId = 'my_project';
// datasetId = 'my_dataset';
// tableId = 'my_table';

const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const streamType = managedwriter.CommittedStream;
const writeClient = new WriterClient({projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamType,
destinationTable,
});

const streamId = connection.getStreamId();
console.log(`Stream created: ${streamId}`);

const writer = new JSONWriter({
streamId,
connection,
protoDescriptor,
});

let rows = [];
const pendingWrites = [];

// Row 1
let row = {
row_num: 1,
customer_name: 'Octavia',
};
rows.push(row);

// Row 2
row = {
row_num: 2,
customer_name: 'Turing',
};
rows.push(row);

// Send batch.
let pw = writer.appendRows(rows);
pendingWrites.push(pw);

rows = [];

// Row 3
row = {
row_num: 3,
customer_name: 'Bell',
};
rows.push(row);

// Send batch.
pw = writer.appendRows(rows);
pendingWrites.push(pw);

const results = await Promise.all(
pendingWrites.map(pw => pw.getResult())
);
console.log('Write results:', results);

const {rowCount} = await connection.finalize();
console.log(`Row count: ${rowCount}`);

const response = await writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
});

console.log(response);
} catch (err) {
console.log(err);
} finally {
writeClient.close();
}
}
// [END bigquerystorage_jsonstreamwriter_commited]
appendJSONRowsCommitedStream();
}
process.on('unhandledRejection', err => {
console.error(err.message);
process.exitCode = 1;
});
main(...process.argv.slice(2));
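
Because the stream above is created with managedwriter.CommittedStream, each batch of rows becomes readable as soon as its append is acknowledged. As a quick sanity check outside the sample, the rows can be read back with the @google-cloud/bigquery client; the sketch below is not part of this commit and reuses the same placeholder project, dataset, and table names.

// Verification sketch (not part of this commit): reads back the rows the
// committed-stream sample appended. Assumes the sample has already run
// against the placeholder project, dataset, and table below.
const {BigQuery} = require('@google-cloud/bigquery');

async function verifyCommittedRows(
  projectId = 'my_project',
  datasetId = 'my_dataset',
  tableId = 'my_table'
) {
  const bigquery = new BigQuery({projectId});
  // Rows written through a committed stream are available for query as soon
  // as each append is acknowledged, so a plain SELECT is enough here.
  const [rows] = await bigquery.query(
    `SELECT row_num, customer_name FROM \`${projectId}.${datasetId}.${tableId}\` ORDER BY row_num`
  );
  console.log(`Read back ${rows.length} rows`, rows);
}

verifyCommittedRows(...process.argv.slice(2));
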
114 changes: 114 additions & 0 deletions samples/append_rows_json_writer_default.js
@@ -0,0 +1,114 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

function main(
projectId = 'my_project',
datasetId = 'my_dataset',
tableId = 'my_table'
) {
// [START bigquerystorage_jsonstreamwriter_default]
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;
const {BigQuery} = require('@google-cloud/bigquery');

async function appendJSONRowsDefaultStream() {
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// projectId = 'my_project';
// datasetId = 'my_dataset';
// tableId = 'my_table';

const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
const writeClient = new WriterClient({projectId});
const bigquery = new BigQuery({projectId: projectId});

try {
const dataset = bigquery.dataset(datasetId);
const table = await dataset.table(tableId);
const [metadata] = await table.getMetadata();
const {schema} = metadata;
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root'
);

const connection = await writeClient.createStreamConnection({
streamId: managedwriter.DefaultStream,
destinationTable,
});
const streamId = connection.getStreamId();

const writer = new JSONWriter({
streamId,
connection,
protoDescriptor,
});

let rows = [];
const pendingWrites = [];

// Row 1
let row = {
row_num: 1,
customer_name: 'Octavia',
};
rows.push(row);

// Row 2
row = {
row_num: 2,
customer_name: 'Turing',
};
rows.push(row);

// Send batch.
let pw = writer.appendRows(rows);
pendingWrites.push(pw);

rows = [];

// Row 3
row = {
row_num: 3,
customer_name: 'Bell',
};
rows.push(row);

// Send batch.
pw = writer.appendRows(rows);
pendingWrites.push(pw);

const results = await Promise.all(
pendingWrites.map(pw => pw.getResult())
);
console.log('Write results:', results);
} catch (err) {
console.log(err);
} finally {
writeClient.close();
}
}
// [END bigquerystorage_jsonstreamwriter_default]
appendJSONRowsDefaultStream();
}
process.on('unhandledRejection', err => {
console.error(err.message);
process.exitCode = 1;
});
main(...process.argv.slice(2));
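
Unlike the committed-stream sample, this one writes to the table's default stream, which always exists, operates in committed mode, and needs no finalize or batch-commit step. The new test below drives the sample from the command line; a standalone driver would look roughly like the sketch here (not part of this commit, placeholder names only).

// Hypothetical driver (not part of this commit): runs the default-stream
// sample against a specific table, mirroring how the test below invokes it.
const {execSync} = require('child_process');

const projectId = 'my_project'; // placeholder
const datasetId = 'my_dataset'; // placeholder
const tableId = 'my_table'; // placeholder

const output = execSync(
  `node append_rows_json_writer_default ${projectId} ${datasetId} ${tableId}`,
  {encoding: 'utf-8'}
);
console.log(output);
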
75 changes: 75 additions & 0 deletions samples/test/writeClient.js
@@ -88,6 +88,40 @@ describe('writeClient', () => {
assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
});

it('should append rows to default stream', async () => {
const schema = [
{name: 'customer_name', type: 'STRING'},
{name: 'row_num', type: 'INTEGER', mode: 'REQUIRED'},
];

const tableId = generateUuid();

const [table] = await bigquery
.dataset(datasetId)
.createTable(tableId, {schema});

projectId = table.metadata.tableReference.projectId;

execSync(
`node append_rows_json_writer_default ${projectId} ${datasetId} ${tableId}`
);

let [rows] = await table.query(
`SELECT * FROM \`${projectId}.${datasetId}.${tableId}\``
);

// Drop any rows that came back without a customer_name before asserting.
rows = rows.filter(row => row.customer_name !== null);

assert.strictEqual(rows.length, 3);
assert.deepInclude(rows, {customer_name: 'Octavia', row_num: 1});
assert.deepInclude(rows, {customer_name: 'Turing', row_num: 2});
assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
});

it('should append rows buffered', async () => {
const schema = [
{name: 'customer_name', type: 'STRING'},
@@ -125,6 +159,43 @@ describe('writeClient', () => {
assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
});

it('should append rows committed', async () => {
const schema = [
{name: 'customer_name', type: 'STRING'},
{name: 'row_num', type: 'INTEGER', mode: 'REQUIRED'},
];

const tableId = generateUuid();

const [table] = await bigquery
.dataset(datasetId)
.createTable(tableId, {schema});

projectId = table.metadata.tableReference.projectId;

const output = execSync(
`node append_rows_json_writer_commited ${projectId} ${datasetId} ${tableId}`
);

assert.match(output, /Stream created:/);
assert.match(output, /Row count: 3/);

let [rows] = await table.query(
`SELECT * FROM \`${projectId}.${datasetId}.${tableId}\``
);

// Drop any rows that came back without a customer_name before asserting.
rows = rows.filter(row => row.customer_name !== null);

assert.strictEqual(rows.length, 3);
assert.deepInclude(rows, {customer_name: 'Octavia', row_num: 1});
assert.deepInclude(rows, {customer_name: 'Turing', row_num: 2});
assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
});

it('should append rows in non-US regions', async () => {
const schema = [
{name: 'customer_name', type: 'STRING'},
@@ -170,6 +241,10 @@ describe('writeClient', () => {
it('adapts BQ Schema to Proto descriptor', async () => {
return testAppendRowsMultipleType('append_rows_table_to_proto2');
});

it('using JSON Writer', async () => {
return testAppendRowsMultipleType('append_rows_table_to_proto2');
});
});

async function testAppendRowsMultipleType(testFile) {