Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Plamen Petrov <plamb0brt@gmail.com>
  • Loading branch information
plamenmpetrov committed Sep 11, 2020
1 parent f916134 commit c9e2a40
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 61 deletions.
17 changes: 7 additions & 10 deletions ctriface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,17 +583,16 @@ func (o *Orchestrator) ResumeVM(ctx context.Context, vmID string) (string, *metr

// CreateSnapshot Creates a snapshot of a VM
func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID string) (string, error) {
var (
snapPath string = o.getSnapshotFile(vmID)
memPath string = o.getMemoryFile(vmID)
)

logger := log.WithFields(log.Fields{"vmID": vmID})
logger.Debug("Orchestrator received CreateSnapshot")

ctx = namespaces.WithNamespace(ctx, namespaceName)

req := &proto.CreateSnapshotRequest{VMID: vmID, SnapshotFilePath: snapPath, MemFilePath: memPath}
req := &proto.CreateSnapshotRequest{
VMID: vmID,
SnapshotFilePath: o.getSnapshotFile(vmID),
MemFilePath: o.getMemoryFile(vmID),
}

if _, err := o.fcClient.CreateSnapshot(ctx, req); err != nil {
logger.Warn("failed to create snapshot of the VM: ", err)
Expand All @@ -608,8 +607,6 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string) (string, *
var (
loadSnapshotMetric *metrics.Metric = metrics.NewMetric()
tStart time.Time
snapPath string = o.getSnapshotFile(vmID)
memPath string = o.getMemoryFile(vmID)
activateDone chan error = make(chan error)
activateErr error
)
Expand All @@ -621,8 +618,8 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string) (string, *

req := &proto.LoadSnapshotRequest{
VMID: vmID,
SnapshotFilePath: snapPath,
MemFilePath: memPath,
SnapshotFilePath: o.getSnapshotFile(vmID),
MemFilePath: o.getMemoryFile(vmID),
EnableUserPF: o.GetUPFEnabled(),
}

Expand Down
13 changes: 9 additions & 4 deletions memory/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (m *MemoryManager) DeregisterVM(vmID string) error {
}

// Activate Creates an epoller to serve page faults for the VM
// doneCh is used to indicate that all associated goroutines have completed
func (m *MemoryManager) Activate(vmID string, doneCh chan error) (err error) {
logger := log.WithFields(log.Fields{"vmID": vmID})

Expand Down Expand Up @@ -119,19 +120,23 @@ func (m *MemoryManager) Activate(vmID string, doneCh chan error) (err error) {
if state.metricsModeOn {
tStart = time.Now()
}
state.fetchState()
if err := state.fetchState(); err != nil {
return err
}
if state.metricsModeOn {
state.currentMetric.MetricMap[fetchStateMetric] = metrics.ToUS(time.Since(tStart))
}
}

go activateAsync(state, doneCh)
go activateRest(state, doneCh)

return nil
}

// The asynchronous functionality of Activate
func activateAsync(state *SnapshotState, doneCh chan error) {
// The remaining functionality of Activate
// It is done asynchronously because getUFFD relies on executing
// loadSnapshot simultaneously
func activateRest(state *SnapshotState, doneCh chan error) {
logger := log.WithFields(log.Fields{"vmID": state.VMID})

var (
Expand Down
65 changes: 18 additions & 47 deletions memory/manager/snapshot_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ type SnapshotState struct {
epfd int
quitCh chan int

// prefetch the VMM state to the host memory
isPrefetchVMMState bool
// to indicate whether the instance has even been activated. this is to
// get around cases where offload is called for the first time
isEverActivated bool
Expand All @@ -63,8 +61,6 @@ type SnapshotState struct {

isRecordReady bool

isWSCopy bool

guestMem []byte
workingSet []byte

Expand Down Expand Up @@ -166,21 +162,24 @@ func alignment(block []byte, alignSize int) int {

// AlignedBlock returns []byte of size BlockSize aligned to a multiple
// of alignSize in memory (must be power of two)
func AlignedBlock(BlockSize int) []byte {
func AlignedBlock(blockSize int) []byte {
alignSize := os.Getpagesize() // must be multiple of the filesystem block size

block := make([]byte, BlockSize+alignSize)
if alignSize == 0 {
return block
if blockSize == 0 {
return nil
}

block := make([]byte, blockSize+alignSize)

a := alignment(block, alignSize)
offset := 0
if a != 0 {
offset = alignSize - a
}
block = block[offset : offset+BlockSize]
// Can't check alignment of a zero sized block
if BlockSize != 0 {
block = block[offset : offset+blockSize]

// Check
if blockSize != 0 {
a = alignment(block, alignSize)
if a != 0 {
log.Fatal("Failed to align block")
Expand All @@ -189,35 +188,36 @@ func AlignedBlock(BlockSize int) []byte {
return block
}

// fetchState Fetches the working set file (or the whole guest memory) and/or the VMM state file
func (s *SnapshotState) fetchState() {
// if s.isPrefetchVMMState {
// fetchState Fetches the working set file (or the whole guest memory) and the VMM state file
func (s *SnapshotState) fetchState() error {
if _, err := ioutil.ReadFile(s.VMMStatePath); err != nil {
log.Errorf("Failed to fetch VMM state: %v\n", err)
return err
}
//}

size := len(s.trace.trace) * os.Getpagesize()

// O_DIRECT allows to fully leverage disk bandwidth by bypassing the OS page cache
f, err := os.OpenFile(s.WorkingSetPath, os.O_RDONLY|syscall.O_DIRECT, 0600)
if err != nil {
log.Errorf("Failed to open the working set file for direct-io: %v\n", err)
return err
}

s.workingSet = AlignedBlock(size) // direct io requires aligned buffer

if n, err := f.Read(s.workingSet); n != size || err != nil {
log.Errorf("Reading working set file failed: %v\n", err)
return err
}

//trace.wsFetched = true
log.Debug("Fetched the entire working set")
if err := f.Close(); err != nil {
log.Errorf("Failed to close the working set file: %v\n", err)
return err
}

// return nil FIXME: add error checks in this function
return nil
}

func (s *SnapshotState) pollUserPageFaults(readyCh chan int) {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *SnapshotState) servePageFault(fd int, address uint64) error {
func (s *SnapshotState) installWorkingSetPages(fd int) {
log.Debug("Installing the working set pages")

// build a list of sorted regions (probably, it's better to make trace.regions an array instead of a map FIXME)
// build a list of sorted regions
keys := make([]uint64, 0)
for k := range s.trace.regions {
keys = append(keys, k)
Expand All @@ -361,49 +361,20 @@ func (s *SnapshotState) installWorkingSetPages(fd int) {

var (
srcOffset uint64
//wg sync.WaitGroup
)

/*
concurrency := 10
sem := make(chan bool, concurrency) // channel-based semaphore to limit IOCTLs in-flight
*/

for _, offset := range keys {
regLength := s.trace.regions[offset]
regAddress := s.startAddress + offset
mode := uint64(C.const_UFFDIO_COPY_MODE_DONTWAKE)
src := uint64(uintptr(unsafe.Pointer(&s.workingSet[srcOffset])))
dst := regAddress

// BUG: concurrency of 10-100 goroutines caused some OS kernel process (kworker) to hang
/*
wg.Add(1)
sem <- true
go func(fd int, src, dst, len uint64) {
defer wg.Done()
installRegion(fd, src, dst, mode, len)
<-sem
}(fd, src, dst, uint64(regLength))
*/

installRegion(fd, src, dst, mode, uint64(regLength))

srcOffset += uint64(regLength) * 4096
}

/*
for i := 0; i < cap(sem); i++ {
sem <- true
}
wg.Wait()
*/

// working set installation happens on the first page fault that is always at startAddress
wake(fd, s.startAddress, os.Getpagesize())
}

Expand Down

0 comments on commit c9e2a40

Please sign in to comment.