-
Notifications
You must be signed in to change notification settings - Fork 26
/
config.ex
227 lines (183 loc) · 6.9 KB
/
config.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
defmodule NSQ.Config do
@ms 1
@seconds 1000 * @ms
@minutes 60 * @seconds
@default_config %{
dial_timeout: 1000,
# Deadlines for network reads and writes
read_timeout: 1 * @minutes,
write_timeout: 1 * @seconds,
# {host, port} tuples identifying where we should look for nsqd/nsqlookupd.
nsqds: [],
nsqlookupds: [],
# Duration between polling lookupd for new producers, and fractional jitter
# to add to the lookupd pool loop. this helps evenly distribute requests
# even if multiple consumers restart at the same time
#
# NOTE: when not using nsqlookupd, lookupd_poll_interval represents the
# duration of time between reconnection attempts
lookupd_poll_interval: 1 * @minutes,
lookupd_poll_jitter: 0.3,
# If nsqlookupd is not being used and a connection to nsqd should fail,
# it will automatically attempt to reconnect based on the
# lookupd_poll_interval. This is how many times it will make the attempt
# before erroring out.
max_reconnect_attempts: 30,
# Maximum duration when REQueueing (for doubling of deferred requeue)
max_requeue_delay: 15 * @minutes,
# Backoff strategy, defaults to exponential backoff. Overwrite this to
# define alternative backoff algorithms
backoff_strategy: :exponential,
# Maximum amount of time to backoff when processing fails 0 == no backoff
max_backoff_duration: 2 * @minutes,
# Unit of time for calculating consumer backoff
backoff_multiplier: 1 * @seconds,
# Maximum number of times this consumer will attempt to process a message
# before giving up
max_attempts: 5,
# Duration to wait for a message from a producer when in a state where RDY
# counts are re-distributed (ie. max_in_flight < num_producers)
low_rdy_idle_timeout: 10 * @seconds,
# Duration between redistributing max-in-flight to connections
rdy_redistribute_interval: 5 * @seconds,
# Duration to wait to retry RDY, if conn is maxed out
rdy_retry_delay: 5 * @seconds,
# Identifiers sent to nsqd representing this client UserAgent is in the
# spirit of HTTP (default: "<client_library_name>/<version>")
client_id: nil,
hostname: nil,
user_agent: nil,
# Duration of time between heartbeats. This must be less than read_timeout
heartbeat_interval: 30 * @seconds,
# Integer percentage 0-99 to sample the channel (requires nsqd 0.2.25+)
sample_rate: 0,
# To set TLS config, use the following options:
#
# tls_v1 - Bool enable TLS negotiation
# tls_root_ca_file - String path to file containing root CA
# tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
# tls_cert - String path to file containing public key for certificate
# tls_key - String path to file containing private key for certificate
# tls_min_version - String indicating the minimum version of tls acceptable :sslv3, :tlsv1, :"tlsv1.1", :"tlsv1.2"
tls_v1: false,
tls_insecure_skip_verify: false,
tls_cert: nil,
tls_key: nil,
tls_min_version: nil,
# Compression settings
deflate: false,
deflate_level: 6,
snappy: false,
# Size of the buffer (in bytes) used by nsqd for buffering writes to this
# connection
output_buffer_size: 16384,
# Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
#
# WARNING: configuring clients with an extremely low
# (< 25ms) output_buffer_timeout has a significant effect
# on nsqd CPU usage (particularly with > 50 clients connected).
output_buffer_timeout: 250 * @ms,
# Maximum number of messages to allow in flight (concurrency knob)
max_in_flight: 2500,
# The server-side message timeout for messages delivered to this client.
# After waiting this long without a TOUCH, NSQD will automatically requeue
# the message.
msg_timeout: 60 * @seconds,
# secret for nsqd authentication (requires nsqd 0.2.29+)
auth_secret: "",
# function or module to deal with messages
message_handler: nil,
# define a custom event manager to get callbacks as to what NSQ is doing
event_manager: nil
}
@valid_ranges %{
read_timeout: {100 * @ms, 5 * @minutes},
write_timeout: {100 * @ms, 5 * @minutes},
lookupd_poll_interval: {10 * @ms, 5 * @minutes},
lookupd_poll_jitter: {0, 1},
max_requeue_delay: {0, :infinity},
max_backoff_duration: {0, 60 * @minutes},
backoff_multiplier: {0, 60 * @minutes},
max_attempts: {0, 65535},
low_rdy_idle_timeout: {1 * @seconds, 5 * @minutes},
rdy_redistribute_interval: {1 * @ms, 5 * @seconds},
sample_rate: {0, 99},
deflate_level: {1, 9},
max_in_flight: {0, :infinity},
msg_timeout: {0, :infinity}
}
defstruct Enum.into(@default_config, [])
@doc """
Given a config, tell us what's wrong with it. If nothing is wrong, we'll
return `{:ok, config}`.
## Examples
iex> NSQ.Config.validate(%NSQ.Config{})
{:ok, %NSQ.Config{}}
iex> NSQ.Config.validate(%NSQ.Config{max_attempts: -1})
{:error, ["max_attempts: -1 below minimum 0"]}
"""
def validate(config) do
errors = []
%NSQ.Config{} = config
errors =
errors ++
Enum.map(@valid_ranges, fn {name, {min, max}} ->
case range_error(Map.get(config, name), min, max) do
{:error, reason} -> "#{name}: #{reason}"
:ok -> nil
end
end)
errors = [
no_match_error(
config.backoff_strategy,
[:exponential, :test]
)
| errors
]
errors = Enum.reject(errors, fn v -> v == nil end)
if length(errors) > 0 do
{:error, errors}
else
{:ok, config}
end
end
def normalize(config) do
config = %NSQ.Config{config | nsqds: normalize_hosts(config.nsqds)}
config = %NSQ.Config{config | nsqlookupds: normalize_hosts(config.nsqlookupds)}
{:ok, config}
end
def normalize_hosts(hosts) do
Enum.map(hosts, fn host_with_port ->
cond do
is_tuple(host_with_port) ->
{_host, _port} = host_with_port
is_binary(host_with_port) ->
[host, port] = host_with_port |> String.split(":")
{port, _} = Integer.parse(port)
{host, port}
is_list(host_with_port) ->
{_host, _port} = List.to_tuple(host_with_port)
true ->
raise "Invalid host definition #{inspect(host_with_port)}"
end
end)
end
defp range_error(val, min, max) do
cond do
val == nil -> :ok
val < min -> {:error, "#{val} below minimum #{min}"}
max != :infinity && val > max -> {:error, "#{val} above maximum #{max}"}
true -> :ok
end
end
defp matches_any?(val, candidates) do
Enum.any?(candidates, fn candidate -> candidate == val end)
end
defp no_match_error(val, candidates) do
if matches_any?(val, candidates) do
nil
else
{:error, "#{val} doesn't match any supported values"}
end
end
end