using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DTLib.Reactive
{
    /// <summary>
    /// Listens to the streams it is joined to: re-raises their "element added"
    /// notifications through <see cref="ElementAddedEvent"/> and offers lookups
    /// over the elements currently stored in those streams.
    /// </summary>
    /// <typeparam name="T">Type of the values carried by the streams.</typeparam>
    // NOTE(review): the generic type arguments in this file were reconstructed from usage
    // (ReactiveProvider<T>, ReactiveStream<T>, TimeSignedObject<T>) — confirm against
    // the declarations of those types.
    public class ReactiveListener<T> : ReactiveProvider<T>
    {
        /// <summary>Creates a listener that is not joined to any stream.</summary>
        public ReactiveListener()
        {
        }

        /// <summary>Creates a listener, forwarding <paramref name="stream"/> to the base constructor.</summary>
        /// <param name="stream">Stream passed to the base provider.</param>
        public ReactiveListener(ReactiveStream<T> stream) : base(stream)
        {
        }

        /// <summary>Creates a listener, forwarding <paramref name="streams"/> to the base constructor.</summary>
        /// <param name="streams">Streams passed to the base provider.</param>
        public ReactiveListener(ICollection<ReactiveStream<T>> streams) : base(streams)
        {
        }

        /// <summary>
        /// Raised with the source stream and the added value whenever a joined stream
        /// reports a new element. Handlers are invoked from a <see cref="Task.Run(Action)"/> task.
        /// </summary>
        public event Action<ReactiveStream<T>, T> ElementAddedEvent;

        // Handler attached to a stream's ElementAddedEvent: unwraps the time-signed value.
        void ElementAdded(ReactiveStream<T> stream, TimeSignedObject<T> e)
        {
            ElementAdded(stream, e.Value);
        }

        // Raises ElementAddedEvent on a separate task. The task is not awaited,
        // so an exception thrown by a subscriber is not observed here.
        void ElementAdded(ReactiveStream<T> stream, T e)
        {
            Task.Run(() => ElementAddedEvent?.Invoke(stream, e));
        }

        /// <summary>
        /// Joins <paramref name="stream"/> through the base implementation and then
        /// subscribes to its element-added notifications.
        /// </summary>
        /// <param name="stream">Stream to start listening to.</param>
        public override void Join(ReactiveStream<T> stream)
        {
            base.Join(stream);
            lock (Streams)
            {
                stream.ElementAddedEvent += ElementAdded;
            }
        }

        /// <summary>
        /// Leaves <paramref name="stream"/> through the base implementation and then
        /// unsubscribes from its element-added notifications.
        /// </summary>
        /// <param name="stream">Stream to stop listening to.</param>
        public override void Leave(ReactiveStream<T> stream)
        {
            base.Leave(stream);
            lock (Streams)
            {
                stream.ElementAddedEvent -= ElementAdded;
            }
        }

        /// <summary>
        /// Returns the value with the smallest time stamp among the first elements
        /// of all non-empty streams.
        /// </summary>
        /// <returns>Value of the earliest head element.</returns>
        /// <exception cref="InvalidOperationException">
        /// The listener is not connected to any streams, or every connected stream is empty.
        /// </exception>
        public T GetFirst()
        {
            ThrowIfNotConnected();

            TimeSignedObject<T> earliest = null;
            foreach (ReactiveStream<T> stream in Streams)
            {
                if (stream.Count == 0)
                {
                    continue;
                }

                TimeSignedObject<T> head = stream[0];
                // Strict comparison: on equal time stamps the stream visited first wins.
                if (earliest is null || earliest.Time > head.Time)
                {
                    earliest = head;
                }
            }

            // Previously an all-empty set of streams ended in a NullReferenceException.
            if (earliest is null)
            {
                throw new InvalidOperationException("ReactiveListener streams contain no elements");
            }

            return earliest.Value;
        }

        /// <summary>
        /// Returns the value with the largest time stamp among the last elements
        /// of all non-empty streams.
        /// </summary>
        /// <returns>Value of the latest tail element.</returns>
        /// <exception cref="InvalidOperationException">
        /// The listener is not connected to any streams, or every connected stream is empty.
        /// </exception>
        public T GetLast()
        {
            ThrowIfNotConnected();

            TimeSignedObject<T> latest = null;
            foreach (ReactiveStream<T> stream in Streams)
            {
                if (stream.Count == 0)
                {
                    continue;
                }

                TimeSignedObject<T> tail = stream[stream.Count - 1];
                // Strict comparison: on equal time stamps the stream visited first wins.
                if (latest is null || latest.Time < tail.Time)
                {
                    latest = tail;
                }
            }

            // Previously an all-empty set of streams ended in a NullReferenceException.
            if (latest is null)
            {
                throw new InvalidOperationException("ReactiveListener streams contain no elements");
            }

            return latest.Value;
        }

        /// <summary>
        /// Returns the first value, in stream enumeration order, that satisfies
        /// <paramref name="condition"/>.
        /// </summary>
        /// <param name="condition">Predicate applied to each element's value.</param>
        /// <returns>The matching value, or <c>default</c> when nothing matches.</returns>
        /// <exception cref="InvalidOperationException">The listener is not connected to any streams.</exception>
        public T FindOne(Func<T, bool> condition)
        {
            ThrowIfNotConnected();

            foreach (ReactiveStream<T> stream in Streams)
            {
                foreach (TimeSignedObject<T> el in stream)
                {
                    if (condition(el.Value))
                    {
                        return el.Value;
                    }
                }
            }

            return default;
        }

        /// <summary>
        /// Returns the first time-signed element, in stream enumeration order, that
        /// satisfies <paramref name="condition"/>.
        /// </summary>
        /// <param name="condition">Predicate applied to each time-signed element.</param>
        /// <returns>The matching element, or <c>default</c> when nothing matches.</returns>
        /// <exception cref="InvalidOperationException">The listener is not connected to any streams.</exception>
        public TimeSignedObject<T> FindOne(Func<TimeSignedObject<T>, bool> condition)
        {
            ThrowIfNotConnected();

            foreach (ReactiveStream<T> stream in Streams)
            {
                foreach (TimeSignedObject<T> el in stream)
                {
                    if (condition(el))
                    {
                        return el;
                    }
                }
            }

            return default;
        }

        /// <summary>
        /// Collects every value from all streams that satisfies <paramref name="condition"/>.
        /// </summary>
        /// <param name="condition">Predicate applied to each element's value.</param>
        /// <returns>A new list of matching values; empty when nothing matches.</returns>
        /// <exception cref="InvalidOperationException">The listener is not connected to any streams.</exception>
        public List<T> FindAll(Func<T, bool> condition)
        {
            ThrowIfNotConnected();

            List<T> matches = new();
            foreach (ReactiveStream<T> stream in Streams)
            {
                foreach (TimeSignedObject<T> el in stream)
                {
                    if (condition(el.Value))
                    {
                        matches.Add(el.Value);
                    }
                }
            }

            return matches;
        }

        /// <summary>
        /// Collects every time-signed element from all streams that satisfies
        /// <paramref name="condition"/>.
        /// </summary>
        /// <param name="condition">Predicate applied to each time-signed element.</param>
        /// <returns>A new list of matching elements; empty when nothing matches.</returns>
        /// <exception cref="InvalidOperationException">The listener is not connected to any streams.</exception>
        public List<TimeSignedObject<T>> FindAll(Func<TimeSignedObject<T>, bool> condition)
        {
            ThrowIfNotConnected();

            List<TimeSignedObject<T>> matches = new();
            foreach (ReactiveStream<T> stream in Streams)
            {
                foreach (TimeSignedObject<T> el in stream)
                {
                    if (condition(el))
                    {
                        matches.Add(el);
                    }
                }
            }

            return matches;
        }

        // Shared guard for the query methods: they are meaningless without at least one stream.
        private void ThrowIfNotConnected()
        {
            if (Streams.Count == 0)
            {
                throw new InvalidOperationException("ReactiveListener is not connected to any streams");
            }
        }
    }
}