revert sidecar changes to expose Prometheus metrics #762

Merged: 2 commits, Dec 5, 2022
39 changes: 1 addition & 38 deletions baseplate/lib/live_data/zookeeper.py
@@ -2,44 +2,12 @@
from typing import Optional

from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.handlers.gevent import SequentialGeventHandler
from prometheus_client import Counter

from baseplate.lib import config
from baseplate.lib.secrets import SecretsStore
from baseplate.server.monkey import gevent_is_patched

SESSION_LOST_TOTAL = Counter(
"live_data_fetcher_lost_sessions_total",
"The number of times a Zookeeper client has had a session be lost.",
)

SESSION_SUSPENDED_TOTAL = Counter(
"live_data_fetcher_suspended_sessions_total",
"The number of times a Zookeeper client has had a session be suspended.",
)

SESSION_SUCCESSFUL_TOTAL = Counter(
"live_data_fetcher_connected_sessions_total",
"The number of times a Zookeeper client has successfully established a session.",
)


class SessionStatsListener:
"""A Kazoo listener that monitors changes in connection state.
Increments an event counter whenever connection state changes in a
Zookeeper connection.
"""

def __call__(self, state: KazooState) -> None:
if state == KazooState.LOST:
SESSION_LOST_TOTAL.inc()
elif state == KazooState.SUSPENDED:
SESSION_SUSPENDED_TOTAL.inc()
else:
SESSION_SUCCESSFUL_TOTAL.inc()


def zookeeper_client_from_config(
secrets: SecretsStore, app_config: config.RawConfig, read_only: Optional[bool] = None
@@ -93,7 +61,7 @@ def zookeeper_client_from_config(
else:
handler = None

client = KazooClient(
return KazooClient(
cfg.hosts,
timeout=cfg.timeout.total_seconds(),
auth_data=auth_data,
@@ -122,8 +90,3 @@ def zookeeper_client_from_config(
max_delay=60, # never wait longer than this
),
)

listener = SessionStatsListener()
client.add_listener(listener)

return client
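
For context, the listener deleted above follows the standard Kazoo connection-state pattern. A minimal sketch of that pattern, with illustrative names and assuming kazoo and prometheus_client are available (not the exact baseplate code):

from kazoo.client import KazooClient, KazooState
from prometheus_client import Counter

# Illustrative metric; the real metric names are in the removed code above.
SESSIONS_LOST = Counter("example_zk_sessions_lost_total", "Times this client lost its session.")

def on_state_change(state: KazooState) -> None:
    # Kazoo invokes listeners with LOST, SUSPENDED, or CONNECTED states.
    if state == KazooState.LOST:
        SESSIONS_LOST.inc()

client = KazooClient(hosts="127.0.0.1:2181")
client.add_listener(on_state_change)  # register before start() to see the first transition
client.start()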
59 changes: 16 additions & 43 deletions baseplate/sidecars/event_publisher.py
@@ -1,16 +1,3 @@
# pylint: disable=wrong-import-position,wrong-import-order
from gevent.monkey import patch_all

from baseplate.server.monkey import patch_stdlib_queues

# In order to allow Prometheus to scrape metrics, we need to concurrently
# handle requests to '/metrics' along with the sidecar's execution.
# Monkey patching is used to replace the stdlib sequential versions of functions
# with concurrent versions. It must happen as soon as possible, before the
# sequential versions are imported.
patch_all()
patch_stdlib_queues()

import argparse
import configparser
import email.utils
@@ -23,13 +10,9 @@
from typing import List
from typing import Optional

import gevent
import requests

from prometheus_client import Counter

from baseplate import Baseplate
from baseplate.clients.requests import ExternalRequestsClient
from baseplate import __version__ as baseplate_version
from baseplate.lib import config
from baseplate.lib import metrics
from baseplate.lib.events import MAX_EVENT_SIZE
@@ -39,7 +22,6 @@
from baseplate.lib.metrics import metrics_client_from_config
from baseplate.lib.retry import RetryPolicy
from baseplate.server import EnvironmentInterpolation
from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
from baseplate.sidecars import Batch
from baseplate.sidecars import BatchFull
from baseplate.sidecars import SerializedBatch
@@ -61,8 +43,6 @@
# maximum size (in bytes) of a batch of events
MAX_BATCH_SIZE = 500 * 1024

PUBLISHES_COUNT_TOTAL = Counter("eventv2_publishes_total", "total count of published events")


class MaxRetriesError(Exception):
pass
@@ -119,12 +99,15 @@ def serialize(self) -> SerializedBatch:


class BatchPublisher:
def __init__(self, bp: Baseplate, metrics_client: metrics.Client, cfg: Any):
self.baseplate = bp
def __init__(self, metrics_client: metrics.Client, cfg: Any):
self.metrics = metrics_client
self.url = f"{cfg.collector.scheme}://{cfg.collector.hostname}/v{cfg.collector.version}"
self.key_name = cfg.key.name
self.key_secret = cfg.key.secret
self.session = requests.Session()
self.session.headers[
"User-Agent"
] = f"baseplate.py-{self.__class__.__name__}/{baseplate_version}"

def _sign_payload(self, payload: bytes) -> str:
digest = hmac.new(self.key_secret, payload, hashlib.sha256).hexdigest()
@@ -146,14 +129,15 @@ def publish(self, payload: SerializedBatch) -> None:

for _ in RetryPolicy.new(budget=MAX_RETRY_TIME, backoff=RETRY_BACKOFF):
try:
with self.baseplate.server_context("post") as context:
with self.metrics.timer("post"):
response = context.http_client.post(
self.url,
headers=headers,
data=compressed_payload,
timeout=POST_TIMEOUT,
)
with self.metrics.timer("post"):
response = self.session.post(
self.url,
headers=headers,
data=compressed_payload,
timeout=POST_TIMEOUT,
# http://docs.python-requests.org/en/latest/user/advanced/#keep-alive
stream=False,
)
response.raise_for_status()
except requests.HTTPError as exc:
self.metrics.counter("error.http").increment()
@@ -171,7 +155,6 @@ def publish(self, payload: SerializedBatch) -> None:
self.metrics.counter("error.io").increment()
logger.exception("HTTP Request failed")
else:
PUBLISHES_COUNT_TOTAL.inc(payload.item_count)
self.metrics.counter("sent").increment(payload.item_count)
return

@@ -226,21 +209,12 @@ def publish_events() -> None:
max_message_size=MAX_EVENT_SIZE,
)

bp = Baseplate()
bp.configure_context(
{
"http_client": ExternalRequestsClient("event_collector"),
}
)

# pylint: disable=maybe-no-member
serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
publisher = BatchPublisher(bp, metrics_client, cfg)
publisher = BatchPublisher(metrics_client, cfg)

while True:
# allow other routines to execute (specifically handling requests to /metrics)
gevent.sleep(0)
message: Optional[bytes]

try:
Expand All @@ -264,5 +238,4 @@ def publish_events() -> None:


if __name__ == "__main__":
start_prometheus_exporter_for_sidecar()
publish_events()
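
The revert also returns the publisher to a plain requests.Session posting HMAC-signed, gzip-compressed batches. A rough sketch of that request shape; the key, URL, header name, and sign/compress ordering below are placeholders, not the publisher's exact values:

import gzip
import hashlib
import hmac

import requests

# Placeholder key and URL for illustration only.
KEY = b"example-secret"
URL = "https://collector.example.com/v2"

def post_batch(payload: bytes) -> None:
    digest = hmac.new(KEY, payload, hashlib.sha256).hexdigest()
    headers = {
        "X-Signature": digest,        # assumed header name, not necessarily the real one
        "Content-Encoding": "gzip",
    }
    session = requests.Session()
    response = session.post(URL, headers=headers, data=gzip.compress(payload), timeout=3)
    response.raise_for_status()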
19 changes: 2 additions & 17 deletions baseplate/sidecars/live_data_watcher.py
@@ -1,23 +1,11 @@
"""Watch nodes in ZooKeeper and sync their contents to disk on change."""
# pylint: disable=wrong-import-position,wrong-import-order
from gevent.monkey import patch_all

from baseplate.server.monkey import patch_stdlib_queues

# In order to allow Prometheus to scrape metrics, we need to concurrently
# handle requests to '/metrics' along with the sidecar's execution.
# Monkey patching is used to replace the stdlib sequential versions of functions
# with concurrent versions. It must happen as soon as possible, before the
# sequential versions are imported.
patch_all()
patch_stdlib_queues()

import argparse
import configparser
import json
import logging
import os
import sys
import time

from enum import Enum
from pathlib import Path
@@ -26,7 +14,6 @@
from typing import Optional

import boto3 # type: ignore
import gevent

from botocore import UNSIGNED # type: ignore
from botocore.client import ClientError # type: ignore
@@ -39,7 +26,6 @@
from baseplate.lib.live_data.zookeeper import zookeeper_client_from_config
from baseplate.lib.secrets import secrets_store_from_config
from baseplate.server import EnvironmentInterpolation
from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar


logger = logging.getLogger(__name__)
@@ -201,7 +187,7 @@ def watch_zookeeper_nodes(zookeeper: KazooClient, nodes: Any) -> NoReturn:
# all the interesting stuff is now happening in the Kazoo worker thread
# and so we'll just spin and periodically heartbeat to prove we're alive.
while True:
gevent.sleep(HEARTBEAT_INTERVAL)
time.sleep(HEARTBEAT_INTERVAL)

# see the comment in baseplate.live_data.zookeeper for explanation of
# how reconnects work with the background thread.
@@ -272,5 +258,4 @@ def main() -> NoReturn:


if __name__ == "__main__":
start_prometheus_exporter_for_sidecar()
main()
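
Across the sidecars the same idiom is being removed: monkey-patch before any other import so a /metrics server can run concurrently, then yield with gevent. A hedged sketch of that idiom in isolation, assuming gevent is installed (not the exact baseplate code):

# gevent's monkey patching must run before anything imports the blocking
# stdlib functions it replaces, hence its position at the top of the file.
from gevent.monkey import patch_all

patch_all()

import time  # noqa: E402  (deliberately imported after patching)

import gevent

def run_forever() -> None:
    while True:
        gevent.sleep(0)  # yield so other greenlets (e.g. a metrics server) get a turn
        time.sleep(1)    # patched by gevent, so this sleep is cooperative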
19 changes: 2 additions & 17 deletions baseplate/sidecars/secrets_fetcher.py
@@ -60,19 +60,6 @@
write to a new file in whatever format needed, and restart other services if necessary.

"""
# pylint: disable=wrong-import-position,wrong-import-order
from gevent.monkey import patch_all

from baseplate.server.monkey import patch_stdlib_queues

# In order to allow Prometheus to scrape metrics, we need to concurrently
# handle requests to '/metrics' along with the sidecar's execution.
# Monkey patching is used to replace the stdlib sequential versions of functions
# with concurrent versions. It must happen as soon as possible, before the
# sequential versions are imported.
patch_all()
patch_stdlib_queues()

import argparse
import configparser
import datetime
@@ -81,6 +68,7 @@
import os
import posixpath
import subprocess
import time
import urllib.parse
import uuid

@@ -90,13 +78,11 @@
from typing import Optional
from typing import Tuple

import gevent
import requests

from baseplate import __version__ as baseplate_version
from baseplate.lib import config
from baseplate.server import EnvironmentInterpolation
from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar


logger = logging.getLogger(__name__)
@@ -441,9 +427,8 @@ def main() -> None:
last_proc = trigger_callback(cfg.callback, cfg.output.path, last_proc)
time_til_expiration = soonest_expiration - datetime.datetime.utcnow()
time_to_sleep = time_til_expiration - VAULT_TOKEN_PREFETCH_TIME
gevent.sleep(max(int(time_to_sleep.total_seconds()), 1))
time.sleep(max(int(time_to_sleep.total_seconds()), 1))


if __name__ == "__main__":
start_prometheus_exporter_for_sidecar()
main()
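
The restored sleep computation is plain timedelta arithmetic: wait until just before the soonest expiration, minus a prefetch window, with a floor of one second. A small worked example with made-up values:

import datetime
import time

# Made-up values; the real fetcher derives these from its config and Vault responses.
VAULT_TOKEN_PREFETCH_TIME = datetime.timedelta(seconds=15)
soonest_expiration = datetime.datetime.utcnow() + datetime.timedelta(seconds=60)

time_til_expiration = soonest_expiration - datetime.datetime.utcnow()
time_to_sleep = time_til_expiration - VAULT_TOKEN_PREFETCH_TIME
time.sleep(max(int(time_to_sleep.total_seconds()), 1))  # roughly 45 seconds here; never below 1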