From 4aeea9bad15342c9868bfa290ba987c75ff00760 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 11 Apr 2024 10:19:23 +1000 Subject: [PATCH] feat: minor refactor with helper methods --- shard.yml | 2 +- spec/lookup_spec.cr | 17 +++++- src/redis_service_manager.cr | 6 +- src/redis_service_manager/clustering.cr | 5 +- .../clustering/discovery.cr | 55 +++++++++++++++---- 5 files changed, 70 insertions(+), 15 deletions(-) diff --git a/shard.yml b/shard.yml index ba03b3f..44a8858 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: redis_service_manager -version: 2.1.2 +version: 3.0.0 dependencies: ulid: diff --git a/spec/lookup_spec.cr b/spec/lookup_spec.cr index 9f07aee..248ff34 100644 --- a/spec/lookup_spec.cr +++ b/spec/lookup_spec.cr @@ -31,6 +31,7 @@ describe RedisServiceManager do end node1.register + sleep 0.1 # ensure node1 is master node2.register loop do break if node1.cluster_size == 2 @@ -44,10 +45,24 @@ describe RedisServiceManager do # Get the cluster state lookup = Clustering::Discovery.new RedisServiceManager.new("spec", REDIS_URL) - hash = lookup.nodes + hash = lookup.rendezvous hash.nodes.includes?("http://node1/node1").should be_true hash.nodes.includes?("http://node2/node2").should be_true + lookup.find?("test").should eq URI.parse("http://node2/node2") + lookup.find("test").should eq URI.parse("http://node2/node2") + + lookup["testing"].should eq URI.parse("http://node1/node1") + lookup["testing"]?.should eq URI.parse("http://node1/node1") + + lookup_local = Clustering::Discovery.new node1 + lookup_local.own_node?("testing").should be_true + lookup_local.own_node?("test").should be_false + lookup_local.nodes.should eq [ + URI.parse("http://node1/node1"), + URI.parse("http://node2/node2"), + ] + node2.unregister node1.unregister end diff --git a/src/redis_service_manager.cr b/src/redis_service_manager.cr index 59ac7f8..02aa0ec 100644 --- a/src/redis_service_manager.cr +++ b/src/redis_service_manager.cr @@ -310,7 +310,7 @@ class RedisServiceManager < Clustering {hash, new_list - delete} end - def nodes : RendezvousHash + def rendezvous : RendezvousHash if registered? @rendezvous_hash else @@ -319,6 +319,10 @@ class RedisServiceManager < Clustering RendezvousHash.new(nodes: keys.map { |key| hash[key] }) end end + + def finalize + unregister + end end require "./redis_service_manager/*" diff --git a/src/redis_service_manager/clustering.cr b/src/redis_service_manager/clustering.cr index 336c38c..2348e65 100644 --- a/src/redis_service_manager/clustering.cr +++ b/src/redis_service_manager/clustering.cr @@ -14,6 +14,9 @@ abstract class Clustering getter rebalance_callbacks : Array((RendezvousHash, RebalanceComplete) ->) getter cluster_stable_callbacks : Array(->) + # the service uri for this host + abstract def uri : String + # Called when the cluster has changed def on_rebalance(&callback : (RendezvousHash, RebalanceComplete) ->) rebalance_callbacks << callback @@ -40,7 +43,7 @@ abstract class Clustering abstract def watching? : Bool # returns the list of known nodes - abstract def nodes : RendezvousHash + abstract def rendezvous : RendezvousHash end require "./clustering/*" diff --git a/src/redis_service_manager/clustering/discovery.cr b/src/redis_service_manager/clustering/discovery.cr index df80b2b..98a0d5b 100644 --- a/src/redis_service_manager/clustering/discovery.cr +++ b/src/redis_service_manager/clustering/discovery.cr @@ -1,8 +1,9 @@ +require "uri" require "../clustering" class Clustering::Discovery def initialize(@cluster : Clustering, @ttl : Time::Span = 5.seconds) - @nodes = RendezvousHash.new + @rendezvous = RendezvousHash.new @last_updated = Time.unix(0) @timer = Time.monotonic - (@ttl + 1.second) @mutex = Mutex.new @@ -12,37 +13,69 @@ class Clustering::Discovery getter last_updated : Time getter rebalance_callbacks : Array(RendezvousHash ->) + getter uri : URI { URI.parse @cluster.uri } def on_rebalance(&callback : RendezvousHash ->) @rebalance_callbacks << callback end - def nodes : RendezvousHash - if @cluster.watching? || !expired? + def rendezvous : RendezvousHash + if @cluster.watching? + @cluster.rendezvous + elsif !expired? # this is up to date as we are listening to cluster.on_rebalance callback # or the current list hasn't had its TTL expire - @nodes + @rendezvous else @mutex.synchronize do if expired? - @nodes = @cluster.nodes + @rendezvous = @cluster.rendezvous @last_updated = Time.utc @timer = Time.monotonic end end - @nodes + @rendezvous end end - protected def update_node_list(nodes : RendezvousHash) - @nodes = nodes + # Consistent hash lookup + def find?(key : String) : URI? + rendezvous.find?(key).try { |node| URI.parse(node) } + end + + # Consistent hash lookup + def find(key : String) : URI + URI.parse(rendezvous.find(key)) + end + + def [](key) + find(key) + end + + def []?(key) + find?(key) + end + + # Determine if key maps to current node + def own_node?(key : String) : Bool + # don't parse into URI + rendezvous.find?(key) == @cluster.uri + end + + # Returns the list of node URIs from the `rendezvous-hash` + def nodes : Array(URI) + rendezvous.nodes.map { |node| URI.parse(node) } + end + + protected def update_node_list(rendezvous : RendezvousHash) + @rendezvous = rendezvous @last_updated = Time.utc @timer = Time.monotonic - rebalance_callbacks.each { |callback| spawn { perform(callback, nodes) } } + rebalance_callbacks.each { |callback| spawn { perform(callback, rendezvous) } } end - protected def perform(callback, nodes) - callback.call(nodes) + protected def perform(callback, rendezvous) + callback.call(rendezvous) rescue error Log.error(exception: error) { "rebalance callback failed" } end