Skip to content

Commit

Permalink
* Feature: Create streaming API for tick data.
Browse files Browse the repository at this point in the history
  Currently the download is controlled by day instead of hour.
  Future enhancement can reduce the download to minimum hours
* Feature: Streaming API allows stop iterating at any point
  • Loading branch information
edward-yakop committed Jan 1, 2021
1 parent 857a9cb commit 315de21
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 163 deletions.
6 changes: 4 additions & 2 deletions api/tickdata/day.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"time"
)

type Iterator func(ticks []*TickData, err error)
type DayIterator func(ticks []*TickData, err error) bool
type TickIterator func(tick *TickData, err error) bool

type Day interface {
Symbol() string
Time() time.Time
Each(it Iterator)
EachDay(it DayIterator)
EachTick(it TickIterator)
}
79 changes: 79 additions & 0 deletions api/tickdata/stream/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package stream

import (
"ed-fx/go-duka/api/tickdata"
iTickdata "ed-fx/go-duka/internal/tickdata"
"time"
"unknwon.dev/clog/v2"
)

// time is in UTC
type Iterator func(time time.Time, tick *tickdata.TickData, err error) bool

type Stream struct {
symbol string
start time.Time
end time.Time
downloadFolderPath string
}

func (s Stream) Start() time.Time {
return s.start
}

func (s Stream) End() time.Time {
return s.end
}

func (s Stream) EachTick(it Iterator) {
start := s.start
loc := start.Location()
end := s.end.In(loc)

dEnd := downloadEnd(s.end)
var isContinue = true
for t := downloadStart(start); t.Before(dEnd) && isContinue; t = t.Add(24 * time.Hour) {
day, err := iTickdata.FetchDay(s.symbol, t, s.downloadFolderPath)
if err != nil && !it(t, nil, err) {
return
}
day.EachTick(func(tick *tickdata.TickData, err error) bool {
tickTime := tick.TimeInLocation(loc)
if (start.Equal(tickTime) || start.Before(tickTime)) &&
(end.Equal(tickTime) || end.After(tickTime)) {
isContinue = it(tickTime, tick, err)
}
return isContinue
})
}
}

func downloadStart(start time.Time) time.Time {
dStart := start.UTC()
dStart = time.Date(dStart.Year(), dStart.Month(), dStart.Day(), 0, 0, 0, 0, time.UTC)
return dStart
}

func downloadEnd(end time.Time) time.Time {
dEnd := end.UTC()
dEnd = time.Date(dEnd.Year(), dEnd.Month(), dEnd.Day(), 23, 59, 59, 0, time.UTC)
return dEnd
}

var isLogSetup = false

// time are in UTC
func NewStream(symbol string, start time.Time, end time.Time, downloadFolderPath string) *Stream {
if !isLogSetup {
clog.NewConsole(0, clog.ConsoleConfig{
Level: clog.LevelInfo,
})
}

return &Stream{
symbol: symbol,
start: start,
end: end,
downloadFolderPath: downloadFolderPath,
}
}
75 changes: 75 additions & 0 deletions api/tickdata/stream/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package stream

import (
"ed-fx/go-duka/api/tickdata"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"testing"
"time"
_ "time/tzdata" // Ensure that custom timezone is included
)

func TestStream_EachTick_AlwaysContinue(t *testing.T) {
start := time.Date(2017, time.January, 10, 22, 0, 0, 0, time.UTC)
end := start.Add(1 * time.Hour)
stream := NewStream("GBPJPY", start, end, createEmptyDir(t))
isRun := false

stream.EachTick(func(time time.Time, tick *tickdata.TickData, err error) bool {
isRun = true
if !assert.NoError(t, err) {
t.FailNow()
}
assertTime(t, time, start, end)
assert.NotNil(t, tick)
return true
})
assert.True(t, isRun)
}

func assertTime(t *testing.T, time time.Time, start time.Time, end time.Time) {
if !assert.True(t, start.Before(time) || start.Equal(time)) {
t.FailNow()
}
if !assert.True(t, end.Equal(time) || end.After(time)) {
t.FailNow()
}
}

func createEmptyDir(t *testing.T) string {
dir, err := ioutil.TempDir(".", "test")
if !assert.NoError(t, err) {
t.FailNow()
}
t.Cleanup(func() {
os.RemoveAll(dir)
})
return dir
}

