pipes and tcp listener

This commit is contained in:
Timerix22 2024-04-08 07:38:56 +05:00
parent a7bfca5f50
commit a3da186e3f
6 changed files with 258 additions and 38 deletions

View File

@ -12,15 +12,15 @@ void ListenerUDP_construct(ListenerUDP* ptr, knIPV4Endpoint listener_end){
ptr->connector_end = knIPV4Endpoint_INVALID; ptr->connector_end = knIPV4Endpoint_INVALID;
} }
void ListenerUDP_start(ListenerUDP* ptr){
}
void ListenerUDP_destruct(ListenerUDP* ptr){ void ListenerUDP_destruct(ListenerUDP* ptr){
__PipeStorage_destruct(&ptr->pipes); __PipeStorage_destruct(&ptr->pipes);
tryLast(knSocketUDP_close(ptr->main_sock), _m864, ;); tryLast(knSocketUDP_close(ptr->main_sock), _m864, ;);
} }
void ListenerUDP_start(ListenerUDP* ptr){
}
//////////////////////////////////////// ////////////////////////////////////////
// ListenerTCP // // ListenerTCP //
//////////////////////////////////////// ////////////////////////////////////////
@ -38,6 +38,66 @@ void ListenerTCP_destruct(ListenerTCP* ptr){
tryLast(knSocketTCP_close(ptr->main_sock), _m864, ;); tryLast(knSocketTCP_close(ptr->main_sock), _m864, ;);
} }
void ListenerTCP_start(ListenerTCP* ptr){ /// receives data while receivedCount < N
///@return Maybe<void>
Maybe knSocketTCP_receiveN(knSocketTCP* socket, char* buf, u32 bufsize, u32 n){
if(bufsize < n)
safethrow(ERR_UNEXPECTEDVAL, ;);
u32 receivedTotal = 0;
while(receivedTotal < n) {
try(knSocketTCP_receive(socket, buf, n-receivedTotal), m_receivedCount, ;);
receivedTotal += m_receivedCount.value.UInt64;
}
return MaybeNull;
}
void ListenerTCP_start(ListenerTCP* ptr){
const char logCtx[] = "ListenerTCP";
char buf[65535];
const u32 bufsize = sizeof(buf);
InternalPacket internal_pac;
IncomingPacket incoming_pac;
OutgoingPacket outgoing_pac;
int connection_n = 0;
while(true) {
Maybe m = knSocketTCP_accept(ptr->main_sock);
if(m.errmsg){
logExceptionAsWarning(logCtx, m);
continue;
}
knSocketTCP* connection_socket = m.value.VoidPtr;
m = knSocketTCP_receiveN(connection_socket, buf, bufsize, sizeof(InternalPacket));
if(m.errmsg){
logExceptionAsWarning(logCtx, m);
m = knSocketTCP_shutdown(connection_socket, knShutdownType_Both);
if(m.errmsg)
logExceptionAsWarning(logCtx, m);
m = knSocketTCP_close(connection_socket);
if(m.errmsg)
logExceptionAsWarning(logCtx, m);
continue;
}
if(!InternalPacket_tryParse(buf, bufsize, &internal_pac)){
logInfo(logCtx, "ListenerTCP received something unexpected");
m = knSocketTCP_shutdown(connection_socket, knShutdownType_Both);
if(m.errmsg)
logExceptionAsWarning(logCtx, m);
m = knSocketTCP_close(connection_socket);
if(m.errmsg)
logExceptionAsWarning(logCtx, m);
continue;
}
if(internal_pac.data.U64 != InternalPacketMessage_ConnectorHandshake){
logInfo(logCtx, "received InternalPacket with invalid data: %lu", internal_pac.data);
}
int attemptN = 0;
while(attemptN < 20){
// do pipes here
}
logWarning(logCtx, "connector is unreachable");
}
sleepMsec(500);
} }

44
src/Packets.c Normal file
View File

