
Merge pull request #73 from aiven/jankatins/clearer_logs_and_small_refactoring

Clearer logs and small refactoring

kmichel-aiven authored Sep 15, 2021
2 parents e928723 + defb2ce commit 6924e3f
Showing 8 changed files with 120 additions and 46 deletions.
74 changes: 74 additions & 0 deletions .github/workflows/build.yml
@@ -0,0 +1,74 @@
name: Build pglookout

on:
push:
branches:
- master
tags:
- '**'
pull_request:

jobs:

lint:
runs-on: ubuntu-latest
strategy:
matrix:
# only use the newest version for the lint step -> as long as the other versions can run it, it's ok
python-version: [3.9]

steps:

- id: checkout-code
uses: actions/checkout@v2

- id: prepare-python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- id: dependencies
run: |
pip install -e .
pip install --upgrade pylint flake8 pytest mock
- id: pylint
run: make pylint

- id: flake8
run: make flake8

test:
runs-on: ubuntu-latest
needs: lint
strategy:
max-parallel: 4
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
pg-version: [9.6, 10, 11, 12, 13]

steps:
- id: checkout-code
uses: actions/checkout@v2

- id: prepare-python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- id: dependencies
run: |
# Setup the Postgres repositories
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
# Setup build deps
sudo apt-get install -y libsnappy-dev
sudo apt-get install -y postgresql-${{ matrix.pg-version }}
# Setup common python dependencies
python -m pip install --upgrade pip
pip install --upgrade pytest mock
pip install -e .
- id: unittest
run: make unittest
2 changes: 2 additions & 0 deletions .gitignore
@@ -6,3 +6,5 @@ build/
pglookout.egg-info/
.coverage
/pglookout/version.py

.idea/
3 changes: 2 additions & 1 deletion .pylintrc
@@ -14,7 +14,8 @@ disable=
too-many-public-methods,
too-many-statements,
ungrouped-imports,
- wrong-import-order
+ wrong-import-order,
+ unspecified-encoding

[REPORTS]
output-format=text
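The new unspecified-encoding entry disables pylint's W1514 check, introduced in pylint 2.10, which warns whenever open() is called without an explicit encoding and so falls back to the locale default; that is the pattern used by the open() calls in pglookout.py below. A minimal illustration of what the check flags (the file name is just an example):

    # Both writes behave the same on a UTF-8 locale, but pylint >= 2.10
    # flags only the first as W1514 (unspecified-encoding).
    with open("state.json", "w") as fp:  # implicit locale encoding -> W1514
        fp.write("{}")

    with open("state.json", "w", encoding="utf-8") as fp:  # explicit -> clean
        fp.write("{}")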
22 changes: 0 additions & 22 deletions .travis.yml

This file was deleted.

3 changes: 1 addition & 2 deletions Makefile
@@ -3,7 +3,6 @@ long_ver = $(shell git describe --long 2>/dev/null || echo $(short_ver)-0-unknow
generated = pglookout/version.py

PYTHON ?= python3
- PYLINT=$(shell which pylint 2> /dev/null | which pylint-3)
PYLINT_DIRS = pglookout/ test/

all: $(generated)
@@ -21,7 +20,7 @@ flake8: $(generated)
$(PYTHON) -m flake8 --ignore E722 --max-line-len=125 $(PYLINT_DIRS)

pylint: $(generated)
- $(PYLINT) --rcfile .pylintrc $(PYLINT_DIRS)
+ $(PYTHON) -m pylint --rcfile .pylintrc $(PYLINT_DIRS)

coverage:
$(PYTHON) -m pytest $(PYTEST_ARG) --cov-report term-missing --cov pglookout test/
21 changes: 14 additions & 7 deletions pglookout/cluster_monitor.py
@@ -52,6 +52,12 @@ def wait_select(conn, timeout=5.0):
class ClusterMonitor(Thread):
def __init__(self, config, cluster_state, observer_state, create_alert_file, cluster_monitor_check_queue,
failover_decision_queue, is_replication_lag_over_warning_limit, stats):
"""Thread which collects cluster state.
Basically a loop which tries to connect to each cluster member and
to external observers for status information. The information is collected
in the cluster_state/observer_state dictionaries, which are shared with the main thread.
"""
Thread.__init__(self)
self.log = logging.getLogger("ClusterMonitor")
self.stats = stats
@@ -103,8 +109,8 @@ def _connect_to_db(self, instance, dsn):

def _fetch_observer_state(self, instance, uri):
result = {"fetch_time": get_iso_timestamp(), "connection": True}
+ fetch_uri = uri + "/state.json"
try:
- fetch_uri = uri + "/state.json"
response = self.session.get(fetch_uri, timeout=5.0)

# check time difference for large skews
@@ -148,16 +154,16 @@ def connect_to_cluster_nodes_and_cleanup_old_nodes(self):
for instance, connect_string in self.config.get("remote_conns", {}).items():
self._connect_to_db(instance, dsn=connect_string)

- def _standby_status_query(self, instance, db_conn):
-     """Status query that is executed on the standby node"""
+ def _query_cluster_member_state(self, instance, db_conn):
+     """Query a single cluster member for its state"""
f_result = None
result = {"fetch_time": get_iso_timestamp(), "connection": False}
if not db_conn:
db_conn = self._connect_to_db(instance, self.config["remote_conns"].get(instance))
if not db_conn:
return result
+ phase = "querying status from"
try:
- phase = "querying status from"
self.log.debug("%s %r", phase, instance)
c = db_conn.cursor(cursor_factory=RealDictCursor)
if db_conn.server_version >= 100000:
@@ -236,9 +242,10 @@ def _parse_status_query_result(result):
result.update({"db_time": get_iso_timestamp(result["db_time"]), "connection": True})
return result

- def standby_status_query(self, instance, db_conn):
+ def update_cluster_member_state(self, instance, db_conn):
+     """Update the cluster state entry for a single cluster member"""
start_time = time.monotonic()
- result = self._standby_status_query(instance, db_conn)
+ result = self._query_cluster_member_state(instance, db_conn)
self.log.debug("DB state gotten from: %r was: %r, took: %.4fs to fetch",
instance, result, time.monotonic() - start_time)
if instance in self.cluster_state:
@@ -266,7 +273,7 @@ def main_monitoring_loop(self, requested_check=False):
always_observers = not self.config.get("poll_observers_on_warning_only")
with ThreadPoolExecutor(max_workers=thread_count) as tex:
for instance, db_conn in self.db_conns.items():
- futures.append(tex.submit(self.standby_status_query, instance, db_conn))
+ futures.append(tex.submit(self.update_cluster_member_state, instance, db_conn))
if always_observers or self.is_replication_lag_over_warning_limit():
for instance, uri in self.config.get("observers", {}).items():
futures.append(tex.submit(self.fetch_observer_state, instance, uri))
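The renamed update_cluster_member_state is still fanned out from main_monitoring_loop with one worker thread per cluster member, as the last hunk above shows. A minimal sketch of that fan-out pattern, with poll_all_members and query_state as hypothetical stand-ins rather than pglookout's own API:

    from concurrent.futures import ThreadPoolExecutor

    def poll_all_members(members, query_state):
        # members: {instance: db_conn}; query_state(instance, db_conn) -> state dict.
        # One future per member; leaving the with-block joins all workers.
        with ThreadPoolExecutor(max_workers=len(members) or 1) as tex:
            futures = {
                instance: tex.submit(query_state, instance, db_conn)
                for instance, db_conn in members.items()
            }
        return {instance: future.result() for instance, future in futures.items()}

Keying the results by instance mirrors how ClusterMonitor shares the cluster_state dictionary with the main thread.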
35 changes: 23 additions & 12 deletions pglookout/pglookout.py
@@ -34,7 +34,8 @@
class PgLookout:
def __init__(self, config_path):
self.log = logging.getLogger("pglookout")
- self.stats = None
+ # dummy to make sure we never get an AttributeError -> gets overwritten after the first config loading
+ self.stats = statsd.StatsClient(host=None)
self.running = True
self.replication_lag_over_warning_limit = False

@@ -71,9 +72,6 @@ def __init__(self, config_path):

self.cluster_state = {}
self.observer_state = {}
- self.overall_state = {"db_nodes": self.cluster_state, "observer_nodes": self.observer_state,
-                       "current_master": self.current_master,
-                       "replication_lag_over_warning": self.replication_lag_over_warning_limit}

self.cluster_monitor = ClusterMonitor(
config=self.config,
Expand Down Expand Up @@ -168,24 +166,31 @@ def load_config(self, _signal=None, _frame=None):
self.cluster_monitor_check_queue.put("new config came, recheck")

def write_cluster_state_to_json_file(self):
"""Periodically write a JSON state file to disk"""
"""Periodically write a JSON state file to disk
Currently only used to share state with the current_master helper command, pglookout itself does
not rely in this file.
"""
start_time = time.monotonic()
state_file_path = self.config.get("json_state_file_path", "/tmp/pglookout_state.json")
+ overall_state = {"db_nodes": self.cluster_state, "observer_nodes": self.observer_state,
+                  "current_master": self.current_master}
try:
- self.overall_state = {"db_nodes": self.cluster_state, "observer_nodes": self.observer_state,
-                       "current_master": self.current_master}
- json_to_dump = json.dumps(self.overall_state, indent=4)
+ json_to_dump = json.dumps(overall_state, indent=4)
self.log.debug("Writing JSON state file to: %r, file_size: %r", state_file_path, len(json_to_dump))
with open(state_file_path + ".tmp", "w") as fp:
fp.write(json_to_dump)
os.rename(state_file_path + ".tmp", state_file_path)
self.log.debug("Wrote JSON state file to disk, took %.4fs", time.monotonic() - start_time)
except Exception as ex: # pylint: disable=broad-except
self.log.exception("Problem in writing JSON: %r file to disk, took %.4fs",
- self.overall_state, time.monotonic() - start_time)
+ overall_state, time.monotonic() - start_time)
self.stats.unexpected_exception(ex, where="write_cluster_state_to_json_file")
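As the new docstring says, the state file exists only for the current_master helper command. A hedged sketch of how such a consumer might read it; the key names and default path come from the hunk above, but this is not the helper's actual implementation:

    import json

    def read_cluster_state(path="/tmp/pglookout_state.json"):
        # Keys mirror the overall_state dict assembled in
        # write_cluster_state_to_json_file.
        with open(path, encoding="utf-8") as fp:
            state = json.load(fp)
        return state["current_master"], state["db_nodes"], state["observer_nodes"]

Readers like this see a consistent file because the writer above goes through a .tmp path followed by os.rename, which replaces the file atomically on POSIX systems.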

def create_node_map(self, cluster_state, observer_state):
"""Computes roles for each known member of cluster.
Use the information gathered in the cluster_state and observer_state to figure out the roles of each member."""
standby_nodes, master_node, master_instance = {}, None, None
connected_master_nodes, disconnected_master_nodes = {}, {}
connected_observer_nodes, disconnected_observer_nodes = {}, {}
Expand Down Expand Up @@ -469,9 +474,15 @@ def _been_in_contact_with_master_within_failover_timeout(self):
return False

def do_failover_decision(self, own_state, standby_nodes):
- if self.connected_master_nodes or self._been_in_contact_with_master_within_failover_timeout():
+ if self.connected_master_nodes:
self.log.warning("We still have some connected masters: %r, not failing over", self.connected_master_nodes)
return
+ if self._been_in_contact_with_master_within_failover_timeout():
+     self.log.warning(
+         "No connected master nodes, but last contact was still within failover timeout (%ss), not failing over",
+         self.replication_lag_failover_timeout,
+     )
+     return

known_replication_positions = self.get_replication_positions(standby_nodes)
if not known_replication_positions:
Expand Down Expand Up @@ -618,12 +629,12 @@ def create_alert_file(self, filename):
with open(filepath, "w") as fp:
fp.write("alert")
except Exception as ex: # pylint: disable=broad-except
- self.log.exception("Problem writing alert file: %r", filepath)
+ self.log.exception("Problem writing alert file: %r", filename)
self.stats.unexpected_exception(ex, where="create_alert_file")

def delete_alert_file(self, filename):
+ filepath = os.path.join(self.config.get("alert_file_dir", os.getcwd()), filename)
try:
- filepath = os.path.join(self.config.get("alert_file_dir", os.getcwd()), filename)
if os.path.exists(filepath):
self.log.debug("Deleting alert file: %r", filepath)
os.unlink(filepath)
6 changes: 4 additions & 2 deletions pglookout/webserver.py
@@ -50,24 +50,26 @@ def close(self):

class RequestHandler(SimpleHTTPRequestHandler):
def do_GET(self):
+ assert isinstance(self.server, ThreadedWebServer), f"server: {self.server!r}"
self.server.log.debug("Got request: %r", self.path)
if self.path.startswith("/state.json"):
self.send_response(200)
self.send_header('Content-type', 'application/json')
response = json.dumps(self.server.cluster_state, indent=4).encode("utf8")
- self.send_header('Content-length', len(response))
+ self.send_header('Content-length', str(len(response)))
self.end_headers()
self.wfile.write(response)
else:
self.send_response(404)

def do_POST(self):
+ assert isinstance(self.server, ThreadedWebServer), f"server: {self.server!r}"
self.server.log.debug("Got request: %r", self.path)
if self.path.startswith("/check"):
self.server.cluster_monitor_check_queue.put("request from webserver")
self.server.log.info("Immediate status check requested")
self.send_response(204)
- self.send_header('Content-length', 0)
+ self.send_header('Content-length', str(0))
self.end_headers()
else:
self.send_response(404)
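Taken together, the handler exposes two endpoints: GET /state.json returns the shared cluster state as JSON (200), and POST /check queues an immediate status check (204). A hypothetical client sketch; the host and port are assumptions standing in for pglookout's configured HTTP address, not values from this diff:

    import requests

    BASE = "http://localhost:15000"  # assumed address; match your pglookout HTTP config

    state = requests.get(f"{BASE}/state.json", timeout=5.0).json()  # 200 + JSON body
    print(sorted(state))  # instances present in the cluster state

    resp = requests.post(f"{BASE}/check", timeout=5.0)  # 204, empty body
    assert resp.status_code == 204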
