Patching 0.9.0 release (#631)
* Use balanced sharding strategy in bigquery read session. (#601)

We use sharding to provide a way of shuffling the read order. With liquid sharding, a single stream can read all of the data, in which case nothing would be shuffled.

* making sure that the BigQuery reader returns meaningful defaults for null values (#626)

* making sure that the BigQuery reader returns meaningful defaults for null values

* linter fixes

* changing after-success.sh to push binaries to dropbox for branch R0.91

* bumping up version 0.9.0->0.9.1
vlasenkoalexey authored Nov 14, 2019
1 parent 5308315 commit e1af544
Showing 5 changed files with 130 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .travis/after-success.sh
@@ -14,7 +14,7 @@
# limitations under the License.
# ==============================================================================

if [[ ( ${TRAVIS_BRANCH} == "master" ) && ( ${TRAVIS_EVENT_TYPE} != "pull_request" ) ]]; then
if [[ (( ${TRAVIS_BRANCH} == "master" ) || ( ${TRAVIS_BRANCH} == "R0.91" )) && ( ${TRAVIS_EVENT_TYPE} != "pull_request" ) ]]; then

twine upload wheelhouse/tensorflow_io_nightly-*.whl

1 change: 1 addition & 0 deletions tensorflow_io/bigquery/kernels/bigquery_kernels.cc
@@ -149,6 +149,7 @@ class BigQueryReadSessionOp : public OpKernel {
createReadSessionRequest.mutable_read_options()->set_row_restriction(
row_restriction_);
createReadSessionRequest.set_requested_streams(requested_streams_);
createReadSessionRequest.set_sharding_strategy(apiv1beta1::ShardingStrategy::BALANCED);
createReadSessionRequest.set_format(apiv1beta1::DataFormat::AVRO);
VLOG(3) << "createReadSessionRequest: "
<< createReadSessionRequest.DebugString();
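For context, the BALANCED strategy only pays off when the caller requests several streams and then interleaves them. A minimal sketch of that usage, assuming the BigQueryClient API exercised in tests/test_bigquery_eager.py; the import path, project/table identifiers, and field names here are illustrative assumptions, not taken from the commit:

import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient  # assumed import path

client = BigQueryClient()
read_session = client.read_session(
    "projects/my-parent-project",        # parent
    "my-gcp-project",                    # project that owns the table
    "my_table",                          # table id
    "my_dataset",                        # dataset id
    ["string", "boolean", "int", "long", "float", "double"],
    [tf.string, tf.bool, tf.int32, tf.int64, tf.float32, tf.float64],
    requested_streams=2)

# With ShardingStrategy::BALANCED each requested stream receives a slice of
# the table, so interleaving the per-stream reads shuffles the read order;
# with liquid sharding a single stream may serve all rows, in which case
# nothing gets shuffled.
dataset = read_session.parallel_read_rows()
for row in dataset:
    print(row)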
24 changes: 24 additions & 0 deletions tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -228,6 +228,30 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
field.value<avro::GenericEnum>().symbol();
break;
case avro::AVRO_NULL:
switch(output_types[i]) {
case DT_BOOL:
((*out_tensors)[i]).scalar<bool>()() = false;
break;
case DT_INT32:
((*out_tensors)[i]).scalar<int32>()() = 0;
break;
case DT_INT64:
((*out_tensors)[i]).scalar<int64>()() = 0l;
break;
case DT_FLOAT:
((*out_tensors)[i]).scalar<float>()() = 0.0f;
break;
case DT_DOUBLE:
((*out_tensors)[i]).scalar<double>()() = 0.0;
break;
case DT_STRING:
((*out_tensors)[i]).scalar<string>()() = "";
break;
default:
return errors::InvalidArgument(
"unsupported data type against AVRO_NULL: ",
output_types[i]);
}
break;
default:
return errors::InvalidArgument("unsupported data type: ",
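The defaults chosen in this AVRO_NULL switch are the ones asserted as DEFAULT_VALUES in tests/test_bigquery_eager.py below. As an illustration (not code from the commit), a record whose Avro fields are all null is expected to decode to:

# Expected decode result for an all-null record, keyed by the Avro field names
# used in the test schema; values match the per-dtype defaults set above.
NULL_ROW_DEFAULTS = {
    "boolean": False,  # DT_BOOL   -> false
    "int": 0,          # DT_INT32  -> 0
    "long": 0,         # DT_INT64  -> 0
    "float": 0.0,      # DT_FLOAT  -> 0.0
    "double": 0.0,     # DT_DOUBLE -> 0.0
    "string": "",      # DT_STRING -> ""
}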
2 changes: 1 addition & 1 deletion tensorflow_io/core/python/ops/io_info.py
@@ -18,4 +18,4 @@
from __future__ import print_function

package = 'tensorflow>=2.0.0,<2.1.0'
version = '0.9.0'
version = '0.9.1'
132 changes: 103 additions & 29 deletions tests/test_bigquery_eager.py
@@ -58,7 +58,7 @@ def __init__(self, avro_schema, avro_serialized_rows_per_stream,
self, self._grpc_server)
port = self._grpc_server.add_insecure_port("localhost:0")
self._endpoint = "localhost:" + str(port)
print ("started server on :" + self._endpoint)
print("started a fake server on :" + self._endpoint)

def start(self):
self._grpc_server.start()
@@ -74,7 +74,7 @@ def _build_stream_name(self, stream_index):
self._table_id + "/" + str(stream_index)

def CreateReadSession(self, request, context):
print ("called CreateReadSession on server")
print("called CreateReadSession on a fake server")
self._project_id = request.table_reference.project_id
self._table_id = request.table_reference.table_id
self._dataset_id = request.table_reference.dataset_id
@@ -89,10 +89,10 @@ def CreateReadSession(self, request, context):
return response

def ReadRows(self, request, context):
print ("called ReadRows on server")
print("called ReadRows on a fake server")
response = storage_pb2.ReadRowsResponse()
stream_index = self._streams.index(request.read_position.stream.name)
if stream_index >= 0 and stream_index < len(
if 0 <= stream_index < len(
self._avro_serialized_rows_per_stream):
response.avro_rows.serialized_binary_rows = \
self._avro_serialized_rows_per_stream[stream_index]
@@ -104,53 +104,107 @@ def ReadRows(self, request, context):
class BigqueryOpsTest(test.TestCase):
"""Tests for BigQuery adapter."""

GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = "usa_names"
TABLE_ID = "usa_1910_current"
PARENT = "projects/some_parent"
GCP_PROJECT_ID = "test_project_id"
DATASET_ID = "test_dataset"
TABLE_ID = "test_table"
PARENT = "projects/test_parent"

AVRO_SCHEMA = """
{
"type": "record",
"name": "__root__",
"fields": [
{
"name": "state",
"name": "string",
"type": [
"null",
"string"
],
"doc": "2-digit state code"
"doc": "nullable string"
},
{
"name": "name",
"name": "boolean",
"type": [
"null",
"string"
"boolean"
],
"doc": "nullable boolean"
},
{
"name": "int",
"type": [
"null",
"int"
],
"doc": "Given name of a person at birth"
"doc": "nullable int"
},
{
"name": "number",
"name": "long",
"type": [
"null",
"long"
],
"doc": "Number of occurrences of the name"
"doc": "nullable long"
},
{
"name": "float",
"type": [
"null",
"float"
],
"doc": "nullable float"
},
{
"name": "double",
"type": [
"null",
"double"
],
"doc": "nullable double"
}
]
}"""

STREAM_1_ROWS = [{
"state": "wa",
"name": "Andrew",
"number": 1
}, {
"state": "wa",
"name": "Eva",
"number": 2
}]
STREAM_2_ROWS = [{"state": "ny", "name": "Emma", "number": 10}]
STREAM_1_ROWS = [
{
"string": "string1",
"boolean": True,
"int": 10,
"long": 100,
"float": 1000.0,
"double": 10000.0
},
{
"string": "string2",
"boolean": False,
"int": 12,
"long": 102,
"float": 1002.0,
"double": 10002.0
}
]
STREAM_2_ROWS = [
{
"string": "string2",
"boolean": True,
"int": 20,
"long": 200,
"float": 2000.0,
"double": 20000.0
},
{
# Empty record, all values are null
}
]

DEFAULT_VALUES = {
'boolean': False,
'double': 0.0,
'float': 0.0,
'int': 0,
'long': 0,
'string': ''
}

@staticmethod
def _serialize_to_avro(rows, schema):
@@ -233,8 +233,16 @@ def test_read_rows(self):
self.PARENT,
self.GCP_PROJECT_ID,
self.TABLE_ID,
self.DATASET_ID, ["state", "name", "number"],
[dtypes.string, dtypes.string, dtypes.int64],
self.DATASET_ID,
["string", "boolean", "int", "long", "float", "double"],
[
dtypes.string,
dtypes.bool,
dtypes.int32,
dtypes.int64,
dtypes.float32,
dtypes.float64
],
requested_streams=2)

streams_list = read_session.get_streams()
@@ -252,6 +252,8 @@
itr2 = iter(dataset2)
self.assertEqual(self.STREAM_2_ROWS[0],
self._normalize_dictionary(itr2.get_next()))
self.assertEqual(self.DEFAULT_VALUES,
self._normalize_dictionary(itr2.get_next()))
with self.assertRaises(errors.OutOfRangeError):
itr2.get_next()

@@ -262,8 +262,16 @@ def test_parallel_read_rows(self):
self.PARENT,
self.GCP_PROJECT_ID,
self.TABLE_ID,
self.DATASET_ID, ["state", "name", "number"],
[dtypes.string, dtypes.string, dtypes.int64],
self.DATASET_ID,
["string", "boolean", "int", "long", "float", "double"],
[
dtypes.string,
dtypes.bool,
dtypes.int32,
dtypes.int64,
dtypes.float32,
dtypes.float64
],
requested_streams=2)

dataset = read_session.parallel_read_rows()
@@ -274,6 +274,8 @@
self._normalize_dictionary(itr.get_next()))
self.assertEqual(self.STREAM_1_ROWS[1],
self._normalize_dictionary(itr.get_next()))
self.assertEqual(self.DEFAULT_VALUES,
self._normalize_dictionary(itr.get_next()))

if __name__ == "__main__":
test.main()
