added idb_lock functions

This commit is contained in:
Timerix 2025-11-24 23:51:00 +05:00
parent 571fdd900f
commit 7a3808ba59
12 changed files with 279 additions and 118 deletions

2
dependencies/tlibc vendored

@ -1 +1 @@
Subproject commit 3a7f09bb498658dff1b5f38e5f5bf9474ad833ba
Subproject commit b557881168a15797bd3bd3d989cd7d48d3fb0804

View File

@ -39,13 +39,11 @@ void ClientCLI_destroy(ClientCLI* self){
Client_free(self->client);
idb_close(self->db);
pthread_mutex_destroy(&self->servers_cache_mutex);
List_destroy(self->servers_cache_list);
HashMap_destroy(&self->servers_addr_id_map);
List_destroy(&self->servers.list);
HashMap_destroy(&self->servers.addr_id_map);
}
void ClientCLI_construct(ClientCLI* self){
memset(self, 0, sizeof(*self));
pthread_mutex_init(&self->servers_cache_mutex, NULL);
}
Result(void) ClientCLI_run(ClientCLI* self) {
@ -227,18 +225,19 @@ static Result(void) ClientCLI_joinNewServer(ClientCLI* self){
static Result(void) ClientCLI_selectServerFromCache(ClientCLI* self){
Deferral(8);
// lock servers cache
try_stderrcode(pthread_mutex_lock(&self->servers_cache_mutex));
Defer(pthread_mutex_unlock(&self->servers_cache_mutex));
// Lock table until this function returns.
// It may not change any data in table, but it uses associated cache structures.
idb_lockTable(self->servers.table);
Defer(idb_unlockTable(self->servers.table));
u32 servers_count = List_len(self->servers_cache_list, ServerInfo);
u32 servers_count = List_len(self->servers.list, ServerInfo);
if(servers_count == 0){
printf("No servers found in cache\n");
Return RESULT_VOID;
}
for(u32 id = 0; id < servers_count; id++){
ServerInfo* row = &List_index(self->servers_cache_list, ServerInfo, id);
ServerInfo* row = &List_index(self->servers.list, ServerInfo, id);
printf("[%02u] "FMT_str" "FMT_str"\n",
id, row->address_len, row->address, row->name_len, row->name);
}
@ -262,7 +261,7 @@ static Result(void) ClientCLI_selectServerFromCache(ClientCLI* self){
}
else break;
}
ServerInfo* server = &List_index(self->servers_cache_list, ServerInfo, id);
ServerInfo* server = &List_index(self->servers.list, ServerInfo, id);
printf("Connecting to '"FMT_str"'...\n", server->address_len, server->address);
try_void(Client_connect(self->client, server->address, server->pk_base64));
@ -290,7 +289,7 @@ static Result(void) ClientCLI_selectServerFromCache(ClientCLI* self){
memcpy(server->desc, desc.data, server->desc_len);
}
if(server_info_changed){
try_void(idb_updateRow(self->db_servers_table, id, server));
try_void(idb_updateRow(self->servers.table, id, server, false));
}
try_void(ClientCLI_showServerInfo(self, server));
@ -321,19 +320,33 @@ static Result(void) ClientCLI_openUserDB(ClientCLI* self){
Defer(free(user_db_dir.data));
try(self->db, p, idb_open(user_db_dir, user_data_key));
// load servers table
try(self->db_servers_table, p, idb_getOrCreateTable(self->db, STR("servers"), sizeof(ServerInfo)));
// load whole table to list
try(u64 servers_count, u, idb_getRowCount(self->db_servers_table));
self->servers_cache_list = List_alloc(ServerInfo, servers_count);
try_void(idb_getRows(self->db_servers_table, 0, self->servers_cache_list.data, servers_count));
self->servers_cache_list.size = sizeof(ServerInfo) * servers_count;
// Lock DB until this function returns.
idb_lockDB(self->db);
Defer(idb_unlockDB(self->db));
// Load servers table
try(self->servers.table, p,
idb_getOrCreateTable(self->db, STR("servers"), sizeof(ServerInfo), false)
);
// Lock table until this function returns.
idb_lockTable(self->servers.table);
Defer(idb_unlockTable(self->servers.table));
// load whole servers table to list
try_void(
idb_createListFromTable(self->servers.table, &self->servers.list, false)
);
// build address-id map
HashMap_construct(&self->servers_addr_id_map, u64, NULL);
try(u64 servers_count, u,
idb_getRowCount(self->servers.table, false)
);
HashMap_construct(&self->servers.addr_id_map, u64, NULL);
for(u64 id = 0; id < servers_count; id++){
ServerInfo* row = &List_index(self->servers_cache_list, ServerInfo, id);
ServerInfo* row = &List_index(self->servers.list, ServerInfo, id);
str key = str_construct(row->address, row->address_len, true);
if(!HashMap_tryPush(&self->servers_addr_id_map, key, &id)){
if(!HashMap_tryPush(&self->servers.addr_id_map, key, &id)){
Return RESULT_ERROR_FMT("duplicate server address '"FMT_str"'", key.size, key.data);
}
}
@ -369,29 +382,30 @@ static Result(ServerInfo*) ClientCLI_saveServerInfo(ClientCLI* self,
server.desc_len = desc.size;
memcpy(server.desc, desc.data, server.desc_len);
// lock servers cache
try_stderrcode(pthread_mutex_lock(&self->servers_cache_mutex));
Defer(pthread_mutex_unlock(&self->servers_cache_mutex));
// Lock table until this function returns.
// It may not change any data in table, but it uses associated cache structures.
idb_lockTable(self->servers.table);
Defer(idb_unlockTable(self->servers.table));
// try find server id in cache
ServerInfo* cached_row_ptr = NULL;
u64* id_ptr = NULL;
id_ptr = HashMap_tryGetPtr(&self->servers_addr_id_map, addr);
id_ptr = HashMap_tryGetPtr(&self->servers.addr_id_map, addr);
if(id_ptr){
// update existing server
u64 id = *id_ptr;
try_void(idb_updateRow(self->db_servers_table, id, &server));
try_assert(id < List_len(self->servers_cache_list, ServerInfo));
cached_row_ptr = &List_index(self->servers_cache_list, ServerInfo, id);
try_void(idb_updateRow(self->servers.table, id, &server, false));
try_assert(id < List_len(self->servers.list, ServerInfo));
cached_row_ptr = &List_index(self->servers.list, ServerInfo, id);
memcpy(cached_row_ptr, &server, sizeof(ServerInfo));
}
else {
// push new server
try(u64 id, u, idb_pushRow(self->db_servers_table, &server));
try_assert(id == List_len(self->servers_cache_list, ServerInfo));
List_pushMany(&self->servers_cache_list, ServerInfo, &server, 1);
cached_row_ptr = &List_index(self->servers_cache_list, ServerInfo, id);
try_assert(HashMap_tryPush(&self->servers_addr_id_map, addr, &id));
try(u64 id, u, idb_pushRow(self->servers.table, &server, false));
try_assert(id == List_len(self->servers.list, ServerInfo));
List_pushMany(&self->servers.list, ServerInfo, &server, 1);
cached_row_ptr = &List_index(self->servers.list, ServerInfo, id);
try_assert(HashMap_tryPush(&self->servers.addr_id_map, addr, &id));
}
Return RESULT_VALUE(p, cached_row_ptr);

View File

@ -9,10 +9,11 @@
typedef struct ClientCLI {
Client* client;
IncrementalDB* db;
Table* db_servers_table;
pthread_mutex_t servers_cache_mutex;
List(ServerInfo) servers_cache_list; // index is id
HashMap(u64) servers_addr_id_map; // key is server address
struct {
Table* table;
List(ServerInfo) list; // index is id
HashMap(u64) addr_id_map; // key is server address
} servers;
} ClientCLI;
void ClientCLI_construct(ClientCLI* self);

View File

@ -44,7 +44,7 @@ typedef struct IncrementalDB {
static const Magic32 TABLE_FILE_MAGIC = { .bytes = { 'I', 'D', 'B', 't' } };
void Table_close(Table* t){
static void Table_close(Table* t){
if(t == NULL)
return;
fclose(t->table_file);
@ -204,6 +204,16 @@ static Result(void) Table_validateRowSize(Table* t, u32 row_size){
}
void idb_close(IncrementalDB* db){
if(db == NULL)
return;
str_free(db->db_dir);
Array_free(db->aes_key);
HashMap_destroy(&db->tables_map);
pthread_mutex_destroy(&db->mutex);
free(db);
}
Result(IncrementalDB*) idb_open(str db_dir, NULLABLE(Array(u8) aes_key)){
Deferral(16);
try_assert(aes_key.size == 0 || aes_key.size == 16 || aes_key.size == 24 || aes_key.size == 32);
@ -228,21 +238,30 @@ Result(IncrementalDB*) idb_open(str db_dir, NULLABLE(Array(u8) aes_key)){
Return RESULT_VALUE(p, db);
}
void idb_close(IncrementalDB* db){
if(db == NULL)
return;
str_free(db->db_dir);
Array_free(db->aes_key);
HashMap_destroy(&db->tables_map);
pthread_mutex_destroy(&db->mutex);
free(db);
void idb_lockDB(IncrementalDB* db){
try_fatal_stderrcode(pthread_mutex_lock(&db->mutex));
}
Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size){
void idb_unlockDB(IncrementalDB* db){
try_fatal_stderrcode(pthread_mutex_unlock(&db->mutex));
}
void idb_lockTable(Table* t){
try_fatal_stderrcode(pthread_mutex_lock(&t->mutex));
}
void idb_unlockTable(Table* t){
try_fatal_stderrcode(pthread_mutex_unlock(&t->mutex));
}
Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size, bool lock_db){
Deferral(16);
// db lock
try_stderrcode(pthread_mutex_lock(&db->mutex));
Defer(pthread_mutex_unlock(&db->mutex));
if(lock_db){
idb_lockDB(db);
Defer(idb_unlockDB(db));
}
Table** tpp = HashMap_tryGetPtr(&db->tables_map, table_name);
if(tpp != NULL){
@ -320,11 +339,13 @@ Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_s
Return RESULT_VALUE(p, t);
}
Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){
Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count, bool lock_table){
Deferral(8);
// table lock
try_stderrcode(pthread_mutex_lock(&t->mutex));
Defer(pthread_mutex_unlock(&t->mutex));
if(lock_table){
idb_lockTable(t);
Defer(idb_unlockTable(t));
}
if(id + count > t->row_count){
Return RESULT_ERROR_FMT(
@ -361,11 +382,13 @@ Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){
Return RESULT_VOID;
}
Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){
Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count, bool lock_table){
Deferral(8);
// table lock
try_stderrcode(pthread_mutex_lock(&t->mutex));
Defer(pthread_mutex_unlock(&t->mutex));
if(lock_table){
idb_lockTable(t);
Defer(idb_unlockTable(t));
}
if(id + count > t->row_count){
Return RESULT_ERROR_FMT(
@ -406,11 +429,13 @@ Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){
Return RESULT_VOID;
}
Result(u64) idb_pushRows(Table* t, const void* src, u64 count){
Result(u64) idb_pushRows(Table* t, const void* src, u64 count, bool lock_table){
Deferral(8);
// table lock
try_stderrcode(pthread_mutex_lock(&t->mutex));
Defer(pthread_mutex_unlock(&t->mutex));
if(lock_table){
idb_lockTable(t);
Defer(idb_unlockTable(t));
}
try_void(Table_setDirtyBit(t, true));
Defer(IGNORE_RESULT Table_setDirtyBit(t, false));
@ -442,11 +467,35 @@ Result(u64) idb_pushRows(Table* t, const void* src, u64 count){
Return RESULT_VALUE(u, new_row_index);
}
Result(u64) idb_getRowCount(Table* t){
Result(u64) idb_getRowCount(Table* t, bool lock_table){
Deferral(4);
// table lock
try_stderrcode(pthread_mutex_lock(&t->mutex));
Defer(pthread_mutex_unlock(&t->mutex));
if(lock_table){
idb_lockTable(t);
Defer(idb_unlockTable(t));
}
u64 count = t->row_count;
Return RESULT_VALUE(u, count);
}
Result(void) idb_createListFromTable(Table* t, List_* l, bool lock_table){
Deferral(1);
if(lock_table){
idb_lockTable(t);
Defer(idb_unlockTable(t));
}
u64 rows_count = t->row_count;
u64 total_size = rows_count * t->header.row_size;
*l = List_alloc_size(total_size);
l->size = total_size;
bool success = false;
Defer(if(!success) List_destroy(l));
try_void(idb_getRows(t, 0, l->data, rows_count, false));
success = true;
Return RESULT_VOID;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "tlibc/errors.h"
#include "tlibc/collections/List.h"
#define IDB_VERSION 2
#define IDB_AES_KEY_SIZE 32
@ -12,15 +13,35 @@ typedef struct Table Table;
Result(IncrementalDB*) idb_open(str db_dir, NULLABLE(Array(u8) aes_key));
void idb_close(IncrementalDB* db);
Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size);
/// before performing atransaction on DB lock it manually or call functions with parameter lock_db=true
void idb_lockDB(IncrementalDB* db);
Result(void) idb_getRows(Table* t, u64 start_from_id, void* dst, u64 count);
#define idb_getRow(T, ID, DST) idb_getRows(T, ID, DST, 1)
/// USAGE:
/// idb_lockDB(db);
/// Defer(idb_unlockDB(db));
void idb_unlockDB(IncrementalDB* db);
Result(u64) idb_pushRows(Table* t, const void* src, u64 count);
#define idb_pushRow(T, SRC) idb_pushRows(T, SRC, 1)
/// before performing a transaction on Table lock it manually or call function with parameter lock_db=true
void idb_lockTable(Table* t);
Result(void) idb_updateRows(Table* t, u64 start_from_id, const void* src, u64 count);
#define idb_updateRow(T, ID, SRC) idb_updateRows(T, ID, SRC, 1)
/// USAGE:
/// idb_lockTable(t);
/// Defer(idb_unlockTable(t));
void idb_unlockTable(Table* t);
Result(u64) idb_getRowCount(Table* t);
Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str table_name, u32 row_size, bool lock_db);
Result(void) idb_getRows(Table* t, u64 start_from_id, void* dst, u64 count, bool lock_table);
#define idb_getRow(T, ID, DST, LOCK) idb_getRows(T, ID, DST, 1, LOCK)
Result(u64) idb_pushRows(Table* t, const void* src, u64 count, bool lock_table);
#define idb_pushRow(T, SRC, LOCK) idb_pushRows(T, SRC, 1, LOCK)
Result(void) idb_updateRows(Table* t, u64 start_from_id, const void* src, u64 count, bool lock_table);
#define idb_updateRow(T, ID, SRC, LOCK) idb_updateRows(T, ID, SRC, 1, LOCK)
Result(u64) idb_getRowCount(Table* t, bool lock_table);
/// construct new list and load whole table into it
Result(void) idb_createListFromTable(Table* t, List_* l, bool lock_table);

View File

@ -1,7 +1,9 @@
#pragma once
#include "tcp-chat/common_constants.h"
#include "tlibc/time.h"
#include "tlibc/magic.h"
// TODO: add table versions
typedef struct UserInfo {
u16 name_len;
@ -30,15 +32,30 @@ typedef struct ChannelInfo {
char desc[CHANNEL_DESC_SIZE_MAX + 1];
} ATTRIBUTE_ALIGNED(4*1024) ChannelInfo;
typedef struct MessageMetadata {
DateTime receiving_time_utc;
u64 sender_id;
// not a table
typedef struct MessageMeta {
/*
In block messages can be stored with some padding (zero bytes) between them.
To distinguish message from padding, each message starts with MESSAGE_MAGIC.
*/
Magic32 magic;
u16 data_size;
} ATTRIBUTE_ALIGNED(64) MessageMetadata;
u64 id;
u64 sender_id;
DateTime receiving_time_utc;
} ATTRIBUTE_ALIGNED(64) MessageMeta;
#define MESSAGE_MAGIC ((Magic32){ .bytes = { 'M', 's', 'g', 'S' } })
// Stores some number of messages. Look in MessageBlockMeta to see how much.
typedef struct MessageBlock {
/* sequence of messages (MessageMetadata, byte[]) */
u8 data[16*1024];
} MessageBlock;
/* ((sequence MessageMeta), (sequence binary-data)) */
u8 data[64*1024 - 4];
} ATTRIBUTE_ALIGNED(64) MessageBlock;
// is used to find in which MessageBlock a message is stored
typedef struct MessageBlockMeta {
u64 message_id_first;
u32 messages_count;
} ATTRIBUTE_ALIGNED(16) MessageBlockMeta;

View File

@ -140,16 +140,19 @@ void SendMessageResponse_construct(SendMessageResponse* ptr, PacketHeader* heade
typedef struct GetMessageBlockRequest {
u64 message_block_id;
u64 message_id_first;
u32 messages_count;
} ALIGN_PACKET_STRUCT GetMessageBlockRequest;
void GetMessageBlockRequest_construct(GetMessageBlockRequest* ptr, PacketHeader* header,
u64 message_block_id);
u64 message_id_first, u32 messages_count);
typedef struct GetMessageBlockResponse {
u64 message_id_first;
u32 messages_count;
u32 data_size;
/* stream of size data_size */
/* stream of size data_size : ((sequence MessageMeta), (sequence binary-data)) */
} ALIGN_PACKET_STRUCT GetMessageBlockResponse;
void GetMessageBlockResponse_construct(GetMessageBlockResponse* ptr, PacketHeader* header,

View File

@ -34,15 +34,15 @@ declare_RequestHandler(Login)
}
// lock users cache
try_stderrcode(pthread_mutex_lock(&conn->server->users_cache_mutex));
idb_lockTable(conn->server->users.table);
bool unlocked_users_cache_mutex = false;
Defer(
if(!unlocked_users_cache_mutex)
pthread_mutex_unlock(&conn->server->users_cache_mutex)
idb_unlockTable(conn->server->users.table)
);
// try get id from name cache
u64* id_ptr = HashMap_tryGetPtr(&conn->server->users_name_id_map, username_str);
u64* id_ptr = HashMap_tryGetPtr(&conn->server->users.username_id_map, username_str);
if(id_ptr == NULL){
try_void(sendErrorMessage_f(log_ctx, conn, res_head,
LogSeverity_Warn,
@ -54,8 +54,8 @@ declare_RequestHandler(Login)
u64 user_id = *id_ptr;
// get user by id
try_assert(user_id < List_len(conn->server->users_cache_list, UserInfo));
UserInfo* u = &List_index(conn->server->users_cache_list, UserInfo, user_id);
try_assert(user_id < List_len(conn->server->users.cache_list, UserInfo));
UserInfo* u = &List_index(conn->server->users.cache_list, UserInfo, user_id);
// validate token hash
if(memcmp(req.token, u->token, sizeof(req.token)) != 0){
@ -67,7 +67,7 @@ declare_RequestHandler(Login)
}
// manually unlock mutex
pthread_mutex_unlock(&conn->server->users_cache_mutex);
idb_unlockTable(conn->server->users.table);
unlocked_users_cache_mutex = true;
// authorize

View File

@ -34,16 +34,16 @@ declare_RequestHandler(Register)
}
// lock users cache
try_stderrcode(pthread_mutex_lock(&conn->server->users_cache_mutex));
idb_lockTable(conn->server->users.table);
bool unlocked_users_cache_mutex = false;
// unlock mutex on error catch
Defer(
if(!unlocked_users_cache_mutex)
pthread_mutex_unlock(&conn->server->users_cache_mutex)
idb_unlockTable(conn->server->users.table)
);
// check if name is taken
if(HashMap_tryGetPtr(&conn->server->users_name_id_map, username_str) != NULL){
if(HashMap_tryGetPtr(&conn->server->users.username_id_map, username_str) != NULL){
try_void(sendErrorMessage_f(log_ctx, conn, res_head,
LogSeverity_Warn,
"Username '%s' already exists",
@ -63,13 +63,15 @@ declare_RequestHandler(Register)
DateTime_getUTC(&user.registration_time_utc);
// save new user to db and cache
try(u64 user_id, u, idb_pushRow(conn->server->db_users_table, &user));
try_assert(user_id == List_len(conn->server->users_cache_list, UserInfo));
List_pushMany(&conn->server->users_cache_list, UserInfo, &user, 1);
try_assert(HashMap_tryPush(&conn->server->users_name_id_map, username_str, &user_id));
try(u64 user_id, u,
idb_pushRow(conn->server->users.table, &user, false)
);
try_assert(user_id == List_len(conn->server->users.cache_list, UserInfo));
List_pushMany(&conn->server->users.cache_list, UserInfo, &user, 1);
try_assert(HashMap_tryPush(&conn->server->users.username_id_map, username_str, &user_id));
// manually unlock mutex
pthread_mutex_unlock(&conn->server->users_cache_mutex);
idb_unlockTable(conn->server->users.table);
unlocked_users_cache_mutex = true;
logInfo(log_ctx, "registered user '%s'", username_str.data);

View File

@ -13,6 +13,8 @@ declare_RequestHandler(SendMessage)
try_void(PacketHeader_validateContentSize(req_head, sizeof(req)));
try_void(EncryptedSocketTCP_recvStruct(&conn->sock, &req));
// send response
SendMessageResponse res;
SendMessageResponse_construct(&res, res_head, );

View File

@ -20,9 +20,13 @@ void Server_free(Server* self){
RSA_destroyPublicKey(&self->rsa_pk);
idb_close(self->db);
pthread_mutex_destroy(&self->users_cache_mutex);
List_destroy(self->users_cache_list);
HashMap_destroy(&self->users_name_id_map);
List_destroy(&self->users.cache_list);
HashMap_destroy(&self->users.username_id_map);
List_destroy(&self->messages.blocks_meta_list);
LList_MessageBlock_destroy(&self->messages.blocks_queue);
free(self->messages.incomplete_block);
free(self);
}
@ -92,24 +96,59 @@ Result(Server*) Server_create(str config_str, void* logger, LogFunction_t log_fu
// build users cache
logDebug(log_ctx, "loading users...");
pthread_mutex_init(&self->users_cache_mutex, NULL);
try(self->db_users_table, p, idb_getOrCreateTable(self->db, STR("users"), sizeof(UserInfo)));
// load whole table to list
try(u64 users_count, u, idb_getRowCount(self->db_users_table));
self->users_cache_list = List_alloc(UserInfo, users_count);
try_void(idb_getRows(self->db_users_table, 0, self->users_cache_list.data, users_count));
self->users_cache_list.size = sizeof(UserInfo) * users_count;
try(self->users.table, p,
idb_getOrCreateTable(self->db, STR("users"), sizeof(UserInfo), false)
);
// load whole users table to list
try_void(
idb_createListFromTable(self->users.table, &self->users.cache_list, false)
);
// build name-id map
HashMap_construct(&self->users_name_id_map, u64, NULL);
try(u64 users_count, u, idb_getRowCount(self->users.table, false));
HashMap_construct(&self->users.username_id_map, u64, NULL);
for(u64 id = 0; id < users_count; id++){
UserInfo* row = &List_index(self->users_cache_list, UserInfo, id);
UserInfo* row = &List_index(self->users.cache_list, UserInfo, id);
str key = str_construct(row->name, row->name_len, true);
if(!HashMap_tryPush(&self->users_name_id_map, key, &id)){
if(!HashMap_tryPush(&self->users.username_id_map, key, &id)){
Return RESULT_ERROR_FMT("duplicate user name '"FMT_str"'", key.size, key.data);
}
}
logDebug(log_ctx, "loaded "FMT_u64" users", users_count);
// build messages cache
logDebug(log_ctx, "loading messages...");
try(self->messages.blocks_table, p,
idb_getOrCreateTable(self->db, STR("message_blocks"), sizeof(MessageBlock), false)
);
try(self->messages.blocks_meta_table, p,
idb_getOrCreateTable(self->db, STR("message_blocks_meta"), sizeof(MessageBlockMeta), false)
);
// load whole message_blocks_meta table to list
try_void(
idb_createListFromTable(self->messages.blocks_meta_table, &self->messages.blocks_meta_list, false)
);
// load N last blocks to the queue
self->messages.incomplete_block = LLNode_MessageBlock_createZero();
self->messages.blocks_queue = LList_construct(MessageBlock, NULL);
try(u64 message_blocks_count, u, idb_getRowCount(self->messages.blocks_table, false));
u64 first_id = 0;
if(message_blocks_count > MESSAGE_BLOCKS_CACHE_COUNT)
first_id = message_blocks_count - MESSAGE_BLOCKS_CACHE_COUNT;
for(u64 id = first_id; id < message_blocks_count; id++){
LLNode(MessageBlock)* node = LLNode_MessageBlock_createZero();
LList_MessageBlock_insertAfter(
&self->messages.blocks_queue,
self->messages.blocks_queue.last,
node
);
try_void(idb_getRow(self->messages.blocks_table, id, node->value.data, false));
}
success = true;
Return RESULT_VALUE(p, self);
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <pthread.h>
#include "tlibc/collections/HashMap.h"
#include "tlibc/collections/List.h"
#include "tlibc/collections/LList.h"
#include "tcp-chat/server.h"
#include "cryptography/AES.h"
#include "cryptography/RSA.h"
@ -11,6 +11,10 @@
typedef struct ClientConnection ClientConnection;
LList_declare(MessageBlock);
#define MESSAGE_BLOCKS_CACHE_COUNT 50
typedef struct Server {
/* from constructor */
void* logger;
@ -26,10 +30,19 @@ typedef struct Server {
/* database and cache */
IncrementalDB* db;
Table* db_users_table;
pthread_mutex_t users_cache_mutex;
List(UserInfo) users_cache_list; // index is id
HashMap(u64) users_name_id_map; // key is user name
struct {
Table* table;
List(UserInfo) cache_list; // index is id
HashMap(u64) username_id_map;
} users;
/* messages */
struct {
Table* blocks_table;
Table* blocks_meta_table;
List(MessageBlockMeta) blocks_meta_list; // index is id
LList(MessageBlock) blocks_queue; // last N MessageBlocks, ascending
LLNode(MessageBlock)* incomplete_block; // new messages are written here until block is full
} messages;
} Server;