-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: use dbconfig package to simplify the config generation
- Loading branch information
Showing
16 changed files
with
377 additions
and
399 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
// Copyright 2023 Greptime Team | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package internal | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"strconv" | ||
"strings" | ||
|
||
"k8s.io/klog/v2" | ||
|
||
"github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1" | ||
"github.com/GreptimeTeam/greptimedb-operator/pkg/dbconfig" | ||
) | ||
|
||
type Options struct { | ||
ConfigPath string | ||
InitConfigPath string | ||
Namespace string | ||
ComponentKind string | ||
|
||
// For generating config of datanode. | ||
DatanodeRPCPort int32 | ||
DatanodeServiceName string | ||
} | ||
|
||
type ConfigGenerator struct { | ||
*Options | ||
|
||
hostname func() (name string, err error) | ||
} | ||
|
||
func NewConfigGenerator(opts *Options, hostname func() (name string, err error)) *ConfigGenerator { | ||
return &ConfigGenerator{ | ||
Options: opts, | ||
hostname: hostname, | ||
} | ||
} | ||
|
||
// Generate generates the final config of datanode. | ||
func (c *ConfigGenerator) Generate() error { | ||
initConfig, err := os.ReadFile(c.InitConfigPath) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var configData []byte | ||
switch c.ComponentKind { | ||
case string(v1alpha1.DatanodeComponentKind): | ||
configData, err = c.generateDatanodeConfig(initConfig) | ||
if err != nil { | ||
return err | ||
} | ||
default: | ||
return fmt.Errorf("unknown component kind: %s", c.ComponentKind) | ||
} | ||
|
||
if err := os.WriteFile(c.ConfigPath, configData, 0644); err != nil { | ||
return err | ||
} | ||
|
||
klog.Infof("Generate config successfully, config path: %s", c.ConfigPath) | ||
klog.Infof("The config content is: \n%s", string(configData)) | ||
|
||
return nil | ||
} | ||
|
||
func (c *ConfigGenerator) generateDatanodeConfig(initConfig []byte) ([]byte, error) { | ||
cfg, err := dbconfig.FromRawData(initConfig, v1alpha1.DatanodeComponentKind) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
datanodeCfg, ok := cfg.(*dbconfig.DatanodeConfig) | ||
if !ok { | ||
return nil, fmt.Errorf("cfg is not datanode config") | ||
} | ||
|
||
nodeID, err := c.allocateDatanodeID() | ||
if err != nil { | ||
klog.Fatalf("Allocate node id failed: %v", err) | ||
} | ||
datanodeCfg.NodeID = &nodeID | ||
|
||
podIP := os.Getenv("POD_IP") | ||
if len(podIP) == 0 { | ||
return nil, fmt.Errorf("empty pod ip") | ||
} | ||
datanodeCfg.RPCAddr = fmt.Sprintf("%s:%d", podIP, c.DatanodeRPCPort) | ||
|
||
podName := os.Getenv("POD_NAME") | ||
if len(podName) == 0 { | ||
return nil, fmt.Errorf("empty pod name") | ||
} | ||
|
||
datanodeCfg.RPCHostName = fmt.Sprintf("%s.%s.%s:%d", podName, | ||
c.DatanodeServiceName, c.Namespace, c.DatanodeRPCPort) | ||
|
||
configData, err := dbconfig.Marshal(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return configData, nil | ||
} | ||
|
||
// TODO(zyy17): the algorithm of allocating datanode id will be changed in the future. | ||
// We use the very easy way to allocate datanode-id: use the pod index of datanode that created by statefulset. | ||
// If the hostname of datanode is 'basic-datanode-1', then its node-id will be '1'. | ||
func (c *ConfigGenerator) allocateDatanodeID() (uint64, error) { | ||
name, err := c.hostname() | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
if len(name) == 0 { | ||
return 0, fmt.Errorf("the hostname is empty") | ||
} | ||
|
||
token := strings.Split(name, "-") | ||
if len(token) == 0 { | ||
return 0, fmt.Errorf("invalid hostname format '%s'", name) | ||
} | ||
|
||
// For the pods of statefulset, the last token of hostname is the pod index. | ||
podIndex := token[len(token)-1] | ||
|
||
// Must be the valid integer type. | ||
nodeID, err := strconv.ParseUint(podIndex, 10, 64) | ||
if err != nil { | ||
return 0, fmt.Errorf("invalid hostname format '%s'", name) | ||
} | ||
|
||
return nodeID, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
// Copyright 2023 Greptime Team | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package internal | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"os" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1" | ||
"github.com/GreptimeTeam/greptimedb-operator/pkg/dbconfig" | ||
) | ||
|
||
var ( | ||
testPodIndex uint64 = 1 | ||
testRPCPort int32 = 4001 | ||
|
||
testPodIP = "192.168.0.1" | ||
testClusterService = "testcluster" | ||
testClusterNamespace = "greptimedb" | ||
testPodName = fmt.Sprintf("%s-datanode-%d", testClusterService, testPodIndex) | ||
) | ||
|
||
func TestConfigGenerator(t *testing.T) { | ||
|
||
file, err := os.CreateTemp("", "config-*.toml") | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
defer file.Close() | ||
|
||
opts := &Options{ | ||
ConfigPath: file.Name(), | ||
InitConfigPath: "testdata/config.toml", | ||
Namespace: testClusterNamespace, | ||
ComponentKind: string(v1alpha1.DatanodeComponentKind), | ||
DatanodeRPCPort: testRPCPort, | ||
DatanodeServiceName: testClusterService, | ||
} | ||
|
||
t.Setenv("POD_IP", testPodIP) | ||
t.Setenv("POD_NAME", testPodName) | ||
|
||
cg := NewConfigGenerator(opts, hostname) | ||
if err = cg.Generate(); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
cfg, err := dbconfig.FromFile(opts.ConfigPath, v1alpha1.DatanodeComponentKind) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
datanodeCfg, ok := cfg.(*dbconfig.DatanodeConfig) | ||
if !ok { | ||
t.Fatal(fmt.Errorf("invalid config type")) | ||
} | ||
|
||
if !reflect.DeepEqual(testPodIndex, *datanodeCfg.NodeID) { | ||
t.Fatalf("nodeID is not equal, want: '%d', got: '%d'", testPodIndex, datanodeCfg.NodeID) | ||
} | ||
|
||
wantRPCAddr := fmt.Sprintf("%s:%d", testPodIP, testRPCPort) | ||
if !reflect.DeepEqual(wantRPCAddr, datanodeCfg.RPCAddr) { | ||
t.Fatalf("RPCAddr is not equal, want: '%s', got: '%s'", wantRPCAddr, datanodeCfg.RPCAddr) | ||
} | ||
|
||
wantRPCHostname := fmt.Sprintf("%s.%s.%s:%d", testPodName, testClusterService, testClusterNamespace, testRPCPort) | ||
if !reflect.DeepEqual(wantRPCHostname, datanodeCfg.RPCHostName) { | ||
t.Fatalf("RPCHostName is not equal, want: '%s', got: '%s'", wantRPCHostname, datanodeCfg.RPCHostName) | ||
} | ||
} | ||
|
||
func hostname() (name string, err error) { | ||
return testPodName, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
mode = 'distributed' | ||
enable_memory_catalog = false | ||
node_id = 42 | ||
rpc_addr = '127.0.0.1:3001' | ||
rpc_hostname = '127.0.0.1' | ||
rpc_runtime_size = 8 | ||
|
||
[meta_client_options] | ||
metasrv_addrs = ['127.0.0.1:3002'] | ||
timeout_millis = 3000 | ||
connect_timeout_millis = 5000 | ||
tcp_nodelay = true | ||
|
||
[wal] | ||
file_size = '1GB' | ||
purge_threshold = '50GB' | ||
purge_interval = '10m' | ||
read_batch_size = 128 | ||
sync_write = false | ||
|
||
[storage] | ||
type = 'File' | ||
data_home = '/tmp/greptimedb/' | ||
|
||
[storage.manifest] | ||
checkpoint_margin = 10 | ||
gc_duration = '30s' | ||
checkpoint_on_startup = false | ||
|
||
[storage.flush] | ||
max_flush_tasks = 8 | ||
region_write_buffer_size = '32MB' | ||
picker_schedule_interval = '5m' | ||
auto_flush_interval = '1h' | ||
global_write_buffer_size = '1GB' | ||
|
||
[procedure] | ||
max_retry_times = 3 | ||
retry_delay = '500ms' | ||
|
||
[logging] | ||
dir = '/tmp/greptimedb/logs' | ||
level = 'info' |
Oops, something went wrong.