Skip to content

Commit

Permalink
Merge pull request #37 from ihippik/healthz
Browse files Browse the repository at this point in the history
Add K8S probes
  • Loading branch information
ihippik authored Jun 16, 2024
2 parents af4a3de + f532334 commit 8622108
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Dependencies Stage
FROM golang:1.22-alpine AS base
FROM golang:1.22.4-alpine AS base
LABEL maintainer="Konstantin Makarov <hippik80@gmail.com>"

WORKDIR /listener
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ You can take metrics by specifying an endpoint for Prometheus in the configurati
| published_events_total | the total number of published events | `subject`, `table` |
| filter_skipped_events_total | the total number of skipped events | `table` |

### Kubernetes
Application initializes a web server (*if a port is specified in the configuration*) with two endpoints
for readiness `/ready` and liveness `/healthz` probes.

## Docker

Expand Down
13 changes: 10 additions & 3 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"

scfg "github.com/ihippik/config"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -35,6 +37,9 @@ func main() {
},
},
Action: func(c *cli.Context) error {
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

cfg, err := config.InitConfig(c.String("config"))
if err != nil {
return fmt.Errorf("get config: %w", err)
Expand All @@ -44,7 +49,7 @@ func main() {
return fmt.Errorf("validate config: %w", err)
}

if err := scfg.InitSentry(cfg.Monitoring.SentryDSN, version); err != nil {
if err = scfg.InitSentry(cfg.Monitoring.SentryDSN, version); err != nil {
return fmt.Errorf("init sentry: %w", err)
}

Expand All @@ -57,7 +62,7 @@ func main() {
return fmt.Errorf("pgx connection: %w", err)
}

pub, err := factoryPublisher(c.Context, cfg.Publisher, logger)
pub, err := factoryPublisher(ctx, cfg.Publisher, logger)
if err != nil {
return fmt.Errorf("factory publisher: %w", err)
}
Expand All @@ -78,7 +83,9 @@ func main() {
config.NewMetrics(),
)

if err := service.Process(c.Context); err != nil {
go service.InitHandlers(ctx)

if err := service.Process(ctx); err != nil {
slog.Error("service process failed", "err", err.Error())
}

Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ type Config struct {
// ListenerCfg path of the listener config.
type ListenerCfg struct {
SlotName string `valid:"required"`
ServerPort int
AckTimeout time.Duration
RefreshConnection time.Duration `valid:"required"`
HeartbeatInterval time.Duration `valid:"required"`
Filter FilterStruct
TopicsMap map[string]string
}

// PublisherCfg represent configuration for any types publisher.
// PublisherCfg represent configuration for any publisher types.
type PublisherCfg struct {
Type PublisherType `valid:"required"`
Address string
Expand Down
1 change: 1 addition & 0 deletions config_example.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
listener:
serverPort: 80 # k8s probes, optional
slotName: myslot_1
refreshConnection: 30s
heartbeatInterval: 10s
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ihippik/wal-listener/v2

go 1.22
go 1.22.4

require (
cloud.google.com/go/pubsub v1.37.0
Expand Down
87 changes: 86 additions & 1 deletion listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/jackc/pgx"
Expand Down Expand Up @@ -62,6 +65,7 @@ type Listener struct {
repository repository
parser parser
lsn uint64
isAlive atomic.Bool
}

// NewWalListener create and initialize new service instance.
Expand All @@ -85,6 +89,84 @@ func NewWalListener(
}
}

// InitHandlers init web handlers for liveness & readiness k8s probes.
func (l *Listener) InitHandlers(ctx context.Context) {
const defaultTimeout = 500 * time.Millisecond

if l.cfg.Listener.ServerPort == 0 {
l.log.Debug("web server port for probes not specified, skip")
return
}

handler := http.NewServeMux()
handler.HandleFunc("GET /healthz", l.liveness)
handler.HandleFunc("GET /ready", l.readiness)

addr := ":" + strconv.Itoa(l.cfg.Listener.ServerPort)
srv := http.Server{
Addr: addr,
Handler: handler,
ReadTimeout: defaultTimeout,
WriteTimeout: defaultTimeout,
}

go func() {
if err := srv.ListenAndServe(); err != nil {
l.log.Error("error starting http listener", "err", err)
}
}()

l.log.Debug("web handlers were initialised", slog.String("addr", addr))

<-ctx.Done()
}

const contentTypeTextPlain = "text/plain"

func (l *Listener) liveness(w http.ResponseWriter, r *http.Request) {
var (
respCode = http.StatusOK
resp = []byte(`ok`)
)

w.Header().Set("Content-Type", contentTypeTextPlain)

if !l.replicator.IsAlive() || !l.repository.IsAlive() {
resp = []byte("failed")
respCode = http.StatusInternalServerError

l.log.Warn("liveness probe failed")
}

w.WriteHeader(respCode)

if _, err := w.Write(resp); err != nil {
l.log.Error("liveness: error writing response", "err", err)
}
}

func (l *Listener) readiness(w http.ResponseWriter, r *http.Request) {
var (
respCode = http.StatusOK
resp = []byte(`ok`)
)

w.Header().Set("Content-Type", contentTypeTextPlain)

if !l.isAlive.Load() {
resp = []byte("failed")
respCode = http.StatusInternalServerError

l.log.Warn("readiness probe failed")
}

w.WriteHeader(respCode)

if _, err := w.Write(resp); err != nil {
l.log.Error("liveness: error writing response", "err", err)
}
}

// Process is main service entry point.
func (l *Listener) Process(ctx context.Context) error {
logger := l.log.With("slot_name", l.cfg.Listener.SlotName)
Expand Down Expand Up @@ -346,9 +428,12 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) {
case <-heart.C:
if err := l.SendStandbyStatus(); err != nil {
l.log.Error("failed to send heartbeat status", "err", err)
l.isAlive.Store(false)

continue
}

l.isAlive.Store(true)
l.log.Debug("sending periodic heartbeat status")
}
}
Expand All @@ -365,7 +450,7 @@ func (l *Listener) SendStandbyStatus() error {

standbyStatus.ReplyRequested = 0

if err := l.replicator.SendStandbyStatus(standbyStatus); err != nil {
if err = l.replicator.SendStandbyStatus(standbyStatus); err != nil {
return fmt.Errorf("unable to send StandbyStatus object: %w", err)
}

Expand Down

0 comments on commit 8622108

Please sign in to comment.