Skip to content

Commit

Permalink
Initial Version
Browse files Browse the repository at this point in the history
  • Loading branch information
adyzng committed Dec 19, 2017
0 parents commit 24e20d9
Show file tree
Hide file tree
Showing 10 changed files with 647 additions and 0 deletions.
20 changes: 20 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
*.db
*.log
symbols.json
log/
custom/
data/
testdata/
.vscode/
.vendor/
.idea/
*.cookies
*.png
*.iml
*.exe
*.exe~
*.pem
*.bak
output*
debug/
test/
115 changes: 115 additions & 0 deletions app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package main

import (
"bytes"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/adyzng/duka/bi5"
"github.com/adyzng/duka/csv"
"github.com/adyzng/duka/download"
"github.com/adyzng/duka/misc"
)

var (
log = misc.NewLogger("App", 2)
)

type DukaOption struct {
Start time.Time
End time.Time
Symbol string
Format string
Destination string
}

func App(opt DukaOption) {
type hReader struct {
Hour int
Day time.Time
Data []byte
}

// dukascopy downloader
duka := download.NewDukaDownloader()
startTime := time.Now()

for day := opt.Start; day.Unix() < opt.End.Unix(); day = day.Add(24 * time.Hour) {
y, m, d := day.Date()
chClose := make(chan struct{}, 1)
chReaders := make(chan *hReader, 24)

if day.Weekday() == time.Saturday {
log.Trace("Skip Saturday %s.", day.Format("2006-01-02"))
continue
}

go func() {
var wg sync.WaitGroup
for h := 0; h < 24; h++ {
wg.Add(1)
go func(hour int) {
defer wg.Done()
URL := fmt.Sprintf(download.DukaTmplURL, opt.Symbol, y, m, d, hour)
if data, err := duka.Download(URL); err == nil {
chReaders <- &hReader{
Data: data,
Hour: hour,
Day: day.Add(time.Duration(hour) * time.Hour),
}
} else {
log.Error("Duka download %s failed.", URL)
}
}(h)
}

wg.Wait()
close(chReaders)
log.Info("%s:%s download complete.", opt.Symbol, day.Format("2006-01-02"))
}()

go func() {
defer close(chClose)

subDir := fmt.Sprintf("%s/%04d/%02d/%02d", opt.Symbol, y, m, d)
dest := filepath.Join(opt.Destination, subDir)

if err := os.MkdirAll(dest, 666); err != nil {
log.Error("Create folder (%s/%s) failed: %v.", opt.Destination, dest, err)
return
}

// save csv by day
csvFile := csv.New(day, opt.Symbol, dest)

for chr := range chReaders {
// save bi5 by hour
bi5File := bi5.New(chr.Day, chr.Hour, opt.Symbol, dest)

if ticks, err := bi5File.Decode(bytes.NewBuffer(chr.Data[:])); err != nil {
log.Error("Decode bi5 %s:%s failed: %v.", opt.Symbol, chr.Day.Format("2006-01-02:15H"))
continue
} else {
csvFile.AddTicks(ticks)
}

if err := bi5File.Save(bytes.NewBuffer(chr.Data[:])); err != nil {
log.Error("Save Bi5 %s:%s failed: %v.", opt.Symbol, chr.Day.Format("2006-01-02:15H"))
continue
}
}

if err := csvFile.Save(nil); err != nil {
log.Error("Save CSV %s:%s failed: %v.", opt.Symbol, day.Format("2006-01-02"), err)
}
}()

<-chClose
log.Info("%s:%s decode complete.", opt.Symbol, day.Format("2006-01-02"))
}

log.Info("Completed. Time Cost: %v.", time.Since(startTime))
}
93 changes: 93 additions & 0 deletions bi5/bi5.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package bi5

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/adyzng/duka/misc"
"github.com/adyzng/duka/parse"
"github.com/kjk/lzma"
)

var (
ext = "bi5"
log = misc.NewLogger("Bi5", 3)
)

type Bi5 struct {
day time.Time
hour int
dest string
symbol string
}

// New create an bi5 saver
func New(day time.Time, hour int, symbol, dest string) *Bi5 {
return &Bi5{
day: day,
hour: hour,
dest: dest,
symbol: symbol,
}
}

