Skip to content

Commit

Permalink
Narrow the gap.
Browse files Browse the repository at this point in the history
  • Loading branch information
mewlist committed Nov 30, 2023
1 parent acf56f8 commit 7dc133b
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions Runtime/TaskQueue/TaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,40 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Mew.Core.Tasks
{
public class TaskQueue : IDisposable
{
private readonly List<TaskQueueAwaitable> queue = new();
private TaskAwaiter? awaiter;
private bool updateProcessing;
private bool taskProcessing;
private CancellationTokenSource? cts;
private CancellationToken? disposeCt;

protected string loopId = string.Empty;

private object SyncRoot => ((ICollection)queue).SyncRoot;

public TaskQueueLimitType LimitType { get; }
/// <summary>
/// Max size of queue.
/// </summary>
public int MaxSize { get; }

/// <summary>
/// Count of running or waiting tasks.
/// </summary>
public int Count => (queue?.Count ?? 0) + (awaiter.HasValue ? 1 : 0);
public int Count
{
get
{
lock (SyncRoot)
{
return queue.Count + (taskProcessing ? 1 : 0);
}
}
}

/// <summary>
/// true if started.
Expand Down Expand Up @@ -168,7 +179,6 @@ private void CancelCurrent()
cts?.Cancel();
cts?.Dispose();
cts = null;
awaiter = null;
}

private async void Update()
Expand All @@ -179,25 +189,37 @@ private async void Update()
return;
}

if (awaiter.HasValue) return;
if (updateProcessing) return;
lock (SyncRoot) if (!queue.Any()) return;

TaskQueueAwaitable task;
updateProcessing = true;

lock (SyncRoot)
while (true)
{
if (!queue.Any()) return;
using var taskCts = new CancellationTokenSource();
TaskQueueAwaitable task;

lock (SyncRoot)
{
task = queue.First();
queue.RemoveAt(0);
taskProcessing = true;
}

cts = disposeCt.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(taskCts.Token, disposeCt.Value)
: taskCts;

await task.Invoke(cts.Token);

cts?.Dispose();
cts = null;

task = queue.First();
queue.RemoveAt(0);
lock (SyncRoot) if (!queue.Any()) break;
}

var taskCts = new CancellationTokenSource();
cts = disposeCt.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(taskCts.Token, disposeCt.Value)
: taskCts;
var invokedTask = task.Invoke(cts.Token);
awaiter = invokedTask.GetAwaiter();
await invokedTask.ContinueWith(_ => awaiter = null, taskCts.Token);
taskProcessing = false;
updateProcessing = false;
}
}

Expand Down

0 comments on commit 7dc133b

Please sign in to comment.