@ -0,0 +1,44 @@
#include "port-tunnel.h"
const MagicUnion InternalPacket_magic = {.array="prtun-N"};
const MagicUnion IncomingPacket_magic = {.array="prtun-I"};
const MagicUnion OutgoingPacket_magic = {.array="prtun-O"};
bool InternalPacket_tryParse(const char* data, u32 data_length, InternalPacket* pac_ptr){
if(data_length < sizeof(InternalPacket))
return false;
InternalPacket pac = *(InternalPacket*)(void*)data;
if(pac.data.U64 != InternalPacket_magic.U64)
return false;
*pac_ptr = pac;
return true;
}
bool IncomingPacket_tryParse(const char* data, u32 data_length, IncomingPacket* pac_ptr){
if(data_length < sizeof(IncomingPacketHeader))
return false;
IncomingPacketHeader header = *(IncomingPacketHeader*)(void*)data;
if(header.data.U64 != IncomingPacket_magic.U64)
return false;
if(knIPV4Endpoint_isINVALID(header.real_sender_end))
return false;
pac_ptr->header = header;
pac_ptr->payload = data + sizeof(IncomingPacketHeader);
return true;
}
bool OutgoingPacket_tryParse(const char* data, u32 data_length, OutgoingPacket* pac_ptr){
if(data_length < sizeof(OutgoingPacketHeader))
return false;
OutgoingPacketHeader header = *(OutgoingPacketHeader*)(void*)data;
if(header.data.U64 != OutgoingPacket_magic.U64)
return false;
if(knIPV4Endpoint_isINVALID(header.real_destination_end))
return false;
pac_ptr->header = header;
pac_ptr->payload = data + sizeof(OutgoingPacketHeader);
return true;
}

View File

