Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clearer logs and small refactoring #73

Merged
merged 7 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# GitHub Actions CI for pglookout: a lint job (pylint + flake8 on the newest
# Python) gating a test matrix across Python and PostgreSQL versions.
name: Build pglookout

on:
  push:
    branches:
      - master
    tags:
      - '**'
  pull_request:

jobs:

  lint:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # only use the newest version for the lint step -> as long as the other version can run it it's ok
        python-version: [3.9]

    steps:

      - id: checkout-code
        uses: actions/checkout@v2

      - id: prepare-python
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}

      - id: dependencies
        run: |
          pip install -e .
          pip install --upgrade pylint flake8 pytest mock

      - id: pylint
        run: make pylint

      - id: flake8
        run: make flake8

  test:
    runs-on: ubuntu-latest
    needs: lint
    strategy:
      max-parallel: 4
      matrix:
        python-version: [3.6, 3.7, 3.8, 3.9]
        pg-version: [9.6, 10, 11, 12, 13]

    steps:
      - id: checkout-code
        uses: actions/checkout@v2

      - id: prepare-python
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}

      - id: dependencies
        run: |
          # Setup the Postgres repositories
          sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
          wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
          sudo apt-get update
          # Setup build deps
          sudo apt-get install -y libsnappy-dev
          sudo apt-get install -y postgresql-${{ matrix.pg-version }}
          # Setup common python dependencies
          python -m pip install --upgrade pip
          pip install --upgrade pytest mock
          pip install -e .

      - id: unittest
        run: make unittest
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ build/
pglookout.egg-info/
.coverage
/pglookout/version.py

.idea/
3 changes: 2 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ disable=
too-many-public-methods,
too-many-statements,
ungrouped-imports,
wrong-import-order
wrong-import-order,
unspecified-encoding

[REPORTS]
output-format=text
Expand Down
22 changes: 0 additions & 22 deletions .travis.yml

This file was deleted.

