Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1902992 - Flush database time-based when delayPingLifetimeIo=true #2871

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ commands:
name: Run Rust RLB delayed ping data test
command: |
glean-core/rlb/tests/test-delayed-ping-data.sh
- run:
name: Run Rust RLB flush test
command: |
glean-core/rlb/tests/test-ping-lifetime-flush.sh
- run:
name: Upload coverage report
command: |
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

[Full changelog](https://github.com/mozilla/glean/compare/v60.4.0...main)

* General
* Make auto-flush behavior configurable and time-based ([#2871](https://github.com/mozilla/glean/pull/2871))
* Android
* Update to Gradle v8.9 ([#2909](https://github.com/mozilla/glean/pull/2909))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ open class GleanInternalAPI internal constructor() {
experimentationId = configuration.experimentationId,
enableInternalPings = configuration.enableInternalPings,
pingSchedule = emptyMap(),
pingLifetimeThreshold = configuration.pingLifetimeThreshold.toULong(),
pingLifetimeMaxTime = configuration.pingLifetimeMaxTime.toULong(),
)
val clientInfo = getClientInfo(configuration, buildInfo)
val callbacks = OnGleanEventsImpl(this@GleanInternalAPI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import mozilla.telemetry.glean.net.PingUploader
* to be sent with all pings.
* @property enableInternalPings Whether to enable internal pings.
* @property delayPingLifetimeIo Whether Glean should delay persistence of data from metrics with ping lifetime.
* @property pingLifetimeThreshold Write count threshold when to auto-flush. `0` disables it.
* @property pingLifetimeMaxTime After what time to auto-flush (in milliseconds). `0` disables it.
*/
data class Configuration @JvmOverloads constructor(
val serverEndpoint: String = DEFAULT_TELEMETRY_ENDPOINT,
Expand All @@ -40,6 +42,8 @@ data class Configuration @JvmOverloads constructor(
val experimentationId: String? = null,
val enableInternalPings: Boolean = true,
val delayPingLifetimeIo: Boolean = true,
val pingLifetimeThreshold: Int = 1000,
val pingLifetimeMaxTime: Int = 0,
) {
companion object {
/**
Expand Down
10 changes: 9 additions & 1 deletion glean-core/ios/Glean/Config/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public struct Configuration {
let enableEventTimestamps: Bool
let experimentationId: String?
let enableInternalPings: Bool
let pingLifetimeThreshold: Int
let pingLifetimeMaxTime: Int

struct Constants {
static let defaultTelemetryEndpoint = "https://incoming.telemetry.mozilla.org"
Expand All @@ -32,6 +34,8 @@ public struct Configuration {
/// * experimentationId An experimentation identifier derived by the application
/// to be sent with all pings.
/// * enableInternalPings Whether to enable internal pings.
/// * pingLifetimeThreshold Write count threshold when to auto-flush. `0` disables it.
/// * pingLifetimeMaxTime After what time to auto-flush (in milliseconds). `0` disables it.
public init(
maxEvents: Int32? = nil,
channel: String? = nil,
Expand All @@ -40,7 +44,9 @@ public struct Configuration {
logLevel: LevelFilter? = nil,
enableEventTimestamps: Bool = true,
experimentationId: String? = nil,
enableInternalPings: Bool = true
enableInternalPings: Bool = true,
pingLifetimeThreshold: Int = 0,
pingLifetimeMaxTime: Int = 0
) {
self.serverEndpoint = serverEndpoint ?? Constants.defaultTelemetryEndpoint
self.maxEvents = maxEvents
Expand All @@ -50,5 +56,7 @@ public struct Configuration {
self.enableEventTimestamps = enableEventTimestamps
self.experimentationId = experimentationId
self.enableInternalPings = enableInternalPings
self.pingLifetimeThreshold = pingLifetimeThreshold
self.pingLifetimeMaxTime = pingLifetimeMaxTime
}
}
4 changes: 3 additions & 1 deletion glean-core/ios/Glean/Glean.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ public class Glean {
enableEventTimestamps: configuration.enableEventTimestamps,
experimentationId: configuration.experimentationId,
enableInternalPings: configuration.enableInternalPings,
pingSchedule: [:]
pingSchedule: [:],
pingLifetimeThreshold: UInt64(configuration.pingLifetimeThreshold),
pingLifetimeMaxTime: UInt64(configuration.pingLifetimeMaxTime)
)
let clientInfo = getClientInfo(configuration, buildInfo: buildInfo)
let callbacks = OnGleanEventsImpl(glean: self)
Expand Down
2 changes: 2 additions & 0 deletions glean-core/python/glean/glean.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def initialize(
experimentation_id=configuration.experimentation_id,
enable_internal_pings=configuration.enable_internal_pings,
ping_schedule={},
ping_lifetime_threshold=0,
ping_lifetime_max_time=0,
)

_uniffi.glean_initialize(cfg, client_info, callbacks)
Expand Down
2 changes: 2 additions & 0 deletions glean-core/python/glean/net/ping_upload_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def _process(data_dir: Path, application_id: str, configuration) -> bool:
experimentation_id=None,
enable_internal_pings=False,
ping_schedule={},
ping_lifetime_threshold=0,
ping_lifetime_max_time=0,
)
if not glean_initialize_for_subprocess(cfg):
log.error("Couldn't initialize Glean in subprocess")
Expand Down
3 changes: 3 additions & 0 deletions glean-core/rlb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ tempfile = "3.1.0"

[features]
preinit_million_queue = ["glean-core/preinit_million_queue"]

[[example]]
name = "ping-lifetime-flush"
136 changes: 136 additions & 0 deletions glean-core/rlb/examples/ping-lifetime-flush.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::fs::File;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::time::Duration;
use std::{env, process, thread};

use once_cell::sync::Lazy;

use flate2::read::GzDecoder;
use glean::net;
use glean::{private::PingType, ClientInfoMetrics, ConfigurationBuilder};

/// Metric definitions used by this example.
pub mod glean_metrics {
    use glean::{private::CounterMetric, CommonMetricData, Lifetime};
    use once_cell::sync::Lazy;

    /// Counter `test.metrics.sample_counter` with ping lifetime,
    /// sent in the `prototype` ping. Constructed lazily on first access.
    #[allow(non_upper_case_globals)]
    pub static sample_counter: Lazy<CounterMetric> = Lazy::new(|| {
        let meta = CommonMetricData {
            name: "sample_counter".into(),
            category: "test.metrics".into(),
            send_in_pings: vec!["prototype".into()],
            disabled: false,
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        CounterMetric::new(meta)
    });
}

/// Uploader that stores every "uploaded" ping as a file on disk.
///
/// The wrapped path is the directory the ping files are written into.
#[derive(Debug)]
struct MovingUploader(PathBuf);

impl MovingUploader {
    /// Creates the `sent_pings` directory below `path` (including any missing
    /// parent directories) and returns an uploader wrapping that directory.
    ///
    /// Panics if the directory cannot be created.
    fn new(path: PathBuf) -> Self {
        let sent_dir = path.join("sent_pings");
        std::fs::create_dir_all(&sent_dir).unwrap();
        Self(sent_dir)
    }
}

impl net::PingUploader for MovingUploader {
    /// Writes the request to `<docid>.json` inside the uploader's directory
    /// and reports an HTTP 200 status.
    ///
    /// `<docid>` is the last `/`-separated segment of the request URL.
    /// The file contains a pseudo-JSON header block (URL and headers)
    /// followed by the ping payload as text.
    ///
    /// Panics if the body is neither valid gzip-compressed UTF-8 nor plain
    /// UTF-8, or if the output file cannot be created or written.
    fn upload(&self, upload_request: net::PingUploadRequest) -> net::UploadResult {
        let net::PingUploadRequest {
            body, url, headers, ..
        } = upload_request;

        // Try to gunzip the body first; if that fails, fall back to
        // interpreting the raw bytes as UTF-8.
        let mut decoded = String::with_capacity(body.len());
        let payload = match GzDecoder::new(&body[..]).read_to_string(&mut decoded) {
            Ok(_) => &decoded[..],
            Err(_) => std::str::from_utf8(&body).ok().unwrap(),
        };

        let document_id = url.rsplit('/').next().unwrap();
        let target = self.0.join(format!("{document_id}.json"));
        let mut out = File::create(target).unwrap();

        // pseudo-JSON, let's hope this works.
        writeln!(out, "{{").unwrap();
        writeln!(out, " \"url\": {url},").unwrap();
        for (name, value) in headers {
            writeln!(out, " \"{name}\": \"{value}\",").unwrap();
        }
        writeln!(out, "}}").unwrap();
        writeln!(out, "{payload}").unwrap();

        net::UploadResult::http_status(200)
    }
}

/// The `prototype` ping used by this example, constructed lazily on first access.
#[allow(non_upper_case_globals)]
pub static PrototypePing: Lazy<PingType> = Lazy::new(|| {
    PingType::new(
        "prototype",
        true,
        true,
        false,
        true,
        true,
        Vec::new(),
        Vec::new(),
    )
});

fn main() {
env_logger::init();

let mut args = env::args().skip(1);

let data_path = PathBuf::from(args.next().expect("need data path"));
let state = args.next().unwrap_or_default();

let uploader = MovingUploader::new(data_path.clone());
let cfg = ConfigurationBuilder::new(true, data_path, "glean.pingflush")
.with_server_endpoint("invalid-test-host")
.with_use_core_mps(false)
.with_uploader(uploader)
.with_delay_ping_lifetime_io(true)
.with_ping_lifetime_threshold(1000)
.with_ping_lifetime_max_time(Duration::from_millis(2000))
.build();

let client_info = ClientInfoMetrics {
app_build: env!("CARGO_PKG_VERSION").to_string(),
app_display_version: env!("CARGO_PKG_VERSION").to_string(),
channel: None,
locale: None,
};

glean::initialize(cfg, client_info);

// Wait for init to finish,
// otherwise we might be to quick with calling `shutdown`.
badboy marked this conversation as resolved.
Show resolved Hide resolved
let _ = glean_metrics::sample_counter.test_get_value(None);

match &*state {
"accumulate_one_and_pretend_crash" => {
log::debug!("incrementing by 1. exiting without shutdown.");
glean_metrics::sample_counter.add(1)
}
"accumulate_ten_and_wait" => {
log::debug!("incrementing by 10, waiting, incrementing again. should trigger a flush.");
glean_metrics::sample_counter.add(10);
thread::sleep(Duration::from_millis(2500));
glean_metrics::sample_counter.add(10);
// give it some time to work
thread::sleep(Duration::from_millis(100));
}
"submit_ping" => {
log::info!("submitting PrototypePing");
PrototypePing.submit(None);

glean::shutdown();
}
_ => {
eprintln!("unknown argument: {state}");
process::exit(1);
}
}
}
25 changes: 25 additions & 0 deletions glean-core/rlb/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::net::PingUploader;

use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;

/// The default server pings are sent to.
pub(crate) const DEFAULT_GLEAN_ENDPOINT: &str = "https://incoming.telemetry.mozilla.org";
Expand Down Expand Up @@ -53,6 +54,10 @@ pub struct Configuration {
/// Maps a ping name to a list of pings to schedule along with it.
/// Only used if the ping's own ping schedule list is empty.
pub ping_schedule: HashMap<String, Vec<String>>,
/// Write count threshold when to auto-flush. `0` disables it.
pub ping_lifetime_threshold: usize,
/// After what time to auto-flush. 0 disables it.
pub ping_lifetime_max_time: Duration,
}

/// Configuration builder.
Expand Down Expand Up @@ -105,6 +110,10 @@ pub struct Builder {
/// Maps a ping name to a list of pings to schedule along with it.
/// Only used if the ping's own ping schedule list is empty.
pub ping_schedule: HashMap<String, Vec<String>>,
/// Write count threshold when to auto-flush. `0` disables it.
pub ping_lifetime_threshold: usize,
/// After what time to auto-flush. 0 disables it.
pub ping_lifetime_max_time: Duration,
}

impl Builder {
Expand All @@ -130,6 +139,8 @@ impl Builder {
experimentation_id: None,
enable_internal_pings: true,
ping_schedule: HashMap::new(),
ping_lifetime_threshold: 0,
ping_lifetime_max_time: Duration::ZERO,
}
}

Expand All @@ -151,6 +162,8 @@ impl Builder {
experimentation_id: self.experimentation_id,
enable_internal_pings: self.enable_internal_pings,
ping_schedule: self.ping_schedule,
ping_lifetime_threshold: self.ping_lifetime_threshold,
ping_lifetime_max_time: self.ping_lifetime_max_time,
}
}

Expand Down Expand Up @@ -213,4 +226,16 @@ impl Builder {
self.ping_schedule = value;
self
}

/// Sets the write count threshold at which to auto-flush.
///
/// A value of `0` disables it.
pub fn with_ping_lifetime_threshold(self, value: usize) -> Self {
    let mut builder = self;
    builder.ping_lifetime_threshold = value;
    builder
}

/// Sets the time after which to auto-flush.
///
/// A duration of 0 disables it.
pub fn with_ping_lifetime_max_time(self, value: Duration) -> Self {
    let mut builder = self;
    builder.ping_lifetime_max_time = value;
    builder
}
}
2 changes: 2 additions & 0 deletions glean-core/rlb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ fn initialize_internal(cfg: Configuration, client_info: ClientInfoMetrics) -> Op
experimentation_id: cfg.experimentation_id,
enable_internal_pings: cfg.enable_internal_pings,
ping_schedule: cfg.ping_schedule,
ping_lifetime_threshold: cfg.ping_lifetime_threshold as u64,
ping_lifetime_max_time: cfg.ping_lifetime_max_time.as_millis() as u64,
};

glean_core::glean_initialize(core_cfg, client_info.into(), callbacks);
Expand Down
53 changes: 53 additions & 0 deletions glean-core/rlb/tests/test-ping-lifetime-flush.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/bin/bash

# Test harness for testing the RLB processes from the outside.
#
# Some behavior can only be observed when properly exiting the process running Glean,
# e.g. when an uploader runs in another thread.
# On exit the threads will be killed, regardless of their state.
#
# Exit status: 0 on success, 101 on a failed check,
# 128+signal when interrupted by INT/ABRT/TERM.

# Remove the temporary data path on all exit conditions.
# `-f` keeps this safe to run even if the directory is already gone.
cleanup() {
  if [ -n "$datapath" ]; then
    rm -rf -- "$datapath"
  fi
}
# Clean up exactly once, on exit. The signal traps must `exit` themselves;
# otherwise bash runs the handler and then continues with the next command.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 134' ABRT
trap 'exit 143' TERM

# Report a failed check and stop.
fail() {
  echo "test result: FAILED."
  exit 101
}

# Print the number of entries in the directory the example writes pings to.
sent_ping_count() {
  ls -1q "$datapath/sent_pings" | wc -l
}

tmp="${TMPDIR:-/tmp}"
datapath=$(mktemp -d "${tmp}/glean_ping_lifetime_flush.XXXX") || fail

# Use an array so a data path containing whitespace stays a single argument.
cmd=(cargo run -p glean --example ping-lifetime-flush -- "$datapath")

# First run "crashes" -> no increment stored
"${cmd[@]}" accumulate_one_and_pretend_crash || fail
count=$(sent_ping_count)
if [[ "$count" -ne 0 ]]; then
  fail
fi

# Second run increments, waits, increments -> increment flushed to disk.
# No ping is sent.
"${cmd[@]}" accumulate_ten_and_wait || fail
count=$(sent_ping_count)
if [[ "$count" -ne 0 ]]; then
  fail
fi

# Third run sends the ping.
"${cmd[@]}" submit_ping || fail
count=$(sent_ping_count)
if [[ "$count" -ne 1 ]]; then
  fail
fi

# The ping must contain the 20 accumulated in the second run;
# the increment from the "crashed" first run must not show up.
if ! grep -q '"test.metrics.sample_counter":20' "$datapath"/sent_pings/*; then
  fail
fi

echo "test result: ok."
exit 0
Loading
Loading