Skip to content

Commit

Permalink
Add flag for earliest file to transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
ctberthiaume committed Dec 7, 2019
1 parent e1b7edb commit 91da820
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 67 deletions.
9 changes: 9 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
# Build seaflow-transfer command-line tool for 64-bit MacOS and Linux

[[ -d seaflow-transfer.darwin-amd64 ]] && rm -rf seaflow-transfer.darwin-amd64
[[ -d seaflow-transfer.linux-amd64 ]] && rm -rf seaflow-transfer.linux-amd64
GOOS=darwin GOARCH=amd64 go build -o seaflow-transfer.darwin-amd64/seaflow-transfer cmd/seaflow-transfer/main.go || exit 1
GOOS=linux GOARCH=amd64 go build -o seaflow-transfer.linux-amd64/seaflow-transfer cmd/seaflow-transfer/main.go || exit 1
zip -q -r seaflow-transfer.darwin-amd64.zip seaflow-transfer.darwin-amd64 || exit 1
zip -q -r seaflow-transfer.linux-amd64.zip seaflow-transfer.linux-amd64 || exit 1
25 changes: 20 additions & 5 deletions cmd/seaflow-transfer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"log"
"os"
"syscall"
"time"

"github.com/armbrustlab/seaflow-transfer/internal/fs"
"golang.org/x/crypto/ssh/terminal"
)

const versionStr string = "v0.1.0"
const versionStr string = "v0.2.0"

var (
srcRoot string // SRCROOT
Expand All @@ -24,9 +25,10 @@ var (
sshPassword string // SSHPASSWORD
sshPublicKey string // SSHPUBLICKEY
quiet bool // QUIET
start string // START
version bool // VERSION

)
var t0 time.Time
var cmdname string = "seaflow-transfer"

func init() {
Expand All @@ -44,6 +46,13 @@ func init() {
}
sshPassword = string(b)
}
if start != "" {
var err error
t0, err = time.Parse(time.RFC3339, start)
if err != nil {
log.Fatalf("could not parse -start RFC3339 timestamp: %v", err)
}
}
}

