Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: set default values back to the original resource #172

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apis/v1alpha1/defaulting.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
// The default replicas for frontend/meta/datanode.
defaultFrontendReplicas int32 = 1
defaultMetaReplicas int32 = 1
defaultDatanodeReplicas int32 = 3
defaultDatanodeReplicas int32 = 1
defaultFlownodeReplicas int32 = 1

// The default storage settings for datanode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ spec:
datanode:
httpPort: 4000
rpcPort: 4001
replicas: 3
replicas: 1
storage:
dataHome: /data/greptimedb
mountPath: /data/greptimedb
Expand Down
26 changes: 17 additions & 9 deletions controllers/greptimedbcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

// Reconcile is reconciliation loop for GreptimeDBCluster.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).Infof("Reconciling GreptimeDBCluster: %s", req.NamespacedName)

var err error
cluster := new(v1alpha1.GreptimeDBCluster)
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -129,24 +130,31 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct

if err = r.addFinalizer(ctx, cluster); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "AddFinalizerFailed", fmt.Sprintf("Add finalizer failed: %v", err))
return
return ctrl.Result{}, err
}

if err = r.validate(ctx, cluster); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "InvalidCluster", fmt.Sprintf("Invalid cluster: %v", err))
return
}

if err = cluster.SetDefaults(); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return
return ctrl.Result{}, err
}

// Means the cluster is just created.
if len(cluster.Status.ClusterPhase) == 0 {
klog.Infof("Start to create the cluster '%s/%s'", cluster.Namespace, cluster.Name)

if err = cluster.SetDefaults(); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return ctrl.Result{}, err
}

// Update the default values to the cluster spec.
if err = r.Update(ctx, cluster); err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "UpdateClusterFailed", fmt.Sprintf("Update cluster failed: %v", err))
return ctrl.Result{}, err
}

if err = r.updateClusterStatus(ctx, cluster, v1alpha1.PhaseStarting); err != nil {
return
return ctrl.Result{}, err
}
}

Expand Down
26 changes: 17 additions & 9 deletions controllers/greptimedbstandalone/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func Setup(mgr ctrl.Manager, _ *options.Options) error {
// +kubebuilder:rbac:groups=monitoring.coreos.com,resources=podmonitors,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).Infof("Reconciling GreptimeDBStandalone: %s", req.NamespacedName)

var err error
standalone := new(v1alpha1.GreptimeDBStandalone)
if err := r.Get(ctx, req.NamespacedName, standalone); err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -110,24 +111,31 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct

if err = r.addFinalizer(ctx, standalone); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "AddFinalizerFailed", fmt.Sprintf("Add finalizer failed: %v", err))
return
return ctrl.Result{}, err
}

if err = r.validate(ctx, standalone); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "InvalidStandalone", fmt.Sprintf("Invalid standalone: %v", err))
return
}

if err = standalone.SetDefaults(); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return
return ctrl.Result{}, err
}

// Means the standalone is just created.
if len(standalone.Status.StandalonePhase) == 0 {
klog.Infof("Start to create the standalone '%s/%s'", standalone.Namespace, standalone.Name)

if err = standalone.SetDefaults(); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "SetDefaultValuesFailed", fmt.Sprintf("Set default values failed: %v", err))
return ctrl.Result{}, err
}

// Update the default values to the standalone spec.
if err = r.Update(ctx, standalone); err != nil {
r.Recorder.Event(standalone, corev1.EventTypeWarning, "UpdateStandaloneFailed", fmt.Sprintf("Update standalone failed: %v", err))
return ctrl.Result{}, err
}

if err = r.setStandaloneStatus(ctx, standalone, v1alpha1.PhaseStarting); err != nil {
return
return ctrl.Result{}, err
}
}

Expand Down
1 change: 0 additions & 1 deletion tests/e2e/testdata/resources/cluster/basic/cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ spec:
rpcPort: 4001
mysqlPort: 4002
postgreSQLPort: 4003

148 changes: 55 additions & 93 deletions tests/e2e/testdata/sql/cluster/flow_basic.sql
Original file line number Diff line number Diff line change
@@ -1,100 +1,62 @@
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
-- FIXME(zyy17): The test case will be replaced by the real sqlness test case.

CREATE TABLE `ngx_access_log` (
`client` STRING NULL,
`ua_platform` STRING NULL,
`referer` STRING NULL,
`method` STRING NULL,
`endpoint` STRING NULL,
`trace_id` STRING NULL FULLTEXT,
`protocol` STRING NULL,
`status` SMALLINT UNSIGNED NULL,
`size` DOUBLE NULL,
`agent` STRING NULL,
`access_time` TIMESTAMP(3) NOT NULL,
TIME INDEX (`access_time`)
)
WITH(
append_mode = 'true'
);

CREATE FLOW test_numbers_basic
SINK TO out_num_cnt_basic
AS
SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');

-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
select flush_flow('test_numbers_basic')<=1;

-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
CREATE TABLE `ngx_statistics` (
`status` SMALLINT UNSIGNED NULL,
`total_logs` BIGINT NULL,
`min_size` DOUBLE NULL,
`max_size` DOUBLE NULL,
`avg_size` DOUBLE NULL,
`high_size_count` DOUBLE NULL,
`time_window` TIMESTAMP time index,
`update_at` TIMESTAMP NULL,
PRIMARY KEY (`status`));

CREATE FLOW ngx_aggregation
SINK TO ngx_statistics
AS
SELECT
status,
count(client) AS total_logs,
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
status,
time_window;

INSERT INTO ngx_access_log
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

select flush_flow('test_numbers_basic')<=1;
("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 1000, "agent", "2021-07-01 00:00:01.000"),
("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:00:30.500"),
("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 600, "agent", "2021-07-01 00:01:01.000"),
("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 700, "agent", "2021-07-01 00:01:01.500");

SELECT col_0, window_start, window_end FROM out_num_cnt_basic;
SELECT * FROM ngx_statistics;

select flush_flow('test_numbers_basic')<=1;

INSERT INTO numbers_input_basic
INSERT INTO ngx_access_log
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");

select flush_flow('test_numbers_basic')<=1;

SELECT col_0, window_start, window_end FROM out_num_cnt_basic;

DROP FLOW test_numbers_basic;
DROP TABLE numbers_input_basic;
DROP TABLE out_num_cnt_basic;

-- test interprete interval

CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
create table out_num_cnt_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);

CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10;

SHOW CREATE FLOW filter_numbers_basic;

drop flow filter_numbers_basic;

drop table out_num_cnt_basic;

drop table numbers_input_basic;

CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
TIME INDEX(ts)
);

CREATE TABLE approx_rate (
rate FLOAT,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);

CREATE FLOW find_approx_rate
SINK TO approx_rate
AS
SELECT CAST((max(byte) - min(byte)) AS FLOAT)/30.0, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window;

INSERT INTO bytes_log VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');

SELECT flush_flow('find_approx_rate')<=1;

SELECT rate, time_window FROM approx_rate;

INSERT INTO bytes_log VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');

SELECT flush_flow('find_approx_rate')<=1;

SELECT rate, time_window FROM approx_rate;
("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:01:01.000"),
("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 800, "agent", "2021-07-01 00:01:01.500");

DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;
SELECT * FROM ngx_statistics;
Loading