-
Notifications
You must be signed in to change notification settings - Fork 4
/
auxiliary.rs
153 lines (121 loc) · 4.78 KB
/
auxiliary.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use crate::{lowlevel::Extensions, SftpAuxiliaryData};
use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
use once_cell::sync::OnceCell;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Copy, Clone)]
pub(super) struct Limits {
pub(super) read_len: u32,
pub(super) write_len: u32,
}
#[derive(Debug)]
pub(super) struct ConnInfo {
pub(super) limits: Limits,
pub(super) extensions: Extensions,
}
#[derive(Debug)]
pub(super) struct Auxiliary {
pub(super) conn_info: OnceCell<ConnInfo>,
/// cancel_token is used to cancel `Awaitable*Future`
/// when the read_task/flush_task has failed.
pub(super) cancel_token: CancellationToken,
/// flush_end_notify is used to avoid unnecessary wakeup
/// in flush_task.
pub(super) flush_end_notify: Notify,
/// `Notify::notify_one` is called if
/// pending_requests == max_pending_requests.
pub(super) flush_immediately: Notify,
/// There can be at most `u32::MAX` pending requests, since each request
/// requires a request id that is 32 bits.
pub(super) pending_requests: AtomicUsize,
pub(super) max_pending_requests: u16,
pub(super) read_end_notify: Notify,
pub(super) requests_to_read: AtomicUsize,
/// 0 means no shutdown is requested
/// 1 means the read task should shutdown
/// 2 means the flush task should shutdown
pub(super) shutdown_stage: AtomicU8,
/// Number of handles that can issue new requests.
///
/// Use AtomicU64 in case the user keep creating new [`sftp::SftpHandle`]
/// and then [`std::mem::forget`] them.
pub(super) active_user_count: AtomicU64,
pub(super) auxiliary_data: SftpAuxiliaryData,
}
impl Auxiliary {
pub(super) fn new(max_pending_requests: u16, auxiliary_data: SftpAuxiliaryData) -> Self {
Self {
conn_info: OnceCell::new(),
cancel_token: CancellationToken::new(),
flush_end_notify: Notify::new(),
flush_immediately: Notify::new(),
pending_requests: AtomicUsize::new(0),
max_pending_requests,
read_end_notify: Notify::new(),
requests_to_read: AtomicUsize::new(0),
shutdown_stage: AtomicU8::new(0),
active_user_count: AtomicU64::new(1),
auxiliary_data,
}
}
pub(super) fn wakeup_flush_task(&self) {
// Must increment requests_to_read first, since
// flush_task might wakeup read_end once it done flushing.
self.requests_to_read.fetch_add(1, Ordering::Relaxed);
let pending_requests = self.pending_requests.fetch_add(1, Ordering::Relaxed);
self.flush_end_notify.notify_one();
// Use `==` here to avoid unnecessary wakeup of flush_task.
if pending_requests == self.max_pending_requests() {
self.flush_immediately.notify_one();
}
}
fn conn_info(&self) -> &ConnInfo {
self.conn_info
.get()
.expect("auxiliary.conn_info shall be initialized by sftp::Sftp::new")
}
pub(super) fn extensions(&self) -> Extensions {
// since writing to conn_info is only done in `Sftp::new`,
// reading these variable should never block.
self.conn_info().extensions
}
pub(super) fn limits(&self) -> Limits {
// since writing to conn_info is only done in `Sftp::new`,
// reading these variable should never block.
self.conn_info().limits
}
pub(super) fn max_pending_requests(&self) -> usize {
self.max_pending_requests as usize
}
pub(super) fn order_shutdown(&self) {
// Order the shutdown of read_task.
//
// Once it shutdowns, it will automatically order
// shutdown of flush_task.
self.shutdown_stage.store(1, Ordering::Relaxed);
self.flush_immediately.notify_one();
self.flush_end_notify.notify_one();
}
/// Triggers the flushing of the internal buffer in `flush_task`.
///
/// If there are pending requests, then flushing would happen immediately.
///
/// If not, then the next time a request is queued in the write buffer, it
/// will be immediately flushed.
pub(super) fn trigger_flushing(&self) {
self.flush_immediately.notify_one();
}
/// Return number of pending requests in the write buffer.
pub(super) fn get_pending_requests(&self) -> usize {
self.pending_requests.load(Ordering::Relaxed)
}
pub(super) fn inc_active_user_count(&self) {
self.active_user_count.fetch_add(1, Ordering::Relaxed);
}
pub(super) fn dec_active_user_count(&self) {
if self.active_user_count.fetch_sub(1, Ordering::Relaxed) == 1 {
// self.active_user_count is now equal to 0, ready for shutdown.
self.order_shutdown()
}
}
}