Skip to content

Commit

Permalink
Revert changes introduced by wrong branch merging.
Browse files Browse the repository at this point in the history
Signed-off-by: Vittorio Cozzolino <vittorio.cozzolino@huawei.com>
  • Loading branch information
vcozzolino committed Oct 25, 2021
1 parent ca2d259 commit d804a4c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 272 deletions.
250 changes: 0 additions & 250 deletions pkg/globalmanager/runtime/storage_initializer_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"path/filepath"
"strings"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -348,255 +347,6 @@ func injectInitializerContainer(pod *v1.Pod, workerParam *WorkerParam) {
injectVolume(pod, volumes, volumeMounts)
}

/*
Deployment Storage Hooks
*/

func injectHostPathMountDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
var volumes []v1.Volume
var volumeMounts []v1.VolumeMount
var initContainerVolumeMounts []v1.VolumeMount

uniqVolumeName := make(map[string]bool)

hostPathType := v1.HostPathDirectory

for _, mount := range workerParam.Mounts {
for _, m := range mount.URLs {
if m.HostPath == "" {
continue
}

volumeName := ConvertK8SValidName(m.HostPath)

if len(volumeName) == 0 {
volumeName = defaultVolumeName
klog.Warningf("failed to get name from url(%s), fallback to default name(%s)", m.URL, volumeName)
}

if _, ok := uniqVolumeName[volumeName]; !ok {
volumes = append(volumes, v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: m.HostPath,
Type: &hostPathType,
},
},
})
uniqVolumeName[volumeName] = true
}

vm := v1.VolumeMount{
MountPath: m.MountPath,
Name: volumeName,
}
if m.Indirect {
initContainerVolumeMounts = append(initContainerVolumeMounts, vm)
} else {
volumeMounts = append(volumeMounts, vm)
}
}
}

injectVolumeDeployment(deployment, volumes, volumeMounts)

if len(volumeMounts) > 0 {
hostPathEnvs := []v1.EnvVar{
{
Name: hostPathPrefixEnvKey,
Value: hostPathPrefix,
},
}
injectEnvsDeployment(deployment, hostPathEnvs)
}

if len(initContainerVolumeMounts) > 0 {
initIdx := len(deployment.Spec.Template.Spec.InitContainers) - 1
deployment.Spec.Template.Spec.InitContainers[initIdx].VolumeMounts = append(
deployment.Spec.Template.Spec.InitContainers[initIdx].VolumeMounts,
initContainerVolumeMounts...,
)
}
}

func injectWorkerSecretsDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
var secretEnvs []v1.EnvVar
for _, mount := range workerParam.Mounts {
for _, m := range mount.URLs {
if m.Disable || m.DownloadByInitializer {
continue
}
if len(m.SecretEnvs) > 0 {
secretEnvs = MergeSecretEnvs(secretEnvs, m.SecretEnvs, false)
}
}
}
injectEnvsDeployment(deployment, secretEnvs)
}

func injectInitializerContainerDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
var volumes []v1.Volume
var volumeMounts []v1.VolumeMount

var downloadPairs []string
var secretEnvs []v1.EnvVar
for _, mount := range workerParam.Mounts {
for _, m := range mount.URLs {
if m.Disable {
continue
}

srcURL := m.DownloadSrcURL
dstDir := m.DownloadDstDir
if srcURL != "" && dstDir != "" {
// need to add srcURL first: srcURL dstDir
if m.Indirect {
// here add indirectURLMark into dstDir which is controllable
dstDir = indirectURLMark + dstDir
}
downloadPairs = append(downloadPairs, srcURL, dstDir)

if len(m.SecretEnvs) > 0 {
secretEnvs = MergeSecretEnvs(secretEnvs, m.SecretEnvs, false)
}
}
}
}

// no need to download
if len(downloadPairs) == 0 {
return
}

envs := secretEnvs
envs = append(envs, v1.EnvVar{
Name: indirectURLMarkEnv,
Value: indirectURLMark,
})

// use one empty directory
storageVolume := v1.Volume{
Name: downloadInitalizerVolumeName,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}

storageVolumeMounts := v1.VolumeMount{
Name: storageVolume.Name,
MountPath: downloadInitalizerPrefix,
ReadOnly: true,
}
volumes = append(volumes, storageVolume)
volumeMounts = append(volumeMounts, storageVolumeMounts)

initVolumeMounts := []v1.VolumeMount{
{
Name: storageVolume.Name,
MountPath: downloadInitalizerPrefix,
ReadOnly: false,
},
}

initContainer := v1.Container{
Name: downloadInitalizerContainerName,
Image: downloadInitalizerImage,
ImagePullPolicy: v1.PullIfNotPresent,
Args: downloadPairs,

TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,

Resources: v1.ResourceRequirements{
Limits: map[v1.ResourceName]resource.Quantity{
// limit one cpu
v1.ResourceCPU: resource.MustParse("1"),
// limit 1Gi memory
v1.ResourceMemory: resource.MustParse("1Gi"),
},
},
VolumeMounts: initVolumeMounts,
Env: envs,
}

