Add max_bandwidth option #101

Merged
merged 3 commits into from
Nov 29, 2017
Conversation

kyleknap
Contributor

Allows users to configure the max bandwidth consumption for the streaming of data for uploads and downloads. The value is set in terms of bytes per second. I would recommend trying it out with the CLI PR that I am planning to send up soon.
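For reference, a minimal usage sketch. It assumes the option is exposed as max_bandwidth on TransferConfig and is expressed in bytes per second, per the description above; treat it as illustrative rather than the final interface:

import boto3
from s3transfer.manager import TransferConfig, TransferManager

# Illustrative only: cap streaming to roughly 1 MB/s (bytes per second).
config = TransferConfig(max_bandwidth=1024 * 1024)
client = boto3.client('s3')

with TransferManager(client, config=config) as manager:
    # The upload is throttled under the hood by the bandwidth limiter.
    future = manager.upload('/tmp/large-file', 'mybucket', 'large-file')
    future.result()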

pass


class TimeUtils(object):
Contributor

Any reason we're not using the time module directly? Just to restrict the interface?

Contributor Author

It is so you can inject your own implementations. We do this a lot in s3transfer with OSUtils, and this pattern is used a lot with chalice. If you didn't, you would have to patch a bunch of stuff to affect the behavior of time.

Contributor
@joguSD joguSD Nov 28, 2017

What I mean is we don't need a wrapper class to do this. time_utils=time would also work, as our only constraint is that the thing passed has time and sleep callables. The only benefit I see is that there is an example of what interface the time_module needs to meet.

Contributor

If we ever need anything more complicated in the future, this will allow us to have it. OSUtils has things like that which aren't direct mappings.
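For anyone following along, a rough sketch of the wrapper-plus-injection pattern being discussed. FakeTimeUtils is just a hypothetical test double for illustration, not something in this PR:

import time


class TimeUtils(object):
    """Thin seam over the time module so callers can inject their own clock."""

    def time(self):
        return time.time()

    def sleep(self, secs):
        return time.sleep(secs)


class FakeTimeUtils(object):
    """Hypothetical test double: advances a virtual clock instead of sleeping."""

    def __init__(self, start=0.0):
        self._now = start
        self.sleep_calls = []

    def time(self):
        return self._now

    def sleep(self, secs):
        self.sleep_calls.append(secs)
        self._now += secs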

        projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected_rate > self._max_rate

    def _consume_for_scheduled_consumption(self, amt, request_token, time_now):
Contributor

Some of these private function names are a little confusing.

Contributor

+1

class BandwidthRateTracker(object):
    def __init__(self, a=0.8):
        """Tracks the rate of bandwidth consumption"""
        self._a = a
Contributor

I have no idea what this variable is for.

Member

Yeah, can you rename this alpha or something more descriptive? I wouldn't expect everyone to know the variable names used in EMA formulas.

    def _calculate_rate(self, amt, time_at_consumption):
        return amt/(time_at_consumption - self._last_time)

    def _calculate_ema_rate(self, amt, time_at_consumption):
Contributor

What does ema mean?

Contributor Author

Exponential moving average. I can see if I can rename it or add a comment.
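For readers who hit the same question: an exponential moving average smooths the instantaneous rate against previous samples. A self-contained sketch (class and attribute names are illustrative, not the PR's):

class EmaRateTracker(object):
    """Illustrative only: exponentially weighted moving average of a byte rate."""

    def __init__(self, alpha=0.8):
        # alpha is the smoothing factor; larger values weight recent samples more.
        self._alpha = alpha
        self._rate = 0.0
        self._last_time = None

    def record(self, amt, now):
        if self._last_time is not None:
            instantaneous = amt / (now - self._last_time)
            self._rate = (
                self._alpha * instantaneous + (1 - self._alpha) * self._rate)
        self._last_time = now
        return self._rate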

# This is just a smoke test to make sure that the limiter is
# being used and not necessary its exactness. So we set the maximum
# bandwidth to len(content)/2 per sec and make sure that it is
# noticeably slower. Ideally it will take more than two seconds, but
Contributor

Fix up grammar/spelling here.

@jamesls
Member
jamesls commented Nov 28, 2017

Looking at this now, but would you mind adding a more detailed commit message? This is a pretty big feature and I think it's worth more of an explanation.

return self._fileobj.read(amount)

self._consume_through_leaky_bucket()
return self._fileobj.read(amount)
Contributor

This is going to result in very bursty reads, I think, in cases where someone is passing a large number. I don't think there's a great way around this besides allowing partial reads, which I think would be a change in the contract. However, it might make sense to at least document that large numbers passed to amount might reduce the effectiveness of bandwidth limiting.

Contributor Author

Yep, that is true if they were passing a large number. The good news is they really do not have control over the value passed into read() for this abstraction. For downloads, the internals of s3transfer will always read 256KB when streaming data down, and for uploads httplib uses 8KB (which we don't even have control over). So given that, the burstiness will be pretty small, if present at all.

Member

I think more worrisome for me is the fact that this seems to rely on multiple threads to smooth over the burstiness. The fewer threads there are, the more bursting you get. This also appears to throw off the bandwidth calculations. I'm not quite sure why this happens, but it's really pronounced if you set the bandwidth to something high and the max concurrent requests to 1. In one test, I had 100MB+ bandwidth available, and with one thread and max_bandwidth set to something really high (on the order of GB/s) I was only getting about 15MB/s.

Any insight on what's going on here? I haven't had a whole lot of time to dig into this.

Contributor Author

I tried looking into it. I was running on a machine where, with the default configuration of 10 threads, I was getting 100MB+, but I was never able to get a single thread (with bandwidth limiting enabled or not) to ever reach that. I was seeing speeds of ~25 MB/s with bandwidth limiting turned off, and turning it on to something high resulted in the same speeds. I saw this for both uploads and downloads. I would want to see it in your environment though.

As to the bandwidth calculations, those are just bursty in general because we aggregate the amounts we report back, so if only one thread is working it will report in increments of that threshold. When there are more threads reporting, it smooths the progress updates more because there are more threads forcing the progress to update. So that may explain that? If the threshold is low and you have one thread working, I can see the progress being even more bursty because there is only one thread reporting and it has to do sleeps.


class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Error
Contributor

This isn't very helpful


"""
self._token_bucket = token_bucket
self._time_utils = time_utils
if not time_utils:
Contributor

time_utils is not None

        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = time_utils
        if not time_utils:
Contributor

time_utils is not None

        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable(self):
Contributor

How about something like enable_bandwidth_limiting (and similar for disabled)? This makes it seem like you're enabling / disabling the object itself rather than the limiting feature.

# We do not want to be calling consume on every read as the read
# amounts can be small causing the lock of the leaky bucket to
# introduce noticeable overhead. So instead we keep track of
# how many bytes we seen and only call consume once we pass a
Contributor

we have seen


Member
@jamesls jamesls left a comment

Looks good, I had a few small comments.

Do we ever debug log the fact that we're throttling bandwidth (and what the max bandwidth value is)?

"""Signal that data being read is being transferred to S3"""
self.enable()
if hasattr(self._fileobj, 'signal_transferring'):
self._fileobj.signal_transferring()
Member

This whole signal_transferring thing is really confusing, can you explain this better? Also, why is the hasattr check here? The ReadFileChunk I get, but I don't follow when self._fileobj would ever have a signal_transferring method for this class. It at least deserves a comment. I'm also not a fan of these hasattr() style checks and I'd prefer we not continue this pattern going forward (it makes it less clear what the expected interfaces are supposed to be).

Contributor Author

I explained the signal transferring portion in the comment.

As to the hasattr check, that is because of a whole other issue. The issue boils down to this: we need one shared event handler with no shared state that can indicate to a stream that the file is being read for the specific reason of being transferred to S3. We need one shared handler because one handler per request really bogs down the system. We need no shared state because the handler could be invoked by separate threads.

So to achieve this, the current strategy is to check for a method on the stream and call it in the handlers to enable/disable the transfer-reading functionality. This is what we had in place for callbacks. However, if the stream is wrapped multiple times and two of these wrappers require this enabling/disabling functionality, previously only the top-level wrapper would have its method called and the wrapper underneath would not. So instead the top-level wrapper needs to be able to signal to the wrapper underneath that it was told to enable/disable whatever functionality it needs to, so the inner wrapper can do the same. I tried to generalize all of this with the signal_transferring() and signal_not_transferring() methods. I'm up for suggestions on how to improve this.
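Roughly, the delegation described above looks like this; the wrapper name and the read-time behavior are only illustrative:

class SignalingStreamWrapper(object):
    """Illustrative wrapper: reacts to the signal and forwards it to the
    wrapped stream so nested wrappers get signaled too."""

    def __init__(self, fileobj):
        self._fileobj = fileobj
        self._transferring = False

    def signal_transferring(self):
        self._transferring = True
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        self._transferring = False
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def read(self, amount=None):
        data = self._fileobj.read(amount)
        if self._transferring:
            self._on_transfer_read(len(data))
        return data

    def _on_transfer_read(self, num_bytes):
        # Placeholder for the transfer-only behavior (progress callbacks,
        # bandwidth consumption, etc.).
        pass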

self._client.meta.events.register_last(
event_name, enable_upload_callbacks,
unique_id='s3upload-callback-enable')
event_name, signal_transferring,
Member

How is the last event handler on a request-created event indicative that we're transferring? Does this just mean "uploads can now happen from this point on?"

Contributor Author

I am not a big fan of it either, but essentially what it is supposed to encompass is indicating that the stream is being read because it is being transferred. This is needed because the stream for uploads can be read for the following reasons:

  • It is being read and uploaded to S3
  • It is being read for content MD5
  • It is being read in memory to calculate the SHA256 for a sigv4 request.

The problem is that when you have functionality that is specific to the file stream being read for the specific purpose of uploading to S3 (i.e. transfer callbacks and transfer bandwidth limiting), it is ambiguous why the read is happening. We handle the MD5 case because these features are disabled to start. However, the SHA256 portion is tricky because the signing happens during request creation, and request-created is the last event we have before sending the request over the wire. To solve it, we straddle the handler that does the SHA256 calculation by disabling the functionality at the start and enabling it back at the very end. Hence signal_not_transferring() comes to mean "even though the stream is being read from, it is not actually being sent to S3", and signal_transferring() means "if the stream is being read from, it is because the data is being transferred".

I would be up for better names if that is confusing, but I am not sure if there is a better mechanism to use without exposing something new in botocore. Not sure if you have any thoughts?
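A sketch of that straddling as I read it; the event name, handler signatures, and unique_ids below are guesses for illustration, not necessarily what this PR registers:

def signal_not_transferring(request, **kwargs):
    # Fires first on request-created: anything that reads the body after
    # this point (e.g. the SHA256 for signing) is not counted as a transfer.
    if hasattr(request.body, 'signal_not_transferring'):
        request.body.signal_not_transferring()


def signal_transferring(request, **kwargs):
    # Fires last on request-created, i.e. right before the body is sent,
    # so reads from here on are treated as actual transfer reads.
    if hasattr(request.body, 'signal_transferring'):
        request.body.signal_transferring()


def register_signal_handlers(client):
    event_name = 'request-created.s3'
    client.meta.events.register_first(
        event_name, signal_not_transferring,
        unique_id='s3-not-transferring')
    client.meta.events.register_last(
        event_name, signal_transferring,
        unique_id='s3-transferring')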


        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        return amt/(time_at_consumption - self._last_time)
Member

spacing around the amt


@kyleknap
Contributor Author

@JordonPhillips I think I got all of your comments

@joguSD I addressed all of your comments. Did not change the TimeUtils though. I feel like I would prefer to keep it as is mainly because of Jordon's point that it ensures we are not coupled to the time module and implementers do not have to implement everything under the time module to have a compliant implementation.

@dstufft I put a comment about what you were talking about for large transfers. Still looking at what @jamesls brought up. Let me know if there was something specific you were thinking of.

@jamesls I am still working through your comments, but yeah, I will add a better commit message. I'm probably going to squash all of these and write a better commit message before it gets merged in.

@kyleknap
Contributor Author

Also, as to debug logging: I do not have any set currently. To track retries and consumption amounts I had an abstraction that I could query before, but I yanked that from the original review. How do these logs sound?

  • Logging when bandwidth limiting gets set in the transfer manager
  • Logging the request token and the retry time when requests get scheduled for retries
  • Logging when a scheduled request is completed and releases that amt
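Roughly what those three would look like; placement and wording are just a sketch:

import logging

logger = logging.getLogger(__name__)


def log_bandwidth_limit_set(max_bandwidth):
    # (1) emitted by the transfer manager when the option is configured
    logger.debug('Setting max_bandwidth to %s', max_bandwidth)


def log_request_scheduled_for_retry(request_token, retry_time):
    # (2) emitted when a consumption request cannot be satisfied yet
    logger.debug('Scheduling retry for request %s at %s',
                 request_token, retry_time)


def log_scheduled_request_completed(request_token, amt):
    # (3) emitted when a previously scheduled request completes
    logger.debug('Scheduled request %s completed, consuming %s bytes',
                 request_token, amt)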

Member
@jamesls jamesls left a comment

Capturing offline feedback:

We tried to repro the degradation of single threaded performance offline but weren't able to come up with anything. Not sure what the root cause was but likely a mistake on my end. Given that, I think everything looks good to me.

        if self._config.max_bandwidth is not None:
            logging.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth)
            token_bucket = LeakyBucket(self._config.max_bandwidth)
Member

s/token_bucket/leaky_bucket/g

This limits the rate at which uploads and downloads can stream
content to and from S3. The abstraction uses a leaky bucket to
control bandwidth consumption.

This was not seen in real-life transfers, but for the functional
tests the time deltas between reads were infinitesimally small,
causing the rate to be calculated with a time of 0 and throw
a ZeroDivisionError.
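For intuition, a heavily simplified sketch of the leaky-bucket idea the commit message describes, including a guard against the zero time delta; it is not the implementation in this PR:

import threading
import time


class SimpleLeakyBucket(object):
    """Illustrative only: block callers so average consumption stays at or
    below max_rate bytes per second."""

    def __init__(self, max_rate):
        self._max_rate = float(max_rate)
        self._lock = threading.Lock()
        self._start_time = None
        self._consumed = 0

    def consume(self, amt):
        with self._lock:
            now = time.time()
            if self._start_time is None:
                self._start_time = now
            self._consumed += amt
            # Guard against a zero time delta (the ZeroDivisionError case
            # mentioned above) by flooring the elapsed time.
            elapsed = max(now - self._start_time, 1e-9)
            # How long the consumption *should* have taken at max_rate.
            target_elapsed = self._consumed / self._max_rate
            delay = target_elapsed - elapsed
        if delay > 0:
            # Sleep outside the lock so other threads can keep requesting.
            time.sleep(delay)
        return amt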