REFACTORING
This commit is contained in:
@@ -11,25 +11,21 @@ namespace DTLib.Reactive
|
||||
public ReactiveListener(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }
|
||||
|
||||
public Action<object, T> ElementAddedHandler;
|
||||
public void SetHandler(Action<object, T> handler) => ReactiveWorkerMutex.Execute(() => { ElementAddedHandler = handler; });
|
||||
public void SetHandler(Action<object, T> handler) => ReactiveWorkerMutex.Execute(() => ElementAddedHandler=handler);
|
||||
public async Task ElementAdded(object s, T e) => await Task.Run(() => ElementAddedHandler?.Invoke(s, e));
|
||||
|
||||
public override void Join(ReactiveStream<T> stream)
|
||||
{
|
||||
public override void Join(ReactiveStream<T> stream) =>
|
||||
ReactiveWorkerMutex.Execute(() =>
|
||||
{
|
||||
Streams.Add(stream);
|
||||
stream.ElementAdded += ElementAdded;
|
||||
stream.ElementAdded+=ElementAdded;
|
||||
});
|
||||
}
|
||||
|
||||
public override void Leave(ReactiveStream<T> stream)
|
||||
{
|
||||
public override void Leave(ReactiveStream<T> stream) =>
|
||||
ReactiveWorkerMutex.Execute(() =>
|
||||
{
|
||||
Streams.Remove(stream);
|
||||
stream.ElementAdded -= ElementAdded;
|
||||
stream.ElementAdded-=ElementAdded;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,27 +11,18 @@ namespace DTLib.Reactive
|
||||
public ReactiveProvider(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }
|
||||
|
||||
event Action<T> AnnounceEvent;
|
||||
public void Announce(T e)
|
||||
{
|
||||
ReactiveWorkerMutex.Execute(() => AnnounceEvent.Invoke(e));
|
||||
}
|
||||
public void Announce(T e) => ReactiveWorkerMutex.Execute(() => AnnounceEvent.Invoke(e));
|
||||
|
||||
public override void Join(ReactiveStream<T> stream)
|
||||
{
|
||||
ReactiveWorkerMutex.Execute(() =>
|
||||
{
|
||||
Streams.Add(stream);
|
||||
AnnounceEvent += stream.Add;
|
||||
});
|
||||
}
|
||||
public override void Join(ReactiveStream<T> stream) => ReactiveWorkerMutex.Execute(() =>
|
||||
{
|
||||
Streams.Add(stream);
|
||||
AnnounceEvent+=stream.Add;
|
||||
});
|
||||
|
||||
public override void Leave(ReactiveStream<T> stream)
|
||||
{
|
||||
ReactiveWorkerMutex.Execute(() =>
|
||||
{
|
||||
Streams.Remove(stream);
|
||||
AnnounceEvent -= stream.Add;
|
||||
});
|
||||
}
|
||||
public override void Leave(ReactiveStream<T> stream) => ReactiveWorkerMutex.Execute(() =>
|
||||
{
|
||||
Streams.Remove(stream);
|
||||
AnnounceEvent-=stream.Add;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace DTLib.Reactive
|
||||
{
|
||||
@@ -8,7 +7,7 @@ namespace DTLib.Reactive
|
||||
List<T> Storage = new();
|
||||
public event EventHandlerAsync<T> ElementAdded;
|
||||
SafeMutex StorageMutex = new();
|
||||
public int Length { get { return StorageMutex.Execute(() => Storage.Count); } }
|
||||
public int Length => StorageMutex.Execute(() => Storage.Count);
|
||||
|
||||
public ReactiveStream() { }
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ namespace DTLib.Reactive
|
||||
public ReactiveWorker(ReactiveStream<T> stream) => Join(stream);
|
||||
|
||||
public ReactiveWorker(IEnumerable<ReactiveStream<T>> streams) =>
|
||||
ReactiveWorkerMutex.Execute(() => { foreach (var stream in streams) Join(stream); });
|
||||
ReactiveWorkerMutex.Execute(() => { foreach(ReactiveStream<T> stream in streams) Join(stream); });
|
||||
|
||||
public abstract void Join(ReactiveStream<T> stream);
|
||||
public abstract void Leave(ReactiveStream<T> stream);
|
||||
|
||||
Reference in New Issue
Block a user