From ea54bed80d0bcc0c3a0d0f881a726a6c03e3795f Mon Sep 17 00:00:00 2001 From: Timerix22 Date: Tue, 2 Nov 2021 01:01:20 +0300 Subject: [PATCH] reactivity --- DTLib.csproj | 2 +- EventHandlerAsync.cs | 8 +++- FrameworkFix.cs | 15 ++++++ Reactive/ReactiveListener.cs | 91 ++++++++++++++++++++++++++++-------- Reactive/ReactiveProvider.cs | 25 ++++++---- Reactive/ReactiveSender.cs | 8 +++- Reactive/ReactiveStream.cs | 68 ++++++++++++++++++++------- Reactive/TimeSignedObject.cs | 16 +++++++ 8 files changed, 183 insertions(+), 50 deletions(-) create mode 100644 Reactive/TimeSignedObject.cs diff --git a/DTLib.csproj b/DTLib.csproj index 3da3b06..069cfa3 100644 --- a/DTLib.csproj +++ b/DTLib.csproj @@ -55,6 +55,7 @@ + @@ -62,7 +63,6 @@ - diff --git a/EventHandlerAsync.cs b/EventHandlerAsync.cs index 76b26df..5af2df5 100644 --- a/EventHandlerAsync.cs +++ b/EventHandlerAsync.cs @@ -2,6 +2,10 @@ namespace DTLib { - public delegate Task EventHandlerAsync(TEventArgs e); - public delegate Task EventHandlerAsync(); + // по идее это нужно, чтоб делать так: SomeEvent?.Invoke().Wait() + public delegate Task EventHandlerAsyncDelegate(); + public delegate Task EventHandlerAsyncDelegate(T e); + public delegate Task EventHandlerAsyncDelegate(T0 e0, T1 e1); + public delegate Task EventHandlerAsyncDelegate(T0 e0, T1 e1, T2 e2); + public delegate Task EventHandlerAsyncDelegate(T0 e0, T1 e1, T2 e2, T3 e3); } diff --git a/FrameworkFix.cs b/FrameworkFix.cs index f9768c2..4188646 100644 --- a/FrameworkFix.cs +++ b/FrameworkFix.cs @@ -152,6 +152,13 @@ namespace DTLib b.Append(input); return b.ToString(); } + public static string Multiply(this char input, int howMany) + { + StringBuilder b = new(); + for (int i = 0; i < howMany; i++) + b.Append(input); + return b.ToString(); + } public static void Throw(this Exception ex) => throw ex; @@ -221,5 +228,13 @@ namespace DTLib { if (input is null) if_true(); } + + public static string AddZeroes(this T number, int length) + { + var str = number.ToString(); + //var diff = str.Length -length ; + //if (diff > 0) + return Multiply('0', str.Length - length) + str; + } } } \ No newline at end of file diff --git a/Reactive/ReactiveListener.cs b/Reactive/ReactiveListener.cs index e6a3595..ae23eb7 100644 --- a/Reactive/ReactiveListener.cs +++ b/Reactive/ReactiveListener.cs @@ -8,42 +8,93 @@ namespace DTLib.Reactive { public ReactiveListener() { } public ReactiveListener(ReactiveStream stream) : base(stream) { } + public ReactiveListener(ICollection> streams) : base(streams) { } - public EventHandlerAsync ElementAddedHandler; - public void SetHandler(EventHandlerAsync handler) - { - lock (Stream) ElementAddedHandler = handler; - } - public async Task ElementAdded(T e) => await Task.Run(() => ElementAddedHandler?.Invoke(e)); + + public event Action, T> ElementAddedEvent; + void ElementAdded(ReactiveStream stream, TimeSignedObject e) => ElementAdded(stream, e.Value); + void ElementAdded(ReactiveStream stream, T e) => + Task.Run(() => ElementAddedEvent?.Invoke(stream, e)); public override void Join(ReactiveStream stream) { base.Join(stream); - lock (Stream) stream.ElementAddedEvent += ElementAdded; + lock (Streams) stream.ElementAddedEvent += ElementAdded; } public override void Leave(ReactiveStream stream) { base.Leave(stream); - lock (Stream) stream.ElementAddedEvent -= ElementAdded; + lock (Streams) stream.ElementAddedEvent -= ElementAdded; } - public T GetFirstElement() => Stream[0]; - public T GetLastElement() => Stream[Stream.Length - 1]; + public T GetFirst() + { + if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams"); + TimeSignedObject rezult = null; + foreach (ReactiveStream stream in Streams) + if (stream.Count != 0) + { + TimeSignedObject e = stream[0]; + if (rezult is null) rezult = e; + else if (rezult.Time > e.Time) rezult = e; + } + return rezult.Value; + } + public T GetLast() + { + if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams"); + TimeSignedObject rezult = null; + foreach (ReactiveStream stream in Streams) + if (stream.Count != 0) + { + TimeSignedObject e = stream[stream.Count - 1]; + if (rezult is null) rezult = e; + else if (rezult.Time < e.Time) rezult = e; + } + return rezult.Value; + } - public T FindOne(Func condition) => - /*foreach (T el in Stream) -if (condition(el)) -return el;*/ - default; + public T FindOne(Func condition) + { + if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams"); + foreach (ReactiveStream stream in Streams) + foreach (TimeSignedObject el in stream) + if (condition(el.Value)) + return el.Value; + return default; + } + + public TimeSignedObject FindOne(Func, bool> condition) + { + if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams"); + foreach (ReactiveStream stream in Streams) + foreach (TimeSignedObject el in stream) + if (condition(el)) + return el; + return default; + } public List FindAll(Func condition) { - List elements = new(); - /*foreach (T el in Stream) - if (condition(el)) - elements.Add(el);*/ - return elements; + if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams"); + List rezults = new(); + foreach (ReactiveStream stream in Streams) + foreach (TimeSignedObject el in stream) + if (condition(el.Value)) + rezults.Add(el.Value); + return rezults; + } + + public List> FindAll(Func, bool> condition) + { + if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams"); + List> rezults = new(); + foreach (ReactiveStream stream in Streams) + foreach (TimeSignedObject el in stream) + if (condition(el)) + rezults.Add(el); + return rezults; } } } diff --git a/Reactive/ReactiveProvider.cs b/Reactive/ReactiveProvider.cs index 4f01c1a..fbb182c 100644 --- a/Reactive/ReactiveProvider.cs +++ b/Reactive/ReactiveProvider.cs @@ -1,28 +1,35 @@ -namespace DTLib.Reactive +using System; +using System.Collections.Generic; +using System.Linq; + +namespace DTLib.Reactive { public abstract class ReactiveProvider { - protected ReactiveStream Stream + protected List> Streams { get - { lock (_stream) return _stream; } + { lock (_streams) return _streams; } set - { lock (_stream) _stream = value; } + { lock (_streams) _streams = value; } } - protected ReactiveStream _stream; + private List> _streams = new(); public ReactiveProvider() { } - - public ReactiveProvider(ReactiveStream stream) => Join(stream); + public ReactiveProvider(ReactiveStream stream) => Streams.Add(stream); + public ReactiveProvider(ICollection> streams) => Streams = streams.ToList(); public virtual void Join(ReactiveStream stream) { - lock (Stream) Stream = stream; + if (IsConnetcedTo(stream)) throw new Exception("ReactiveListener is already connected to the stream"); + Streams.Add(stream); } public virtual void Leave(ReactiveStream stream) { - lock (Stream) Stream = null; + if (!Streams.Remove(stream)) throw new Exception("ReactiveListener is not connected to the stream"); } + + public bool IsConnetcedTo(ReactiveStream stream) => Streams.Contains(stream); } } diff --git a/Reactive/ReactiveSender.cs b/Reactive/ReactiveSender.cs index 81f8849..0d687d6 100644 --- a/Reactive/ReactiveSender.cs +++ b/Reactive/ReactiveSender.cs @@ -1,14 +1,18 @@ -namespace DTLib.Reactive +using System.Collections.Generic; + +namespace DTLib.Reactive { public class ReactiveSender : ReactiveProvider { public ReactiveSender() { } public ReactiveSender(ReactiveStream stream) : base(stream) { } + public ReactiveSender(ICollection> streams) : base(streams) { } public void Send(T e) { - lock (Stream) Stream.Add(e); + foreach (ReactiveStream s in Streams) + s.Add(e); } } } diff --git a/Reactive/ReactiveStream.cs b/Reactive/ReactiveStream.cs index 5592ad4..4b7080d 100644 --- a/Reactive/ReactiveStream.cs +++ b/Reactive/ReactiveStream.cs @@ -1,38 +1,74 @@ -using System.Collections.Generic; +using System; +using System.Collections; +using System.Collections.Generic; namespace DTLib.Reactive { - public class ReactiveStream + public class ReactiveStream : IEnumerable>, IList> { public ReactiveStream() { } - List _storage = new(); - List Storage + List> _storage = new(); + List> Storage { get - { lock (Storage) return _storage; } + { lock (_storage) return _storage; } } - public int Length + public int Count => Storage.Count; + + public TimeSignedObject this[int index] { - get - { lock (Storage) return Storage.Count; } + get => Storage[index]; + set => throw new NotImplementedException(); } - public T this[int index] + public event Action, TimeSignedObject> ElementAddedEvent; + public void Add(TimeSignedObject elem) { - get - { lock (Storage) return Storage[index]; } + Storage.Add(elem); + ElementAddedEvent?.Invoke(this, elem); } + public void Add(T elem) => Add(new TimeSignedObject(elem)); - internal event EventHandlerAsync ElementAddedEvent; + public void Clear() => Storage.Clear(); + public int IndexOf(TimeSignedObject item) => Storage.IndexOf(item); + public bool Contains(TimeSignedObject item) => Storage.Contains(item); - internal void Add(T elem) + public IEnumerator> GetEnumerator() => new Enumerator(Storage); + IEnumerator IEnumerable.GetEnumerator() => new Enumerator(Storage); + + struct Enumerator : IEnumerator> { - lock (Storage) Storage.Add(elem); - ElementAddedEvent?.Invoke(elem); + public Enumerator(List> storage) + { + _storage = storage; + _index = storage.Count - 1; + } + + List> _storage; + int _index; + public TimeSignedObject Current => _storage[_index]; + object IEnumerator.Current => Current; + + public void Dispose() => _storage = null; + + public bool MoveNext() + { + if (_index < 0) + return false; + _index--; + return true; + } + + public void Reset() => _index = _storage.Count - 1; } - internal void Clear() { lock (Storage) Storage.Clear(); } + bool ICollection>.IsReadOnly { get; } = false; + + public void Insert(int index, TimeSignedObject item) => throw new NotImplementedException(); + public void RemoveAt(int index) => throw new NotImplementedException(); + public void CopyTo(TimeSignedObject[] array, int arrayIndex) => throw new NotImplementedException(); + public bool Remove(TimeSignedObject item) => throw new NotImplementedException(); } } \ No newline at end of file diff --git a/Reactive/TimeSignedObject.cs b/Reactive/TimeSignedObject.cs new file mode 100644 index 0000000..443d233 --- /dev/null +++ b/Reactive/TimeSignedObject.cs @@ -0,0 +1,16 @@ +using System; + +namespace DTLib.Reactive +{ + public class TimeSignedObject + { + public T Value { get; init; } + public long Time { get; init; } + + public TimeSignedObject(T value) + { + Value = value; + Time = DateTime.Now.Ticks; + } + } +}