Skip to content

Commit

Permalink
wip - still needs alloc watcher tests
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed Nov 17, 2017
1 parent 0f44008 commit e2972ce
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 9 deletions.
15 changes: 15 additions & 0 deletions client/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
}
}

// if we see this file, there was an error on the remote side
errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID)

buf := make([]byte, 1024)
for !canceled() {
// Get the next header
Expand All @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
p.prevAllocID, p.allocID, err)
}

if hdr.Name == errorFilename {
// Error snapshotting on the remote side, try to read
// the message out of the file and return it.
errBuf := make([]byte, int(hdr.Size))
if _, err := tr.Read(errBuf); err != nil {
return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v",
p.prevAllocID, p.allocID, err)
}
return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s",
p.prevAllocID, p.allocID, string(errBuf))
}

// If the header is for a directory we create the directory
if hdr.Typeflag == tar.TypeDir {
os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode))
Expand Down
49 changes: 48 additions & 1 deletion client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const (
)

var (
// SnapshotErrorTime is the sentinel time that will be used on the
// error file written by Snapshot when it encounters as error.
SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC)

// The name of the directory that is shared across tasks in a task group.
SharedAllocName = "alloc"

Expand Down Expand Up @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir {

// Snapshot creates an archive of the files and directories in the data dir of
// the allocation and the task local directories
//
// Since a valid tar may have been written even when an error occurs, a special
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
Expand Down Expand Up @@ -162,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
return fmt.Errorf("error creating file header: %v", err)
}
hdr.Name = relPath
tw.WriteHeader(hdr)
if err := tw.WriteHeader(hdr); err != nil {
return err
}

// If it's a directory or symlink we just write the header into the tar
if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) {
Expand All @@ -186,6 +196,15 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// directories in the archive
for _, path := range rootPaths {
if err := filepath.Walk(path, walkFn); err != nil {
allocID := filepath.Base(d.AllocDir)
if writeErr := writeError(tw, allocID, err); writeErr != nil {
// This could be bad; other side won't know
// snapshotting failed. It could also just mean
// the snapshotting side closed the connect
// prematurely and won't try to use the tar
// anyway.
d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr)
}
return fmt.Errorf("failed to snapshot %s: %v", path, err)
}
}
Expand Down Expand Up @@ -562,3 +581,31 @@ func splitPath(path string) ([]fileInfo, error) {
}
return dirs, nil
}

// SnapshotErrorFilename returns the filename which will exist if there was an
// error snapshotting a tar.
func SnapshotErrorFilename(allocID string) string {
return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID)
}

// writeError writes a special file to a tar archive with the error encountered
// during snapshotting. See Snapshot().
func writeError(tw *tar.Writer, allocID string, err error) error {
contents := []byte(fmt.Sprintf("Error snapshotting: %v", err))
hdr := tar.Header{
Name: SnapshotErrorFilename(allocID),
Mode: 0666,
Size: int64(len(contents)),
AccessTime: SnapshotErrorTime,
ChangeTime: SnapshotErrorTime,
ModTime: SnapshotErrorTime,
Typeflag: tar.TypeReg,
}

if err := tw.WriteHeader(&hdr); err != nil {
return err
}

_, err = tw.Write(contents)
return err
}
43 changes: 35 additions & 8 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,9 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
}

// Make the HTTP request to ensure the Snapshot error is
// propagated through to the HTTP layer and that a valid tar
// just isn't emitted.
// propagated through to the HTTP layer. Since the tar is
// streamed over a 200 HTTP response the only way to signal an
// error is by writing a marker file.
respW := httptest.NewRecorder()
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID), nil)
if err != nil {
Expand All @@ -420,19 +421,45 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
// by Snapshot is properly propogated via HTTP
s.Server.mux.ServeHTTP(respW, req)
resp := respW.Result()
t.Logf("HTTP Response Status Code: %d", resp.StatusCode)
r := tar.NewReader(resp.Body)
errorFilename := allocdir.SnapshotErrorFilename(alloc.ID)
markerFound := false
markerContents := ""
for {
header, err := r.Next()
if err != nil {
if err == io.EOF {
t.Fatalf("Looks like a valid tar file to me?")
if err != io.EOF {
// Huh, I wonder how a non-EOF error can happen?
t.Errorf("Unexpected error while streaming: %v", err)
}
t.Logf("Yay! An error: %v", err)
return
break
}

t.Logf("Valid file returned: %s", header.Name)
if markerFound {
// No more files should be found after the failure marker
t.Errorf("Next file found after error marker: %s", header.Name)
break
}

if header.Name == errorFilename {
// Found it!
markerFound = true
buf := make([]byte, int(header.Size))
if _, err := r.Read(buf); err != nil {
t.Errorf("Unexpected error reading error marker %s: %v", errorFilename, err)
} else {
markerContents = string(buf)
}
}
}

if !markerFound {
t.Fatalf("marker file %s not written; bad tar will be treated as good!", errorFilename)
}
if markerContents == "" {
t.Fatalf("marker file %s empty", markerContents)
} else {
t.Logf("EXPECTED snapshot error: %s", markerContents)
}
})
}
Expand Down

0 comments on commit e2972ce

Please sign in to comment.