using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DTLib.Reactive
{
    /// <summary>
    /// Reactive worker that subscribes to the <c>ElementAdded</c> event of the streams it joins
    /// and forwards every reported element to <see cref="ElementAddedHandler"/>.
    /// </summary>
    /// <typeparam name="T">Type of the elements reported by the joined streams.</typeparam>
    public class ReactiveListener<T> : ReactiveWorker<T>
    {
        /// <summary>Creates a listener without passing any stream to the base worker.</summary>
        public ReactiveListener() { }

        /// <summary>Creates a listener and passes <paramref name="stream"/> to the base worker constructor.</summary>
        /// <param name="stream">Stream handed to the base constructor (presumably joined there; confirm in ReactiveWorker).</param>
        public ReactiveListener(ReactiveStream<T> stream) : base(stream) { }

        /// <summary>Creates a listener and passes <paramref name="streams"/> to the base worker constructor.</summary>
        /// <param name="streams">Streams handed to the base constructor (presumably joined there; confirm in ReactiveWorker).</param>
        public ReactiveListener(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }

        /// <summary>
        /// Callback invoked with the sender and the element each time <see cref="ElementAdded"/> runs.
        /// When it is null, nothing is invoked.
        /// </summary>
        public Action<object, T> ElementAddedHandler;

        /// <summary>
        /// Replaces <see cref="ElementAddedHandler"/>; the assignment is performed through
        /// <c>ReactiveWorkerMutex.Execute</c>, the same gate used by <see cref="Join"/> and <see cref="Leave"/>.
        /// </summary>
        /// <param name="handler">New callback; null results in no callback being invoked.</param>
        public void SetHandler(Action<object, T> handler) =>
            ReactiveWorkerMutex.Execute(() => ElementAddedHandler = handler);

        /// <summary>
        /// Event callback attached to a stream's <c>ElementAdded</c> event. Invokes
        /// <see cref="ElementAddedHandler"/> via <see cref="Task.Run(Action)"/> and awaits its completion.
        /// </summary>
        /// <param name="s">Sender passed through to the handler.</param>
        /// <param name="e">Element passed through to the handler.</param>
        /// <returns>A task that completes when the handler has finished; a handler exception faults this task.</returns>
        public async Task ElementAdded(object s, T e) =>
            // Nothing follows the await, so there is no reason to resume on the captured context.
            // Note: the handler field is read here without going through ReactiveWorkerMutex.
            await Task.Run(() => ElementAddedHandler?.Invoke(s, e)).ConfigureAwait(false);

        /// <summary>
        /// Adds <paramref name="stream"/> to <c>Streams</c> and subscribes <see cref="ElementAdded"/>
        /// to its <c>ElementAdded</c> event, both inside <c>ReactiveWorkerMutex.Execute</c>.
        /// </summary>
        /// <param name="stream">Stream to start listening to; not checked for null here.</param>
        public override void Join(ReactiveStream<T> stream) =>
            ReactiveWorkerMutex.Execute(() =>
            {
                Streams.Add(stream);
                stream.ElementAdded += ElementAdded;
            });

        /// <summary>
        /// Removes <paramref name="stream"/> from <c>Streams</c> and unsubscribes <see cref="ElementAdded"/>
        /// from its <c>ElementAdded</c> event, both inside <c>ReactiveWorkerMutex.Execute</c>.
        /// </summary>
        /// <param name="stream">Stream to stop listening to; not checked for null here.</param>
        public override void Leave(ReactiveStream<T> stream) =>
            ReactiveWorkerMutex.Execute(() =>
            {
                Streams.Remove(stream);
                stream.ElementAdded -= ElementAdded;
            });
    }
}