MASA-Core
SpecialRowWriter.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "SpecialRowWriter.hpp"
00023 
00024 #include <stdio.h>
00025 #include <stdlib.h>
00026 #include <string.h>
00027 #include "../libmasa/libmasa.hpp"
00028 
// Set to a non-zero value to enable the diagnostic printf calls guarded by
// "if (DEBUG)" in this file.
00029 #define DEBUG   (0)
00030 
00031 
// Mutex hooks used by the constructor, the destructor and write().
// They currently expand to nothing, so no locking is performed; the
// commented-out block that follows holds pthread-based definitions.
00032 #define MY_MUTEX_INIT
00033 #define MY_MUTEX_DESTROY
00034 #define MY_MUTEX_LOCK
00035 #define MY_MUTEX_UNLOCK
00036 
00037 /*
00038 #define MY_MUTEX_INIT           pthread_mutex_init(&mutex, NULL);
00039 #define MY_MUTEX_DESTROY        pthread_mutex_destroy(&mutex);
00040 #define MY_MUTEX_LOCK           pthread_mutex_lock(&mutex);
00041 #define MY_MUTEX_UNLOCK         pthread_mutex_unlock(&mutex);
00042 */
00043 
00044 SpecialRowWriter::SpecialRowWriter(string _directory, int _i0, int _j0, int _j1)
00045         : directory(_directory), i0(_i0), j0(_j0), j1(_j1) {
00046     this->element_size = sizeof(cell_t);
00047     this->lastRow = i0;
00048 
00049     MY_MUTEX_INIT
00050 }
00051 
00052 SpecialRowWriter::~SpecialRowWriter() {
00053         flush();
00054 
00055         MY_MUTEX_DESTROY
00056 }
00057 
00058 
00059 int SpecialRowWriter::getLastRow() {
00060         return lastRow;
00061 }
00062 
00063 void SpecialRowWriter::flush(int min_i) {
00064     for (map<int, FILE*>::iterator it = specialRows.begin(); it != specialRows.end(); it++) {
00065         int i = (*it).first;
00066         FILE* file = (*it).second;
00067         if (DEBUG) printf("Unflush: %08X,%08X\n", i, j0);
00068         fclose(file);
00069 
00070                 char old_path[500];
00071                 getFileName(i, true, old_path);
00072         if (i >= min_i || min_i == -1) {
00073                         remove(old_path);
00074                         if (DEBUG) printf("Removed %s\n", old_path);
00075         } else {
00076             char new_path[500];
00077             getFileName(i, false, new_path);
00078             rename(old_path, new_path);
00079             // TODO truncate file size from min_j*8 position (create parameter)
00080         }
00081     }
00082     specialRows.clear();
00083 }
00084 
00085 void SpecialRowWriter::getFileName(int i, bool temporary, char* str) {
00086         if (temporary) {
00087                 sprintf(str, "%s/tmp.%08X.%08X", directory.c_str(), i, j0);
00088         } else {
00089                 sprintf(str, "%s/%08X.%08X", directory.c_str(), i, j0);
00090         }
00091 }
00092 
00093 int SpecialRowWriter::write(int i, void* buf, int len) {
00094         FILE* file = NULL;
00095         MY_MUTEX_LOCK
00096         file = specialRows[i];
00097         if (file == NULL) {
00098             char str[500];
00099             getFileName(i, true, str);
00100             file = fopen(str, "wb");
00101             if (file == NULL) {
00102                 fprintf(stderr, "Could not create special row: %s\n", str);
00103                 exit(1);
00104             }
00105                 specialRows[i] = file;
00106         }
00107         MY_MUTEX_UNLOCK
00108 
00109         int ret = fwrite(buf, element_size, len, file);
00110         if (ret != len) {
00111         fprintf(stderr, "Could not write bytes to special row: %d != %d\n", ret, len);
00112         perror("Special Row - fwrite");
00113         exit(1);
00114         }
00115 
00116         if (ftell(file) >= abs(j1-j0)*element_size) {
00117                 MY_MUTEX_LOCK
00118                 specialRows.erase(i);
00119                 MY_MUTEX_UNLOCK
00120 
00121         lastRow = i;
00122 
00123         fclose(file);
00124         file = NULL;
00125 
00126         char old_path[500];
00127         char new_path[500];
00128             getFileName(i, true, old_path);
00129             getFileName(i, false, new_path);
00130         rename(old_path, new_path);
00131         }
00132 
00133         return ret;
00134 }
00135 
00136 /*
00137 mkdir testSRA
00138 rm testSRA/*
00139 g++ -pthread src/common/SpecialRowWriter.cpp -o test && ./test
00140 
00141 void* testFunctionThread(void* args) {
00142         SpecialRowWriter* writer = (SpecialRowWriter*)args;
00143         int i = (int)((float)rand()/RAND_MAX*100000000);
00144         int size = 10;
00145         cell_t data[size];
00146         float prob = ((float)rand()/RAND_MAX);
00147         if (prob > 0.90-i*0.2/1000000) {
00148                 sleep(1);
00149         }
00150         for (int j=0; j<100000000; j+=size) {
00151                 if (j%1000000 == 0) printf("Write[%d,%d]\n", i, j);
00152                 if (j%100000 == 0) i++;
00153                 writer->write(i, data, size);
00154         }
00155 }
00156 
00157 int main() {
00158         pthread_t thread[400];
00159         SpecialRowWriter* writer = new SpecialRowWriter("testSRA/", 0, 0, 1000000);
00160         for (int i=0; i<100; i++) {
00161                 if (i%5 == 0) sleep(1);
00162                 int rc = pthread_create(&thread[i], NULL, testFunctionThread, (void *)writer);
00163                 printf("pthread_create(thread[%d]): %d\n", i, rc);
00164         }
00165         for (int i=0; i<100; i++) {
00166                 pthread_join(thread[i], NULL);
00167         }
00168 }
00169 
00170 */