-
Notifications
You must be signed in to change notification settings - Fork 10
Adapting an existing experiment to your own needs
This guide will explain how to adapt an already existing experiment (incl. all protocols) to your own needs. As a basis, we will use the optogenetic experiment from our own Lab, where we stimulated mice injected with the CalLight system upon head direction dependent movement.
Check out the Out-of-the-Box section to get an idea what triggers/stimulations and experiments are already available.
If you are interested in designing your own optogenetic experiment based on animal behavior, this is a good point to start!
For this experiment to work, you will need a NI DAQ Board or any other Python controlled digital modulation for your laser. I will mark the lines where your own device control has to be added or adapted.
Let's Start by reminding ourselfs of what a functioning experiment needs to have in DLStream:
- We need an experiment (found in
/experiments/custom/experiments.py
) - We need a trigger (found in
/experiments/custom/triggers.py
) - We need a stimulation (found in
/experiments/custom/stimulation.py
)
(4. We need a stimulation process if our stimulation requires the continues attention of DLStream (found in /experiments/custom/stimulation.py
))
Note: In the optogenetic experiment, we are using the versatility of a DAQ board sending out a digital signal (TTL) and a laser remote control, which are taking over the necessary "attention" in step 4. So we can concentrate on the first three.
Let's have a look at the two most relevant sections:
The initiation of the experiment where all important variables are defined and the check_skeleton
function where the magic happens.
def __init__(self):
#Standard variables
self.experiment_finished = False
self._event = False
self._event_start = None
self._intertrial_timer = Timer(15)
self._experiment_timer = Timer(1800)
#experiment specific variables
self._results = []
self._point = POINT
self._start_angle, self._end_angle = ANGLE_WINDOW
self._max_trial_time = 5
self._min_trial_time = 1
self._max_total_time = 600
self._total_time = 0
self._trial_time = 0
We have several variables that are defining some aspects of the experiment connected to time:
The most basic are _intertrial_timer
and _experiment_timer
. They control they inter stimulus time and the total duration of the experiment using the Timer
module. Any experiment in DLStream should have these. The inter stimulus timer will overwrite any trigger activation during this time, an important feature to avoid overheating or tissue damage.
_max_trial_time
and _min_trial_time
define the limits of stimulation time, that overwrite any trigger based activation or deactivation. I recommend keeping such an option in your optogenetic experiment to avoid overheating of tissue or flickering stimulation due to pose estimation jitters.
The maximum total time has a similiar function and provides a good safety measure if your molecular system is prone to overstimulation or needs to be comparable between animals.
The variables _total_time
and _trial_time
are recording the stimulation duration during the experiment and are therefore set to 0 at the beginning. They will change during the experiment.
Depending on your trigger you will have to change these, but we will come to that later.
As we are using head direction based on the relation/angle of the animals head (measured from the neck and nose point) to a reference POINT
, the _point
variable is used to define such a reference and will be passed to the Trigger
module. _point
can be any tuple ((X, Y)
) that you want.
_start_angle
and _end_angle
are defining the limits for our head direction trigger.
You will notice that the function underneath has no Trigger
module build in and relies on one main calculation:
_, angle_point = angle_between_vectors(*skeleton['neck'], *skeleton['nose'], *self._point)
For a simple experiment this might be enough, but we are using this chance to include our very own HeadDirectionTrigger
(found in /experiments/triggers.py
).
However before that, let's look at the general structure of this experiment. First, the experiment checks if it already reached any predefined criteria. Here it is wether the total experimental druation or the total stimulation time was reached. If not, the experiment will continue. If so, the experiment will end (last lines of code).
Second, the experiment is checking whether an inter stimulus period is currently active. This will block any potential stimulation.
Third, with every pose estimation DLStream gets, the experiment is calculating the headdirection angle and asks one simple YES/NO (TRUE/FALSE) question:
- Is the headdirection angle in the previously defined window between (
_start_angle
and_end_angle
)?
The second question is:
- If so, is the stimulation already going?
This has several concequences:
If the stimulation has not been started yet, the stimulation event variable will be set to TRUE
(meaning the stimulation has started), the laser will be turned ON
and the time of the start will be recorded. Additionally, we will reset the inter stimulus timer, so it will start again from 0 after the stimulation is done.
In the next cycle (if the stimulation has been started yet and the headdirection angle is still within the window), the experiment checks wheter the stimulation time has already passed the _max_trial_time
. It will save the current stimulation duration and pass on to the next cycle.
If the maximum stimulation time has been reached, it will stop the stimulation, start the inter stimulus timer and add the stimulation duration to _total_time
and _results
.
If the stimulation is running and the headdirection angle is NOT in the window to things can happen:
- If the
_min_trial_time
has NOT been reached, the stimulation will continue. - If the
_min_trial_time
has been reached, the stimulation will be stopped (just as above).
Read through the code below and check if you understood the concept behind it. We will now include the Trigger
.
def check_skeleton(self, frame, skeleton):
if self._experiment_timer.check_timer():
if self._total_time >= self._max_total_time:
# check if total time to stimulate per experiment is reached
print("Ending experiment, total event time ran out")
self.stop_experiment()
else:
# if not continue
if not self._intertrial_timer.check_timer():
# check if there is an intertrial time running right now, if not continue
# check if the headdirection angle is within limits
_, angle_point = angle_between_vectors(*skeleton['neck'], *skeleton['nose'], *self._point)
if self._start_angle <= angle_point <= self._end_angle:
if not self._event:
# if a stimulation event wasn't started already, start one
print("Starting Stimulation")
self._event = True
# and activate the laser, start the timer and reset the intertrial timer
laser_switch(True)
self._event_start = time.time()
self._intertrial_timer.reset()
else:
if time.time() - self._event_start <= self._max_trial_time:
# if the total event time has not reached the maximum time per event
# self._trial_time = time.time() - self._event_start
pass
else:
# if the maximum event time was reached, reset the event,
# turn off the laser and start intertrial time
print("Ending Stimulation, Stimulation time ran out")
self._event = False
laser_switch(False)
trial_time = time.time() - self._event_start
self._total_time += trial_time
self._results.append(trial_time)
print("Stimulation duration", trial_time)
self._intertrial_timer.start()
else:
# if the headdirection is not within the parameters
if self._event:
# but the stimulation is still going
if time.time() - self._event_start < self._min_trial_time:
# check if the minimum event time was not reached, then pass
pass
else:
# if minumum event time has been reached, reset the event,
# turn of the laser and start intertrial time
print("Ending Stimulation, angle not in range")
self._event = False
# laser_toggle(False)
laser_switch(False)
# self._trial_time = time.time() - self._event_start
trial_time = time.time() - self._event_start
self._total_time += trial_time
self._results.append(trial_time)
print("Stimulation duration", trial_time)
self._intertrial_timer.start()
else:
#if maximum experiment time was reached, stop experiment
print("Ending experiment, timer ran out")
self.stop_experiment()
If you want to use your own Trigger
module or just a different type, check with the Trigger
module which input variables you might need. In general it is good practice to pass any fixed variables/parameters to the Trigger
module upon initialization, so that the check_skeleton
function from the module is only getting the skeleton
as input. Out-of-the-Box Triggers
are designed in the same way, so that once you initiated them, you can use them in experiments interchangeably.
First, import the Trigger
module from /experiments/triggers.py
and create the additional variables in the __init__()
function of your experiment.
def __init__(self):
self._angle = 30 #take any number (int) as you like between 0 and 180°
self._point = (X, Y)
self._trigger = HeaddirectionTrigger(self._angle, self._point)
Note: For example for the RegionTrigger
module, you would need region_type
, center
, radius
and bodyparts
, afterwards the rest is the same.
The shortend code below shows you how to add the trigger in the experiment cycle:
def check_skeleton(self, frame, skeleton):
if self._experiment_timer.check_timer():
if self._total_time >= self._max_total_time:
# check if total time to stimulate per experiment is reached
else:
# if not continue
if not self._intertrial_timer.check_timer():
# check if there is an intertrial time running right now, if not continue
# check if the headdirection angle is within limits
result, response = self._trigger.check_skeleton(skeleton=skeleton)
if result:
# if the trigger returns true
else:
# if the trigger returns false
That's it. Easy isn't it? You can add any Trigger
and it will work the same way.
This is where you will need to adjust the code to your own setup. The function laser_switch()
from /experiments/custom/stimulation.py
is using the nidaqmx library to interact with the NI DAQ board in our own setup. If you have a similiar board, you can use the provided modules DigitalModDevice
and AnalogModDevice
to modulate any device connected to your DAQ board either digital (TTL) or analog. The only thing you need is the port reference.
The laser_switch()
function is using the DigitalModDevice
class to switch the laser on and off by sending TTL signals to our laser remote. Depending on your system the necessary implementation might change. We provided several different option to use for digital modulation. Mind though, that some protocols (e.g. frequency stimulation) might need the ongoing "attention" of DLSTream to toggle the laser, this would be a good use of the stimulation processes!
You can also utilize Raspberry Pi or Arduino boards for GPIO control. Check out the relevant section in the wiki.
Do you have any further questions? Ask us in the Issue page!