Skip to content

Commit

Permalink
refactor watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Dec 9, 2021
1 parent 129fe26 commit d84324e
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 175 deletions.
176 changes: 111 additions & 65 deletions internal/kube_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package internal

import (
"context"
"log"
"sort"
"sync"
"time"

v12 "k8s.io/api/networking/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
cache2 "k8s.io/client-go/tools/cache"
)

const (
Expand All @@ -20,68 +18,127 @@ const (
syncInterval = 30 * time.Second
)

func WatchKubernetes(ctx context.Context, clientset *kubernetes.Clientset, reciever interface {
type Receiver interface {
Set(ingresses []Ingress)
}

func WatchKubernetes(global context.Context, clientset *kubernetes.Clientset, reciever interface {
Set(ingresses []Ingress)
}) {
var cache = make(map[string]Ingress)
var lock sync.Mutex
var toDetect = make(chan Ingress, 1024)
defer close(toDetect)

go runLogoFetcher(ctx, toDetect, func(ing Ingress) {
lock.Lock()
defer lock.Unlock()
if v, ok := cache[ing.UID]; ok && v.LogoURL == "" {
v.LogoURL = ing.LogoURL
cache[ing.UID] = v
reciever.Set(toList(cache))
ctx, cancel := context.WithCancel(global)
defer cancel()

watcher := newWatcher(reciever)

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
watcher.runWatcher(ctx, clientset)
}()

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
watcher.runLogoFetcher(ctx)
}()
wg.Wait()
}

func newWatcher(receiver Receiver) *kubeWatcher {
return &kubeWatcher{
cache: make(map[string]Ingress),
receiver: receiver,
checkLogos: make(chan struct{}, 1),
}
}

type kubeWatcher struct {
cache map[string]Ingress
lock sync.RWMutex
receiver Receiver
checkLogos chan struct{}
}

func (kw *kubeWatcher) OnAdd(obj interface{}) {
kw.upsertIngress(obj)
}

func (kw *kubeWatcher) OnUpdate(_, newObj interface{}) {
kw.upsertIngress(newObj)
}

func (kw *kubeWatcher) OnDelete(obj interface{}) {
defer kw.notify()

kw.lock.Lock()
defer kw.lock.Unlock()
ing := obj.(*v12.IngressClass)
delete(kw.cache, string(ing.UID))
}

func (kw *kubeWatcher) runLogoFetcher(ctx context.Context) {
for {
for _, ing := range kw.items() {
if ing.LogoURL == "" && len(ing.URLs) > 0 {
ing.LogoURL = detectIconURL(ctx, ing.URLs[0])
if ing.LogoURL != "" {
kw.updateLogo(ing)
}
}
}
})
select {
case <-ctx.Done():
return
case <-kw.checkLogos:
}
}
}

func (kw *kubeWatcher) runWatcher(ctx context.Context, clientset *kubernetes.Clientset) {
informerFactory := informers.NewSharedInformerFactory(clientset, syncInterval)
informer := informerFactory.Networking().V1().Ingresses().Informer()

informer.AddEventHandler(cache2.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*v12.Ingress)
ingress := inspectIngress(ing)
informer.AddEventHandler(kw)
informer.Run(ctx.Done())
}

func (kw *kubeWatcher) upsertIngress(obj interface{}) {
defer kw.notify()
ing := obj.(*v12.Ingress)
ingress := inspectIngress(ing)

lock.Lock()
defer lock.Unlock()
cache[string(ing.UID)] = ingress
kw.lock.Lock()
defer kw.lock.Unlock()
kw.cache[ingress.UID] = ingress
}

log.Println("new ingress", ing.UID, ":", ing.Name, "in", ing.Namespace)
reciever.Set(toList(cache))
select {
case toDetect <- ingress:
default:
}
},
UpdateFunc: func(_, newObj interface{}) {
ing := newObj.(*v12.Ingress)
ingress := inspectIngress(ing)

lock.Lock()
defer lock.Unlock()
cache[string(ing.UID)] = ingress

log.Println("updated ingress", ing.UID, ":", ing.Name, "in", ing.Namespace)
reciever.Set(toList(cache))
select {
case toDetect <- ingress:
default:
}
},
DeleteFunc: func(obj interface{}) {
ing := obj.(*v12.IngressClass)
func (kw *kubeWatcher) notify() {
kw.receiver.Set(kw.items())
select {
case kw.checkLogos <- struct{}{}:
default:
}
}

lock.Lock()
defer lock.Unlock()
delete(cache, string(ing.UID))
func (kw *kubeWatcher) items() []Ingress {
kw.lock.RLock()
defer kw.lock.RUnlock()
return toList(kw.cache)
}

log.Println("ingress removed", ing.UID, ":", ing.Name, "in", ing.Namespace)
},
})
informer.Run(ctx.Done())
func (kw *kubeWatcher) updateLogo(ingress Ingress) {
kw.lock.Lock()
defer kw.lock.Unlock()
old, exists := kw.cache[ingress.UID]
if !exists || old.LogoURL != "" {
return
}
old.LogoURL = ingress.LogoURL
kw.cache[ingress.UID] = old
}

