Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: fill in missing metrics for autopush_rs
Browse files Browse the repository at this point in the history
the rust side's metrics (via cadence) currently lack the ability to
pass datadog tags, currently ua.{message_data,connection.lifespan}

Issue: #1054
  • Loading branch information
pjenvey committed Nov 7, 2017
1 parent bc32908 commit eb37fa0
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 9 deletions.
8 changes: 7 additions & 1 deletion autopush/tests/test_webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

from autopush.db import (
DatabaseManager,
make_rotating_tablename,
generate_last_connect,
make_rotating_tablename,
)
from autopush.metrics import SinkMetrics
from autopush.config import AutopushConfig
Expand Down Expand Up @@ -234,6 +234,9 @@ def test_nonexisting_uaid(self):
assert isinstance(result, HelloResponse)
assert hello.uaid != result.uaid
assert result.check_storage is False
assert result.connected_at == hello.connected_at
assert self.metrics.increment.called
assert self.metrics.increment.call_args[0][0] == 'ua.command.hello'

def test_existing_uaid(self):
p = self._makeFUT()
Expand All @@ -245,6 +248,9 @@ def test_existing_uaid(self):
assert isinstance(result, HelloResponse)
assert hello.uaid.hex == result.uaid
assert result.check_storage is True
assert result.connected_at == hello.connected_at
assert self.metrics.increment.called
assert self.metrics.increment.call_args[0][0] == 'ua.command.hello'

def test_existing_newer_uaid(self):
p = self._makeFUT()
Expand Down
20 changes: 17 additions & 3 deletions autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class HelloResponse(OutputCommand):
message_month = attrib() # type: str
check_storage = attrib() # type: bool
reset_uaid = attrib() # type: bool
connected_at = attrib() # type: int
rotate_message_table = attrib(default=False) # type: bool


Expand Down Expand Up @@ -212,7 +213,6 @@ def __init__(self, conf, db, num_threads=10):
self.db = db
self.db.setup_tables()
self.num_threads = num_threads
self.metrics = self.db.metrics
self.incoming = AutopushQueue()
self.workers = [] # type: List[Thread]
self.command_processor = CommandProcessor(conf, self.db)
Expand Down Expand Up @@ -357,10 +357,12 @@ def process(self, hello):
# Save the UAID as register_user removes it
uaid = user_item["uaid"] # type: str
success, _ = self.db.router.register_user(user_item)
flags["connected_at"] = hello.connected_at
if not success:
# User has already connected more recently elsewhere
return HelloResponse(uaid=None, **flags)

self.metrics.increment('ua.command.hello')
return HelloResponse(uaid=uaid, **flags)

def lookup_user(self, hello):
Expand All @@ -380,13 +382,13 @@ def lookup_user(self, hello):
# All records must have a router_type and connected_at, in some odd
# cases a record exists for some users without it
if "router_type" not in record or "connected_at" not in record:
self.db.router.drop_user(uaid)
self.drop_user(uaid, record, 104)
return None, flags

# Current month must exist and be a valid prior month
if ("current_month" not in record) or record["current_month"] \
not in self.db.message_tables:
self.db.router.drop_user(uaid)
self.drop_user(uaid, record, 105)
return None, flags

# If we got here, its a valid user that needs storage checked
Expand Down Expand Up @@ -425,6 +427,18 @@ def create_user(self, hello):
current_month=self.db.current_msg_month,
)

def drop_user(self, uaid, uaid_record, code):
# type: (str, dict, int) -> None
"""Drop a user record"""
log.debug(
"Dropping User",
code=code,
uaid_hash=hasher(uaid),
uaid_record=repr(uaid_record)
)
self.metrics.increment('ua.expiration', tags=['code:{}'.format(code)])
self.db.router.drop_user(uaid)


class CheckStorageCommand(ProcessorCommand):
def process(self, command):
Expand Down
16 changes: 16 additions & 0 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ crate-type = ["cdylib"]

[dependencies]
bytes = "0.4"
cadence = "0.12.1"
env_logger = { version = "0.4", default-features = false }
error-chain = "0.10"
futures = "0.1"
Expand Down
2 changes: 2 additions & 0 deletions autopush_rs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self, conf, queue):
cfg.ssl_key = ffi_from_buffer(conf.ssl.key)
cfg.url = ffi_from_buffer(conf.ws_url)
cfg.json_logging = True
cfg.statsd_host = ffi_from_buffer(conf.statsd_host)
cfg.statsd_port = conf.statsd_port

ptr = _call(lib.autopush_server_new, cfg)
self.ffi = ffi.gc(ptr, lib.autopush_server_free)
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub struct HelloResponse {
pub check_storage: bool,
pub reset_uaid: bool,
pub rotate_message_table: bool,
pub connected_at: u64,
}

#[derive(Deserialize)]
Expand Down
29 changes: 27 additions & 2 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::rc::Rc;

