reactivity development is continuing

This commit is contained in:
Timerix 2021-10-04 16:40:24 +03:00
parent 89da4c951c
commit c7f3deab57
6 changed files with 66 additions and 34 deletions

View File

@ -1,8 +1,4 @@
using System; using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace DTLib namespace DTLib
{ {

View File

@ -11,7 +11,7 @@ namespace DTLib.Network
// //
public class FSP public class FSP
{ {
Socket mainSocket { get; init; } Socket mainSocket { get; init; }
static public bool debug = false; static public bool debug = false;
public FSP(Socket _mainSocket) => mainSocket = _mainSocket; public FSP(Socket _mainSocket) => mainSocket = _mainSocket;

View File

@ -1,21 +1,34 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace DTLib.Reactive namespace DTLib.Reactive
{ {
public class ReactiveListener<T> : ReactiveWorker<T> public class ReactiveListener<T> : ReactiveWorker<T>
{ {
public ReactiveListener() public ReactiveListener() { }
public ReactiveListener(ReactiveStream<T> stream) : base(stream) { }
public ReactiveListener(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }
public Action<object, T> ElementAddedHandler;
public void SetHandler(Action<object, T> handler) => ReactiveWorkerMutex.Execute(() => { ElementAddedHandler = handler; });
public async Task ElementAdded(object s, T e) => await Task.Run(() => ElementAddedHandler?.Invoke(s, e));
public override void Join(ReactiveStream<T> stream)
{ {
StreamCollectionAccess.Execute(() => ReactiveWorkerMutex.Execute(() =>
{ {
foreach(var stream in Streams) Streams.Add(stream);
{ stream.ElementAdded += ElementAdded;
stream.ElementAdded += async (sender, value) => { await Task.Run(() =>{ }); }; });
} }
public override void Leave(ReactiveStream<T> stream)
{
ReactiveWorkerMutex.Execute(() =>
{
Streams.Remove(stream);
stream.ElementAdded -= ElementAdded;
}); });
} }
} }

View File

@ -1,13 +1,37 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace DTLib.Reactive namespace DTLib.Reactive
{ {
public class ReactiveProvider<T> : ReactiveWorker<T> public class ReactiveProvider<T> : ReactiveWorker<T>
{ {
public ReactiveProvider() { }
public ReactiveProvider(ReactiveStream<T> stream) : base(stream) { }
public ReactiveProvider(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }
event Action<T> AnnounceEvent;
public void Announce(T e)
{
ReactiveWorkerMutex.Execute(()=>AnnounceEvent.Invoke(e));
}
public override void Join(ReactiveStream<T> stream)
{
ReactiveWorkerMutex.Execute(() =>
{
Streams.Add(stream);
AnnounceEvent += stream.Add;
});
}
public override void Leave(ReactiveStream<T> stream)
{
ReactiveWorkerMutex.Execute(() =>
{
Streams.Remove(stream);
AnnounceEvent -= stream.Add;
});
}
} }
} }

View File

@ -1,9 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections;
using System.Collections.ObjectModel;
using System.Text;
using System.Threading.Tasks;
namespace DTLib.Reactive namespace DTLib.Reactive
{ {
@ -13,20 +9,21 @@ namespace DTLib.Reactive
public event EventHandlerAsync<T> ElementAdded; public event EventHandlerAsync<T> ElementAdded;
bool StoreData = false; bool StoreData = false;
SafeMutex StorageAccess = new(); SafeMutex StorageMutex = new();
public ReactiveStream() { } public ReactiveStream() { }
public ReactiveStream(bool storeData) => StoreData = storeData; public ReactiveStream(bool storeData) => StoreData = storeData;
public void Add(T elem) public void Add(T elem)
{ {
if (StoreData) StorageAccess.Execute(() => Storage.Add(elem)); if (StoreData) StorageMutex.Execute(() => Storage.Add(elem));
ElementAdded?.Invoke(this, elem); ElementAdded?.Invoke(this, elem);
} }
public void Clear() public void Clear()
{ {
if (StoreData) StorageAccess.Execute(() => Storage.Clear()); if (StoreData) StorageMutex.Execute(() => Storage.Clear());
else throw new Exception("Can't clear ReactiveStream because StoreData==false");
} }
} }
} }

View File

@ -1,18 +1,20 @@
using System; using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace DTLib.Reactive namespace DTLib.Reactive
{ {
public abstract class ReactiveWorker<T> public abstract class ReactiveWorker<T>
{ {
protected List<ReactiveStream<T>> Streams = new(); protected List<ReactiveStream<T>> Streams = new();
protected SafeMutex ReactiveWorkerMutex = new();
protected SafeMutex StreamCollectionAccess = new(); public ReactiveWorker() { }
public void Join(ReactiveStream<T> stream) => StreamCollectionAccess.Execute(()=>Streams.Add(stream)); public ReactiveWorker(ReactiveStream<T> stream) => Join(stream);
public void Leave(ReactiveStream<T> stream) => StreamCollectionAccess.Execute(() => Streams.Remove(stream));
public ReactiveWorker(IEnumerable<ReactiveStream<T>> streams) =>
ReactiveWorkerMutex.Execute(() => { foreach (var stream in streams) Join(stream); });
public abstract void Join(ReactiveStream<T> stream);
public abstract void Leave(ReactiveStream<T> stream);
} }
} }