Skip to content

Commit

Permalink
Add mysqldb/pymysql wrappers (#351)
Browse files Browse the repository at this point in the history
* Add mysqldb/pymysql wrappers

Closes #345

* Add tests for mysql

* Install mysql-devel to make build happy

* Remove empty run directive
  • Loading branch information
kolanos authored Aug 20, 2019
1 parent 58f1969 commit 32bf272
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 61 deletions.
1 change: 1 addition & 0 deletions Dockerfile.python2.7
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM lambci/lambda:build-python2.7

RUN yum -y install mysql-devel
RUN pip install -U pip setuptools

RUN mkdir -p /var/lib/iopipe
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.python3.6
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM lambci/lambda:build-python3.6

RUN yum -y install mysql-devel
RUN pip install -U pip setuptools

RUN mkdir -p /var/lib/iopipe
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.python3.7
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM lambci/lambda:build-python3.7

RUN yum -y install mysql-devel
RUN pip install -U pip setuptools

RUN mkdir -p /var/lib/iopipe
Expand Down
238 changes: 181 additions & 57 deletions iopipe/contrib/trace/auto_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,35 @@
)


def collect_mysql_metrics(context, trace, instance, args):
connection = instance.connection_proxy

db = connection.extract_db
hostname = connection.extract_hostname
port = connection.extract_port

query = args[0]
command = query.split()[0].lower()
table = table_name(query, command)

request = Request(
command=ensure_utf8(command),
key=None,
hostname=ensure_utf8(hostname),
port=ensure_utf8(port),
connectionName=None,
db=ensure_utf8(db),
table=ensure_utf8(table),
)
request = request._asdict()
context.iopipe.mark.db_trace(trace, "mysql", request)


def collect_psycopg2_metrics(context, trace, instance):
from psycopg2.extensions import parse_dsn
try:
from psycopg2.extensions import parse_dsn
except ImportError: # pragma: no cover
from .dbapi import parse_dsn

connection = instance.connection_proxy
dsn = parse_dsn(connection.dsn)
Expand Down Expand Up @@ -114,44 +141,45 @@ def collect_redis_metrics(context, trace, args, connection):
context.iopipe.mark.db_trace(trace, "redis", request)


def patch_pymongo(context):
def patch_mysqldb(context):
"""
Monkey patches pymongo client, if available. Overloads the
query methods to add tracing and metrics collection.
Monkey patches mysqldb client, if available. Overloads the
execute method to add tracing and metrics collection.
"""

