How To code a data source
fawce edited this page Nov 14, 2012 · 1 revision
- Implement a new class that extends DataSource
- You must provide an __init__ method that:
  - takes arguments for any connection info
  - takes date range parameters
  - takes sid or sid list parameters
  - sets the value of self.source_id.
    - The value of source_id must be a unique, invariant hash of the class, sid list, and date range. Zipline uses this hash to guarantee that all events across all sources in a zipline are ordered deterministically. For example, imagine a data source of news events, updated every minute, combined in a zipline with minute-bar trade events. A news item will very likely have the same datetime stamp as a trade bar. To break that tie, zipline sorts by source_id.
  - sets self.TYPE to a value from zipline_protocol.EVENT_TYPE (n.b. you may need to add a new type to the enumeration)
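As a rough illustration of the source_id requirement, the sketch below builds a stable hash from the class name, sid list, and date range, then uses it to break a datetime tie between two sources. The helper name `make_source_id`, the class names, the choice of SHA-256, and the decision to sort the sids are all assumptions made for this example, not zipline's actual implementation.

```python
import hashlib
import heapq
from datetime import datetime, timezone


def make_source_id(cls_name, sids, start, end):
    """Build a stable hex digest from class name, sid list, and date range."""
    # Sorting the sids is an assumption of this sketch: it makes the hash
    # independent of the order in which the sids were passed in.
    parts = [
        cls_name,
        ",".join(str(s) for s in sorted(sids)),
        start.isoformat(),
        end.isoformat(),
    ]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()


start = datetime(2012, 1, 3, tzinfo=timezone.utc)
end = datetime(2012, 1, 4, tzinfo=timezone.utc)
news_id = make_source_id("NewsSource", [24], start, end)    # hypothetical class
trade_id = make_source_id("TradeSource", [24], start, end)  # hypothetical class

# Two already-sorted streams whose events share the same timestamp.
news = [{"datetime": start, "source_id": news_id, "value": "headline"}]
trades = [{"datetime": start, "source_id": trade_id, "value": 100.0}]

# Merge by (datetime, source_id): source_id breaks the datetime tie,
# so the merged order is identical on every run.
merged = list(
    heapq.merge(news, trades, key=lambda e: (e["datetime"], e["source_id"]))
)
```

Because the hash depends only on its inputs, rebuilding a source with the same class, sids, and dates yields the same source_id, which is what keeps the ordering reproducible between runs.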
- You must provide an emit_raw_data method.
  - It must yield dictionaries with the expected properties. A trivial method would be something like:
    from datetime import datetime
    import pytz

    def emit_raw_data(self):
        for x in range(1000):
            next_event = {'datetime': datetime.utcnow().replace(tzinfo=pytz.utc), 'value': x}
            # The base class asserts that the datetime field exists,
            # is a datetime, and has tzinfo set to pytz.utc.
            assert next_event['value'] < 1000
            yield next_event
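The checks on the datetime field described in the comment above might look roughly like the following. This is only a sketch: `check_event` is a hypothetical helper, not the base class's real code, and it uses the standard library's `timezone.utc` as a stand-in for `pytz.utc` so that it runs without third-party packages.

```python
from datetime import datetime, timezone


def check_event(event):
    """Assert that the event carries a UTC, timezone-aware datetime field."""
    assert "datetime" in event, "event is missing a datetime field"
    dt = event["datetime"]
    assert isinstance(dt, datetime), "datetime field must be a datetime"
    # Stand-in for the pytz.utc check described in the text.
    assert dt.tzinfo is timezone.utc, "datetime must be in UTC"
    return event


def emit_raw_data():
    # A tiny generator in the same shape as the trivial method above.
    for x in range(3):
        yield {"datetime": datetime.now(timezone.utc), "value": x}


checked = [check_event(e) for e in emit_raw_data()]
```

Running your generator through a check like this in a test catches naive datetimes before they reach the base class.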
- Define an optional mapping model and accessor method get_mapping.
    def get_mapping(self):
        # field_name1, conversion_method, and source_field_key are placeholders.
        return {field_name1: (conversion_method, source_field_key)}

    # Here's an example conversion method.
    def convert_datestring_to_date(string_date):
        return datetime.strptime(string_date, "%m/%d/%y")
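To make the shape of the mapping concrete, here is a minimal sketch of how a mapping of the form `{field_name: (conversion_method, source_field_key)}` could be applied to one raw row. The `apply_mapping` helper, the field names, and the sample row are hypothetical and exist only for illustration; how the base class actually consumes the mapping may differ.

```python
from datetime import datetime


def convert_datestring_to_date(string_date):
    return datetime.strptime(string_date, "%m/%d/%y")


def apply_mapping(mapping, raw_row):
    """Build an event dict by converting each mapped source field."""
    event = {}
    for field_name, (conversion_method, source_field_key) in mapping.items():
        # Look up the raw value by its source key, then convert it.
        event[field_name] = conversion_method(raw_row[source_field_key])
    return event


# Hypothetical mapping and raw row, for illustration only.
mapping = {
    "dt": (convert_datestring_to_date, "Date"),
    "price": (float, "Close"),
}
raw_row = {"Date": "11/14/12", "Close": "12.50"}
event = apply_mapping(mapping, raw_row)
```

Keeping each conversion as a small standalone function makes it easy to unit test on its own.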
- We need you to write tests! At a minimum, please write a unit test for your mapping function, with examples of both correct and incorrect inputs. We will really sing your praises if you create a mock version of the source and write a test that creates an algorithm and adds that mock source to the inputs.
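A minimal unit test for a conversion method might look like the sketch below. It assumes the `convert_datestring_to_date` example from this page and uses the standard library's unittest; the test class and method names are made up for illustration.

```python
import unittest
from datetime import datetime


def convert_datestring_to_date(string_date):
    return datetime.strptime(string_date, "%m/%d/%y")


class ConvertDatestringTest(unittest.TestCase):
    def test_valid_input(self):
        self.assertEqual(
            convert_datestring_to_date("11/14/12"), datetime(2012, 11, 14)
        )

    def test_invalid_format(self):
        # strptime raises ValueError when the string does not match the format.
        with self.assertRaises(ValueError):
            convert_datestring_to_date("2012-11-14")

    def test_wrong_type(self):
        # strptime raises TypeError when given a non-string argument.
        with self.assertRaises(TypeError):
            convert_datestring_to_date(None)
```

Saved in a test module, these cases can be run with `python -m unittest`.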
You can add a source to the zipline from within the initialize method of your algorithm:
    class ExampleAlgo(TradingAlgorithm):
        def initialize(self):
            # Add the new data source for sid 24 over the date range;
            # start_date and end_date are assumed to be defined elsewhere.
            self.add_source(MyNewDataSource([24], start_date, end_date))
For reference, a trade event (TYPE=TRADE) has these fields: datetime, open, high, low, close, price, volume, source_id.