Add RackAwareRoundRobinPolicy for host selection #332

Merged (1 commit) on Aug 5, 2024

Conversation

sylwiaszunejko (Collaborator)

Added RackAwareRoundRobinPolicy to prefer replicas in the local rack when picking a coordinator. It is similar to DCAwareRoundRobinPolicy, but with an extra option.

Tests are added.

Fixes: #325
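
A minimal sketch of the expected setup, assuming the constructor takes local_dc and local_rack (the DC/rack names below are placeholders) plus the usual used_hosts_per_remote_dc, and that the policy is wrapped in TokenAwarePolicy just like the DC-aware one:

from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import RackAwareRoundRobinPolicy, TokenAwarePolicy

# Prefer replicas in dc1/rack1; by default all remote-DC hosts stay IGNORED.
policy = TokenAwarePolicy(RackAwareRoundRobinPolicy("dc1", "rack1", used_hosts_per_remote_dc=0))
profile = ExecutionProfile(load_balancing_policy=policy)
cluster = Cluster(["127.0.0.1"], execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect()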

@sylwiaszunejko (Collaborator, Author)

@Lorak-mmk @fruch ping

@fruch left a comment

  • A few clarifications, and a bit more documentation of the internal implementation.
  • I think adding an integration test for this would be very helpful, also as documentation of how to use it.
  • And documentation: the docstring of the class isn't enough; we should have an example of how a user is expected to set it up. (The Java implementation had a whole bunch of hidden assumptions, and it took us quite some time to figure out why c-s wasn't working as expected.) In this case we don't have a stress tool readily available to test the same in SCT, so I would recommend investing a bit more in the documentation.


def populate(self, cluster, hosts):
    for (dc, rack), dc_hosts in groupby(hosts, lambda h: (self._dc(h), self._rack(h))):
        self._live_hosts[(dc, rack)] = tuple(set(dc_hosts))

Can we get into a situation where duplicate hosts are passed to this function, so that a set is needed?



OK, I was just wondering how we get into that situation.

@fruch commented Jun 20, 2024

As for the integration test with multi-DC and multi-rack, I would recommend looking into the similar tests in dtest and cloning the needed ideas/helper functions from there.

@Lorak-mmk left a comment

Maybe this new policy should be the default? I think you need to modify the default_lbp_factory function in cluster.py to do this (plus the comment on class ExecutionProfile's load_balancing_policy attribute and perhaps some other comments). @mykaul wdyt?

In general, please go over all occurrences of DCAwareRoundRobinPolicy in the driver code (and docs etc.) and see if they need to be replaced or complemented by RackAwareRoundRobinPolicy.

Comment on lines +350 to +357
`used_hosts_per_remote_dc` controls how many nodes in
each remote datacenter will have connections opened
against them. In other words, `used_hosts_per_remote_dc` hosts
will be considered :attr:`~.HostDistance.REMOTE` and the
rest will be considered :attr:`~.HostDistance.IGNORED`.
By default, all remote hosts are ignored.


I started to think about whether there should be a similar mechanism for racks, but I don't immediately see how it would work.
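
A standalone illustration of the rule the quoted docstring describes (plain Python, not the driver's actual distance() implementation): only the first used_hosts_per_remote_dc hosts of a remote datacenter are treated as REMOTE, the rest as IGNORED.

from enum import Enum

class HostDistance(Enum):  # stand-in for cassandra.policies.HostDistance
    REMOTE = 1
    IGNORED = 2

def remote_distance(host, remote_dc_hosts, used_hosts_per_remote_dc):
    # Only the first `used_hosts_per_remote_dc` hosts of the remote DC get connections.
    if host in remote_dc_hosts[:used_hosts_per_remote_dc]:
        return HostDistance.REMOTE
    return HostDistance.IGNORED

remote_hosts = ["10.0.1.1", "10.0.1.2", "10.0.1.3"]
print(remote_distance("10.0.1.1", remote_hosts, 1))  # HostDistance.REMOTE
print(remote_distance("10.0.1.3", remote_hosts, 1))  # HostDistance.IGNORED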

@fruch commented Jun 20, 2024

> Maybe this new policy should be the default? I think you need to modify the default_lbp_factory function in cluster.py to do this (plus the comment on class ExecutionProfile's load_balancing_policy attribute and perhaps some other comments). @mykaul wdyt?

What's the reason? I.e. it would be doing exactly the same thing, since no parameters are passed to it.

> In general, please go over all occurrences of DCAwareRoundRobinPolicy in the driver code (and docs etc.) and see if they need to be replaced or complemented by RackAwareRoundRobinPolicy.

Again, I don't know about replacing, but each occurrence needs to be understood, to figure out whether there's something to do about it.

@sylwiaszunejko force-pushed the rack_aware_policy branch 2 times, most recently from 9092800 to 8e5cf99, on June 26, 2024 09:55
if not dc_hosts:
    return HostDistance.IGNORED

if host in list(dc_hosts)[:self.used_hosts_per_remote_dc]:


This conversion still needs to copy the whole list, right? Why do this weird tuple(set()) conversion in populate instead of just holding a list?


Imo we should understand why they are using a set rather than just blindly following it.
Either there is a good reason, which should be documented, or there is not - and then we could change it here and maybe in the other policies.

@sylwiaszunejko (Collaborator, Author)

I believe there is no good reason

Comment on lines 416 to 417
# the dict can change, so get candidate DCs iterating over keys of a copy
other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]


Are we cloning the whole dict here?


Also, why make a list and not just iterate the generator directly?
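
A small standalone illustration of why a snapshot of the keys is taken before iterating: mutating a dict while iterating its live key view raises a RuntimeError, whereas iterating a copied list of keys is safe.

d = {"dc1": 1, "dc2": 2, "dc3": 3}

# Iterating a snapshot of the keys: safe even if the dict changes underneath.
for dc in list(d.keys()):
    d.pop("dc3", None)  # simulated concurrent modification
    print(dc)           # dc1, dc2, dc3

# Iterating the live view while mutating it raises
# "RuntimeError: dictionary changed size during iteration".
try:
    for dc in d.keys():
        d.pop(dc)
except RuntimeError as exc:
    print(exc)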

@sylwiaszunejko (Collaborator, Author)

v2:

  • @pytest.mark.parametrize instead of subclasses in tests
  • use list instead of tuple(set()) in populate

@mykaul commented Jun 28, 2024

> Maybe this new policy should be the default? I think you need to modify the default_lbp_factory function in cluster.py to do this (plus the comment on class ExecutionProfile's load_balancing_policy attribute and perhaps some other comments). @mykaul wdyt?
>
> In general, please go over all occurrences of DCAwareRoundRobinPolicy in the driver code (and docs etc.) and see if they need to be replaced or complemented by RackAwareRoundRobinPolicy.

Yes, I believe it should be the default. What is the default in other drivers?

@sylwiaszunejko (Collaborator, Author) commented Jun 28, 2024

> > Maybe this new policy should be the default? I think you need to modify the default_lbp_factory function in cluster.py to do this (plus the comment on class ExecutionProfile's load_balancing_policy attribute and perhaps some other comments). @mykaul wdyt?
> > In general, please go over all occurrences of DCAwareRoundRobinPolicy in the driver code (and docs etc.) and see if they need to be replaced or complemented by RackAwareRoundRobinPolicy.
>
> Yes, I believe it should be the default. What is the default in other drivers?

@mykaul
In the Rust driver, rack awareness is part of the default load balancing policy: there is a field for the preferred rack, and if it is set, rack awareness is used; if not, it is not used. https://github.com/scylladb/scylla-rust-driver/blob/main/docs/source/load-balancing/default-policy.md#preferences
In java-driver 4.x the rack-awareness feature is optional, and to enable it the local rack has to be supplied through the configuration. The feature is disabled by default and, unlike the local datacenter, it will not be implicitly fetched from the provided contact points. https://github.com/scylladb/java-driver/tree/scylla-4.x/manual/core/load_balancing#rack-awareness

@mykaul commented Jun 28, 2024

> In the Rust driver, rack awareness is part of the default load balancing policy: there is a field for the preferred rack, and if it is set, rack awareness is used; if not, it is not used. https://github.com/scylladb/scylla-rust-driver/blob/main/docs/source/load-balancing/default-policy.md#preferences

This sounds like what we should support as well.

@sylwiaszunejko (Collaborator, Author)

> > In the Rust driver, rack awareness is part of the default load balancing policy: there is a field for the preferred rack, and if it is set, rack awareness is used; if not, it is not used. https://github.com/scylladb/scylla-rust-driver/blob/main/docs/source/load-balancing/default-policy.md#preferences
>
> This sounds like what we should support as well.

We can pass RackAwareRoundRobinPolicy as a child policy to TokenAwarePolicy; isn't that equivalent to specifying a field? In the Rust driver DC awareness is handled by a field, while in python-driver it is handled by a child policy.

If we wanted to replace all occurrences of DCAware with RackAware it would not be obvious. In this function, for example: https://github.com/scylladb/python-driver/blob/master/cassandra/cluster.py#L240-#L243 it would be hard to use the rack-aware instead of the DC-aware policy, because, to be consistent with the Rust driver and java-driver 4.x, we decided that local_dc and local_rack have to be specified.

@Lorak-mmk

Now that we have decided not to do implicit dc/rack, it makes no sense imo to make this policy the default. Without specifying the dc/rack it won't do anything, and as @sylwiaszunejko mentioned, specifying them is equivalent to using it as a child policy of TokenAware.

@sylwiaszunejko (Collaborator, Author)

I added some kind of pattern for the integration tests, but I am not really sure what the tests should look like. I could use some help with that @Lorak-mmk @fruch

response = self.session.execute('SELECT * from system.local')
queried_hosts.update(response.response_future.attempted_hosts)
queried_hosts = set(host.address for host in queried_hosts)
# Perform some checks

I think we can use a similar approach as in the shard-aware tests and use tracing to validate that requests arrived at the correct nodes.


It would need to be done carefully because of heat-weighted load balancing in Scylla - if it kicks in, the entries in the tracing will be different than expected. I've run into this before with the shard awareness tests.
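
A sketch of the tracing-based check being suggested, using the driver's standard trace=True / get_query_trace() API; expected_rack_ips is a hypothetical set of addresses the test would build from the cluster topology:

def check_coordinators_in_local_rack(session, expected_rack_ips, attempts=20):
    # Run the query several times with tracing enabled and collect the
    # coordinator recorded in each trace.
    coordinators = set()
    for _ in range(attempts):
        result = session.execute("SELECT * FROM system.local", trace=True)
        trace = result.get_query_trace()
        coordinators.add(str(trace.coordinator))
    # Every coordinator should belong to the configured local rack.
    # (Heat-weighted load balancing, as noted above, can perturb this.)
    assert coordinators.issubset(expected_rack_ips), coordinators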

@sylwiaszunejko (Collaborator, Author)

I am not sure what is going on with the CI. I cannot download the full logs; it keeps failing every time. Locally all tests pass.

@sylwiaszunejko (Collaborator, Author)

I am still fighting with the CI; this part of the code in policies.py seems to be problematic:

for host in islice(cycle(local_rack_live), pos, pos + len(local_rack_live)):
    yield host

Even though local_rack_live = [<Host: 127.0.0.2:9042 DC1>, <Host: 127.0.0.1:9042 DC1>], the first host is yielded two times, and because neither of these attempts results in a successful read (node2 is stopped), the driver uses a host outside the local rack.
I am not sure why that happens.

@sylwiaszunejko force-pushed the rack_aware_policy branch 2 times, most recently from 6fcc371 to 5cd5ecf, on July 23, 2024 18:15
@sylwiaszunejko (Collaborator, Author)

> I am still fighting with the CI; this part of the code in policies.py seems to be problematic:
>
> for host in islice(cycle(local_rack_live), pos, pos + len(local_rack_live)):
>     yield host
>
> Even though local_rack_live = [<Host: 127.0.0.2:9042 DC1>, <Host: 127.0.0.1:9042 DC1>], the first host is yielded two times, and because neither of these attempts results in a successful read (node2 is stopped), the driver uses a host outside the local rack. I am not sure why that happens.

I changed this slicing logic to just iterating through the list and the issue stopped reproducing. @Lorak-mmk do you have any idea why?

@sylwiaszunejko (Collaborator, Author)

@Lorak-mmk ping

@Lorak-mmk Lorak-mmk removed their assignment Jul 31, 2024
@sylwiaszunejko (Collaborator, Author)

It turns out that using the islice and cycle combination results in a broken iterator when there are concurrent modifications to the list. I checked it using this simple program:

import threading
import time
from itertools import cycle, islice

# Shared list among threads
local_rack_live = ['host1', 'host2', 'host3', 'host4', 'host5']

def remove_elements():
    """Thread function to remove elements from the list."""
    while local_rack_live:
        time.sleep(1)  # Simulate some delay
        removed_host = local_rack_live.pop(0)
        print(f"Removed: {removed_host}")

def iterate_and_yield():
    """Thread function to iterate through the list and yield elements."""
    pos = 0
    while local_rack_live:
        # Ensure pos is valid
        pos = (pos % len(local_rack_live)) if local_rack_live else 0
        for host in islice(cycle(local_rack_live), pos, pos + len(local_rack_live)):
            print(f"Yielding from list: {local_rack_live} host: {host}")
            time.sleep(1)  # Simulate some delay
        pos += 1

# Create threads
remove_thread = threading.Thread(target=remove_elements)
iterate_thread = threading.Thread(target=iterate_and_yield)

# Start threads
remove_thread.start()
iterate_thread.start()

# Wait for threads to complete
remove_thread.join()
iterate_thread.join()

The output looks like this:

Yielding from list: ['host1', 'host2', 'host3', 'host4', 'host5'] host: host1
Removed: host1
Yielding from list: ['host2', 'host3', 'host4', 'host5'] host: host3
Yielding from list: ['host2', 'host3', 'host4', 'host5'] host: host4
Removed: host2
Removed: host3
Yielding from list: ['host4', 'host5'] host: host1
Yielding from list: ['host4', 'host5'] host: host3
Removed: host4
Yielding from list: ['host5'] host: host5
Removed: host5

Clearly it doesn't work, returning e.g. host1 when the list looks like [host4, host5].
Because of that we need to use a copy of local_rack_live to make sure that everything is correct. In other policies there is the same islice and cycle logic and it is similarly broken; I will address that in a separate PR.
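
A sketch of that fix (attribute names assumed from the snippets quoted earlier): take a private copy of the rack's host list before building the cycle, so that concurrent on_up/on_down changes cannot skew the iterator.

def make_query_plan(self, working_keyspace=None, query=None):
    # Snapshot the shared collection; islice/cycle then walk a stable copy
    # even if another thread mutates the original list.
    local_rack_live = list(self._live_hosts.get((self.local_dc, self.local_rack), ()))
    pos = self._position % len(local_rack_live) if local_rack_live else 0
    for host in islice(cycle(local_rack_live), pos, pos + len(local_rack_live)):
        yield host
    # ... remote-rack and remote-DC hosts would follow here ...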

@sylwiaszunejko force-pushed the rack_aware_policy branch 2 times, most recently from 2bdae06 to 3532186, on August 1, 2024 16:38
@sylwiaszunejko (Collaborator, Author)

> In other policies there is the same islice and cycle logic and it is similarly broken; I will address that in a separate PR.

I have to correct myself here. In other policies, such as DCAwareRoundRobinPolicy, hosts are stored in a tuple, and when they change (host up/down) the value in the dictionary is reassigned, so the tuple we got from the dict before the change is unaffected, e.g. here: https://github.com/scylladb/python-driver/blob/master/cassandra/policies.py#L272.
So my change from tuples to lists (mutating the list instead of creating a new one on host down/up) is what created the problem, even though the logic is similar to the other policies.

@Lorak-mmk commented Aug 2, 2024

> > In other policies there is the same islice and cycle logic and it is similarly broken; I will address that in a separate PR.
>
> I have to correct myself here. In other policies, such as DCAwareRoundRobinPolicy, hosts are stored in a tuple, and when they change (host up/down) the value in the dictionary is reassigned, so the tuple we got from the dict before the change is unaffected, e.g. here: https://github.com/scylladb/python-driver/blob/master/cassandra/policies.py#L272. So my change from tuples to lists (mutating the list instead of creating a new one on host down/up) is what created the problem, even though the logic is similar to the other policies.

This approach makes sense - copy in on_up / on_down because those are rare, so they can be expensive; don't copy in make_query_plan because we want it to be fast.
Maybe that's why they used tuples - to prevent accidental modification.
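
A sketch of the copy-on-write pattern being described, with assumed attribute names: on_up/on_down build and reassign a new tuple (rare, so the copy is cheap), while make_query_plan just reads whatever tuple is currently in the dict; a tuple captured mid-iteration stays valid even if the dict entry is replaced.

def on_up(self, host):
    dc, rack = self._dc(host), self._rack(host)
    current = self._live_hosts.get((dc, rack), ())
    if host not in current:
        # Reassign a brand-new tuple instead of mutating in place.
        self._live_hosts[(dc, rack)] = current + (host,)

def on_down(self, host):
    dc, rack = self._dc(host), self._rack(host)
    current = self._live_hosts.get((dc, rack), ())
    if host in current:
        self._live_hosts[(dc, rack)] = tuple(h for h in current if h != host)

def make_query_plan(self, working_keyspace=None, query=None):
    # No copy needed on the hot path: the tuple read here is immutable.
    for host in self._live_hosts.get((self.local_dc, self.local_rack), ()):
        yield host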

@sylwiaszunejko (Collaborator, Author)

> Maybe that's why they used tuples - to prevent accidental modification.

Maybe, but it was not that easy to deduce from the code before. So I guess the optimal solution would be to return to using tuples and copying in on_up / on_down - do you agree? @Lorak-mmk

@Lorak-mmk

> > Maybe that's why they used tuples - to prevent accidental modification.
>
> Maybe, but it was not that easy to deduce from the code before. So I guess the optimal solution would be to return to using tuples and copying in on_up / on_down - do you agree? @Lorak-mmk

I agree. Please also verify that the other policies are implemented correctly in this regard.
It would also be beneficial to put a comment explaining all of this, maybe at the top of the file, so that the next person looking at this has an easier time understanding it.

Comment on lines +431 to +433
if host not in current_rack_hosts:
    self._live_hosts[(dc, rack)] = current_rack_hosts + (host, )
current_dc_hosts = self._dc_live_hosts.get(dc, ())
if host not in current_dc_hosts:
    self._dc_live_hosts[dc] = current_dc_hosts + (host, )


I see that DCAwarePolicy does the same thing. I wonder if it is possible for on_up to be called for an already-present host. It probably is, otherwise those checks wouldn't exist. Why the driver would do this, I have no idea.

@sylwiaszunejko (Collaborator, Author)

I am not sure, to be honest.

@sylwiaszunejko force-pushed the rack_aware_policy branch 2 times, most recently from 33af849 to ee99b31, on August 5, 2024 09:21
@sylwiaszunejko merged commit dbb4552 into scylladb:master on Aug 5, 2024.
19 checks passed.