Skip to content

Commit

Permalink
Firstore Async Instrumentation (#882)
Browse files Browse the repository at this point in the history
* Remove unnecessary instrumentation

* Simplify existing instrumentation

* Remove unnecessary settings lookups

* Client instrumentation

* Add query and aggregation query instrumentation

* Fix deprecation warning

* Simplify collection lookup

* Combine query test files

* Rename methods for clarity

* Instrument Firestore batching

* Add transaction instrumentation

* Consumer iterators on <=Py38

* Add async generator wrapper

* Allow better parallelization in firestore tests

* Fix issue in async generator wrapper

* Add async client instrumentation

* Squashed commit of the following:

commit 9d411e0
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Jul 26 15:57:39 2023 -0700

    Clean out unnecessary code

commit cb550ba
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Jul 26 14:27:01 2023 -0700

    Allow better parallelization in firestore tests

* Add async collection instrumentation

* Add async document instrumentation

* Async Query instrumentation

* Add async batch instrumentation

* Add instrumentation for AsyncTransaction

* Squashed commit of the following:

commit c836f8f
Author: TimPansino <TimPansino@users.noreply.github.com>
Date:   Thu Jul 27 19:54:35 2023 +0000

    [Mega-Linter] Apply linters fixes

commit 02a55a1
Author: Tim Pansino <timpansino@gmail.com>
Date:   Thu Jul 27 12:46:46 2023 -0700

    Add collection group instrumentation

commit ab1f4ff
Author: Tim Pansino <timpansino@gmail.com>
Date:   Thu Jul 27 12:00:33 2023 -0700

    Better parallelization safeguards

commit fa5f39a
Author: TimPansino <TimPansino@users.noreply.github.com>
Date:   Wed Jul 26 22:59:11 2023 +0000

    [Mega-Linter] Apply linters fixes

commit 9d411e0
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Jul 26 15:57:39 2023 -0700

    Clean out unnecessary code

commit cb550ba
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Jul 26 14:27:01 2023 -0700

    Allow better parallelization in firestore tests

* Remove reset_firestore

* Re-merge of test_query

* Use public API imports

* Add async collection group instrumentation

* Refactor exercise functions to fixtures

* Squashed commit of the following:

commit 09c5e11
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Aug 2 14:33:24 2023 -0700

    Add testing for automatic and manual asyncwrappers

commit fc3ef6b
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Aug 2 14:33:05 2023 -0700

    Add async wrapper argument to all trace APIs

commit 479f9e2
Merge: faf3ccc edd1f94
Author: Tim Pansino <timpansino@gmail.com>
Date:   Wed Aug 2 13:44:24 2023 -0700

    Merge remote-tracking branch 'origin/develop-google-firestore-instrumentation' into feature-async-wrapper-argument

commit edd1f94
Author: Timothy Pansino <11214426+TimPansino@users.noreply.github.com>
Date:   Wed Aug 2 13:40:51 2023 -0700

    Async Generator Wrapper (#884)

    * Add async generator wrapper

    * Add no harm test

    * Remove anext calls

    * Add graphql traces to decorator testing

    * Remove pypy generator gc logic

    ---------

    Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>

commit faf3ccc
Author: Tim Pansino <timpansino@gmail.com>
Date:   Mon Jul 31 15:10:56 2023 -0700

    Add async_wrapper to datastore_trace api

* Remove custom wrapper code from firestore

* Undo wrapper edits

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
TimPansino and mergify[bot] authored Aug 9, 2023
1 parent adcee3f commit e835758
Show file tree
Hide file tree
Showing 11 changed files with 788 additions and 24 deletions.
35 changes: 35 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2279,31 +2279,61 @@ def _process_module_builtin_defaults():
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_client",
)
_process_module_definition(
"google.cloud.firestore_v1.async_client",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_client",
)
_process_module_definition(
"google.cloud.firestore_v1.document",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_document",
)
_process_module_definition(
"google.cloud.firestore_v1.async_document",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_document",
)
_process_module_definition(
"google.cloud.firestore_v1.collection",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_collection",
)
_process_module_definition(
"google.cloud.firestore_v1.async_collection",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_collection",
)
_process_module_definition(
"google.cloud.firestore_v1.query",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_query",
)
_process_module_definition(
"google.cloud.firestore_v1.async_query",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_query",
)
_process_module_definition(
"google.cloud.firestore_v1.aggregation",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_aggregation",
)
_process_module_definition(
"google.cloud.firestore_v1.async_aggregation",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_aggregation",
)
_process_module_definition(
"google.cloud.firestore_v1.batch",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_batch",
)
_process_module_definition(
"google.cloud.firestore_v1.async_batch",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_batch",
)
_process_module_definition(
"google.cloud.firestore_v1.bulk_batch",
"newrelic.hooks.datastore_firestore",
Expand All @@ -2314,6 +2344,11 @@ def _process_module_builtin_defaults():
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_transaction",
)
_process_module_definition(
"google.cloud.firestore_v1.async_transaction",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_transaction",
)

_process_module_definition(
"ariadne.asgi",
Expand Down
121 changes: 99 additions & 22 deletions newrelic/hooks/datastore_firestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from newrelic.api.datastore_trace import DatastoreTrace, wrap_datastore_trace
from newrelic.api.datastore_trace import wrap_datastore_trace
from newrelic.api.function_trace import wrap_function_trace
from newrelic.common.async_wrapper import generator_wrapper
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.common.async_wrapper import generator_wrapper, async_generator_wrapper


_get_object_id = lambda obj, *args, **kwargs: getattr(obj, "id", None)
_get_parent_id = lambda obj, *args, **kwargs: getattr(getattr(obj, "_parent", None), "id", None)
_get_collection_ref_id = lambda obj, *args, **kwargs: getattr(getattr(obj, "_collection_ref", None), "id", None)


def wrap_generator_method(module, class_name, method_name, target):
def _wrapper(wrapped, instance, args, kwargs):
target_ = target(instance) if callable(target) else target
trace = DatastoreTrace(product="Firestore", target=target_, operation=method_name)
wrapped = generator_wrapper(wrapped, trace)
return wrapped(*args, **kwargs)

class_ = getattr(module, class_name)
if class_ is not None:
if hasattr(class_, method_name):
wrap_function_wrapper(module, "%s.%s" % (class_name, method_name), _wrapper)


def instrument_google_cloud_firestore_v1_base_client(module):
rollup = ("Datastore/all", "Datastore/Firestore/all")
wrap_function_trace(
Expand All @@ -48,7 +34,15 @@ def instrument_google_cloud_firestore_v1_client(module):
class_ = module.Client
for method in ("collections", "get_all"):
if hasattr(class_, method):
wrap_generator_method(module, "Client", method, target=None)
wrap_datastore_trace(module, "Client.%s" % method, operation=method, product="Firestore", target=None, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_client(module):
if hasattr(module, "AsyncClient"):
class_ = module.AsyncClient
for method in ("collections", "get_all"):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncClient.%s" % method, operation=method, product="Firestore", target=None, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_collection(module):
Expand All @@ -66,7 +60,21 @@ def instrument_google_cloud_firestore_v1_collection(module):

for method in ("stream", "list_documents"):
if hasattr(class_, method):
wrap_generator_method(module, "CollectionReference", method, target=_get_object_id)
wrap_datastore_trace(module, "CollectionReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_collection(module):
if hasattr(module, "AsyncCollectionReference"):
class_ = module.AsyncCollectionReference
for method in ("add", "get"):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncCollectionReference.%s" % method, product="Firestore", target=_get_object_id, operation=method
)

for method in ("stream", "list_documents"):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncCollectionReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_document(module):
Expand All @@ -84,7 +92,21 @@ def instrument_google_cloud_firestore_v1_document(module):

for method in ("collections",):
if hasattr(class_, method):
wrap_generator_method(module, "DocumentReference", method, target=_get_object_id)
wrap_datastore_trace(module, "DocumentReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_document(module):
if hasattr(module, "AsyncDocumentReference"):
class_ = module.AsyncDocumentReference
for method in ("create", "delete", "get", "set", "update"):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncDocumentReference.%s" % method, product="Firestore", target=_get_object_id, operation=method
)

for method in ("collections",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncDocumentReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_query(module):
Expand All @@ -98,13 +120,33 @@ def instrument_google_cloud_firestore_v1_query(module):

for method in ("stream",):
if hasattr(class_, method):
wrap_generator_method(module, "Query", method, target=_get_parent_id)
wrap_datastore_trace(module, "Query.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=generator_wrapper)

if hasattr(module, "CollectionGroup"):
class_ = module.CollectionGroup
for method in ("get_partitions",):
if hasattr(class_, method):
wrap_generator_method(module, "CollectionGroup", method, target=_get_parent_id)
wrap_datastore_trace(module, "CollectionGroup.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_query(module):
if hasattr(module, "AsyncQuery"):
class_ = module.AsyncQuery
for method in ("get",):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncQuery.%s" % method, product="Firestore", target=_get_parent_id, operation=method
)

for method in ("stream",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncQuery.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=async_generator_wrapper)

if hasattr(module, "AsyncCollectionGroup"):
class_ = module.AsyncCollectionGroup
for method in ("get_partitions",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncCollectionGroup.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_aggregation(module):
Expand All @@ -122,7 +164,21 @@ def instrument_google_cloud_firestore_v1_aggregation(module):

for method in ("stream",):
if hasattr(class_, method):
wrap_generator_method(module, "AggregationQuery", method, target=_get_collection_ref_id)
wrap_datastore_trace(module, "AggregationQuery.%s" % method, operation=method, product="Firestore", target=_get_collection_ref_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_aggregation(module):
if hasattr(module, "AsyncAggregationQuery"):
class_ = module.AsyncAggregationQuery
for method in ("get",):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncAggregationQuery.%s" % method, product="Firestore", target=_get_collection_ref_id, operation=method
)

for method in ("stream",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncAggregationQuery.%s" % method, operation=method, product="Firestore", target=_get_collection_ref_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_batch(module):
Expand All @@ -135,6 +191,16 @@ def instrument_google_cloud_firestore_v1_batch(module):
)


def instrument_google_cloud_firestore_v1_async_batch(module):
if hasattr(module, "AsyncWriteBatch"):
class_ = module.AsyncWriteBatch
for method in ("commit",):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncWriteBatch.%s" % method, product="Firestore", target=None, operation=method
)


def instrument_google_cloud_firestore_v1_bulk_batch(module):
if hasattr(module, "BulkWriteBatch"):
class_ = module.BulkWriteBatch
Expand All @@ -154,3 +220,14 @@ def instrument_google_cloud_firestore_v1_transaction(module):
wrap_datastore_trace(
module, "Transaction.%s" % method, product="Firestore", target=None, operation=operation
)


def instrument_google_cloud_firestore_v1_async_transaction(module):
if hasattr(module, "AsyncTransaction"):
class_ = module.AsyncTransaction
for method in ("_commit", "_rollback"):
if hasattr(class_, method):
operation = method[1:] # Trim leading underscore
wrap_datastore_trace(
module, "AsyncTransaction.%s" % method, product="Firestore", target=None, operation=operation
)
38 changes: 38 additions & 0 deletions tests/datastore_firestore/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
import uuid

import pytest

from google.cloud.firestore import Client
from google.cloud.firestore import Client, AsyncClient

from testing_support.db_settings import firestore_settings
from testing_support.fixture.event_loop import event_loop as loop # noqa: F401; pylint: disable=W0611
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
Expand Down Expand Up @@ -62,6 +66,20 @@ def collection(client):
client.recursive_delete(collection_)


@pytest.fixture(scope="session")
def async_client(loop):
os.environ["FIRESTORE_EMULATOR_HOST"] = "%s:%d" % (FIRESTORE_HOST, FIRESTORE_PORT)
client = AsyncClient()
loop.run_until_complete(client.collection("healthcheck").document("healthcheck").set({}, retry=None, timeout=5)) # Ensure connection is available
return client


@pytest.fixture(scope="function")
def async_collection(async_client, collection):
# Use the same collection name as the collection fixture
yield async_client.collection(collection.id)


@pytest.fixture(scope="session")
def assert_trace_for_generator():
def _assert_trace_for_generator(generator_func, *args, **kwargs):
Expand All @@ -76,3 +94,23 @@ def _assert_trace_for_generator(generator_func, *args, **kwargs):
assert current_trace() is txn # Generator trace has exited.

return _assert_trace_for_generator


@pytest.fixture(scope="session")
def assert_trace_for_async_generator(loop):
def _assert_trace_for_async_generator(generator_func, *args, **kwargs):
_trace_check = []
txn = current_trace()
assert not isinstance(txn, DatastoreTrace)

async def coro():
# Check for generator trace on collections
async for _ in generator_func(*args, **kwargs):
_trace_check.append(isinstance(current_trace(), DatastoreTrace))

loop.run_until_complete(coro())

assert _trace_check and all(_trace_check) # All checks are True, and at least 1 is present.
assert current_trace() is txn # Generator trace has exited.

return _assert_trace_for_async_generator
56 changes: 56 additions & 0 deletions tests/datastore_firestore/test_async_batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
from newrelic.api.background_task import background_task
from testing_support.validators.validate_database_duration import (
validate_database_duration,
)


@pytest.fixture()
def exercise_async_write_batch(async_client, async_collection):
async def _exercise_async_write_batch():
docs = [async_collection.document(str(x)) for x in range(1, 4)]
async_batch = async_client.batch()
for doc in docs:
async_batch.set(doc, {})

await async_batch.commit()
return _exercise_async_write_batch


def test_firestore_async_write_batch(loop, exercise_async_write_batch):
_test_scoped_metrics = [
("Datastore/operation/Firestore/commit", 1),
]

_test_rollup_metrics = [
("Datastore/all", 1),
("Datastore/allOther", 1),
]
@validate_database_duration()
@validate_transaction_metrics(
"test_firestore_async_write_batch",
scoped_metrics=_test_scoped_metrics,
rollup_metrics=_test_rollup_metrics,
background_task=True,
)
@background_task(name="test_firestore_async_write_batch")
def _test():
loop.run_until_complete(exercise_async_write_batch())

_test()
Loading

0 comments on commit e835758

Please sign in to comment.