Skip to content

Commit

Permalink
added invoker and deployer
Browse files Browse the repository at this point in the history
removed name from knativeworkloads yaml files

Signed-off-by: Dmitrii Ustiugov <dmitrii.ustiugov@epfl.ch>
  • Loading branch information
ustiugov committed Nov 30, 2020
1 parent 6eb084f commit 0a71497
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 4 deletions.
1 change: 0 additions & 1 deletion configs/knative_workloads/helloworld.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: helloworld # The name of the function instance
namespace: default
spec:
template:
Expand Down
1 change: 0 additions & 1 deletion configs/knative_workloads/helloworldSerial.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: helloworldserial # The name of the function instance
namespace: default
spec:
template:
Expand Down
1 change: 0 additions & 1 deletion configs/knative_workloads/pyaes.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: pyaes # The name of the function instance
namespace: default
spec:
template:
Expand Down
1 change: 0 additions & 1 deletion configs/knative_workloads/rnn_serving.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: rnnserving # The name of the function instance
namespace: default
spec:
template:
Expand Down
163 changes: 163 additions & 0 deletions examples/deployer/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// MIT License
//
// Copyright (c) 2020 Dmitrii Ustiugov and EASE lab
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package main

import (
"bufio"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"

log "github.com/sirupsen/logrus"
)

var (
gatewayURL = flag.String("gatewayURL", "192.168.1.240.xip.io", "URL of the gateway")
namespaceName = flag.String("namespace", "default", "name of namespace in which services exists")
)

func main() {
funcPath := flag.String("funcPath", "./configs/knative_workloads", "Path to the folder with *.yml files")
funcJSONFile := flag.String("jsonFile", "./examples/deployer/functions.json", "Path to the JSON file with functions to deploy")
urlFile := flag.String("urlFile", "urls.txt", "File with functions' URLs")
deploymentConcurrency := flag.Int("conc", 5, "Number of functions to deploy concurrently")

flag.Parse()

log.Debug("Function files are taken from ", *funcPath)

funcSlice := getFuncSlice(*funcJSONFile)

urls := deploy(*funcPath, funcSlice, *deploymentConcurrency)

writeURLs(*urlFile, urls)

log.Infoln("Deployment finished")
}

// Functions is an object for unmarshalled JSON with functions to deploy.
type Functions struct {
Functions []functionType `json:"functions"`
}

type functionType struct {
Name string `json:"name"`
File string `json:"file"`

// number of functions to deploy from the same file (with different names)
Count int `json:"count"`
}

func getFuncSlice(file string) []functionType {
log.Debugf("Opening JSON file with functions: %s\n", file)
jsonFile, err := os.Open(file)
if err != nil {
log.Fatal(err)
}
defer jsonFile.Close()

byteValue, err := ioutil.ReadAll(jsonFile)
if err != nil {
log.Fatal(err)
}

var functions Functions

json.Unmarshal(byteValue, &functions)

return functions.Functions
}

func deploy(funcPath string, funcSlice []functionType, deploymentConcurrency int) []string {
var urls []string
sem := make(chan bool, deploymentConcurrency) // limit the number of parallel deployments

for _, fType := range funcSlice {
for i := 0; i < fType.Count; i++ {

sem <- true

funcName := fmt.Sprintf("%s-%d", fType.Name, i)
url := fmt.Sprintf("%s.%s.%s", funcName, *namespaceName, *gatewayURL)
urls = append(urls, url)

filePath := filepath.Join(funcPath, fType.File)

go func(funcName, filePath string) {
defer func() { <-sem }()

deployFunction(funcName, filePath)
}(funcName, filePath)
}
}

for i := 0; i < cap(sem); i++ {
sem <- true
}

return urls
}

func deployFunction(funcName, filePath string) {
cmd := exec.Command(
"kn",
"service",
"apply",
funcName,
"-f",
filePath,
"--concurrency-target",
"1",
)
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Warnf("Failed to deploy function %s, %s: %v\n%s\n", funcName, filePath, err, stdoutStderr)
}

log.Info("Deployed function", funcName)
}

