Skip to content

Commit

Permalink
refactor: set default values back to the original resource
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Aug 28, 2024
1 parent 5c3668f commit dea3334
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 114 deletions.
2 changes: 1 addition & 1 deletion apis/v1alpha1/defaulting.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
// The default replicas for frontend/meta/datanode.
defaultFrontendReplicas int32 = 1
defaultMetaReplicas int32 = 1
defaultDatanodeReplicas int32 = 3
defaultDatanodeReplicas int32 = 1
defaultFlownodeReplicas int32 = 1

// The default storage settings for datanode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ spec:
datanode:
httpPort: 4000
rpcPort: 4001
replicas: 3
replicas: 1
storage:
dataHome: /data/greptimedb
mountPath: /data/greptimedb
Expand Down
26 changes: 17 additions & 9 deletions controllers/greptimedbcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

// Reconcile is reconciliation loop for GreptimeDBCluster.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).Infof("Reconciling GreptimeDBCluster: %s", req.NamespacedName)

var err error
cluster := new(v1alpha1.GreptimeDBCluster)
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -129,24 +130,31 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct

if err = r.addFinalizer(ctx, cluster); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "AddFinalizerFailed", fmt.Sprintf("Add finalizer failed: %v", err))
return
return ctrl.Result{}, err
}

if err = r.validate(ctx, cluster); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "InvalidCluster", fmt.Sprintf("Invalid cluster: %v", err))
return
}

if err = cluster.SetDefaults(); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return
return ctrl.Result{}, err
}

// Means the cluster is just created.
if len(cluster.Status.ClusterPhase) == 0 {
klog.Infof("Start to create the cluster '%s/%s'", cluster.Namespace, cluster.Name)

if err = cluster.SetDefaults(); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return ctrl.Result{}, err
}

// Update the default values to the cluster spec.
if err = r.Update(ctx, cluster); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "UpdateClusterFailed", fmt.Sprintf("Update cluster failed: %v", err))
return ctrl.Result{}, err
}

if err = r.updateClusterStatus(ctx, cluster, v1alpha1.PhaseStarting); err != nil {
return
return ctrl.Result{}, err
}
}

Expand Down
26 changes: 17 additions & 9 deletions controllers/greptimedbstandalone/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func Setup(mgr ctrl.Manager, _ *options.Options) error {
// +kubebuilder:rbac:groups=monitoring.coreos.com,resources=podmonitors,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).Infof("Reconciling GreptimeDBStandalone: %s", req.NamespacedName)

var err error
standalone := new(v1alpha1.GreptimeDBStandalone)
if err := r.Get(ctx, req.NamespacedName, standalone); err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -110,24 +111,31 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct

if err = r.addFinalizer(ctx, standalone); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "AddFinalizerFailed", fmt.Sprintf("Add finalizer failed: %v", err))
return
return ctrl.Result{}, err
}

if err = r.validate(ctx, standalone); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "InvalidStandalone", fmt.Sprintf("Invalid standalone: %v", err))
return
}

if err = standalone.SetDefaults(); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return
return ctrl.Result{}, err
}

// Means the standalone is just created.
if len(standalone.Status.StandalonePhase) == 0 {
klog.Infof("Start to create the standalone '%s/%s'", standalone.Namespace, standalone.Name)

if err = standalone.SetDefaults(); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return ctrl.Result{}, err
}

// Update the default values to the standalone spec.
if err = r.Update(ctx, standalone); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "UpdateStandaloneFailed", fmt.Sprintf("Update standalone failed: %v", err))
return ctrl.Result{}, err
}

if err = r.setStandaloneStatus(ctx, standalone, v1alpha1.PhaseStarting); err != nil {
return
return ctrl.Result{}, err
}
}

Expand Down
1 change: 0 additions & 1 deletion tests/e2e/testdata/resources/cluster/basic/cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ spec:
rpcPort: 4001
mysqlPort: 4002
postgreSQLPort: 4003

148 changes: 55 additions & 93 deletions tests/e2e/testdata/sql/cluster/flow_basic.sql
Original file line number Diff line number Diff line change
@@ -1,100 +1,62 @@
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
-- FIXME(zyy17): The test case will be replaced by the real sqlness test case.

CREATE TABLE `ngx_access_log` (
`client` STRING NULL,
`ua_platform` STRING NULL,
`referer` STRING NULL,
`method` STRING NULL,
`endpoint` STRING NULL,
`trace_id` STRING NULL FULLTEXT,
`protocol` STRING NULL,
`status` SMALLINT UNSIGNED NULL,
`size` DOUBLE NULL,
`agent` STRING NULL,
`access_time` TIMESTAMP(3) NOT NULL,
TIME INDEX (`access_time`)
)
WITH(
append_mode = 'true'
);

CREATE FLOW test_numbers_basic
SINK TO out_num_cnt_basic
AS
SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
select flush_flow('test_numbers_basic')<=1;

-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
CREATE TABLE `ngx_statistics` (
`status` SMALLINT UNSIGNED NULL,
`total_logs` BIGINT NULL,
`min_size` DOUBLE NULL,
`max_size` DOUBLE NULL,
`avg_size` DOUBLE NULL,
`high_size_count` DOUBLE NULL,
`time_window` TIMESTAMP time index,
`update_at` TIMESTAMP NULL,
PRIMARY KEY (`status`));

CREATE FLOW ngx_aggregation
SINK TO ngx_statistics
AS
SELECT
status,
count(client) AS total_logs,
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
status,
time_window;

INSERT INTO ngx_access_log
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

select flush_flow('test_numbers_basic')<=1;
("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 1000, "agent", "2021-07-01 00:00:01.000"),
("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:00:30.500"),
("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 600, "agent", "2021-07-01 00:01:01.000"),
("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 700, "agent", "2021-07-01 00:01:01.500");

SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
SELECT * FROM ngx_statistics;

select flush_flow('test_numbers_basic')<=1;

INSERT INTO numbers_input_basic
INSERT INTO ngx_access_log
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");

select flush_flow('test_numbers_basic')<=1;

SELECT col_0, window_start, window_end FROM out_num_cnt_basic;

DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;

-- test interprete interval

CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);

CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10;

SHOW CREATE FLOW filter_numbers_basic;

drop flow filter_numbers_basic;

drop table out_num_cnt_basic;

drop table numbers_input_basic;

CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
TIME INDEX(ts)
);

CREATE TABLE approx_rate (
rate FLOAT,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);

CREATE FLOW find_approx_rate
SINK TO approx_rate
AS
SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;

INSERT INTO bytes_log VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');

SELECT flush_flow('find_approx_rate')<=1;

SELECT rate, time_window FROM approx_rate;

INSERT INTO bytes_log VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');

SELECT flush_flow('find_approx_rate')<=1;

SELECT rate, time_window FROM approx_rate;
("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:01:01.000"),
("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 800, "agent", "2021-07-01 00:01:01.500");

DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;
SELECT * FROM ngx_statistics;

0 comments on commit dea3334

Please sign in to comment.