#include "idb.h"
#include "tlibc/filesystem.h"
#include "tlibc/collections/HashMap.h"
#include <pthread.h>

/// 4-byte file signature, accessible either as one u32 or as raw bytes
typedef union Magic32 {
    u32 n;
    u8 bytes[4];
} Magic32;

/// Header stored at the beginning of every table file.
/// Row data starts right after it, at offset sizeof(TableFileHeader).
typedef struct TableFileHeader {
    Magic32 magic;      // must be equal to TABLE_FILE_MAGIC
    u16 version;        // set to IDB_VERSION when the table file is created
    bool _dirty_bit;    // true while rows are being written to the table file
    u32 row_size;       // size of one row in bytes, never 0 in a valid header
} __attribute__((aligned(256))) TableFileHeader;

/// State of one opened table
typedef struct Table {
    TableFileHeader header;     // in-memory copy of the table file header
    IncrementalDB* db;          // database which owns this table
    str name;                   // heap-allocated, freed in Table_close
    str table_file_path;        // heap-allocated, freed in Table_close
    str changes_file_path;      // heap-allocated, freed in Table_close
    FILE* table_file;           // file with header and rows
    FILE* changes_file;         // file for backups of updated rows
    pthread_mutex_t mutex;      // locked by idb_getRows/updateRows/pushRows/getRowCount
    u64 row_count;              // number of complete rows in the table file
} Table;

typedef struct IncrementalDB {
    str db_dir;                 // heap-allocated copy of the database directory path
    HashMap(Table**) tables_map;// opened tables by name
    pthread_mutex_t mutex;      // locked by idb_getOrCreateTable
} IncrementalDB;

static const Magic32 TABLE_FILE_MAGIC = { .bytes = { 'I', 'D', 'B', 't' } };

#define IDB_VERSION 0x01

/// Calls a pthread function and returns an error result if it fails.
/// Can be used only in functions which begin with Deferral().
#define try_pthread(CALL) do {\
    int r = (CALL);\
    if(r != 0){\
        Return RESULT_ERROR(strerror(r), false);\
    }\
} while(0)


/// Closes files of the table and frees all memory owned by it, including `t` itself.
/// Can be called for a partially constructed table if its memory was zeroed.
/// @param t may be NULL
void Table_close(Table* t){
    if(t == NULL){
        return;
    }
    // files are not opened yet if table construction has failed early,
    // and fclose(NULL) is undefined behavior
    if(t->table_file != NULL){
        fclose(t->table_file);
    }
    if(t->changes_file != NULL){
        fclose(t->changes_file);
    }
    free(t->name.data);
    free(t->table_file_path.data);
    free(t->changes_file_path.data);
    pthread_mutex_destroy(&t->mutex);
    free(t);
}

// element destructor for HashMap(Table*)
void TablePtr_destroy(void* t_ptr_ptr){
    Table* t = *(Table**)t_ptr_ptr;
    Table_close(t);
}

/// Checks that the name doesn't contain characters which are forbidden in table names.
/// @param name must be null-terminated
Result(void) validateTableName(str name){
    const char forbidden_characters[] = {
        '/', '\\', ':', ';', '?', '"', '\'', '\n', '\r', '\t'
    };
    for(u32 i = 0; i < ARRAY_LEN(forbidden_characters); i++) {
        char c = forbidden_characters[i];
        if(str_seekChar(name, c, 0) != -1){
            return RESULT_ERROR_FMT(
                "Table name '%s' contains forbidden character '%c'",
                name.data, c);
        }
    }
    return RESULT_VOID;
}


/// Reads the header from the beginning of the table file to t->header
Result(void) Table_readHeader(Table* t){
    Deferral(8);
    // seek for start of the file
    try_void(file_seek(t->table_file, 0, SeekOrigin_Start));
    // read header
    try_void(file_readStructsExactly(t->table_file, &t->header, sizeof(t->header), 1));
    Return RESULT_VOID;
}

/// Writes t->header to the beginning of the table file
Result(void) Table_writeHeader(Table* t){
    Deferral(8);
    // seek for start of the file
    try_void(file_seek(t->table_file, 0, SeekOrigin_Start));
    // write header
    try_void(file_writeStructs(t->table_file, &t->header, sizeof(t->header), 1));
    Return RESULT_VOID;
}

/// Changes the dirty bit in t->header and writes the header to the table file
Result(void) Table_setDirtyBit(Table* t, bool val){
    Deferral(8);
    t->header._dirty_bit = val;
    try_void(Table_writeHeader(t));
    Return RESULT_VOID;
}

/// Reads the header from the table file again and returns its dirty bit
Result(bool) Table_getDirtyBit(Table* t){
    Deferral(8);
    try_void(Table_readHeader(t));
    Return RESULT_VALUE(i, t->header._dirty_bit);
}

