fix: handle AttributeError in bigquery_storage writer #414

Merged: parthea merged 8 commits into main from handle-null-rpc-and-consumer on Jun 24, 2022

parthea
@parthea parthea commented Mar 14, 2022

This PR fixes an issue where an AttributeError could be raised here if either self._rpc or self._consumer is None. @tswast and I saw this AttributeError locally with the changes from googleapis/python-api-core#357.

See the stack trace below:

Exception has occurred: AttributeError
'NoneType' object has no attribute 'is_active'
  File "/usr/local/google/home/partheniou/git/python-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py", line 192, in _open
    while not self._rpc.is_active and self._consumer.is_active:
  File "/usr/local/google/home/partheniou/git/python-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py", line 230, in send
    return self._open(request)
  File "/usr/local/google/home/partheniou/git/python-bigquery-storage/samples/snippets/append_rows_proto2.py", line 123, in append_rows_proto2
    response_future_1 = append_rows_stream.send(request)
  File "/usr/local/google/home/partheniou/git/python-bigquery-storage/samples/snippets/append_rows_proto2_test.py", line 58, in test_append_rows_proto2
    append_rows_proto2.append_rows_proto2(
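
For readers following along, here is a minimal sketch of how the failing loop condition could be made tolerant of a missing RPC or consumer. It is an illustration only and not necessarily the change merged in this PR: `stream_is_starting` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for the `bidi.BidiRpc` and `bidi.BackgroundConsumer` instances, assumed here to expose only an `is_active` attribute.

```python
from types import SimpleNamespace


def stream_is_starting(rpc, consumer):
    """Return True while the RPC has not started but the consumer is alive.

    Either argument may be None (for example, if a shutdown cleared it),
    in which case there is nothing left to wait for.
    """
    if rpc is None or consumer is None:
        return False
    return not rpc.is_active and consumer.is_active


# Hypothetical stand-ins for the real RPC and consumer objects.
rpc = SimpleNamespace(is_active=False)
consumer = SimpleNamespace(is_active=True)

print(stream_is_starting(rpc, consumer))   # RPC not active yet, consumer alive
print(stream_is_starting(None, consumer))  # RPC reference was cleared
```

With a helper like this, the wait loop would exit quietly when either reference is None instead of raising `'NoneType' object has no attribute 'is_active'`.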

@parthea parthea requested a review from a team as a code owner March 14, 2022 21:05
@parthea parthea requested review from a team and stephaniewang526 March 14, 2022 21:05
@product-auto-label product-auto-label bot added the api: bigquerystorage Issues related to the googleapis/python-bigquery-storage API. label Mar 14, 2022
@parthea parthea force-pushed the handle-null-rpc-and-consumer branch from 3956003 to fc2b386 Compare March 14, 2022 21:06
@parthea parthea changed the title fix: resolve issue AttributeError in bigquery_storage writer fix: resolve AttributeError in bigquery_storage writer Mar 14, 2022
tswast
tswast previously requested changes Mar 15, 2022
google/cloud/bigquery_storage_v1/writer.py (outdated)
@parthea parthea changed the title fix: resolve AttributeError in bigquery_storage writer fix: handle AttributeError in bigquery_storage writer Mar 15, 2022
@parthea parthea requested a review from tswast March 15, 2022 15:33
@parthea parthea added kokoro:run Add this label to force Kokoro to re-run the tests. kokoro:force-run Add this label to force Kokoro to re-run the tests. labels Mar 15, 2022
@yoshi-kokoro yoshi-kokoro removed kokoro:run Add this label to force Kokoro to re-run the tests. kokoro:force-run Add this label to force Kokoro to re-run the tests. labels Mar 15, 2022
@tswast tswast left a comment

Looks good! Would be great if we could get a unit test or two checking this behavior (closed during _open)
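
A rough sketch of what such a test might look like, written against a stand-in class rather than the real `AppendRowsStream`. Everything here is assumed for illustration: `FakeStream`, its `close()` behaviour (dropping both references), and `wait_for_start` are hypothetical, and the real writer may poll, time out, or shut down differently.

```python
import threading
import time


class FakeStream:
    """Hypothetical stand-in for AppendRowsStream; not the library class."""

    def __init__(self):
        # Minimal objects exposing only the attribute the wait loop reads.
        self._rpc = type("Rpc", (), {"is_active": False})()
        self._consumer = type("Consumer", (), {"is_active": True})()

    def close(self):
        # Assumed shutdown behaviour: drop both references.
        self._rpc = None
        self._consumer = None

    def wait_for_start(self, timeout=1.0):
        """Poll until the RPC starts, the stream closes, or timeout expires."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Read each attribute once so a concurrent close() cannot
            # swap in None between the check and the attribute access.
            rpc, consumer = self._rpc, self._consumer
            if rpc is None or consumer is None:
                return False  # closed while opening
            if rpc.is_active or not consumer.is_active:
                return rpc.is_active
            time.sleep(0.01)
        return False


def test_closed_during_open():
    stream = FakeStream()
    # Close the stream from another thread while the wait loop is polling.
    closer = threading.Timer(0.05, stream.close)
    closer.start()
    started = stream.wait_for_start(timeout=1.0)
    closer.join()
    assert started is False
    assert stream._rpc is None and stream._consumer is None
```

The point of the sketch is the shape of the check: close the stream from a second thread while the opener is still waiting, then assert that the wait returns cleanly instead of raising AttributeError.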

@meredithslota meredithslota requested review from steffnay and removed request for stephaniewang526 May 13, 2022 15:33
@product-auto-label product-auto-label bot added the size: m Pull request size is medium. label Jun 13, 2022
@parthea
parthea commented Jun 13, 2022

I can only reproduce the AttributeError when I change the client code here to the code in googleapis/python-api-core#268 (comment). I suggest proceeding without a test, since I can't get the code to fail without modifying the client code in writer.py.

@parthea
parthea commented Jun 13, 2022

Here is an example of the failure, reproduced locally with the modified version of writer.py from googleapis/python-api-core#268 (comment):

______________________________________________________________________________________________________________ test_append_rows_proto2[non-US] _______________________________________________________________________________________________________________

capsys = <_pytest.capture.CaptureFixture object at 0x7f56ca07ff70>, bigquery_client = <google.cloud.bigquery.client.Client object at 0x7f56ca1026d0>
sample_data_table = 'python-docs-samples-tests.python_bigquery_storage_samples_snippets_20220613232605_aa4ab3.append_rows_proto2_1390'

    def test_append_rows_proto2(
        capsys: pytest.CaptureFixture,
        bigquery_client: bigquery.Client,
        sample_data_table: str,
    ):
        project_id, dataset_id, table_id = sample_data_table.split(".")
>       append_rows_proto2.append_rows_proto2(
            project_id=project_id, dataset_id=dataset_id, table_id=table_id
        )

samples/snippets/append_rows_proto2_test.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
samples/snippets/append_rows_proto2.py:124: in append_rows_proto2
    response_future_1 = append_rows_stream.send(request)
google/cloud/bigquery_storage_v1/writer.py:221: in send
    return self._open(request)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <google.cloud.bigquery_storage_v1.writer.AppendRowsStream object at 0x7f56e1859af0>
initial_request = offset {
}
proto_rows {
  rows {
    serialized_rows: "\010\001\022\rHello, World!\031\000\000\000\000\000\000\360\177...1\004"
    serialized_rows: " \270\213\004\200\001\005"
    serialized_rows: "*\020Auf Wiedersehen!\200\001\006"
  }
}

timeout = 600

    def _open(
        self,
        initial_request: gapic_types.AppendRowsRequest,
        timeout: float = _DEFAULT_TIMEOUT,
    ) -> "AppendRowsFuture":
        """Open an append rows stream.
    
        This is automatically called by the first call to the
        :attr:`google.cloud.bigquery_storage_v1.writer.AppendRowsStream.send`
        method.
    
        Args:
            initial_request:
                The initial request to start the stream. Must have
                :attr:`google.cloud.bigquery_storage_v1.types.AppendRowsRequest.write_stream`
                and ``proto_rows.writer_schema.proto_descriptor`` and
                properties populated.
            timeout:
                How long (in seconds) to wait for the stream to be ready.
    
        Returns:
            A future, which can be used to process the response to the initial
            request when it arrives.
        """
        if self.is_active:
            raise ValueError("This manager is already open.")
    
        if self._closed:
            raise bqstorage_exceptions.StreamClosedError(
                "This manager has been closed and can not be re-used."
            )
    
        start_time = time.monotonic()
        self._inital_request = initial_request
        self._stream_name = initial_request.write_stream
    
        inital_response_future = AppendRowsFuture(self)
        self._futures_queue.put(inital_response_future)
    
        self._rpc = bidi.BidiRpc(
            self._client.append_rows,
            initial_request=self._inital_request,
            # TODO: pass in retry and timeout. Blocked by
            # https://github.com/googleapis/python-api-core/issues/262
            metadata=tuple(
                itertools.chain(
                    self._metadata,
                    # This header is required so that the BigQuery Storage API
                    # knows which region to route the request to.
                    (("x-goog-request-params", f"write_stream={self._stream_name}"),),
                )
            ),
        )
        self._rpc.add_done_callback(self._on_rpc_done)
    
        self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
        self._consumer.start()
    
        # Make sure RPC has started before returning.
        # Without this, consumers may get:
        #
        # ValueError: Can not send() on an RPC that has never been open()ed.
        #
        # when they try to send a request.
>       while not self._rpc.is_active and self._consumer.is_active:
E       AttributeError: 'NoneType' object has no attribute 'is_active'

google/cloud/bigquery_storage_v1/writer.py:183: AttributeError

@parthea parthea assigned loferris and unassigned tswast Jun 13, 2022
@parthea parthea requested a review from loferris June 13, 2022 23:29
@parthea parthea dismissed tswast’s stale review June 24, 2022 15:30

Dismiss stale review

@parthea parthea merged commit 2cb641a into main Jun 24, 2022
@parthea parthea deleted the handle-null-rpc-and-consumer branch June 24, 2022 15:30
Labels
api: bigquerystorage Issues related to the googleapis/python-bigquery-storage API. size: m Pull request size is medium.

5 participants