diff --git a/pkg/imagedistributor/weed_mount.go b/pkg/imagedistributor/weed_mount.go new file mode 100644 index 00000000000..995c503aa6a --- /dev/null +++ b/pkg/imagedistributor/weed_mount.go @@ -0,0 +1,92 @@ +package imagedistributor + +import ( + "context" + "fmt" + "github.com/sealerio/sealer/pkg/define/options" + "github.com/sealerio/sealer/pkg/imageengine" + v1 "github.com/sealerio/sealer/types/api/v1" + "github.com/sealerio/sealer/utils/os/fs" + "github.com/sealerio/sealer/utils/weed" + "path/filepath" + "strings" +) + +type weedMounter struct { + imageEngine imageengine.Interface + weedClient weed.Deployer +} + +func (w *weedMounter) Mount(imageName string, platform v1.Platform, dest string) (string, string, string, error) { + mountDir := filepath.Join(dest, + strings.ReplaceAll(imageName, "/", "_"), + strings.Join([]string{platform.OS, platform.Architecture, platform.Variant}, "_")) + + imageID, err := w.imageEngine.Pull(&options.PullOptions{ + Quiet: false, + PullPolicy: "missing", + Image: imageName, + Platform: platform.ToString(), + }) + if err != nil { + return "", "", "", err + } + + if err := fs.FS.MkdirAll(filepath.Dir(mountDir)); err != nil { + return "", "", "", err + } + + id, err := w.imageEngine.CreateWorkingContainer(&options.BuildRootfsOptions{ + DestDir: mountDir, + ImageNameOrID: imageID, + }) + + if err != nil { + return "", "", "", err + } + + // Upload the mounted files to the WeedFS cluster + err = w.weedClient.UploadFile(context.Background(), mountDir) + if err != nil { + return "", "", "", err + } + + return mountDir, id, imageID, nil +} + +func (w *weedMounter) Umount(dir, containerID string) error { + // Download the files from WeedFS cluster + err := w.weedClient.DownloadFile(context.Background(), dir, dir) + if err != nil { + return err + } + + // Umount the image and remove the working container + err = w.imageEngine.RemoveContainer(&options.RemoveContainerOptions{ + ContainerNamesOrIDs: []string{containerID}, + }) + if err != nil { + return err + } + + // Remove the mounted files from the WeedFS cluster + err = w.weedClient.RemoveFile(context.Background(), dir) + if err != nil { + return err + } + + // Remove the local mount directory + if err := fs.FS.RemoveAll(dir); err != nil { + return fmt.Errorf("failed to remove mount dir %s: %v", dir, err) + } + + return nil +} + +func NewWeedMounter(imageEngine imageengine.Interface, config *weed.Config) Mounter { + deployer := weed.NewDeployer(config) + return &weedMounter{ + imageEngine: imageEngine, + weedClient: deployer, + } +} diff --git a/utils/weed/check.go b/utils/weed/check.go new file mode 100644 index 00000000000..dc132ac33e8 --- /dev/null +++ b/utils/weed/check.go @@ -0,0 +1,51 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "path" + "strconv" + + "github.com/sealerio/sealer/utils/exec" +) + +// checkPort checks if the port is available or can be used. +func checkPort(port int) bool { + // lsof -i:9333 + err := exec.Cmd("lsof", "-i:"+strconv.Itoa(port)) + return err == nil +} + +// checkDir checks if the dir is available or can be used. +//func checkDir(dir string) bool { +// // ls /tmp +// err := exec.Cmd("ls", dir) +// if err != nil { +// return false +// } +// return true +//} + +func checkBinFile(fileName string) bool { + binName := path.Base(fileName) + switch binName { + case "weed": + + case "etcd": + + default: + } + return false +} diff --git a/utils/weed/check_test.go b/utils/weed/check_test.go new file mode 100644 index 00000000000..6b1e5b9ff22 --- /dev/null +++ b/utils/weed/check_test.go @@ -0,0 +1,27 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +//import "testing" +// +//func TestCheckPort(t *testing.T) { +// ok := checkPort(9333) +// t.Log(ok) +//} +// +//func TestCheckDir(t *testing.T) { +// ok := checkDir("/tmp") +// t.Log(ok) +//} diff --git a/utils/weed/etcd.go b/utils/weed/etcd.go new file mode 100644 index 00000000000..10354e13e45 --- /dev/null +++ b/utils/weed/etcd.go @@ -0,0 +1,305 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "os" + "os/exec" + "path" + "runtime" + "strconv" + "sync" + "syscall" +) + +const ( + EtcdGitHubOrg = "etcd-io" + EtcdGithubRepo = "etcd" + GOOSLinux = "linux" + EtcdArtifactType = "etcd" + EtcdVersion = "v3.4.24" + EtcdDestination = "/tmp/etcd.tar.gz" + EtcdBinName = "etcd" + EtcdctlBinName = "etcdctl" + WeedDestination = "/tmp/weed.tar.gz" + WeedBinName = "weed" +) + +func etcdDownloadURL() (string, error) { + var ext string + + switch runtime.GOOS { + case GOOSLinux: + ext = ".tar.gz" + default: + return "", fmt.Errorf("unsupported OS: %s", runtime.GOOS) + } + + // For the function stability, we use the specific version of etcd. + downloadURL := fmt.Sprintf("https://github.com/%s/%s/releases/download/%s/%s-%s-%s-%s%s", + EtcdGitHubOrg, EtcdGithubRepo, EtcdVersion, EtcdArtifactType, EtcdVersion, runtime.GOOS, runtime.GOARCH, ext) + + return downloadURL, nil +} + +type etcd struct { + dataDir string + logDir string + pidDir string + binDir string + clientURL string + peerURL string + peers []string + wg *sync.WaitGroup + configFile string +} + +// Etcd is the interface for etcd cluster. +type Etcd interface { + Exec +} + +type DeleteOptions struct { + RetainLogs bool +} + +type RunOptions struct { + Binary string + Name string + + pidDir string + logDir string + args []string +} + +func NewEtcd(config *Config) Etcd { + return &etcd{ + dataDir: config.DataDir, + logDir: config.LogDir, + pidDir: config.PidDir, + binDir: config.BinDir, + peers: config.MasterIP, + peerURL: config.CurrentIP + ":" + strconv.Itoa(config.PeerPort), + clientURL: config.CurrentIP + ":" + strconv.Itoa(config.ClientPort), + wg: new(sync.WaitGroup), + configFile: config.EtcdConfigPath, + } +} + +func (e *etcd) Name() string { + return "etcd" +} + +func (e *etcd) Start(ctx context.Context, binary string) error { + // Generate etcd config file. + err := e.GenerateConfig() + if err != nil { + return err + } + + option := &RunOptions{ + Binary: binary, + Name: e.Name(), + logDir: e.logDir, + pidDir: e.pidDir, + args: e.BuildArgs(ctx), + } + if err := runBinary(ctx, option, e.wg); err != nil { + return err + } + + return nil +} + +func (e *etcd) BuildArgs(ctx context.Context, params ...interface{}) []string { + return []string{ + "--config-file", e.configFile, + } +} + +// GenerateConfig creates etcd cluster config file. +func (e *etcd) GenerateConfig() error { + initialCluster := "" + index := 0 + for i, peer := range e.peers { + if peer == e.peerURL { + index = i + } + initialCluster += "node" + strconv.Itoa(i) + "=http://" + peer + "," + } + initialCluster = initialCluster[:len(initialCluster)-1] + name := "node" + strconv.Itoa(index) + configContent := fmt.Sprintf(`name: "%s" +data-dir: "%s" +initial-cluster-token: "my-etcd-token" +initial-cluster: "%s" +initial-advertise-peer-urls: "http://%s" +listen-peer-urls: "http://%s" +listen-client-urls: "http://%s" +advertise-client-urls: "http://%s" +log-file: "%s" +pid-file: "%s" +`, name, e.dataDir, initialCluster, e.peerURL, e.peerURL, e.clientURL, e.clientURL, e.logDir, e.pidDir) + + // write config file + err := ioutil.WriteFile(e.configFile, []byte(configContent), 0644) + if err != nil { + return err + } + + return nil +} + +func (e *etcd) IsRunning(ctx context.Context) bool { + _, port, err := net.SplitHostPort(e.clientURL) + if err != nil { + return false + } + err = exec.Command("lsof", "-i:"+port).Run() + return err == nil +} + +func runBinary(ctx context.Context, option *RunOptions, wg *sync.WaitGroup) error { + cmd := exec.CommandContext(ctx, option.Binary, option.args...) + + // output to binary. + logFile := path.Join(option.logDir, "log") + outputFile, err := os.Create(logFile) + if err != nil { + return err + } + + outputFileWriter := bufio.NewWriter(outputFile) + cmd.Stdout = outputFileWriter + cmd.Stderr = outputFileWriter + + if err := cmd.Start(); err != nil { + return err + } + + pid := strconv.Itoa(cmd.Process.Pid) + + pidFile := path.Join(option.pidDir, "pid") + f, err := os.Create(pidFile) + if err != nil { + return err + } + + _, err = f.Write([]byte(pid)) + if err != nil { + return err + } + + go func() { + defer wg.Done() + wg.Add(1) + if err := cmd.Wait(); err != nil { + // Caught signal kill and interrupt error then ignore. + var exit *exec.ExitError + if errors.As(err, &exit) { + if status, ok := exit.Sys().(syscall.WaitStatus); ok { + if status.Signaled() && + (status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) { + return + } + } + } + _ = outputFileWriter.Flush() + } + }() + + return nil +} + +func runBinaryWithJSONResponse(ctx context.Context, option *RunOptions, wg *sync.WaitGroup) ([]byte, error) { + cmd := exec.CommandContext(ctx, option.Binary, option.args...) + + var jsonOutput bytes.Buffer + cmd.Stdout = &jsonOutput + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + return nil, err + } + + //TODO if pid file == "", skip this step + pid := strconv.Itoa(cmd.Process.Pid) + + pidFile := path.Join(option.pidDir, "pid") + f, err := os.Create(pidFile) + if err != nil { + return nil, err + } + + _, err = f.Write([]byte(pid)) + if err != nil { + return nil, err + } + + go func() { + defer wg.Done() + wg.Add(1) + if err := cmd.Wait(); err != nil { + // Caught signal kill and interrupt error then ignore. + var exit *exec.ExitError + if errors.As(err, &exit) { + if status, ok := exit.Sys().(syscall.WaitStatus); ok { + if status.Signaled() && + (status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) { + return + } + } + } + } + }() + + jsonResponse := jsonOutput.Bytes() + return jsonResponse, nil +} + +func CreateDirIfNotExists(dir string) (err error) { + if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) { + return err + } + return nil +} + +func IsFileExists(filepath string) (bool, error) { + info, err := os.Stat(filepath) + if os.IsNotExist(err) { + // file does not exist + return false, nil + } + + if err != nil { + // Other errors happened. + return false, err + } + + if info.IsDir() { + // It's a directory. + return false, fmt.Errorf("'%s' is directory, not file", filepath) + } + + // The file exists. + return true, nil +} diff --git a/utils/weed/etcd_client.go b/utils/weed/etcd_client.go new file mode 100644 index 00000000000..adba8921eb8 --- /dev/null +++ b/utils/weed/etcd_client.go @@ -0,0 +1,147 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "fmt" + "time" + + etcd3 "go.etcd.io/etcd/client/v3" +) + +// Client is the interface for etcd client. +// It provides the basic operations for etcd cluster. +// Like put, get, delete, register, unregister, get service. +type Client interface { + + // RegisterService register service to etcd cluster. + RegisterService(serviceName string, endpoints string) error + + // UnRegisterService unregister service from etcd cluster. + UnRegisterService(serviceName string, endpoints string) error + + // GetService get service from etcd cluster. + GetService(serviceName string) ([]string, error) + + // Put put key-value to etcd cluster. + Put(key, value string) error + + // Get get key-value from etcd cluster. + Get(key string) (string, error) + + // Delete delete key-value from etcd cluster. + Delete(key string) error +} + +type client struct { + peers []string + client *etcd3.Client + ctx context.Context + lease etcd3.Lease +} + +func (c *client) Put(key, value string) error { + _, err := c.client.Put(c.ctx, key, value) + if err != nil { + return err + } + return nil +} + +func (c *client) Get(key string) (string, error) { + resp, err := c.client.Get(c.ctx, key) + if err != nil { + return "", err + } + return string(resp.Kvs[0].Value), nil +} + +func (c *client) Delete(key string) error { + _, err := c.client.Delete(c.ctx, key) + if err != nil { + return err + } + return nil +} + +func (c *client) RegisterService(serviceName string, endpoint string) error { + c.lease = etcd3.NewLease(c.client) + grant, err := c.lease.Grant(c.ctx, int64(10*time.Second)) + if err != nil { + return err + } + key := fmt.Sprintf("/services/%s/%s", serviceName, endpoint) + _, err = c.client.Put(context.Background(), key, endpoint, etcd3.WithLease(grant.ID)) + keepAliceCh, err := c.client.KeepAlive(context.Background(), grant.ID) + if err != nil { + return err + } + go c.doKeepAlive(keepAliceCh) + return err +} + +func (c *client) UnRegisterService(serviceName string, endpoint string) error { + key := fmt.Sprintf("/services/%s/%s", serviceName, endpoint) + _, err := c.client.Delete(c.ctx, key) + if c.lease != nil { + _ = c.lease.Close() + } + return err +} + +// doKeepAlive continuously keeps alive the lease from ETCD. +func (c *client) doKeepAlive(keepAliceCh <-chan *etcd3.LeaseKeepAliveResponse) { + for { + select { + case <-c.client.Ctx().Done(): + return + + case res, ok := <-keepAliceCh: + if res != nil { + } + if !ok { + return + } + } + } +} + +func (c *client) GetService(serviceName string) ([]string, error) { + key := fmt.Sprintf("/services/%s", serviceName) + response, err := c.client.Get(c.ctx, key, etcd3.WithPrefix()) + if err != nil { + return nil, err + } + res := make([]string, 0) + for _, v := range response.Kvs { + res = append(res, string(v.Value)) + } + return res, nil +} + +func NewClient(peers []string) (Client, error) { + c, err := etcd3.New(etcd3.Config{ + Endpoints: peers, + }) + if err != nil { + return nil, err + } + return &client{ + peers: peers, + client: c, + ctx: context.Background(), + }, nil +} diff --git a/utils/weed/exec.go b/utils/weed/exec.go new file mode 100644 index 00000000000..4e910157d13 --- /dev/null +++ b/utils/weed/exec.go @@ -0,0 +1,34 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import "context" + +// Exec is the interface for command execution. +// It provides the basic operations for command execution. +// Like start, build args, is running, name. +type Exec interface { + // Start starts cluster component by executing binary. + Start(ctx context.Context, binary string) error + + // BuildArgs build up args for cluster component. + BuildArgs(ctx context.Context, params ...interface{}) []string + + // IsRunning returns the status of current cluster component. + IsRunning(ctx context.Context) bool + + // Name return the name of component. + Name() string +} diff --git a/utils/weed/interface.go b/utils/weed/interface.go new file mode 100644 index 00000000000..fa98b083276 --- /dev/null +++ b/utils/weed/interface.go @@ -0,0 +1,483 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/sirupsen/logrus" + "os" + "os/exec" + "path" + "runtime" +) + +// Config is the config of the weed cluster and etcd cluster. +type Config struct { + // MasterIP is the IP address of the master node. + MasterIP []string + // VolumeIP is the IP address of the volume node. + VolumeIP []string + // LogDir is the directory of the log file. + LogDir string + // DataDir is the directory of the data file. + DataDir string + // PidDir is the directory of the pid file. + PidDir string + // BinDir is the directory of the etcd binary file. + BinDir string + // EtcdConfigPath is the path of the etcd config file, we will generate it automatically. + EtcdConfigPath string + // CurrentIP is the IP address of the current node. + CurrentIP string + // PeerPort is the port of the peer. + PeerPort int + // ClientPort is the port of the client. + ClientPort int + // WeedMasterPort is the port of the weed master node. + WeedMasterPort int + // WeedVolumePort is the port of the weed volume node. + WeedVolumePort int + // NeedMoreLocalNode is the flag of whether you need more local weed node. + NeedMoreLocalNode bool + // WeedMasterDir is the directory of the weed master node. + WeedMasterDir string + // WeedVolumeDir is the directory of the weed volume node. + WeedVolumeDir string + // DefaultReplication is the default replication of the weed cluster. + DefaultReplication string + // WeedLogDir is the directory of the weed log file. + WeedLogDir string + // weedMasterPortList is the port list of the weed master node when need more local weed node. + weedMasterPortList []int + // weedVolumePortList is the port list of the weed volume node when need more local weed node. + weedVolumePortList []int + // weedMDirList is the directory list of the weed master node when need more local weed node. + weedMDirList []string + // weedVDirList is the directory list of the weed volume node when need more local weed node. + weedVDirList []string + // weedMasterList is the list of the weed master node when need more local weed node. + weedMasterList []string +} + +type Deployer interface { + // GetWeedMasterList returns the master list of the weed cluster. + GetWeedMasterList(ctx context.Context) ([]string, error) + + // CreateEtcdCluster creates the etcd cluster. + CreateEtcdCluster(ctx context.Context) error + + // DeleteEtcdCluster deletes the etcd cluster. + DeleteEtcdCluster(ctx context.Context) error + + // CreateWeedCluster creates the weed cluster. + CreateWeedCluster(ctx context.Context) error + + // DeleteWeedCluster deletes the weed cluster. + DeleteWeedCluster(ctx context.Context) error + + // UploadFile uploads the file to the weed cluster. + UploadFile(ctx context.Context, dir string) error + + // DownloadFile download the file from the weed cluster. + DownloadFile(ctx context.Context, dir string, outputDir string) error + + // RemoveFile removes the file from the weed cluster. + RemoveFile(ctx context.Context, dir string) error +} + +type deployer struct { + // config is the config of the weed cluster and etcd cluster. + config *Config + // etcd is the etcd cluster. + etcd Etcd + // client is the etcd client. + client Client + // weedMaster is the weed master node. + weedMaster Master + // weedVolume is the weed volume node. + weedVolume Volume +} + +func (d *deployer) GetWeedMasterList(ctx context.Context) ([]string, error) { + return d.client.GetService("weed-master") +} + +func (d *deployer) CreateEtcdCluster(ctx context.Context) error { + // prepare etcd + err := d.etcdPrepare() + if err != nil { + return err + } + // start etcd + err = d.etcd.Start(ctx, d.config.BinDir+"/etcd") + if err != nil { + return err + } + // check etcd health + if ok := d.etcd.IsRunning(ctx); !ok { + return fmt.Errorf("etcd is not running") + } + // new client + etcdClient, err := NewClient(d.config.MasterIP) + if err != nil { + return err + } + d.client = etcdClient + return nil +} + +func (d *deployer) downloadEtcd() error { + url, err := etcdDownloadURL() + if err != nil { + return err + } + //download + err = downloadFile(url, EtcdDestination) + if err != nil { + return err + } + etcdDirName := fmt.Sprintf("%s-%s-%s-%s", EtcdArtifactType, EtcdVersion, runtime.GOOS, runtime.GOARCH) + err = exec.Command("tar", "-xvf", EtcdDestination, "-C", extractFolder).Run() + if err != nil { + return err + } + err = os.Rename(path.Join(extractFolder, etcdDirName+"/etcd"), path.Join(d.config.BinDir, EtcdBinName)) + if err != nil { + return err + } + return os.Rename(path.Join(extractFolder, etcdDirName+"/etcdctl"), path.Join(d.config.BinDir, EtcdctlBinName)) +} + +func (d *deployer) downloadWeed() error { + url, err := weedDownloadURL() + if err != nil { + return err + } + err = downloadFile(url, WeedDestination) + if err != nil { + return err + } + return exec.Command("tar", "-xvf", WeedDestination, "-C", d.config.BinDir).Run() +} + +func (d *deployer) etcdPrepare() error { + var ( + etcdDirs = []string{d.config.DataDir, d.config.LogDir, d.config.PidDir, d.config.BinDir} + ) + for _, dir := range etcdDirs { + if err := CreateDirIfNotExists(dir); err != nil { + return err + } + } + // download etcd + return d.downloadEtcd() + // TODO scp etcd binary file to other nodes +} + +func (d *deployer) weedMasterPrepare() error { + var weedMasterDirs []string + if len(d.config.MasterIP) < 3 { + d.config.NeedMoreLocalNode = true + weedMasterPortList := make([]int, 0) + weedMDirList := make([]string, 0) + weedMasterList := make([]string, 0) + port := d.config.WeedMasterPort + for i := 0; i < 3; i++ { + for { + ok := checkPort(port) + if ok { + weedMasterPortList = append(weedMasterPortList, port) + weedMDirList = append(weedMDirList, d.config.WeedMasterDir+fmt.Sprintf("/%d", port)) + weedMasterList = append(weedMasterList, d.config.CurrentIP+fmt.Sprintf(":%d", port)) + port++ + break + } else { + port++ + } + } + } + weedMasterDirs = weedMDirList + d.config.weedMasterPortList = weedMasterPortList + d.config.weedMDirList = weedMDirList + d.config.weedMasterList = weedMasterList + } else { + weedMasterDirs = []string{d.config.WeedMasterDir} + } + for _, dir := range weedMasterDirs { + if err := CreateDirIfNotExists(dir); err != nil { + return err + } + } + // download weed binary file + if checkBinFile(d.config.BinDir + "/weed") { + return nil + } + return d.downloadWeed() +} + +func (d *deployer) weedVolumePrepare() error { + var weedVolumeDirs []string + if len(d.config.VolumeIP) < 3 { + d.config.NeedMoreLocalNode = true + weedVolumePortList := make([]int, 0) + weedVDirList := make([]string, 0) + port := d.config.WeedVolumePort + for i := 0; i < 3; i++ { + for { + ok := checkPort(port) + if ok { + weedVolumePortList = append(weedVolumePortList, port) + weedVDirList = append(weedVDirList, d.config.WeedVolumeDir+fmt.Sprintf("/%d", port)) + port++ + break + } else { + port++ + } + } + } + weedVolumeDirs = weedVDirList + d.config.weedVolumePortList = weedVolumePortList + d.config.weedVDirList = weedVDirList + } else { + weedVolumeDirs = []string{d.config.WeedVolumeDir} + } + for _, dir := range weedVolumeDirs { + if err := CreateDirIfNotExists(dir); err != nil { + return err + } + } + if d.config.NeedMoreLocalNode { + weedVolume := NewWeedVolume(d.config, d.config.weedMasterList) + d.weedVolume = weedVolume + } else { + weedVolume := NewWeedVolume(d.config, d.config.MasterIP) + d.weedVolume = weedVolume + } + return nil +} + +func (d *deployer) DeleteEtcdCluster(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (d *deployer) CreateWeedCluster(ctx context.Context) error { + // prepare weed master + err := d.weedMasterPrepare() + if err != nil { + return err + } + // start weed master + err = d.weedMaster.Start(ctx, d.config.BinDir+"/weed") + if err != nil { + return err + } + // check weed master health + ok := d.weedMaster.IsRunning(ctx) + if !ok { + return errors.New("weed master is not running") + } + // prepare weed volume + err = d.weedVolumePrepare() + if err != nil { + return err + } + err = d.weedVolume.Start(ctx, d.config.BinDir+"/weed") + if err != nil { + return err + } + // check weed volume health + ok = d.weedVolume.IsRunning(ctx) + if !ok { + return errors.New("weed volume is not running") + } + // register service to etcd cluster + if d.config.NeedMoreLocalNode { + for _, weedMaster := range d.config.weedMasterList { + err = d.client.RegisterService("weed-master", weedMaster) + if err != nil { + d.cleanService(d.config.weedMasterList) + return err + } + } + } else { + for _, masterIp := range d.config.MasterIP { + err = d.client.RegisterService("weed-master", masterIp) + if err != nil { + d.cleanService(d.config.MasterIP) + return err + } + } + } + return nil +} + +func (d *deployer) cleanService(list []string) { + for _, l := range list { + _ = d.client.UnRegisterService("weed-master", l) + } +} + +func (d *deployer) DeleteWeedCluster(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (d *deployer) UploadFile(ctx context.Context, dir string) error { + masterList, err := d.GetWeedMasterList(ctx) + if err != nil { + return err + } + for _, m := range masterList { + resp, err := d.weedMaster.UploadFile(ctx, m, dir) + if err != nil { + continue + } + // upload resp to etcd + bytes, err := json.Marshal(resp) + if err != nil { + continue + } + err = d.client.Put(dir, string(bytes)) + if err != nil { + continue + } + return nil + } + return errors.New("cannot upload file to weed cluster") +} + +func (d *deployer) DownloadFile(ctx context.Context, dir string, outputDir string) error { + masterList, err := d.GetWeedMasterList(ctx) + if err != nil { + return err + } + // get fid + fid, err := d.client.Get(dir) + if err != nil { + return err + } + var resp UploadFileResponse + err = json.Unmarshal([]byte(fid), &resp) + if err != nil { + return err + } + for _, m := range masterList { + err = d.weedMaster.DownloadFile(ctx, m, resp.Fid, outputDir) + if err != nil { + continue + } + return nil + } + return errors.New("cannot download file from weed cluster") +} + +func (d *deployer) RemoveFile(ctx context.Context, dir string) error { + //TODO implement me + panic("implement me") +} + +func NewDeployer(config *Config) Deployer { + check(config) + return &deployer{ + config: config, + etcd: NewEtcd(config), + weedMaster: NewMaster(config), + } +} + +func check(config *Config) { + // check config add set default value if not set + if config.LogDir == "" { + config.LogDir = "/tmp/log" + } + if config.DataDir == "" { + config.DataDir = "/tmp/data" + } + if config.PidDir == "" { + config.PidDir = "/tmp/pid" + } + if config.BinDir == "" { + config.BinDir = "/tmp/bin" + } + if config.EtcdConfigPath == "" { + config.EtcdConfigPath = "/tmp/etcd.conf" + } + if config.CurrentIP == "" { + config.CurrentIP = "127.0.0.1" + } + if config.PeerPort == 0 { + config.PeerPort = 2380 + } + if config.ClientPort == 0 { + config.ClientPort = 2379 + } + if config.WeedMasterPort == 0 { + config.WeedMasterPort = 9333 + } + if config.WeedVolumePort == 0 { + config.WeedVolumePort = 8080 + } + if config.WeedMasterDir == "" { + config.WeedMasterDir = "/tmp/weed-master" + } + if config.WeedVolumeDir == "" { + config.WeedVolumeDir = "/tmp/weed-volume" + } + if config.DefaultReplication == "" { + config.DefaultReplication = "003" + } + if config.WeedLogDir == "" { + config.WeedLogDir = "/tmp/weed-log" + } + if len(config.MasterIP) == 0 { + logrus.Error("master ip list is empty") + os.Exit(1) + } + if len(config.VolumeIP) == 0 { + logrus.Error("volume ip list is empty") + os.Exit(1) + } + //check if exist tar file + _, err := os.Stat(WeedDestination) + if err == nil { + _ = os.RemoveAll(WeedDestination) + } + _, err = os.Stat(EtcdDestination) + if err == nil { + _ = os.RemoveAll(EtcdDestination) + } + + // test + _, err = os.Stat(config.BinDir) + if err == nil { + _ = os.RemoveAll(config.BinDir) + } + _, err = os.Stat(config.DataDir) + if err == nil { + _ = os.RemoveAll(config.DataDir) + } + _, err = os.Stat(config.LogDir) + if err == nil { + _ = os.RemoveAll(config.LogDir) + } + _, err = os.Stat(config.PidDir) + if err == nil { + _ = os.RemoveAll(config.PidDir) + } +} diff --git a/utils/weed/interface_test.go b/utils/weed/interface_test.go new file mode 100644 index 00000000000..6ea2b3a6ffd --- /dev/null +++ b/utils/weed/interface_test.go @@ -0,0 +1,87 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "testing" +) + +// This should run with root permission. + +//func clean() { +//} +// +func TestDeployer_CreateEtcdCluster1(t *testing.T) { + err := NewDeployer(&Config{ + BinDir: "./test/bin3", + DataDir: "./test/data3", + LogDir: "./test/log3", + MasterIP: []string{"127.0.0.1:1111", "127.0.0.1:2222", "127.0.0.1:3333"}, + VolumeIP: []string{"127.0.0.1:4444", "127.0.0.1:5555", "127.0.0.1:6666"}, + PidDir: "./test/pid3", + CurrentIP: "127.0.0.1", + PeerPort: 3333, + ClientPort: 2390, + EtcdConfigPath: "./test/etcd3.conf", + }).CreateEtcdCluster(context.Background()) + if err != nil { + t.Error(err) + return + } +} + +// +//func TestDeployer_CreateEtcdCluster2(t *testing.T) { +// err := NewDeployer(&Config{ +// BinDir: "./test/bin1", +// DataDir: "./test/data1", +// LogDir: "./test/log1", +// MasterIP: []string{"127.0.0.1:1111", "127.0.0.1:2222", "127.0.0.1:3333"}, +// PidDir: "./test/pid1", +// CurrentIP: "127.0.0.1", +// PeerPort: 1111, +// ClientPort: 2391, +// EtcdConfigPath: "./test/etcd1.conf", +// }).CreateEtcdCluster(context.Background()) +// if err != nil { +// t.Error(err) +// return +// } +//} +// +//func TestDeployer_CreateEtcdCluster3(t *testing.T) { +// err := NewDeployer(&Config{ +// BinDir: "./test/bin2", +// DataDir: "./test/data2", +// LogDir: "./test/log2", +// MasterIP: []string{"127.0.0.1:1111", "127.0.0.1:2222", "127.0.0.1:3333"}, +// PidDir: "./test/pid2", +// CurrentIP: "127.0.0.1", +// PeerPort: 2222, +// ClientPort: 2392, +// EtcdConfigPath: "./test/etcd2.conf", +// }).CreateEtcdCluster(context.Background()) +// if err != nil { +// t.Error(err) +// return +// } +//} +// +//func TestDownloadWeed(t *testing.T) { +// d := &deployer{} +// err := d.downloadWeed() +// assert.Nil(t, err) +//} diff --git a/utils/weed/test/main.go b/utils/weed/test/main.go new file mode 100644 index 00000000000..6a948463c8e --- /dev/null +++ b/utils/weed/test/main.go @@ -0,0 +1,103 @@ +package main + +import ( + "context" + "github.com/sealerio/sealer/utils/weed" + "github.com/sealerio/sealer/version" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "os" +) + +func main() { + rootCmd := cobra.Command{ + Use: "weed", + Short: "A tool to build, share and run any distributed applications.", + } + rootCmd.AddCommand(startCmd()) + rootCmd.AddCommand(writeCmd()) + rootCmd.AddCommand(downloadFileCmd()) + if err := rootCmd.Execute(); err != nil { + logrus.Errorf("sealer-%s: %v", version.GetSingleVersion(), err) + os.Exit(1) + } + +} + +var config = &weed.Config{} + +func startCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "start", + Short: "start to run a weed cluster", + RunE: func(cmd *cobra.Command, args []string) error { + deployer := weed.NewDeployer(config) + err := deployer.CreateEtcdCluster(context.Background()) + if err != nil { + return err + } + err = deployer.CreateWeedCluster(context.Background()) + if err != nil { + return err + } + return nil + }, + } + cmd.Flags().StringSliceVar(&config.MasterIP, "master-ip", []string{}, "master ip list") + cmd.Flags().StringSliceVar(&config.VolumeIP, "volume-ip", []string{}, "volume ip list") + cmd.Flags().StringVar(&config.LogDir, "log-dir", "", "log dir") + cmd.Flags().StringVar(&config.DataDir, "data-dir", "", "data dir") + cmd.Flags().StringVar(&config.PidDir, "pid-dir", "", "pid dir") + cmd.Flags().StringVar(&config.BinDir, "bin-dir", "", "bin dir") + cmd.Flags().StringVar(&config.EtcdConfigPath, "etcd-config-path", "", "etcd config path") + cmd.Flags().StringVar(&config.CurrentIP, "current-ip", "", "current ip") + cmd.Flags().IntVar(&config.PeerPort, "peer-port", 0, "peer port") + cmd.Flags().IntVar(&config.ClientPort, "client-port", 0, "client port") + cmd.Flags().IntVar(&config.WeedMasterPort, "weed-master-port", 0, "weed master port") + cmd.Flags().IntVar(&config.WeedVolumePort, "weed-volume-port", 0, "weed volume port") + cmd.Flags().BoolVar(&config.NeedMoreLocalNode, "need-more-local-node", false, "need more local node") + cmd.Flags().StringVar(&config.WeedMasterDir, "weed-master-dir", "", "weed master dir") + cmd.Flags().StringVar(&config.WeedVolumeDir, "weed-volume-dir", "", "weed volume dir") + cmd.Flags().StringVar(&config.DefaultReplication, "default-replication", "", "default replication") + cmd.Flags().StringVar(&config.WeedLogDir, "weed-log-dir", "", "weed log dir") + return cmd +} + +var dir string + +func writeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "write", + Short: "write data to weed cluster", + RunE: func(cmd *cobra.Command, args []string) error { + deployer := weed.NewDeployer(config) + err := deployer.UploadFile(context.Background(), dir) + if err != nil { + return err + } + return nil + }, + } + cmd.Flags().StringVar(&dir, "dir", "", "dir") + return cmd +} + +var out string + +func downloadFileCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "download", + Short: "download data from weed cluster", + RunE: func(cmd *cobra.Command, args []string) error { + deployer := weed.NewDeployer(config) + err := deployer.DownloadFile(context.Background(), dir, out) + if err != nil { + return err + } + return nil + }, + } + cmd.Flags().StringVar(&dir, "dir", "", "dir") + cmd.Flags().StringVar(&out, "out", "", "out") + return cmd +} diff --git a/utils/weed/weed.go b/utils/weed/weed.go new file mode 100644 index 00000000000..a5bee824027 --- /dev/null +++ b/utils/weed/weed.go @@ -0,0 +1,69 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "errors" + "io" + "net/http" + "os" + "runtime" +) + +var ( + weedURL = "https://github.com/seaweedfs/seaweedfs/releases/download/3.54/" +) + +const ( + extractFolder = "/tmp" +) + +func weedDownloadURL() (string, error) { + if runtime.GOOS != "linux" { + return "", errors.New("unsupported os") + } + switch arch := runtime.GOARCH; arch { + case "amd64": + weedURL += "linux_amd64.tar.gz" + case "arm64": + weedURL += "linux_arm.tar.gz" + default: + return "", errors.New("unsupported arch") + } + return weedURL, nil +} + +func downloadFile(url string, dest string) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + // check if the destination folder exists + _, err = os.Stat(dest) + if err == nil { + _ = os.RemoveAll(dest) + } + + out, err := os.Create(dest) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, resp.Body) + return err +} diff --git a/utils/weed/weed_master.go b/utils/weed/weed_master.go new file mode 100644 index 00000000000..9dd76939240 --- /dev/null +++ b/utils/weed/weed_master.go @@ -0,0 +1,171 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "sync" + + "github.com/sealerio/sealer/utils/exec" +) + +type Master interface { + Exec + UploadFile(ctx context.Context, master string, dir string) (UploadFileResponse, error) + DownloadFile(ctx context.Context, master string, fid string, outputDir string) error + RemoveFile(ctx context.Context, master string, dir string) error +} + +type master struct { + ip string + port int + mDir string + defaultReplication string + peers []string + needMoreLocalNode bool + portList []int + mDirList []string + wg *sync.WaitGroup +} + +type UploadFileResponse struct { + Fid string `json:"fid"` + URL string `json:"url"` + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +func (m *master) UploadFile(ctx context.Context, master string, dir string) (UploadFileResponse, error) { + runOptions := RunOptions{ + Binary: "weed", + Name: "upload", + args: m.buildUploadFileArgs(ctx, master, dir), + } + jsonResponse, err := runBinaryWithJSONResponse(ctx, &runOptions, m.wg) + if err != nil { + return UploadFileResponse{}, err + } + var uploadFileResponse UploadFileResponse + err = json.Unmarshal(jsonResponse, &uploadFileResponse) + if err != nil { + return UploadFileResponse{}, err + } + return uploadFileResponse, nil +} + +func (m *master) buildUploadFileArgs(ctx context.Context, params ...interface{}) []string { + _ = ctx + return []string{ + "-master=" + params[0].(string), + "-dir=" + params[1].(string), + } +} + +func (m *master) buildDownloadFileArgs(ctx context.Context, params ...interface{}) []string { + _ = ctx + return []string{ + "-server=" + params[0].(string), + "--dir=" + params[2].(string), + params[1].(string), + } +} + +func (m *master) DownloadFile(ctx context.Context, master string, fid string, outputDir string) error { + runOptions := RunOptions{ + Binary: "weed", + Name: "download", + args: m.buildDownloadFileArgs(ctx, master, fid, outputDir), + } + err := runBinary(ctx, &runOptions, m.wg) + if err != nil { + return err + } + return nil +} + +func (m *master) RemoveFile(ctx context.Context, master string, fid string) error { + //TODO weed may not support remove file, may be should consider to use other file system + panic("implement me") +} + +func (m *master) Start(ctx context.Context, binary string) error { + if m.needMoreLocalNode { + return m.startCluster(ctx, binary) + } + return m.startSingle(ctx, binary) +} + +func (m *master) BuildArgs(ctx context.Context, params ...interface{}) []string { + return []string{ + "master", + "-ip " + m.ip, + "-port " + params[0].(string), + "-mdir " + params[1].(string), + "-peers " + strings.Join(m.peers, ","), + "-defaultReplication " + m.defaultReplication, + } +} + +func (m *master) IsRunning(ctx context.Context) bool { + err := exec.Cmd("lsof", "-i:"+strconv.Itoa(m.port)) + return err == nil +} + +func (m *master) Name() string { + return "master" +} + +func NewMaster(config *Config) Master { + return &master{ + ip: config.CurrentIP, + port: config.WeedMasterPort, + mDir: config.WeedMasterDir, + defaultReplication: config.DefaultReplication, + peers: config.MasterIP, + needMoreLocalNode: config.NeedMoreLocalNode, + wg: new(sync.WaitGroup), + } +} + +func (m *master) startSingle(ctx context.Context, binary string) error { + runOptions := &RunOptions{ + Binary: binary, + Name: "master", + args: m.BuildArgs(ctx, strconv.Itoa(m.port), m.mDir), + } + err := runBinary(ctx, runOptions, m.wg) + if err != nil { + return err + } + return nil +} + +func (m *master) startCluster(ctx context.Context, binary string) error { + for i, port := range m.portList { + runOptions := &RunOptions{ + Binary: binary, + Name: "master", + args: m.BuildArgs(ctx, strconv.Itoa(port), m.mDirList[i]), + } + err := runBinary(ctx, runOptions, m.wg) + if err != nil { + return err + } + } + return nil +} diff --git a/utils/weed/weed_test.go b/utils/weed/weed_test.go new file mode 100644 index 00000000000..c29b1f66ccc --- /dev/null +++ b/utils/weed/weed_test.go @@ -0,0 +1,15 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed diff --git a/utils/weed/weed_volumn.go b/utils/weed/weed_volumn.go new file mode 100644 index 00000000000..6614b0e8570 --- /dev/null +++ b/utils/weed/weed_volumn.go @@ -0,0 +1,102 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "strconv" + "strings" + "sync" + + "github.com/sealerio/sealer/utils/exec" +) + +type Volume interface { + Exec +} + +type volume struct { + ip string + port int + dir string + mServer []string + needMoreLocalNode bool + dirList []string + portList []int + wg *sync.WaitGroup +} + +func NewWeedVolume(config *Config, mServer []string) Volume { + return &volume{ + ip: config.CurrentIP, + port: config.WeedVolumePort, + dir: config.WeedVolumeDir, + mServer: mServer, + needMoreLocalNode: config.NeedMoreLocalNode, + wg: new(sync.WaitGroup), + } +} + +func (v *volume) Start(ctx context.Context, binary string) error { + if v.needMoreLocalNode { + return v.startCluster(ctx, binary) + } + return v.startSingle(ctx, binary) +} + +func (v *volume) BuildArgs(ctx context.Context, params ...interface{}) []string { + return []string{ + "-mServer " + strings.Join(v.mServer, ","), + "-port " + params[0].(string), + "-dir " + params[1].(string), + } +} + +func (v *volume) IsRunning(ctx context.Context) bool { + err := exec.Cmd("lsof", "-i:"+strconv.Itoa(v.port)) + return err == nil +} + +func (v *volume) Name() string { + return "volume" +} + +func (v *volume) startCluster(ctx context.Context, binary string) error { + for i := 0; i < len(v.portList); i++ { + runOptions := &RunOptions{ + Binary: binary, + Name: "volume", + args: v.BuildArgs(ctx, strconv.Itoa(v.portList[i]), v.dirList[i]), + } + err := runBinary(ctx, runOptions, v.wg) + if err != nil { + return err + } + } + return nil +} + +func (v *volume) startSingle(ctx context.Context, binary string) error { + runOptions := &RunOptions{ + Binary: binary, + Name: "volume", + args: v.BuildArgs(ctx, strconv.Itoa(v.port), v.dir), + } + err := runBinary(ctx, runOptions, v.wg) + if err != nil { + return err + } + return nil +}