Skip to content

Commit

Permalink
Introduce framwork of RuntimeManager and cri-proxy (#105)
Browse files Browse the repository at this point in the history
Signed-off-by: pengyang.hpy <honpey@gmail.com>
  • Loading branch information
honpey committed Apr 28, 2022
1 parent 5915884 commit 1a83fb3
Show file tree
Hide file tree
Showing 11 changed files with 659 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/docker/docker v20.10.2+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/go-logr/logr v0.4.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/mock v1.6.0
github.com/google/uuid v1.2.0
github.com/jinzhu/copier v0.3.5
Expand Down Expand Up @@ -61,7 +62,6 @@ require (
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/gofuzz v1.1.0 // indirect
Expand Down
86 changes: 86 additions & 0 deletions pkg/runtime-manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatcher

import (
"context"
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/apis/runtime/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/runtime-manager/config"
)

// RuntimeHookDispatcher dispatches hook request to RuntimeHookServer(e.g. koordlet)
type RuntimeHookDispatcher struct {
cm *HookServerClientManager
hookManager *config.Manager
}

func NewRuntimeDispatcher(cm *HookServerClientManager, hookManager *config.Manager) *RuntimeHookDispatcher {
return &RuntimeHookDispatcher{
cm: cm,
hookManager: hookManager,
}
}

func (rd *RuntimeHookDispatcher) dispatchInternal(ctx context.Context, hookType config.RuntimeHookType,
client *RuntimeHookClient, request interface{}) (response interface{}, err error) {
switch hookType {
case config.PreRunPodSandbox:
return client.PreRunPodSandboxHook(ctx, request.(*v1alpha1.RunPodSandboxHookRequest))
case config.PreStartContainer:
return client.PreStartContainerHook(ctx, request.(*v1alpha1.ContainerResourceHookRequest))
case config.PreUpdateContainerResources:
return client.PreUpdateContainerResourcesHook(ctx, request.(*v1alpha1.ContainerResourceHookRequest))
case config.PostStartContainer:
return client.PostStartContainerHook(ctx, request.(*v1alpha1.ContainerResourceHookRequest))
case config.PostStopContainer:
return client.PostStopContainerHook(ctx, request.(*v1alpha1.ContainerResourceHookRequest))
}
return nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("method %v not implemented", string(hookType)))
}

func (rd *RuntimeHookDispatcher) Dispatch(ctx context.Context, runtimeRequestPath config.RuntimeRequestPath,
stage config.RuntimeHookStage, request interface{}) (interface{}, error) {
hookServers := rd.hookManager.GetAllHook()
for _, hookServer := range hookServers {
for _, hookType := range hookServer.RuntimeHooks {
if !hookType.OccursOn(runtimeRequestPath) {
continue
}
if hookType.HookStage() != stage {
continue
}
client, err := rd.cm.RuntimeHookClient(HookServerPath{
Path: hookServer.RemoteEndpoint,
})
if err != nil {
klog.Errorf("fail to get client %v", err)
continue
}
// currently, only one hook be called during one runtime
// TODO: multi hook server to merge response
return rd.dispatchInternal(ctx, hookType, client, request)
}
}
return nil, nil
}
87 changes: 87 additions & 0 deletions pkg/runtime-manager/dispatcher/hookclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatcher

import (
"encoding/json"
"fmt"

"github.com/golang/groupcache/lru"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/apis/runtime/v1alpha1"
)

type HookServerClientManager struct {
cache *lru.Cache
}

const (
defaultCacheSize = 10
)

// NewClientManager
// TODO: garbage client gc
func NewClientManager() *HookServerClientManager {
cache := lru.New(defaultCacheSize)
return &HookServerClientManager{
cache: cache,
}
}

type HookServerPath struct {
Path string
Port int64
}

type RuntimeHookClient struct {
SockPath string
v1alpha1.RuntimeHookServiceClient
}

func newRuntimeHookClient(sockPath string) (*RuntimeHookClient, error) {
client := &RuntimeHookClient{
SockPath: sockPath,
}
conn, err := grpc.Dial(fmt.Sprintf("unix://%v", sockPath),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
client.RuntimeHookServiceClient = v1alpha1.NewRuntimeHookServiceClient(conn)
return client, nil
}

func (cm *HookServerClientManager) RuntimeHookClient(serverPath HookServerPath) (*RuntimeHookClient, error) {
cacheKey, err := json.Marshal(serverPath)
if err != nil {
return nil, err
}
if client, ok := cm.cache.Get(string(cacheKey)); ok {
return client.(*RuntimeHookClient), nil
}

runtimeHookClient, err := newRuntimeHookClient(serverPath.Path)
if err != nil {
klog.Errorf("fail to create client %v", err)
return nil, err
}
cm.cache.Add(string(cacheKey), runtimeHookClient)
return runtimeHookClient, nil
}
38 changes: 38 additions & 0 deletions pkg/runtime-manager/resource-executor/resource_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resource_executor

type RuntimeResourceExecutor interface {
GetMetaInfo() string
GenerateResourceCheckpoint() interface{}
GenerateHookRequest() interface{}
ParseRequest(request interface{}) error
ResourceCheckPoint(response interface{}) error
}

type RuntimeResourceType string

const (
RuntimePodResource RuntimeResourceType = "RuntimePodResource"
RuntimeContainerResource RuntimeResourceType = "RuntimeContainerResource"
RuntimeNoopResource RuntimeResourceType = "RuntimeNoopResource"
)

func NewRuntimeResourceExecutor(runtimeResourceType RuntimeResourceType) RuntimeResourceExecutor {
return nil
}
137 changes: 137 additions & 0 deletions pkg/runtime-manager/server/cri/criserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cri

import (
"context"
"net"
"time"

"google.golang.org/grpc"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/runtime-manager/config"
"github.com/koordinator-sh/koordinator/pkg/runtime-manager/dispatcher"
resource_executor "github.com/koordinator-sh/koordinator/pkg/runtime-manager/resource-executor"
"github.com/koordinator-sh/koordinator/pkg/runtime-manager/server/utils"
)

const (
defaultTimeout = 5 * time.Second
)

type RuntimeManagerCriServer struct {
hookDispatcher *dispatcher.RuntimeHookDispatcher
backendClient runtimeapi.RuntimeServiceClient
}

func NewRuntimeManagerCriServer(dispatcher *dispatcher.RuntimeHookDispatcher) *RuntimeManagerCriServer {
criInterceptor := &RuntimeManagerCriServer{
hookDispatcher: dispatcher,
}
return criInterceptor
}

func (ci *RuntimeManagerCriServer) Name() string {
return "RuntimeManagerCriServer"
}

func (ci *RuntimeManagerCriServer) Run() error {
if err := ci.initBackendServer(utils.DefaultContainerdSocketPath); err != nil {
return err
}
lis, err := net.Listen("unix", utils.DefaultRuntimeManagerSocketPath)
if err != nil {
klog.Errorf("fail to create the lis %v", err)
return err
}
grpcServer := grpc.NewServer()
runtimeapi.RegisterRuntimeServiceServer(grpcServer, ci)
err = grpcServer.Serve(lis)
return err
}

func (ci *RuntimeManagerCriServer) getRuntimeHookInfo(serviceType RuntimeServiceType) (config.RuntimeRequestPath,
resource_executor.RuntimeResourceType) {
switch serviceType {
case RunPodSandbox:
return config.RunPodSandbox, resource_executor.RuntimePodResource
case CreateContainer:
// No Nook point in create container, but we need store the container info during container create
return config.NoneRuntimeHookPath, resource_executor.RuntimeContainerResource
case StartContainer:
return config.StartContainer, resource_executor.RuntimeContainerResource
case StopContainer:
return config.StopContainer, resource_executor.RuntimeContainerResource
case UpdateContainerResources:
return config.UpdateContainerResources, resource_executor.RuntimeContainerResource
}
return config.NoneRuntimeHookPath, resource_executor.RuntimeNoopResource
}

func (ci *RuntimeManagerCriServer) interceptRuntimeRequest(serviceType RuntimeServiceType,
ctx context.Context, request interface{}, handler grpc.UnaryHandler) (interface{}, error) {

runtimeHookPath, runtimeResourceType := ci.getRuntimeHookInfo(serviceType)
resourceExecutor := resource_executor.NewRuntimeResourceExecutor(runtimeResourceType)

if err := resourceExecutor.ParseRequest(request); err != nil {
klog.Errorf("fail to parse request %v %v", request, err)
}

// pre call hook server
// TODO deal with the Dispatch response
if _, err := ci.hookDispatcher.Dispatch(ctx, runtimeHookPath, config.PreHook, resourceExecutor.GenerateHookRequest()); err != nil {
klog.Errorf("fail to call hook server %v", err)
}

// call the backend runtime engine
res, err := handler(ctx, request)
if err == nil {
klog.Infof("%v call on backend %v success", resourceExecutor.GetMetaInfo(), string(runtimeHookPath))
// store checkpoint info basing request
// checkpoint only when response success
if err := resourceExecutor.ResourceCheckPoint(res); err != nil {
klog.Errorf("fail to checkpoint %v %v", resourceExecutor.GetMetaInfo(), err)
}
} else {
klog.Errorf("%v call on backend %v fail %v", resourceExecutor.GetMetaInfo(), string(runtimeHookPath), err)
}

// post call hook server
// TODO the response
ci.hookDispatcher.Dispatch(ctx, runtimeHookPath, config.PostHook, resourceExecutor.GenerateHookRequest())
return res, err
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
}

func (ci *RuntimeManagerCriServer) initBackendServer(sockPath string) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, sockPath, grpc.WithInsecure(), grpc.WithContextDialer(dialer))
if err != nil {
klog.Infof("err to create %v\n", err)
return err
}
ci.backendClient = runtimeapi.NewRuntimeServiceClient(conn)
return nil
}
Loading

0 comments on commit 1a83fb3

Please sign in to comment.