|
MASA-Core
|
00001 /******************************************************************************* 00002 * 00003 * Copyright (c) 2010-2015 Edans Sandes 00004 * 00005 * This file is part of MASA-Core. 00006 * 00007 * MASA-Core is free software: you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation, either version 3 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * MASA-Core is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with MASA-Core. If not, see <http://www.gnu.org/licenses/>. 00019 * 00020 ******************************************************************************/ 00021 00022 #include "Buffer.hpp" 00023 00024 #include <pthread.h> 00025 #include <stdio.h> 00026 #include <string.h> 00027 #include <stdlib.h> 00028 //#include <sys/time.h> 00029 00030 #include "../Timer.hpp" 00031 #include <time.h> 00032 #include <unistd.h> 00033 #include "BufferLogger.hpp" 00034 00035 Buffer::Buffer() { 00036 buffer_max = 8*1024*1024; 00037 buffer_size = buffer_max+5; 00038 buffer_start = 0; 00039 buffer_end = 0; 00040 buffer = (char*)malloc(buffer_size); 00041 destroyed = false; 00042 isLogging = false; 00043 00044 pthread_mutex_init(&mutex, NULL); 00045 pthread_cond_init(¬FullCond, NULL); 00046 pthread_cond_init(¬EmptyCond, NULL); 00047 pthread_cond_init(&emptyCond, NULL); 00048 pthread_cond_init(&loggerCond, NULL); 00049 00050 blockingReadTime = 0; 00051 blockingWriteTime = 0; 00052 volatileUsage = 0; 00053 bytesRead = 0; 00054 bytesWritten = 0; 00055 00056 tempBlockingReadTime = -1; 00057 tempBlockingWriteTime = -1; 00058 00059 inputBuffer = false; 00060 00061 /*mutex = PTHREAD_MUTEX_INITIALIZER; 00062 notFullCond = PTHREAD_COND_INITIALIZER; 00063 notEmptyCond = PTHREAD_COND_INITIALIZER; 00064 emptyCond = PTHREAD_COND_INITIALIZER;*/ 00065 } 00066 00067 Buffer::~Buffer() { 00068 fprintf(stderr, "Destruct Buffer...\n"); 00069 free(buffer); 00070 } 00071 00072 void Buffer::destroy() { 00073 pthread_mutex_lock(&mutex); 00074 00075 destroyed = true; 00076 pthread_cond_signal(¬FullCond); 00077 pthread_cond_signal(¬EmptyCond); 00078 pthread_cond_signal(&emptyCond); 00079 pthread_cond_signal(&loggerCond); 00080 00081 pthread_mutex_unlock(&mutex); 00082 00083 00084 if (inputBuffer) { 00085 destroyAutoLoad(); 00086 } else { 00087 destroyAutoFlush(); 00088 } 00089 00090 joinThreads(); 00091 } 00092 00093 int Buffer::getType() { 00094 return INIT_WITH_CUSTOM_DATA; 00095 } 00096 00097 void Buffer::joinThreads() { 00098 pthread_join(threadId, NULL); 00099 if (isLogging) { 00100 isLogging = false; 00101 pthread_join(loggerThread, NULL); 00102 } 00103 } 00104 00105 bool Buffer::isDestroyed() { 00106 return destroyed; 00107 } 00108 00109 void Buffer::waitEmptyBuffer() { 00110 pthread_mutex_lock(&mutex); 00111 while (sizeUsed() > 0 && !destroyed) { 00112 fprintf (stderr, "Waiting empty buffer: %d\n", sizeUsed()); 00113 pthread_cond_wait (&emptyCond, &mutex); 00114 } 00115 pthread_mutex_unlock(&mutex); 00116 } 00117 00118 int Buffer::circularLoad(char *dst, int len) { 00119 00120 if (buffer_start+len < buffer_size) { 00121 if (dst != NULL) { 00122 memcpy(dst, buffer+buffer_start, len); 00123 } 00124 buffer_start += len; 00125 } else { 00126 if (dst != NULL) { 00127 memcpy(dst, buffer+buffer_start, buffer_size-buffer_start); 00128 memcpy(dst+(buffer_size-buffer_start), buffer, len-(buffer_size-buffer_start)); 00129 } 00130 buffer_start = len-(buffer_size-buffer_start); 00131 } 00132 if (sizeUsed() > 0) { 00133 pthread_cond_signal(¬FullCond); 00134 } else { 00135 pthread_cond_signal(&emptyCond); 00136 } 00137 return len; 00138 } 00139 00140 int Buffer::circularStore(const char *src, int len) { 00141 if (buffer_end+len < buffer_size) { 00142 memcpy(buffer+buffer_end, src, len); 00143 buffer_end += len; 00144 } else { 00145 memcpy(buffer+buffer_end, src, buffer_size-buffer_end); 00146 memcpy(buffer, src+(buffer_size-buffer_end), len - (buffer_size-buffer_end)); 00147 buffer_end = len - (buffer_size-buffer_end); 00148 } 00149 if (sizeUsed() > 0) { 00150 pthread_cond_signal(¬EmptyCond); 00151 } 00152 return len; 00153 } 00154 00155 int Buffer::sizeAvailable() { 00156 return buffer_max - sizeUsed(); 00157 } 00158 00159 int Buffer::sizeUsed() { 00160 if (buffer_end >= buffer_start) { 00161 return buffer_end - buffer_start; 00162 } else { 00163 return buffer_end - buffer_start + buffer_size; 00164 } 00165 } 00166 00167 int Buffer::read(cell_t* data, int nmemb) { 00168 return readBuffer((char*)data, sizeof(cell_t), nmemb); 00169 } 00170 00171 int Buffer::readBuffer(char* data, int size, int nmemb) 00172 { 00173 pthread_mutex_lock(&mutex); 00174 int size_total = size*nmemb; 00175 int size_left = size_total; 00176 while ((sizeUsed() < size_left) && !destroyed) { 00177 size_left -= circularLoad((char*)data+(size_total-size_left), sizeUsed()); 00178 float t0 = Timer::getGlobalTime(); 00179 tempBlockingReadTime = t0; 00180 pthread_cond_wait (¬EmptyCond, &mutex); 00181 tempBlockingReadTime = -1; 00182 float t1 = Timer::getGlobalTime(); 00183 blockingReadTime += (t1-t0); 00184 } 00185 if (!destroyed) { 00186 size_left -= circularLoad((char*)data+(size_total-size_left), size_left); 00187 } 00188 volatileUsage = sizeUsed(); 00189 if (bytesRead == 0 && inputBuffer) { 00190 pthread_cond_signal(&loggerCond); 00191 } 00192 bytesRead += (size_total-size_left); 00193 pthread_mutex_unlock(&mutex); 00194 00195 if (size_left != 0) fprintf(stderr, "readBuffer: diff len: %d %d\n", size_total-size_left, size_total); 00196 00197 return size_total-size_left; 00198 } 00199 00200 int Buffer::write(const cell_t* data, int nmemb) { 00201 return writeBuffer((char*)data, sizeof(cell_t), nmemb); 00202 } 00203 00204 int Buffer::writeBuffer(const char* data, int size, int nmemb) 00205 { 00206 pthread_mutex_lock(&mutex); 00207 int size_total = size*nmemb; 00208 int size_left = size_total; 00209 while ((sizeAvailable() < size_left) && !destroyed) { 00210 size_left -= circularStore(data+(size_total-size_left), sizeAvailable()); 00211 float t0 = Timer::getGlobalTime(); 00212 tempBlockingWriteTime = t0; 00213 pthread_cond_wait (¬FullCond, &mutex); 00214 tempBlockingWriteTime = -1; 00215 float t1 = Timer::getGlobalTime(); 00216 blockingWriteTime += (t1-t0); 00217 } 00218 if (!destroyed) { 00219 size_left -= circularStore(data+(size_total-size_left), size_left); 00220 } 00221 volatileUsage = sizeUsed(); 00222 if (bytesWritten == 0 && !inputBuffer) { 00223 pthread_cond_signal(&loggerCond); 00224 } 00225 bytesWritten += (size_total-size_left); 00226 pthread_mutex_unlock(&mutex); 00227 if (size_left != 0) fprintf(stderr, "writeBuffer: diff len: %d %d\n", size_total-size_left, size_total); 00228 00229 return size_total-size_left; 00230 } 00231 00232 void Buffer::autoFlush() { 00233 initAutoFlush(); 00234 createAutoFlushThread(); 00235 } 00236 00237 void Buffer::autoLoad() { 00238 initAutoLoad(); 00239 createAutoLoadThread(); 00240 } 00241 00242 00243 00244 void Buffer::createAutoLoadThread() { 00245 this->inputBuffer = true; 00246 int rc = pthread_create(&threadId, NULL, staticLoadThread, (void *)this); 00247 if (rc){ 00248 printf("ERROR; return code from pthread_create() is %d\n", rc); 00249 exit(-1); 00250 } 00251 } 00252 00253 00254 void Buffer::createAutoFlushThread() { 00255 this->inputBuffer = false; 00256 int rc = pthread_create(&threadId, NULL, staticFlushThread, (void *)this); 00257 if (rc){ 00258 printf("ERROR; return code from pthread_create() is %d\n", rc); 00259 exit(-1); 00260 } 00261 } 00262 00263 void *Buffer::staticFlushThread(void *arg) { 00264 Buffer* buffer = (Buffer*)arg; 00265 buffer->autoFlushThread(); 00266 return NULL; 00267 } 00268 00269 void *Buffer::staticLoadThread(void *arg) { 00270 Buffer* buffer = (Buffer*)arg; 00271 buffer->autoLoadThread(); 00272 return NULL; 00273 } 00274 00275 void *Buffer::staticLogThread(void *arg) { 00276 Buffer* buffer = (Buffer*)arg; 00277 buffer->logThread(); 00278 return NULL; 00279 } 00280 00281 int Buffer::getUsage() { 00282 return volatileUsage; 00283 } 00284 00285 int Buffer::getCapacity() { 00286 return buffer_max; 00287 } 00288 00289 float Buffer::getBlockingReadTime() { 00290 return blockingReadTime; 00291 } 00292 00293 float Buffer::getBlockingWriteTime() { 00294 return blockingWriteTime; 00295 } 00296 00297 int Buffer::getBytesWritten() { 00298 return bytesWritten; 00299 } 00300 00301 int Buffer::getBytesRead() { 00302 return bytesRead; 00303 } 00304 00305 buffer_statistics_t Buffer::getStatistics() { 00306 buffer_statistics_t stats; 00307 stats.time = Timer::getGlobalTime(); 00308 00309 pthread_mutex_lock(&mutex); 00310 if (inputBuffer) { 00311 stats.blockingTime = blockingReadTime; 00312 stats.bufferUsage = volatileUsage; 00313 stats.totalBytes = bytesRead; 00314 if (tempBlockingReadTime != -1) { 00315 stats.blockingTime += stats.time - tempBlockingReadTime; 00316 } 00317 } else { 00318 stats.blockingTime = blockingWriteTime; 00319 stats.bufferUsage = volatileUsage; 00320 stats.totalBytes = bytesWritten; 00321 if (tempBlockingWriteTime != -1) { 00322 stats.blockingTime += stats.time - tempBlockingWriteTime; 00323 } 00324 } 00325 pthread_mutex_unlock(&mutex); 00326 00327 return stats; 00328 } 00329 00330 void Buffer::logThread() { 00331 BufferLogger* logger = new BufferLogger(logFile); 00332 logger->logHeader(buffer_max); 00333 while (!isDestroyed()) { 00334 timeval event; 00335 gettimeofday(&event, NULL); 00336 00337 struct timespec time; 00338 time.tv_sec = event.tv_sec + logInterval; 00339 time.tv_nsec = event.tv_usec*1000; 00340 00341 pthread_mutex_lock(&mutex); 00342 pthread_cond_timedwait(&loggerCond, &mutex, &time); 00343 pthread_mutex_unlock(&mutex); 00344 00345 buffer_statistics_t stats; 00346 stats = getStatistics(); 00347 00348 logger->logBuffer(stats); 00349 } 00350 delete logger; 00351 } 00352 00353 void Buffer::setLogFile(string logFile, int interval) { 00354 this->logFile = logFile; 00355 this->logInterval = interval; 00356 if (this->logInterval < 1) { 00357 this->logInterval = 1; // minimum interval. 00358 } 00359 this->isLogging = true; 00360 int rc = pthread_create(&loggerThread, NULL, staticLogThread, (void *)this); 00361 if (rc){ 00362 fprintf(stderr, "setLogFile ERROR; return code from pthread_create() is %d\n", rc); 00363 exit(-1); 00364 } 00365 } 00366
1.7.6.1