Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stats logging #1204

Merged
merged 2 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs_website/docs/configurations/infra_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ The following settings are only relevant if you are using `s3` and your S3 bucke

You can also add addtional loggers in the event logger plugin. See [Add Event Logger guide](../integrations/add_event_logger.md) for more details.

### Stats Logging

`STATS_LOGGER_NAME` (optional, defaults to **"null"**): This configures what stats logger to be used.

- null: This is the default logger, which does nothing and disregards the logs.
- console: This will print the stats logs to the console. Could be used for debugging purpose.

You need to add your own stats logger plugin to use it. See [Add Stats Logger guide](../integrations/add_stats_logger.md) for more details.
## Authentication

`AUTH_BACKEND` (optional, defaults to **app.auth.password_auth**): Python path to the authentication file. By default Querybook provides:
Expand Down
31 changes: 31 additions & 0 deletions docs_website/docs/integrations/add_stats_logger.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
id: add_stats_logger
title: Stats Logging
sidebar_label: Stats Logging
---

Stats logging is used for monitoring and measuring the performance of an application or system. Querybook provides the support to collect metrics by adding your own stats logger, like StatsD. Here are the metrics we currently added:
- Number of active users
- Number of API requests
- Latency of API requests
- Number of websocket connections
- Number of sql session failures
- Number of scheduled system task failures
- Number of scheduled datadoc failures
- Latency of Redis operations
- Number of query executions

## Configure Event Logger
Update `STATS_LOGGER_NAME` in the querybook config yaml file with the logger name you'd like to use.

```
STATS_LOGGER_NAME: ~
```

## Add a new Stats Logger as a plugin
If you'd like to actually use this feature, you need to create your own stats logger and add it as a [plugin](plugins.md).


