Skip to content

Commit

Permalink
Add context manager tests to db modules that conform to PEP-249
Browse files Browse the repository at this point in the history
Signed-off-by: Varsha GS <varsha.gs@ibm.com>

remove result

Signed-off-by: Varsha GS <varsha.gs@ibm.com>
  • Loading branch information
GSVarsha committed Aug 31, 2023
1 parent 4b2d90b commit 32818ce
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 0 deletions.
77 changes: 77 additions & 0 deletions tests/clients/test_mysqlclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,80 @@ def test_error_capture(self):
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from blah')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_cursor_ctx_mgr(self):
with tracer.start_active_span("test"):
with self.db as connection:
with connection.cursor() as cursor:
cursor.execute("""SELECT * from users""")

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv["mysql_db"])
self.assertEqual(db_span.data["mysql"]["user"], testenv["mysql_user"])
self.assertEqual(db_span.data["mysql"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["mysql"]["host"], testenv["mysql_host"])
self.assertEqual(db_span.data["mysql"]["port"], testenv["mysql_port"])

def test_connect_ctx_mgr(self):
with tracer.start_active_span("test"):
with self.db as connection:
cursor = connection.cursor()
cursor.execute("""SELECT * from users""")


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv["mysql_db"])
self.assertEqual(db_span.data["mysql"]["user"], testenv["mysql_user"])
self.assertEqual(db_span.data["mysql"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["mysql"]["host"], testenv["mysql_host"])
self.assertEqual(db_span.data["mysql"]["port"], testenv["mysql_port"])

def test_cursor_ctx_mgr(self):
with tracer.start_active_span("test"):
connection = self.db
with connection.cursor() as cursor:
cursor.execute("""SELECT * from users""")


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv["mysql_db"])
self.assertEqual(db_span.data["mysql"]["user"], testenv["mysql_user"])
self.assertEqual(db_span.data["mysql"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["mysql"]["host"], testenv["mysql_host"])
self.assertEqual(db_span.data["mysql"]["port"], testenv["mysql_port"])
74 changes: 74 additions & 0 deletions tests/clients/test_psycopg2.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,77 @@ def test_register_type(self):
ext.register_type(ext.UUID, self.cursor)
ext.register_type(ext.UUIDARRAY, self.cursor)

def test_connect_cursor_ctx_mgr(self):
with tracer.start_active_span("test"):
with self.db as connection:
with connection.cursor() as cursor:
cursor.execute("""SELECT * from users""")

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv["postgresql_db"])
self.assertEqual(db_span.data["pg"]["user"], testenv["postgresql_user"])
self.assertEqual(db_span.data["pg"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["pg"]["host"], testenv["postgresql_host"])
self.assertEqual(db_span.data["pg"]["port"], testenv["postgresql_port"])

def test_connect_ctx_mgr(self):
with tracer.start_active_span("test"):
with self.db as connection:
cursor = connection.cursor()
cursor.execute("""SELECT * from users""")

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv["postgresql_db"])
self.assertEqual(db_span.data["pg"]["user"], testenv["postgresql_user"])
self.assertEqual(db_span.data["pg"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["pg"]["host"], testenv["postgresql_host"])
self.assertEqual(db_span.data["pg"]["port"], testenv["postgresql_port"])

def test_cursor_ctx_mgr(self):
with tracer.start_active_span("test"):
connection = self.db
with connection.cursor() as cursor:
cursor.execute("""SELECT * from users""")

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv["postgresql_db"])
self.assertEqual(db_span.data["pg"]["user"], testenv["postgresql_user"])
self.assertEqual(db_span.data["pg"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["pg"]["host"], testenv["postgresql_host"])
self.assertEqual(db_span.data["pg"]["port"], testenv["postgresql_port"])
76 changes: 76 additions & 0 deletions tests/clients/test_pymysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,79 @@ def test_error_capture(self):
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from blah')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_cursor_ctx_mgr(self):
with tracer.start_active_span("test"):
with self.db as connection:
with connection.cursor() as cursor:
cursor.execute("""SELECT * from users""")

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv["mysql_db"])
self.assertEqual(db_span.data["mysql"]["user"], testenv["mysql_user"])
self.assertEqual(db_span.data["mysql"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["mysql"]["host"], testenv["mysql_host"])
self.assertEqual(db_span.data["mysql"]["port"], testenv["mysql_port"])

def test_connect_ctx_mgr(self):
with tracer.start_active_span("test"):
with self.db as connection:
cursor = connection.cursor()
cursor.execute("""SELECT * from users""")

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv["mysql_db"])
self.assertEqual(db_span.data["mysql"]["user"], testenv["mysql_user"])
self.assertEqual(db_span.data["mysql"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["mysql"]["host"], testenv["mysql_host"])
self.assertEqual(db_span.data["mysql"]["port"], testenv["mysql_port"])

def test_cursor_ctx_mgr(self):
with tracer.start_active_span("test"):
connection = self.db
with connection.cursor() as cursor:
cursor.execute("""SELECT * from users""")


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertIsNone(db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv["mysql_db"])
self.assertEqual(db_span.data["mysql"]["user"], testenv["mysql_user"])
self.assertEqual(db_span.data["mysql"]["stmt"], "SELECT * from users")
self.assertEqual(db_span.data["mysql"]["host"], testenv["mysql_host"])
self.assertEqual(db_span.data["mysql"]["port"], testenv["mysql_port"])

0 comments on commit 32818ce

Please sign in to comment.