
mapReduce

Alexius Wadell edited this page Jan 17, 2017 · 5 revisions

Syntax

[R1, R2, ..] = ds.mapReduce(mapFun, reduceFun)

[R1, R2, ..] = ds.mapReduce(mapFun, reduceFun, channels)

Description

[R1, R2, ..] = ds.mapReduce(mapFun, reduceFun) applies mapFun to each datasource contained in ds, then applies reduceFun to the combined results from mapFun.
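
The map-then-reduce flow can be illustrated without Datamaster. The following is a minimal, hypothetical sketch: plain structs stand in for datasources and an ordinary loop stands in for mapReduce, so it does not call the Datamaster API and omits features such as channel filtering and memory clearing.

```matlab
% Hypothetical stand-ins for datasources: plain structs, not Datamaster objects
sources = {struct('Value', [1 2 3]), struct('Value', [4 5]), struct('Value', 6)};

% Map step: produce one result per source (here, the sum of its values)
mapFun = @(s) sum(s.Value);

% Reduce step: receives a cell array holding every map result
reduceFun = @(results) sum([results{:}]);

% Apply mapFun to each source, collecting the results in a cell array
mapped = cell(1, numel(sources));
for k = 1:numel(sources)
    mapped{k} = mapFun(sources{k});
end

% Pass the combined map results to reduceFun
total = reduceFun(mapped);
disp(total)  % displays 21
```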


[R1, R2, ..] = ds.mapReduce(___, channels) restricts processing to datasources that contain the required channels. Required channels can be passed as a single string (for one channel) or as a cell array of strings (for multiple channels). mapReduce then processes only the datasources in ds that have all of the required channels. Additionally, mapReduce loads all of the listed channels using loadChannel.
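
The channel rule (a single name or a cell array of names; keep only datasources that have all of them) can be sketched with plain MATLAB. This is a hypothetical illustration, not Datamaster's implementation: each stand-in source simply lists its channel names in a Channels field, and 'Vehicle_Speed' is an invented channel name. cellstr normalizes a single name to a one-element cell array and leaves a cell array of names unchanged.

```matlab
% Hypothetical stand-ins: each source lists the channels it contains
sources = {struct('Channels', {{'Engine_RPM', 'Vehicle_Speed'}}), ...
           struct('Channels', {{'Engine_RPM'}}), ...
           struct('Channels', {{'Vehicle_Speed'}})};

% Keep only the sources that contain every required channel.
% cellstr turns 'Name' into {'Name'} and leaves {'A', 'B'} unchanged.
selectSources = @(srcs, channels) srcs(cellfun( ...
    @(s) all(ismember(cellstr(channels), s.Channels)), srcs));

oneChannel  = selectSources(sources, 'Engine_RPM');                     % sources 1 and 2
twoChannels = selectSources(sources, {'Engine_RPM', 'Vehicle_Speed'});  % source 1 only
```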

Input Arguments

mapFun

Function handle to map function. mapFun receives a single datasource from the input set ds and returns one or more results.

Note: For better performance, load all required channels at the start of mapFun using loadChannel.

An example template for a map function is

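function [R1, R2] = mapFun(ds)

%Load all channels needed for the analysis using loadChannel
%(RequiredChannels is a placeholder for the channel name(s) you need)
ds.loadChannel(RequiredChannels);

%Synchronize sampling rates using Sync
ds.Sync;

%Do some calculations on the supplied datasource
%(SomeFunction, SomeOtherFunction and 'ChannelName' are placeholders)
channel = ds.getChannel('ChannelName');
R1 = SomeFunction(channel.Value);
R2 = SomeOtherFunction(channel.Value);

end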

reduceFun

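Function handle to reduce function. reduceFun takes one cell array input for each output of mapFun (so the number of inputs equals the number of mapFun outputs) and returns one or more outputs. Each input cell array collects the corresponding mapFun output from every processed datasource.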
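
How several map outputs become several cell array inputs can be shown with a small offline sketch. It assumes stand-in structs in place of datasources and a hand-written loop in place of mapReduce; the orientation of the cell arrays (column here) is an assumption, since the page does not specify it. deal lets an anonymous mapFun return two outputs, and num2cell splits the per-source rows into one cell array per output.

```matlab
% Hypothetical stand-ins for datasources (RPM samples only)
sources = {struct('Value', [2500 1800 3100]), struct('Value', [900 2200])};

% A map function with two outputs: samples above 2000 and total samples
mapFun = @(s) deal(sum(s.Value > 2000), numel(s.Value));
nOut = 2;  % number of mapFun outputs

% Row k holds every output of mapFun for source k
mapped = cell(numel(sources), nOut);
for k = 1:numel(sources)
    [mapped{k, :}] = mapFun(sources{k});
end

% Column j becomes the j-th reduceFun input: one cell array per mapFun output
reduceInputs = num2cell(mapped, 1);
reduceFun = @(above, total) sum([above{:}]) / sum([total{:}]);
fractionAbove = reduceFun(reduceInputs{:});  % (2 + 1) / (3 + 2) = 0.6
```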

An example template for a reduce function is

function [R1_Out, R2_Out, R3_Out] = reduceFun(R1, R2)

%Combine the results from all datasources (R1 and R2 are cell arrays)
R1_Out = SomeFunction(R1);
R2_Out = SomeOtherFunction(R2);
R3_Out = AnotherFunction(R1, R2);

end
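
The number of reduceFun outputs does not have to match the number of inputs. As a hypothetical, concrete version of the template above, the sketch below uses invented map results for three datasources and an anonymous reduceFun that returns three outputs (via deal) from two cell array inputs.

```matlab
% Hypothetical map results for three datasources (one cell per datasource)
R1 = {120, 45, 300};   % e.g. seconds above some threshold
R2 = {600, 500, 900};  % e.g. seconds logged

% A concrete reduceFun with two inputs and three outputs
reduceFun = @(R1, R2) deal( ...
    sum([R1{:}]), ...                 % R1_Out: total of R1
    sum([R2{:}]), ...                 % R2_Out: total of R2
    sum([R1{:}]) / sum([R2{:}]));     % R3_Out: combines both inputs

[R1_Out, R2_Out, R3_Out] = reduceFun(R1, R2);  % 465, 2000, 0.2325
```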

Output Arguments

mapReduce returns the outputs of reduceFun ([R1, R2, ...]) exactly as reduceFun returns them.

Using mapReduce

[Figure: Diagram of how mapReduce operates]

The mapReduce method implements a common strategy for processing Big Data (see Wikipedia), with a focus on streamlining the process of scaling a function from analyzing a single datasource to analyzing thousands. In principle, any function that can be applied to a single datasource can be applied to thousands of datasources with mapReduce, once even a crude reduceFun has been written.

The mapReduce function forms the backbone of most of the built-in methods for processing datasources, and as such it includes a few additional features:

  • Automatically clears data loaded into memory from each datasource to prevent out-of-memory errors.
  • Implements an efficient progress bar to notify the user of the progress (or lack of progress) made by mapFun.

Examples

Total Drive Time

This example shows how to use mapReduce to determine the total amount of time an engine has spent at speeds above 2000 RPM. This could be used to estimate the total non-idle run time of an engine.

Gather Datasources

First, create a Datamaster object (dm) and get a set of datasources (ds) to analyze. For this example the entire datastore is examined, but a subset could also be used (see getDatasource).

dm = Datamaster;
ds = dm.getDatasource;

Create Map Function

Next, define a map function that processes each datasource and returns the amount of time spent above 2000 RPM in that datasource.

function driveTime = MapDriveTimeFun(ds)
% Count the amount of time logged where Engine_RPM > 2000 RPM for the datasource

%First get the Engine_RPM Channel with units of 'rpm'
speed = ds.getChannel('Engine_RPM', 'unit', 'rpm');

%Now find when the Engine_RPM > 2000
count = sum(speed.Value > 2000);

% Assuming a constant sampling period, find the sampling period
period = mean(diff(speed.Time));

%The driveTime is the count times the sampling period
driveTime = count * period;

end
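
The arithmetic inside MapDriveTimeFun can be checked by hand on a synthetic trace. In this sketch a plain struct with Time (seconds) and Value (rpm) fields is a hypothetical stand-in for the channel returned by getChannel, and the samples are invented. Each sample above 2000 rpm is credited with one full sampling period, so the result is an approximation with a resolution of one period.

```matlab
% Hypothetical stand-in for the Engine_RPM channel, sampled every 0.5 s
speed.Time  = 0:0.5:3;                             % 7 samples
speed.Value = [800 1500 2100 2600 2400 1900 900];  % rpm

count     = sum(speed.Value > 2000);   % 3 samples above 2000 rpm
period    = mean(diff(speed.Time));    % 0.5 s
driveTime = count * period;            % 1.5 s
```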

Create Reduce Function

To get the total time spent above 2000 RPM, sum the results from the map function:

function totalDriveTime = ReduceDriveTimeFun(driveTime)
%Reducing function for computing the total drivetime

%Sum the driveTime from each datasource
%Recalling that driveTime is a cell array of the outputs from the mapFun
totalDriveTime = sum([driveTime{:}]);

end
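
Because driveTime is a cell array, its contents have to be concatenated into a numeric array before summing, which is what [driveTime{:}] does. Passing driveTime{:} straight to sum would hand each element over as a separate argument (the second one would be read as the dimension), giving a wrong result or an error. The values below are invented for illustration; the sketch also shows that an empty cell array, should one ever reach the reduce step, sums to 0.

```matlab
% Hypothetical map results from three datasources (seconds above 2000 rpm)
driveTime = {12.5, 30, 7.5};

% Concatenate the cell contents into [12.5 30 7.5], then sum
totalDriveTime = sum([driveTime{:}]);   % 50

% An empty cell array concatenates to [], and sum([]) is 0
noSources  = {};
emptyTotal = sum([noSources{:}]);       % 0
```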

Run mapReduce

Now use mapReduce to process all of the collected datasources:

totalDriveTime = ds.mapReduce(@MapDriveTimeFun, @ReduceDriveTimeFun, 'Engine_RPM')

Putting it all together

Remember that, because this is a script, the local functions must come after all other code (local functions in scripts require MATLAB R2016b or later).

dm = Datamaster;
ds = dm.getDatasource;

totalDriveTime = ds.mapReduce(@MapDriveTimeFun, @ReduceDriveTimeFun, 'Engine_RPM')

function driveTime = MapDriveTimeFun(ds)
% Count the amount of time logged where Engine_RPM > 2000 RPM for the datasource

%First get the Engine_RPM Channel with units of 'rpm'
speed = ds.getChannel('Engine_RPM', 'unit', 'rpm');

%Now find when the Engine_RPM > 2000
count = sum(speed.Value > 2000);

% Assuming a constant sampling period, find the sampling period
period = mean(diff(speed.Time));

%The driveTime is the count times the sampling period
driveTime = count * period;

end

function totalDriveTime = ReduceDriveTimeFun(driveTime)
%Reducing function for computing the total drivetime

%Sum the driveTime from each datasource
%Recalling that driveTime is a cell array of the outputs from the mapFun
totalDriveTime = sum([driveTime{:}]);

end
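
The map and reduce logic of this example can also be exercised offline, without a datastore. In this hypothetical test harness, hand-made structs only mimic the Time and Value fields of the Engine_RPM channel, cellfun stands in for mapReduce (no channel filtering, memory clearing or progress bar), and all samples are invented. The three stand-in sources use different sampling periods, which is why each map call computes its own period.

```matlab
% Hypothetical Engine_RPM channels from three datasources, each logged at
% its own constant sampling period
channels = { ...
    struct('Time', 0:0.5:2,  'Value', [1000 2500 2600 1200 900]), ...  % 0.5 s
    struct('Time', 0:0.25:1, 'Value', [2100 2200 2300 2400 1000]), ... % 0.25 s
    struct('Time', 0:2:6,    'Value', [3000 900 800 2500])};           % 2 s

% Same logic as MapDriveTimeFun: samples above 2000 rpm times the period
mapDriveTime = @(speed) sum(speed.Value > 2000) * mean(diff(speed.Time));

% Map every channel, then reduce as ReduceDriveTimeFun does
mapped = cellfun(mapDriveTime, channels, 'UniformOutput', false);
totalDriveTime = sum([mapped{:}]);   % 1 + 1 + 4 = 6 seconds
```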