From 081589b05850d130e772d983c18b169eca0c252f Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 4 Sep 2024 13:05:59 +0200 Subject: [PATCH] Provide a common atomic Rate Limiter implementation (#560) * Provide a common atomic Rate Limiter implementation Signed-off-by: Bob Weinand * Use proper high resolution monotonic timer on windows * Add minimal sleep for test stability * Disable shellcheck warning Signed-off-by: Bob Weinand * Fix aftrer rebase * Validate rate() in test Signed-off-by: Bob Weinand * Validate limit change in test Signed-off-by: Bob Weinand * Work around floating point precision issues Signed-off-by: Bob Weinand * Update rate_limiter.rs --------- Signed-off-by: Bob Weinand --- .github/workflows/lint.yml | 4 +- Cargo.lock | 64 +++--- LICENSE-3rdparty.yml | 247 ++++++++++++++++++++- ddcommon/Cargo.toml | 8 + ddcommon/src/lib.rs | 1 + ddcommon/src/rate_limiter.rs | 186 ++++++++++++++++ ipc/Cargo.toml | 1 + ipc/src/lib.rs | 1 + ipc/src/platform/mem_handle.rs | 4 +- ipc/src/platform/unix/mem_handle.rs | 9 +- ipc/src/platform/unix/mem_handle_macos.rs | 11 +- ipc/src/platform/windows/mem_handle.rs | 5 +- ipc/src/rate_limiter.rs | 259 ++++++++++++++++++++++ sidecar/src/one_way_shared_memory.rs | 29 +-- 14 files changed, 760 insertions(+), 69 deletions(-) create mode 100644 ddcommon/src/rate_limiter.rs create mode 100644 ipc/src/rate_limiter.rs diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index b85263253..c068e9cf7 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -44,7 +44,9 @@ jobs: run: rustup install ${{ matrix.rust_version }} && rustup default ${{ matrix.rust_version }} && rustup component add clippy - name: Run clippy on ${{ matrix.platform }} ${{ matrix.rust_version }} shell: bash - run: cargo clippy --all-targets --all-features -- -D warnings "$([ ${{ matrix.rust_version }} = 1.71.1 ] && echo -Aunknown-lints)" + run: | + # shellcheck disable=SC2046 + cargo clippy --all-targets --all-features -- -D warnings $([ ${{ matrix.rust_version }} = 1.71.1 ] && echo -Aunknown-lints -Aclippy::cast_ref_to_mut) licensecheck: runs-on: ubuntu-latest name: "Presence of licence headers" diff --git a/Cargo.lock b/Cargo.lock index e6b616fbc..ec2677846 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -924,7 +924,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -1423,6 +1423,7 @@ dependencies = [ "bytes", "criterion", "datadog-ipc-macros", + "ddcommon", "futures", "glibc_version", "io-lifetimes", @@ -1757,6 +1758,7 @@ dependencies = [ "hyper-util", "indexmap 2.2.6", "lazy_static", + "libc", "log", "maplit", "pin-project", @@ -1767,6 +1769,7 @@ dependencies = [ "static_assertions", "tokio", "tokio-rustls 0.26.0", + "windows-sys 0.52.0", ] [[package]] @@ -2998,7 +3001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.52.4", + "windows-targets 0.48.5", ] [[package]] @@ -5740,7 +5743,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -5773,7 +5776,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.6", ] [[package]] @@ -5793,17 +5796,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -5820,9 +5824,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -5838,9 +5842,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -5856,9 +5860,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -5874,9 +5884,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -5892,9 +5902,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -5910,9 +5920,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -5928,9 +5938,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index 6df1a737c..2d75da322 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -32870,7 +32870,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows-targets - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -33563,7 +33563,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_aarch64_gnullvm - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -34256,7 +34256,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_aarch64_msvc - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -34949,7 +34949,238 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_i686_gnu - package_version: 0.52.4 + package_version: 0.52.6 + repository: https://github.com/microsoft/windows-rs + license: MIT OR Apache-2.0 + licenses: + - license: MIT + text: |2 + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE + - license: Apache-2.0 + text: |2 + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright (c) Microsoft Corporation. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +- package_name: windows_i686_gnullvm + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -35642,7 +35873,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_i686_msvc - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -36335,7 +36566,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_x86_64_gnu - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -37028,7 +37259,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_x86_64_gnullvm - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: @@ -37721,7 +37952,7 @@ third_party_libraries: See the License for the specific language governing permissions and limitations under the License. - package_name: windows_x86_64_msvc - package_version: 0.52.4 + package_version: 0.52.6 repository: https://github.com/microsoft/windows-rs license: MIT OR Apache-2.0 licenses: diff --git a/ddcommon/Cargo.toml b/ddcommon/Cargo.toml index bd458b36a..d5bd74f65 100644 --- a/ddcommon/Cargo.toml +++ b/ddcommon/Cargo.toml @@ -36,6 +36,14 @@ tokio = { version = "1.23", features = ["rt", "macros"] } tokio-rustls = { version = "0.26", default-features = false } serde = { version = "1.0", features = ["derive"] } static_assertions = "1.1.0" +libc = "0.2" + +[target.'cfg(windows)'.dependencies.windows-sys] +version = "0.52" +features = [ + "Win32_Foundation", + "Win32_System_Performance", +] [target.'cfg(unix)'.dependencies] hyper-rustls = { version = "0.27", default-features = false, features = [ diff --git a/ddcommon/src/lib.rs b/ddcommon/src/lib.rs index 089de4252..03f5056ee 100644 --- a/ddcommon/src/lib.rs +++ b/ddcommon/src/lib.rs @@ -16,6 +16,7 @@ pub mod entity_id; #[macro_use] pub mod cstr; pub mod config; +pub mod rate_limiter; pub mod tag; pub mod header { diff --git a/ddcommon/src/rate_limiter.rs b/ddcommon/src/rate_limiter.rs new file mode 100644 index 000000000..fee7480d1 --- /dev/null +++ b/ddcommon/src/rate_limiter.rs @@ -0,0 +1,186 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering}; + +pub trait Limiter { + /// Takes the limit per interval. + /// Returns false if the limit is exceeded, otherwise true. + fn inc(&self, limit: u32) -> bool; + /// Returns the effective rate per interval. + /// Note: The rate is only guaranteed to be accurate immediately after a call to inc(). + fn rate(&self) -> f64; +} + +/// A thread-safe limiter built on Atomics. +/// It's base unit is in seconds, i.e. the minimum allowed rate is 1 per second. +/// Internally the limiter works with the system time granularity, i.e. nanoseconds on unix and +/// milliseconds on windows. +/// The implementation is a sliding window: every time the limiter is increased, the amount of time +/// that has passed is also refilled. +#[repr(C)] +pub struct LocalLimiter { + hit_count: AtomicI64, + last_update: AtomicU64, + last_limit: AtomicU32, + granularity: i64, +} + +const TIME_PER_SECOND: i64 = 1_000_000_000; // nanoseconds + +fn now() -> u64 { + #[cfg(windows)] + let now = unsafe { + static FREQUENCY: AtomicU64 = AtomicU64::new(0); + + let mut frequency = FREQUENCY.load(Ordering::Relaxed); + if frequency == 0 { + windows_sys::Win32::System::Performance::QueryPerformanceFrequency( + &mut frequency as *mut u64 as *mut i64, + ); + FREQUENCY.store(frequency, Ordering::Relaxed); + } + + let mut perf_counter = 0; + windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter); + perf_counter as u64 * frequency / TIME_PER_SECOND as u64 + }; + #[cfg(not(windows))] + let now = { + let mut ts: libc::timespec = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) }; + (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64 + }; + now +} + +impl Default for LocalLimiter { + fn default() -> Self { + LocalLimiter { + hit_count: Default::default(), + last_update: AtomicU64::from(now()), + last_limit: Default::default(), + granularity: TIME_PER_SECOND, + } + } +} + +impl LocalLimiter { + /// Allows setting a custom time granularity. The default() implementation is 1 second. + pub fn with_granularity(seconds: u32) -> LocalLimiter { + let mut limiter = LocalLimiter::default(); + limiter.granularity *= seconds as i64; + limiter + } + + /// Resets, with a given granularity. + pub fn reset(&mut self, seconds: u32) { + self.last_update.store(now(), Ordering::Relaxed); + self.hit_count.store(0, Ordering::Relaxed); + self.last_limit.store(0, Ordering::Relaxed); + self.granularity = TIME_PER_SECOND * seconds as i64; + } +} + +impl Limiter for LocalLimiter { + fn inc(&self, limit: u32) -> bool { + let now = now(); + let last = self.last_update.swap(now, Ordering::SeqCst); + // Make sure reducing the limit doesn't stall for a long time + let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed)); + let clear_counter = (now as i64 - last as i64) * (clear_limit as i64); + let subtract = clear_counter - self.granularity; + let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst); + // Handle where the limiter goes below zero + if previous_hits < subtract { + let add = clear_counter - previous_hits.max(0); + self.hit_count.fetch_add(add, Ordering::Acquire); + previous_hits += add - clear_counter; + } + if previous_hits / self.granularity >= limit as i64 { + self.hit_count + .fetch_sub(self.granularity, Ordering::Acquire); + false + } else { + // We don't care about race conditions here: + // If the last limit was high enough to increase the previous_hits, we are anyway close + // to a number realistic to decrease the count quickly; i.e. we won't stall the limiter + // indefinitely when switching from a high to a low limit. + self.last_limit.store(limit, Ordering::Relaxed); + true + } + } + + fn rate(&self) -> f64 { + let last_limit = self.last_limit.load(Ordering::Relaxed); + let hit_count = self.hit_count.load(Ordering::Relaxed); + (hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.) + } +} + +#[cfg(test)] +mod tests { + use crate::rate_limiter::{Limiter, LocalLimiter, TIME_PER_SECOND}; + use std::sync::atomic::Ordering; + use std::thread::sleep; + use std::time::Duration; + + #[test] + #[cfg_attr(miri, ignore)] + fn test_rate_limiter() { + let limiter = LocalLimiter::default(); + // Two are allowed, then one more because a small amount of time passed since the first one + assert!(limiter.inc(2)); + // Work around floating point precision issues + assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5); + // Add a minimal amount of time to ensure the test doesn't run faster than timer precision + sleep(Duration::from_micros(100)); + assert!(limiter.inc(2)); + // We're close to 1, but not quite, due to the minimal time passed + assert!(limiter.rate() > 0.5 && limiter.rate() < 1.); + sleep(Duration::from_micros(100)); + assert!(limiter.inc(2)); + // Rate capped at 1 + assert_eq!(1., limiter.rate()); + sleep(Duration::from_micros(100)); + assert!(!limiter.inc(2)); + sleep(Duration::from_micros(100)); + assert!(!limiter.inc(2)); + sleep(Duration::from_micros(100)); + + // reduce 4 times, we're going into negative territory. Next increment will reset to zero. + limiter + .last_update + .fetch_sub(3 * TIME_PER_SECOND as u64, Ordering::Relaxed); + assert!(limiter.inc(2)); + // Work around floating point precision issues + assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5); // We're starting from scratch + sleep(Duration::from_micros(100)); + assert!(limiter.inc(2)); + sleep(Duration::from_micros(100)); + assert!(limiter.inc(2)); + sleep(Duration::from_micros(100)); + assert!(!limiter.inc(2)); + sleep(Duration::from_micros(100)); + + // Test change to higher value + assert!(limiter.inc(3)); + sleep(Duration::from_micros(100)); + assert!(!limiter.inc(3)); + + // Then change to lower value - but we have no capacity + assert!(!limiter.inc(1)); + + // The counter is around 4 (because last limit was 3) + // We're keeping the highest successful limit stored, thus subtracting 3 twice will reset it + limiter + .last_update + .fetch_sub(2 * TIME_PER_SECOND as u64, Ordering::Relaxed); + + // And now 1 succeeds again. + assert!(limiter.inc(1)); + } +} diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index b702645a3..321e59615 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -21,6 +21,7 @@ libc = { version = "0.2" } # tarpc needed extensions to allow 1 way communication and to export some internal structs tarpc = { path = "tarpc/tarpc", default-features = false, features = ["serde-transport"], package = "tarpc" } +ddcommon = { path = "../ddcommon" } datadog-ipc-macros = { path = "macros" } [dev-dependencies] diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index b3dc197cf..a73a8c71d 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -6,6 +6,7 @@ pub mod handles; pub mod transport; pub mod platform; +pub mod rate_limiter; pub mod sequential; pub use tarpc; diff --git a/ipc/src/platform/mem_handle.rs b/ipc/src/platform/mem_handle.rs index 94bd62fe6..87714924b 100644 --- a/ipc/src/platform/mem_handle.rs +++ b/ipc/src/platform/mem_handle.rs @@ -215,7 +215,7 @@ mod tests { let shm = ShmHandle::new(5).unwrap(); let mut mapped = shm.map().unwrap(); _ = mapped.as_slice_mut().write(&[1, 2, 3, 4, 5]).unwrap(); - let mapped = mapped.ensure_space(100000); + mapped.ensure_space(100000); assert!(mapped.as_slice().len() >= 100000); let mut exp = vec![0u8; mapped.as_slice().len()]; _ = (&mut exp[..5]).write(&[1, 2, 3, 4, 5]).unwrap(); @@ -229,7 +229,7 @@ mod tests { let shm = NamedShmHandle::create(path.clone(), 5).unwrap(); let mut mapped = shm.map().unwrap(); _ = mapped.as_slice_mut().write(&[1, 2, 3, 4, 5]).unwrap(); - let mapped = mapped.ensure_space(100000); + mapped.ensure_space(100000); assert!(mapped.as_slice().len() >= 100000); let other = NamedShmHandle::open(&path).unwrap().map().unwrap(); diff --git a/ipc/src/platform/unix/mem_handle.rs b/ipc/src/platform/unix/mem_handle.rs index ea70ea4ab..a5aec0e9a 100644 --- a/ipc/src/platform/unix/mem_handle.rs +++ b/ipc/src/platform/unix/mem_handle.rs @@ -111,14 +111,15 @@ impl NamedShmHandle { } impl>> MappedMem { - pub fn ensure_space(self, expected_size: usize) -> MappedMem { + pub fn ensure_space(&mut self, expected_size: usize) { if expected_size <= self.mem.get_shm().size { - return self; + return; } - let mut handle: T = self.into(); + // SAFETY: we'll overwrite the original memory later + let mut handle: T = unsafe { std::ptr::read(self) }.into(); _ = handle.resize(expected_size); - handle.map().unwrap() + unsafe { std::ptr::write(self, handle.map().unwrap()) }; } } diff --git a/ipc/src/platform/unix/mem_handle_macos.rs b/ipc/src/platform/unix/mem_handle_macos.rs index 6a5930e0e..014c98833 100644 --- a/ipc/src/platform/unix/mem_handle_macos.rs +++ b/ipc/src/platform/unix/mem_handle_macos.rs @@ -126,9 +126,9 @@ impl NamedShmHandle { } impl>> MappedMem { - pub fn ensure_space(self, expected_size: usize) -> MappedMem { + pub fn ensure_space(&mut self, expected_size: usize) { if expected_size <= self.mem.get_shm().size { - return self; + return; } if expected_size > MAPPING_MAX_SIZE - page_size::get() { panic!( @@ -138,7 +138,9 @@ impl>> MappedMem { ); } - let mut handle: T = self.into(); + // SAFETY: we'll overwrite the original memory later + let mut handle: T = unsafe { std::ptr::read(self) }.into(); + let page_size = NonZeroUsize::try_from(page_size::get()).unwrap(); unsafe { _ = handle.set_mapping_size(expected_size); @@ -156,7 +158,8 @@ impl>> MappedMem { size.fetch_max(handle.get_size(), Ordering::SeqCst); _ = munmap(ptr, usize::from(page_size)); } - handle.map().unwrap() + + unsafe { std::ptr::write(self, handle.map().unwrap()) }; } } diff --git a/ipc/src/platform/windows/mem_handle.rs b/ipc/src/platform/windows/mem_handle.rs index b3d8fc8d8..474ccbbe7 100644 --- a/ipc/src/platform/windows/mem_handle.rs +++ b/ipc/src/platform/windows/mem_handle.rs @@ -151,10 +151,10 @@ impl NamedShmHandle { } impl>> MappedMem { - pub fn ensure_space(mut self, expected_size: usize) -> MappedMem { + pub fn ensure_space(&mut self, expected_size: usize) { let current_size = self.mem.get_shm().size; if expected_size <= current_size { - return self; + return; } if expected_size > MAPPING_MAX_SIZE { panic!( @@ -175,6 +175,5 @@ impl>> MappedMem { PAGE_READWRITE, ) }; - self } } diff --git a/ipc/src/rate_limiter.rs b/ipc/src/rate_limiter.rs new file mode 100644 index 000000000..f05439f39 --- /dev/null +++ b/ipc/src/rate_limiter.rs @@ -0,0 +1,259 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle}; +use ddcommon::rate_limiter::{Limiter, LocalLimiter}; +use std::ffi::CString; +use std::fmt::{Debug, Formatter}; +use std::io; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicI32, AtomicU32, Ordering}; +use std::sync::{Arc, RwLock}; + +#[repr(C)] +#[derive(Default)] +struct ShmLimiterData<'a> { + next_free: AtomicU32, // free list + rc: AtomicI32, + limiter: LocalLimiter, + _phantom: PhantomData<&'a ShmLimiterMemory>, +} + +#[derive(Clone)] +pub struct ShmLimiterMemory(Arc>>); + +impl ShmLimiterMemory { + const START_OFFSET: u32 = std::mem::align_of::() as u32; + + pub fn create(path: CString) -> io::Result { + // Clean leftover shm + unsafe { libc::unlink(path.as_ptr()) }; + let mem = Self::new(NamedShmHandle::create(path, 0x1000)?.map()?); + mem.first_free_ref() + .store(Self::START_OFFSET, Ordering::Relaxed); + Ok(mem) + } + + /// Opens the shared limiter. Users are expected to re-open this if their sidecar connection + /// breaks. + pub fn open(path: &CString) -> io::Result { + Ok(Self::new(NamedShmHandle::open(path)?.map()?)) + } + + fn new(handle: MappedMem) -> Self { + Self(Arc::new(RwLock::new(handle))) + } + + /// The start of the ShmLimiter memory has 4 bytes indicating an offset to the first free + /// element in the free list. It is zero if there is no element on the free list. + fn first_free_ref(&self) -> &AtomicU32 { + unsafe { &*self.0.read().unwrap().as_slice().as_ptr().cast() } + } + + fn next_free(&mut self) -> u32 { + let mut first_free = self.first_free_ref().load(Ordering::Relaxed); + loop { + let mut target_next_free = ShmLimiter { + idx: first_free, + memory: self.clone(), + } + .limiter() + .next_free + .load(Ordering::Relaxed); + // Not yet used memory will always be 0. The next free entry will then be just above. + if target_next_free == 0 { + target_next_free = first_free + std::mem::size_of::() as u32; + // target_next_free is the end of the current entry - but we need one more + self.0.write().unwrap().ensure_space( + target_next_free as usize + std::mem::size_of::(), + ); + } + match self.first_free_ref().compare_exchange( + first_free, + target_next_free, + Ordering::Release, + Ordering::Relaxed, + ) { + Ok(_) => return first_free, + Err(found) => first_free = found, + } + } + } + + pub fn alloc(&mut self) -> ShmLimiter { + let reference = ShmLimiter { + idx: self.next_free(), + memory: self.clone(), + }; + let limiter = reference.limiter(); + limiter.rc.store(1, Ordering::Relaxed); + // SAFETY: we initialize the struct here + unsafe { + (*(limiter as *const _ as *mut ShmLimiterData)) + .limiter + .reset(1) + }; + reference + } + + pub fn get(&self, idx: u32) -> Option { + assert_eq!( + idx % std::mem::size_of::() as u32, + Self::START_OFFSET + ); + let reference = ShmLimiter { + idx, + memory: self.clone(), + }; + let limiter = reference.limiter(); + let mut rc = limiter.rc.load(Ordering::Relaxed); + loop { + if rc == 0 { + return None; + } + match limiter + .rc + .compare_exchange(rc, rc + 1, Ordering::Release, Ordering::Relaxed) + { + Ok(_) => return Some(reference), + Err(found) => rc = found, + } + } + } +} + +pub struct ShmLimiter { + idx: u32, + memory: ShmLimiterMemory, +} + +impl Debug for ShmLimiter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.idx.fmt(f) + } +} + +impl ShmLimiter { + fn limiter(&self) -> &ShmLimiterData { + unsafe { + &*self + .memory + .0 + .read() + .unwrap() + .as_slice() + .as_ptr() + .offset(self.idx as isize) + .cast() + } + } + + pub fn index(&self) -> u32 { + self.idx + } +} + +impl Limiter for ShmLimiter { + fn inc(&self, limit: u32) -> bool { + self.limiter().limiter.inc(limit) + } + + fn rate(&self) -> f64 { + self.limiter().limiter.rate() + } +} + +impl Drop for ShmLimiter { + fn drop(&mut self) { + let limiter = self.limiter(); + if limiter.rc.fetch_sub(1, Ordering::SeqCst) == 1 { + let next_free_ref = self.memory.first_free_ref(); + let mut next_free = next_free_ref.load(Ordering::Relaxed); + loop { + limiter.next_free.store(next_free, Ordering::Relaxed); + match next_free_ref.compare_exchange( + next_free, + self.idx, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => return, + Err(found) => next_free = found, + } + } + } + } +} + +pub enum AnyLimiter { + Local(LocalLimiter), + Shm(ShmLimiter), +} + +impl AnyLimiter { + fn limiter(&self) -> &dyn Limiter { + match self { + AnyLimiter::Local(local) => local as &dyn Limiter, + AnyLimiter::Shm(shm) => shm as &dyn Limiter, + } + } +} + +impl Limiter for AnyLimiter { + fn inc(&self, limit: u32) -> bool { + self.limiter().inc(limit) + } + + fn rate(&self) -> f64 { + self.limiter().rate() + } +} + +#[cfg(test)] +mod tests { + use crate::rate_limiter::{ShmLimiterData, ShmLimiterMemory}; + use ddcommon::rate_limiter::Limiter; + use std::ffi::CString; + use std::thread::sleep; + use std::time::Duration; + + fn path() -> CString { + CString::new("/ddlimiters-test".to_string()).unwrap() + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_limiters() { + let mut limiters = ShmLimiterMemory::create(path()).unwrap(); + let limiter = limiters.alloc(); + let limiter_idx = limiter.idx; + // Two are allowed, then one more because a small amount of time passed since the first one + assert!(limiter.inc(2)); + // Add a minimal amount of time to ensure the test doesn't run faster than timer precision + sleep(Duration::from_micros(100)); + assert!(limiter.inc(2)); + sleep(Duration::from_micros(100)); + assert!(limiter.inc(2)); + sleep(Duration::from_micros(100)); + assert!(!limiter.inc(2)); + sleep(Duration::from_micros(100)); + assert!(!limiter.inc(2)); + + // Now test the free list + let limiter2 = limiters.alloc(); + assert_eq!( + limiter2.idx, + limiter_idx + std::mem::size_of::() as u32 + ); + drop(limiter); + + let limiter = limiters.alloc(); + assert_eq!(limiter.idx, limiter_idx); + + let limiter3 = limiters.alloc(); + assert_eq!( + limiter3.idx, + limiter2.idx + std::mem::size_of::() as u32 + ); + } +} diff --git a/sidecar/src/one_way_shared_memory.rs b/sidecar/src/one_way_shared_memory.rs index 5949e5db0..d5490dcba 100644 --- a/sidecar/src/one_way_shared_memory.rs +++ b/sidecar/src/one_way_shared_memory.rs @@ -11,7 +11,7 @@ pub struct OneWayShmWriter where T: FileBackedHandle + From>, { - handle: Mutex>>, + handle: Mutex>, } pub struct OneWayShmReader @@ -68,7 +68,7 @@ pub fn create_anon_pair() -> anyhow::Result<(OneWayShmWriter, ShmHand let handle = ShmHandle::new(0x1000)?; Ok(( OneWayShmWriter { - handle: Mutex::new(Some(handle.clone().map()?)), + handle: Mutex::new(handle.clone().map()?), }, handle, )) @@ -87,7 +87,7 @@ impl>, D> OneWayShmReader { impl>> OneWayShmWriter { pub fn new(path: CString) -> io::Result> { Ok(OneWayShmWriter { - handle: Mutex::new(Some(NamedShmHandle::create(path, 0x1000)?.map()?)), + handle: Mutex::new(NamedShmHandle::create(path, 0x1000)?.map()?), }) } } @@ -127,9 +127,8 @@ where let fetch_data = |reader: &'a mut OneWayShmReader| { let size = std::mem::size_of::() + source_data.meta.size; - let handle = reader.handle.take().unwrap().ensure_space(size); - reader.handle.replace(handle); - let handle = reader.handle.as_ref().unwrap(); + let handle = reader.handle.as_mut().unwrap(); + handle.ensure_space(size); // aligned on 8 byte boundary, round up to closest 8 byte boundary let mut new_mem = Vec::::with_capacity((size + 7) / 8); @@ -185,11 +184,10 @@ where impl>> OneWayShmWriter { pub fn write(&self, contents: &[u8]) { - let mut handle = self.handle.lock().unwrap(); - let mut mapped = handle.take().unwrap(); + let mut mapped = self.handle.lock().unwrap(); let size = contents.len() + 1; // trailing zero byte, to keep some C code happy - mapped = mapped.ensure_space(std::mem::size_of::() + size); + mapped.ensure_space(std::mem::size_of::() + size); // Safety: ShmHandle is always big enough // Actually &mut mapped.as_slice_mut() as RawData seems safe, but unsized locals are @@ -203,13 +201,10 @@ impl>> OneWayShmWriter { data.meta.generation.fetch_add(1, Ordering::SeqCst); data.meta.writing.store(false, Ordering::SeqCst); - - handle.replace(mapped); } pub fn as_slice(&self) -> &[u8] { - let handle = self.handle.lock().unwrap(); - let mapped = handle.as_ref().unwrap(); + let mapped = self.handle.lock().unwrap(); let data = unsafe { &*(mapped.as_slice() as *const [u8] as *const RawData) }; if data.meta.size > 0 { let slice = data.as_slice(); @@ -220,12 +215,6 @@ impl>> OneWayShmWriter { } pub fn size(&self) -> usize { - self.handle - .lock() - .unwrap() - .as_ref() - .unwrap() - .as_slice() - .len() + self.handle.lock().unwrap().as_slice().len() } }