Skip to content

Commit

Permalink
part3 first works version
Browse files Browse the repository at this point in the history
  • Loading branch information
adyzng committed Dec 26, 2017
1 parent 357989e commit 54eb860
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 45 deletions.
6 changes: 3 additions & 3 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ func (app *DukaApp) Execute() error {
var wg sync.WaitGroup
for _, output := range app.outputs {
wg.Add(1)
go func() {
go func(o core.Converter) {
defer wg.Done()
output.Finish()
}()
o.Finish()
}(output)
}

wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/adyzng/go-duka/csv"
"github.com/adyzng/go-duka/fxt4"
"github.com/adyzng/go-duka/hst"
"github.com/go-clog/clog"
)

var (
Expand Down Expand Up @@ -109,6 +108,7 @@ func (tf *Timeframe) PackTicks(barTimestamp uint32, ticks []*core.TickData) erro

// Finish wait convert finish
func (tf *Timeframe) Finish() error {
close(tf.chTicks)
<-tf.close
return tf.out.Finish()
}
Expand All @@ -119,7 +119,7 @@ func (tf *Timeframe) worker() error {
barTicks := make([]*core.TickData, 0, maxCap)

defer func() {
clog.Info("%s %s convert completed.", tf.symbol, tf.period)
log.Info("%s %s convert completed.", tf.symbol, tf.period)
close(tf.close)
}()

Expand Down
10 changes: 8 additions & 2 deletions core/tick.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ type TickData struct {
VolumeBid float64 // 单位:MIO(百万)
}

// UTC convert timestamp to UTC time
//
func (t *TickData) UTC() time.Time {
tm := time.Unix(t.Timestamp/1000, (t.Timestamp%1000)*int64(time.Millisecond))
return tm.UTC()
}

// BarData means tick data within one Bar
//
type BarData struct {
Expand All @@ -31,9 +38,8 @@ type BarData struct {
// ToString used to format into csv row
//
func (t *TickData) ToString() []string {
tm := time.Unix(t.Timestamp/1000, (t.Timestamp%1000)*int64(time.Millisecond))
return []string{
tm.Format("2006-01-02 15:04:05.000"),
t.UTC().Format("2006-01-02 15:04:05.000"),
fmt.Sprintf("%.5f", t.Ask),
fmt.Sprintf("%.5f", t.Bid),
fmt.Sprintf("%.2f", t.VolumeAsk),
Expand Down
29 changes: 15 additions & 14 deletions csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ import (
)

var (
ext = "csv"
ext = "CSV"
log = misc.NewLogger("CSV", 3)
csvHeader = []string{"time", "ask", "bid", "ask_volume", "bid_volume"}
)

// CsvDump save csv format
type CsvDump struct {
day time.Time
end time.Time
dest string
symbol string
header bool
close chan struct{}
chTicks chan *core.TickData
day time.Time
end time.Time
dest string
symbol string
header bool
tickCount int64
chClose chan struct{}
chTicks chan *core.TickData
}

// New Csv file
Expand All @@ -36,7 +37,7 @@ func New(start, end time.Time, header bool, symbol, dest string) *CsvDump {
dest: dest,
symbol: symbol,
header: header,
close: make(chan struct{}, 1),
chClose: make(chan struct{}, 1),
chTicks: make(chan *core.TickData, 1024),
}

Expand All @@ -47,7 +48,8 @@ func New(start, end time.Time, header bool, symbol, dest string) *CsvDump {
// Finish complete csv file writing
//
func (c *CsvDump) Finish() error {
<-c.close
close(c.chTicks)
<-c.chClose
return nil
}

Expand All @@ -57,6 +59,7 @@ func (c *CsvDump) PackTicks(barTimestamp uint32, ticks []*core.TickData) error {
for _, tick := range ticks {
select {
case c.chTicks <- tick:
c.tickCount++
break
}
}
Expand All @@ -81,10 +84,10 @@ func (c *CsvDump) worker() error {

defer func() {
f.Close()
close(c.close)
close(c.chClose)
log.Info("Saved Ticks: %d.", c.tickCount)
}()

var tickCount int64
csv := csv.NewWriter(f)
defer csv.Flush()

Expand All @@ -99,9 +102,7 @@ func (c *CsvDump) worker() error {
log.Error("Write csv %s failed: %v.", fpath, err)
break
}
tickCount++
}

log.Trace("Saved %s with %d ticks.", fpath, tickCount)
return err
}
18 changes: 18 additions & 0 deletions csv/csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package csv

import (
"testing"
"time"
)

func TestCloseChan(t *testing.T) {
chClose := make(chan struct{}, 1)
go func() {
defer close(chClose)
time.Sleep(2 * time.Second)
t.Logf("Close chan.\n")
}()

<-chClose
t.Logf("Receive close channel.\n")
}
4 changes: 2 additions & 2 deletions duka.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func main() {
fmt.Printf(" Timeframe: %d\n", opt.Timeframe)
fmt.Printf(" Format: %s\n", opt.Format)
fmt.Printf(" CsvHeader: %t\n", opt.CsvHeader)
fmt.Printf(" StartDate: %s\n", opt.Start.Format("2006-01-02"))
fmt.Printf(" EndDate: %s\n", opt.End.Format("2006-01-02"))
fmt.Printf(" StartDate: %s\n", opt.Start.Format("2006-01-02:15H"))
fmt.Printf(" EndDate: %s\n", opt.End.Format("2006-01-02:15H"))

defer clog.Shutdown()
app := NewApp(opt)
Expand Down
43 changes: 43 additions & 0 deletions duka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"fmt"
"testing"

clog "gopkg.in/clog.v1"
)

func TestDukaApp(t *testing.T) {
args := argsList{
Verbose: true,
Header: true,
Spread: 20,
Model: 0,
Symbol: "EURUSD",
Output: "g:\\00",
Format: "csv",
Period: "M1",
Start: "2017-01-01",
End: "2017-01-03",
}

opt, err := ParseOption(args)
if err != nil {
fmt.Println(err)
return
}

fmt.Printf(" Output: %s\n", opt.Folder)
fmt.Printf(" Symbol: %s\n", opt.Symbol)
fmt.Printf(" Spread: %d\n", opt.Spread)
fmt.Printf(" Mode: %d\n", opt.Mode)
fmt.Printf(" Timeframe: %d\n", opt.Timeframe)
fmt.Printf(" Format: %s\n", opt.Format)
fmt.Printf(" CsvHeader: %t\n", opt.CsvHeader)
fmt.Printf(" StartDate: %s\n", opt.Start.Format("2006-01-02:15H"))
fmt.Printf(" EndDate: %s\n", opt.End.Format("2006-01-02:15H"))

defer clog.Shutdown()
app := NewApp(opt)
app.Execute()
}
26 changes: 15 additions & 11 deletions fxt4/fxt.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

"github.com/adyzng/go-duka/core"
"github.com/adyzng/go-duka/misc"
Expand Down Expand Up @@ -49,15 +48,16 @@ func convertToFxtTick(tick *core.TickData) *fxtTick {
// M - model number (0,1 or 2)
//
type FxtFile struct {
wg sync.WaitGroup
header *FXTHeader
chTicks chan *fxtTick
fpath string
header *FXTHeader
firstUniBar *fxtTick
lastUniBar *fxtTick
deltaTimestamp uint32
endTimestamp uint32
barCount int32
tickCount int64
chTicks chan *fxtTick
chClose chan struct{}
}

// NewFxtFile create an new fxt file instance
Expand All @@ -67,16 +67,19 @@ func NewFxtFile(timeframe, spread, model uint32, dest, symbol string) *FxtFile {
header: NewHeader(405, symbol, timeframe, spread, model),
fpath: filepath.Join(dest, fn),
chTicks: make(chan *fxtTick, 1024),
chClose: make(chan struct{}, 1),
deltaTimestamp: timeframe * 60,
}

fxt.wg.Add(1)
go fxt.worker()
return fxt
}

func (f *FxtFile) worker() error {
defer f.wg.Done()
defer func() {
close(f.chClose)
log.Info("Saved Bar: %d, Ticks: %d.", f.barCount, f.tickCount)
}()

fxt, err := os.OpenFile(f.fpath, os.O_CREATE|os.O_TRUNC, 666)
if err != nil {
Expand Down Expand Up @@ -115,8 +118,6 @@ func (f *FxtFile) worker() error {
break
}
}

log.Trace("Total %u ticks write.", f.barCount)
return err
}

Expand All @@ -131,9 +132,12 @@ func (f *FxtFile) PackTicks(barTimestemp uint32, ticks []*core.TickData) error {
Close: tick.Bid,
Volume: uint64(tick.VolumeBid),
}
//f.chTicks <- convertToFxtTick(tick)
f.tickCount++
}
if f.endTimestamp != barTimestemp {
f.barCount++
f.endTimestamp = barTimestemp
}
f.barCount++
return nil
}

Expand Down Expand Up @@ -197,6 +201,6 @@ func (f *FxtFile) adjustHeader() error {

func (f *FxtFile) Finish() error {
close(f.chTicks)
f.wg.Wait()
<-f.chClose
return f.adjustHeader()
}
18 changes: 7 additions & 11 deletions hst/hst401.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math"
"os"
"path/filepath"
"sync"

"github.com/adyzng/go-duka/core"
"github.com/adyzng/go-duka/misc"
Expand All @@ -18,14 +17,14 @@ var (
// HST401 MT4 history data format .hst with version 401
//
type HST401 struct {
wg sync.WaitGroup
header *Header
dest string
symbol string
spread uint32
timefame uint32
barCount int64
chBars chan *BarData
close chan struct{}
chClose chan struct{}
}

// NewHST create a HST convertor
Expand All @@ -38,12 +37,10 @@ func NewHST(timefame, spread uint32, symbol, dest string) *HST401 {
spread: spread,
timefame: timefame,
chBars: make(chan *BarData, 128),
close: make(chan struct{}, 1),
chClose: make(chan struct{}, 1),
}

hst.wg.Add(1)
go hst.worker()

return hst
}

Expand All @@ -61,8 +58,8 @@ func (h *HST401) worker() error {

defer func() {
f.Close()
h.wg.Done()
log.Trace("Saved : %s.", fpath)
close(h.chClose)
log.Info("Saved Bar: %d.", h.barCount)
}()

// write HST header
Expand Down Expand Up @@ -119,6 +116,7 @@ func (h *HST401) PackTicks(barTimestamp uint32, ticks []*core.TickData) error {

select {
case h.chBars <- bar:
h.barCount++
break
//case <-h.close:
// break
Expand All @@ -129,9 +127,7 @@ func (h *HST401) PackTicks(barTimestamp uint32, ticks []*core.TickData) error {
// Finish HST file convert
//
func (h *HST401) Finish() error {
//close(h.close)
close(h.chBars)
h.wg.Wait()
close(h.close)
<-h.chClose
return nil
}

0 comments on commit 54eb860

Please sign in to comment.