func TestStream_EachTick_OnlyContinueTwice(t *testing.T) {
location, err := time.LoadLocation("America/New_York")
if !assert.NoError(t, err) {
t.FailNow()
}

start := time.Date(2017, time.January, 10, 22, 0, 0, 0, location)
end := start.Add(4 * 24 * time.Hour)
stream := NewStream("GBPJPY", start, end, createEmptyDir(t))

isRun := false
tickCount := 0
stream.EachTick(func(time time.Time, tick *tickdata.TickData, err error) bool {
isRun = true
tickCount++
if !assert.NoError(t, err) {
t.FailNow()
}
assertTime(t, time, start, end)
assert.NotNil(t, tick)
return tickCount < 2
})
assert.True(t, isRun)
assert.Equal(t, 2, tickCount)
}
6 changes: 5 additions & 1 deletion api/tickdata/tickdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ type TickData struct {
const timeMillisecond = int64(time.Millisecond)

func (t TickData) UTC() time.Time {
return time.Unix(t.Timestamp/1000, (t.Timestamp%1000)*timeMillisecond).UTC()
return t.TimeInLocation(time.UTC)
}

func (t TickData) TimeInLocation(location *time.Location) time.Time {
return time.Unix(t.Timestamp/1000, (t.Timestamp%1000)*timeMillisecond).In(location)
}

func (t TickData) String() string {
Expand Down
72 changes: 0 additions & 72 deletions internal/app/day_tickdata.go

This file was deleted.

51 changes: 6 additions & 45 deletions internal/app/dukapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"ed-fx/go-duka/api/tickdata"
iTickdata "ed-fx/go-duka/internal/tickdata"
"fmt"
"github.com/pkg/errors"
"os"
Expand All @@ -10,16 +11,13 @@ import (
"sync"
"time"

"ed-fx/go-duka/internal/bi5"
"ed-fx/go-duka/internal/core"
"ed-fx/go-duka/internal/export/csv"
"ed-fx/go-duka/internal/export/fxt4"
"ed-fx/go-duka/internal/export/hst"
"ed-fx/go-duka/internal/misc"
)

const noParallelDownloads = 3

var (
log = misc.NewLogger("App", 2)
supportsFormats = []string{"csv", "fxt", "hst"}
Expand Down Expand Up @@ -192,7 +190,7 @@ func (app *DukaApp) Execute() error {
// Download by day, 24 hours a day data is downloaded in parallel by 24 goroutines
for day := opt.Start; day.Unix() < opt.End.Unix(); day = day.Add(24 * time.Hour) {
// Download, parse, store
if td, err := app.fetchDay(day); err != nil {
if td, err := iTickdata.FetchDay(opt.Symbol, day, opt.Folder); err != nil {
err = errors.Wrap(err, "Failed to fetch ["+day.Format("2006-01-02")+"]")
return err
} else if err = app.export(td); err != nil {
Expand All @@ -215,54 +213,17 @@ func (app *DukaApp) Execute() error {
return nil
}

// Fetch a day
func (app *DukaApp) fetchDay(day time.Time) (result tickdata.Day, err error) {
// Worker s get url from this channel
hours := make(chan int)

go func() {
for i := 0; i < 24; i++ {
hours <- i
}
close(hours)
}()

var wg sync.WaitGroup
opt := app.option
td := newDay(opt.Symbol, day)
for i := 0; i < noParallelDownloads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for hour := range hours {
dayHour := day.Add(time.Duration(hour) * time.Hour)
bi := bi5.New(dayHour, opt.Symbol, opt.Folder)
derr := bi.Download()
if derr != nil {
derr = errors.Wrap(err, "Download Bi5 ["+dayHour.Format("2006-01-02 15")+"] failed")
}
td.append(dayHour, bi, derr)
}
}()
}

wg.Wait()
td.postConstruct()

result = td
return
}

// export
func (app *DukaApp) export(td tickdata.Day) error {
day := td.Time()
dayTicks := make([]*tickdata.TickData, 0, 2048)
td.Each(func(ticks []*tickdata.TickData, err error) {
td.EachDay(func(ticks []*tickdata.TickData, err error) bool {
if err != nil {
log.Error("Decode bi5 %s: %s failed: %v.", td.Symbol(), day.Format("2006-01-02:15H"), err)
return
} else {
dayTicks = append(dayTicks, ticks...)
}
dayTicks = append(dayTicks, ticks...)
return true
})

timestamp := uint32(day.Unix())
Expand Down
Loading

0 comments on commit 315de21

Please sign in to comment.