Skip to content

Commit

Permalink
Updates to journal functionallity to make it safer
Browse files Browse the repository at this point in the history
Lots of concurrency fixes and some readme updates
  • Loading branch information
mattiasrunge committed Jan 15, 2021
1 parent 7b2aad3 commit 2a916bc
Show file tree
Hide file tree
Showing 37 changed files with 606 additions and 328 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,5 @@ Every project needs a reason for being.
- Uses a domain socket for communication (CLI -> background process)
- Stores state and logs under ```~/.config/barf```
- Uses the installed version of [rsync](https://rsync.samba.org/), make sure there is one
- Well defined socket protocol, allowing for other types of clients
- Only tested on Linux, but might work on other systems
13 changes: 6 additions & 7 deletions cmd/barf/run/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package run

import (
"fmt"
"os"

"barf/internal/com/socket"
"barf/internal/proc/daemon"
"barf/internal/ui"

cli "github.com/jawher/mow.cli"
)

// StartCLI starts the CLI process
Expand All @@ -17,22 +16,22 @@ func StartCLI(width int, action func() error) {

if err != nil {
fmt.Println(err)
cli.Exit(255)
os.Exit(255)
return
}

err = socket.Connect()

if err != nil {
fmt.Println(err)
cli.Exit(255)
os.Exit(255)
return
}

socket.OnClose(func() {
if !normalClose {
fmt.Println("Lost connection to backend")
cli.Exit(1)
os.Exit(1)
}
})

Expand All @@ -42,7 +41,7 @@ func StartCLI(width int, action func() error) {

if err != nil {
fmt.Println(err)
cli.Exit(255)
os.Exit(255)
return
}

Expand All @@ -52,5 +51,5 @@ func StartCLI(width int, action func() error) {
socket.Close()
socket.WaitOnClose()

cli.Exit(exitCode)
os.Exit(exitCode)
}
2 changes: 1 addition & 1 deletion cmd/barf/run/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func startDaemon() error {

fmt.Println("Listening for connections")

err = runner.StartRunner()
err = runner.Start()

if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion docs/svg/copy-monitor-many.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/svg/copy-monitor.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/svg/copy-normal.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/svg/copy-remote.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/svg/daemon-journal.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 3 additions & 2 deletions internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ func (cmd *Cmd) Start(args []string) (int, error) {
}

cmd.cmd = exec.Command(args[0], args[1:]...)
var wg sync.WaitGroup

stdout, _ := cmd.cmd.StdoutPipe()
stderr, _ := cmd.cmd.StderrPipe()

go cmd.handleLog(stdout, cmd.stdoutHandler)
go cmd.handleLog(stderr, cmd.stderrHandler)

cmd.stdoutHandler("Executing: " + strings.Join(args, " "))

err := cmd.cmd.Start()

if err != nil {
Expand All @@ -56,7 +57,7 @@ func (cmd *Cmd) Start(args []string) (int, error) {
return 255, err
}

wg.Wait()
cmd.wg.Wait()

err = cmd.cmd.Wait()

Expand Down
49 changes: 19 additions & 30 deletions internal/journal/actions.go → internal/coordinator/actions.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,38 @@
package journal
package coordinator

import (
"errors"

"barf/internal/com/server"
"barf/internal/journal"
"barf/internal/op"
)

func create(opType op.OperationType, args op.OperationArgs) (*op.Operation, error) {
operation := op.NewOperation(opType, args)
index, err := getNextIndex()

if registerdStartHandler == nil {
return operation, errors.New("No start handler registered")
if err != nil {
return nil, err
}

e := entry{
Operation: operation,
Status: op.NewStatus(),
}
operation := op.NewOperation(opType, args, index)

err := addEntry(&e)
e, err := journal.NewJournalEntry(operation)

if err != nil {
return operation, err
}
addEntry(e)

err = server.OperationCreated(operation)

if err != nil {
UpdateOperationStatus(operation.ID, &op.OperationStatus{
Finished: true,
ExitCode: 255,
Error: err.Error(),
})
return operation, err
}
_ = server.OperationCreated(operation)

err = registerdStartHandler(operation)

if err != nil {
UpdateOperationStatus(operation.ID, &op.OperationStatus{
Finished: true,
ExitCode: 255,
Error: err.Error(),
Message: err.Error(),
})
return operation, err

return nil, err
}

return operation, nil
Expand All @@ -61,8 +49,9 @@ func abort(operationID op.OperationID) error {
UpdateOperationStatus(operationID, &op.OperationStatus{
Finished: true,
ExitCode: 254,
Error: err.Error(),
Message: err.Error(),
})

return err
}

Expand All @@ -72,19 +61,19 @@ func abort(operationID op.OperationID) error {
func list() ([]*op.Operation, error) {
var operations []*op.Operation

for _, e := range entries {
for _, e := range getEntries() {
operations = append(operations, e.Operation)
}

return operations, nil
}

func status(operationID op.OperationID) (*op.OperationStatus, error) {
entry, err := getEntry(operationID)
entry := getEntry(operationID)

if err != nil {
return nil, err
if entry == nil {
return nil, errors.New("No entry for operation with id " + string(operationID) + " found!")
}

return entry.Status, errors.New("No such operation found")
return entry.Status, nil
}
78 changes: 78 additions & 0 deletions internal/coordinator/entries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package coordinator

import (
"errors"
"sync"

"barf/internal/journal"
"barf/internal/op"
)

var entries []*journal.JournalEntry
var entriesMu sync.Mutex

func addEntry(e *journal.JournalEntry) {
entriesMu.Lock()
defer entriesMu.Unlock()

entries = append(entries, e)
}

func removeEntry(e *journal.JournalEntry) {
entriesMu.Lock()
defer entriesMu.Unlock()

for i, s := range entries {
if s == e {
copy(entries[i:], entries[i+1:])
entries = entries[:len(entries)-1]

return
}
}
}

func getEntry(operationID op.OperationID) *journal.JournalEntry {
entriesMu.Lock()
defer entriesMu.Unlock()

for _, e := range entries {
if e.Operation.ID == operationID {
return e
}
}

return nil
}

func getEntries() []*journal.JournalEntry {
entriesMu.Lock()
defer entriesMu.Unlock()

result := make([]*journal.JournalEntry, len(entries))
copy(result, entries)

return result
}

func getNextIndex() (op.OperationIndex, error) {
var index op.OperationIndex = 1

indexFree := func(index op.OperationIndex) bool {
for _, e := range entries {
if e.Operation.Index == index {
return false
}
}

return true
}

for ; !indexFree(index); index++ {
if index == 0 {
return 0, errors.New("Could not get new index, overflow")
}
}

return index, nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package journal
package coordinator

import "barf/internal/op"

Expand Down
46 changes: 46 additions & 0 deletions internal/coordinator/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package coordinator

import (
"errors"

"barf/internal/com/server"
"barf/internal/journal"
"barf/internal/op"
)

// Initialize reads active entries from disk
func Initialize() error {
if registerdStartHandler == nil {
return errors.New("No start handler registered")
}

entryList, err := journal.Initialize()

if err != nil {
return err
}

entriesMu.Lock()
entries = entryList
entriesMu.Unlock()

server.OnOperationCreate(create)
server.OnOperationAbort(abort)
server.OnOperationStatus(status)
server.OnListOperations(list)

for _, e := range getEntries() {
err = registerdStartHandler(e.Operation)

if err != nil {
UpdateOperationStatus(e.Operation.ID, &op.OperationStatus{
Finished: true,
ExitCode: 255,
Message: err.Error(),
})
return err
}
}

return nil
}
27 changes: 27 additions & 0 deletions internal/coordinator/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package coordinator

import (
"barf/internal/op"
)

// WriteOperationStdout writes to the operations stdout log
func WriteOperationStdout(operationID op.OperationID, line string) {
e := getEntry(operationID)

if e == nil {
return // Just drop
}

e.WriteStdout(line)
}

// WriteOperationStderr writes to the operations stderr log
func WriteOperationStderr(operationID op.OperationID, line string) {
e := getEntry(operationID)

if e == nil {
return // Just drop
}

e.WriteStderr(line)
}
20 changes: 8 additions & 12 deletions internal/journal/update.go → internal/coordinator/update.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
package journal
package coordinator

import (
"errors"

"barf/internal/com/server"
"barf/internal/op"
)

// UpdateOperationStatus sets the operation status
func UpdateOperationStatus(operationID op.OperationID, status *op.OperationStatus) error {
e, err := getEntry(operationID)
e := getEntry(operationID)

if err != nil {
return err
if e == nil {
return errors.New("Could not find any entry for operation with id " + string(operationID) + " to report status on!")
}

op.UpdateStatus(e.Status, status)

err = writeEntry(e)
err := e.UpdateStatus(status)

if err != nil {
return err
}

if e.Status.Finished {
err = removeEntry(e)

if err != nil {
return err
}
removeEntry(e)
}

return server.OperationStatus(operationID, e.Status)
Expand Down
Loading

0 comments on commit 2a916bc

Please sign in to comment.