1. Locate the plugin root directory for your customized Querybook, and find the folder called `stats_logger_plugin`.
2. Add your stats logger code similiar to the builtin loggers, like `ConsoleStatsLogger`, which means making sure it inherits from `BaseStatsLogger` and implements the abstract methods.
3. Add the new stats logger in the variable `ALL_PLUGIN_STATS_LOGGERS` under `stats_logger_plugin/__init__.py`
1 change: 1 addition & 0 deletions docs_website/sidebars.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"integrations/add_query_transpilation",
"integrations/add_table_upload",
"integrations/add_event_logger",
"integrations/add_stats_logger",
"integrations/customize_html",
"integrations/embedded_iframe"
],
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "querybook",
"version": "3.20.0",
"version": "3.21.0",
"description": "A Big Data Webapp",
"private": true,
"scripts": {
Expand Down
1 change: 1 addition & 0 deletions plugins/stats_logger_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALL_PLUGIN_STATS_LOGGERS = []
3 changes: 3 additions & 0 deletions querybook/config/querybook_default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,6 @@ TABLE_MAX_UPLOAD_SIZE: ~

# --------------- Event Logging ---------------
EVENT_LOGGER_NAME: ~

# --------------- Stats Logging ---------------
STATS_LOGGER_NAME: ~
32 changes: 24 additions & 8 deletions querybook/server/app/datasource.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
import datetime
import functools
import json
import traceback
import socket
import time
import traceback

import flask
from flask_login import current_user
from werkzeug.exceptions import Forbidden, NotFound

from app.flask_app import flask_app, limiter
from app.db import get_session
from app.flask_app import flask_app, limiter
from const.datasources import (
ACCESS_RESTRICTED_STATUS_CODE,
DS_PATH,
INVALID_SEMANTIC_STATUS_CODE,
OK_STATUS_CODE,
UNAUTHORIZED_STATUS_CODE,
INVALID_SEMANTIC_STATUS_CODE,
ACCESS_RESTRICTED_STATUS_CODE,
UNKNOWN_CLIENT_ERROR_STATUS_CODE,
UNKNOWN_SERVER_ERROR_STATUS_CODE,
)
from flask_login import current_user
from lib.event_logger import event_logger
from lib.stats_logger import (
stats_logger,
API_REQUEST_COUNTER,
API_REQUEST_LATENCY_TIMER,
)
from lib.logger import get_logger
from logic.impression import create_impression
from lib.event_logger import event_logger
from werkzeug.exceptions import Forbidden, NotFound

LOG = get_logger(__file__)
_host = socket.gethostname()
Expand Down Expand Up @@ -56,6 +61,11 @@ def wrapper(fn):
@flask_app.route(r"%s%s" % (DS_PATH, url), methods=methods)
@functools.wraps(fn)
def handler(**kwargs):
# increment the number of api request counter
stats_logger.incr(API_REQUEST_COUNTER.format(fn.__name__))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use the url here? it is generic right

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

url is something like /query_execution/<int:query_execution_id>/, which will include the parameter types.

# start the timer for api request duration
start_time = time.time()

if require_auth and not current_user.is_authenticated:
flask.abort(UNAUTHORIZED_STATUS_CODE, description="Login required.")

Expand All @@ -79,6 +89,12 @@ def handler(**kwargs):

results = fn(**kwargs)

# stop the timer and record the duration
duration_ms = (time.time() - start_time) * 1000.0
stats_logger.timing(
API_REQUEST_LATENCY_TIMER.format(fn.__name__), duration_ms
)

if not custom_response:
if not isinstance(results, dict) or "data" not in results:
results = {"data": results, "host": _host}
Expand Down
9 changes: 9 additions & 0 deletions querybook/server/app/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sqlalchemy.exc import SQLAlchemyError, DisconnectionError
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, scoped_session
from lib.stats_logger import stats_logger, SQL_SESSION_FAILURE_COUNTER

try:
from greenlet import getcurrent as _get_ident
Expand Down Expand Up @@ -122,6 +123,10 @@ def func(*args, **kwargs):
import traceback

LOG.error(traceback.format_exc())

# increment sql session failure counter
stats_logger.incr(SQL_SESSION_FAILURE_COUNTER)

raise e
finally:
# Since we created the session, close it.
Expand Down Expand Up @@ -151,6 +156,10 @@ def DBSession():
import traceback

LOG.error(traceback.format_exc())

# increment sql session failure counter
stats_logger.incr(SQL_SESSION_FAILURE_COUNTER)

raise e
finally:
get_session().remove()
Expand Down
13 changes: 11 additions & 2 deletions querybook/server/clients/redis_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import redis
import functools
import time

import redis
from env import QuerybookSettings

from lib.stats_logger import REDIS_LATENCY_TIMER, stats_logger

__redis = None

Expand All @@ -19,6 +20,9 @@ def with_redis(fn):

@functools.wraps(fn)
def func(*args, **kwargs):
# start the timer for redis latency
start_time = time.time()

conn = None
# If there's no session, create a new one. We will
# automatically close this after the function is called.
Expand All @@ -27,6 +31,11 @@ def func(*args, **kwargs):
kwargs["redis_conn"] = conn

result = fn(*args, **kwargs)

# stop the timer and record the duration
duration_ms = (time.time() - start_time) * 1000.0
stats_logger.timing(REDIS_LATENCY_TIMER.format(fn.__name__), duration_ms)

return result

return func
Expand Down
8 changes: 8 additions & 0 deletions querybook/server/const/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@ class NotifyOn(Enum):
ALL = 0
ON_FAILURE = 1
ON_SUCCESS = 2


class ScheduleTaskType(Enum):
PROD = "prod"
USER = "user"


UserTaskNames = set(["tasks.run_datadoc.run_datadoc"])
3 changes: 2 additions & 1 deletion querybook/server/datasources/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from app.datasource import register, admin_only, api_assert
from app.db import DBSession
from const.admin import AdminOperation, AdminItemType
from const.schedule import ScheduleTaskType
from datasources.admin_audit_log import with_admin_audit_log
from env import QuerybookSettings

Expand Down Expand Up @@ -601,7 +602,7 @@ def exec_demo_set_up():
"task": "tasks.update_metastore.update_metastore",
"cron": "0 0 * * *",
"args": [metastore_id],
"task_type": "prod",
"task_type": ScheduleTaskType.PROD,
"enabled": True,
},
# commit=False,
Expand Down
1 change: 0 additions & 1 deletion querybook/server/datasources/datadoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ def create_datadoc_schedule(
"user_id": data_doc.owner_uid,
"doc_id": id,
},
task_type="user",
session=session,
)

