From 7a3808ba595890bb898cbcba9194b9b80922fb92 Mon Sep 17 00:00:00 2001 From: Timerix Date: Mon, 24 Nov 2025 23:51:00 +0500 Subject: [PATCH] added idb_lock functions --- dependencies/tlibc | 2 +- src/cli/ClientCLI/ClientCLI.c | 80 ++++++++++++--------- src/cli/ClientCLI/ClientCLI.h | 9 +-- src/db/idb.c | 107 +++++++++++++++++++++-------- src/db/idb.h | 37 +++++++--- src/db/tables.h | 33 ++++++--- src/network/tcp-chat-protocol/v1.h | 9 ++- src/server/responses/Login.c | 12 ++-- src/server/responses/Register.c | 18 ++--- src/server/responses/SendMessage.c | 2 + src/server/server.c | 65 ++++++++++++++---- src/server/server_internal.h | 23 +++++-- 12 files changed, 279 insertions(+), 118 deletions(-) diff --git a/dependencies/tlibc b/dependencies/tlibc index 3a7f09b..b557881 160000 --- a/dependencies/tlibc +++ b/dependencies/tlibc @@ -1 +1 @@ -Subproject commit 3a7f09bb498658dff1b5f38e5f5bf9474ad833ba +Subproject commit b557881168a15797bd3bd3d989cd7d48d3fb0804 diff --git a/src/cli/ClientCLI/ClientCLI.c b/src/cli/ClientCLI/ClientCLI.c index 5f6bbcc..1894d70 100644 --- a/src/cli/ClientCLI/ClientCLI.c +++ b/src/cli/ClientCLI/ClientCLI.c @@ -39,13 +39,11 @@ void ClientCLI_destroy(ClientCLI* self){ Client_free(self->client); idb_close(self->db); - pthread_mutex_destroy(&self->servers_cache_mutex); - List_destroy(self->servers_cache_list); - HashMap_destroy(&self->servers_addr_id_map); + List_destroy(&self->servers.list); + HashMap_destroy(&self->servers.addr_id_map); } void ClientCLI_construct(ClientCLI* self){ memset(self, 0, sizeof(*self)); - pthread_mutex_init(&self->servers_cache_mutex, NULL); } Result(void) ClientCLI_run(ClientCLI* self) { @@ -227,18 +225,19 @@ static Result(void) ClientCLI_joinNewServer(ClientCLI* self){ static Result(void) ClientCLI_selectServerFromCache(ClientCLI* self){ Deferral(8); - // lock servers cache - try_stderrcode(pthread_mutex_lock(&self->servers_cache_mutex)); - Defer(pthread_mutex_unlock(&self->servers_cache_mutex)); + // Lock table until this function returns. + // It may not change any data in table, but it uses associated cache structures. + idb_lockTable(self->servers.table); + Defer(idb_unlockTable(self->servers.table)); - u32 servers_count = List_len(self->servers_cache_list, ServerInfo); + u32 servers_count = List_len(self->servers.list, ServerInfo); if(servers_count == 0){ printf("No servers found in cache\n"); Return RESULT_VOID; } for(u32 id = 0; id < servers_count; id++){ - ServerInfo* row = &List_index(self->servers_cache_list, ServerInfo, id); + ServerInfo* row = &List_index(self->servers.list, ServerInfo, id); printf("[%02u] "FMT_str" "FMT_str"\n", id, row->address_len, row->address, row->name_len, row->name); } @@ -262,7 +261,7 @@ static Result(void) ClientCLI_selectServerFromCache(ClientCLI* self){ } else break; } - ServerInfo* server = &List_index(self->servers_cache_list, ServerInfo, id); + ServerInfo* server = &List_index(self->servers.list, ServerInfo, id); printf("Connecting to '"FMT_str"'...\n", server->address_len, server->address); try_void(Client_connect(self->client, server->address, server->pk_base64)); @@ -290,7 +289,7 @@ static Result(void) ClientCLI_selectServerFromCache(ClientCLI* self){ memcpy(server->desc, desc.data, server->desc_len); } if(server_info_changed){ - try_void(idb_updateRow(self->db_servers_table, id, server)); + try_void(idb_updateRow(self->servers.table, id, server, false)); } try_void(ClientCLI_showServerInfo(self, server)); @@ -320,20 +319,34 @@ static Result(void) ClientCLI_openUserDB(ClientCLI* self){ str user_db_dir = str_from_cstr(strcat_malloc("client-db", path_seps, username.data)); Defer(free(user_db_dir.data)); try(self->db, p, idb_open(user_db_dir, user_data_key)); + + // Lock DB until this function returns. + idb_lockDB(self->db); + Defer(idb_unlockDB(self->db)); + + // Load servers table + try(self->servers.table, p, + idb_getOrCreateTable(self->db, STR("servers"), sizeof(ServerInfo), false) + ); - // load servers table - try(self->db_servers_table, p, idb_getOrCreateTable(self->db, STR("servers"), sizeof(ServerInfo))); - // load whole table to list - try(u64 servers_count, u, idb_getRowCount(self->db_servers_table)); - self->servers_cache_list = List_alloc(ServerInfo, servers_count); - try_void(idb_getRows(self->db_servers_table, 0, self->servers_cache_list.data, servers_count)); - self->servers_cache_list.size = sizeof(ServerInfo) * servers_count; + // Lock table until this function returns. + idb_lockTable(self->servers.table); + Defer(idb_unlockTable(self->servers.table)); + + // load whole servers table to list + try_void( + idb_createListFromTable(self->servers.table, &self->servers.list, false) + ); + // build address-id map - HashMap_construct(&self->servers_addr_id_map, u64, NULL); + try(u64 servers_count, u, + idb_getRowCount(self->servers.table, false) + ); + HashMap_construct(&self->servers.addr_id_map, u64, NULL); for(u64 id = 0; id < servers_count; id++){ - ServerInfo* row = &List_index(self->servers_cache_list, ServerInfo, id); + ServerInfo* row = &List_index(self->servers.list, ServerInfo, id); str key = str_construct(row->address, row->address_len, true); - if(!HashMap_tryPush(&self->servers_addr_id_map, key, &id)){ + if(!HashMap_tryPush(&self->servers.addr_id_map, key, &id)){ Return RESULT_ERROR_FMT("duplicate server address '"FMT_str"'", key.size, key.data); } } @@ -369,29 +382,30 @@ static Result(ServerInfo*) ClientCLI_saveServerInfo(ClientCLI* self, server.desc_len = desc.size; memcpy(server.desc, desc.data, server.desc_len); - // lock servers cache - try_stderrcode(pthread_mutex_lock(&self->servers_cache_mutex)); - Defer(pthread_mutex_unlock(&self->servers_cache_mutex)); + // Lock table until this function returns. + // It may not change any data in table, but it uses associated cache structures. + idb_lockTable(self->servers.table); + Defer(idb_unlockTable(self->servers.table)); // try find server id in cache ServerInfo* cached_row_ptr = NULL; u64* id_ptr = NULL; - id_ptr = HashMap_tryGetPtr(&self->servers_addr_id_map, addr); + id_ptr = HashMap_tryGetPtr(&self->servers.addr_id_map, addr); if(id_ptr){ // update existing server u64 id = *id_ptr; - try_void(idb_updateRow(self->db_servers_table, id, &server)); - try_assert(id < List_len(self->servers_cache_list, ServerInfo)); - cached_row_ptr = &List_index(self->servers_cache_list, ServerInfo, id); + try_void(idb_updateRow(self->servers.table, id, &server, false)); + try_assert(id < List_len(self->servers.list, ServerInfo)); + cached_row_ptr = &List_index(self->servers.list, ServerInfo, id); memcpy(cached_row_ptr, &server, sizeof(ServerInfo)); } else { // push new server - try(u64 id, u, idb_pushRow(self->db_servers_table, &server)); - try_assert(id == List_len(self->servers_cache_list, ServerInfo)); - List_pushMany(&self->servers_cache_list, ServerInfo, &server, 1); - cached_row_ptr = &List_index(self->servers_cache_list, ServerInfo, id); - try_assert(HashMap_tryPush(&self->servers_addr_id_map, addr, &id)); + try(u64 id, u, idb_pushRow(self->servers.table, &server, false)); + try_assert(id == List_len(self->servers.list, ServerInfo)); + List_pushMany(&self->servers.list, ServerInfo, &server, 1); + cached_row_ptr = &List_index(self->servers.list, ServerInfo, id); + try_assert(HashMap_tryPush(&self->servers.addr_id_map, addr, &id)); } Return RESULT_VALUE(p, cached_row_ptr); diff --git a/src/cli/ClientCLI/ClientCLI.h b/src/cli/ClientCLI/ClientCLI.h index e6b7570..8a7b8c7 100644 --- a/src/cli/ClientCLI/ClientCLI.h +++ b/src/cli/ClientCLI/ClientCLI.h @@ -9,10 +9,11 @@ typedef struct ClientCLI { Client* client; IncrementalDB* db; - Table* db_servers_table; - pthread_mutex_t servers_cache_mutex; - List(ServerInfo) servers_cache_list; // index is id - HashMap(u64) servers_addr_id_map; // key is server address + struct { + Table* table; + List(ServerInfo) list; // index is id + HashMap(u64) addr_id_map; // key is server address + } servers; } ClientCLI; void ClientCLI_construct(ClientCLI* self); diff --git a/src/db/idb.c b/src/db/idb.c index bb04135..9a157aa 100644 --- a/src/db/idb.c +++ b/src/db/idb.c @@ -44,7 +44,7 @@ typedef struct IncrementalDB { static const Magic32 TABLE_FILE_MAGIC = { .bytes = { 'I', 'D', 'B', 't' } }; -void Table_close(Table* t){ +static void Table_close(Table* t){ if(t == NULL) return; fclose(t->table_file); @@ -204,6 +204,16 @@ static Result(void) Table_validateRowSize(Table* t, u32 row_size){ } +void idb_close(IncrementalDB* db){ + if(db == NULL) + return; + str_free(db->db_dir); + Array_free(db->aes_key); + HashMap_destroy(&db->tables_map); + pthread_mutex_destroy(&db->mutex); + free(db); +} + Result(IncrementalDB*) idb_open(str db_dir, NULLABLE(Array(u8) aes_key)){ Deferral(16); try_assert(aes_key.size == 0 || aes_key.size == 16 || aes_key.size == 24 || aes_key.size == 32); @@ -228,21 +238,30 @@ Result(IncrementalDB*) idb_open(str db_dir, NULLABLE(Array(u8) aes_key)){ Return RESULT_VALUE(p, db); } -void idb_close(IncrementalDB* db){ - if(db == NULL) - return; - str_free(db->db_dir); - Array_free(db->aes_key); - HashMap_destroy(&db->tables_map); - pthread_mutex_destroy(&db->mutex); - free(db); +void idb_lockDB(IncrementalDB* db){ + try_fatal_stderrcode(pthread_mutex_lock(&db->mutex)); } -Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size){ +void idb_unlockDB(IncrementalDB* db){ + try_fatal_stderrcode(pthread_mutex_unlock(&db->mutex)); +} + +void idb_lockTable(Table* t){ + try_fatal_stderrcode(pthread_mutex_lock(&t->mutex)); +} + +void idb_unlockTable(Table* t){ + try_fatal_stderrcode(pthread_mutex_unlock(&t->mutex)); +} + + +Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size, bool lock_db){ Deferral(16); - // db lock - try_stderrcode(pthread_mutex_lock(&db->mutex)); - Defer(pthread_mutex_unlock(&db->mutex)); + + if(lock_db){ + idb_lockDB(db); + Defer(idb_unlockDB(db)); + } Table** tpp = HashMap_tryGetPtr(&db->tables_map, table_name); if(tpp != NULL){ @@ -320,11 +339,13 @@ Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_s Return RESULT_VALUE(p, t); } -Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){ +Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count, bool lock_table){ Deferral(8); - // table lock - try_stderrcode(pthread_mutex_lock(&t->mutex)); - Defer(pthread_mutex_unlock(&t->mutex)); + + if(lock_table){ + idb_lockTable(t); + Defer(idb_unlockTable(t)); + } if(id + count > t->row_count){ Return RESULT_ERROR_FMT( @@ -361,11 +382,13 @@ Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){ Return RESULT_VOID; } -Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){ +Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count, bool lock_table){ Deferral(8); - // table lock - try_stderrcode(pthread_mutex_lock(&t->mutex)); - Defer(pthread_mutex_unlock(&t->mutex)); + + if(lock_table){ + idb_lockTable(t); + Defer(idb_unlockTable(t)); + } if(id + count > t->row_count){ Return RESULT_ERROR_FMT( @@ -406,11 +429,13 @@ Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){ Return RESULT_VOID; } -Result(u64) idb_pushRows(Table* t, const void* src, u64 count){ +Result(u64) idb_pushRows(Table* t, const void* src, u64 count, bool lock_table){ Deferral(8); - // table lock - try_stderrcode(pthread_mutex_lock(&t->mutex)); - Defer(pthread_mutex_unlock(&t->mutex)); + + if(lock_table){ + idb_lockTable(t); + Defer(idb_unlockTable(t)); + } try_void(Table_setDirtyBit(t, true)); Defer(IGNORE_RESULT Table_setDirtyBit(t, false)); @@ -442,11 +467,35 @@ Result(u64) idb_pushRows(Table* t, const void* src, u64 count){ Return RESULT_VALUE(u, new_row_index); } -Result(u64) idb_getRowCount(Table* t){ +Result(u64) idb_getRowCount(Table* t, bool lock_table){ Deferral(4); - // table lock - try_stderrcode(pthread_mutex_lock(&t->mutex)); - Defer(pthread_mutex_unlock(&t->mutex)); + + if(lock_table){ + idb_lockTable(t); + Defer(idb_unlockTable(t)); + } + u64 count = t->row_count; Return RESULT_VALUE(u, count); } + +Result(void) idb_createListFromTable(Table* t, List_* l, bool lock_table){ + Deferral(1); + + if(lock_table){ + idb_lockTable(t); + Defer(idb_unlockTable(t)); + } + + u64 rows_count = t->row_count; + u64 total_size = rows_count * t->header.row_size; + *l = List_alloc_size(total_size); + l->size = total_size; + bool success = false; + Defer(if(!success) List_destroy(l)); + + try_void(idb_getRows(t, 0, l->data, rows_count, false)); + + success = true; + Return RESULT_VOID; +} diff --git a/src/db/idb.h b/src/db/idb.h index a71f28e..c51fd6c 100644 --- a/src/db/idb.h +++ b/src/db/idb.h @@ -1,6 +1,7 @@ #pragma once #include "tlibc/errors.h" +#include "tlibc/collections/List.h" #define IDB_VERSION 2 #define IDB_AES_KEY_SIZE 32 @@ -12,15 +13,35 @@ typedef struct Table Table; Result(IncrementalDB*) idb_open(str db_dir, NULLABLE(Array(u8) aes_key)); void idb_close(IncrementalDB* db); -Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size); +/// before performing atransaction on DB lock it manually or call functions with parameter lock_db=true +void idb_lockDB(IncrementalDB* db); -Result(void) idb_getRows(Table* t, u64 start_from_id, void* dst, u64 count); -#define idb_getRow(T, ID, DST) idb_getRows(T, ID, DST, 1) +/// USAGE: +/// idb_lockDB(db); +/// Defer(idb_unlockDB(db)); +void idb_unlockDB(IncrementalDB* db); -Result(u64) idb_pushRows(Table* t, const void* src, u64 count); -#define idb_pushRow(T, SRC) idb_pushRows(T, SRC, 1) +/// before performing a transaction on Table lock it manually or call function with parameter lock_db=true +void idb_lockTable(Table* t); -Result(void) idb_updateRows(Table* t, u64 start_from_id, const void* src, u64 count); -#define idb_updateRow(T, ID, SRC) idb_updateRows(T, ID, SRC, 1) +/// USAGE: +/// idb_lockTable(t); +/// Defer(idb_unlockTable(t)); +void idb_unlockTable(Table* t); -Result(u64) idb_getRowCount(Table* t); + +Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size, bool lock_db); + +Result(void) idb_getRows(Table* t, u64 start_from_id, void* dst, u64 count, bool lock_table); +#define idb_getRow(T, ID, DST, LOCK) idb_getRows(T, ID, DST, 1, LOCK) + +Result(u64) idb_pushRows(Table* t, const void* src, u64 count, bool lock_table); +#define idb_pushRow(T, SRC, LOCK) idb_pushRows(T, SRC, 1, LOCK) + +Result(void) idb_updateRows(Table* t, u64 start_from_id, const void* src, u64 count, bool lock_table); +#define idb_updateRow(T, ID, SRC, LOCK) idb_updateRows(T, ID, SRC, 1, LOCK) + +Result(u64) idb_getRowCount(Table* t, bool lock_table); + +/// construct new list and load whole table into it +Result(void) idb_createListFromTable(Table* t, List_* l, bool lock_table); diff --git a/src/db/tables.h b/src/db/tables.h index 036f008..42ad2f7 100644 --- a/src/db/tables.h +++ b/src/db/tables.h @@ -1,7 +1,9 @@ #pragma once #include "tcp-chat/common_constants.h" #include "tlibc/time.h" +#include "tlibc/magic.h" +// TODO: add table versions typedef struct UserInfo { u16 name_len; @@ -30,15 +32,30 @@ typedef struct ChannelInfo { char desc[CHANNEL_DESC_SIZE_MAX + 1]; } ATTRIBUTE_ALIGNED(4*1024) ChannelInfo; - -typedef struct MessageMetadata { - DateTime receiving_time_utc; - u64 sender_id; +// not a table +typedef struct MessageMeta { + /* + In block messages can be stored with some padding (zero bytes) between them. + To distinguish message from padding, each message starts with MESSAGE_MAGIC. + */ + Magic32 magic; u16 data_size; -} ATTRIBUTE_ALIGNED(64) MessageMetadata; + u64 id; + u64 sender_id; + DateTime receiving_time_utc; +} ATTRIBUTE_ALIGNED(64) MessageMeta; +#define MESSAGE_MAGIC ((Magic32){ .bytes = { 'M', 's', 'g', 'S' } }) +// Stores some number of messages. Look in MessageBlockMeta to see how much. typedef struct MessageBlock { - /* sequence of messages (MessageMetadata, byte[]) */ - u8 data[16*1024]; -} MessageBlock; + /* ((sequence MessageMeta), (sequence binary-data)) */ + u8 data[64*1024 - 4]; +} ATTRIBUTE_ALIGNED(64) MessageBlock; + +// is used to find in which MessageBlock a message is stored +typedef struct MessageBlockMeta { + u64 message_id_first; + u32 messages_count; +} ATTRIBUTE_ALIGNED(16) MessageBlockMeta; + diff --git a/src/network/tcp-chat-protocol/v1.h b/src/network/tcp-chat-protocol/v1.h index a403efe..2c0f92e 100644 --- a/src/network/tcp-chat-protocol/v1.h +++ b/src/network/tcp-chat-protocol/v1.h @@ -140,16 +140,19 @@ void SendMessageResponse_construct(SendMessageResponse* ptr, PacketHeader* heade typedef struct GetMessageBlockRequest { - u64 message_block_id; + u64 message_id_first; + u32 messages_count; } ALIGN_PACKET_STRUCT GetMessageBlockRequest; void GetMessageBlockRequest_construct(GetMessageBlockRequest* ptr, PacketHeader* header, - u64 message_block_id); + u64 message_id_first, u32 messages_count); typedef struct GetMessageBlockResponse { + u64 message_id_first; + u32 messages_count; u32 data_size; - /* stream of size data_size */ + /* stream of size data_size : ((sequence MessageMeta), (sequence binary-data)) */ } ALIGN_PACKET_STRUCT GetMessageBlockResponse; void GetMessageBlockResponse_construct(GetMessageBlockResponse* ptr, PacketHeader* header, diff --git a/src/server/responses/Login.c b/src/server/responses/Login.c index 869f00c..13bfb75 100644 --- a/src/server/responses/Login.c +++ b/src/server/responses/Login.c @@ -34,15 +34,15 @@ declare_RequestHandler(Login) } // lock users cache - try_stderrcode(pthread_mutex_lock(&conn->server->users_cache_mutex)); + idb_lockTable(conn->server->users.table); bool unlocked_users_cache_mutex = false; Defer( if(!unlocked_users_cache_mutex) - pthread_mutex_unlock(&conn->server->users_cache_mutex) + idb_unlockTable(conn->server->users.table) ); // try get id from name cache - u64* id_ptr = HashMap_tryGetPtr(&conn->server->users_name_id_map, username_str); + u64* id_ptr = HashMap_tryGetPtr(&conn->server->users.username_id_map, username_str); if(id_ptr == NULL){ try_void(sendErrorMessage_f(log_ctx, conn, res_head, LogSeverity_Warn, @@ -54,8 +54,8 @@ declare_RequestHandler(Login) u64 user_id = *id_ptr; // get user by id - try_assert(user_id < List_len(conn->server->users_cache_list, UserInfo)); - UserInfo* u = &List_index(conn->server->users_cache_list, UserInfo, user_id); + try_assert(user_id < List_len(conn->server->users.cache_list, UserInfo)); + UserInfo* u = &List_index(conn->server->users.cache_list, UserInfo, user_id); // validate token hash if(memcmp(req.token, u->token, sizeof(req.token)) != 0){ @@ -67,7 +67,7 @@ declare_RequestHandler(Login) } // manually unlock mutex - pthread_mutex_unlock(&conn->server->users_cache_mutex); + idb_unlockTable(conn->server->users.table); unlocked_users_cache_mutex = true; // authorize diff --git a/src/server/responses/Register.c b/src/server/responses/Register.c index c8a432e..2ea8b5c 100644 --- a/src/server/responses/Register.c +++ b/src/server/responses/Register.c @@ -34,16 +34,16 @@ declare_RequestHandler(Register) } // lock users cache - try_stderrcode(pthread_mutex_lock(&conn->server->users_cache_mutex)); + idb_lockTable(conn->server->users.table); bool unlocked_users_cache_mutex = false; // unlock mutex on error catch Defer( if(!unlocked_users_cache_mutex) - pthread_mutex_unlock(&conn->server->users_cache_mutex) + idb_unlockTable(conn->server->users.table) ); // check if name is taken - if(HashMap_tryGetPtr(&conn->server->users_name_id_map, username_str) != NULL){ + if(HashMap_tryGetPtr(&conn->server->users.username_id_map, username_str) != NULL){ try_void(sendErrorMessage_f(log_ctx, conn, res_head, LogSeverity_Warn, "Username '%s' already exists", @@ -63,13 +63,15 @@ declare_RequestHandler(Register) DateTime_getUTC(&user.registration_time_utc); // save new user to db and cache - try(u64 user_id, u, idb_pushRow(conn->server->db_users_table, &user)); - try_assert(user_id == List_len(conn->server->users_cache_list, UserInfo)); - List_pushMany(&conn->server->users_cache_list, UserInfo, &user, 1); - try_assert(HashMap_tryPush(&conn->server->users_name_id_map, username_str, &user_id)); + try(u64 user_id, u, + idb_pushRow(conn->server->users.table, &user, false) + ); + try_assert(user_id == List_len(conn->server->users.cache_list, UserInfo)); + List_pushMany(&conn->server->users.cache_list, UserInfo, &user, 1); + try_assert(HashMap_tryPush(&conn->server->users.username_id_map, username_str, &user_id)); // manually unlock mutex - pthread_mutex_unlock(&conn->server->users_cache_mutex); + idb_unlockTable(conn->server->users.table); unlocked_users_cache_mutex = true; logInfo(log_ctx, "registered user '%s'", username_str.data); diff --git a/src/server/responses/SendMessage.c b/src/server/responses/SendMessage.c index 796567c..4a64812 100644 --- a/src/server/responses/SendMessage.c +++ b/src/server/responses/SendMessage.c @@ -13,6 +13,8 @@ declare_RequestHandler(SendMessage) try_void(PacketHeader_validateContentSize(req_head, sizeof(req))); try_void(EncryptedSocketTCP_recvStruct(&conn->sock, &req)); + + // send response SendMessageResponse res; SendMessageResponse_construct(&res, res_head, ); diff --git a/src/server/server.c b/src/server/server.c index d58d54c..edf617d 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -20,9 +20,13 @@ void Server_free(Server* self){ RSA_destroyPublicKey(&self->rsa_pk); idb_close(self->db); - pthread_mutex_destroy(&self->users_cache_mutex); - List_destroy(self->users_cache_list); - HashMap_destroy(&self->users_name_id_map); + + List_destroy(&self->users.cache_list); + HashMap_destroy(&self->users.username_id_map); + + List_destroy(&self->messages.blocks_meta_list); + LList_MessageBlock_destroy(&self->messages.blocks_queue); + free(self->messages.incomplete_block); free(self); } @@ -92,24 +96,59 @@ Result(Server*) Server_create(str config_str, void* logger, LogFunction_t log_fu // build users cache logDebug(log_ctx, "loading users..."); - pthread_mutex_init(&self->users_cache_mutex, NULL); - try(self->db_users_table, p, idb_getOrCreateTable(self->db, STR("users"), sizeof(UserInfo))); - // load whole table to list - try(u64 users_count, u, idb_getRowCount(self->db_users_table)); - self->users_cache_list = List_alloc(UserInfo, users_count); - try_void(idb_getRows(self->db_users_table, 0, self->users_cache_list.data, users_count)); - self->users_cache_list.size = sizeof(UserInfo) * users_count; + try(self->users.table, p, + idb_getOrCreateTable(self->db, STR("users"), sizeof(UserInfo), false) + ); + + // load whole users table to list + try_void( + idb_createListFromTable(self->users.table, &self->users.cache_list, false) + ); + // build name-id map - HashMap_construct(&self->users_name_id_map, u64, NULL); + try(u64 users_count, u, idb_getRowCount(self->users.table, false)); + HashMap_construct(&self->users.username_id_map, u64, NULL); for(u64 id = 0; id < users_count; id++){ - UserInfo* row = &List_index(self->users_cache_list, UserInfo, id); + UserInfo* row = &List_index(self->users.cache_list, UserInfo, id); str key = str_construct(row->name, row->name_len, true); - if(!HashMap_tryPush(&self->users_name_id_map, key, &id)){ + if(!HashMap_tryPush(&self->users.username_id_map, key, &id)){ Return RESULT_ERROR_FMT("duplicate user name '"FMT_str"'", key.size, key.data); } } logDebug(log_ctx, "loaded "FMT_u64" users", users_count); + + // build messages cache + logDebug(log_ctx, "loading messages..."); + try(self->messages.blocks_table, p, + idb_getOrCreateTable(self->db, STR("message_blocks"), sizeof(MessageBlock), false) + ); + try(self->messages.blocks_meta_table, p, + idb_getOrCreateTable(self->db, STR("message_blocks_meta"), sizeof(MessageBlockMeta), false) + ); + + // load whole message_blocks_meta table to list + try_void( + idb_createListFromTable(self->messages.blocks_meta_table, &self->messages.blocks_meta_list, false) + ); + + // load N last blocks to the queue + self->messages.incomplete_block = LLNode_MessageBlock_createZero(); + self->messages.blocks_queue = LList_construct(MessageBlock, NULL); + try(u64 message_blocks_count, u, idb_getRowCount(self->messages.blocks_table, false)); + u64 first_id = 0; + if(message_blocks_count > MESSAGE_BLOCKS_CACHE_COUNT) + first_id = message_blocks_count - MESSAGE_BLOCKS_CACHE_COUNT; + for(u64 id = first_id; id < message_blocks_count; id++){ + LLNode(MessageBlock)* node = LLNode_MessageBlock_createZero(); + LList_MessageBlock_insertAfter( + &self->messages.blocks_queue, + self->messages.blocks_queue.last, + node + ); + try_void(idb_getRow(self->messages.blocks_table, id, node->value.data, false)); + } + success = true; Return RESULT_VALUE(p, self); } diff --git a/src/server/server_internal.h b/src/server/server_internal.h index 2593b31..1c7c9f2 100644 --- a/src/server/server_internal.h +++ b/src/server/server_internal.h @@ -1,7 +1,7 @@ #pragma once -#include #include "tlibc/collections/HashMap.h" #include "tlibc/collections/List.h" +#include "tlibc/collections/LList.h" #include "tcp-chat/server.h" #include "cryptography/AES.h" #include "cryptography/RSA.h" @@ -11,6 +11,10 @@ typedef struct ClientConnection ClientConnection; +LList_declare(MessageBlock); + +#define MESSAGE_BLOCKS_CACHE_COUNT 50 + typedef struct Server { /* from constructor */ void* logger; @@ -26,10 +30,19 @@ typedef struct Server { /* database and cache */ IncrementalDB* db; - Table* db_users_table; - pthread_mutex_t users_cache_mutex; - List(UserInfo) users_cache_list; // index is id - HashMap(u64) users_name_id_map; // key is user name + struct { + Table* table; + List(UserInfo) cache_list; // index is id + HashMap(u64) username_id_map; + } users; + /* messages */ + struct { + Table* blocks_table; + Table* blocks_meta_table; + List(MessageBlockMeta) blocks_meta_list; // index is id + LList(MessageBlock) blocks_queue; // last N MessageBlocks, ascending + LLNode(MessageBlock)* incomplete_block; // new messages are written here until block is full + } messages; } Server;