def wrapper(wrapped, instance, args, kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
return wrapped(*args, **kwargs)
class _CursorProxy(CursorProxy):
def execute(self, *args, **kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
self.__wrapped__.execute(*args, **kwargs)
return

id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
response = wrapped(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_pymongo_metrics(context, trace, instance, response)
return response
id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
self.__wrapped__.execute(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_mysql_metrics(context, trace, self, args)

try:
wrapt.wrap_function_wrapper("pymongo.collection", "Collection.find", wrapper)
except Exception: # pragma: no cover
pass
else:
for class_method in (
"bulk_write",
"delete_many",
"delete_one",
"insert_many",
"insert_one",
"replace_one",
"update_many",
"update_one",
):
wrapt.wrap_function_wrapper(
"pymongo.collection", "Collection.%s" % class_method, wrapper
)
class _ConnectionProxy(ConnectionProxy):
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return _CursorProxy(cursor, self)

def connect_wrapper(wrapped, instance, args, kwargs):
connection = wrapped(*args, **kwargs)
return _ConnectionProxy(connection, args, kwargs)

for module, attr, wrapper in [
("MySQLdb", "connect", connect_wrapper),
("MySQLdb", "Connection", connect_wrapper),
("MySQLdb", "Connect", connect_wrapper),
]:
try:
wrapt.wrap_function_wrapper(module, attr, wrapper)
except Exception: # pragma: no cover
pass


def patch_psycopg2(context):
Expand All @@ -160,7 +188,7 @@ def patch_psycopg2(context):
execute method to add tracing and metrics collection.
"""

class PGCursorProxy(CursorProxy):
class _CursorProxy(CursorProxy):
def execute(self, *args, **kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
Expand All @@ -175,18 +203,18 @@ def execute(self, *args, **kwargs):
context.iopipe.mark.delete(id)
collect_psycopg2_metrics(context, trace, self)

class PGConnectionProxy(ConnectionProxy):
class _ConnectionProxy(ConnectionProxy):
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return PGCursorProxy(cursor, self)
return _CursorProxy(cursor, self)

def adapt_wrapper(wrapped, instance, args, kwargs):
adapter = wrapped(*args, **kwargs)
return AdapterProxy(adapter) if hasattr(adapter, "prepare") else adapter

def connect_wrapper(wrapped, instance, args, kwargs):
connection = wrapped(*args, **kwargs)
return PGConnectionProxy(connection, args, kwargs)
return _ConnectionProxy(connection, args, kwargs)

def register_type_wrapper(wrapped, instance, args, kwargs):
def _extract_arguments(obj, scope=None):
Expand All @@ -201,21 +229,90 @@ def _extract_arguments(obj, scope=None):

return wrapped(obj)

for module, attr, wrapper in [
("psycopg2", "connect", connect_wrapper),
("psycopg2.extensions", "adapt", adapt_wrapper),
("psycopg2.extensions", "register_type", register_type_wrapper),
("psycopg2._psycopg", "register_type", register_type_wrapper),
("psycopg2._json", "register_type", register_type_wrapper),
]:
try:
wrapt.wrap_function_wrapper(module, attr, wrapper)
except Exception: # pragma: no cover
pass


def patch_pymongo(context):
"""
Monkey patches pymongo client, if available. Overloads the
query methods to add tracing and metrics collection.
"""

def wrapper(wrapped, instance, args, kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
return wrapped(*args, **kwargs)

id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
response = wrapped(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_pymongo_metrics(context, trace, instance, response)
return response

for module, attr, _wrapper in [
("pymongo.collection", "Collection.find", wrapper),
("pymongo.collection", "Collection.bulk_write", wrapper),
("pymongo.collection", "Collection.delete_many", wrapper),
("pymongo.collection", "Collection.delete_one", wrapper),
("pymongo.collection", "Collection.insert_many", wrapper),
("pymongo.collection", "Collection.insert_one", wrapper),
("pymongo.collection", "Collection.replace_one", wrapper),
("pymongo.collection", "Collection.update_many", wrapper),
("pymongo.collection", "Collection.update_one", wrapper),
]:
try:
wrapt.wrap_function_wrapper(module, attr, _wrapper)
except Exception: # pragma: no cover
pass


def patch_pymysql(context):
"""
Monkey patches pymysql client, if available. Overloads the
execute method to add tracing and metrics collection.
"""

class _CursorProxy(CursorProxy):
def execute(self, *args, **kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
self.__wrapped__.execute(*args, **kwargs)
return

id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
self.__wrapped__.execute(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_mysql_metrics(context, trace, self, args)

class _ConnectionProxy(ConnectionProxy):
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return _CursorProxy(cursor, self)

def connect_wrapper(wrapped, instance, args, kwargs):
connection = wrapped(*args, **kwargs)
return _ConnectionProxy(connection, args, kwargs)

try:
wrapt.wrap_function_wrapper("psycopg2", "connect", connect_wrapper)
wrapt.wrap_function_wrapper("pymysql", "connect", connect_wrapper)
except Exception: # pragma: no cover
pass
else:
wrapt.wrap_function_wrapper("psycopg2.extensions", "adapt", adapt_wrapper)
wrapt.wrap_function_wrapper(
"psycopg2.extensions", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._psycopg", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._json", "register_type", register_type_wrapper
)


def patch_redis(context):
Expand Down Expand Up @@ -259,16 +356,27 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover
)
return response

for module, attr, _wrapper in [
("redis.client", "Redis.execute_command", wrapper),
("redis.client", "Pipeline.execute", wrapper),
("redis.client", "Pipeline.immediate_execute_command", wrapper),
]:
try:
wrapt.wrap_function_wrapper(module, attr, _wrapper)
except Exception: # pragma: no cover
pass


def restore_mysqldb():
"""Restores mysqldb"""
try:
wrapt.wrap_function_wrapper("redis.client", "Redis.execute_command", wrapper)
except Exception: # pragma: no cover
import MySQLdb
except ImportError: # pragma: no cover
pass
else:
for module_name, class_method in [
("redis.client", "Pipeline.execute"),
("redis.client", "Pipeline.immediate_execute_command"),
]:
wrapt.wrap_function_wrapper(module_name, class_method, pipeline_wrapper)
setattr(
MySQLdb, "connect", getattr(MySQLdb.connect, "__wrapped__", MySQLdb.connect)
)


def restore_psycopg2():
Expand Down Expand Up @@ -337,6 +445,18 @@ def restore_pymongo():
)


def restore_pymysql():
"""Restores pymysql"""
try:
import pymysql
except ImportError: # pragma: no cover
pass
else:
setattr(
pymysql, "connect", getattr(pymysql.connect, "__wrapped__", pymysql.connect)
)


def restore_redis():
"""Restores the redis client"""
try:
Expand All @@ -357,12 +477,16 @@ def patch_db_requests(context):
if not hasattr(context, "iopipe"):
return

patch_mysqldb(context)
patch_psycopg2(context)
patch_pymongo(context)
patch_pymysql(context)
patch_redis(context)


def restore_db_requests():
restore_mysqldb()
restore_psycopg2()
restore_pymongo()
restore_pymysql()
restore_redis()
14 changes: 11 additions & 3 deletions iopipe/contrib/trace/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
}


def parse_dsn(dsn):
return dict(attr.split("=") for attr in dsn.split() if "=" in attr)


def table_name(query, command):
if command in COMMAND_KEYWORDS:
keyword = COMMAND_KEYWORDS[command]
Expand Down Expand Up @@ -44,12 +48,16 @@ def cursor(self, *args, **kwargs): # pragma: no cover
return CursorProxy(cursor, self)

@property
def extract_hostname(self): # pragma: no cover
def extract_db(self):
return self._self_kwargs.get("db", self._self_kwargs.get("database", ""))

@property
def extract_hostname(self):
return self._self_kwargs.get("host", "localhost")

@property
def extract_dbname(self): # pragma: no cover
return self._self_kwargs.get("db", self._self_kwargs.get("database", ""))
def extract_port(self):
return self._self_kwargs.get("port")


class AdapterProxy(wrapt.ObjectProxy):
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
"mock",
"mongomock==3.17.0",
"more-itertools<6.0.0",
"mysqlclient==1.4.4",
"psycopg2-binary==2.8.3",
"pymongo==3.8.0",
"PyMySQL==0.9.3",
"pytest==4.1.0",
"pytest-benchmark==3.2.0",
"redis==3.3.4",
Expand Down
Loading

0 comments on commit 32bf272

Please sign in to comment.