Skip to content

Commit

Permalink
Live trading consolidation synchronization (#8436)
Browse files Browse the repository at this point in the history
* Adjust time slice time for live trading

Adjust time slice to be driven by data, so that consolidators are update at the correct times.
Live trading uses DateTime.UtcNow which might be a few milliseconds after the latest data, causing some race conditions in consolidators scan times.

* Refactor: rounding time slice time down for past consolidators scan (live)

* Cleanup
  • Loading branch information
jhonabreul authored Dec 4, 2024
1 parent 8123e76 commit a1d0b6c
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 2 deletions.
10 changes: 8 additions & 2 deletions Engine/AlgorithmManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,14 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn
// and fire them with the correct date/time.
realtime.ScanPastEvents(time);

// will scan registered consolidators for which we've past the expected scan call
algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
// will scan registered consolidators for which we've past the expected scan call.
// In live mode we want to round down to the second, so we don't scan too far into the future:
// The time slice might carry the data needed to complete a current consolidated bar but the
// time slice time might be slightly ahead (a few milliseconds or even ticks) because in live we
// use DateTime.UtcNow. So we don't want to scan past the data time so that the consolidators can
// complete the current bar.
var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(Time.OneSecond) : time;
algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm);

//Set the algorithm and real time handler's time
algorithm.SetDateTime(time);
Expand Down
181 changes: 181 additions & 0 deletions Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using NUnit.Framework;
using QuantConnect.Algorithm;
using QuantConnect.Data;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Custom.IconicTypes;
using QuantConnect.Data.Fundamental;
using QuantConnect.Data.Market;
Expand Down Expand Up @@ -3881,6 +3882,186 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e
timer.Dispose();
}

// Reproduces https://github.com/QuantConnect/Lean/issues/8363
[TestCase(Resolution.Second)]
[TestCase(Resolution.Minute)]
[TestCase(Resolution.Hour)]
[TestCase(Resolution.Daily)]
public void UsesFullPeriodDataForConsolidation(Resolution resolution)
{
_startDate = new DateTime(2014, 3, 27);
_algorithm.SetStartDate(_startDate);
_algorithm.Settings.DailyPreciseEndTime = false;

// Add a few milliseconds to the start date to mimic a real world live scenario, where the time provider
// will not always return an perfect rounded-down to second time
_manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork));

var symbol = Symbols.SPY;
_algorithm.SetBenchmark(x => 0);

var data = new[]
{
new [] { 108, 109, 90, 109, 72 },
new [] { 105, 105, 94, 100, 175 },
new [] { 93, 109, 90, 90, 170 },
new [] { 95, 105, 90, 91, 19 },
new [] { 91, 109, 91, 93, 132 },
new [] { 98, 109, 94, 102, 175 },
new [] { 107, 107, 91, 96, 97 },
new [] { 105, 108, 91, 101, 124 },
new [] { 105, 107, 91, 107, 81 },
new [] { 91, 109, 91, 101, 168 },
new [] { 93, 107, 90, 107, 199 },
new [] { 101, 108, 90, 90, 169 },
new [] { 101, 109, 90, 103, 14 },
new [] { 92, 109, 90, 105, 55 },
new [] { 96, 107, 92, 92, 176 },
new [] { 94, 105, 90, 94, 28 },
new [] { 105, 109, 91, 93, 172 },
new [] { 107, 109, 93, 93, 137 },
new [] { 95, 109, 91, 97, 168 },
new [] { 103, 109, 91, 107, 178 },
new [] { 96, 109, 96, 100, 168 },
new [] { 90, 108, 90, 102, 63 },
new [] { 100, 109, 96, 102, 134 },
new [] { 95, 103, 90, 94, 39 },
new [] { 105, 109, 91, 108, 117 },
new [] { 106, 106, 91, 103, 20 },
new [] { 95, 109, 93, 107, 7 },
new [] { 104, 108, 90, 102, 150 },
new [] { 94, 109, 90, 99, 178 },
new [] { 99, 109, 90, 106, 150 },
};

var seconds = 0;
var timeSpan = resolution.ToTimeSpan();
using var dataQueueHandler = new TestDataQueueHandler
{
DataPerSymbol = new Dictionary<Symbol, List<BaseData>>
{
{
symbol,
data
.Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++),
symbol,
prices[0],
prices[1],
prices[2],
prices[3],
prices[4],
timeSpan))
.Cast<BaseData>()
.ToList()
}
}
};

var feed = RunDataFeed(
resolution: resolution,
equities: new() { "SPY" },
dataQueueHandler: dataQueueHandler);

var consolidatedData = new List<TradeBar>();
var consolidatorUpdateData = new List<TradeBar>();

const int consolidatorBarCountSpan = 6;
var consolidatedCount = 0;
var dataCountUsedForFirstConsolidatedBar = 0;

_algorithm.Consolidate<TradeBar>(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) =>
{
_algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}");

// The first consolidated bar will be consolidated from 1 to consolidatorSpanSeconds second bars,
// from the start time to the next multiple of consolidatorSpanSeconds
var dataCountToTake = 0;
if (consolidatedCount++ == 0)
{
Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan);
dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count;
}
else
{
Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1),
consolidatorUpdateData.Count);
dataCountToTake = consolidatorBarCountSpan;
}

var dataForCurrentConsolidatedBar = consolidatorUpdateData
.Skip(consolidatorBarCountSpan * (consolidatedCount - 1))
.Take(dataCountToTake)
.ToList();

Assert.AreEqual(consolidatedBar.Time, dataForCurrentConsolidatedBar[0].Time);
Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime);

var expectedOpen = dataForCurrentConsolidatedBar[0].Open;
Assert.AreEqual(expectedOpen, consolidatedBar.Open);

var expectedClose = dataForCurrentConsolidatedBar[^1].Close;
Assert.AreEqual(expectedClose, consolidatedBar.Close);

var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High);
Assert.AreEqual(expectedHigh, consolidatedBar.High);

var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low);
Assert.AreEqual(expectedLow, consolidatedBar.Low);

var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume);
Assert.AreEqual(expectedVolume, consolidatedBar.Volume);
});

ConsumeBridge(feed,
TimeSpan.FromSeconds(5),
true,
timeSlice =>
{
if (consolidatorUpdateData.Count >= data.Length)
{
// Ran out of data, stop the feed
_manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
return;
}

// Mimic the algorithm manager consolidators scan:

// First, scan for consolidators that need to be updated
// NOTE: Rounding time down to mimic the algorithm manager consolidators scan
_algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm);

// Then, update the consolidators with the new data
if (timeSlice.ConsolidatorUpdateData.Count > 0)
{
var timeKeeper = _algorithm.TimeKeeper;
foreach (var update in timeSlice.ConsolidatorUpdateData)
{
var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
var consolidators = update.Target.Consolidators;
foreach (var consolidator in consolidators)
{
foreach (var dataPoint in update.Data)
{
if (consolidator is TradeBarConsolidator tradeBarConsolidator)
{
consolidatorUpdateData.Add(dataPoint as TradeBar);
}

consolidator.Update(dataPoint);
}

// scan for time after we've pumped all the data through for this consolidator
consolidator.Scan(localTime);
}
}
}
},
endDate: _startDate.Date.AddDays(60),
secondsTimeStep: (int)timeSpan.TotalSeconds);

Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount);
}

private class TestFundamentalDataProviderTrue : IFundamentalDataProvider
{
public T Get<T>(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)
Expand Down

0 comments on commit a1d0b6c

Please sign in to comment.