@ -23,10 +23,10 @@ void PipeUDP_construct(PipeUDP* pipe, knSocketUDP* pipe_in_sock, knIPV4Endpoint
void PipeUDP_destruct(PipeUDP* pipe){ void PipeUDP_destruct(PipeUDP* pipe){
Maybe m = knSocketUDP_shutdown(pipe->pipe_in_sock, knShutdownType_Both); Maybe m = knSocketUDP_shutdown(pipe->pipe_in_sock, knShutdownType_Both);
if(m.errmsg) if(m.errmsg)
logError("%s", m.errmsg); logExceptionAsError("PipeUDP", m)
m = knSocketUDP_close(pipe->pipe_in_sock); m = knSocketUDP_close(pipe->pipe_in_sock);
if(m.errmsg) if(m.errmsg)
logError("%s", m.errmsg); logExceptionAsError("PipeUDP", m);
} }
void PipeUDP_startAsync(PipeUDP* pipe){ void PipeUDP_startAsync(PipeUDP* pipe){
@ -41,19 +41,29 @@ void PipeUDP_stop(PipeUDP* pipe){
// PipeTCP // // PipeTCP //
//////////////////////////////////////// ////////////////////////////////////////
void PipeTCP_construct(PipeTCP* pipe, knSocketTCP* pipe_in_sock, knIPV4Endpoint pipe_out_end){ Maybe PipeTCP_construct(PipeTCP* pipe, knSocketTCP* pipe_in_sock, knIPV4Endpoint pipe_out_end){
PipeBase_construct(&pipe->base); PipeBase_construct(&pipe->base);
pipe->pipe_in_sock = pipe_in_sock; pipe->pipe_in_sock = pipe_in_sock;
pipe->pipe_out_end = pipe_out_end; try(knSocketTCP_open(false), _m_out_sock, ;);
pipe->pipe_out_sock = _m_out_sock.value.VoidPtr;
try(knSocketTCP_connect(pipe->pipe_out_sock, pipe_out_end), _m17576, knSocketTCP_close(pipe->pipe_out_sock));
return MaybeNull;
} }
void PipeTCP_destruct(PipeTCP* pipe){ void PipeTCP_destruct(PipeTCP* pipe){
Maybe m = knSocketTCP_shutdown(pipe->pipe_in_sock, knShutdownType_Both); Maybe m = knSocketTCP_shutdown(pipe->pipe_in_sock, knShutdownType_Both);
if(m.errmsg) if(m.errmsg)
logError("%s", m.errmsg); logExceptionAsError("PipeTCP", m);
m = knSocketTCP_close(pipe->pipe_in_sock); m = knSocketTCP_close(pipe->pipe_in_sock);
if(m.errmsg) if(m.errmsg)
logError("%s", m.errmsg); logExceptionAsError("PipeTCP", m);
m = knSocketTCP_shutdown(pipe->pipe_out_sock, knShutdownType_Both);
if(m.errmsg)
logExceptionAsError("PipeTCP", m);
m = knSocketTCP_close(pipe->pipe_out_sock);
if(m.errmsg)
logExceptionAsError("PipeTCP", m);
} }
void PipeTCP_startAsync(PipeTCP* pipe){ void PipeTCP_startAsync(PipeTCP* pipe){

View File

@ -9,7 +9,13 @@ kt_define(ListenerTCP, (freeMembers_t)ListenerTCP_destruct, NULL)
kt_define(ConnectorUDP, (freeMembers_t)ConnectorUDP_destruct, NULL) kt_define(ConnectorUDP, (freeMembers_t)ConnectorUDP_destruct, NULL)
kt_define(ConnectorTCP, (freeMembers_t)ConnectorTCP_destruct, NULL) kt_define(ConnectorTCP, (freeMembers_t)ConnectorTCP_destruct, NULL)
LogLevel _log_level = LogLevel_Info; LogLevel _log_level_global = LogLevel_Info;
const char* LogLevel_nameTable[] = {
[LogLevel_Debug] = "Debug",
[LogLevel_Info] = "Info",
[LogLevel_Warning] = "Warning",
[LogLevel_Error] = "Error"
};
int errs(char* err_msg){ int errs(char* err_msg){
throw(err_msg); throw(err_msg);
@ -21,6 +27,7 @@ int errs(char* err_msg){
#define argNext() argv[++argi < argc ? argi : errs(cptr_concat("option '",arg,"' must have a parameter"))] #define argNext() argv[++argi < argc ? argi : errs(cptr_concat("option '",arg,"' must have a parameter"))]
int main(int argc, const char* const* argv){ int main(int argc, const char* const* argv){
const char logCtx[] = "Main";
#ifdef DEBUG #ifdef DEBUG
kt_beginInit(true); kt_beginInit(true);
#else #else
@ -155,25 +162,25 @@ int main(int argc, const char* const* argv){
setLogLevel(log_level); setLogLevel(log_level);
// print parsed args // print parsed args
logDebug("tunnel_protocol: %i", tunnel_protocol); logDebug(logCtx, "tunnel_protocol: %i", tunnel_protocol);
logDebug("input_mode: %i", input_mode); logDebug(logCtx, "input_mode: %i", input_mode);
logDebug("output_mode: %i", output_mode); logDebug(logCtx, "output_mode: %i", output_mode);
char* temps; char* temps;
if(!knIPV4Endpoint_isINVALID(listener_end)){ if(!knIPV4Endpoint_isINVALID(listener_end)){
temps = knIPV4Endpoint_toString(&listener_end); temps = knIPV4Endpoint_toString(&listener_end);
logDebug("listener_end: %s", temps); logDebug(logCtx, "listener_end: %s", temps);
free(temps); free(temps);
} }
else logDebug("listener_end: INVALID"); else logDebug(logCtx, "listener_end: INVALID");
if(!knIPV4Endpoint_isINVALID(connector_end)){ if(!knIPV4Endpoint_isINVALID(connector_end)){
temps = knIPV4Endpoint_toString(&connector_end); temps = knIPV4Endpoint_toString(&connector_end);
logDebug("connector_end: %s", temps); logDebug(logCtx, "connector_end: %s", temps);
free(temps); free(temps);
} }
else logDebug("connector_end: INVALID"); else logDebug(logCtx, "connector_end: INVALID");
logDebug("encryption_key: %s", encryption_key ? encryption_key : "NULL"); logDebug(logCtx, "encryption_key: %s", encryption_key ? encryption_key : "NULL");
logDebug("decryption_key: %s", decryption_key ? decryption_key : "NULL"); logDebug(logCtx, "decryption_key: %s", decryption_key ? decryption_key : "NULL");
logDebug("log_level: %i", log_level); logDebug(logCtx, "log_level: %i", log_level);
// create and run Listener or Connector // create and run Listener or Connector
if(input_mode == InputMode_Connect){ if(input_mode == InputMode_Connect){

View File

@ -17,9 +17,10 @@ extern const char* help_message;
/// nanoseconds /// nanoseconds
typedef u64 nsec_t; typedef u64 nsec_t;
/// microseconds
/// miliseconds
typedef u64 usec_t; typedef u64 usec_t;
/// miliseconds
typedef u64 msec_t;
/// system time now in nanoseconds /// system time now in nanoseconds
///@return u64 will overflow in 13 years ///@return u64 will overflow in 13 years
@ -29,6 +30,10 @@ nsec_t getTimeNsec();
///@return u64 will overflow in 58494 years ///@return u64 will overflow in 58494 years
usec_t getTimeUsec(); usec_t getTimeUsec();
void sleepNsec(nsec_t time);
void sleepUsec(usec_t time);
void sleepMsec(msec_t time);
//////////////////////////////////////// ////////////////////////////////////////
// Enums // // Enums //
//////////////////////////////////////// ////////////////////////////////////////
@ -62,21 +67,34 @@ typedef enum LogLevel {
LogLevel_Debug = 4 LogLevel_Debug = 4
} LogLevel; } LogLevel;
extern LogLevel _log_level; extern const char* LogLevel_nameTable[];
#define __logTryPrint(minimumLogLevel, format, args...)\ extern LogLevel _log_level_global;
if(_log_level >= minimumLogLevel) \
kprintf(format "\n" ,##args)
#define logDebug(format, args...) __logTryPrint(LogLevel_Debug, FGRY format ,##args) #define __logTryPrint(context, logLevel, format, args...)\
if(_log_level_global >= logLevel) \
kprintf("[%s/%s]" format "\n", context, LogLevel_nameTable[logLevel] ,##args)
#define logInfo(format, args...) __logTryPrint(LogLevel_Info, FWHI format ,##args) #define logDebug(context, format, args...) __logTryPrint(context, LogLevel_Debug, FGRY format ,##args)
#define logWarning(format, args...) __logTryPrint(LogLevel_Warning, FYEL format ,##args) #define logInfo(context, format, args...) __logTryPrint(context, LogLevel_Info, FWHI format ,##args)
#define logError(format, args...) __logTryPrint(LogLevel_Error, FRED format ,##args) #define logWarning(context, format, args...) __logTryPrint(context, LogLevel_Warning, FYEL format ,##args)
#define setLogLevel(l) _log_level = l #define logError(context, format, args...) __logTryPrint(context, LogLevel_Error, FRED format ,##args)
#define setLogLevel(l) _log_level_global = l
#define logExceptionAsWarning(context, MAYBE) {\
char* msg = __extendErrMsg(MAYBE.errmsg, __FILE__,__LINE__,__func__);\
logWarning(context, "%s", msg);\
free(msg);\
}
#define logExceptionAsError(context, MAYBE) {\
char* msg = __extendErrMsg(MAYBE.errmsg, __FILE__,__LINE__,__func__);\
logError(context, "%s", msg);\
free(msg);\
}
//////////////////////////////////////// ////////////////////////////////////////
// Pipes // // Pipes //
@ -101,11 +119,11 @@ void PipeUDP_stopAsync(PipeUDP* pipe);
STRUCT(PipeTCP, STRUCT(PipeTCP,
PipeBase base; PipeBase base;
knIPV4Endpoint pipe_out_end;
knSocketTCP* pipe_in_sock; knSocketTCP* pipe_in_sock;
knSocketTCP* pipe_out_sock;
) )
void PipeTCP_construct(PipeTCP* pipe, knSocketTCP* pipe_in_sock, knIPV4Endpoint pipe_out_end); Maybe PipeTCP_construct(PipeTCP* pipe, knSocketTCP* pipe_in_sock, knIPV4Endpoint pipe_out_end);
void PipeTCP_destruct(PipeTCP* pipe); void PipeTCP_destruct(PipeTCP* pipe);
void PipeTCP_startAsync(PipeTCP* pipe); void PipeTCP_startAsync(PipeTCP* pipe);
void PipeTCP_stopAsync(PipeTCP* pipe); void PipeTCP_stopAsync(PipeTCP* pipe);
@ -142,7 +160,7 @@ void ListenerUDP_destruct(ListenerUDP* ptr);
void ListenerUDP_start(ListenerUDP* ptr); void ListenerUDP_start(ListenerUDP* ptr);
STRUCT(ListenerTCP, STRUCT(ListenerTCP,
/* Hashtable<EndpointString, PipeUDP*> pipes.table */ /* Hashtable<EndpointString, PipeTCP*> pipes.table */
PipeStorage pipes; PipeStorage pipes;
knIPV4Endpoint connector_end; knIPV4Endpoint connector_end;
knSocketTCP* main_sock; knSocketTCP* main_sock;
@ -184,6 +202,66 @@ void ConnectorTCP_destruct(ConnectorTCP* ptr);
///@return doesn't return, INFINITE LOOP ///@return doesn't return, INFINITE LOOP
void ConnectorTCP_start(ConnectorTCP* ptr); void ConnectorTCP_start(ConnectorTCP* ptr);
////////////////////////////////////////
// Packets //
////////////////////////////////////////
typedef union MagicUnion {
char array[8];
u64 U64;
} MagicUnion;
typedef enum InternalPacketMessage {
InternalPacketMessage_ConnectorHandshake,
InternalPacketMessage_ListenerHandshake,
} __attribute__((__aligned__(8))) InternalPacketMessage;
/// connector <-> listener
typedef struct InternalPacket {
/* 8 bytes */
MagicUnion data;
/* 8 bytes */
union {
InternalPacketMessage message;
char __padding[8];
};
} InternalPacket;
bool InternalPacket_tryParse(const char* data, u32 data_length, InternalPacket* pac_ptr);
/// read_sender -> listener -> connector -> real_receiver
typedef struct IncomingPacketHeader {
/* 8 */
MagicUnion data;
/* 4(ipv4)+2(port)+2(padding) */
knIPV4Endpoint real_sender_end;
} IncomingPacketHeader;
typedef struct IncomingPacket {
IncomingPacketHeader header;
const char* payload;
} IncomingPacket;
bool IncomingPacket_tryParse(const char* data, u32 data_length, IncomingPacket* pac_ptr);
/// read_sender <- listener <- connector <- real_receiver
typedef struct OutgoingPacketHeader {
/* 8 */
MagicUnion data;
/* 4(ipv4)+2(port)+2(padding) */
knIPV4Endpoint real_destination_end;
} OutgoingPacketHeader;
typedef struct OutgoingPacket {
OutgoingPacketHeader header;
const char* payload;
} OutgoingPacket;
bool OutgoingPacket_tryParse(const char* data, u32 data_length, OutgoingPacket* pac_ptr);
#if __cplusplus #if __cplusplus
} }
#endif #endif

View File

@ -1,10 +1,14 @@
#include "port-tunnel.h" #include "port-tunnel.h"
#define K 1000
#define M 1000000
#define G 1000000000
nsec_t getTimeNsec(){ nsec_t getTimeNsec(){
struct timespec t; struct timespec t;
if(clock_gettime(CLOCK_REALTIME, &t) != 0) if(clock_gettime(CLOCK_REALTIME, &t) != 0)
throw(ERR_UNEXPECTEDVAL); throw(ERR_UNEXPECTEDVAL);
u64 n = t.tv_sec * 10e9 + t.tv_nsec; u64 n = t.tv_sec * G + t.tv_nsec;
return n; return n;
} }
@ -12,6 +16,23 @@ usec_t getTimeUsec(){
struct timespec t; struct timespec t;
if(clock_gettime(CLOCK_REALTIME, &t) != 0) if(clock_gettime(CLOCK_REALTIME, &t) != 0)
throw(ERR_UNEXPECTEDVAL); throw(ERR_UNEXPECTEDVAL);
u64 n = t.tv_sec * 10e6 + t.tv_nsec / 10e3; u64 n = t.tv_sec * M + t.tv_nsec / K;
return n; return n;
} }
void sleepNsec(nsec_t time){
struct timespec t = {
.tv_sec = time / G,
.tv_nsec = time % G
};
if(nanosleep(&t, NULL) != 0)
logDebug("nanosleep failed");
}
void sleepUsec(usec_t time){
sleepNsec(time * K);
}
void sleepMsec(msec_t time){
sleepNsec(time * M);
}