From 29dcf9ee9222a325a1edd234cf3f5ce80f6b3975 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 22 Aug 2024 18:46:26 +0200 Subject: [PATCH] update: added google-cloud-storage v2.18.2 support Signed-off-by: Cagri Yonca --- tests/clients/test_google-cloud-pubsub.py | 1153 ++++++++++++++++++--- tests/requirements-310.txt | 2 +- tests/requirements.txt | 2 +- 3 files changed, 1023 insertions(+), 134 deletions(-) diff --git a/tests/clients/test_google-cloud-pubsub.py b/tests/clients/test_google-cloud-pubsub.py index 48b57eda9..d8762584f 100644 --- a/tests/clients/test_google-cloud-pubsub.py +++ b/tests/clients/test_google-cloud-pubsub.py @@ -1,189 +1,1078 @@ # (c) Copyright IBM Corp. 2021 -# (c) Copyright Instana Inc. 2021 +# (c) Copyright Instana Inc. 2020 -import os -import threading -import time -import six +import sys import unittest +import json +import requests +import io -from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient -from google.api_core.exceptions import AlreadyExists -from google.cloud.pubsub_v1.publisher import exceptions from instana.singletons import agent, tracer from tests.test_utils import _TraceContextMixin -# Use PubSub Emulator exposed at :8085 -os.environ["PUBSUB_EMULATOR_HOST"] = "localhost:8085" +from mock import patch, Mock +from six.moves import http_client +from google.cloud import storage +from google.api_core import iam +from google.auth.credentials import AnonymousCredentials -class TestPubSubPublish(unittest.TestCase, _TraceContextMixin): - @classmethod - def setUpClass(cls): - cls.publisher = PublisherClient() +class TestGoogleCloudStorage(unittest.TestCase, _TraceContextMixin): def setUp(self): self.recorder = tracer.recorder self.recorder.clear_spans() - self.project_id = 'test-project' - self.topic_name = 'test-topic' - - # setup topic_path & topic - self.topic_path = self.publisher.topic_path(self.project_id, self.topic_name) - try: - self.publisher.create_topic(request={"name": self.topic_path}) - except AlreadyExists: - self.publisher.delete_topic(request={"topic": self.topic_path}) - self.publisher.create_topic(request={"name": self.topic_path}) - def tearDown(self): - self.publisher.delete_topic(request={"topic": self.topic_path}) + """Ensure that allow_exit_as_root has the default value""" agent.options.allow_exit_as_root = False - def test_publish(self): - # publish a single message - with tracer.start_active_span('test'): - future = self.publisher.publish(self.topic_path, - b'Test Message', - origin="instana") - time.sleep(2.0) # for sanity - result = future.result() - self.assertIsInstance(result, six.string_types) + @unittest.skipIf( + sys.platform == "darwin", reason="Raises not Implemented exception in OSX" + ) + @patch("requests.Session.request") + def test_buckets_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#buckets", "items": []}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + buckets = client.list_buckets() + self.assertEqual( + 0, + self.recorder.queue_size(), + msg="span has been created before the actual request", + ) + + # trigger the iterator + for b in buckets: + pass spans = self.recorder.queued_spans() - gcps_span, test_span = spans[0], spans[1] self.assertEqual(2, len(spans)) self.assertIsNone(tracer.active_span) - self.assertEqual('gcps', gcps_span.n) - self.assertEqual(2, gcps_span.k) # EXIT - self.assertEqual('publish', gcps_span.data['gcps']['op']) - self.assertEqual(self.topic_name, gcps_span.data['gcps']['top']) + gcs_span = spans[0] + test_span = spans[1] - # Trace Context Propagation - self.assertTraceContextPropagated(test_span, gcps_span) + self.assertTraceContextPropagated(test_span, gcs_span) - # Error logging - self.assertErrorLogging(spans) + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) - def test_publish_as_root_exit_span(self): + self.assertEqual("buckets.list", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + + @unittest.skipIf( + sys.platform == "darwin", reason="Raises not Implemented exception in OSX" + ) + @patch("requests.Session.request") + def test_buckets_list_as_root_exit_span(self, mock_requests): agent.options.allow_exit_as_root = True - # publish a single message - future = self.publisher.publish(self.topic_path, - b'Test Message', - origin="instana") - time.sleep(2.0) # for sanity - result = future.result() - self.assertIsInstance(result, six.string_types) + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#buckets", "items": []}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + buckets = client.list_buckets() + self.assertEqual( + 0, + self.recorder.queue_size(), + msg="span has been created before the actual request", + ) + + # trigger the iterator + for b in buckets: + pass spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) - gcps_span = spans[0] + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.list", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + + @patch("requests.Session.request") + def test_buckets_insert(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + with tracer.start_active_span("test"): + client.create_bucket("test bucket") + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) self.assertIsNone(tracer.active_span) - self.assertEqual('gcps', gcps_span.n) - self.assertEqual(2, gcps_span.k) # EXIT - self.assertEqual('publish', gcps_span.data['gcps']['op']) - self.assertEqual(self.topic_name, gcps_span.data['gcps']['top']) + gcs_span = spans[0] + test_span = spans[1] - # Error logging - self.assertErrorLogging(spans) + self.assertTraceContextPropagated(test_span, gcs_span) + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) -class AckCallback(object): - def __init__(self): - self.calls = 0 - self.lock = threading.Lock() + self.assertEqual("buckets.insert", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - def __call__(self, message): - message.ack() - # Only increment the number of calls **after** finishing. - with self.lock: - self.calls += 1 + @patch("requests.Session.request") + def test_buckets_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, status_code=http_client.OK + ) + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) -class TestPubSubSubscribe(unittest.TestCase, _TraceContextMixin): - @classmethod - def setUpClass(cls): - cls.publisher = PublisherClient() - cls.subscriber = SubscriberClient() + with tracer.start_active_span("test"): + client.get_bucket("test bucket") - def setUp(self): + spans = self.recorder.queued_spans() - self.recorder = tracer.recorder - self.recorder.clear_spans() + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertEqual(test_span.t, gcs_span.t) + self.assertEqual(test_span.s, gcs_span.p) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.get", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_patch(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").patch() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.patch", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").update() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.update", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_get_iam_policy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#policy"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").get_iam_policy() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.getIamPolicy", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_set_iam_policy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#policy"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").set_iam_policy(iam.Policy()) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.setIamPolicy", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_test_iam_permissions(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#testIamPermissionsResponse"}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").test_iam_permissions("test-permission") + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) - self.project_id = 'test-project' - self.topic_name = 'test-topic' - self.subscription_name = 'test-subscription' - - # setup topic_path & topic - self.topic_path = self.publisher.topic_path(self.project_id, self.topic_name) - try: - self.publisher.create_topic(request={"name": self.topic_path}) - except AlreadyExists: - self.publisher.delete_topic(request={"topic": self.topic_path}) - self.publisher.create_topic(request={"name": self.topic_path}) - - # setup subscription path & attach subscription - self.subscription_path = self.subscriber.subscription_path( - self.project_id, self.subscription_name) - try: - self.subscriber.create_subscription( - request={"name": self.subscription_path, "topic": self.topic_path} + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.testIamPermissions", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_lock_retention_policy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={ + "kind": "storage#bucket", + "metageneration": 1, + "retentionPolicy": {"isLocked": False}, + }, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + bucket = client.bucket("test bucket") + bucket.reload() + + with tracer.start_active_span("test"): + bucket.lock_retention_policy() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.lockRetentionPolicy", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_buckets_delete(self, mock_requests): + mock_requests.return_value = self._mock_response() + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").delete() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("buckets.delete", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_objects_compose(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("dest object").compose( + [ + storage.blob.Blob("object 1", "test bucket"), + storage.blob.Blob("object 2", "test bucket"), + ] ) - except AlreadyExists: - self.subscriber.delete_subscription(request={"subscription": self.subscription_path}) - self.subscriber.create_subscription( - request={"name": self.subscription_path, "topic": self.topic_path} + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.compose", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["destinationBucket"]) + self.assertEqual("dest object", gcs_span.data["gcs"]["destinationObject"]) + self.assertEqual( + "test bucket/object 1,test bucket/object 2", + gcs_span.data["gcs"]["sourceObjects"], + ) + + @patch("requests.Session.request") + def test_objects_copy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + bucket = client.bucket("src bucket") + + with tracer.start_active_span("test"): + bucket.copy_blob( + bucket.blob("src object"), + client.bucket("dest bucket"), + new_name="dest object", ) - def tearDown(self): - self.publisher.delete_topic(request={"topic": self.topic_path}) - self.subscriber.delete_subscription(request={"subscription": self.subscription_path}) + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) - def test_subscribe(self): + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) - with tracer.start_active_span('test'): - # Publish a message - future = self.publisher.publish(self.topic_path, - b"Test Message to PubSub", - origin="instana") - self.assertIsInstance(future.result(), six.string_types) + self.assertEqual("objects.copy", gcs_span.data["gcs"]["op"]) + self.assertEqual("dest bucket", gcs_span.data["gcs"]["destinationBucket"]) + self.assertEqual("dest object", gcs_span.data["gcs"]["destinationObject"]) + self.assertEqual("src bucket", gcs_span.data["gcs"]["sourceBucket"]) + self.assertEqual("src object", gcs_span.data["gcs"]["sourceObject"]) - time.sleep(2.0) # for sanity + @patch("requests.Session.request") + def test_objects_delete(self, mock_requests): + mock_requests.return_value = self._mock_response() - # Subscribe to the subscription - callback_handler = AckCallback() - future = self.subscriber.subscribe(self.subscription_path, callback_handler) - timeout = 2.0 - try: - future.result(timeout) - except exceptions.TimeoutError: - future.cancel() + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").delete() spans = self.recorder.queued_spans() - producer_span = spans[0] - consumer_span = spans[1] - test_span = spans[2] + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual(3, len(spans)) + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.delete", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @patch("requests.Session.request") + def test_objects_attrs(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").exists() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) self.assertIsNone(tracer.active_span) - self.assertEqual('publish', producer_span.data['gcps']['op']) - self.assertEqual('consume', consumer_span.data['gcps']['op']) - self.assertEqual(self.topic_name, producer_span.data['gcps']['top']) - self.assertEqual(self.subscription_name, consumer_span.data['gcps']['sub']) - self.assertEqual(2, producer_span.k) # EXIT - self.assertEqual(1, consumer_span.k) # ENTRY + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.attrs", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @patch("requests.Session.request") + def test_objects_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + content=b"CONTENT", status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").download_to_file( + io.BytesIO(), raw_download=True + ) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.get", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @patch("requests.Session.request") + def test_objects_insert(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").upload_from_string( + "CONTENT" + ) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.insert", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @unittest.skipIf( + sys.platform == "darwin", reason="Raises not Implemented exception in OSX" + ) + @patch("requests.Session.request") + def test_objects_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + blobs = client.bucket("test bucket").list_blobs() + self.assertEqual( + 0, + self.recorder.queue_size(), + msg="span has been created before the actual request", + ) + + for b in blobs: + pass + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.list", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_objects_patch(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").patch() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.patch", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @patch("requests.Session.request") + def test_objects_rewrite(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={ + "kind": "storage#rewriteResponse", + "totalBytesRewritten": 0, + "objectSize": 0, + "done": True, + "resource": {}, + }, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("dest bucket").blob("dest object").rewrite( + client.bucket("src bucket").blob("src object") + ) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.rewrite", gcs_span.data["gcs"]["op"]) + self.assertEqual("dest bucket", gcs_span.data["gcs"]["destinationBucket"]) + self.assertEqual("dest object", gcs_span.data["gcs"]["destinationObject"]) + self.assertEqual("src bucket", gcs_span.data["gcs"]["sourceBucket"]) + self.assertEqual("src object", gcs_span.data["gcs"]["sourceObject"]) + + @patch("requests.Session.request") + def test_objects_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").update() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objects.update", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @patch("requests.Session.request") + def test_default_acls_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#objectAccessControls", "items": []}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").default_object_acl.get_entities() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("defaultAcls.list", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + + @patch("requests.Session.request") + def test_object_acls_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#objectAccessControls", "items": []}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.bucket("test bucket").blob("test object").acl.get_entities() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("objectAcls.list", gcs_span.data["gcs"]["op"]) + self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + + @patch("requests.Session.request") + def test_object_hmac_keys_create(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.create_hmac_key("test@example.com") + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("hmacKeys.create", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + + @patch("requests.Session.request") + def test_object_hmac_keys_delete(self, mock_requests): + mock_requests.return_value = self._mock_response() + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + key = storage.hmac_key.HMACKeyMetadata(client, access_id="test key") + key.state = storage.hmac_key.HMACKeyMetadata.INACTIVE_STATE + key.delete() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("hmacKeys.delete", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + self.assertEqual("test key", gcs_span.data["gcs"]["accessId"]) + + @patch("requests.Session.request") + def test_object_hmac_keys_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + storage.hmac_key.HMACKeyMetadata(client, access_id="test key").exists() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("hmacKeys.get", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + self.assertEqual("test key", gcs_span.data["gcs"]["accessId"]) + + @unittest.skipIf( + sys.platform == "darwin", reason="Raises not Implemented exception in OSX" + ) + @patch("requests.Session.request") + def test_object_hmac_keys_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKeysMetadata", "items": []}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + keys = client.list_hmac_keys() + self.assertEqual( + 0, + self.recorder.queue_size(), + msg="span has been created before the actual request", + ) + + for k in keys: + pass + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("hmacKeys.list", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + + @patch("requests.Session.request") + def test_object_hmac_keys_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + storage.hmac_key.HMACKeyMetadata(client, access_id="test key").update() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("hmacKeys.update", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + self.assertEqual("test key", gcs_span.data["gcs"]["accessId"]) + + @patch("requests.Session.request") + def test_object_hmac_keys_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={ + "email_address": "test@example.com", + "kind": "storage#serviceAccount", + }, + status_code=http_client.OK, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_active_span("test"): + client.get_service_account_email() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual("gcs", gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual("serviceAccount.get", gcs_span.data["gcs"]["op"]) + self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + + @patch("requests.Session.request") + def test_batch_operation(self, mock_requests): + mock_requests.return_value = self._mock_response( + _TWO_PART_BATCH_RESPONSE, + status_code=http_client.OK, + headers={"content-type": 'multipart/mixed; boundary="DEADBEEF="'}, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + bucket = client.bucket("test-bucket") + + with tracer.start_active_span("test"): + with client.batch(): + for obj in ["obj1", "obj2"]: + bucket.delete_blob(obj) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + + def _client(self, *args, **kwargs): + # override the HTTP client to bypass the authorization + kwargs["_http"] = kwargs.get("_http", requests.Session()) + kwargs["_http"].is_mtls = False + + return storage.Client(*args, **kwargs) + + def _mock_response( + self, + content=b"", + status_code=http_client.NO_CONTENT, + json_content=None, + headers={}, + ): + resp = Mock() + resp.status_code = status_code + resp.headers = headers + resp.content = content + resp.__enter__ = Mock(return_value=resp) + resp.__exit__ = Mock() + + if json_content is not None: + if resp.content == b"": + resp.content = json.dumps(json_content) + + resp.json = Mock(return_value=json_content) + + return resp + + +_TWO_PART_BATCH_RESPONSE = b"""\ +--DEADBEEF= +Content-Type: application/json +Content-ID: + +HTTP/1.1 204 No Content + +Content-Type: application/json; charset=UTF-8 +Content-Length: 0 + +--DEADBEEF= +Content-Type: application/json +Content-ID: + +HTTP/1.1 204 No Content - # Trace Context Propagation - self.assertTraceContextPropagated(producer_span, consumer_span) - self.assertTraceContextPropagated(test_span, producer_span) +Content-Type: application/json; charset=UTF-8 +Content-Length: 0 - # Error logging - self.assertErrorLogging(spans) +--DEADBEEF=-- +""" diff --git a/tests/requirements-310.txt b/tests/requirements-310.txt index 136802554..22514153f 100644 --- a/tests/requirements-310.txt +++ b/tests/requirements-310.txt @@ -9,7 +9,7 @@ flask>=2.3.2 markupsafe>=2.1.0 grpcio>=1.37.1 google-cloud-pubsub>=2.0.0 -google-cloud-storage<=2.14.0 +google-cloud-storage>=1.24.0 lxml>=4.9.2 mock>=4.0.3 moto>=4.1.2 diff --git a/tests/requirements.txt b/tests/requirements.txt index 1c2597a58..2310401c7 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -8,7 +8,7 @@ fastapi>=0.92.0 flask>=2.3.2 grpcio>=1.37.1 google-cloud-pubsub>=2.0.0 -google-cloud-storage<=2.14.0 +google-cloud-storage>=1.24.0 lxml>=4.9.2 mock>=4.0.3 moto>=4.1.2