Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Standardize local_ids/external_ids/alias and clean plan apis #400

Merged
merged 7 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .cargo-templates/dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@
#
# To enable
# - install clang
# - install mold https://github.com/rui314/mold into /usr/local/bin/mold
# - add a symbolic link from .cargo -> .cargo-dev
# - install mold https://github.com/rui314/mold into /usr/bin/mold
# - add a symbolic link from .cargo -> .cargo-dev
# via `mkdir -p .cargo && ln -s ../.cargo-templates/dev.toml $_/config.toml`.
#
# If there is an issue, reverting is as simple as deleting .cargo.

[target.x86_64-unknown-linux-gnu]
linker = "/usr/bin/clang"
rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"]
rustflags = ["-C", "link-arg=-fuse-ld=mold"]

[target.aarch64-unknown-linux-gnu]
linker = "clang"
rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"]
rustflags = ["-C", "link-arg=-fuse-ld=mold"]

[profile.release]
incremental = true

[profile.fast]
inherits = "release"
[profile.dev]
incremental = true
opt-level = 0
lto = "off"
codegen-units = 256
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,4 @@ metering = { path = "modules/metering" }
diesel-models = { path = "modules/meteroid/crates/diesel-models" }
meteroid-store = { path = "modules/meteroid/crates/meteroid-store" }
meteroid-invoicing = { path = "modules/meteroid/crates/meteroid-invoicing" }

