From b08ec2762971ed6ce06f128b472a2a43f53515e6 Mon Sep 17 00:00:00 2001 From: Timerix Date: Sat, 9 Aug 2025 21:44:06 +0300 Subject: [PATCH] added mutexes and defers to idb --- dependencies/tlibc | 2 +- src/db/idb.c | 216 ++++++++++++++++++++++++++++++--------------- src/db/idb.h | 33 +------ src/main.c | 108 ++++++++++++----------- 4 files changed, 207 insertions(+), 152 deletions(-) diff --git a/dependencies/tlibc b/dependencies/tlibc index fe9e44a..d04aac5 160000 --- a/dependencies/tlibc +++ b/dependencies/tlibc @@ -1 +1 @@ -Subproject commit fe9e44a660e23c28255ba27522d15ab94044f55b +Subproject commit d04aac567f105cc566db7b9b8e201c1a04e4ecea diff --git a/src/db/idb.c b/src/db/idb.c index f5bd8c2..d958421 100644 --- a/src/db/idb.c +++ b/src/db/idb.c @@ -1,11 +1,48 @@ #include "idb.h" #include "tlibc/filesystem.h" +#include "tlibc/collections/HashMap.h" +#include + +typedef union Magic32 { + u32 n; + u8 bytes[4]; +} Magic32; + +typedef struct TableFileHeader { + Magic32 magic; + u16 version; + bool _dirty_bit; + u32 row_size; +} __attribute__((aligned(256))) TableFileHeader; + +typedef struct Table { + TableFileHeader header; + IncrementalDB* db; + str name; + str table_file_path; + str changes_file_path; + FILE* table_file; + FILE* changes_file; + pthread_mutex_t mutex; + u64 row_count; +} Table; + +typedef struct IncrementalDB { + str db_dir; + HashMap(Table**) tables_map; + pthread_mutex_t mutex; +} IncrementalDB; static const Magic32 TABLE_FILE_MAGIC = { .bytes = { 'I', 'D', 'B', 't' } }; #define IDB_VERSION 0x01 -Result(void) Table_setDirtyBit(Table* t, bool val); -Result(bool) Table_getDirtyBit(Table* t); +#define try_pthread(CALL) do {\ + int r = CALL;\ + if(r != 0){\ + Return RESULT_ERROR(strerror(r), false);\ + }\ +} while(0) + void Table_close(Table* t){ fclose(t->table_file); @@ -13,9 +50,11 @@ void Table_close(Table* t){ free(t->name.data); free(t->table_file_path.data); free(t->changes_file_path.data); + pthread_mutex_destroy(&t->mutex); free(t); } +// element destructor for HashMap(Table*) void TablePtr_destroy(void* t_ptr_ptr){ Table_close(*(Table**)t_ptr_ptr); } @@ -36,66 +75,72 @@ Result(void) validateTableName(str name){ } Result(void) Table_readHeader(Table* t){ + Deferral(8); // seek for start of the file - try_void(file_seek(t->table_file, 0, SeekOrigin_Start), ) + try_void(file_seek(t->table_file, 0, SeekOrigin_Start)); // read header - try_void(file_readStructsExactly(t->table_file, &t->header, sizeof(t->header), 1), ); - return RESULT_VOID; + try_void(file_readStructsExactly(t->table_file, &t->header, sizeof(t->header), 1)); + Return RESULT_VOID; } Result(void) Table_writeHeader(Table* t){ + Deferral(8); // seek for start of the file - try_void(file_seek(t->table_file, 0, SeekOrigin_Start), ) + try_void(file_seek(t->table_file, 0, SeekOrigin_Start)); // write header - try_void(file_writeStructs(t->table_file, &t->header, sizeof(t->header), 1), ); - return RESULT_VOID; + try_void(file_writeStructs(t->table_file, &t->header, sizeof(t->header), 1)); + Return RESULT_VOID; } Result(void) Table_setDirtyBit(Table* t, bool val){ + Deferral(8); t->header._dirty_bit = val; - try_void(Table_writeHeader(t), ); - return RESULT_VOID; + try_void(Table_writeHeader(t)); + Return RESULT_VOID; } Result(bool) Table_getDirtyBit(Table* t){ - try_void(Table_readHeader(t), ); - return RESULT_VALUE(i, t->header._dirty_bit); + Deferral(8); + try_void(Table_readHeader(t)); + Return RESULT_VALUE(i, t->header._dirty_bit); } Result(void) Table_calculateRowCount(Table* t){ - try(file_size, file_getSize(t->table_file), ); - i64 data_size = file_size.i - sizeof(t->header); + Deferral(8); + try(i64 file_size, i, file_getSize(t->table_file)); + i64 data_size = file_size - sizeof(t->header); if(data_size % t->header.row_size != 0){ //TODO: fix table instead of trowing error - return RESULT_ERROR_FMT( + Return RESULT_ERROR_FMT( "Table '%s' has invalid size. Last row is incomplete", t->name.data); } t->row_count = data_size / t->header.row_size; - return RESULT_VOID; + Return RESULT_VOID; } Result(void) Table_validateHeader(Table* t){ + Deferral(8); if(t->header.magic.n != TABLE_FILE_MAGIC.n || t->header.row_size == 0) { - return RESULT_ERROR_FMT( + Return RESULT_ERROR_FMT( "Table file '%s' has invalid header", t->table_file_path.data); } //TODO: check version - try(dirty_bit, Table_getDirtyBit(t), ); - if(dirty_bit.i){ + try(bool dirty_bit, i, Table_getDirtyBit(t)); + if(dirty_bit){ //TODO: handle dirty bit instead of throwing error - return RESULT_ERROR_FMT( + Return RESULT_ERROR_FMT( "Table file '%s' has dirty bit set", t->table_file_path.data); } - return RESULT_VOID; + Return RESULT_VOID; } Result(void) Table_validateRowSize(Table* t, u32 row_size){ @@ -110,42 +155,61 @@ Result(void) Table_validateRowSize(Table* t, u32 row_size){ } Result(IncrementalDB*) idb_open(str db_dir){ + Deferral(16); IncrementalDB* db = (IncrementalDB*)malloc(sizeof(IncrementalDB)); + // if object construction fails, destroy incomplete object + bool success = false; + Defer({ + if(!success) + idb_close(db); + }); + // value of *db must be set to zero or behavior of idb_close will be undefined memset(db, 0, sizeof(IncrementalDB)); db->db_dir = str_copy(db_dir); - try_void(dir_create(db->db_dir.data), - idb_close(db)); + try_void(dir_create(db->db_dir.data)); HashMap_construct(&db->tables_map, Table*, TablePtr_destroy); - return RESULT_VALUE(p, db); + try_pthread(pthread_mutex_init(&db->mutex, NULL)); + + success = true; + Return RESULT_VALUE(p, db); } void idb_close(IncrementalDB* db){ free(db->db_dir.data); HashMap_destroy(&db->tables_map); + pthread_mutex_destroy(&db->mutex); free(db); } Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str _table_name, u32 row_size){ - // TODO: implement whole db lock + Deferral(64); + // db lock + try_pthread(pthread_mutex_lock(&db->mutex)); + Defer(pthread_mutex_unlock(&db->mutex)); Table** tpp = HashMap_tryGetPtr(&db->tables_map, _table_name); if(tpp != NULL){ Table* existing_table = *tpp; - try_void(Table_validateRowSize(existing_table, row_size), ); - return RESULT_VALUE(p, existing_table); + try_void(Table_validateRowSize(existing_table, row_size)); + Return RESULT_VALUE(p, existing_table); } - str table_name_null_terminated = str_copy(_table_name); - try_void(validateTableName(table_name_null_terminated), - free(table_name_null_terminated.data)); + try_void(validateTableName(_table_name)); Table* t = (Table*)malloc(sizeof(Table)); + // if object construction fails, destroy incomplete object + bool success = false; + Defer({ + if(!success) + Table_close(t); + }); + // value of *t must be set to zero or behavior of Table_close will be undefined memset(t, 0, sizeof(Table)); - t->db = db; - t->name = table_name_null_terminated; + try_pthread(pthread_mutex_init(&t->mutex, NULL)); + t->name = str_copy(_table_name); t->table_file_path = str_from_cstr( strcat_malloc(db->db_dir.data, path_seps, t->name.data, ".idb-table")); t->changes_file_path = str_from_cstr( @@ -154,48 +218,44 @@ Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str _table_name, u32 row_ bool table_exists = file_exists(t->table_file_path.data); // open or create file with table data - try(_table_file, file_openOrCreateReadWrite(t->table_file_path.data), - Table_close(t)); - t->table_file = _table_file.p; - + try(t->table_file, p, file_openOrCreateReadWrite(t->table_file_path.data)); // open or create file with backups of updated rows - try(_changes_file, file_openOrCreateReadWrite(t->changes_file_path.data), - Table_close(t)); - t->changes_file = _changes_file.p; + try(t->changes_file, p, file_openOrCreateReadWrite(t->changes_file_path.data)); if(table_exists){ - try_void(Table_readHeader(t), - Table_close(t)); - try_void(Table_validateHeader(t), - Table_close(t)); - try_void(Table_validateRowSize(t, row_size), - Table_close(t)); - try_void(Table_calculateRowCount(t), - Table_close(t)); + // read table file + try_void(Table_readHeader(t)); + try_void(Table_validateHeader(t)); + try_void(Table_validateRowSize(t, row_size)); + try_void(Table_calculateRowCount(t)); } else { + // create table file t->header.magic.n = TABLE_FILE_MAGIC.n; t->header.row_size = row_size; t->header.version = IDB_VERSION; - try_void(Table_writeHeader(t), - Table_close(t)); + try_void(Table_writeHeader(t)); } if(!HashMap_tryPush(&db->tables_map, t->name, &t)){ Result(void) error_result = RESULT_ERROR_FMT( "Table '%s' is already open", t->name.data); - Table_close(t); - return error_result; + Return error_result; } - return RESULT_VALUE(p, t); + success = true; + Return RESULT_VALUE(p, t); } Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){ - // TODO: implement table lock + Deferral(16); + // table lock + try_pthread(pthread_mutex_lock(&t->mutex)); + Defer(pthread_mutex_unlock(&t->mutex)); + if(id + count > t->row_count){ - return RESULT_ERROR_FMT( + Return RESULT_ERROR_FMT( "Can't read " IFWIN("%llu", "%lu") " rows at index " IFWIN("%llu", "%lu") " because table '%s' has only " IFWIN("%llu", "%lu") " rows", count, id, t->name.data, t->row_count); @@ -204,22 +264,28 @@ Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){ i64 file_pos = sizeof(t->header) + id * t->header.row_size; // seek for the row position in file - try_void(file_seek(t->table_file, file_pos, SeekOrigin_Start), ); + try_void(file_seek(t->table_file, file_pos, SeekOrigin_Start)); // read rows from file - try_void(file_readStructsExactly(t->table_file, dst, t->header.row_size, count), ); + try_void(file_readStructsExactly(t->table_file, dst, t->header.row_size, count)); - return RESULT_VOID; + Return RESULT_VOID; } Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){ + Deferral(16); + // table lock + try_pthread(pthread_mutex_lock(&t->mutex)); + Defer(pthread_mutex_unlock(&t->mutex)); + if(id + count >= t->row_count){ - return RESULT_ERROR_FMT( + Return RESULT_ERROR_FMT( "Can't update " IFWIN("%llu", "%lu") " rows at index " IFWIN("%llu", "%lu") " because table '%s' has only " IFWIN("%llu", "%lu") " rows", count, id, t->name.data, t->row_count); } - try_void(Table_setDirtyBit(t, true), ); + try_void(Table_setDirtyBit(t, true)); + Defer(Table_setDirtyBit(t, false)); i64 file_pos = sizeof(t->header) + id * t->header.row_size; @@ -227,26 +293,38 @@ Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){ // TODO: save old values to the backup file // seek for the row position in file - try_void(file_seek(t->table_file, file_pos, SeekOrigin_Start), ); + try_void(file_seek(t->table_file, file_pos, SeekOrigin_Start)); // replace rows in file - try_void(file_writeStructs(t->table_file, src, t->header.row_size, count), ); + try_void(file_writeStructs(t->table_file, src, t->header.row_size, count)); - try_void(Table_setDirtyBit(t, false), ); - return RESULT_VOID; + Return RESULT_VOID; } Result(u64) idb_pushRows(Table* t, const void* src, u64 count){ - try_void(Table_setDirtyBit(t, true), ); + Deferral(16); + // table lock + try_pthread(pthread_mutex_lock(&t->mutex)); + Defer(pthread_mutex_unlock(&t->mutex)); + + try_void(Table_setDirtyBit(t, true)); + Defer(Table_setDirtyBit(t, false)); const u64 new_row_index = t->row_count; // seek for end of the file - try_void(file_seek(t->table_file, 0, SeekOrigin_End), ); + try_void(file_seek(t->table_file, 0, SeekOrigin_End)); // write new rows to the file - try_void(file_writeStructs(t->table_file, src, t->header.row_size, count), ); + try_void(file_writeStructs(t->table_file, src, t->header.row_size, count)); t->row_count += count; - - try_void(Table_setDirtyBit(t, false), ); - return RESULT_VALUE(u, new_row_index); + Return RESULT_VALUE(u, new_row_index); +} + +Result(u64) idb_getRowCount(Table* t){ + Deferral(8); + // table lock + try_pthread(pthread_mutex_lock(&t->mutex)); + Defer(pthread_mutex_unlock(&t->mutex)); + u64 count = t->row_count; + Return RESULT_VALUE(u, count); } diff --git a/src/db/idb.h b/src/db/idb.h index d6ff62d..62045b0 100644 --- a/src/db/idb.h +++ b/src/db/idb.h @@ -2,42 +2,15 @@ #include "tlibc/errors.h" #include "tlibc/string/str.h" -#include "tlibc/collections/Array.h" -#include "tlibc/collections/HashMap.h" typedef struct IncrementalDB IncrementalDB; +typedef struct Table Table; -typedef union Magic32 { - u32 n; - u8 bytes[4]; -} Magic32; - -typedef struct TableHeader { - Magic32 magic; - u16 version; - bool _dirty_bit; - u32 row_size; -} __attribute__((aligned(256))) TableHeader; - -typedef struct Table { - TableHeader header; - IncrementalDB* db; - str name; - str table_file_path; - str changes_file_path; - FILE* table_file; - FILE* changes_file; - u64 row_count; -} Table; - -typedef struct IncrementalDB { - str db_dir; - HashMap(Table**) tables_map; -} IncrementalDB; Result(IncrementalDB*) idb_open(str db_dir); void idb_close(IncrementalDB* db); + Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str _table_name, u32 row_size); Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count); @@ -48,3 +21,5 @@ Result(u64) idb_pushRows(Table* t, const void* src, u64 count); Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count); #define idb_updateRow(T, ID, SRC) idb_updateRows(T, ID, SRC, 1) + +Result(u64) idb_getRowCount(Table* t); diff --git a/src/main.c b/src/main.c index aee66d3..16e16ad 100755 --- a/src/main.c +++ b/src/main.c @@ -7,6 +7,7 @@ #include Result(void) test_aes(){ + Deferral(64); const str password = STR("abobus"); const Array(const char) data = str_castTo_Array(STR("0123456789_hii_")); @@ -39,7 +40,7 @@ Result(void) test_aes(){ free(decrypted_str.data); free(buffer.data); - return RESULT_VOID; + Return RESULT_VOID; } @@ -76,47 +77,47 @@ void line_trim(str* line, bool set_zero_at_end){ } void* test_server(void* data){ + Deferral(64); printf_safe("[server]: opening main socket\n"); - try_fatal(main_socket, socket_open_TCP(), ); + try_fatal(Socket main_socket, i, socket_open_TCP()); + Defer(socket_close(main_socket)); + EndpointIPv4 server_end = EndpointIPv4_create(AddressIPv4_LOOPBACK, 24500); - try_fatal(_20, socket_bind(main_socket.i, server_end), socket_close(main_socket.i)); - try_fatal(_30, socket_listen(main_socket.i, 64), socket_close(main_socket.i)); + try_fatal_void(socket_bind(main_socket, server_end)); + try_fatal_void(socket_listen(main_socket, 64)); printf_safe("[server]: accepting client connection\n"); - try_fatal(client_conn, socket_accept(main_socket.i, NULL), socket_close(main_socket.i)); - + try_fatal(Socket client_socket, i, socket_accept(main_socket, NULL)); + Defer(socket_close(client_socket)); + // last byte is reserved for '\0' Array(u8) buf = Array_construct(malloc(1024), u8, 1023); printf_safe("[server]: receiving data from client\n"); while(true){ - try_fatal(read_n, - socket_recv(client_conn.i, buf), - { - socket_close(client_conn.i); - socket_close(main_socket.i); - }); - if(read_n.i == 0){ + try_fatal(i32 read_n, i, socket_recv(client_socket, buf)); + if(read_n == 0){ sleepMsec(20); continue; } - str message = str_construct(buf.data, read_n.u, false); + str message = str_construct(buf.data, read_n, false); line_trim(&message, true); printf_safe("[server]: received '%s'\n", message.data); } - socket_close(client_conn.i); + printf_safe("[server]: client socket closed\n"); perror("errno:"); - socket_close(main_socket.i); - return NULL; + Return NULL; } void* test_client(void* data){ + Deferral(64); printf_safe("[client]: opening socket\n"); - try_fatal(client_socket, socket_open_TCP(), ); + try_fatal(Socket client_socket, i, socket_open_TCP()); + Defer(socket_close(client_socket)); + printf_safe("[client]: connecting to server\n"); EndpointIPv4 server_end = EndpointIPv4_create(AddressIPv4_LOOPBACK, 24500); - try_fatal(_20, - socket_connect(client_socket.i, server_end), - socket_close(client_socket.i)); + try_fatal_void(socket_connect(client_socket, server_end)); + Array(u8) buf = Array_alloc(u8, 1024); printf_safe("[client]: reading stdin\n"); while(fgets(buf.data, buf.size, stdin) != NULL){ @@ -126,38 +127,42 @@ void* test_client(void* data){ printf_safe("[client]: quit\n"); break; } - try_fatal(_50, - socket_send(client_socket.i, str_castTo_Array(line)), - socket_close(client_socket.i)); + try_fatal_void(socket_send(client_socket, str_castTo_Array(line))); } + printf_safe("[client]: closing connection\n"); - socket_close(client_socket.i); - return NULL; + Return NULL; } Result(void) test_network(){ - if(pthread_mutex_init(&stdout_mutex, NULL) != 0) - return RESULT_ERROR("can't init mutex", false); + Deferral(64); + if(pthread_mutex_init(&stdout_mutex, NULL) != 0){ + Return RESULT_ERROR("can't init mutex", false); + } pthread_t server_thread = {0}; - if(pthread_create(&server_thread, NULL, test_server, NULL) != 0) - return RESULT_ERROR("can't create server thread", false); + if(pthread_create(&server_thread, NULL, test_server, NULL) != 0){ + Return RESULT_ERROR("can't create server thread", false); + } sleepMsec(100); test_client(NULL); printf_safe("[main]: joining server thread\n"); - if(pthread_join(server_thread, NULL) != 0) - return RESULT_ERROR("can't join server thread", false); - if(pthread_mutex_destroy(&stdout_mutex) != 0) - return RESULT_ERROR("can't destroy mutex", false); + if(pthread_join(server_thread, NULL) != 0){ + Return RESULT_ERROR("can't join server thread", false); + } + if(pthread_mutex_destroy(&stdout_mutex) != 0){ + Return RESULT_ERROR("can't destroy mutex", false); + } printf_safe("[main]: completed\n"); - return RESULT_VOID; + Return RESULT_VOID; } Result(void) test_db(){ - try(_db, idb_open(STR("idb")), ); - IncrementalDB* db = _db.p; + Deferral(64); + try(IncrementalDB* db, p, idb_open(STR("idb"))); + Defer(idb_close(db)); const u32 row_size = 8; const u32 rows_count = 5; @@ -167,29 +172,26 @@ Result(void) test_db(){ char buffer[512]; memset(buffer, 0, 512); - try(_t0, idb_getOrCreateTable(db, STR("test0"), row_size), idb_close(db)); - Table* t0 = _t0.p; - printf("table '%s' created\n", t0->name.data); - printf("\t%s\n", t0->table_file_path.data); - printf("\t%s\n", t0->changes_file_path.data); - - idb_pushRows(t0, const_rows, rows_count); + try(Table* t0, p, idb_getOrCreateTable(db, STR("test0"), row_size)); + printf("table 'test0' created\n"); + + try_void(idb_pushRows(t0, const_rows, rows_count)); const u32 indices[] = { 0, 1, 4, 3, 4, 0 }; for(u32 i = 0; i < ARRAY_LEN(indices); i++){ - try_void(idb_getRow(t0, indices[i], buffer), idb_close(db)); + try_void(idb_getRow(t0, indices[i], buffer)); printf("row %u: %s\n", indices[i], buffer); } - idb_close(db); - return RESULT_VOID; + Return RESULT_VOID; } int main(){ - try_fatal(_10, network_init(), ); - // try_fatal(_20, test_aes(), ); - // try_fatal(_30, test_network(), ); - try_fatal(_40, test_db(), ); - try_fatal(_100, network_deinit(), ); - return 0; + Deferral(32); + try_fatal_void(network_init()); + Defer(network_deinit()); + // try_fatal_void(test_aes()); + // try_fatal_void(test_network()); + try_fatal_void(test_db()); + Return 0; }