Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch with lazy enumerables (issue #400) #402

Closed
wants to merge 15 commits into from
51 changes: 48 additions & 3 deletions MoreLinq.Test/BatchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace MoreLinq.Test
{
using System.Collections.Generic;
using NUnit.Framework;
using System;

[TestFixture]
public class BatchTest
Expand Down Expand Up @@ -71,13 +72,13 @@ public void BatchSequenceTransformingResult()
}

[Test]
public void BatchSequenceYieldsListsOfBatches()
public void BatchSequenceYieldsIEnumerablesOfBatches()
{
var result = new[] { 1, 2, 3 }.Batch(2);
using (var reader = result.Read())
{
Assert.That(reader.Read(), Is.InstanceOf(typeof(IList<int>)));
Assert.That(reader.Read(), Is.InstanceOf(typeof(IList<int>)));
Assert.That(reader.Read(), Is.InstanceOf(typeof(IEnumerable<int>)));
Assert.That(reader.Read(), Is.InstanceOf(typeof(IEnumerable<int>)));
reader.ReadEnd();
}
}
Expand All @@ -87,5 +88,49 @@ public void BatchIsLazy()
{
new BreakingSequence<object>().Batch(1);
}

[Test]
public void BatchHasLazyEnumerables()
{
var seq1 = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
var seq2 = MoreEnumerable.From<int>(() => throw new InvalidOperationException());
var result = seq1.Concat(seq2).Batch(4);

using (var reader = result.Read())
{
reader.Read().AssertSequenceEqual(1, 2, 3, 4);
reader.Read().AssertSequenceEqual(5, 6, 7, 8);
reader.Read().Take(1).AssertSequenceEqual(9);
}
}

[Test]
public void BatchesCanBePartialIterated()
{
var result = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 }.Batch(3);
IEnumerable<int> batch1, batch2, batch3, batch4;

using (var reader = result.Read())
{
batch1 = reader.Read();
batch1.AssertSequenceEqual(1, 2, 3);

batch2 = reader.Read();
batch2.Take(1).AssertSequenceEqual(4);

batch3 = reader.Read();
batch3.Take(2).AssertSequenceEqual(7, 8);

batch4 = reader.Read();
batch4.Take(3).AssertSequenceEqual(10, 11);

reader.ReadEnd();
}

batch1.AssertSequenceEqual( 1, 2, 3);
batch2.AssertSequenceEqual( 4, 5, 6);
batch3.AssertSequenceEqual( 7, 8, 9);
batch4.AssertSequenceEqual(10, 11);
}
}
}
75 changes: 55 additions & 20 deletions MoreLinq/Batch.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#region License and Terms
// MoreLINQ - Extensions to LINQ to Objects
// Copyright (c) 2009 Atif Aziz. All rights reserved.
// Copyright (c) 2016 Leandro F. Vieira (leandromoh). All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ namespace MoreLinq
{
using System;
using System.Collections.Generic;
using System.Linq;

static partial class MoreEnumerable
{
Expand Down Expand Up @@ -58,37 +59,71 @@ public static IEnumerable<TResult> Batch<TSource, TResult>(this IEnumerable<TSou
if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector));

return _(); IEnumerable<TResult> _()
return _().TakeWhile(x => x.Any()).Select(resultSelector);

IEnumerable<IEnumerable<TSource>> _()
{
TSource[] bucket = null;
var count = 0;
List<TSource> previousBucket = null;
List<TSource> currentBucket = null;
var group = 1;
var disposed = false;
var e = source.GetEnumerator();
var index = 0;

foreach (var item in source)
try
{
if (bucket == null)
while (!disposed)
{
bucket = new TSource[size];
currentBucket = new List<TSource>(size);
yield return GetBucket(group, currentBucket);
previousBucket = currentBucket;
group++;
}
}
finally
{
if (!disposed)
e.Dispose();
}

bucket[count++] = item;
IEnumerable<TSource> GetBucket(int pgroup, List<TSource> pcurrentBucket)
{
var min = (pgroup - 1) * size;
var hasValue = false;

// The bucket is fully buffered before it's yielded
if (count != size)
while (index < min && e.MoveNext())
{
continue;
previousBucket.Add(e.Current);
index++;
}

yield return resultSelector(bucket);
previousBucket = null;

bucket = null;
count = 0;
}
for (var i = 0; i < size; i++)
{
if (i < pcurrentBucket.Count)
{
hasValue = true;
}
else if (hasValue = (!disposed && e.MoveNext()))
{
index++;
pcurrentBucket.Add(e.Current);
}
else
{
if (!disposed)
{
e.Dispose();
disposed = true;
}
}

// Return the last bucket with all remaining elements
if (bucket != null && count > 0)
{
Array.Resize(ref bucket, count);
yield return resultSelector(bucket);
if (hasValue)
yield return pcurrentBucket[i];
else
yield break;
}
}
}
}
Expand Down