Expand Down
2 changes: 0 additions & 2 deletions querybook/server/datasources/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def create_task_schedule(
cron,
name,
task,
task_type,
enabled,
args=None,
kwargs=None,
Expand All @@ -36,7 +35,6 @@ def create_task_schedule(
cron=cron,
args=args,
kwargs=kwargs,
task_type=task_type,
options=options,
enabled=enabled,
session=session,
Expand Down
2 changes: 2 additions & 0 deletions querybook/server/datasources_socketio/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from app.flask_app import socketio
from const.data_doc import DATA_DOC_NAMESPACE
from const.query_execution import QUERY_EXECUTION_NAMESPACE
from lib.stats_logger import stats_logger, WS_CONNECTIONS_COUNTER


def connect():
stats_logger.incr(WS_CONNECTIONS_COUNTER)
if not current_user.is_authenticated:
raise ConnectionRefusedError("User is not logged in, please refresh the page.")

Expand Down
7 changes: 7 additions & 0 deletions querybook/server/datasources_socketio/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from app.flask_app import socketio
from lib.event_logger import event_logger
from lib.logger import get_logger
from lib.stats_logger import stats_logger, WS_CONNECTIONS_COUNTER

LOG = get_logger(__file__)

Expand All @@ -18,6 +19,8 @@ def handler(*args, **kwargs):
if not current_user.is_authenticated:
LOG.error("Unauthorized websocket access")
disconnect()
# decrement ws connections counter on disconnect
stats_logger.decr(WS_CONNECTIONS_COUNTER)
else:
try:
if websocket_logging:
Expand All @@ -36,6 +39,10 @@ def handler(*args, **kwargs):
room=flask.request.sid,
)

# decrement ws connections counter on disconnect
if url == "disconnect":
stats_logger.decr(WS_CONNECTIONS_COUNTER)

handler.__raw__ = fn
return handler

Expand Down
3 changes: 3 additions & 0 deletions querybook/server/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,6 @@ class QuerybookSettings(object):

# Event Logging
EVENT_LOGGER_NAME = get_env_config("EVENT_LOGGER_NAME") or "null"

# Stats Logging
STATS_LOGGER_NAME = get_env_config("STATS_LOGGER_NAME") or "null"
17 changes: 17 additions & 0 deletions querybook/server/lib/stats_logger/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from env import QuerybookSettings
from lib.stats_logger.all_stats_loggers import get_stats_logger_class
from .base_stats_logger import BaseStatsLogger


# metrics name templates
API_REQUEST_COUNTER = "api.{}"
API_REQUEST_LATENCY_TIMER = "api.duration.ms.{}"
WS_CONNECTIONS_COUNTER = "ws.connections"
SQL_SESSION_FAILURE_COUNTER = "sql.session.failure"
SYSTEM_TASK_FAILURE_COUNTER = "task.failure.system"
DATADOC_TASK_FAILURE_COUNTER = "task.failure.datadoc"
REDIS_LATENCY_TIMER = "redis.duration.ms.{}"
QUERY_EXECUTION_COUNTER = "query_execution.{}"

logger_name = QuerybookSettings.STATS_LOGGER_NAME
stats_logger: BaseStatsLogger = get_stats_logger_class(logger_name)
18 changes: 18 additions & 0 deletions querybook/server/lib/stats_logger/all_stats_loggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from lib.utils.import_helper import import_module_with_default
from .loggers.null_stats_logger import NullStatsLogger
from .loggers.console_stats_logger import ConsoleStatsLogger

ALL_PLUGIN_STATS_LOGGERS = import_module_with_default(
"stats_logger_plugin",
"ALL_PLUGIN_STATS_LOGGERS",
default=[],
)

ALL_STATS_LOGGERS = [NullStatsLogger(), ConsoleStatsLogger()] + ALL_PLUGIN_STATS_LOGGERS


def get_stats_logger_class(name: str):
for logger in ALL_STATS_LOGGERS:
if logger.logger_name == name:
return logger
raise ValueError(f"Unknown event logger name {name}")
37 changes: 37 additions & 0 deletions querybook/server/lib/stats_logger/base_stats_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod


class BaseStatsLogger(ABC):
"""Base class for logging realtime stats"""

def key(self, key: str) -> str:
if self.prefix:
return self.prefix + key
return key

@property
def logger_name(self) -> str:
raise NotImplementedError()

@property
def prefix(self) -> str:
return "querybook."

@abstractmethod
def incr(self, key: str) -> None:
"""Increment a counter"""
raise NotImplementedError()

@abstractmethod
def decr(self, key: str) -> None:
"""Decrement a counter"""
raise NotImplementedError()

@abstractmethod
def timing(self, key: str, value: float) -> None:
raise NotImplementedError()

@abstractmethod
def gauge(self, key: str, value: float) -> None:
"""Setup a gauge"""
raise NotImplementedError()
Empty file.
Loading