deployment.Spec.Template.Spec.InitContainers = append(deployment.Spec.Template.Spec.InitContainers, initContainer)
injectVolumeDeployment(deployment, volumes, volumeMounts)
}

// InjectStorageInitializer injects these storage related volumes and envs into deployment in-place
func InjectStorageInitializerDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
var mounts []WorkerMount
// parse the mounts and environment key
for _, mount := range workerParam.Mounts {
var envPaths []string

if mount.URL != nil {
mount.URLs = append(mount.URLs, *mount.URL)
}

var mountURLs []MountURL
for _, m := range mount.URLs {
m.Parse()
if m.Disable {
continue
}
mountURLs = append(mountURLs, m)

if m.ContainerPath != "" {
envPaths = append(envPaths, m.ContainerPath)
} else {
// keep the original URL if no container path
envPaths = append(envPaths, m.URL)
}
}

if len(mountURLs) > 0 {
mount.URLs = mountURLs
mounts = append(mounts, mount)
}

if mount.EnvName != "" {
workerParam.Env[mount.EnvName] = strings.Join(
envPaths, urlsFieldSep,
)
}
}

workerParam.Mounts = mounts

// need to call injectInitializerContainer before injectHostPathMount
// since injectHostPathMount could inject volumeMount to init container
injectInitializerContainerDeployment(deployment, workerParam)
injectHostPathMountDeployment(deployment, workerParam)
injectWorkerSecretsDeployment(deployment, workerParam)
}

func injectVolumeDeployment(deployment *appsv1.Deployment, volumes []v1.Volume, volumeMounts []v1.VolumeMount) {
if len(volumes) > 0 {
deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, volumes...)
}

if len(volumeMounts) > 0 {
for idx := range deployment.Spec.Template.Spec.Containers {
// inject every containers
deployment.Spec.Template.Spec.Containers[idx].VolumeMounts = append(
deployment.Spec.Template.Spec.Containers[idx].VolumeMounts, volumeMounts...,
)
}
}
}

func injectEnvsDeployment(deployment *appsv1.Deployment, envs []v1.EnvVar) {
if len(envs) > 0 {
for idx := range deployment.Spec.Template.Spec.Containers {
// inject every containers
deployment.Spec.Template.Spec.Containers[idx].Env = append(
deployment.Spec.Template.Spec.Containers[idx].Env, envs...,
)
}
}
}

// InjectStorageInitializer injects these storage related volumes and envs into pod in-place
func InjectStorageInitializer(pod *v1.Pod, workerParam *WorkerParam) {
var mounts []WorkerMount
Expand Down
27 changes: 5 additions & 22 deletions pkg/globalmanager/runtime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,42 +261,25 @@ func newDeployment(object CommonInterface, spec *appsv1.DeploymentSpec, workerPa
}

// injectDeploymentParam modifies deployment in-place
func injectDeploymentParam(deployment *appsv1.Deployment, workerParam *WorkerParam, object CommonInterface, _port int32) {
var appLabelKey = "app.sedna.io"
var appLabelValue = object.GetName() + "-" + workerParam.WorkerType + "-" + "svc"

func injectDeploymentParam(deployment *appsv1.Deployment, workerParam *WorkerParam, object CommonInterface, port int32) {
// inject our labels
if deployment.Labels == nil {
deployment.Labels = make(map[string]string)
}
if deployment.Spec.Template.Labels == nil {
deployment.Spec.Template.Labels = make(map[string]string)
}
if deployment.Spec.Selector.MatchLabels == nil {
deployment.Spec.Selector.MatchLabels = make(map[string]string)
}

for k, v := range generateLabels(object, workerParam.WorkerType) {
deployment.Labels[k] = v
deployment.Spec.Template.Labels[k] = v
deployment.Spec.Selector.MatchLabels[k] = v
}

// Edgemesh part, useful for service mapping
deployment.Labels[appLabelKey] = appLabelValue
deployment.Spec.Template.Labels[appLabelKey] = appLabelValue
deployment.Spec.Selector.MatchLabels[appLabelKey] = appLabelValue

// Env variables injection
envs := createEnvVars(workerParam.Env)
for idx := range deployment.Spec.Template.Spec.Containers {
deployment.Spec.Template.Spec.Containers[idx].Env = append(
deployment.Spec.Template.Spec.Containers[idx].Env, envs...,
)
deployment.Spec.Template.Spec.Containers[0].Ports = []v1.ContainerPort{
{
ContainerPort: port,
},
}

InjectStorageInitializerDeployment(deployment, workerParam)

}

// createEnvVars creates EnvMap for container
Expand Down

0 comments on commit d804a4c

Please sign in to comment.