968 changes: 484 additions & 484 deletions docker/develop/data/seed.sql

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions extra/generator/seed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ events_per_second: 200
limit: 5000
events:
- event_name: api_request
customer_ids: ["spotify", "uber", "comodo"]
customer_aliases: [ "spotify", "uber", "comodo" ]
properties:
app_id:
type: string
endpoint:
type: pick
values: ["/api/v1/auth", "/api/v1/checkout", "/api/v3/ride"]
values: [ "/api/v1/auth", "/api/v1/checkout", "/api/v3/ride" ]
success:
type: bool
2 changes: 1 addition & 1 deletion extra/generator/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Connect {
#[derive(Debug, Serialize, Deserialize)]
pub struct Schema {
pub event_name: String,
pub customer_ids: Vec<String>,
pub customer_aliases: Vec<String>,
pub properties: std::collections::HashMap<String, Property>,
pub weight: Option<f64>,
}
Expand Down
4 changes: 2 additions & 2 deletions extra/generator/src/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ fn generate_random_data(schema: &Schema) -> Event {
Event {
event_id: Uuid::new_v4().to_string(),
event_name: schema.event_name.clone(),
customer_id: Some(CustomerId::ExternalCustomerId(
schema.customer_ids[fastrand::usize(0..schema.customer_ids.len())].clone(),
customer_id: Some(CustomerId::ExternalCustomerAlias(
schema.customer_aliases[fastrand::usize(0..schema.customer_aliases.len())].clone(),
)),
timestamp: now.to_rfc3339(),
properties,
Expand Down
4 changes: 2 additions & 2 deletions modules/adapters/openstack/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl EventHandler {
Ok(Some(server::Event {
event_id: sample.message_id.clone(),
event_name: format!("openstack.{}", sample.counter_name),
customer_id: Some(server::event::CustomerId::ExternalCustomerId(
customer_id: Some(server::event::CustomerId::ExternalCustomerAlias(
sample.project_id.clone(),
)),
timestamp: sample.timestamp.clone(),
Expand Down Expand Up @@ -215,7 +215,7 @@ impl EventHandler {
Ok(Some(server::Event {
event_id: event.message_id.clone(),
event_name: format!("openstack.{}", event.event_type),
customer_id: Some(server::event::CustomerId::ExternalCustomerId(
customer_id: Some(server::event::CustomerId::ExternalCustomerAlias(
project_id.to_string(),
)),
timestamp: timestamp.clone(),
Expand Down
2 changes: 1 addition & 1 deletion modules/adapters/slurm-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async fn send_batch_to_api(client: &mut GrpcClient, batch: &[SacctData]) -> Resu
event_id: data.id.clone(),
event_name: "slurm_job".to_string(),
customer_id: Some(
metering_grpc::meteroid::metering::v1::event::CustomerId::ExternalCustomerId(data.account.clone())
metering_grpc::meteroid::metering::v1::event::CustomerId::ExternalCustomerAlias(data.account.clone())
),
timestamp: data.start_time.to_rfc3339(),
properties,
Expand Down
3 changes: 2 additions & 1 deletion modules/metering/proto/meters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ message RegisterMeterResponse {
}

message UnregisterMeterRequest {
ResourceIdentifier meter = 1;
string id = 1;
string tenant_id = 2;
}

message UnregisterMeterResponse {}
Expand Down
13 changes: 5 additions & 8 deletions modules/metering/proto/models.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ syntax = "proto3";

package meteroid.metering.v1;

import "google/protobuf/timestamp.proto";

message Event {
string event_id = 1;
string event_name = 2;
oneof customer_id {
string meteroid_customer_id = 3;
string external_customer_id = 4;
string external_customer_alias = 4;
// TODO we can allow external_subscription_id as well if a resource is linked to a specific subscription (and if we don't have the customer_id)
}
// rfc3339 string
Expand All @@ -25,7 +23,7 @@ message Metadata {
}

message Meter {
string meter_slug = 1;
string id = 1; // id by default is local_id. We could at some point support metric alias for external implementations of metering, & use this field
string event_name = 3;

// TODO used ? (can we store metadata in clickhouse ? if yes do we want some metadata field instead)
Expand Down Expand Up @@ -56,8 +54,7 @@ message Meter {
// segmentation matrix (or that's just the group_by_dimensions ?)
}

message ResourceIdentifier {
string meteroid_id = 1;
// the identifier of the resource within the customer's system (associated with the Meteroid's resource)
string external_id = 2;
message CustomerIdentifier {
string local_id = 1;
optional string alias = 2;
}
4 changes: 1 addition & 3 deletions modules/metering/proto/queries.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ syntax = "proto3";

package meteroid.metering.v1;

import "common/v1/date.proto";
import "common/v1/decimal.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "models.proto";

message Filter {
Expand All @@ -18,7 +16,7 @@ message QueryMeterRequest {
string tenant_id = 1;
string meter_slug = 2;
Meter.AggregationType meter_aggregation_type = 3;
repeated ResourceIdentifier customers = 4;
repeated CustomerIdentifier customers = 4;
google.protobuf.Timestamp from = 5;
google.protobuf.Timestamp to = 6;
// If null, default to WindowSize.AGGREGATE_ALL
Expand Down
3 changes: 2 additions & 1 deletion modules/metering/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::sync::Arc;

// type IdentifierCache = Lazy<RwLock<SizedCache<(String, String), String>>>;
// pub static CUSTOMER_ID_CACHE: IdentifierCache = Lazy::new(|| RwLock::new(SizedCache::with_size(10000)));
type IdentifierCache = Lazy<Arc<Cache<(String, String), String>>>;
type TenantAliasTuple = (String, String);
type IdentifierCache = Lazy<Arc<Cache<TenantAliasTuple, String>>>;
pub static CUSTOMER_ID_CACHE: IdentifierCache = Lazy::new(|| Arc::new(Cache::new(10000)));

// TODO add an optional redis on top
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn create_meter_view_to_select_sql(meter: Meter) -> String {
}

pub fn create_meter_view(meter: Meter, populate: bool) -> String {
let view_name = get_meter_view_name(&meter.namespace, &meter.meter_slug);
let view_name = get_meter_view_name(&meter.namespace, &meter.id);
let mut columns = vec![
Column {
name: "customer_id".to_string(),
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
fn test_create_meter_view_count() {
let meter = Meter {
namespace: "test_namespace".to_string(),
meter_slug: "test_slug".to_string(),
id: "test_slug".to_string(),
event_name: "test_event".to_string(),
aggregation: MeterAggregation::Count,
group_by: vec!["test_group1".to_string(), "test_group2".to_string()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn query_meter_view_sql(params: QueryMeterParams) -> Result<String, String>
let subjects_condition = params
.customers
.iter()
.map(|customer| format!("customer_id = '{}'", customer.id)) // TODO config for id/ext/custom field
.map(|customer| format!("customer_id = '{}'", customer.local_id)) // TODO config for id/ext/custom field
.collect::<Vec<_>>()
.join(" OR ");
where_clauses.push(format!("({})", subjects_condition));
Expand Down
6 changes: 3 additions & 3 deletions modules/metering/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ pub enum WindowSize {
pub struct Meter {
pub aggregation: MeterAggregation,
pub namespace: String,
pub meter_slug: String,
pub id: String,
pub event_name: String,
pub value_property: Option<String>,
pub group_by: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct Customer {
pub id: String,
pub external_id: String,
pub local_id: String,
pub alias: Option<String>,
// pub custom_fields: HashMap<String, String>,
}

Expand Down
57 changes: 29 additions & 28 deletions modules/metering/src/ingest/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::ingest::domain::{FailedEvent, ProcessedEvent};
use crate::ingest::sinks::Sink;
use common_grpc::middleware::server::auth::RequestExt;
use meteroid_grpc::meteroid::internal::v1::internal_service_client::InternalServiceClient;
use meteroid_grpc::meteroid::internal::v1::ResolveCustomerExternalIdsRequest;
use meteroid_grpc::meteroid::internal::v1::ResolveCustomerAliasesRequest;

#[derive(Clone)]
pub struct EventsService {
Expand Down Expand Up @@ -62,19 +62,18 @@ impl EventsServiceGrpc for EventsService {
// - get the customer_id from external_customer_id as necessary
let mut resolved = vec![];
let mut unresolved = vec![];
let mut unresolved_ids = vec![];
let mut unresolved_aliases = vec![];

let now = chrono::Utc::now();

for event in events {
match validate_event(&event, &now, allow_backfilling) {
Ok((id, ts)) => match id {
CustomerId::MeteroidCustomerId(meteroid_id) => resolved.push(
to_processed_event(event, meteroid_id, tenant_id.clone(), ts),
),
CustomerId::ExternalCustomerId(external_id) => {
let from_cache =
CUSTOMER_ID_CACHE.get(&(tenant_id.clone(), external_id.clone()));
CustomerId::MeteroidCustomerId(local_id) => {
resolved.push(to_processed_event(event, local_id, tenant_id.clone(), ts))
}
CustomerId::ExternalCustomerAlias(alias) => {
let from_cache = CUSTOMER_ID_CACHE.get(&(tenant_id.clone(), alias.clone()));
match from_cache {
Some(meteroid_id) => resolved.push(to_processed_event(
event,
Expand All @@ -83,8 +82,8 @@ impl EventsServiceGrpc for EventsService {
ts,
)),
None => {
unresolved_ids.push(external_id.clone());
unresolved.push((event, external_id.clone(), ts))
unresolved_aliases.push(alias.clone());
unresolved.push((event, alias.clone(), ts))
}
}
}
Expand All @@ -98,15 +97,15 @@ impl EventsServiceGrpc for EventsService {
};
}

if !unresolved_ids.is_empty() {
if !unresolved_aliases.is_empty() {
// we call the api to resolve customers by external id & tenant

let mut client = self.internal_client.clone();

let res = client
.resolve_customer_external_ids(ResolveCustomerExternalIdsRequest {
.resolve_customer_aliases(ResolveCustomerAliasesRequest {
tenant_id: tenant_id.clone(),
external_ids: unresolved_ids,
aliases: unresolved_aliases,
})
.await
.map_err(|e| {
Expand All @@ -117,31 +116,33 @@ impl EventsServiceGrpc for EventsService {

let res = res.into_inner();

res.unresolved_ids.into_iter().for_each(|external_id| {
failed_events.push(FailedEvent {
event: unresolved
.iter()
.find(|(_, id, _)| id == &external_id)
.unwrap()
.0
.clone(),
reason: "Unable to resolve external id".to_string(),
})
});
res.unresolved_aliases
.into_iter()
.for_each(|unresolved_alias| {
failed_events.push(FailedEvent {
event: unresolved
.iter()
.find(|(_, alias, _)| alias == &unresolved_alias)
.unwrap()
.0
.clone(),
reason: "Unable to resolve unresolved alias".to_string(),
})
});

res.customers.into_iter().for_each(|customer| {
CUSTOMER_ID_CACHE.insert(
(tenant_id.clone(), customer.external_id.clone()),
customer.meteroid_id.clone(),
(tenant_id.clone(), customer.alias.clone()),
customer.local_id.clone(),
);
let (event, _, ts) = unresolved
.iter()
.find(|(_, id, _)| id == &customer.external_id)
.find(|(_, alias, _)| alias == &customer.alias)
.unwrap();

resolved.push(to_processed_event(
event.clone(),
customer.meteroid_id,
customer.local_id,
tenant_id.clone(),
*ts,
))
Expand Down
2 changes: 1 addition & 1 deletion modules/metering/src/meters/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl MetersServiceGrpc for MetersService {
let meter = Meter {
aggregation: meter_aggregation,
namespace: req.tenant_id,
meter_slug: meter.meter_slug,
id: meter.id,
event_name: meter.event_name,
value_property: meter.aggregation_key,
group_by: meter.dimensions,
Expand Down
4 changes: 2 additions & 2 deletions modules/metering/src/query/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl UsageQueryServiceGrpc for UsageQueryService {
.customers
.iter()
.map(|c| Customer {
id: c.meteroid_id.clone(),
external_id: c.external_id.clone(),
alias: c.alias.clone(),
local_id: c.local_id.clone(),
})
.collect(),
group_by: req.group_by_properties,
Expand Down
2 changes: 2 additions & 0 deletions modules/meteroid/crates/diesel-models/src/add_ons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct AddOnRow {
pub tenant_id: Uuid,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub local_id: String,
}

#[derive(Debug, Default, Insertable)]
Expand All @@ -22,6 +23,7 @@ pub struct AddOnRowNew {
pub name: String,
pub fee: serde_json::Value,
pub tenant_id: Uuid,
pub local_id: String,
}

#[derive(AsChangeset)]
Expand Down
5 changes: 5 additions & 0 deletions modules/meteroid/crates/diesel-models/src/billable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct BillableMetricRow {
pub archived_at: Option<NaiveDateTime>,
pub tenant_id: Uuid,
pub product_family_id: Uuid,
pub product_id: Option<Uuid>,
pub local_id: String,
}

#[derive(Debug, Clone, Insertable)]
Expand All @@ -44,13 +46,16 @@ pub struct BillableMetricRowNew {
pub created_by: Uuid,
pub tenant_id: Uuid,
pub product_family_id: Uuid,
pub product_id: Option<Uuid>,
pub local_id: String,
}

#[derive(Debug, Identifiable, Queryable, Selectable)]
#[diesel(table_name = crate::schema::billable_metric)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct BillableMetricMetaRow {
pub id: Uuid,
pub local_id: String,
pub name: String,
pub code: String,
pub aggregation_type: BillingMetricAggregateEnum,
Expand Down
Loading
Loading