From 2ed8d5d9e8373609e0418cea4c869de19adc4356 Mon Sep 17 00:00:00 2001 From: Timerix22 Date: Sat, 30 Oct 2021 13:48:13 +0300 Subject: [PATCH] some reactive shit --- DTLib.csproj | 4 +++ Reactive/ReactiveListener.cs | 52 +++++++++++++++++++++++------------- Reactive/ReactiveProvider.cs | 38 +++++++++++++------------- Reactive/ReactiveSender.cs | 14 ++++++++++ Reactive/ReactiveStream.cs | 39 ++++++++++++++++++--------- Reactive/ReactiveWorker.cs | 20 -------------- 6 files changed, 97 insertions(+), 70 deletions(-) create mode 100644 Reactive/ReactiveSender.cs delete mode 100644 Reactive/ReactiveWorker.cs diff --git a/DTLib.csproj b/DTLib.csproj index 52c0406..3da3b06 100644 --- a/DTLib.csproj +++ b/DTLib.csproj @@ -51,6 +51,10 @@ + + + + diff --git a/Reactive/ReactiveListener.cs b/Reactive/ReactiveListener.cs index e1698ee..e6a3595 100644 --- a/Reactive/ReactiveListener.cs +++ b/Reactive/ReactiveListener.cs @@ -4,30 +4,46 @@ using System.Threading.Tasks; namespace DTLib.Reactive { - public class ReactiveListener : ReactiveWorker + public class ReactiveListener : ReactiveProvider { public ReactiveListener() { } public ReactiveListener(ReactiveStream stream) : base(stream) { } - public ReactiveListener(IEnumerable> streams) : base(streams) { } - public Action ElementAddedHandler; - public void SetHandler(Action handler) => ReactiveWorkerMutex.Execute(() => ElementAddedHandler=handler); - public async Task ElementAdded(object s, T e) => await Task.Run(() => ElementAddedHandler?.Invoke(s, e)); + public EventHandlerAsync ElementAddedHandler; + public void SetHandler(EventHandlerAsync handler) + { + lock (Stream) ElementAddedHandler = handler; + } + public async Task ElementAdded(T e) => await Task.Run(() => ElementAddedHandler?.Invoke(e)); - public override void Join(ReactiveStream stream) => - ReactiveWorkerMutex.Execute(() => - { - Streams.Add(stream); - stream.ElementAdded+=ElementAdded; - }); + public override void Join(ReactiveStream stream) + { + base.Join(stream); + lock (Stream) stream.ElementAddedEvent += ElementAdded; + } - public override void Leave(ReactiveStream stream) => - ReactiveWorkerMutex.Execute(() => - { - Streams.Remove(stream); - stream.ElementAdded-=ElementAdded; - }); + public override void Leave(ReactiveStream stream) + { + base.Leave(stream); + lock (Stream) stream.ElementAddedEvent -= ElementAdded; + } - //public T GetElement() + public T GetFirstElement() => Stream[0]; + public T GetLastElement() => Stream[Stream.Length - 1]; + + public T FindOne(Func condition) => + /*foreach (T el in Stream) +if (condition(el)) +return el;*/ + default; + + public List FindAll(Func condition) + { + List elements = new(); + /*foreach (T el in Stream) + if (condition(el)) + elements.Add(el);*/ + return elements; + } } } diff --git a/Reactive/ReactiveProvider.cs b/Reactive/ReactiveProvider.cs index 4436b92..4f01c1a 100644 --- a/Reactive/ReactiveProvider.cs +++ b/Reactive/ReactiveProvider.cs @@ -1,28 +1,28 @@ -using System; -using System.Collections.Generic; - -namespace DTLib.Reactive +namespace DTLib.Reactive { - public class ReactiveProvider : ReactiveWorker + public abstract class ReactiveProvider { + protected ReactiveStream Stream + { + get + { lock (_stream) return _stream; } + set + { lock (_stream) _stream = value; } + } + protected ReactiveStream _stream; public ReactiveProvider() { } - public ReactiveProvider(ReactiveStream stream) : base(stream) { } - public ReactiveProvider(IEnumerable> streams) : base(streams) { } - event Action AnnounceEvent; - public void Announce(T e) => ReactiveWorkerMutex.Execute(() => AnnounceEvent.Invoke(e)); + public ReactiveProvider(ReactiveStream stream) => Join(stream); - public override void Join(ReactiveStream stream) => ReactiveWorkerMutex.Execute(() => - { - Streams.Add(stream); - AnnounceEvent+=stream.Add; - }); + public virtual void Join(ReactiveStream stream) + { + lock (Stream) Stream = stream; + } - public override void Leave(ReactiveStream stream) => ReactiveWorkerMutex.Execute(() => - { - Streams.Remove(stream); - AnnounceEvent-=stream.Add; - }); + public virtual void Leave(ReactiveStream stream) + { + lock (Stream) Stream = null; + } } } diff --git a/Reactive/ReactiveSender.cs b/Reactive/ReactiveSender.cs new file mode 100644 index 0000000..81f8849 --- /dev/null +++ b/Reactive/ReactiveSender.cs @@ -0,0 +1,14 @@ +namespace DTLib.Reactive +{ + public class ReactiveSender : ReactiveProvider + { + + public ReactiveSender() { } + public ReactiveSender(ReactiveStream stream) : base(stream) { } + + public void Send(T e) + { + lock (Stream) Stream.Add(e); + } + } +} diff --git a/Reactive/ReactiveStream.cs b/Reactive/ReactiveStream.cs index cc301a0..5592ad4 100644 --- a/Reactive/ReactiveStream.cs +++ b/Reactive/ReactiveStream.cs @@ -1,25 +1,38 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; namespace DTLib.Reactive { public class ReactiveStream { - List<(long time, T value)> Storage = new(); - public event EventHandlerAsync ElementAdded; - SafeMutex StorageMutex = new(); - public int Length => StorageMutex.Execute(() => Storage.Count); - public ReactiveStream() { } - public void Add(T elem) + List _storage = new(); + List Storage { - StorageMutex.Execute(() => Storage.Add((DateTime.Now.Ticks, elem))); - ElementAdded?.Invoke(this, elem); + get + { lock (Storage) return _storage; } } - public void Get(int index) => StorageMutex.Execute(() => Storage[index]); + public int Length + { + get + { lock (Storage) return Storage.Count; } + } - public void Clear() => StorageMutex.Execute(() => Storage.Clear()); + public T this[int index] + { + get + { lock (Storage) return Storage[index]; } + } + + internal event EventHandlerAsync ElementAddedEvent; + + internal void Add(T elem) + { + lock (Storage) Storage.Add(elem); + ElementAddedEvent?.Invoke(elem); + } + + internal void Clear() { lock (Storage) Storage.Clear(); } } -} +} \ No newline at end of file diff --git a/Reactive/ReactiveWorker.cs b/Reactive/ReactiveWorker.cs deleted file mode 100644 index fe7e98c..0000000 --- a/Reactive/ReactiveWorker.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System.Collections.Generic; - -namespace DTLib.Reactive -{ - public abstract class ReactiveWorker - { - protected List> Streams = new(); - protected SafeMutex ReactiveWorkerMutex = new(); - - public ReactiveWorker() { } - - public ReactiveWorker(ReactiveStream stream) => Join(stream); - - public ReactiveWorker(IEnumerable> streams) => - ReactiveWorkerMutex.Execute(() => { foreach(ReactiveStream stream in streams) Join(stream); }); - - public abstract void Join(ReactiveStream stream); - public abstract void Leave(ReactiveStream stream); - } -}