use cadence::prelude::*;
use futures::AsyncSink;
use futures::future::Either;
use futures::sync::mpsc;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub struct WebPushClient {
// Highest version from stored, retained for use with increment
// when all the unacked storeds are ack'd
unacked_stored_highest: Option<i64>,
connected_at: u64,
}

impl WebPushClient {
Expand Down Expand Up @@ -172,6 +174,16 @@ where
if more_messages.is_some() {
let mut messages = more_messages.take().unwrap();
if let Some(message) = messages.pop() {
if let Some(_) = message.topic {
self.data.srv.metrics.incr("ua.notification.topic")?;
}
// XXX: tags
self.data.srv.metrics.count(
"ua.message_data",
message.data.as_ref().map_or(0, |d| {
d.len() as i64
}),
)?;
ClientState::FinishSend(
Some(ServerMessage::Notification(message)),
Some(Box::new(ClientState::SendMessages(if messages.len() > 0 {
Expand Down Expand Up @@ -220,8 +232,10 @@ where
} => uaid,
_ => return Err("Invalid message, must be hello".into()),
};
let ms_time = time::precise_time_ns() / 1000;
ClientState::WaitingForProcessHello(self.data.srv.hello(&ms_time, uaid.as_ref()))
let connected_at = time::precise_time_ns() / 1000;
ClientState::WaitingForProcessHello(
self.data.srv.hello(&connected_at, uaid.as_ref()),
)
}
ClientState::WaitingForProcessHello(ref mut response) => {
debug!("State: WaitingForProcessHello");
Expand All @@ -232,13 +246,15 @@ where
check_storage,
reset_uaid,
rotate_message_table,
connected_at,
} => {
self.data.process_hello(
uaid,
message_month,
reset_uaid,
rotate_message_table,
check_storage,
connected_at,
)
}
call::HelloResponse { uaid: None, .. } => {
Expand Down Expand Up @@ -466,6 +482,7 @@ where
reset_uaid: bool,
rotate_message_table: bool,
check_storage: bool,
connected_at: u64,
) -> ClientState {
let (tx, rx) = mpsc::unbounded();
let mut flags = ClientFlags::new();
Expand All @@ -480,6 +497,7 @@ where
unacked_direct_notifs: Vec::new(),
unacked_stored_notifs: Vec::new(),
unacked_stored_highest: None,
connected_at: connected_at,
});
self.srv.connect_client(
RegisteredClient { uaid: uaid, tx: tx },
Expand Down Expand Up @@ -523,6 +541,7 @@ where
}

fn process_acks(&mut self, updates: Vec<ClientAck>) -> ClientState {
self.srv.metrics.incr("ua.command.ack").ok();
let webpush = self.webpush.as_mut().unwrap();
let mut fut: Option<MyFuture<call::DeleteMessageResponse>> = None;
for notif in updates.iter() {
Expand Down Expand Up @@ -588,8 +607,14 @@ where

pub fn shutdown(&mut self) {
// If we made it past hello, do more cleanup

if self.webpush.is_some() {
let webpush = self.webpush.take().unwrap();
let now = time::precise_time_ns() / 1000;
let elapsed = now - webpush.connected_at;
// XXX: tags
self.srv.metrics.time("ua.connection.lifespan", elapsed).ok();

// If there's direct unack'd messages, they need to be saved out without blocking
// here
self.srv.disconnet_client(&webpush.uaid);
Expand Down
2 changes: 2 additions & 0 deletions autopush_rs/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::any::Any;
use std::error;
use std::io;

use cadence;
use futures::Future;
use httparse;
use serde_json;
Expand All @@ -38,6 +39,7 @@ error_chain! {
Io(io::Error);
Json(serde_json::Error);
Httparse(httparse::Error);
MetricError(cadence::MetricError);
}

errors {
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
//! Otherwise be sure to check out each module for more documentation!
extern crate bytes;
extern crate cadence;
extern crate env_logger;
#[macro_use]
extern crate futures;
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub struct Notification {
pub topic: Option<String>,
pub timestamp: u64,
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<String>,
pub data: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
headers: Option<HashMap<String, String>>,
}
23 changes: 23 additions & 0 deletions autopush_rs/src/server/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//! Metrics tie-ins
use std::net::UdpSocket;

use cadence::{BufferedUdpMetricSink, NopMetricSink, QueuingMetricSink, StatsdClient};

use errors::*;
use server::ServerOptions;

/// Create a cadence StatsdClient from the given options
pub fn metrics_from_opts(opts: &ServerOptions) -> Result<StatsdClient> {
Ok(if let Some(statsd_host) = opts.statsd_host.as_ref() {
let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;

let host = (statsd_host.as_str(), opts.statsd_port);
let udp_sink = BufferedUdpMetricSink::from(host, socket)?;
let sink = QueuingMetricSink::from(udp_sink);
StatsdClient::from_sink("autopush", sink)
} else {
StatsdClient::from_sink("autopush", NopMetricSink)
})
}
Loading

0 comments on commit eb37fa0

Please sign in to comment.