MASA-Core
Buffer.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "Buffer.hpp"
00023 
00024 #include <pthread.h>
00025 #include <stdio.h>
00026 #include <string.h>
00027 #include <stdlib.h>
00028 //#include <sys/time.h>
00029 
00030 #include "../Timer.hpp"
00031 #include <time.h>
00032 #include <unistd.h>
00033 #include "BufferLogger.hpp"
00034 
00035 Buffer::Buffer() {
00036     buffer_max = 8*1024*1024;
00037     buffer_size = buffer_max+5;
00038     buffer_start = 0;
00039     buffer_end = 0;
00040     buffer = (char*)malloc(buffer_size);
00041     destroyed = false;
00042         isLogging = false;
00043     
00044     pthread_mutex_init(&mutex, NULL);
00045     pthread_cond_init(&notFullCond, NULL);
00046     pthread_cond_init(&notEmptyCond, NULL);
00047     pthread_cond_init(&emptyCond, NULL);
00048     pthread_cond_init(&loggerCond, NULL);
00049 
00050         blockingReadTime = 0;
00051         blockingWriteTime = 0;
00052         volatileUsage = 0;
00053         bytesRead = 0;
00054         bytesWritten = 0;
00055 
00056         tempBlockingReadTime = -1;
00057         tempBlockingWriteTime = -1;
00058 
00059         inputBuffer = false;
00060 
00061     /*mutex = PTHREAD_MUTEX_INITIALIZER;
00062     notFullCond = PTHREAD_COND_INITIALIZER;
00063     notEmptyCond = PTHREAD_COND_INITIALIZER;
00064     emptyCond = PTHREAD_COND_INITIALIZER;*/
00065 }
00066 
00067 Buffer::~Buffer() {
00068     fprintf(stderr, "Destruct Buffer...\n");
00069     free(buffer);
00070 }
00071 
00072 void Buffer::destroy() {
00073     pthread_mutex_lock(&mutex);
00074 
00075     destroyed = true;
00076     pthread_cond_signal(&notFullCond);
00077     pthread_cond_signal(&notEmptyCond);
00078     pthread_cond_signal(&emptyCond);
00079     pthread_cond_signal(&loggerCond);
00080 
00081     pthread_mutex_unlock(&mutex);
00082 
00083 
00084     if (inputBuffer) {
00085         destroyAutoLoad();
00086     } else {
00087         destroyAutoFlush();
00088     }
00089 
00090     joinThreads();
00091 }
00092 
00093 int Buffer::getType() {
00094         return INIT_WITH_CUSTOM_DATA;
00095 }
00096 
00097 void Buffer::joinThreads() {
00098     pthread_join(threadId, NULL);
00099     if (isLogging) {
00100         isLogging = false;
00101         pthread_join(loggerThread, NULL);
00102     }
00103 }
00104 
00105 bool Buffer::isDestroyed() {
00106     return destroyed;
00107 }
00108 
00109 void Buffer::waitEmptyBuffer() {
00110     pthread_mutex_lock(&mutex);
00111         while (sizeUsed() > 0 && !destroyed) {
00112                 fprintf (stderr, "Waiting empty buffer: %d\n", sizeUsed());
00113         pthread_cond_wait (&emptyCond, &mutex);         
00114         }
00115     pthread_mutex_unlock(&mutex);
00116 }
00117 
00118 int Buffer::circularLoad(char *dst, int len) {
00119     
00120     if (buffer_start+len < buffer_size) {
00121         if (dst != NULL) {
00122                 memcpy(dst, buffer+buffer_start, len);
00123         }
00124         buffer_start += len;
00125     } else {
00126         if (dst != NULL) {
00127                 memcpy(dst, buffer+buffer_start, buffer_size-buffer_start);
00128                 memcpy(dst+(buffer_size-buffer_start), buffer, len-(buffer_size-buffer_start));
00129         }
00130         buffer_start = len-(buffer_size-buffer_start);
00131     }
00132     if (sizeUsed() > 0) {
00133         pthread_cond_signal(&notFullCond);
00134     } else {
00135         pthread_cond_signal(&emptyCond);
00136         }
00137     return len;
00138 }
00139 
00140 int Buffer::circularStore(const char *src, int len) {
00141     if (buffer_end+len < buffer_size) {
00142         memcpy(buffer+buffer_end, src, len);
00143         buffer_end += len;
00144     } else {
00145         memcpy(buffer+buffer_end, src, buffer_size-buffer_end);
00146         memcpy(buffer, src+(buffer_size-buffer_end), len - (buffer_size-buffer_end));
00147         buffer_end = len - (buffer_size-buffer_end);
00148     }
00149     if (sizeUsed() > 0) {
00150         pthread_cond_signal(&notEmptyCond);
00151     }
00152     return len;
00153 }
00154 
00155 int Buffer::sizeAvailable() {
00156     return buffer_max - sizeUsed();
00157 }
00158 
00159 int Buffer::sizeUsed() {
00160     if (buffer_end >= buffer_start) {
00161         return buffer_end - buffer_start;
00162     } else {
00163         return buffer_end - buffer_start + buffer_size;
00164     }
00165 }
00166 
00167 int Buffer::read(cell_t* data, int nmemb) {
00168         return readBuffer((char*)data, sizeof(cell_t), nmemb);
00169 }
00170 
00171 int Buffer::readBuffer(char* data, int size, int nmemb)
00172 {
00173     pthread_mutex_lock(&mutex);
00174     int size_total = size*nmemb;
00175     int size_left = size_total;
00176     while ((sizeUsed() < size_left) && !destroyed) {
00177         size_left -= circularLoad((char*)data+(size_total-size_left), sizeUsed());
00178         float t0 = Timer::getGlobalTime();
00179         tempBlockingReadTime = t0;
00180         pthread_cond_wait (&notEmptyCond, &mutex);
00181         tempBlockingReadTime = -1;
00182         float t1 = Timer::getGlobalTime();
00183         blockingReadTime += (t1-t0);
00184     }
00185     if (!destroyed) {
00186         size_left -= circularLoad((char*)data+(size_total-size_left), size_left);
00187     }
00188     volatileUsage = sizeUsed();
00189     if (bytesRead == 0 && inputBuffer) {
00190         pthread_cond_signal(&loggerCond);
00191     }
00192     bytesRead += (size_total-size_left);
00193     pthread_mutex_unlock(&mutex);
00194     
00195     if (size_left != 0) fprintf(stderr, "readBuffer: diff len: %d %d\n", size_total-size_left, size_total);
00196     
00197     return size_total-size_left;
00198 }
00199 
00200 int Buffer::write(const cell_t* data, int nmemb) {
00201         return writeBuffer((char*)data, sizeof(cell_t), nmemb);
00202 }
00203 
00204 int Buffer::writeBuffer(const char* data, int size, int nmemb)
00205 {
00206     pthread_mutex_lock(&mutex);
00207     int size_total = size*nmemb;
00208     int size_left = size_total;
00209     while ((sizeAvailable() < size_left) && !destroyed) {
00210         size_left -= circularStore(data+(size_total-size_left), sizeAvailable());
00211         float t0 = Timer::getGlobalTime();
00212         tempBlockingWriteTime = t0;
00213         pthread_cond_wait (&notFullCond, &mutex);
00214         tempBlockingWriteTime = -1;
00215         float t1 = Timer::getGlobalTime();
00216         blockingWriteTime += (t1-t0);
00217     }
00218     if (!destroyed) {
00219         size_left -= circularStore(data+(size_total-size_left), size_left);
00220     }
00221     volatileUsage = sizeUsed();
00222     if (bytesWritten == 0 && !inputBuffer) {
00223         pthread_cond_signal(&loggerCond);
00224     }
00225     bytesWritten += (size_total-size_left);
00226     pthread_mutex_unlock(&mutex);
00227     if (size_left != 0) fprintf(stderr, "writeBuffer: diff len: %d %d\n", size_total-size_left, size_total);
00228 
00229     return size_total-size_left;
00230 }
00231 
00232 void Buffer::autoFlush() {
00233     initAutoFlush();
00234     createAutoFlushThread();
00235 }
00236 
00237 void Buffer::autoLoad() {
00238     initAutoLoad();
00239     createAutoLoadThread();
00240 }
00241 
00242 
00243 
00244 void Buffer::createAutoLoadThread() {
00245         this->inputBuffer = true;
00246     int rc = pthread_create(&threadId, NULL, staticLoadThread, (void *)this);
00247     if (rc){
00248         printf("ERROR; return code from pthread_create() is %d\n", rc);
00249         exit(-1);
00250     }
00251 }
00252 
00253 
00254 void Buffer::createAutoFlushThread() {
00255         this->inputBuffer = false;
00256     int rc = pthread_create(&threadId, NULL, staticFlushThread, (void *)this);
00257     if (rc){
00258         printf("ERROR; return code from pthread_create() is %d\n", rc);
00259         exit(-1);
00260     }
00261 }
00262 
00263 void *Buffer::staticFlushThread(void *arg) {
00264     Buffer* buffer = (Buffer*)arg;
00265     buffer->autoFlushThread();
00266     return NULL;
00267 }
00268 
00269 void *Buffer::staticLoadThread(void *arg) {
00270     Buffer* buffer = (Buffer*)arg;
00271     buffer->autoLoadThread();
00272     return NULL;
00273 }
00274 
00275 void *Buffer::staticLogThread(void *arg) {
00276     Buffer* buffer = (Buffer*)arg;
00277     buffer->logThread();
00278     return NULL;
00279 }
00280 
00281 int Buffer::getUsage() {
00282         return volatileUsage;
00283 }
00284 
00285 int Buffer::getCapacity() {
00286         return buffer_max;
00287 }
00288 
00289 float Buffer::getBlockingReadTime() {
00290         return blockingReadTime;
00291 }
00292 
00293 float Buffer::getBlockingWriteTime() {
00294         return blockingWriteTime;
00295 }
00296 
00297 int Buffer::getBytesWritten() {
00298         return bytesWritten;
00299 }
00300 
00301 int Buffer::getBytesRead() {
00302         return bytesRead;
00303 }
00304 
00305 buffer_statistics_t Buffer::getStatistics() {
00306         buffer_statistics_t stats;
00307         stats.time = Timer::getGlobalTime();
00308 
00309     pthread_mutex_lock(&mutex);
00310         if (inputBuffer) {
00311                 stats.blockingTime = blockingReadTime;
00312                 stats.bufferUsage = volatileUsage;
00313                 stats.totalBytes = bytesRead;
00314                 if (tempBlockingReadTime != -1) {
00315                         stats.blockingTime += stats.time - tempBlockingReadTime;
00316                 }
00317         } else {
00318                 stats.blockingTime = blockingWriteTime;
00319                 stats.bufferUsage = volatileUsage;
00320                 stats.totalBytes = bytesWritten;
00321                 if (tempBlockingWriteTime != -1) {
00322                         stats.blockingTime += stats.time - tempBlockingWriteTime;
00323                 }
00324         }
00325     pthread_mutex_unlock(&mutex);
00326 
00327         return stats;
00328 }
00329 
00330 void Buffer::logThread() {
00331         BufferLogger* logger = new BufferLogger(logFile);
00332         logger->logHeader(buffer_max);
00333         while (!isDestroyed()) {
00334             timeval event;
00335             gettimeofday(&event, NULL);
00336 
00337                 struct timespec time;
00338                 time.tv_sec = event.tv_sec + logInterval;
00339                 time.tv_nsec = event.tv_usec*1000;
00340 
00341             pthread_mutex_lock(&mutex);
00342         pthread_cond_timedwait(&loggerCond, &mutex, &time);
00343             pthread_mutex_unlock(&mutex);
00344 
00345                 buffer_statistics_t stats;
00346                 stats = getStatistics();
00347 
00348                 logger->logBuffer(stats);
00349         }
00350         delete logger;
00351 }
00352 
00353 void Buffer::setLogFile(string logFile, int interval) {
00354         this->logFile = logFile;
00355         this->logInterval = interval;
00356         if (this->logInterval < 1) {
00357                 this->logInterval = 1; // minimum interval.
00358         }
00359         this->isLogging = true;
00360     int rc = pthread_create(&loggerThread, NULL, staticLogThread, (void *)this);
00361     if (rc){
00362         fprintf(stderr, "setLogFile ERROR; return code from pthread_create() is %d\n", rc);
00363         exit(-1);
00364     }
00365 }
00366