using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections.Specialized;
using System.Collections.ObjectModel;
using Tools.CancellationTokenAsync;
using Tools.Collections.Concurrent.AsyncBuffer.Store;
namespace Tools.Collections.Concurrent.AsyncBuffer
{
public class ConcurrentBuffer<T>
: IConcurrentBuffer<T>
{
public int RecommendedMaxSize { private set; get; }
private readonly IConcurrentStore<T> _Storage;
public int Size
=> _Storage.Size;
public ConcurrentBuffer(
int recommendedMaxSize,
IConcurrentStore<T> store
)
{
RecommendedMaxSize = recommendedMaxSize;
_Storage = store;
}
public void AddForce(T dateEntity)
{
_Storage.Add(dateEntity);
}
public bool TryAdd(T dataEntity)
{
if (Size >= RecommendedMaxSize)
{
return false;
}
AddForce(dataEntity);
return true;
}
/// <summary>
/// Ожидание поступления нового элемента в хранилище.
/// </summary>
private async Task WaitUntilNotEmpty(
CancellationToken cancellation
)
{
//Если элемент был добавлен в промежутке
if (_Storage.Size != 0)
{
//то выходим
return;
}
var cancellationTokenSource = CancellationTokenSource
.CreateLinkedTokenSource(cancellation);
var token = cancellationTokenSource.Token;
var taskCompletion
= new TaskCompletionSource<int>();
using (
token.Register(
() => taskCompletion.SetResult(0)
)
)
{
void StopWait(
IConcurrentStore<T> store,
T item
)
{
cancellationTokenSource.Cancel();
}
//Добавляем заявку ожидания нового элемента
_Storage.OnItemAdded += StopWait;
//Если элемент был добавлен в промежутке
if (_Storage.Size != 0)
{
//то выходим
return;
}
try
{
//Ждем оповещения о поступлении элемента
await taskCompletion
.Task
.ConfigureAwait(false);
}
finally
{
//Удаляем заявку ожидания поступления нового элемента
_Storage.OnItemAdded -= StopWait;
}
}
}
public async Task<T[]> ConsumeBatchOrWaitAsync(
int maxBatchSize,
CancellationToken cancellation = default,
bool configureAwait = false,
bool throwOperationCanceledException = false
)
{
do
{
//Если удалось получить хотя бы 1 элемент из хранилища
if (_Storage.TryTake(out T data))
{
List<T> result = new List<T>(maxBatchSize);
//То извлекаем пока не пусто или не достигнут maxBatchSize
do
{
result.Add(data);
}
while (
!cancellation.IsCancellationRequested
&& result.Count <= maxBatchSize
&& _Storage.TryTake(out data)
);
return result.ToArray();
}
//хранилище пустое
//Ждем, пока в хранилище не добавят новый элемент
//или сработает cancellation
await WaitUntilNotEmpty(cancellation)
.ConfigureAwait(configureAwait);
}
while (
!cancellation.IsCancellationRequested
);
if (throwOperationCanceledException)
{
cancellation.ThrowIfCancellationRequested();
}
return new T[0];
}
public async Task<T> ConsumeOrWaitAsync(
CancellationToken cancellation = default,
bool configureAwait = false,
bool throwOperationCanceledException = false
)
{
do
{
//Если удалось получить элемент из хранилища
if (_Storage.TryTake(out T data))
{
//То берем его
return data;
}
//хранилище пустое
//Ждем, пока в хранилище не добавят новый элемент
//или пока запрос не отменят
await WaitUntilNotEmpty(cancellation)
.ConfigureAwait(configureAwait);
} while (
!cancellation.IsCancellationRequested
);
if (throwOperationCanceledException)
{
cancellation.ThrowIfCancellationRequested();
}
return default;
}
}
}