Skip to content

Commit

Permalink
Handle skipping indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Jul 21, 2016
1 parent 7c36a83 commit eb88e87
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 17 deletions.
84 changes: 73 additions & 11 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ const (
// being flushed if the frame size has not been hit.
streamBatchWindow = 200 * time.Millisecond

// nextLogCheckRate is the rate at which we check for a log entry greater
// than what we are watching for. This is to handle the case in which logs
// rotate faster than we can detect and we have to rely on a normal
// directory listing.
nextLogCheckRate = 100 * time.Millisecond

// deleteEvent and truncateEvent are the file events that can be sent in a
// StreamFrame
deleteEvent = "file deleted"
Expand Down Expand Up @@ -781,8 +787,7 @@ func (s *HTTPServer) logs(follow bool, offset int64,
close(eofCancelCh)
exitAfter = true
} else {
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
eofCancelCh = fs.BlockUntilExists(nextPath, &t)
eofCancelCh = blockUntilNextLog(fs, &t, logPath, task, logType, idx+1)
}

p := filepath.Join(logPath, logEntry.Name)
Expand Down Expand Up @@ -815,6 +820,52 @@ func (s *HTTPServer) logs(follow bool, offset int64,
return nil
}

// blockUntilNextLog returns a channel that will have data sent when the next
// log index or anything greater is created.
func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logType string, nextIndex int64) chan error {
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
next := make(chan error, 1)

go func() {
eofCancelCh := fs.BlockUntilExists(nextPath, t)
scanCh := time.Tick(nextLogCheckRate)
for {
select {
case err := <-eofCancelCh:
next <- err
close(next)
return
case <-scanCh:
entries, err := fs.List(logPath)
if err != nil {
next <- fmt.Errorf("failed to list entries: %v", err)
close(next)
return
}

indexes, err := logIndexes(entries, task, logType)
if err != nil {
next <- err
close(next)
return
}

// Scan and see if there are any entries larger than what we are
// waiting for.
for _, entry := range indexes {
if entry.idx >= nextIndex {
next <- nil
close(next)
return
}
}
}
}
}()

return next
}

// indexTuple and indexTupleArray are used to find the correct log entry to
// start streaming logs from
type indexTuple struct {
Expand All @@ -828,14 +879,10 @@ func (a indexTupleArray) Len() int { return len(a) }
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// findClosest takes a list of entries, the desired log index and desired log
// offset (which can be negative, treated as offset from end), task name and log
// type and returns the log entry, the log index, the offset to read from and a
// potential error.
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64,
task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) {

// Build the matching indexes
// logIndexes takes a set of entries and returns a indexTupleArray of
// the desired log file entries. If the indexes could not be determined, an
// error is returned.
func logIndexes(entries []*allocdir.AllocFileInfo, task, logType string) (indexTupleArray, error) {
var indexes []indexTuple
prefix := fmt.Sprintf("%s.%s.", task, logType)
for _, entry := range entries {
Expand All @@ -852,12 +899,27 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset in
// Convert to an int
idx, err := strconv.Atoi(idxStr)
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
}

indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
}

return indexTupleArray(indexes), nil
}

// findClosest takes a list of entries, the desired log index and desired log
// offset (which can be negative, treated as offset from end), task name and log
// type and returns the log entry, the log index, the offset to read from and a
// potential error.
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64,
task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) {

// Build the matching indexes
indexes, err := logIndexes(entries, task, logType)
if err != nil {
return nil, 0, 0, err
}
if len(indexes) == 0 {
return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
}
Expand Down
12 changes: 6 additions & 6 deletions command/agent/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"reflect"
"strconv"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -901,7 +900,7 @@ func TestHTTP_Logs_Follow(t *testing.T) {

// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil && err != syscall.EPIPE {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand All @@ -912,9 +911,11 @@ func TestHTTP_Logs_Follow(t *testing.T) {
t.Fatalf("did not receive data: got %q", string(received))
}

// We got the first chunk of data, write out the rest to the file to
// check that it is following
writeToFile(initialWrites, expected[initialWrites:])
// We got the first chunk of data, write out the rest to the next file
// at an index much ahead to check that it is following and detecting
// skips
skipTo := initialWrites + 10
writeToFile(skipTo, expected[initialWrites:])

select {
case <-fullResultCh:
Expand All @@ -930,7 +931,6 @@ func TestHTTP_Logs_Follow(t *testing.T) {
}, func(err error) {
t.Fatalf("connection not closed")
})

})
}

Expand Down

0 comments on commit eb88e87

Please sign in to comment.