func writeURLs(filePath string, urls []string) {
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, 0644)

if err != nil {
log.Fatalf("failed creating file: %s", err)
}

datawriter := bufio.NewWriter(file)

for _, url := range urls {
_, err := datawriter.WriteString(url + "\n")
if err != nil {
log.Fatal("Failed to write the URLs to a file ", err)
}
}

datawriter.Flush()
file.Close()
return
}
19 changes: 19 additions & 0 deletions examples/deployer/functions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"functions": [
{
"name": "helloworld",
"file": "helloworld.yaml",
"count": 1
},
{
"name": "pyaes",
"file": "pyaes.yaml",
"count": 2
},
{
"name": "rnn-serving",
"file": "rnn_serving.yaml",
"count": 3
}
]
}
175 changes: 175 additions & 0 deletions examples/invoker/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// MIT License
//
// Copyright (c) 2020 Dmitrii Ustiugov and EASE lab
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package main

import (
"bufio"
"context"
"flag"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
pb "github.com/ustiugov/fccd-orchestrator/helloworld"
"google.golang.org/grpc"
)

var (
completed int64
latSlice LatencySlice
)

func main() {
urlFile := flag.String("urlFile", "urls.txt", "File with functions' URLs")
rps := flag.Int("rps", 1, "Target requests per second")
runDuration := flag.Int("time", 5, "Run the benchmark for X seconds")
latencyOutputFile := flag.String("latf", "lat.csv", "CSV file for the latency measurements in microseconds")

flag.Parse()

log.Infof("Reading the URLs from the file: %s", *urlFile)

urls, err := readLines(*urlFile)
if err != nil {
log.Fatal("Failed to read the URL files:", err)
}

realRPS := runBenchmark(urls, *runDuration, *rps)

writeLatencies(realRPS, *latencyOutputFile)
}

func readLines(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()

var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
return lines, scanner.Err()
}

func runBenchmark(urls []string, runDuration, targetRPS int) (realRPS float64) {
timeout := time.After(time.Duration(runDuration) * time.Second)
tick := time.Tick(time.Duration(1000/targetRPS) * time.Millisecond)

var issued int
start := time.Now()

for {
select {
case <-timeout:
duration := time.Since(start).Seconds()
realRPS = float64(completed) / duration
log.Infof("Real / target RPS : %.2f / %v", realRPS, targetRPS)

log.Println("Benchmark finished!")

return
case <-tick:
url := urls[issued%len(urls)]
go invokeFunction(url)

issued++
}
}
}

func invokeFunction(url string) {
defer getDuration(startMeasurement(url)) // measure entire invocation time

address := fmt.Sprintf("%s:%d", url, 80)

conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewGreeterClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

_, err = c.SayHello(ctx, &pb.HelloRequest{Name: "faas"})
if err != nil {
log.Warnf("Failed to invoke %v, err=%v", address, err)
}

atomic.AddInt64(&completed, 1)

return
}

// LatencySlice is a thread-safe slice to hold a slice of latency measurements.
type LatencySlice struct {
sync.Mutex
slice []int64
}

func startMeasurement(msg string) (string, time.Time) {
return msg, time.Now()
}

func getDuration(msg string, start time.Time) {
latency := time.Since(start).Microseconds()
log.Debugf("Invoked %v in %v usec\n", msg, latency)

latSlice.Lock()
latSlice.slice = append(latSlice.slice, latency)
latSlice.Unlock()
}

func writeLatencies(rps float64, latencyOutputFile string) {
latSlice.Lock()
defer latSlice.Unlock()

fileName := fmt.Sprintf("%s_%frps", latencyOutputFile, rps)
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0644)

if err != nil {
log.Fatalf("failed creating file: %s", err)
}

datawriter := bufio.NewWriter(file)

for _, lat := range latSlice.slice {
_, err := datawriter.WriteString(strconv.FormatInt(lat, 10) + "\n")
if err != nil {
log.Fatal("Failed to write the URLs to a file ", err)
}
}

datawriter.Flush()
file.Close()
return
}

0 comments on commit 0a71497

Please sign in to comment.