func initFlags() {
Expand All @@ -56,6 +65,7 @@ func initFlags() {
flagset.StringVar(&sshUser, "sshUser", "", "SSH user name")
flagset.StringVar(&sshPublicKey, "sshPublicKey", "", "SSH public key file, overrides SSHPASSWORD")
flagset.BoolVar(&quiet, "quiet", false, "Suppress informational logging")
flagset.StringVar(&start, "start", "", "Earliest file timestamp to transfer as an RFC3339 string")
flagset.BoolVar(&version, "version", false, "Display version and exit")

flagset.Usage = func() {
Expand Down Expand Up @@ -113,6 +123,10 @@ func initEnvVars() {
if ok && val == "1" {
quiet = true
}
val, ok = os.LookupEnv("START")
if ok {
start = val
}
val, ok = os.LookupEnv("VERSION")
if ok && val == "1" {
version = true
Expand All @@ -126,9 +140,10 @@ func main() {
}

t := &fs.Transfer{
Srcroot: srcRoot,
Dstroot: dstRoot,
Info: logger,
Srcroot: srcRoot,
Dstroot: dstRoot,
Info: logger,
Earliest: t0,
}
var err error
if srcAddress != "" {
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ github.com/pkg/sftp v1.10.1 h1:VasscCm72135zRysgrJDKsntdmPN+OuU3+nnHYA9wyc=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -20,6 +21,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
151 changes: 102 additions & 49 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/rand"
"os"
"path/filepath"
"regexp"
"sort"
"time"

Expand Down Expand Up @@ -116,12 +117,14 @@ func (l Localfs) rename(oldname, newname string) error {
// Transfer provides methods to copy SeaFlow data from a source to a destination
// location
type Transfer struct {
Srcfs Fs
Srcroot string
Dstfs Fs
Dstroot string
Info *log.Logger
rand *rand.Rand // for temp file names
Srcfs Fs
Srcroot string
Dstfs Fs
Dstroot string
Info *log.Logger
rand *rand.Rand // for temp file names
Earliest time.Time // earliest file time to transfer
//Latest time.Time // latest file time to transfer
}

// CopySFLFiles copies SFL files from source to destination. Files are
Expand All @@ -135,7 +138,16 @@ func (t *Transfer) CopySFLFiles() error {
}
t.Info.Printf("found %v source SFL files\n", len(srcFiles))
for _, path := range srcFiles {
err := t.CopyFile(path, false)
if !t.Earliest.IsZero() {
filetime, err := timeFromFilename(path)
if err == nil && filetime.Before(t.Earliest) {
t.Info.Printf("skipping %v: %v < %v\n", path, filetime, t.Earliest)
continue
}
// otherwise default to transferring files that don't have parseable
// timestamps or are not before t.Earliest
}
err = t.CopyFile(path, false)
if err != nil {
return fmt.Errorf("error while copying %v: %v", path, err)
}
Expand All @@ -158,55 +170,83 @@ func (t *Transfer) CopyEVTFiles() error {
panic(err)
}
t.Info.Printf("found %v source EVT files\n", len(srcFiles))
if len(srcFiles) > 1 {
// Copy all but the latest EVT file since it's most likely currently
// being appended to. It's possible to identify the latest file here as
// the last in the array after a lexicographical sort, which sorts
// timestamped SeaFlow EVT files chronologically.
sort.Strings(srcFiles)
srcFiles = srcFiles[:len(srcFiles)-1]
dstPattern := filepath.Join(t.Dstroot, "????_???", "????-??-??T??-??-??[\\-\\+]??-??")
dstFiles, err := t.Dstfs.glob(dstPattern)
if err != nil {
panic(err)
}
dstFilesgz, err := t.Dstfs.glob(dstPattern + ".gz")
if err != nil {
panic(err)

if len(srcFiles) <= 1 {
return nil
}

// Copy all but the latest EVT file since it's most likely currently
// being appended to. It's possible to identify the latest file here as
// the last in the array after a lexicographical sort, which sorts
// timestamped SeaFlow EVT files chronologically.
sort.Strings(srcFiles)
srcFiles = srcFiles[:len(srcFiles)-1]
dstPattern := filepath.Join(t.Dstroot, "????_???", "????-??-??T??-??-??[\\-\\+]??-??")
dstFiles, err := t.Dstfs.glob(dstPattern)
if err != nil {
panic(err)
}
dstFilesgz, err := t.Dstfs.glob(dstPattern + ".gz")
if err != nil {
panic(err)
}
dstFiles = append(dstFiles, dstFilesgz...)
// Skip EVT files already present in destination
present := make(map[string]bool)
for _, path := range dstFiles {
pathgz := path
if filepath.Ext(path) == ".gz" {
path = path[:len(path)-len(".gz")]
} else {
pathgz = pathgz + ".gz"
}
dstFiles = append(dstFiles, dstFilesgz...)
// Skip EVT files already present in destination
present := make(map[string]bool)
for _, path := range dstFiles {
pathgz := path
if filepath.Ext(path) == ".gz" {
path = path[:len(path)-len(".gz")]
} else {
pathgz = pathgz + ".gz"
}
_, name := filepath.Split(path)
present[name] = true
_, namegz := filepath.Split(pathgz)
present[namegz] = true
_, name := filepath.Split(path)
present[name] = true
_, namegz := filepath.Split(pathgz)
present[namegz] = true
}
dups := 0
nodups := make([]string, 0)
for _, path := range srcFiles {
_, name := filepath.Split(path)
if ok, _ := present[name]; !ok {
nodups = append(nodups, path)
} else {
dups++
}
files := make([]string, 0)
for _, path := range srcFiles {
_, name := filepath.Split(path)
if ok, _ := present[name]; !ok {
files = append(files, path)
}
// Skip EVT files that are before t.Earliest
early := 0
files := make([]string, 0)
for _, path := range nodups {
if !t.Earliest.IsZero() {
filetime, err := timeFromFilename(path)
if err == nil && filetime.Before(t.Earliest) {
t.Info.Printf("skipping %v: %v < %v\n", path, filetime, t.Earliest)
early++
continue
}
// otherwise default to transferring files that don't have parseable
// timestamps or are not before t.Earliest
}
files = append(files, path)
}

t.Info.Printf("skipped %v duplicates and the most recent EVT file\n", len(srcFiles)-len(files))
// Copy files
for _, path := range files {
err := t.CopyFile(path, true)
if err != nil {
return fmt.Errorf("error while copying %v: %v", path, err)
}
t.Info.Printf("copied %v\n", path)
t.Info.Printf("skipped %v duplicates\n", dups)
if !t.Earliest.IsZero() {
t.Info.Printf("skipped %v EVT files earlier than %v\n", early, t.Earliest)
}
t.Info.Printf("skipped the most recent EVT file\n")

// Copy files
for _, path := range files {
err := t.CopyFile(path, true)
if err != nil {
return fmt.Errorf("error while copying %v: %v", path, err)
}
t.Info.Printf("copied %v\n", path)
}

return nil
}

Expand Down Expand Up @@ -361,3 +401,16 @@ func newSftpClient(addr string, user string, pass string, publickey string) (cli
}
return client, nil
}

// timeFromFilename parses a SeaFlow timestamped filename. This function assumes
// all times are UTC, even if they have non-UTC timezone designator.
func timeFromFilename(fn string) (time.Time, error) {
fnbase := filepath.Base(fn)
re := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2})-(\d{2})-(\d{2}(?:\.\d+)?)(?:.+)?$`)
subs := re.FindStringSubmatch(fnbase)
if len(subs) != 4 {
return time.Time{}, fmt.Errorf("file timtestamp could not be parsed for %v", fn)
}
ts := subs[1] + ":" + subs[2] + ":" + subs[3] + "Z"
return time.Parse(time.RFC3339, ts)
}
Loading

0 comments on commit 91da820

Please sign in to comment.