some reactive shit

This commit is contained in:
Timerix22 2021-10-30 13:48:13 +03:00
parent 5566170210
commit 2ed8d5d9e8
6 changed files with 97 additions and 70 deletions

View File

@ -51,6 +51,10 @@
<Compile Include="Network\OldNetwork.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Hasher.cs" />
<Compile Include="Reactive\ReactiveListener.cs" />
<Compile Include="Reactive\ReactiveSender.cs" />
<Compile Include="Reactive\ReactiveStream.cs" />
<Compile Include="Reactive\ReactiveProvider.cs" />
<Compile Include="SecureRandom.cs" />
<Compile Include="FrameworkFix.cs" />
<Compile Include="TImer.cs" />

View File

@ -4,30 +4,46 @@ using System.Threading.Tasks;
namespace DTLib.Reactive
{
public class ReactiveListener<T> : ReactiveWorker<T>
public class ReactiveListener<T> : ReactiveProvider<T>
{
public ReactiveListener() { }
public ReactiveListener(ReactiveStream<T> stream) : base(stream) { }
public ReactiveListener(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }
public Action<object, T> ElementAddedHandler;
public void SetHandler(Action<object, T> handler) => ReactiveWorkerMutex.Execute(() => ElementAddedHandler=handler);
public async Task ElementAdded(object s, T e) => await Task.Run(() => ElementAddedHandler?.Invoke(s, e));
public EventHandlerAsync<T> ElementAddedHandler;
public void SetHandler(EventHandlerAsync<T> handler)
{
lock (Stream) ElementAddedHandler = handler;
}
public async Task ElementAdded(T e) => await Task.Run(() => ElementAddedHandler?.Invoke(e));
public override void Join(ReactiveStream<T> stream) =>
ReactiveWorkerMutex.Execute(() =>
{
Streams.Add(stream);
stream.ElementAdded+=ElementAdded;
});
public override void Join(ReactiveStream<T> stream)
{
base.Join(stream);
lock (Stream) stream.ElementAddedEvent += ElementAdded;
}
public override void Leave(ReactiveStream<T> stream) =>
ReactiveWorkerMutex.Execute(() =>
{
Streams.Remove(stream);
stream.ElementAdded-=ElementAdded;
});
public override void Leave(ReactiveStream<T> stream)
{
base.Leave(stream);
lock (Stream) stream.ElementAddedEvent -= ElementAdded;
}
//public T GetElement()
public T GetFirstElement() => Stream[0];
public T GetLastElement() => Stream[Stream.Length - 1];
public T FindOne(Func<T, bool> condition) =>
/*foreach (T el in Stream)
if (condition(el))
return el;*/
default;
public List<T> FindAll(Func<T, bool> condition)
{
List<T> elements = new();
/*foreach (T el in Stream)
if (condition(el))
elements.Add(el);*/
return elements;
}
}
}

View File

@ -1,28 +1,28 @@
using System;
using System.Collections.Generic;
namespace DTLib.Reactive
namespace DTLib.Reactive
{
public class ReactiveProvider<T> : ReactiveWorker<T>
public abstract class ReactiveProvider<T>
{
protected ReactiveStream<T> Stream
{
get
{ lock (_stream) return _stream; }
set
{ lock (_stream) _stream = value; }
}
protected ReactiveStream<T> _stream;
public ReactiveProvider() { }
public ReactiveProvider(ReactiveStream<T> stream) : base(stream) { }
public ReactiveProvider(IEnumerable<ReactiveStream<T>> streams) : base(streams) { }
event Action<T> AnnounceEvent;
public void Announce(T e) => ReactiveWorkerMutex.Execute(() => AnnounceEvent.Invoke(e));
public ReactiveProvider(ReactiveStream<T> stream) => Join(stream);
public override void Join(ReactiveStream<T> stream) => ReactiveWorkerMutex.Execute(() =>
{
Streams.Add(stream);
AnnounceEvent+=stream.Add;
});
public virtual void Join(ReactiveStream<T> stream)
{
lock (Stream) Stream = stream;
}
public override void Leave(ReactiveStream<T> stream) => ReactiveWorkerMutex.Execute(() =>
{
Streams.Remove(stream);
AnnounceEvent-=stream.Add;
});
public virtual void Leave(ReactiveStream<T> stream)
{
lock (Stream) Stream = null;
}
}
}

View File

@ -0,0 +1,14 @@
namespace DTLib.Reactive
{
public class ReactiveSender<T> : ReactiveProvider<T>
{
public ReactiveSender() { }
public ReactiveSender(ReactiveStream<T> stream) : base(stream) { }
public void Send(T e)
{
lock (Stream) Stream.Add(e);
}
}
}

View File

@ -1,25 +1,38 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
namespace DTLib.Reactive
{
public class ReactiveStream<T>
{
List<(long time, T value)> Storage = new();
public event EventHandlerAsync<T> ElementAdded;
SafeMutex StorageMutex = new();
public int Length => StorageMutex.Execute(() => Storage.Count);
public ReactiveStream() { }
public void Add(T elem)
List<T> _storage = new();
List<T> Storage
{
StorageMutex.Execute(() => Storage.Add((DateTime.Now.Ticks, elem)));
ElementAdded?.Invoke(this, elem);
get
{ lock (Storage) return _storage; }
}
public void Get(int index) => StorageMutex.Execute(() => Storage[index]);
public int Length
{
get
{ lock (Storage) return Storage.Count; }
}
public void Clear() => StorageMutex.Execute(() => Storage.Clear());
public T this[int index]
{
get
{ lock (Storage) return Storage[index]; }
}
internal event EventHandlerAsync<T> ElementAddedEvent;
internal void Add(T elem)
{
lock (Storage) Storage.Add(elem);
ElementAddedEvent?.Invoke(elem);
}
internal void Clear() { lock (Storage) Storage.Clear(); }
}
}

View File

@ -1,20 +0,0 @@
using System.Collections.Generic;
namespace DTLib.Reactive
{
public abstract class ReactiveWorker<T>
{
protected List<ReactiveStream<T>> Streams = new();
protected SafeMutex ReactiveWorkerMutex = new();
public ReactiveWorker() { }
public ReactiveWorker(ReactiveStream<T> stream) => Join(stream);
public ReactiveWorker(IEnumerable<ReactiveStream<T>> streams) =>
ReactiveWorkerMutex.Execute(() => { foreach(ReactiveStream<T> stream in streams) Join(stream); });
public abstract void Join(ReactiveStream<T> stream);
public abstract void Leave(ReactiveStream<T> stream);
}
}