Add bulk loading Python Sample (GoogleCloudPlatform/python-docs-samples#2295)

Adds the following functionality:

* Create bulk_load_csv
* Delete bulk_load_csv
* Create schema.ddl
tonioshikanlu authored and gguuss committed Aug 6, 2019
1 parent cd34847 commit 02fc4e6
Showing 7 changed files with 17,450 additions and 0 deletions.
77 changes: 77 additions & 0 deletions samples/samples/bulk_load_csv/README.rst
@@ -0,0 +1,77 @@
Google Cloud Spanner: Bulk Loading From CSV Python Sample
============================================================

``Google Cloud Spanner`` is a highly scalable, transactional, managed, NewSQL database service.
Cloud Spanner solves the need for a horizontally-scaling database with consistent global transactions and SQL semantics.

This application demonstrates how to load data from a CSV file into a Cloud
Spanner database.

The data contained in the CSV files is sourced from the "Hacker News - Y Combinator" BigQuery `public dataset`_.

.. _public dataset:
   https://cloud.google.com/bigquery/public-data/

Prerequisite
-----------------------

Create a database in your Cloud Spanner instance using the `schema`_ file in this folder.

.. _schema:
schema.ddl
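
For orientation, here is a minimal sketch of what the ``stories`` table in that schema might look like. This is an assumption for illustration only: the column names are taken from ``batch_import.py``, but the types are guesses; the ``schema.ddl`` file in this folder is the authoritative definition.

.. code-block:: sql

    CREATE TABLE stories (
        id INT64 NOT NULL,
        `by` STRING(MAX),
        author STRING(MAX),
        dead BOOL,
        deleted BOOL,
        descendants INT64,
        score INT64,
        text STRING(MAX),
        time INT64,
        time_ts TIMESTAMP,
        title STRING(MAX),
        url STRING(MAX)
    ) PRIMARY KEY (id)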

Setup
------------------------

Authentication
++++++++++++++

This sample requires you to have authentication set up. Refer to the
`Authentication Getting Started Guide`_ for instructions on setting up
credentials for applications.

.. _Authentication Getting Started Guide:
https://cloud.google.com/docs/authentication/getting-started
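
For example, one common setup (the key path below is a placeholder) is to point the ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable at a service account key file:

.. code-block:: bash

    $ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json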

Install Dependencies
++++++++++++++++++++

#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions.

.. _Python Development Environment Setup Guide:
https://cloud.google.com/python/setup

#. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.

   MACOS/LINUX

   .. code-block:: bash

       $ virtualenv env
       $ source env/bin/activate

   WINDOWS

   .. code-block:: bash

       > virtualenv env
       > .\env\Scripts\activate

#. Install the dependencies needed to run the samples.

   .. code-block:: bash

       $ pip install -r requirements.txt

.. _pip: https://pip.pypa.io/
.. _virtualenv: https://virtualenv.pypa.io/


To run the sample
-----------------------

.. code-block:: bash

    $ python batch_import.py instance_id database_id

positional arguments:
    instance_id: Your Cloud Spanner instance ID.
    database_id: Your Cloud Spanner database ID.
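
For example, with placeholder IDs (``my-instance`` and ``hnewsdb`` are illustrative values only; substitute your own):

.. code-block:: bash

    $ python batch_import.py my-instance hnewsdb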
136 changes: 136 additions & 0 deletions samples/samples/bulk_load_csv/batch_import.py
@@ -0,0 +1,136 @@
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This application demonstrates how to do batch operations from a CSV file
# using Cloud Spanner.
# For more information, see the README.rst.


import argparse
import csv
import threading
import time

from google.cloud import spanner


def is_bool_null(file):
    # This function converts the boolean values
    # in the dataset from strings to boolean data types.
    # It also converts empty strings to the None data
    # type, indicating an empty cell.
    data = list(csv.reader(file))
    # Reads each cell of each line in the csv file.
    for line in range(len(data)):
        for cell in range(len(data[line])):
            # Changes the string 'true' to the boolean True.
            if data[line][cell] == 'true':
                data[line][cell] = True
            # Changes a blank string to the Python None type.
            if data[line][cell] == '':
                data[line][cell] = None
    return data
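
# Illustration (hypothetical input): csv.reader treats each string in a
# list as one line, so is_bool_null(['true', '12', '']) returns
# [[True], ['12'], []] -- 'true' becomes the boolean True and the empty
# line becomes an empty row.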


def divide_chunks(lst, n):
    # This function divides the csv data into chunks of n rows so that
    # the mutation will commit every 500 rows.
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
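
# For example (illustrative values): list(divide_chunks(['a', 'b', 'c'], 2))
# yields [['a', 'b'], ['c']], so a chunk size of 500 caps each batch
# commit at 500 rows.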


def insert_data(database, filepath, table_name, column_names):
    # This function reads one file belonging to the dataset
    # and writes each line into Cloud Spanner using the
    # batch mutation function.
    with open(filepath, newline='') as file:
        data = is_bool_null(file)
        data = tuple(data)
        l_group = list(divide_chunks(data, 500))
    # Inserts each chunk of data into the database.
    for current_inserts in l_group:
        if current_inserts is not None:
            with database.batch() as batch:
                batch.insert(
                    table=table_name,
                    columns=column_names,
                    values=current_inserts)
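
# Note: database.batch() is a context manager; the grouped mutations are
# sent to Cloud Spanner in a single commit when the 'with' block exits.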


def main(instance_id, database_id):
    # Inserts sample data into the given database.
    # The database and table must already exist and can be created
    # using `create_database`.
    start = time.time()
    # File paths.
    comments_file = 'hnewscomments.txt'
    stories_file = 'hnewsstories.txt'
    # Instantiates a Spanner client.
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)
    # Sets the column names.
    s_columnnames = (
        'id',
        'by',
        'author',
        'dead',
        'deleted',
        'descendants',
        'score',
        'text',
        'time',
        'time_ts',
        'title',
        'url',
    )
    c_columnnames = (
        'id',
        'by',
        'author',
        'dead',
        'deleted',
        'parent',
        'ranking',
        'text',
        'time',
        'time_ts',
    )
    # Creates one thread per table so both files load concurrently.
    t1 = threading.Thread(
        target=insert_data,
        args=(database, stories_file, 'stories', s_columnnames))
    t2 = threading.Thread(
        target=insert_data,
        args=(database, comments_file, 'comments', c_columnnames))
    # Starts the threads.
    t1.start()
    t2.start()
    # Waits until both threads finish.
    t1.join()
    t2.join()

    print('Finished Inserting Data.')
    end = time.time()
    print('Time: ', end - start)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('instance_id', help='Your Cloud Spanner instance ID.')
    parser.add_argument('database_id', help='Your Cloud Spanner database ID.')

    args = parser.parse_args()

    main(args.instance_id, args.database_id)
67 changes: 67 additions & 0 deletions samples/samples/bulk_load_csv/batch_import_test.py
@@ -0,0 +1,67 @@
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This application demonstrates how to do batch operations from a CSV file
# using Cloud Spanner.
# For more information, see the README.rst.
"""Test for batch_import"""
import os
import pytest
import batch_import
from google.cloud import spanner


INSTANCE_ID = os.environ['SPANNER_INSTANCE']
DATABASE_ID = 'hnewsdb'


@pytest.fixture(scope='module')
def spanner_instance():
    spanner_client = spanner.Client()
    return spanner_client.instance(INSTANCE_ID)


@pytest.fixture
def example_database():
    spanner_client = spanner.Client()
    instance = spanner_client.instance(INSTANCE_ID)
    database = instance.database(DATABASE_ID)

    if not database.exists():
        with open('schema.ddl', 'r') as myfile:
            schema = myfile.read()
        database = instance.database(DATABASE_ID, ddl_statements=[schema])
        database.create()

    yield database
    database.drop()


def test_is_bool_null():
    assert batch_import.is_bool_null(
        ['12', 'true', '', '12', 'jkl', '']) == [
            ['12'], [True], [], ['12'], ['jkl'], []]


def test_divide_chunks():
    res = list(batch_import.divide_chunks(
        ['12', 'true', '', '12', 'jkl', ''], 2))
    assert res == [['12', 'true'], ['', '12'], ['jkl', '']]


def test_insert_data(capsys):
    batch_import.main(INSTANCE_ID, DATABASE_ID)
    out, _ = capsys.readouterr()
    assert 'Finished Inserting Data.' in out