/// Calculates t->row_count from the size of the table file.
/// t->header.row_size must not be 0 (it is checked by Table_validateHeader).
/// Returns error if the file is smaller than the header or the last row is incomplete.
Result(void) Table_calculateRowCount(Table* t){
    Deferral(8);
    try(i64 file_size, i, file_getSize(t->table_file));
    // without this check data_size would be negative and row_count would be garbage
    if(file_size < (i64)sizeof(t->header)){
        Return RESULT_ERROR_FMT(
            "Table '%s' has invalid size. File is smaller than the header",
            t->name.data);
    }
    i64 data_size = file_size - (i64)sizeof(t->header);
    if(data_size % t->header.row_size != 0){
        //TODO: fix table instead of throwing error
        Return RESULT_ERROR_FMT(
            "Table '%s' has invalid size. Last row is incomplete",
            t->name.data);
    }
    t->row_count = data_size / t->header.row_size;
    Return RESULT_VOID;
}

/// Checks magic, row size and dirty bit of the table file header.
/// The header is read from the file again to get the dirty bit.
Result(void) Table_validateHeader(Table* t){
    Deferral(8);
    bool magic_matches = t->header.magic.n == TABLE_FILE_MAGIC.n;
    if(!magic_matches || t->header.row_size == 0)
    {
        Return RESULT_ERROR_FMT(
            "Table file '%s' has invalid header",
            t->table_file_path.data);
    }

    //TODO: check version

    try(bool dirty_bit, i, Table_getDirtyBit(t));
    if(dirty_bit){
        //TODO: handle dirty bit instead of throwing error
        Return RESULT_ERROR_FMT(
            "Table file '%s' has dirty bit set",
            t->table_file_path.data);
    }

    Return RESULT_VOID;
}

/// Checks that the requested row size is equal to the row size saved in the table header
Result(void) Table_validateRowSize(Table* t, u32 row_size){
    if(row_size != t->header.row_size){
        Result(void) error_result = RESULT_ERROR_FMT(
            "Requested row size (%u) doesn't match saved row size (%u)",
            row_size, t->header.row_size);
        return error_result;
    }
    return RESULT_VOID;
}


/// Creates the database directory and a database object without opened tables.
/// @param db_dir path to the database directory, it is copied
/// @return database object which must be destroyed with idb_close
Result(IncrementalDB*) idb_open(str db_dir){
    Deferral(16);
    IncrementalDB* db = (IncrementalDB*)malloc(sizeof(IncrementalDB));
    // must be checked before the cleanup below is registered
    if(db == NULL){
        Return RESULT_ERROR("Can't allocate memory for IncrementalDB", false);
    }
    // if object construction fails, destroy incomplete object
    bool success = false;
    Defer({ if(!success) idb_close(db); });
    // value of *db must be set to zero or behavior of idb_close will be undefined
    memset(db, 0, sizeof(IncrementalDB));

    db->db_dir = str_copy(db_dir);
    try_void(dir_create(db->db_dir.data));
    HashMap_construct(&db->tables_map, Table*, TablePtr_destroy);
    try_pthread(pthread_mutex_init(&db->mutex, NULL));

    success = true;
    Return RESULT_VALUE(p, db);
}

/// Destroys the tables map (TablePtr_destroy is registered as its element destructor)
/// and frees all memory owned by the database, including `db` itself.
/// @param db may be NULL
void idb_close(IncrementalDB* db){
    if(db == NULL){
        return;
    }
    free(db->db_dir.data);
    HashMap_destroy(&db->tables_map);
    pthread_mutex_destroy(&db->mutex);
    free(db);
}


/// Returns the table if it is already opened, otherwise opens or creates its files.
/// @param _table_name must be null-terminated and pass validateTableName
/// @param row_size size of one row in bytes, must be greater than zero
///     and equal to the row size of the existing table
/// @return table owned by `db`
Result(Table*) idb_getOrCreateTable(IncrementalDB* db, str _table_name, u32 row_size){
    Deferral(64);
    // header with zero row size is rejected by Table_validateHeader,
    // so such table would be impossible to open again
    if(row_size == 0){
        Return RESULT_ERROR("Row size must be greater than zero", false);
    }

    // db lock
    try_pthread(pthread_mutex_lock(&db->mutex));
    Defer(pthread_mutex_unlock(&db->mutex));

    // return already opened table
    Table** tpp = HashMap_tryGetPtr(&db->tables_map, _table_name);
    if(tpp != NULL){
        Table* existing_table = *tpp;
        try_void(Table_validateRowSize(existing_table, row_size));
        Return RESULT_VALUE(p, existing_table);
    }

    try_void(validateTableName(_table_name));

    Table* t = (Table*)malloc(sizeof(Table));
    // must be checked before the cleanup below is registered
    if(t == NULL){
        Return RESULT_ERROR("Can't allocate memory for Table", false);
    }
    // if object construction fails, destroy incomplete object
    bool success = false;
    Defer({ if(!success) Table_close(t); });
    // value of *t must be set to zero or behavior of Table_close will be undefined
    memset(t, 0, sizeof(Table));

    t->db = db;
    try_pthread(pthread_mutex_init(&t->mutex, NULL));
    t->name = str_copy(_table_name);
    t->table_file_path = str_from_cstr(
        strcat_malloc(db->db_dir.data, path_seps, t->name.data, ".idb-table"));
    t->changes_file_path = str_from_cstr(
        strcat_malloc(db->db_dir.data, path_seps, t->name.data, ".idb-changes"));

    // must be checked before the file is opened, because opening creates it
    bool table_exists = file_exists(t->table_file_path.data);

    // open or create file with table data
    try(t->table_file, p, file_openOrCreateReadWrite(t->table_file_path.data));
    // open or create file with backups of updated rows
    try(t->changes_file, p, file_openOrCreateReadWrite(t->changes_file_path.data));

    if(table_exists){
        // read table file
        try_void(Table_readHeader(t));
        try_void(Table_validateHeader(t));
        try_void(Table_validateRowSize(t, row_size));
        try_void(Table_calculateRowCount(t));
    }
    else {
        // create table file
        t->header.magic.n = TABLE_FILE_MAGIC.n;
        t->header.row_size = row_size;
        t->header.version = IDB_VERSION;
        try_void(Table_writeHeader(t));
    }

    if(!HashMap_tryPush(&db->tables_map, t->name, &t)){
        Result(void) error_result = RESULT_ERROR_FMT(
            "Table '%s' is already open",
            t->name.data);
        Return error_result;
    }

    success = true;
    Return RESULT_VALUE(p, t);
}

