Skip to content

Commit

Permalink
part2 code complete
Browse files Browse the repository at this point in the history
  • Loading branch information
adyzng committed Dec 25, 2017
1 parent ed9de1b commit 357989e
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 148 deletions.
174 changes: 120 additions & 54 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,26 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/adyzng/go-duka/bi5"
"github.com/adyzng/go-duka/core"
"github.com/adyzng/go-duka/csv"
"github.com/adyzng/go-duka/misc"
)

var (
log = misc.NewLogger("App", 2)
log = misc.NewLogger("App", 2)
supportsFormats = []string{"csv", "fxt", "hst"}
)

// DukaApp downloads source tick data and hands it to the configured outputs.
//
type DukaApp struct {
Option AppOption
Output core.Convertor
option AppOption
outputs []core.Converter
}

// AppOption download options
Expand All @@ -38,7 +39,6 @@ type AppOption struct {
Mode uint32
Timeframe uint32
Convert bool
CsvDump bool
CsvHeader bool
}

Expand All @@ -59,6 +59,19 @@ func ParseOption(args argsList) (*AppOption, error) {
err = fmt.Errorf("Invalid symbol parameter")
return nil, err
}
{
bSupport, format := false, strings.ToLower(args.Format)
for _, sformat := range supportsFormats {
if format == sformat {
bSupport = true
break
}
}
if !bSupport {
err = fmt.Errorf("not supported output format")
return nil, err
}
}
if opt.Start, err = time.ParseInLocation("2006-01-02", args.Start, time.UTC); err != nil {
err = fmt.Errorf("invalid start parameter")
return nil, err
Expand All @@ -71,8 +84,7 @@ func ParseOption(args argsList) (*AppOption, error) {
err = fmt.Errorf("invalid end parameter which shouldn't early then start")
return nil, err
}

if opt.Folder, err = filepath.Abs(args.Folder); err != nil {
if opt.Folder, err = filepath.Abs(args.Output); err != nil {
err = fmt.Errorf("invalid destination folder")
return nil, err
}
Expand All @@ -92,22 +104,45 @@ func ParseOption(args argsList) (*AppOption, error) {

opt.Symbol = strings.ToUpper(args.Symbol)
opt.CsvHeader = args.Header
opt.CsvDump = true
opt.Format = args.Format
opt.Spread = uint32(args.Spread)
opt.Mode = uint32(args.Model)

return &opt, nil
}

// NewApp builds a DukaApp from already-parsed options.
//
// The options are copied into the app by value, and the output converters
// are created by NewOutputs from the same options. opt must not be nil
// because it is dereferenced here.
func NewApp(opt *AppOption) *DukaApp {
	app := new(DukaApp)
	app.option = *opt
	app.outputs = NewOutputs(opt)
	return app
}

// Execute downloads the source bi5 tick data from Dukascopy.
//
func (duka *DukaApp) Execute() error {
var err error
startTime := time.Now()
func (app *DukaApp) Execute() error {
var (
err error
opt = app.option
startTime = time.Now()
)

//
// Create the output folder if it does not exist yet.
//
if _, statErr := os.Stat(opt.Folder); os.IsNotExist(statErr) {
	// The mode must be an octal literal. The previous decimal 666 equals
	// octal 01232, whose permission bits (0232) give the owner neither read
	// nor search access, leaving the new directory unusable. Directories
	// also need the execute bit to be traversable, hence 0755.
	if mkErr := os.MkdirAll(opt.Folder, 0755); mkErr != nil {
		log.Error("Create folder (%s) failed: %v.", opt.Folder, mkErr)
		return mkErr
	}
}

//
// Download day by day; the 24 hours of each day are fetched in parallel by 24 goroutines.
//
for day := duka.Option.Start; day.Unix() < duka.Option.End.Unix(); day = day.Add(24 * time.Hour) {
for day := opt.Start; day.Unix() < opt.End.Unix(); day = day.Add(24 * time.Hour) {
//
// There is no data on Saturdays, so skip them.
//
Expand All @@ -118,19 +153,36 @@ func (duka *DukaApp) Execute() error {
//
// Download, decode, and store.
//
if err = duka.saveRaw(day, duka.fetchDay(day)); err != nil {
if err = app.saveData(day, app.fetchDay(day)); err != nil {
break
}
log.Info("Finished %s %s.", duka.Option.Symbol, day.Format("2006-01-02"))

log.Info("Finished %s %s.", opt.Symbol, day.Format("2006-01-02"))
}

//
// Flush all output files, one goroutine per output.
//
var wg sync.WaitGroup
for _, output := range app.outputs {
	// Rebind the loop variable for this iteration. Before Go 1.22 the range
	// variable is shared across iterations, so without this copy every
	// goroutine could observe the last output and the others would never be
	// finished.
	output := output

	wg.Add(1)
	go func() {
		defer wg.Done()
		output.Finish()
	}()
}

wg.Wait()
log.Info("Time cost: %v.", time.Since(startTime))
return err
}

func (duka *DukaApp) fetchDay(day time.Time) <-chan *hReader {
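// fetchDay downloads one day (24 hours) of tick data using 24 parallel goroutines.
// The returned data is not guaranteed to be in chronological order, so the
// converting side must sort the ticks of each day.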
//
func (app *DukaApp) fetchDay(day time.Time) <-chan *hReader {
ch := make(chan *hReader, 24)
opt := &duka.Option
opt := app.option

go func() {
defer close(ch)
Expand Down Expand Up @@ -170,63 +222,77 @@ func (duka *DukaApp) fetchDay(day time.Time) <-chan *hReader {
}

wg.Wait()
log.Trace("%s:%s download complete.", duka.Option.Symbol, day.Format("2006-01-02"))
log.Trace("%s %s download complete.", opt.Symbol, day.Format("2006-01-02"))
}()

return ch
}

func (duka *DukaApp) saveRaw(day time.Time, chData <-chan *hReader) error {
// sortAndOutput orders one day's ticks by ascending timestamp and passes the
// sorted slice to every configured output.
//
// The slice is sorted in place, so the caller's ticks are reordered. An empty
// slice is a no-op. The function currently always returns nil.
func (app *DukaApp) sortAndOutput(day time.Time, ticks []*core.TickData) error {
	if len(ticks) == 0 {
		return nil
	}

	// Sort from earliest to latest.
	byTime := func(a, b int) bool {
		return ticks[a].Timestamp < ticks[b].Timestamp
	}
	sort.Slice(ticks, byTime)

	// Every output receives the same day stamp (Unix seconds of day) together
	// with the full sorted slice.
	dayStamp := uint32(day.Unix())
	for _, sink := range app.outputs {
		sink.PackTicks(dayStamp, ticks[:])
	}

	// The tick timestamp is divided by 1000 before being used as Unix
	// seconds for the trace message.
	earliest := time.Unix(ticks[0].Timestamp/1000, 0).UTC()
	log.Trace("%s sort and output day %v.", app.option.Format, earliest)
	return nil
}

// saveData
func (app *DukaApp) saveData(day time.Time, chData <-chan *hReader) error {
var (
err error
dest string
csvFile *csv.CsvDump
opt = &duka.Option
err error
opt = app.option
)

for data := range chData {
if dest == "" {
y, m, d := day.Date()
subDir := fmt.Sprintf("%s/%04d/%02d/%02d", opt.Symbol, y, m, d)

dest = filepath.Join(opt.Folder, subDir)
if err = os.MkdirAll(dest, 666); err != nil {
log.Error("Create folder (%s) failed: %v.", dest, err)
return err
}

if opt.CsvDump {
csvFile = csv.New(day, opt.Symbol, dest, opt.CsvHeader)
defer csvFile.Finish()
}
}
nDay := -1
dayTicks := make([]*core.TickData, 0, 2048)

for data := range chData {
// save bi5 by hour
bi5File := data.Bi5
var ticks []*core.TickData

// decode bi5
if ticks, err := bi5File.Decode(data.Data[:]); err != nil {
// 解析 bi5 成 TickData 数据
if ticks, err = bi5File.Decode(data.Data[:]); err != nil {
log.Error("Decode bi5 %s: %s failed: %v.", opt.Symbol, data.DayH.Format("2006-01-02:15H"), err)
continue
} else {
if opt.CsvDump && len(ticks) > 0 {
csvFile.PackTicks(0, ticks[:])
}
}

if !opt.Convert {
// save bi5 source data
if err := bi5File.Save(data.Data[:]); err != nil {
log.Error("Save Bi5 %s: %s failed: %v.", opt.Symbol, data.DayH.Format("2006-01-02:15H"), err)
continue
}
// 保留 bi5 数据
if err := bi5File.Save(data.Data[:]); err != nil {
log.Error("Save Bi5 %s: %s failed: %v.", opt.Symbol, data.DayH.Format("2006-01-02:15H"), err)
continue
}

// 新的一天开始
if nDay != day.Day() {
app.sortAndOutput(day, dayTicks[:])
dayTicks = dayTicks[:0]
nDay = day.Day()
}

dayTicks = append(dayTicks, ticks...)
}

if err != nil {
log.Warn("%s:%s partial complete.", opt.Symbol, day.Format("2006-01-02"))
} else {
log.Trace("%s:%s complete.", opt.Symbol, day.Format("2006-01-02"))
if len(dayTicks) > 0 {
app.sortAndOutput(day, dayTicks[:])
}

log.Trace("%s %s convert complete.", opt.Symbol, day.Format("2006-01-02"))
return err
}
4 changes: 3 additions & 1 deletion bi5/bi5.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Bi5 struct {
dayH time.Time
symbol string
dest string
save bool
}

// New creates a bi5 saver
Expand Down Expand Up @@ -79,7 +80,7 @@ func (b *Bi5) Decode(data []byte) ([]*core.TickData, error) {
// Save bi5 data to file
//
func (b *Bi5) Save(data []byte) error {
if len(data) == 0 {
if len(data) == 0 || !b.save {
return nil
}

Expand Down Expand Up @@ -150,6 +151,7 @@ func (b *Bi5) Download() ([]byte, error) {

if len(data) > 0 {
log.Trace("Downloaded %s: %s.", b.symbol, b.dayH.Format("2006-01-02:15H"))
b.save = true
return data, err
}

Expand Down
Loading

0 comments on commit 357989e

Please sign in to comment.