func inspectIngress(ing *v12.Ingress) Ingress {
Expand Down Expand Up @@ -126,14 +183,3 @@ func toURLs(spec v12.IngressSpec) []string {

return urls
}

func runLogoFetcher(ctx context.Context, ch <-chan Ingress, fn func(ing Ingress)) {
for ing := range ch {
if ing.LogoURL == "" && len(ing.URLs) > 0 {
ing.LogoURL = detectIconURL(ctx, ing.URLs[0])
if ing.LogoURL != "" {
fn(ing)
}
}
}
}
117 changes: 117 additions & 0 deletions internal/logo_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package internal

import (
"context"
"fmt"
"log"
"net/http"
"net/url"
"strings"

"golang.org/x/net/html"
)

func detectIconURL(ctx context.Context, url string) string {
u, err := mainSrcPageIcon(ctx, url)
if err == nil {
return u
}
log.Println("detect icon from main page", url, ":", err)
// fallback to classical icon
faviconURL := url + "/favicon.ico"
if pingURL(ctx, faviconURL) == nil {
return faviconURL
}
return ""
}

func mainSrcPageIcon(ctx context.Context, pageURL string) (string, error) {
// try to load main page and check meta headers
req, err := http.NewRequestWithContext(ctx, http.MethodGet, pageURL, nil)
if err != nil {
return "", err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("non-200 code")
}
doc, err := html.Parse(res.Body)
if err != nil {
return "", fmt.Errorf("parse HTML: %w", err)
}
logoURL := findIcons(doc)
if logoURL == "" {
return "", fmt.Errorf("no logo in meta")
}
if u, err := url.Parse(logoURL); err == nil && !u.IsAbs() && !strings.HasPrefix(logoURL, "/") {
logoURL = "/" + logoURL
}
return logoURL, nil
}

func findIcons(doc *html.Node) string {
priorityList := []string{"apple-touch-icon", "shortcut icon", "icon", "alternate icon"}

root := findChild(doc, "html")
if root == nil {
return ""
}

head := findChild(root, "head")
if head == nil {
return ""
}

var links = make(map[string]string)
for child := head.FirstChild; child != nil; child = child.NextSibling {
if child.Type == html.ElementNode && child.Data == "link" {
var key string
var value string
for _, attr := range child.Attr {
if attr.Key == "rel" {
key = attr.Val
} else if attr.Key == "href" {
value = attr.Val
}
}
if key != "" && value != "" {
links[key] = value
}
}
}
for _, name := range priorityList {
if u, ok := links[name]; ok {
return u
}
}
return ""
}

func pingURL(ctx context.Context, url string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("non-200 code")
}
return nil
}

func findChild(doc *html.Node, name string) *html.Node {
for child := doc.FirstChild; child != nil; child = child.NextSibling {
if child.Type == html.ElementNode && child.Data == name {
return child
}
}
return nil
}
Loading

0 comments on commit d84324e

Please sign in to comment.