From 96a6cf46d1e99efa85da6fe5e2b0f25a0b7b377a Mon Sep 17 00:00:00 2001
From: Alvaro Viebrantz
Date: Thu, 17 Aug 2023 16:55:02 -0400
Subject: [PATCH] docs: add samples for JSONWriter with Default and Committed streams

---
 samples/append_rows_json_writer_commited.js | 127 ++++++++++++++++++++
 samples/append_rows_json_writer_default.js  | 114 ++++++++++++++++++
 samples/test/writeClient.js                 |  75 ++++++++++++
 3 files changed, 316 insertions(+)
 create mode 100644 samples/append_rows_json_writer_commited.js
 create mode 100644 samples/append_rows_json_writer_default.js

diff --git a/samples/append_rows_json_writer_commited.js b/samples/append_rows_json_writer_commited.js
new file mode 100644
index 00000000..581327bc
--- /dev/null
+++ b/samples/append_rows_json_writer_commited.js
@@ -0,0 +1,127 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+'use strict';
+
+function main(
+  projectId = 'my_project',
+  datasetId = 'my_dataset',
+  tableId = 'my_table'
+) {
+  // [START bigquerystorage_jsonstreamwriter_commited]
+  const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
+  const {WriterClient, JSONWriter} = managedwriter;
+  const {BigQuery} = require('@google-cloud/bigquery');
+
+  async function appendJSONRowsCommitedStream() {
+    /**
+     * TODO(developer): Uncomment the following lines before running the sample.
+     */
+    // projectId = 'my_project';
+    // datasetId = 'my_dataset';
+    // tableId = 'my_table';
+
+    const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
+    const streamType = managedwriter.CommittedStream;
+    const writeClient = new WriterClient({projectId});
+    const bigquery = new BigQuery({projectId: projectId});
+
+    try {
+      const dataset = bigquery.dataset(datasetId);
+      const table = await dataset.table(tableId);
+      const [metadata] = await table.getMetadata();
+      const {schema} = metadata;
+      const storageSchema =
+        adapt.convertBigQuerySchemaToStorageTableSchema(schema);
+      const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
+        storageSchema,
+        'root'
+      );
+
+      const connection = await writeClient.createStreamConnection({
+        streamType,
+        destinationTable,
+      });
+
+      const streamId = connection.getStreamId();
+      console.log(`Stream created: ${streamId}`);
+
+      const writer = new JSONWriter({
+        streamId,
+        connection,
+        protoDescriptor,
+      });
+
+      let rows = [];
+      const pendingWrites = [];
+
+      // Row 1
+      let row = {
+        row_num: 1,
+        customer_name: 'Octavia',
+      };
+      rows.push(row);
+
+      // Row 2
+      row = {
+        row_num: 2,
+        customer_name: 'Turing',
+      };
+      rows.push(row);
+
+      // Send batch.
+      let pw = writer.appendRows(rows);
+      pendingWrites.push(pw);
+
+      rows = [];
+
+      // Row 3
+      row = {
+        row_num: 3,
+        customer_name: 'Bell',
+      };
+      rows.push(row);
+
+      // Send batch.
+      pw = writer.appendRows(rows);
+      pendingWrites.push(pw);
+
+      const results = await Promise.all(
+        pendingWrites.map(pw => pw.getResult())
+      );
+      console.log('Write results:', results);
+
+      const {rowCount} = await connection.finalize();
+      console.log(`Row count: ${rowCount}`);
+
+      const response = await writeClient.batchCommitWriteStream({
+        parent: destinationTable,
+        writeStreams: [streamId],
+      });
+
+      console.log(response);
+    } catch (err) {
+      console.log(err);
+    } finally {
+      writeClient.close();
+    }
+  }
+  // [END bigquerystorage_jsonstreamwriter_commited]
+  appendJSONRowsCommitedStream();
+}
+process.on('unhandledRejection', err => {
+  console.error(err.message);
+  process.exitCode = 1;
+});
+main(...process.argv.slice(2));
diff --git a/samples/append_rows_json_writer_default.js b/samples/append_rows_json_writer_default.js
new file mode 100644
index 00000000..b791fcf0
--- /dev/null
+++ b/samples/append_rows_json_writer_default.js
@@ -0,0 +1,114 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+'use strict';
+
+function main(
+  projectId = 'my_project',
+  datasetId = 'my_dataset',
+  tableId = 'my_table'
+) {
+  // [START bigquerystorage_jsonstreamwriter_default]
+  const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
+  const {WriterClient, JSONWriter} = managedwriter;
+  const {BigQuery} = require('@google-cloud/bigquery');
+
+  async function appendJSONRowsDefaultStream() {
+    /**
+     * TODO(developer): Uncomment the following lines before running the sample.
+     */
+    // projectId = 'my_project';
+    // datasetId = 'my_dataset';
+    // tableId = 'my_table';
+
+    const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
+    const writeClient = new WriterClient({projectId});
+    const bigquery = new BigQuery({projectId: projectId});
+
+    try {
+      const dataset = bigquery.dataset(datasetId);
+      const table = await dataset.table(tableId);
+      const [metadata] = await table.getMetadata();
+      const {schema} = metadata;
+      const storageSchema =
+        adapt.convertBigQuerySchemaToStorageTableSchema(schema);
+      const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
+        storageSchema,
+        'root'
+      );
+
+      const connection = await writeClient.createStreamConnection({
+        streamId: managedwriter.DefaultStream,
+        destinationTable,
+      });
+      const streamId = connection.getStreamId();
+
+      const writer = new JSONWriter({
+        streamId,
+        connection,
+        protoDescriptor,
+      });
+
+      let rows = [];
+      const pendingWrites = [];
+
+      // Row 1
+      let row = {
+        row_num: 1,
+        customer_name: 'Octavia',
+      };
+      rows.push(row);
+
+      // Row 2
+      row = {
+        row_num: 2,
+        customer_name: 'Turing',
+      };
+      rows.push(row);
+
+      // Send batch.
+      let pw = writer.appendRows(rows);
+      pendingWrites.push(pw);
+
+      rows = [];
+
+      // Row 3
+      row = {
+        row_num: 3,
+        customer_name: 'Bell',
+      };
+      rows.push(row);
+
+      // Send batch.
+      pw = writer.appendRows(rows);
+      pendingWrites.push(pw);
+
+      const results = await Promise.all(
+        pendingWrites.map(pw => pw.getResult())
+      );
+      console.log('Write results:', results);
+    } catch (err) {
+      console.log(err);
+    } finally {
+      writeClient.close();
+    }
+  }
+  // [END bigquerystorage_jsonstreamwriter_default]
+  appendJSONRowsDefaultStream();
+}
+process.on('unhandledRejection', err => {
+  console.error(err.message);
+  process.exitCode = 1;
+});
+main(...process.argv.slice(2));
diff --git a/samples/test/writeClient.js b/samples/test/writeClient.js
index 53a4a743..fc238eea 100644
--- a/samples/test/writeClient.js
+++ b/samples/test/writeClient.js
@@ -88,6 +88,40 @@ describe('writeClient', () => {
     assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
   });
 
