experimental
This commit is contained in:
100
Experimental/Reactive/ReactiveListener.cs
Normal file
100
Experimental/Reactive/ReactiveListener.cs
Normal file
@@ -0,0 +1,100 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace DTLib.Reactive
|
||||
{
|
||||
public class ReactiveListener<T> : ReactiveProvider<T>
|
||||
{
|
||||
public ReactiveListener() { }
|
||||
public ReactiveListener(ReactiveStream<T> stream) : base(stream) { }
|
||||
public ReactiveListener(ICollection<ReactiveStream<T>> streams) : base(streams) { }
|
||||
|
||||
|
||||
public event Action<ReactiveStream<T>, T> ElementAddedEvent;
|
||||
void ElementAdded(ReactiveStream<T> stream, TimeSignedObject<T> e) => ElementAdded(stream, e.Value);
|
||||
void ElementAdded(ReactiveStream<T> stream, T e) =>
|
||||
Task.Run(() => ElementAddedEvent?.Invoke(stream, e));
|
||||
|
||||
public override void Join(ReactiveStream<T> stream)
|
||||
{
|
||||
base.Join(stream);
|
||||
lock (Streams) stream.ElementAddedEvent += ElementAdded;
|
||||
}
|
||||
|
||||
public override void Leave(ReactiveStream<T> stream)
|
||||
{
|
||||
base.Leave(stream);
|
||||
lock (Streams) stream.ElementAddedEvent -= ElementAdded;
|
||||
}
|
||||
|
||||
public T GetFirst()
|
||||
{
|
||||
if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
|
||||
TimeSignedObject<T> rezult = null;
|
||||
foreach (ReactiveStream<T> stream in Streams)
|
||||
if (stream.Count != 0)
|
||||
{
|
||||
TimeSignedObject<T> e = stream[0];
|
||||
if (rezult is null) rezult = e;
|
||||
else if (rezult.Time > e.Time) rezult = e;
|
||||
}
|
||||
return rezult.Value;
|
||||
}
|
||||
public T GetLast()
|
||||
{
|
||||
if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
|
||||
TimeSignedObject<T> rezult = null;
|
||||
foreach (ReactiveStream<T> stream in Streams)
|
||||
if (stream.Count != 0)
|
||||
{
|
||||
TimeSignedObject<T> e = stream[stream.Count - 1];
|
||||
if (rezult is null) rezult = e;
|
||||
else if (rezult.Time < e.Time) rezult = e;
|
||||
}
|
||||
return rezult.Value;
|
||||
}
|
||||
|
||||
public T FindOne(Func<T, bool> condition)
|
||||
{
|
||||
if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
|
||||
foreach (ReactiveStream<T> stream in Streams)
|
||||
foreach (TimeSignedObject<T> el in stream)
|
||||
if (condition(el.Value))
|
||||
return el.Value;
|
||||
return default;
|
||||
}
|
||||
|
||||
public TimeSignedObject<T> FindOne(Func<TimeSignedObject<T>, bool> condition)
|
||||
{
|
||||
if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
|
||||
foreach (ReactiveStream<T> stream in Streams)
|
||||
foreach (TimeSignedObject<T> el in stream)
|
||||
if (condition(el))
|
||||
return el;
|
||||
return default;
|
||||
}
|
||||
|
||||
public List<T> FindAll(Func<T, bool> condition)
|
||||
{
|
||||
if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
|
||||
List<T> rezults = new();
|
||||
foreach (ReactiveStream<T> stream in Streams)
|
||||
foreach (TimeSignedObject<T> el in stream)
|
||||
if (condition(el.Value))
|
||||
rezults.Add(el.Value);
|
||||
return rezults;
|
||||
}
|
||||
|
||||
public List<TimeSignedObject<T>> FindAll(Func<TimeSignedObject<T>, bool> condition)
|
||||
{
|
||||
if (Streams.Count == 0) throw new Exception("ReactiveListener is not connected to any streams");
|
||||
List<TimeSignedObject<T>> rezults = new();
|
||||
foreach (ReactiveStream<T> stream in Streams)
|
||||
foreach (TimeSignedObject<T> el in stream)
|
||||
if (condition(el))
|
||||
rezults.Add(el);
|
||||
return rezults;
|
||||
}
|
||||
}
|
||||
}
|
||||
35
Experimental/Reactive/ReactiveProvider.cs
Normal file
35
Experimental/Reactive/ReactiveProvider.cs
Normal file
@@ -0,0 +1,35 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
namespace DTLib.Reactive
|
||||
{
|
||||
public abstract class ReactiveProvider<T>
|
||||
{
|
||||
protected List<ReactiveStream<T>> Streams
|
||||
{
|
||||
get
|
||||
{ lock (_streams) return _streams; }
|
||||
set
|
||||
{ lock (_streams) _streams = value; }
|
||||
}
|
||||
private List<ReactiveStream<T>> _streams = new();
|
||||
|
||||
public ReactiveProvider() { }
|
||||
public ReactiveProvider(ReactiveStream<T> stream) => Streams.Add(stream);
|
||||
public ReactiveProvider(ICollection<ReactiveStream<T>> streams) => Streams = streams.ToList();
|
||||
|
||||
public virtual void Join(ReactiveStream<T> stream)
|
||||
{
|
||||
if (IsConnetcedTo(stream)) throw new Exception("ReactiveListener is already connected to the stream");
|
||||
Streams.Add(stream);
|
||||
}
|
||||
|
||||
public virtual void Leave(ReactiveStream<T> stream)
|
||||
{
|
||||
if (!Streams.Remove(stream)) throw new Exception("ReactiveListener is not connected to the stream");
|
||||
}
|
||||
|
||||
public bool IsConnetcedTo(ReactiveStream<T> stream) => Streams.Contains(stream);
|
||||
}
|
||||
}
|
||||
18
Experimental/Reactive/ReactiveSender.cs
Normal file
18
Experimental/Reactive/ReactiveSender.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace DTLib.Reactive
|
||||
{
|
||||
public class ReactiveSender<T> : ReactiveProvider<T>
|
||||
{
|
||||
|
||||
public ReactiveSender() { }
|
||||
public ReactiveSender(ReactiveStream<T> stream) : base(stream) { }
|
||||
public ReactiveSender(ICollection<ReactiveStream<T>> streams) : base(streams) { }
|
||||
|
||||
public void Send(T e)
|
||||
{
|
||||
foreach (ReactiveStream<T> s in Streams)
|
||||
s.Add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
74
Experimental/Reactive/ReactiveStream.cs
Normal file
74
Experimental/Reactive/ReactiveStream.cs
Normal file
@@ -0,0 +1,74 @@
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace DTLib.Reactive
|
||||
{
|
||||
public class ReactiveStream<T> : IEnumerable<TimeSignedObject<T>>, IList<TimeSignedObject<T>>
|
||||
{
|
||||
public ReactiveStream() { }
|
||||
|
||||
List<TimeSignedObject<T>> _storage = new();
|
||||
List<TimeSignedObject<T>> Storage
|
||||
{
|
||||
get
|
||||
{ lock (_storage) return _storage; }
|
||||
}
|
||||
|
||||
public int Count => Storage.Count;
|
||||
|
||||
public TimeSignedObject<T> this[int index]
|
||||
{
|
||||
get => Storage[index];
|
||||
set => throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public event Action<ReactiveStream<T>, TimeSignedObject<T>> ElementAddedEvent;
|
||||
public void Add(TimeSignedObject<T> elem)
|
||||
{
|
||||
Storage.Add(elem);
|
||||
ElementAddedEvent?.Invoke(this, elem);
|
||||
}
|
||||
public void Add(T elem) => Add(new TimeSignedObject<T>(elem));
|
||||
|
||||
public void Clear() => Storage.Clear();
|
||||
public int IndexOf(TimeSignedObject<T> item) => Storage.IndexOf(item);
|
||||
public bool Contains(TimeSignedObject<T> item) => Storage.Contains(item);
|
||||
|
||||
public IEnumerator<TimeSignedObject<T>> GetEnumerator() => new Enumerator(Storage);
|
||||
IEnumerator IEnumerable.GetEnumerator() => new Enumerator(Storage);
|
||||
|
||||
struct Enumerator : IEnumerator<TimeSignedObject<T>>
|
||||
{
|
||||
public Enumerator(List<TimeSignedObject<T>> storage)
|
||||
{
|
||||
_storage = storage;
|
||||
_index = storage.Count - 1;
|
||||
}
|
||||
|
||||
List<TimeSignedObject<T>> _storage;
|
||||
int _index;
|
||||
public TimeSignedObject<T> Current => _storage[_index];
|
||||
object IEnumerator.Current => Current;
|
||||
|
||||
public void Dispose() => _storage = null;
|
||||
|
||||
public bool MoveNext()
|
||||
{
|
||||
if (_index < 0)
|
||||
return false;
|
||||
_index--;
|
||||
return true;
|
||||
}
|
||||
|
||||
public void Reset() => _index = _storage.Count - 1;
|
||||
}
|
||||
|
||||
bool ICollection<TimeSignedObject<T>>.IsReadOnly { get; } = false;
|
||||
|
||||
public void Insert(int index, TimeSignedObject<T> item) => throw new NotImplementedException();
|
||||
public void RemoveAt(int index) => throw new NotImplementedException();
|
||||
public void CopyTo(TimeSignedObject<T>[] array, int arrayIndex) => throw new NotImplementedException();
|
||||
public bool Remove(TimeSignedObject<T> item) => throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
16
Experimental/Reactive/TimeSignedObject.cs
Normal file
16
Experimental/Reactive/TimeSignedObject.cs
Normal file
@@ -0,0 +1,16 @@
|
||||
using System;
|
||||
|
||||
namespace DTLib.Reactive
|
||||
{
|
||||
public class TimeSignedObject<T>
|
||||
{
|
||||
public T Value { get; init; }
|
||||
public long Time { get; init; }
|
||||
|
||||
public TimeSignedObject(T value)
|
||||
{
|
||||
Value = value;
|
||||
Time = DateTime.Now.Ticks;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user