From c7f3deab570d82a465822c39b82dc7a44801ff19 Mon Sep 17 00:00:00 2001 From: Timerix Date: Mon, 4 Oct 2021 16:40:24 +0300 Subject: [PATCH] reactivity development is continuing --- EventHandlerAsync.cs | 6 +----- Network/FSP.cs | 2 +- Reactive/ReactiveListener.cs | 29 +++++++++++++++++++++-------- Reactive/ReactiveProvider.cs | 30 +++++++++++++++++++++++++++--- Reactive/ReactiveStream.cs | 15 ++++++--------- Reactive/ReactiveWorker.cs | 18 ++++++++++-------- 6 files changed, 66 insertions(+), 34 deletions(-) diff --git a/EventHandlerAsync.cs b/EventHandlerAsync.cs index 6e86a58..13361dc 100644 --- a/EventHandlerAsync.cs +++ b/EventHandlerAsync.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Threading.Tasks; namespace DTLib { diff --git a/Network/FSP.cs b/Network/FSP.cs index f700c27..192019c 100644 --- a/Network/FSP.cs +++ b/Network/FSP.cs @@ -11,7 +11,7 @@ namespace DTLib.Network // public class FSP { - Socket mainSocket { get; init; } + Socket mainSocket { get; init; } static public bool debug = false; public FSP(Socket _mainSocket) => mainSocket = _mainSocket; diff --git a/Reactive/ReactiveListener.cs b/Reactive/ReactiveListener.cs index 0ac8647..04ef3c1 100644 --- a/Reactive/ReactiveListener.cs +++ b/Reactive/ReactiveListener.cs @@ -1,21 +1,34 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading.Tasks; namespace DTLib.Reactive { public class ReactiveListener : ReactiveWorker { - public ReactiveListener() + public ReactiveListener() { } + public ReactiveListener(ReactiveStream stream) : base(stream) { } + public ReactiveListener(IEnumerable> streams) : base(streams) { } + + public Action ElementAddedHandler; + public void SetHandler(Action handler) => ReactiveWorkerMutex.Execute(() => { ElementAddedHandler = handler; }); + public async Task ElementAdded(object s, T e) => await Task.Run(() => ElementAddedHandler?.Invoke(s, e)); + + public override void Join(ReactiveStream stream) { - StreamCollectionAccess.Execute(() => + ReactiveWorkerMutex.Execute(() => { - foreach(var stream in Streams) - { - stream.ElementAdded += async (sender, value) => { await Task.Run(() =>{ }); }; - } + Streams.Add(stream); + stream.ElementAdded += ElementAdded; + }); + } + + public override void Leave(ReactiveStream stream) + { + ReactiveWorkerMutex.Execute(() => + { + Streams.Remove(stream); + stream.ElementAdded -= ElementAdded; }); } } diff --git a/Reactive/ReactiveProvider.cs b/Reactive/ReactiveProvider.cs index 6e29089..84f91d4 100644 --- a/Reactive/ReactiveProvider.cs +++ b/Reactive/ReactiveProvider.cs @@ -1,13 +1,37 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace DTLib.Reactive { public class ReactiveProvider : ReactiveWorker { + public ReactiveProvider() { } + public ReactiveProvider(ReactiveStream stream) : base(stream) { } + public ReactiveProvider(IEnumerable> streams) : base(streams) { } + + event Action AnnounceEvent; + public void Announce(T e) + { + ReactiveWorkerMutex.Execute(()=>AnnounceEvent.Invoke(e)); + } + + public override void Join(ReactiveStream stream) + { + ReactiveWorkerMutex.Execute(() => + { + Streams.Add(stream); + AnnounceEvent += stream.Add; + }); + } + + public override void Leave(ReactiveStream stream) + { + ReactiveWorkerMutex.Execute(() => + { + Streams.Remove(stream); + AnnounceEvent -= stream.Add; + }); + } } } diff --git a/Reactive/ReactiveStream.cs b/Reactive/ReactiveStream.cs index feec05a..819e47a 100644 --- a/Reactive/ReactiveStream.cs +++ b/Reactive/ReactiveStream.cs @@ -1,32 +1,29 @@ using System; using System.Collections.Generic; -using System.Collections; -using System.Collections.ObjectModel; -using System.Text; -using System.Threading.Tasks; namespace DTLib.Reactive { - public class ReactiveStream + public class ReactiveStream { List Storage = new(); public event EventHandlerAsync ElementAdded; bool StoreData = false; - SafeMutex StorageAccess = new(); + SafeMutex StorageMutex = new(); public ReactiveStream() { } public ReactiveStream(bool storeData) => StoreData = storeData; public void Add(T elem) { - if (StoreData) StorageAccess.Execute(() => Storage.Add(elem)); + if (StoreData) StorageMutex.Execute(() => Storage.Add(elem)); ElementAdded?.Invoke(this, elem); } - + public void Clear() { - if (StoreData) StorageAccess.Execute(() => Storage.Clear()); + if (StoreData) StorageMutex.Execute(() => Storage.Clear()); + else throw new Exception("Can't clear ReactiveStream because StoreData==false"); } } } diff --git a/Reactive/ReactiveWorker.cs b/Reactive/ReactiveWorker.cs index c19e6fc..7d36e42 100644 --- a/Reactive/ReactiveWorker.cs +++ b/Reactive/ReactiveWorker.cs @@ -1,18 +1,20 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Collections.Generic; namespace DTLib.Reactive { public abstract class ReactiveWorker { protected List> Streams = new(); + protected SafeMutex ReactiveWorkerMutex = new(); - protected SafeMutex StreamCollectionAccess = new(); + public ReactiveWorker() { } - public void Join(ReactiveStream stream) => StreamCollectionAccess.Execute(()=>Streams.Add(stream)); - public void Leave(ReactiveStream stream) => StreamCollectionAccess.Execute(() => Streams.Remove(stream)); + public ReactiveWorker(ReactiveStream stream) => Join(stream); + + public ReactiveWorker(IEnumerable> streams) => + ReactiveWorkerMutex.Execute(() => { foreach (var stream in streams) Join(stream); }); + + public abstract void Join(ReactiveStream stream); + public abstract void Leave(ReactiveStream stream); } }