/// Reads `count` rows starting from the row with index `id`.
/// @param dst buffer for count * row_size bytes
Result(void) idb_getRows(Table* t, u64 id, void* dst, u64 count){
    Deferral(16);
    // table lock
    try_pthread(pthread_mutex_lock(&t->mutex));
    Defer(pthread_mutex_unlock(&t->mutex));

    // same as (id + count > row_count), but can't be bypassed by u64 overflow
    if(id > t->row_count || count > t->row_count - id){
        Return RESULT_ERROR_FMT(
            "Can't read " IFWIN("%llu", "%lu") " rows at index " IFWIN("%llu", "%lu")
            " because table '%s' has only " IFWIN("%llu", "%lu") " rows",
            count, id, t->name.data, t->row_count);
    }

    i64 file_pos = sizeof(t->header) + id * t->header.row_size;
    // seek for the row position in file
    try_void(file_seek(t->table_file, file_pos, SeekOrigin_Start));
    // read rows from file
    try_void(file_readStructsExactly(t->table_file, dst, t->header.row_size, count));
    Return RESULT_VOID;
}

/// Replaces `count` existing rows starting from the row with index `id`.
/// The range may end at the last row of the table, but can't go beyond it.
/// @param src count * row_size bytes of new row data
Result(void) idb_updateRows(Table* t, u64 id, const void* src, u64 count){
    Deferral(16);
    // table lock
    try_pthread(pthread_mutex_lock(&t->mutex));
    Defer(pthread_mutex_unlock(&t->mutex));

    // same as (id + count > row_count), but can't be bypassed by u64 overflow.
    // Range which ends exactly at row_count is valid, it includes the last row.
    if(id > t->row_count || count > t->row_count - id){
        Return RESULT_ERROR_FMT(
            "Can't update " IFWIN("%llu", "%lu") " rows at index " IFWIN("%llu", "%lu")
            " because table '%s' has only " IFWIN("%llu", "%lu") " rows",
            count, id, t->name.data, t->row_count);
    }

    // NOTE(review): deferred call presumably clears the dirty bit even
    // if the write below fails, and its result is ignored - confirm it is intended
    try_void(Table_setDirtyBit(t, true));
    Defer(Table_setDirtyBit(t, false));

    i64 file_pos = sizeof(t->header) + id * t->header.row_size;

    // TODO: set dirty bit in backup file too
    // TODO: save old values to the backup file

    // seek for the row position in file
    try_void(file_seek(t->table_file, file_pos, SeekOrigin_Start));
    // replace rows in file
    try_void(file_writeStructs(t->table_file, src, t->header.row_size, count));
    Return RESULT_VOID;
}

/// Appends `count` rows to the end of the table file.
/// @param src count * row_size bytes of new row data
/// @return index of the first appended row
Result(u64) idb_pushRows(Table* t, const void* src, u64 count){
    Deferral(16);
    // table lock
    try_pthread(pthread_mutex_lock(&t->mutex));
    Defer(pthread_mutex_unlock(&t->mutex));

    // NOTE(review): deferred call presumably clears the dirty bit even
    // if the write below fails, and its result is ignored - confirm it is intended
    try_void(Table_setDirtyBit(t, true));
    Defer(Table_setDirtyBit(t, false));

    const u64 new_row_index = t->row_count;

    // seek for end of the file
    try_void(file_seek(t->table_file, 0, SeekOrigin_End));
    // write new rows to the file
    try_void(file_writeStructs(t->table_file, src, t->header.row_size, count));
    // row count is changed only after successful write
    t->row_count += count;
    Return RESULT_VALUE(u, new_row_index);
}

/// Returns the number of rows in the table
Result(u64) idb_getRowCount(Table* t){
    Deferral(8);
    // table lock
    try_pthread(pthread_mutex_lock(&t->mutex));
    Defer(pthread_mutex_unlock(&t->mutex));

    u64 count = t->row_count;
    Return RESULT_VALUE(u, count);
}