|
MASA-Core
|
00001 /******************************************************************************* 00002 * 00003 * Copyright (c) 2010-2015 Edans Sandes 00004 * 00005 * This file is part of MASA-Core. 00006 * 00007 * MASA-Core is free software: you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation, either version 3 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * MASA-Core is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with MASA-Core. If not, see <http://www.gnu.org/licenses/>. 00019 * 00020 ******************************************************************************/ 00021 00022 #include "Buffer2.hpp" 00023 00024 #include <pthread.h> 00025 #include <stdio.h> 00026 #include <string.h> 00027 #include <stdlib.h> 00028 #include <math.h> 00029 //#include <sys/time.h> 00030 00031 #include "../Timer.hpp" 00032 #include <time.h> 00033 #include <unistd.h> 00034 #include "BufferLogger.hpp" 00035 00036 #define DEBUG (0) 00037 00038 Buffer2::Buffer2(int buffer_max) { 00039 this->buffer_max = buffer_max; 00040 buffer_size = buffer_max+5; 00041 buffer_start = 0; 00042 buffer_end = 0; 00043 buffer = (cell_t*)malloc(buffer_size*sizeof(cell_t)); 00044 destroyed = false; 00045 isLogging = false; 00046 00047 pthread_mutex_init(&mutex, NULL); 00048 pthread_cond_init(¬FullCond, NULL); 00049 pthread_cond_init(¬EmptyCond, NULL); 00050 pthread_cond_init(&emptyCond, NULL); 00051 pthread_cond_init(&loggerCond, NULL); 00052 00053 stats.bufferUsage = 0; 00054 stats.totalReadBytes = 0; 00055 stats.blockingReadTime = 0; 00056 stats.totalWriteBytes = 0; 00057 stats.blockingWriteTime = 0; 00058 00059 tempBlockingReadTime = -1; 00060 tempBlockingWriteTime = -1; 00061 00062 /*mutex = PTHREAD_MUTEX_INITIALIZER; 00063 notFullCond = PTHREAD_COND_INITIALIZER; 00064 notEmptyCond = PTHREAD_COND_INITIALIZER; 00065 emptyCond = PTHREAD_COND_INITIALIZER;*/ 00066 } 00067 00068 Buffer2::~Buffer2() { 00069 fprintf(stderr, "Destruct Buffer...\n"); 00070 free(buffer); 00071 } 00072 00073 void Buffer2::destroy() { 00074 fprintf(stderr, "Buffer2::destroy()...\n"); 00075 pthread_mutex_lock(&mutex); 00076 00077 destroyed = true; 00078 pthread_cond_signal(¬FullCond); 00079 pthread_cond_signal(¬EmptyCond); 00080 pthread_cond_signal(&emptyCond); 00081 pthread_cond_signal(&loggerCond); 00082 00083 pthread_mutex_unlock(&mutex); 00084 00085 if (isLogging) { 00086 isLogging = false; 00087 pthread_join(loggerThread, NULL); 00088 } 00089 fprintf(stderr, "Buffer2::destroy() DONE\n"); 00090 } 00091 00092 bool Buffer2::isDestroyed() { 00093 return destroyed; 00094 } 00095 00096 void Buffer2::waitEmptyBuffer() { 00097 pthread_mutex_lock(&mutex); 00098 while (sizeUsed() > 0 && !destroyed) { 00099 fprintf (stderr, "Waiting empty buffer: %d\n", sizeUsed()); 00100 pthread_cond_wait (&emptyCond, &mutex); 00101 } 00102 pthread_mutex_unlock(&mutex); 00103 } 00104 00105 int Buffer2::circularLoad(cell_t* dst, int len) { 00106 00107 if (buffer_start+len < buffer_size) { 00108 memcpy(dst, buffer+buffer_start, len*sizeof(cell_t)); 00109 buffer_start += len; 00110 } else { 00111 memcpy(dst, buffer+buffer_start, (buffer_size-buffer_start)*sizeof(cell_t)); 00112 memcpy(dst+(buffer_size-buffer_start), buffer, (len-(buffer_size-buffer_start))*sizeof(cell_t)); 00113 buffer_start = len-(buffer_size-buffer_start); 00114 } 00115 if (sizeUsed() > 0) { 00116 pthread_cond_signal(¬FullCond); 00117 } else { 00118 pthread_cond_signal(&emptyCond); 00119 } 00120 return len; 00121 } 00122 00123 int Buffer2::circularSkip(int len) { 00124 00125 if (buffer_start+len < buffer_size) { 00126 buffer_start += len; 00127 } else { 00128 buffer_start = len-(buffer_size-buffer_start); 00129 } 00130 if (sizeUsed() > 0) { 00131 pthread_cond_signal(¬FullCond); 00132 } else { 00133 pthread_cond_signal(&emptyCond); 00134 } 00135 return len; 00136 } 00137 00138 00139 int Buffer2::circularStore(const cell_t* src, int len) { 00140 if (buffer_end+len < buffer_size) { 00141 memcpy(buffer+buffer_end, src, len*sizeof(cell_t)); 00142 buffer_end += len; 00143 } else { 00144 memcpy(buffer+buffer_end, src, (buffer_size-buffer_end)*sizeof(cell_t)); 00145 memcpy(buffer, src+(buffer_size-buffer_end), (len - (buffer_size-buffer_end))*sizeof(cell_t)); 00146 buffer_end = len - (buffer_size-buffer_end); 00147 } 00148 if (sizeUsed() > 0) { 00149 pthread_cond_signal(¬EmptyCond); 00150 } 00151 return len; 00152 } 00153 00154 int Buffer2::sizeAvailable() { 00155 return buffer_max - sizeUsed(); 00156 } 00157 00158 int Buffer2::sizeUsed() { 00159 if (buffer_end >= buffer_start) { 00160 return buffer_end - buffer_start; 00161 } else { 00162 return buffer_end - buffer_start + buffer_size; 00163 } 00164 } 00165 00166 int Buffer2::readBuffer(cell_t* data, int nmemb) 00167 { 00168 pthread_mutex_lock(&mutex); 00169 if (DEBUG) printf("Buffer2::readBuffer(%d) - buf: %d\n", nmemb, sizeUsed()); 00170 int size_total = nmemb; 00171 int size_left = size_total; 00172 while ((sizeUsed() < size_left) && !destroyed) { 00173 if (data == NULL) { 00174 size_left -= circularSkip(sizeUsed()); 00175 } else { 00176 size_left -= circularLoad(data+(size_total-size_left), sizeUsed()); 00177 } 00178 float t0 = Timer::getGlobalTime(); 00179 tempBlockingReadTime = t0; 00180 pthread_cond_wait (¬EmptyCond, &mutex); 00181 tempBlockingReadTime = -1; 00182 float t1 = Timer::getGlobalTime(); 00183 stats.blockingReadTime += (t1-t0); 00184 } 00185 if (!destroyed) { 00186 if (data == NULL) { 00187 size_left -= circularSkip(size_left); 00188 } else { 00189 size_left -= circularLoad(data+(size_total-size_left), size_left); 00190 } 00191 } 00192 stats.bufferUsage = sizeUsed(); 00193 if (stats.totalReadBytes == 0/* && inputBuffer*/) { 00194 pthread_cond_signal(&loggerCond); 00195 } 00196 stats.totalReadBytes += (size_total-size_left); 00197 pthread_mutex_unlock(&mutex); 00198 00199 if (size_left != 0) fprintf(stderr, "readBuffer: diff len: %d %d\n", size_total-size_left, size_total); 00200 00201 return size_total-size_left; 00202 } 00203 00204 int Buffer2::writeBuffer(const cell_t* data, int nmemb) 00205 { 00206 pthread_mutex_lock(&mutex); 00207 if (DEBUG) printf("Buffer2::writeBuffer(%d) - buf: %d\n", nmemb, sizeUsed()); 00208 int size_total = nmemb; 00209 int size_left = size_total; 00210 while ((sizeAvailable() < size_left) && !destroyed) { 00211 size_left -= circularStore(data+(size_total-size_left), sizeAvailable()); 00212 float t0 = Timer::getGlobalTime(); 00213 tempBlockingWriteTime = t0; 00214 pthread_cond_wait (¬FullCond, &mutex); 00215 tempBlockingWriteTime = -1; 00216 float t1 = Timer::getGlobalTime(); 00217 stats.blockingWriteTime += (t1-t0); 00218 } 00219 if (!destroyed) { 00220 size_left -= circularStore(data+(size_total-size_left), size_left); 00221 } 00222 stats.bufferUsage = sizeUsed(); 00223 if (stats.totalWriteBytes == 0/* && !inputBuffer*/) { 00224 pthread_cond_signal(&loggerCond); 00225 } 00226 stats.totalWriteBytes += (size_total-size_left); 00227 pthread_mutex_unlock(&mutex); 00228 if (size_left != 0) fprintf(stderr, "writeBuffer: diff len: %d %d\n", size_total-size_left, size_total); 00229 00230 return size_total-size_left; 00231 } 00232 00233 void *Buffer2::staticLogThread(void *arg) { 00234 Buffer2* buffer = (Buffer2*)arg; 00235 buffer->logThread(); 00236 return NULL; 00237 } 00238 00239 int Buffer2::getCapacity() { 00240 return buffer_max; 00241 } 00242 00243 buffer2_statistics_t Buffer2::getStatistics() { 00244 buffer2_statistics_t stats = this->stats; 00245 stats.time = Timer::getGlobalTime(); 00246 00247 pthread_mutex_lock(&mutex); 00248 if (tempBlockingReadTime != -1) { 00249 stats.blockingReadTime += stats.time - tempBlockingReadTime; 00250 } 00251 if (tempBlockingWriteTime != -1) { 00252 stats.blockingWriteTime += stats.time - tempBlockingWriteTime; 00253 } 00254 pthread_mutex_unlock(&mutex); 00255 00256 return stats; 00257 } 00258 00259 void Buffer2::logThread() { 00260 BufferLogger* logger = new BufferLogger(logFile); 00261 logger->logHeader(buffer_max); 00262 while (!isDestroyed()) { 00263 timeval event; 00264 gettimeofday(&event, NULL); 00265 00266 struct timespec time; 00267 time.tv_sec = event.tv_sec + (int)logInterval; 00268 time.tv_nsec = event.tv_usec*1000 + (int)((logInterval - floor(logInterval))*1000000000); 00269 if (time.tv_nsec >= 1000000000) { 00270 time.tv_nsec -= 1000000000; 00271 time.tv_sec++; 00272 } 00273 00274 pthread_mutex_lock(&mutex); 00275 pthread_cond_timedwait(&loggerCond, &mutex, &time); 00276 pthread_mutex_unlock(&mutex); 00277 00278 buffer2_statistics_t stats; 00279 stats = getStatistics(); 00280 00281 logger->logBuffer(stats); 00282 } 00283 delete logger; 00284 } 00285 00286 void Buffer2::setLogFile(string logFile, float interval) { 00287 this->logFile = logFile; 00288 this->logInterval = interval; 00289 if (this->logInterval < 0.1f) { 00290 this->logInterval = 0.1f; // minimum interval. 00291 } 00292 this->isLogging = true; 00293 int rc = pthread_create(&loggerThread, NULL, staticLogThread, (void *)this); 00294 if (rc){ 00295 fprintf(stderr, "setLogFile ERROR; return code from pthread_create() is %d\n", rc); 00296 exit(-1); 00297 } 00298 } 00299
1.7.6.1