Skip to content

Commit

Permalink
refactor(discovery): style changes, remove node from local nodes on u…
Browse files Browse the repository at this point in the history
…nregister
  • Loading branch information
caspiano committed Jan 31, 2020
1 parent 4b769c2 commit d03b830
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 24 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: hound-dog
version: 1.1.3
version: 1.1.4
crystal: 0.32.0
license: MIT

Expand Down
2 changes: 0 additions & 2 deletions spec/discovery_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ module HoundDog

chan.receive.should be_nil
discovery.unregister
sleep 0.2
discovery.nodes.should eq [node0]
end

Expand All @@ -61,7 +60,6 @@ module HoundDog

discovery.own_node?("hello").should be_true
discovery.unregister
sleep 0.2
discovery.nodes.should be_empty
end

Expand Down
59 changes: 38 additions & 21 deletions src/hound-dog/discovery.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,78 +9,95 @@ require "./settings"
# Transparently manage service discovery through consistent hashing and ETCD
module HoundDog
class Discovery
getter service, ip, port, node
getter service, ip, port, node, rendezvous
private getter callback : Proc(Void)? = nil
private getter service_events : Service

def initialize(
@service : String,
@ip : String = "127.0.0.1",
@port : Int32 = 8080
)
@node = {ip: @ip, port: @port}
@node = {ip: ip, port: port}

# Get service nodes
@service_events = Service.new(
service: @service,
node: @node,
service: service,
node: node,
)

# Initialiase the hash
@rendezvous = RendezvousHash.new(nodes: current_nodes)
@rendezvous = RendezvousHash.new(nodes: etcd_nodes)

# Prepare watchfeed
watchfeed = @service_events.monitor(&->handle_service_message(Service::Event))
watchfeed = service_events.monitor(&->handle_service_message(Service::Event))

# ASYNC! spawn service monitoring
spawn(same_thread: true) { watchfeed.start }
Fiber.yield
end

# Consistent hash lookup
def find(key : String) : Service::Node?
@rendezvous.find(key).try &->Service.node(String)
def find?(key : String) : Service::Node?
rendezvous.find?(key).try &->Service.node(String)
end

# Consistent hash lookup
def find!(key : String) : Service::Node
Service.node(@rendezvous.find!(key))
def find(key : String) : Service::Node
Service.node(rendezvous.find(key))
end

def [](key)
find(key)
end

def []?(key)
find?(key)
end

# Determine if key maps to current node
#
def own_node?(key : String) : Bool
service_value = @rendezvous.find(key)
!service_value.nil? && Service.node(service_value) == @node
service_value = rendezvous[key]?
!!(service_value && Service.node(service_value) == node)
end

# Consistent hash nodes
#
def nodes : Array(Service::Node)
@rendezvous.nodes.map &->Service.node(String)
rendezvous.nodes.map &->Service.node(String)
end

delegate register, unregister, lease_id, registered?, to: @service_events
delegate register, lease_id, registered?, unmonitor, to: service_events

# Register service
#
def register(&callback : Proc(Void))
@callback = callback
@service_events.register
service_events.register
end

def unregister
service_events.unregister
rendezvous.remove?(Service.key_value(node))

nil
end

# Event handler
#
def handle_service_message(event : Service::Event)
@rendezvous.replace_nodes(current_nodes)
private def handle_service_message(event : Service::Event)
rendezvous.nodes = etcd_nodes
# Trigger change callback if present
callback.not_nil!.call if callback
callback.try &.call
end

private def current_nodes
Service.nodes(@service).map { |n| Service.key_value(n) }
private def etcd_nodes
Service.nodes(service).map { |n| Service.key_value(n) }
end

def finalize
@service_events.unmonitor
unmonitor
unregister
end
end
Expand Down

0 comments on commit d03b830

Please sign in to comment.