+  it('should append rows to default stream', async () => {
+    const schema = [
+      {name: 'customer_name', type: 'STRING'},
+      {name: 'row_num', type: 'INTEGER', mode: 'REQUIRED'},
+    ];
+
+    const tableId = generateUuid();
+
+    const [table] = await bigquery
+      .dataset(datasetId)
+      .createTable(tableId, {schema});
+
+    projectId = table.metadata.tableReference.projectId;
+
+    execSync(
+      `node append_rows_json_writer_default ${projectId} ${datasetId} ${tableId}`
+    );
+
+    let [rows] = await table.query(
+      `SELECT * FROM \`${projectId}.${datasetId}.${tableId}\``
+    );
+
+    rows = rows.map(row => {
+      if (row.customer_name !== null) {
+        return row;
+      }
+    });
+
+    assert.strictEqual(rows.length, 3);
+    assert.deepInclude(rows, {customer_name: 'Octavia', row_num: 1});
+    assert.deepInclude(rows, {customer_name: 'Turing', row_num: 2});
+    assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
+  });
+
   it('should append rows buffered', async () => {
     const schema = [
       {name: 'customer_name', type: 'STRING'},
@@ -125,6 +159,43 @@ describe('writeClient', () => {
     assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
   });
 
+  it('should append rows committed', async () => {
+    const schema = [
+      {name: 'customer_name', type: 'STRING'},
+      {name: 'row_num', type: 'INTEGER', mode: 'REQUIRED'},
+    ];
+
+    const tableId = generateUuid();
+
+    const [table] = await bigquery
+      .dataset(datasetId)
+      .createTable(tableId, {schema});
+
+    projectId = table.metadata.tableReference.projectId;
+
+    const output = execSync(
+      `node append_rows_json_writer_commited ${projectId} ${datasetId} ${tableId}`
+    );
+
+    assert.match(output, /Stream created:/);
+    assert.match(output, /Row count: 3/);
+
+    let [rows] = await table.query(
+      `SELECT * FROM \`${projectId}.${datasetId}.${tableId}\``
+    );
+
+    rows = rows.map(row => {
+      if (row.customer_name !== null) {
+        return row;
+      }
+    });
+
+    assert.strictEqual(rows.length, 3);
+    assert.deepInclude(rows, {customer_name: 'Octavia', row_num: 1});
+    assert.deepInclude(rows, {customer_name: 'Turing', row_num: 2});
+    assert.deepInclude(rows, {customer_name: 'Bell', row_num: 3});
+  });
+
   it('should append rows in non-US regions', async () => {
     const schema = [
       {name: 'customer_name', type: 'STRING'},
@@ -170,6 +241,10 @@ describe('writeClient', () => {
   it('adapts BQ Schema to Proto descriptor', async () => {
     return testAppendRowsMultipleType('append_rows_table_to_proto2');
   });
+
+  it('using JSON Writer ', async () => {
+    return testAppendRowsMultipleType('append_rows_table_to_proto2');
+  });
 });
 
 async function testAppendRowsMultipleType(testFile) {