diff --git a/DTLib.csproj b/DTLib.csproj
index 3da3b06..069cfa3 100644
--- a/DTLib.csproj
+++ b/DTLib.csproj
@@ -55,6 +55,7 @@
+
@@ -62,7 +63,6 @@
-
diff --git a/EventHandlerAsync.cs b/EventHandlerAsync.cs
index 76b26df..5af2df5 100644
--- a/EventHandlerAsync.cs
+++ b/EventHandlerAsync.cs
@@ -2,6 +2,10 @@
namespace DTLib
{
- public delegate Task EventHandlerAsync(TEventArgs e);
- public delegate Task EventHandlerAsync();
+ // по идее это нужно, чтоб делать так: SomeEvent?.Invoke().Wait()
+ public delegate Task EventHandlerAsyncDelegate();
+ public delegate Task EventHandlerAsyncDelegate(T e);
+ public delegate Task EventHandlerAsyncDelegate(T0 e0, T1 e1);
+ public delegate Task EventHandlerAsyncDelegate(T0 e0, T1 e1, T2 e2);
+ public delegate Task EventHandlerAsyncDelegate(T0 e0, T1 e1, T2 e2, T3 e3);
}
diff --git a/FrameworkFix.cs b/FrameworkFix.cs
index f9768c2..4188646 100644
--- a/FrameworkFix.cs
+++ b/FrameworkFix.cs
@@ -152,6 +152,13 @@ namespace DTLib
b.Append(input);
return b.ToString();
}
+ public static string Multiply(this char input, int howMany)
+ {
+ StringBuilder b = new();
+ for (int i = 0; i < howMany; i++)
+ b.Append(input);
+ return b.ToString();
+ }
public static void Throw(this Exception ex) => throw ex;
@@ -221,5 +228,13 @@ namespace DTLib
{
if (input is null) if_true();
}
+
+ public static string AddZeroes(this T number, int length)
+ {
+ var str = number.ToString();
+ //var diff = str.Length -length ;
+ //if (diff > 0)
+ return Multiply('0', str.Length - length) + str;
+ }
}
}
\ No newline at end of file
diff --git a/Reactive/ReactiveListener.cs b/Reactive/ReactiveListener.cs
index e6a3595..ae23eb7 100644
--- a/Reactive/ReactiveListener.cs
+++ b/Reactive/ReactiveListener.cs
@@ -8,42 +8,93 @@ namespace DTLib.Reactive
{
public ReactiveListener() { }
public ReactiveListener(ReactiveStream stream) : base(stream) { }
+ public ReactiveListener(ICollection> streams) : base(streams) { }
- public EventHandlerAsync ElementAddedHandler;
- public void SetHandler(EventHandlerAsync handler)
- {
- lock (Stream) ElementAddedHandler = handler;
- }
- public async Task ElementAdded(T e) => await Task.Run(() => ElementAddedHandler?.Invoke(e));
+
+ public event Action, T> ElementAddedEvent;
+ void ElementAdded(ReactiveStream stream, TimeSignedObject e) => ElementAdded(stream, e.Value);
+ void ElementAdded(ReactiveStream stream, T e) =>
+ Task.Run(() => ElementAddedEvent?.Invoke(stream, e));
public override void Join(ReactiveStream stream)
{
base.Join(stream);
- lock (Stream) stream.ElementAddedEvent += ElementAdded;
+ lock (Streams) stream.ElementAddedEvent += ElementAdded;
}
public override void Leave(ReactiveStream stream)
{
base.Leave(stream);
- lock (Stream) stream.ElementAddedEvent -= ElementAdded;
+ lock (Streams) stream.ElementAddedEvent -= ElementAdded;
}
- public T GetFirstElement() => Stream[0];
- public T GetLastElement() => Stream[Stream.Length - 1];
+ public T GetFirst()
+ {
+ if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
+ TimeSignedObject rezult = null;
+ foreach (ReactiveStream stream in Streams)
+ if (stream.Count != 0)
+ {
+ TimeSignedObject e = stream[0];
+ if (rezult is null) rezult = e;
+ else if (rezult.Time > e.Time) rezult = e;
+ }
+ return rezult.Value;
+ }
+ public T GetLast()
+ {
+ if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
+ TimeSignedObject rezult = null;
+ foreach (ReactiveStream stream in Streams)
+ if (stream.Count != 0)
+ {
+ TimeSignedObject e = stream[stream.Count - 1];
+ if (rezult is null) rezult = e;
+ else if (rezult.Time < e.Time) rezult = e;
+ }
+ return rezult.Value;
+ }
- public T FindOne(Func condition) =>
- /*foreach (T el in Stream)
-if (condition(el))
-return el;*/
- default;
+ public T FindOne(Func condition)
+ {
+ if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
+ foreach (ReactiveStream stream in Streams)
+ foreach (TimeSignedObject el in stream)
+ if (condition(el.Value))
+ return el.Value;
+ return default;
+ }
+
+ public TimeSignedObject FindOne(Func, bool> condition)
+ {
+ if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
+ foreach (ReactiveStream stream in Streams)
+ foreach (TimeSignedObject el in stream)
+ if (condition(el))
+ return el;
+ return default;
+ }
public List FindAll(Func condition)
{
- List elements = new();
- /*foreach (T el in Stream)
- if (condition(el))
- elements.Add(el);*/
- return elements;
+ if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
+ List rezults = new();
+ foreach (ReactiveStream stream in Streams)
+ foreach (TimeSignedObject el in stream)
+ if (condition(el.Value))
+ rezults.Add(el.Value);
+ return rezults;
+ }
+
+ public List> FindAll(Func, bool> condition)
+ {
+ if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
+ List> rezults = new();
+ foreach (ReactiveStream stream in Streams)
+ foreach (TimeSignedObject el in stream)
+ if (condition(el))
+ rezults.Add(el);
+ return rezults;
}
}
}
diff --git a/Reactive/ReactiveProvider.cs b/Reactive/ReactiveProvider.cs
index 4f01c1a..fbb182c 100644
--- a/Reactive/ReactiveProvider.cs
+++ b/Reactive/ReactiveProvider.cs
@@ -1,28 +1,35 @@
-namespace DTLib.Reactive
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace DTLib.Reactive
{
public abstract class ReactiveProvider
{
- protected ReactiveStream Stream
+ protected List> Streams
{
get
- { lock (_stream) return _stream; }
+ { lock (_streams) return _streams; }
set
- { lock (_stream) _stream = value; }
+ { lock (_streams) _streams = value; }
}
- protected ReactiveStream _stream;
+ private List> _streams = new();
public ReactiveProvider() { }
-
- public ReactiveProvider(ReactiveStream stream) => Join(stream);
+ public ReactiveProvider(ReactiveStream stream) => Streams.Add(stream);
+ public ReactiveProvider(ICollection> streams) => Streams = streams.ToList();
public virtual void Join(ReactiveStream stream)
{
- lock (Stream) Stream = stream;
+ if (IsConnetcedTo(stream)) throw new Exception("ReactiveListener is already connected to the stream");
+ Streams.Add(stream);
}
public virtual void Leave(ReactiveStream stream)
{
- lock (Stream) Stream = null;
+ if (!Streams.Remove(stream)) throw new Exception("ReactiveListener is not connected to the stream");
}
+
+ public bool IsConnetcedTo(ReactiveStream stream) => Streams.Contains(stream);
}
}
diff --git a/Reactive/ReactiveSender.cs b/Reactive/ReactiveSender.cs
index 81f8849..0d687d6 100644
--- a/Reactive/ReactiveSender.cs
+++ b/Reactive/ReactiveSender.cs
@@ -1,14 +1,18 @@
-namespace DTLib.Reactive
+using System.Collections.Generic;
+
+namespace DTLib.Reactive
{
public class ReactiveSender : ReactiveProvider
{
public ReactiveSender() { }
public ReactiveSender(ReactiveStream stream) : base(stream) { }
+ public ReactiveSender(ICollection> streams) : base(streams) { }
public void Send(T e)
{
- lock (Stream) Stream.Add(e);
+ foreach (ReactiveStream s in Streams)
+ s.Add(e);
}
}
}
diff --git a/Reactive/ReactiveStream.cs b/Reactive/ReactiveStream.cs
index 5592ad4..4b7080d 100644
--- a/Reactive/ReactiveStream.cs
+++ b/Reactive/ReactiveStream.cs
@@ -1,38 +1,74 @@
-using System.Collections.Generic;
+using System;
+using System.Collections;
+using System.Collections.Generic;
namespace DTLib.Reactive
{
- public class ReactiveStream
+ public class ReactiveStream : IEnumerable>, IList>
{
public ReactiveStream() { }
- List _storage = new();
- List Storage
+ List> _storage = new();
+ List> Storage
{
get
- { lock (Storage) return _storage; }
+ { lock (_storage) return _storage; }
}
- public int Length
+ public int Count => Storage.Count;
+
+ public TimeSignedObject this[int index]
{
- get
- { lock (Storage) return Storage.Count; }
+ get => Storage[index];
+ set => throw new NotImplementedException();
}
- public T this[int index]
+ public event Action, TimeSignedObject> ElementAddedEvent;
+ public void Add(TimeSignedObject elem)
{
- get
- { lock (Storage) return Storage[index]; }
+ Storage.Add(elem);
+ ElementAddedEvent?.Invoke(this, elem);
}
+ public void Add(T elem) => Add(new TimeSignedObject(elem));
- internal event EventHandlerAsync ElementAddedEvent;
+ public void Clear() => Storage.Clear();
+ public int IndexOf(TimeSignedObject item) => Storage.IndexOf(item);
+ public bool Contains(TimeSignedObject item) => Storage.Contains(item);
- internal void Add(T elem)
+ public IEnumerator> GetEnumerator() => new Enumerator(Storage);
+ IEnumerator IEnumerable.GetEnumerator() => new Enumerator(Storage);
+
+ struct Enumerator : IEnumerator>
{
- lock (Storage) Storage.Add(elem);
- ElementAddedEvent?.Invoke(elem);
+ public Enumerator(List> storage)
+ {
+ _storage = storage;
+ _index = storage.Count - 1;
+ }
+
+ List> _storage;
+ int _index;
+ public TimeSignedObject Current => _storage[_index];
+ object IEnumerator.Current => Current;
+
+ public void Dispose() => _storage = null;
+
+ public bool MoveNext()
+ {
+ if (_index < 0)
+ return false;
+ _index--;
+ return true;
+ }
+
+ public void Reset() => _index = _storage.Count - 1;
}
- internal void Clear() { lock (Storage) Storage.Clear(); }
+ bool ICollection>.IsReadOnly { get; } = false;
+
+ public void Insert(int index, TimeSignedObject item) => throw new NotImplementedException();
+ public void RemoveAt(int index) => throw new NotImplementedException();
+ public void CopyTo(TimeSignedObject[] array, int arrayIndex) => throw new NotImplementedException();
+ public bool Remove(TimeSignedObject item) => throw new NotImplementedException();
}
}
\ No newline at end of file
diff --git a/Reactive/TimeSignedObject.cs b/Reactive/TimeSignedObject.cs
new file mode 100644
index 0000000..443d233
--- /dev/null
+++ b/Reactive/TimeSignedObject.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace DTLib.Reactive
+{
+ public class TimeSignedObject
+ {
+ public T Value { get; init; }
+ public long Time { get; init; }
+
+ public TimeSignedObject(T value)
+ {
+ Value = value;
+ Time = DateTime.Now.Ticks;
+ }
+ }
+}