-
Notifications
You must be signed in to change notification settings - Fork 0
/
ServicePublish.go
108 lines (95 loc) · 2.31 KB
/
ServicePublish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package GrpcConnectionPool
import (
"fmt"
"github.com/hashicorp/consul/api"
"log"
"sync"
)
type Subscriber interface {
notice(list *healthServiceList)
}
type Publisher interface {
Subscript(subscriber Subscriber)
publish(service string, serviceList []string)
}
type Watch interface {
watchService(service string)
watching()
}
var once sync.Once
type ServiceList struct {
lock sync.RWMutex
subscriptList []Subscriber
serviceList []string
config *api.Config
client *api.Client
address string
}
//创建新的健康服务发布列表
func InitPublishServiceList(service []string) *ServiceList {
address := fmt.Sprintf("%s:%d", IpAddress, Port)
serviceList := &ServiceList{
address: address,
serviceList: service,
}
once.Do(serviceList.newPublish)
serviceList.watching()
return serviceList
}
func (this *ServiceList) newPublish() {
config := api.DefaultConfig()
config.Address = this.address
client, err := api.NewClient(config)
defer func() {
if err := recover(); err != nil {
log.Println(err)
}
}()
this.config = config
this.client = client
if err != nil {
panic(err)
}
this.subscriptList = make([]Subscriber, 0)
}
//新增订阅
func (this *ServiceList) Subscript(subscriber Subscriber) {
this.lock.Lock()
this.subscriptList = append(this.subscriptList, subscriber)
this.lock.Unlock()
}
//发布新健康列表
func (this *ServiceList) publish(service string, serviceList []string) {
this.lock.RLock()
defer this.lock.RUnlock()
for i := 0; i < len(this.subscriptList); i++ {
this.subscriptList[i].notice(&healthServiceList{service: service, addressList: serviceList})
}
}
func (this *ServiceList) watching() {
for i := 0; i < len(this.serviceList); i++ {
go this.watchService(this.serviceList[i])
}
}
func (this *ServiceList) watchService(service string) {
var lastIndex uint64
defer func() {
if err := recover(); err != nil {
log.Println(err)
}
}()
for {
sv := make([]string, 0)
services, metainfo, err := this.client.Health().Service(service, "", true, &api.QueryOptions{
WaitIndex: lastIndex,
})
if err != nil {
panic(err)
}
lastIndex = metainfo.LastIndex
for i := 0; i < len(services); i++ {
sv = append(sv, fmt.Sprintf("%s:%d", services[i].Service.Address, services[i].Service.Port))
}
this.publish(service, sv)
}
}