Skip to content

Commit

Permalink
Added StreamDataFrame stream
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr committed Oct 21, 2017
1 parent 3d09a28 commit 8f1e6d1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
13 changes: 9 additions & 4 deletions holoviews/plotting/bokeh/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ...core import Store, DynamicMap, CompositeOverlay, Element, Dimension
from ...core.options import abbreviated_exception, SkipRendering
from ...core import util
from ...streams import Stream
from ...streams import Stream, StreamDataFrame
from ..plot import GenericElementPlot, GenericOverlayPlot
from ..util import dynamic_update
from .plot import BokehPlot, TOOLS
Expand Down Expand Up @@ -180,6 +180,8 @@ def __init__(self, element, plot=None, **params):
self.static = len(self.hmap) == 1 and len(self.keys) == len(self.hmap)
self.callbacks = self._construct_callbacks()
self.static_source = False
dfstream = [s for s in self.streams if isinstance(s, StreamDataFrame)]
self.streaming = dfstream[0] if any(dfstream) else None

# Whether axes are shared between plots
self._shared = {'x': False, 'y': False}
Expand Down Expand Up @@ -349,7 +351,10 @@ def _axes_props(self, plots, subplots, element, ranges):
categorical_x = any(isinstance(x, util.basestring) for x in (l, r))
categorical_y = any(isinstance(y, util.basestring) for y in (b, t))

range_types = (self._x_range_type, self._y_range_type)
if self.streaming:
range_types = DataRange1d, DataRange1d
else:
range_types = self._x_range_type, self._y_range_type
if self.invert_axes: range_types = range_types[::-1]
x_range_type, y_range_type = range_types
if categorical or categorical_x:
Expand Down Expand Up @@ -572,7 +577,7 @@ def _update_ranges(self, element, ranges):


def _update_range(self, axis_range, low, high, factors, invert, shared, log):
if isinstance(axis_range, (Range1d, DataRange1d)) and self.apply_ranges:
if isinstance(axis_range, Range1d) and self.apply_ranges:
if (low == high and low is not None and
not isinstance(high, util.datetime_types)):
offset = abs(low*0.1 if low else 0.5)
Expand Down Expand Up @@ -1288,7 +1293,7 @@ class OverlayPlot(GenericOverlayPlot, LegendPlot):
'show_grid', 'logx', 'logy', 'xticks', 'toolbar',
'yticks', 'xrotation', 'yrotation', 'lod',
'border', 'invert_xaxis', 'invert_yaxis', 'sizing_mode',
'title_format']
'title_format', 'legend_position', 'legend_cols']

def _process_legend(self):
plot = self.handles['plot']
Expand Down
6 changes: 5 additions & 1 deletion holoviews/plotting/bokeh/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ def _update_datasource(self, source, data):
"""
Update datasource with data for a new frame.
"""
source.data.update(data)
if self.streaming:
if self.streaming._triggering:
source.stream(data, self.streaming.backlog)
else:
source.data.update(data)

@property
def state(self):
Expand Down
3 changes: 3 additions & 0 deletions holoviews/plotting/bokeh/tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
from ...core import Dataset
from ...element import ItemTable
from ...streams import StreamDataFrame
from ..plot import GenericElementPlot
from .plot import BokehPlot
from .util import bokeh_version
Expand Down Expand Up @@ -32,6 +33,8 @@ def __init__(self, element, plot=None, **params):
element_ids = self.hmap.traverse(lambda x: id(x), [Dataset, ItemTable])
self.static = len(set(element_ids)) == 1 and len(self.keys) == len(self.hmap)
self.callbacks = [] # Callback support on tables not implemented
dfstream = [s for s in self.streams if isinstance(s, StreamDataFrame)]
self.streaming = dfstream[0] if any(dfstream) else None


def _execute_hooks(self, element):
Expand Down
24 changes: 24 additions & 0 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,30 @@ def hashkey(self):
return {'hash': uuid.uuid4().hex}


class StreamDataFrame(StreamData):
"""
StreamDataFrame provides an adaptor to attach a
streamz.StreamingDataFrame to a HoloViews stream. Whenever the
StreamingDataFrame emits a dataframe it will trigger a HoloViews
stream event supplying the dataframe to the user callback via
the 'data' keyword.
"""

def __init__(self, sdf, backlog=1000, **params):
try:
from streamz.dataframe import StreamingDataFrame, StreamingSeries
except ImportError:
raise ImportError("StreamDataFrame requires streamz library to be available")
if not isinstance(sdf, (StreamingDataFrame, StreamingSeries)):
raise ValueError("StreamDataFrame must be instantiated with a "
"streamz.StreamingDataFrame or streamz.StreamingSeries")
if 'data' not in params:
params['data'] = sdf.example.reset_index()
super(StreamDataFrame, self).__init__(**params)
self.sdf = sdf
self.backlog = backlog


class LinkedStream(Stream):
"""
A LinkedStream indicates is automatically linked to plot interactions
Expand Down

0 comments on commit 8f1e6d1

Please sign in to comment.