func (b *Bi5) Decode(r io.Reader) ([]*parse.TickData, error) {
dec := lzma.NewReader(r)
defer dec.Close()

ticksArr := make([]*parse.TickData, 0)
bytesArr := make([]byte, parse.TICK_BYTES)

for {
n, err := dec.Read(bytesArr[:])
if err == io.EOF {
err = nil
break
}

if n != parse.TICK_BYTES || err != nil {
log.Error("LZMA decode failed: %d: %v.", n, err)
break
}

t, err := parse.DecodeTickData(bytesArr[:], b.symbol)
if err != nil {
log.Error("Decode tick data failed: %v.", err)
break
}

t.Time += time.Duration(b.hour) * time.Hour
ticksArr = append(ticksArr, t)
}

return ticksArr, nil
}

func (b *Bi5) Save(r io.Reader) error {
subpath := fmt.Sprintf("%02dh.%s", b.hour, ext)
fpath := filepath.Join(b.dest, subpath)

f, err := os.OpenFile(fpath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 666)
if err != nil {
log.Error("Create file %s failed: %v.", fpath, err)
return err
}

var len int64
if len, err = io.Copy(f, r); err == nil {
if len > 0 {
log.Trace("Saved file %s => %d.", fpath, len)
}
} else {
log.Error("Write file %s failed: %v.", fpath, err)
}

f.Close()
if len == 0 {
os.Remove(fpath)
}
return err
}
71 changes: 71 additions & 0 deletions csv/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package csv

import (
"encoding/csv"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"time"

"github.com/adyzng/duka/misc"
"github.com/adyzng/duka/parse"
)

var (
ext = "csv"
log = misc.NewLogger("CSV", 3)
)

// CsvDump save csv format
type CsvDump struct {
day time.Time
dest string
symbol string
ticks []*parse.TickData
}

func New(day time.Time, symbol, dest string) *CsvDump {
return &CsvDump{
day: day,
dest: dest,
symbol: symbol,
}
}

func (c *CsvDump) Save(r io.Reader) error {
subpath := fmt.Sprintf("%s-%s.%s", c.symbol, c.day.Format("2006-01-02"), ext)
fpath := filepath.Join(c.dest, subpath)

f, err := os.OpenFile(fpath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 666)
if err != nil {
log.Error("Create file %s failed: %v.", fpath, err)
return err
}
defer f.Close()

csv := csv.NewWriter(f)
defer csv.Flush()

// sort by time
sort.Slice(c.ticks, func(i, j int) bool {
return c.ticks[i].Time < c.ticks[j].Time
})

for _, tick := range c.ticks {
if err := csv.Write(tick.ToString(c.day)); err != nil {
log.Error("Write CSV %s failed: %v.", fpath, err)
return err
}
}

log.Trace("Saved file %s with %d ticks.", fpath, len(c.ticks))
return nil
}

func (c *CsvDump) AddTicks(ticks []*parse.TickData) {
if len(ticks) > 0 {
c.ticks = append(c.ticks, ticks...)
}
}
6 changes: 6 additions & 0 deletions download/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package download

// Downloader interface...
type Downloader interface {
Download(URL string) ([]byte, error)
}
55 changes: 55 additions & 0 deletions download/dukascopy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package download

import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/adyzng/duka/misc"
)

const (
// "https://datafeed.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5"
DukaTmplURL = "https://datafeed.dukascopy.com/datafeed/%s/%04d/%02d/%02d/%02dh_ticks.bi5"
retryTimes = 5
)

var (
log = misc.NewLogger("Duka", 2)
)

type HTTPDownload struct {
client *http.Client
}

func NewDukaDownloader() Downloader {
return &HTTPDownload{
client: &http.Client{
Timeout: 5 * time.Minute,
},
}
}

func (h *HTTPDownload) Download(URL string) ([]byte, error) {
var err error
for retry := 0; retry < retryTimes; retry++ {
var resp *http.Response
resp, err = h.client.Get(URL)
if err != nil {
log.Error("[%d] Download %s failed: %v.", retry, URL, err)
continue
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Warn("[%d] Download %s response %d:%s.", retry, URL, resp.StatusCode, resp.Status)
err = fmt.Errorf("http response %d:%s", resp.StatusCode, resp.Status)
continue
}

data, err := ioutil.ReadAll(resp.Body)
return data, err
}
return nil, err
}
Loading

0 comments on commit 24e20d9

Please sign in to comment.