3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ long_ver = $(shell git describe --long 2>/dev/null || echo $(short_ver)-0-unknow
generated = pglookout/version.py

PYTHON ?= python3
PYLINT=$(shell which pylint 2> /dev/null | which pylint-3)
PYLINT_DIRS = pglookout/ test/

all: $(generated)
Expand All @@ -21,7 +20,7 @@ flake8: $(generated)
$(PYTHON) -m flake8 --ignore E722 --max-line-len=125 $(PYLINT_DIRS)

pylint: $(generated)
$(PYLINT) --rcfile .pylintrc $(PYLINT_DIRS)
$(PYTHON) -m pylint --rcfile .pylintrc $(PYLINT_DIRS)
kmichel-aiven marked this conversation as resolved.
Show resolved Hide resolved

coverage:
$(PYTHON) -m pytest $(PYTEST_ARG) --cov-report term-missing --cov pglookout test/
Expand Down
21 changes: 14 additions & 7 deletions pglookout/cluster_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def wait_select(conn, timeout=5.0):
class ClusterMonitor(Thread):
def __init__(self, config, cluster_state, observer_state, create_alert_file, cluster_monitor_check_queue,
failover_decision_queue, is_replication_lag_over_warning_limit, stats):
"""Thread which collects cluster state.

Basically a loop which tries to connect to each cluster member and
to external observers for status information. The information is collected
in the cluster_state/observer_state dictionaries, which are shared with the main thread.
"""
Thread.__init__(self)
self.log = logging.getLogger("ClusterMonitor")
self.stats = stats
Expand Down Expand Up @@ -103,8 +109,8 @@ def _connect_to_db(self, instance, dsn):

def _fetch_observer_state(self, instance, uri):
result = {"fetch_time": get_iso_timestamp(), "connection": True}
fetch_uri = uri + "/state.json"
try:
fetch_uri = uri + "/state.json"
response = self.session.get(fetch_uri, timeout=5.0)

# check time difference for large skews
Expand Down Expand Up @@ -148,16 +154,16 @@ def connect_to_cluster_nodes_and_cleanup_old_nodes(self):
for instance, connect_string in self.config.get("remote_conns", {}).items():
self._connect_to_db(instance, dsn=connect_string)

def _standby_status_query(self, instance, db_conn):
"""Status query that is executed on the standby node"""
def _query_cluster_member_state(self, instance, db_conn):
"""Query a single cluster member for its state"""
f_result = None
result = {"fetch_time": get_iso_timestamp(), "connection": False}
if not db_conn:
db_conn = self._connect_to_db(instance, self.config["remote_conns"].get(instance))
if not db_conn:
return result
phase = "querying status from"
try:
phase = "querying status from"
self.log.debug("%s %r", phase, instance)
c = db_conn.cursor(cursor_factory=RealDictCursor)
if db_conn.server_version >= 100000:
Expand Down Expand Up @@ -236,9 +242,10 @@ def _parse_status_query_result(result):
result.update({"db_time": get_iso_timestamp(result["db_time"]), "connection": True})
return result

def standby_status_query(self, instance, db_conn):
def update_cluster_member_state(self, instance, db_conn):
"""Update the cluster state entry for a single cluster member"""
start_time = time.monotonic()
result = self._standby_status_query(instance, db_conn)
result = self._query_cluster_member_state(instance, db_conn)
self.log.debug("DB state gotten from: %r was: %r, took: %.4fs to fetch",
instance, result, time.monotonic() - start_time)
if instance in self.cluster_state:
Expand Down Expand Up @@ -266,7 +273,7 @@ def main_monitoring_loop(self, requested_check=False):
always_observers = not self.config.get("poll_observers_on_warning_only")
with ThreadPoolExecutor(max_workers=thread_count) as tex:
for instance, db_conn in self.db_conns.items():
futures.append(tex.submit(self.standby_status_query, instance, db_conn))
futures.append(tex.submit(self.update_cluster_member_state, instance, db_conn))
if always_observers or self.is_replication_lag_over_warning_limit():
for instance, uri in self.config.get("observers", {}).items():
futures.append(tex.submit(self.fetch_observer_state, instance, uri))
Expand Down
35 changes: 23 additions & 12 deletions pglookout/pglookout.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
class PgLookout:
def __init__(self, config_path):
self.log = logging.getLogger("pglookout")
self.stats = None
# dummy to make sure we never get an AttributeError -> gets overwritten after the first config loading
self.stats = statsd.StatsClient(host=None)
self.running = True
self.replication_lag_over_warning_limit = False

Expand Down Expand Up @@ -71,9 +72,6 @@ def __init__(self, config_path):

self.cluster_state = {}
self.observer_state = {}
self.overall_state = {"db_nodes": self.cluster_state, "observer_nodes": self.observer_state,
"current_master": self.current_master,
"replication_lag_over_warning": self.replication_lag_over_warning_limit}

self.cluster_monitor = ClusterMonitor(
config=self.config,
Expand Down Expand Up @@ -168,24 +166,31 @@ def load_config(self, _signal=None, _frame=None):
self.cluster_monitor_check_queue.put("new config came, recheck")

def write_cluster_state_to_json_file(self):
    """Periodically write a JSON state file to disk.

    Currently only used to share state with the current_master helper command,
    pglookout itself does not rely on this file.
    """
    start_time = time.monotonic()
    state_file_path = self.config.get("json_state_file_path", "/tmp/pglookout_state.json")
    overall_state = {"db_nodes": self.cluster_state, "observer_nodes": self.observer_state,
                     "current_master": self.current_master}
    try:
        json_to_dump = json.dumps(overall_state, indent=4)
        self.log.debug("Writing JSON state file to: %r, file_size: %r", state_file_path, len(json_to_dump))
        # Write to a temp file and rename it into place so that readers of the
        # state file never observe a partially-written document.
        with open(state_file_path + ".tmp", "w") as fp:
            fp.write(json_to_dump)
        os.rename(state_file_path + ".tmp", state_file_path)
        self.log.debug("Wrote JSON state file to disk, took %.4fs", time.monotonic() - start_time)
    except Exception as ex:  # pylint: disable=broad-except
        self.log.exception("Problem in writing JSON: %r file to disk, took %.4fs",
                           overall_state, time.monotonic() - start_time)
        self.stats.unexpected_exception(ex, where="write_cluster_state_to_json_file")

def create_node_map(self, cluster_state, observer_state):
"""Computes roles for each known member of cluster.

Use the information gathered in the cluster_state and observer_state to figure out the roles of each member."""
standby_nodes, master_node, master_instance = {}, None, None
connected_master_nodes, disconnected_master_nodes = {}, {}
connected_observer_nodes, disconnected_observer_nodes = {}, {}
Expand Down Expand Up @@ -469,9 +474,15 @@ def _been_in_contact_with_master_within_failover_timeout(self):
return False

def do_failover_decision(self, own_state, standby_nodes):
if self.connected_master_nodes or self._been_in_contact_with_master_within_failover_timeout():
if self.connected_master_nodes:
self.log.warning("We still have some connected masters: %r, not failing over", self.connected_master_nodes)
return
if self._been_in_contact_with_master_within_failover_timeout():
self.log.warning(
"No connected master nodes, but last contact was still within failover timeout (%ss), not failing over",
self.replication_lag_failover_timeout,
)
return

known_replication_positions = self.get_replication_positions(standby_nodes)
if not known_replication_positions:
Expand Down Expand Up @@ -618,12 +629,12 @@ def create_alert_file(self, filename):
with open(filepath, "w") as fp:
fp.write("alert")
except Exception as ex: # pylint: disable=broad-except
self.log.exception("Problem writing alert file: %r", filepath)
self.log.exception("Problem writing alert file: %r", filename)
self.stats.unexpected_exception(ex, where="create_alert_file")

def delete_alert_file(self, filename):
filepath = os.path.join(self.config.get("alert_file_dir", os.getcwd()), filename)
try:
filepath = os.path.join(self.config.get("alert_file_dir", os.getcwd()), filename)
if os.path.exists(filepath):
self.log.debug("Deleting alert file: %r", filepath)
os.unlink(filepath)
Expand Down
6 changes: 4 additions & 2 deletions pglookout/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,26 @@ def close(self):

class RequestHandler(SimpleHTTPRequestHandler):
    """HTTP handler for pglookout's web server.

    GET /state.json returns the shared cluster state as JSON;
    POST /check queues an immediate cluster re-check.
    """

    def do_GET(self):
        # The assert both documents and (for type checkers) narrows the type
        # of self.server so the custom attributes below resolve.
        assert isinstance(self.server, ThreadedWebServer), f"server: {self.server!r}"
        self.server.log.debug("Got request: %r", self.path)
        if self.path.startswith("/state.json"):
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            response = json.dumps(self.server.cluster_state, indent=4).encode("utf8")
            self.send_header('Content-length', str(len(response)))
            self.end_headers()
            self.wfile.write(response)
        else:
            self.send_response(404)

    def do_POST(self):
        assert isinstance(self.server, ThreadedWebServer), f"server: {self.server!r}"
        self.server.log.debug("Got request: %r", self.path)
        if self.path.startswith("/check"):
            # Wake the cluster monitor thread immediately instead of waiting
            # for its next scheduled poll.
            self.server.cluster_monitor_check_queue.put("request from webserver")
            self.server.log.info("Immediate status check requested")
            self.send_response(204)
            self.send_header('Content-length', str(0))
            self.end_headers()
        else:
            self.send_response(404)