Skip to content

Commit

Permalink
Merge pull request #143 from glerchundi/master
Browse files Browse the repository at this point in the history
sdjournal: add support for cursors (get&seek&test)
  • Loading branch information
lucab committed May 26, 2016
2 parents 61576a9 + ce21643 commit b9b5f59
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 0 deletions.
93 changes: 93 additions & 0 deletions sdjournal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ package sdjournal
// }
//
// int
// my_sd_journal_get_cursor(void *f, sd_journal *j, char **cursor)
// {
// int (*sd_journal_get_cursor)(sd_journal *, char **);
//
// sd_journal_get_cursor = f;
// return sd_journal_get_cursor(j, cursor);
// }
//
// int
// my_sd_journal_test_cursor(void *f, sd_journal *j, const char *cursor)
// {
// int (*sd_journal_test_cursor)(sd_journal *, const char *);
//
// sd_journal_test_cursor = f;
// return sd_journal_test_cursor(j, cursor);
// }
//
// int
// my_sd_journal_get_realtime_usec(void *f, sd_journal *j, uint64_t *usec)
// {
// int (*sd_journal_get_realtime_usec)(sd_journal *, uint64_t *);
Expand All @@ -181,6 +199,16 @@ package sdjournal
// return sd_journal_seek_tail(j);
// }
//
//
// int
// my_sd_journal_seek_cursor(void *f, sd_journal *j, const char *cursor)
// {
// int (*sd_journal_seek_cursor)(sd_journal *, const char *);
//
// sd_journal_seek_cursor = f;
// return sd_journal_seek_cursor(j, cursor);
// }
//
// int
// my_sd_journal_seek_realtime_usec(void *f, sd_journal *j, uint64_t usec)
// {
Expand Down Expand Up @@ -597,6 +625,50 @@ func (j *Journal) GetRealtimeUsec() (uint64, error) {
return uint64(usec), nil
}

// GetCursor gets the cursor of the current journal entry.
func (j *Journal) GetCursor() (string, error) {
var d *C.char

sd_journal_get_cursor, err := j.getFunction("sd_journal_get_cursor")
if err != nil {
return "", err
}

j.mu.Lock()
r := C.my_sd_journal_get_cursor(sd_journal_get_cursor, j.cjournal, &d)
j.mu.Unlock()

if r < 0 {
return "", fmt.Errorf("failed to get cursor: %d", syscall.Errno(-r))
}

cursor := C.GoString(d)

return cursor, nil
}

// TestCursor checks whether the current position in the journal matches the
// specified cursor
func (j *Journal) TestCursor(cursor string) error {
sd_journal_test_cursor, err := j.getFunction("sd_journal_test_cursor")
if err != nil {
return err
}

c := C.CString(cursor)
defer C.free(unsafe.Pointer(c))

j.mu.Lock()
r := C.my_sd_journal_test_cursor(sd_journal_test_cursor, j.cjournal, c)
j.mu.Unlock()

if r < 0 {
return fmt.Errorf("failed to test to cursor %q: %d", cursor, syscall.Errno(-r))
}

return nil
}

// SeekHead seeks to the beginning of the journal, i.e. the oldest available
// entry.
func (j *Journal) SeekHead() error {
Expand Down Expand Up @@ -654,6 +726,27 @@ func (j *Journal) SeekRealtimeUsec(usec uint64) error {
return nil
}

// SeekCursor seeks to a concrete journal cursor.
func (j *Journal) SeekCursor(cursor string) error {
sd_journal_seek_cursor, err := j.getFunction("sd_journal_seek_cursor")
if err != nil {
return err
}

c := C.CString(cursor)
defer C.free(unsafe.Pointer(c))

j.mu.Lock()
r := C.my_sd_journal_seek_cursor(sd_journal_seek_cursor, j.cjournal, c)
j.mu.Unlock()

if r < 0 {
return fmt.Errorf("failed to seek to cursor %q: %d", cursor, syscall.Errno(-r))
}

return nil
}

// Wait will synchronously wait until the journal gets changed. The maximum time
// this call sleeps may be controlled with the timeout parameter. If
// sdjournal.IndefiniteWait is passed as the timeout parameter, Wait will
Expand Down
62 changes: 62 additions & 0 deletions sdjournal/journal_test.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package sdjournal

import (
"errors"
"fmt"
"io"
"os"
"testing"
"time"
Expand Down Expand Up @@ -88,3 +91,62 @@ func TestJournalGetUsage(t *testing.T) {
t.Fatalf("Error getting journal size: %s", err)
}
}

func TestJournalCursorGetSeekAndTest(t *testing.T) {
j, err := NewJournal()
if err != nil {
t.Fatalf("Error opening journal: %s", err)
}

if j == nil {
t.Fatal("Got a nil journal")
}

defer j.Close()

waitAndNext := func(j *Journal) error {
r := j.Wait(time.Duration(1) * time.Second)
if r < 0 {
return errors.New("Error waiting to journal")
}

n, err := j.Next()
if err != nil {
return fmt.Errorf("Error reading to journal: %s", err)
}

if n == 0 {
return fmt.Errorf("Error reading to journal: %s", io.EOF)
}

return nil
}

err = journal.Print(journal.PriInfo, "test message for cursor %s", time.Now())
if err != nil {
t.Fatalf("Error writing to journal: %s", err)
}

if err = waitAndNext(j); err != nil {
t.Fatalf(err.Error())
}

c, err := j.GetCursor()
if err != nil {
t.Fatalf("Error getting cursor from journal: %s", err)
}

err = j.SeekCursor(c)
if err != nil {
t.Fatalf("Error seeking cursor to journal: %s", err)
}

if err = waitAndNext(j); err != nil {
t.Fatalf(err.Error())
}

err = j.TestCursor(c)
if err != nil {
t.Fatalf("Error testing cursor to journal: %s", err)
}
}
6 changes: 6 additions & 0 deletions sdjournal/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type JournalReaderConfig struct {
// where the reading begins within the journal.
Since time.Duration // start relative to a Duration from now
NumFromTail uint64 // start relative to the tail
Cursor string // start relative to the cursor

// Show only journal entries whose fields match the supplied values. If
// the array is empty, entries will not be filtered.
Expand Down Expand Up @@ -89,6 +90,11 @@ func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
if _, err := r.journal.PreviousSkip(config.NumFromTail + 1); err != nil {
return nil, err
}
} else if config.Cursor != "" {
// Start based on a custom cursor
if err := r.journal.SeekCursor(config.Cursor); err != nil {
return nil, err
}
}

return r, nil
Expand Down

0 comments on commit b9b5f59

Please sign in to comment.