ClientConnection changes

This commit is contained in:
Timerix 2025-07-05 01:21:18 +03:00
parent bf4924c4d6
commit 1b89a75ecb
6 changed files with 121 additions and 51 deletions

View File

@ -1,21 +1,28 @@
using System.Net; using System.Net;
using System.Net.Quic; using System.Net.Quic;
using System.Runtime.InteropServices;
using DTLib.Logging; using DTLib.Logging;
using Meum.Core.Messages;
namespace Meum.Client; namespace Meum.Client;
public class ServerConnection : IAsyncDisposable public class ServerConnection : IAsyncDisposable
{ {
private readonly QuicConnection _quicConnection; private readonly QuicConnection _quicConnection;
private readonly QuicStreamWrapper _systemStream;
private readonly ILogger _logger; private readonly ILogger _logger;
public DnsEndPoint ServerEndPoint { get; } public DnsEndPoint ServerEndPoint { get; }
private ServerConnection(QuicConnection quicConnection, DnsEndPoint serverEndPoint, ILogger logger) private ServerConnection(QuicConnection quicConnection,
DnsEndPoint serverEndPoint,
QuicStreamWrapper systemStream,
ILogger logger)
{ {
ServerEndPoint = serverEndPoint; ServerEndPoint = serverEndPoint;
_quicConnection = quicConnection; _quicConnection = quicConnection;
_systemStream = systemStream;
_logger = logger; _logger = logger;
} }
@ -24,12 +31,30 @@ public class ServerConnection : IAsyncDisposable
ILogger logger, ILogger logger,
CancellationToken ct) CancellationToken ct)
{ {
var serverConnection = new ServerConnection(quicConnection, serverEndPoint, logger);
var systemStream = await quicConnection.OpenStreamAsync(QuicStreamType.Bidirectional, ct); var systemStream = await quicConnection.OpenStreamAsync(QuicStreamType.Bidirectional, ct);
var serverConnection = new ServerConnection(quicConnection, serverEndPoint, systemStream, logger);
await systemStream.SendPingReceivePong(); await systemStream.SendPingReceivePong();
return serverConnection; return serverConnection;
} }
public async Task Register()
{
await _systemStream.WriteStructAsync(new DataMessageHeader(
MessageTypeCode.RegistrationRequest,
Marshal.SizeOf<RegistrationRequest>()));
var registrationRequest = new RegistrationRequest();
await _systemStream.WriteStructAsync(registrationRequest);
var responseHeader = await _systemStream.ReadDataMessageHeaderAsync();
if(responseHeader.type_code != MessageTypeCode.RegistrationResponse)
throw new Exception($"Invalid response header type: {responseHeader.type_code}");
}
public async Task Authorize()
{
}
public override int GetHashCode() public override int GetHashCode()
{ {

View File

@ -0,0 +1,9 @@
using System.Runtime.InteropServices;
namespace Meum.Core.Messages;
[StructLayout(LayoutKind.Sequential)]
public record struct RegistrationRequest(byte[] hash)
{
}

View File

@ -17,7 +17,7 @@ public class QuicStreamWrapper : IAsyncDisposable
public async ValueTask<T> ReadStructAsync<T>(CancellationToken ct = default) public async ValueTask<T> ReadStructAsync<T>(CancellationToken ct = default)
where T : struct where T : struct
{ {
byte[] buffer = ArrayPool<byte>.Shared.Rent(Marshal.SizeOf(typeof(T))); byte[] buffer = ArrayPool<byte>.Shared.Rent(Marshal.SizeOf<T>());
var handle = GCHandle.Alloc(buffer, GCHandleType.Pinned); var handle = GCHandle.Alloc(buffer, GCHandleType.Pinned);
try try
{ {
@ -34,7 +34,7 @@ public class QuicStreamWrapper : IAsyncDisposable
public ValueTask WriteStructAsync<T>(T msg_struct, CancellationToken ct = default) public ValueTask WriteStructAsync<T>(T msg_struct, CancellationToken ct = default)
where T : struct where T : struct
{ {
byte[] buffer = ArrayPool<byte>.Shared.Rent(Marshal.SizeOf(typeof(T))); byte[] buffer = ArrayPool<byte>.Shared.Rent(Marshal.SizeOf<T>());
var handle = GCHandle.Alloc(buffer, GCHandleType.Pinned); var handle = GCHandle.Alloc(buffer, GCHandleType.Pinned);
try try
{ {

View File

@ -8,48 +8,84 @@ public class ClientConnection : IAsyncDisposable
{ {
private readonly QuicConnection _quicConnection; private readonly QuicConnection _quicConnection;
private QuicStreamWrapper _systemStream; private QuicStreamWrapper _systemStream;
private ILogger _logger; private ContextLogger _logger;
private CancellationTokenSource _connectionCts = new();
private ClientConnection(QuicConnection quicConnection, QuicStreamWrapper systemStream, ILogger logger) public int Id { get; }
public bool IsAuthorized { get; private set; }
private ClientConnection(int id, QuicConnection quicConnection, QuicStreamWrapper systemStream, ILogger logger)
{ {
Id = id;
_quicConnection = quicConnection; _quicConnection = quicConnection;
_systemStream = systemStream; _systemStream = systemStream;
_logger = logger; _logger = new ContextLogger($"Connection-{id}", logger);
} }
public static async Task<ClientConnection> OpenAsync( public static async Task<ClientConnection> OpenAsync(int id,
QuicConnection quicConnection, QuicConnection quicConnection, ILogger logger, CancellationToken ct)
ILogger logger,
CancellationToken ct)
{ {
var systemStream = await quicConnection.AcceptStreamAsync(QuicStreamType.Bidirectional, ct); var systemStream = await quicConnection.AcceptStreamAsync(QuicStreamType.Bidirectional, ct);
await systemStream.ReceivePingSendPong(); await systemStream.ReceivePingSendPong();
var clientConnection = new ClientConnection(quicConnection, systemStream, logger); var clientConnection = new ClientConnection(id, quicConnection, systemStream, logger);
clientConnection.HandleRequestsAsync(ct);
DataMessageHeader header = await systemStream.ReadDataMessageHeaderAsync(ct);
switch (header.type_code)
{
case MessageTypeCode.RegistrationRequest:
break;
case MessageTypeCode.AuthorizationRequest:
// if (authorized)
// clientConnection.HandleClientRequestsAsync()
break;
default:
throw new Exception($"New connection sent unexpected message: {header.type_code}");
}
return clientConnection; return clientConnection;
} }
private async void HandleRequestsAsync(CancellationToken ct)
// private async void HandleClientRequestsAsync(CancellationToken ct = default) {
// { try
// {
// } while (!_connectionCts.IsCancellationRequested)
{
try
{
var header = await _systemStream.ReadDataMessageHeaderAsync(ct);
if(IsAuthorized)
HandleAuthorizedRequest(header);
else HandleUnauthorizedRequest(header);
}
catch (Exception ex)
{
_logger.LogWarn(ex);
}
}
await DisposeAsync();
}
catch (Exception ex)
{
_logger.LogError(ex);
}
}
private void HandleUnauthorizedRequest(DataMessageHeader header)
{
switch (header.type_code)
{
default:
throw new Exception($"New connection sent unexpected message: {header.type_code}");
case MessageTypeCode.RegistrationRequest:
// TODO: registration
break;
case MessageTypeCode.AuthorizationRequest:
// TODO: authorization
IsAuthorized = true;
break;
}
}
private void HandleAuthorizedRequest(DataMessageHeader header)
{
switch (header.type_code)
{
default:
throw new Exception($"New connection sent unexpected message: {header.type_code}");
}
}
public override int GetHashCode() => Id;
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
await _quicConnection.DisposeAsync(); await _quicConnection.DisposeAsync();

View File

@ -19,7 +19,9 @@ static class Program
try try
{ {
var config = ServerConfig.LoadOrCreate(config_path); var config = ServerConfig.LoadOrCreate(config_path);
var logger = new ConsoleLogger(); var logger = new CompositeLogger(
new ConsoleLogger(),
new FileLogger("logs", "meum-server"));
Functions.InitMsQuic(logger); Functions.InitMsQuic(logger);
var server = new Server(config, logger); var server = new Server(config, logger);
@ -29,11 +31,10 @@ static class Program
try try
{ {
var conn = await server.AcceptConnectionAsync(); var conn = await server.AcceptConnectionAsync();
} }
catch (Exception ex) catch (Exception ex)
{ {
logger.LogError("Main", ex.ToStringDemystified()); logger.LogError("MainLoop", ex);
} }
} }
} }

View File

@ -12,7 +12,7 @@ public class Server
private readonly X509Certificate _certificate; private readonly X509Certificate _certificate;
private readonly ILogger _logger; private readonly ILogger _logger;
private QuicListener? _listener; private QuicListener? _listener;
private int _lastConnectionId;
public Server(ServerConfig config, ILogger logger, X509Certificate? certificate = null) public Server(ServerConfig config, ILogger logger, X509Certificate? certificate = null)
{ {
@ -50,17 +50,16 @@ public class Server
if (_listener == null) if (_listener == null)
throw new Exception("Server is not listening"); throw new Exception("Server is not listening");
while (true) ct.ThrowIfCancellationRequested();
{ var quicConnection = await _listener.AcceptConnectionAsync(ct);
ct.ThrowIfCancellationRequested(); var timeOutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var quicConnection = await _listener.AcceptConnectionAsync(ct); timeOutCts.CancelAfter(Constants.ConnectionTimeout);
var timeOutCts = CancellationTokenSource.CreateLinkedTokenSource(ct); var clientConnection = await ClientConnection.OpenAsync(
timeOutCts.CancelAfter(Constants.ConnectionTimeout); _lastConnectionId,
var clientConnection = await ClientConnection.OpenAsync( quicConnection,
quicConnection, _logger,
_logger, timeOutCts.Token);
timeOutCts.Token); Interlocked.Increment(ref _lastConnectionId);
return clientConnection; return clientConnection;
}
} }
} }