ConcurrentBuffer.cs

199 lines | 5.922 kB Blame History Raw Download
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using System.Threading;
using System.Collections.Concurrent;
using System.Collections.Specialized;
using System.Collections.ObjectModel;

using Tools.CancellationTokenAsync;

using Tools.Collections.Concurrent.AsyncBuffer.Store;

namespace Tools.Collections.Concurrent.AsyncBuffer
{
    public class ConcurrentBuffer<T>
        : IConcurrentBuffer<T>
    {
        public int RecommendedMaxSize { private set; get; }

        private readonly IConcurrentStore<T> _Storage;


        public int Size
            => _Storage.Size;

        public ConcurrentBuffer(
            int recommendedMaxSize, 
            IConcurrentStore<T> store
            )
        {
            RecommendedMaxSize = recommendedMaxSize;
            _Storage = store;
        }

        public void AddForce(T dateEntity)
        {            
            _Storage.Add(dateEntity);
        }

        public bool TryAdd(T dataEntity)
        {
            if (Size >= RecommendedMaxSize)
            {
                return false;
            }

            AddForce(dataEntity);
            return true;
        }


        /// <summary>
        /// Ожидание поступления нового элемента в хранилище.
        /// </summary>
        private async Task WaitUntilNotEmpty(
            CancellationToken cancellation
            )
        {
            //Если элемент был добавлен в промежутке 
            if (_Storage.Size != 0)
            {
                //то выходим
                return;
            }

            var cancellationTokenSource = CancellationTokenSource
                .CreateLinkedTokenSource(cancellation);

            var token = cancellationTokenSource.Token;

            var taskCompletion 
                = new TaskCompletionSource<int>();

            using (
                token.Register(
                    () => taskCompletion.SetResult(0)
                )
                ) 
            {
                void StopWait(
                    IConcurrentStore<T> store,
                    T item
                    )
                {
                    cancellationTokenSource.Cancel();
                }

                //Добавляем заявку ожидания нового элемента
                _Storage.OnItemAdded += StopWait;

                //Если элемент был добавлен в промежутке 
                if (_Storage.Size != 0)
                {
                    //то выходим
                    return;
                }

                try
                {
                    //Ждем оповещения о поступлении элемента
                    await taskCompletion
                        .Task
                        .ConfigureAwait(false);
                }
                finally
                {
                    //Удаляем заявку ожидания поступления нового элемента
                    _Storage.OnItemAdded -= StopWait;
                }
            }                
        }


        public async Task<T[]> ConsumeBatchOrWaitAsync(
            int maxBatchSize,
            CancellationToken cancellation = default,
            bool configureAwait = false,
            bool throwOperationCanceledException = false
            )
        {
            do
            {
                //Если удалось получить хотя бы 1 элемент из хранилища
                if (_Storage.TryTake(out T data))
                {
                    List<T> result = new List<T>(maxBatchSize);

                    //То извлекаем пока не пусто или не достигнут maxBatchSize
                    do
                    {
                        result.Add(data);
                    }
                    while (
                        !cancellation.IsCancellationRequested
                        && result.Count <= maxBatchSize
                        && _Storage.TryTake(out data)
                        );

                    return result.ToArray();
                }

                //хранилище пустое

                //Ждем, пока в хранилище не добавят новый элемент
                //или сработает cancellation
                await WaitUntilNotEmpty(cancellation)
                    .ConfigureAwait(configureAwait);
            }
            while (
                !cancellation.IsCancellationRequested
                );

            if (throwOperationCanceledException)
            {
                cancellation.ThrowIfCancellationRequested();
            }

            return new T[0];
        }

        public async Task<T> ConsumeOrWaitAsync(
            CancellationToken cancellation = default,
            bool configureAwait = false,
            bool throwOperationCanceledException = false
            )
        {
            do
            {
                //Если удалось получить элемент из хранилища
                if (_Storage.TryTake(out T data))
                {
                    //То берем его
                    return data;
                }

                //хранилище пустое

                //Ждем, пока в хранилище не добавят новый элемент
                //или пока запрос не отменят
                await WaitUntilNotEmpty(cancellation)
                        .ConfigureAwait(configureAwait);
            } while (
                    !cancellation.IsCancellationRequested
                    );

            if (throwOperationCanceledException)
            {
                cancellation.ThrowIfCancellationRequested();
            }

            return default;
        }
    }

}