Skip to content

Commit

Permalink
chore: Standardize local_ids/external_ids/alias and clean plan apis (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaspb authored Nov 18, 2024
1 parent bdc6b5a commit ac8ef3c
Show file tree
Hide file tree
Showing 160 changed files with 2,552 additions and 2,260 deletions.
12 changes: 6 additions & 6 deletions .cargo-templates/dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@
#
# To enable
# - install clang
# - install mold https://github.com/rui314/mold into /usr/local/bin/mold
# - add a symbolic link from .cargo -> .cargo-dev
# - install mold https://github.com/rui314/mold into /usr/bin/mold
# - add a symbolic link from .cargo -> .cargo-dev
# via `mkdir -p .cargo && ln -s ../.cargo-templates/dev.toml $_/config.toml`.
#
# If there is an issue, reverting is as simple as deleting .cargo.

[target.x86_64-unknown-linux-gnu]
linker = "/usr/bin/clang"
rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"]
rustflags = ["-C", "link-arg=-fuse-ld=mold"]

[target.aarch64-unknown-linux-gnu]
linker = "clang"
rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"]
rustflags = ["-C", "link-arg=-fuse-ld=mold"]

[profile.release]
incremental = true

[profile.fast]
inherits = "release"
[profile.dev]
incremental = true
opt-level = 0
lto = "off"
codegen-units = 256
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,4 @@ metering = { path = "modules/metering" }
diesel-models = { path = "modules/meteroid/crates/diesel-models" }
meteroid-store = { path = "modules/meteroid/crates/meteroid-store" }
meteroid-invoicing = { path = "modules/meteroid/crates/meteroid-invoicing" }

968 changes: 484 additions & 484 deletions docker/develop/data/seed.sql

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions extra/generator/seed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ events_per_second: 200
limit: 5000
events:
- event_name: api_request
customer_ids: ["spotify", "uber", "comodo"]
customer_aliases: [ "spotify", "uber", "comodo" ]
properties:
app_id:
type: string
endpoint:
type: pick
values: ["/api/v1/auth", "/api/v1/checkout", "/api/v3/ride"]
values: [ "/api/v1/auth", "/api/v1/checkout", "/api/v3/ride" ]
success:
type: bool
2 changes: 1 addition & 1 deletion extra/generator/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Connect {
#[derive(Debug, Serialize, Deserialize)]
pub struct Schema {
pub event_name: String,
pub customer_ids: Vec<String>,
pub customer_aliases: Vec<String>,
pub properties: std::collections::HashMap<String, Property>,
pub weight: Option<f64>,
}
Expand Down
4 changes: 2 additions & 2 deletions extra/generator/src/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ fn generate_random_data(schema: &Schema) -> Event {
Event {
event_id: Uuid::new_v4().to_string(),
event_name: schema.event_name.clone(),
customer_id: Some(CustomerId::ExternalCustomerId(
schema.customer_ids[fastrand::usize(0..schema.customer_ids.len())].clone(),
customer_id: Some(CustomerId::ExternalCustomerAlias(
schema.customer_aliases[fastrand::usize(0..schema.customer_aliases.len())].clone(),
)),
timestamp: now.to_rfc3339(),
properties,
Expand Down
4 changes: 2 additions & 2 deletions modules/adapters/openstack/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl EventHandler {
Ok(Some(server::Event {
event_id: sample.message_id.clone(),
event_name: format!("openstack.{}", sample.counter_name),
customer_id: Some(server::event::CustomerId::ExternalCustomerId(
customer_id: Some(server::event::CustomerId::ExternalCustomerAlias(
sample.project_id.clone(),
)),
timestamp: sample.timestamp.clone(),
Expand Down Expand Up @@ -215,7 +215,7 @@ impl EventHandler {
Ok(Some(server::Event {
event_id: event.message_id.clone(),
event_name: format!("openstack.{}", event.event_type),
customer_id: Some(server::event::CustomerId::ExternalCustomerId(
customer_id: Some(server::event::CustomerId::ExternalCustomerAlias(
project_id.to_string(),
)),
timestamp: timestamp.clone(),
Expand Down
2 changes: 1 addition & 1 deletion modules/adapters/slurm-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async fn send_batch_to_api(client: &mut GrpcClient, batch: &[SacctData]) -> Resu
event_id: data.id.clone(),
event_name: "slurm_job".to_string(),
customer_id: Some(
metering_grpc::meteroid::metering::v1::event::CustomerId::ExternalCustomerId(data.account.clone())
metering_grpc::meteroid::metering::v1::event::CustomerId::ExternalCustomerAlias(data.account.clone())
),
timestamp: data.start_time.to_rfc3339(),
properties,
Expand Down
3 changes: 2 additions & 1 deletion modules/metering/proto/meters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ message RegisterMeterResponse {
}

message UnregisterMeterRequest {
ResourceIdentifier meter = 1;
string id = 1;
string tenant_id = 2;
}

message UnregisterMeterResponse {}
Expand Down
13 changes: 5 additions & 8 deletions modules/metering/proto/models.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ syntax = "proto3";

package meteroid.metering.v1;

import "google/protobuf/timestamp.proto";

message Event {
string event_id = 1;
string event_name = 2;
oneof customer_id {
string meteroid_customer_id = 3;
string external_customer_id = 4;
string external_customer_alias = 4;
// TODO we can allow external_subscription_id as well if a resource is linked to a specific subscription (and if we don't have the customer_id)
}
// rfc3339 string
Expand All @@ -25,7 +23,7 @@ message Metadata {
}

message Meter {
string meter_slug = 1;
string id = 1; // id by default is local_id. We could at some point support metric alias for external implementations of metering, & use this field
string event_name = 3;

// TODO used ? (can we store metadata in clickhouse ? if yes do we want some metadata field instead)
Expand Down Expand Up @@ -56,8 +54,7 @@ message Meter {
// segmentation matrix (or that's just the group_by_dimensions ?)
}

message ResourceIdentifier {
string meteroid_id = 1;
// the identifier of the resource within the customer's system (associated with the Meteroid's resource)
string external_id = 2;
message CustomerIdentifier {
string local_id = 1;
optional string alias = 2;
}
4 changes: 1 addition & 3 deletions modules/metering/proto/queries.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ syntax = "proto3";

package meteroid.metering.v1;

import "common/v1/date.proto";
import "common/v1/decimal.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "models.proto";

message Filter {
Expand All @@ -18,7 +16,7 @@ message QueryMeterRequest {
string tenant_id = 1;
string meter_slug = 2;
Meter.AggregationType meter_aggregation_type = 3;
repeated ResourceIdentifier customers = 4;
repeated CustomerIdentifier customers = 4;
google.protobuf.Timestamp from = 5;
google.protobuf.Timestamp to = 6;
// If null, default to WindowSize.AGGREGATE_ALL
Expand Down
3 changes: 2 additions & 1 deletion modules/metering/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::sync::Arc;

// type IdentifierCache = Lazy<RwLock<SizedCache<(String, String), String>>>;
// pub static CUSTOMER_ID_CACHE: IdentifierCache = Lazy::new(|| RwLock::new(SizedCache::with_size(10000)));
type IdentifierCache = Lazy<Arc<Cache<(String, String), String>>>;
type TenantAliasTuple = (String, String);
type IdentifierCache = Lazy<Arc<Cache<TenantAliasTuple, String>>>;
pub static CUSTOMER_ID_CACHE: IdentifierCache = Lazy::new(|| Arc::new(Cache::new(10000)));

// TODO add an optional redis on top
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn create_meter_view_to_select_sql(meter: Meter) -> String {
}

pub fn create_meter_view(meter: Meter, populate: bool) -> String {
let view_name = get_meter_view_name(&meter.namespace, &meter.meter_slug);
let view_name = get_meter_view_name(&meter.namespace, &meter.id);
let mut columns = vec![
Column {
name: "customer_id".to_string(),
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
fn test_create_meter_view_count() {
let meter = Meter {
namespace: "test_namespace".to_string(),
meter_slug: "test_slug".to_string(),
id: "test_slug".to_string(),
event_name: "test_event".to_string(),
aggregation: MeterAggregation::Count,
group_by: vec!["test_group1".to_string(), "test_group2".to_string()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn query_meter_view_sql(params: QueryMeterParams) -> Result<String, String>
let subjects_condition = params
.customers
.iter()
.map(|customer| format!("customer_id = '{}'", customer.id)) // TODO config for id/ext/custom field
.map(|customer| format!("customer_id = '{}'", customer.local_id)) // TODO config for id/ext/custom field
.collect::<Vec<_>>()
.join(" OR ");
where_clauses.push(format!("({})", subjects_condition));
Expand Down
6 changes: 3 additions & 3 deletions modules/metering/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ pub enum WindowSize {
pub struct Meter {
pub aggregation: MeterAggregation,
pub namespace: String,
pub meter_slug: String,
pub id: String,
pub event_name: String,
pub value_property: Option<String>,
pub group_by: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct Customer {
pub id: String,
pub external_id: String,
pub local_id: String,
pub alias: Option<String>,
// pub custom_fields: HashMap<String, String>,
}

Expand Down
57 changes: 29 additions & 28 deletions modules/metering/src/ingest/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::ingest::domain::{FailedEvent, ProcessedEvent};
use crate::ingest::sinks::Sink;
use common_grpc::middleware::server::auth::RequestExt;
use meteroid_grpc::meteroid::internal::v1::internal_service_client::InternalServiceClient;
use meteroid_grpc::meteroid::internal::v1::ResolveCustomerExternalIdsRequest;
use meteroid_grpc::meteroid::internal::v1::ResolveCustomerAliasesRequest;

#[derive(Clone)]
pub struct EventsService {
Expand Down Expand Up @@ -62,19 +62,18 @@ impl EventsServiceGrpc for EventsService {
// - get the customer_id from external_customer_id as necessary
let mut resolved = vec![];
let mut unresolved = vec![];
let mut unresolved_ids = vec![];
let mut unresolved_aliases = vec![];

let now = chrono::Utc::now();

for event in events {
match validate_event(&event, &now, allow_backfilling) {
Ok((id, ts)) => match id {
CustomerId::MeteroidCustomerId(meteroid_id) => resolved.push(
to_processed_event(event, meteroid_id, tenant_id.clone(), ts),
),
CustomerId::ExternalCustomerId(external_id) => {
let from_cache =
CUSTOMER_ID_CACHE.get(&(tenant_id.clone(), external_id.clone()));
CustomerId::MeteroidCustomerId(local_id) => {
resolved.push(to_processed_event(event, local_id, tenant_id.clone(), ts))
}
CustomerId::ExternalCustomerAlias(alias) => {
let from_cache = CUSTOMER_ID_CACHE.get(&(tenant_id.clone(), alias.clone()));
match from_cache {
Some(meteroid_id) => resolved.push(to_processed_event(
event,
Expand All @@ -83,8 +82,8 @@ impl EventsServiceGrpc for EventsService {
ts,
)),
None => {
unresolved_ids.push(external_id.clone());
unresolved.push((event, external_id.clone(), ts))
unresolved_aliases.push(alias.clone());
unresolved.push((event, alias.clone(), ts))
}
}
}
Expand All @@ -98,15 +97,15 @@ impl EventsServiceGrpc for EventsService {
};
}

if !unresolved_ids.is_empty() {
if !unresolved_aliases.is_empty() {
// we call the api to resolve customers by external id & tenant

let mut client = self.internal_client.clone();

let res = client
.resolve_customer_external_ids(ResolveCustomerExternalIdsRequest {
.resolve_customer_aliases(ResolveCustomerAliasesRequest {
tenant_id: tenant_id.clone(),
external_ids: unresolved_ids,
aliases: unresolved_aliases,
})
.await
.map_err(|e| {
Expand All @@ -117,31 +116,33 @@ impl EventsServiceGrpc for EventsService {

let res = res.into_inner();

res.unresolved_ids.into_iter().for_each(|external_id| {
failed_events.push(FailedEvent {
event: unresolved
.iter()
.find(|(_, id, _)| id == &external_id)
.unwrap()
.0
.clone(),
reason: "Unable to resolve external id".to_string(),
})
});
res.unresolved_aliases
.into_iter()
.for_each(|unresolved_alias| {
failed_events.push(FailedEvent {
event: unresolved
.iter()
.find(|(_, alias, _)| alias == &unresolved_alias)
.unwrap()
.0
.clone(),
reason: "Unable to resolve unresolved alias".to_string(),
})
});

res.customers.into_iter().for_each(|customer| {
CUSTOMER_ID_CACHE.insert(
(tenant_id.clone(), customer.external_id.clone()),
customer.meteroid_id.clone(),
(tenant_id.clone(), customer.alias.clone()),
customer.local_id.clone(),
);
let (event, _, ts) = unresolved
.iter()
.find(|(_, id, _)| id == &customer.external_id)
.find(|(_, alias, _)| alias == &customer.alias)
.unwrap();

resolved.push(to_processed_event(
event.clone(),
customer.meteroid_id,
customer.local_id,
tenant_id.clone(),
*ts,
))
Expand Down
2 changes: 1 addition & 1 deletion modules/metering/src/meters/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl MetersServiceGrpc for MetersService {
let meter = Meter {
aggregation: meter_aggregation,
namespace: req.tenant_id,
meter_slug: meter.meter_slug,
id: meter.id,
event_name: meter.event_name,
value_property: meter.aggregation_key,
group_by: meter.dimensions,
Expand Down
4 changes: 2 additions & 2 deletions modules/metering/src/query/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl UsageQueryServiceGrpc for UsageQueryService {
.customers
.iter()
.map(|c| Customer {
id: c.meteroid_id.clone(),
external_id: c.external_id.clone(),
alias: c.alias.clone(),
local_id: c.local_id.clone(),
})
.collect(),
group_by: req.group_by_properties,
Expand Down
2 changes: 2 additions & 0 deletions modules/meteroid/crates/diesel-models/src/add_ons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct AddOnRow {
pub tenant_id: Uuid,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub local_id: String,
}

#[derive(Debug, Default, Insertable)]
Expand All @@ -22,6 +23,7 @@ pub struct AddOnRowNew {
pub name: String,
pub fee: serde_json::Value,
pub tenant_id: Uuid,
pub local_id: String,
}

#[derive(AsChangeset)]
Expand Down
5 changes: 5 additions & 0 deletions modules/meteroid/crates/diesel-models/src/billable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct BillableMetricRow {
pub archived_at: Option<NaiveDateTime>,
pub tenant_id: Uuid,
pub product_family_id: Uuid,
pub product_id: Option<Uuid>,
pub local_id: String,
}

#[derive(Debug, Clone, Insertable)]
Expand All @@ -44,13 +46,16 @@ pub struct BillableMetricRowNew {
pub created_by: Uuid,
pub tenant_id: Uuid,
pub product_family_id: Uuid,
pub product_id: Option<Uuid>,
pub local_id: String,
}

#[derive(Debug, Identifiable, Queryable, Selectable)]
#[diesel(table_name = crate::schema::billable_metric)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct BillableMetricMetaRow {
pub id: Uuid,
pub local_id: String,
pub name: String,
pub code: String,
pub aggregation_type: BillingMetricAggregateEnum,
Expand Down
Loading

0 comments on commit ac8ef3c

Please sign in to comment.