MASA-Core
Buffer2.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "Buffer2.hpp"
00023 
00024 #include <pthread.h>
00025 #include <stdio.h>
00026 #include <string.h>
00027 #include <stdlib.h>
00028 #include <math.h>
00029 //#include <sys/time.h>
00030 
00031 #include "../Timer.hpp"
00032 #include <time.h>
00033 #include <unistd.h>
00034 #include "BufferLogger.hpp"
00035 
00036 #define DEBUG (0)
00037 
00038 Buffer2::Buffer2(int buffer_max) {
00039     this->buffer_max = buffer_max;
00040     buffer_size = buffer_max+5;
00041     buffer_start = 0;
00042     buffer_end = 0;
00043     buffer = (cell_t*)malloc(buffer_size*sizeof(cell_t));
00044     destroyed = false;
00045         isLogging = false;
00046     
00047     pthread_mutex_init(&mutex, NULL);
00048     pthread_cond_init(&notFullCond, NULL);
00049     pthread_cond_init(&notEmptyCond, NULL);
00050     pthread_cond_init(&emptyCond, NULL);
00051     pthread_cond_init(&loggerCond, NULL);
00052 
00053         stats.bufferUsage = 0;
00054         stats.totalReadBytes = 0;
00055         stats.blockingReadTime = 0;
00056         stats.totalWriteBytes = 0;
00057         stats.blockingWriteTime = 0;
00058 
00059         tempBlockingReadTime = -1;
00060         tempBlockingWriteTime = -1;
00061 
00062     /*mutex = PTHREAD_MUTEX_INITIALIZER;
00063     notFullCond = PTHREAD_COND_INITIALIZER;
00064     notEmptyCond = PTHREAD_COND_INITIALIZER;
00065     emptyCond = PTHREAD_COND_INITIALIZER;*/
00066 }
00067 
00068 Buffer2::~Buffer2() {
00069     fprintf(stderr, "Destruct Buffer...\n");
00070     free(buffer);
00071 }
00072 
00073 void Buffer2::destroy() {
00074         fprintf(stderr, "Buffer2::destroy()...\n");
00075     pthread_mutex_lock(&mutex);
00076 
00077     destroyed = true;
00078     pthread_cond_signal(&notFullCond);
00079     pthread_cond_signal(&notEmptyCond);
00080     pthread_cond_signal(&emptyCond);
00081     pthread_cond_signal(&loggerCond);
00082 
00083     pthread_mutex_unlock(&mutex);
00084 
00085     if (isLogging) {
00086         isLogging = false;
00087         pthread_join(loggerThread, NULL);
00088     }
00089         fprintf(stderr, "Buffer2::destroy() DONE\n");
00090 }
00091 
00092 bool Buffer2::isDestroyed() {
00093     return destroyed;
00094 }
00095 
00096 void Buffer2::waitEmptyBuffer() {
00097     pthread_mutex_lock(&mutex);
00098         while (sizeUsed() > 0 && !destroyed) {
00099                 fprintf (stderr, "Waiting empty buffer: %d\n", sizeUsed());
00100         pthread_cond_wait (&emptyCond, &mutex);         
00101         }
00102     pthread_mutex_unlock(&mutex);
00103 }
00104 
00105 int Buffer2::circularLoad(cell_t* dst, int len) {
00106     
00107     if (buffer_start+len < buffer_size) {
00108         memcpy(dst, buffer+buffer_start, len*sizeof(cell_t));
00109         buffer_start += len;
00110     } else {
00111         memcpy(dst, buffer+buffer_start, (buffer_size-buffer_start)*sizeof(cell_t));
00112         memcpy(dst+(buffer_size-buffer_start), buffer, (len-(buffer_size-buffer_start))*sizeof(cell_t));
00113         buffer_start = len-(buffer_size-buffer_start);
00114     }
00115     if (sizeUsed() > 0) {
00116         pthread_cond_signal(&notFullCond);
00117     } else {
00118         pthread_cond_signal(&emptyCond);
00119         }
00120     return len;
00121 }
00122 
00123 int Buffer2::circularSkip(int len) {
00124 
00125     if (buffer_start+len < buffer_size) {
00126         buffer_start += len;
00127     } else {
00128         buffer_start = len-(buffer_size-buffer_start);
00129     }
00130     if (sizeUsed() > 0) {
00131         pthread_cond_signal(&notFullCond);
00132     } else {
00133         pthread_cond_signal(&emptyCond);
00134         }
00135     return len;
00136 }
00137 
00138 
00139 int Buffer2::circularStore(const cell_t* src, int len) {
00140     if (buffer_end+len < buffer_size) {
00141         memcpy(buffer+buffer_end, src, len*sizeof(cell_t));
00142         buffer_end += len;
00143     } else {
00144         memcpy(buffer+buffer_end, src, (buffer_size-buffer_end)*sizeof(cell_t));
00145         memcpy(buffer, src+(buffer_size-buffer_end), (len - (buffer_size-buffer_end))*sizeof(cell_t));
00146         buffer_end = len - (buffer_size-buffer_end);
00147     }
00148     if (sizeUsed() > 0) {
00149         pthread_cond_signal(&notEmptyCond);
00150     }
00151     return len;
00152 }
00153 
00154 int Buffer2::sizeAvailable() {
00155     return buffer_max - sizeUsed();
00156 }
00157 
00158 int Buffer2::sizeUsed() {
00159     if (buffer_end >= buffer_start) {
00160         return buffer_end - buffer_start;
00161     } else {
00162         return buffer_end - buffer_start + buffer_size;
00163     }
00164 }
00165 
00166 int Buffer2::readBuffer(cell_t* data, int nmemb)
00167 {
00168         pthread_mutex_lock(&mutex);
00169         if (DEBUG) printf("Buffer2::readBuffer(%d) - buf: %d\n", nmemb, sizeUsed());
00170     int size_total = nmemb;
00171     int size_left = size_total;
00172     while ((sizeUsed() < size_left) && !destroyed) {
00173         if (data == NULL) {
00174                 size_left -= circularSkip(sizeUsed());
00175         } else {
00176                 size_left -= circularLoad(data+(size_total-size_left), sizeUsed());
00177         }
00178         float t0 = Timer::getGlobalTime();
00179         tempBlockingReadTime = t0;
00180         pthread_cond_wait (&notEmptyCond, &mutex);
00181         tempBlockingReadTime = -1;
00182         float t1 = Timer::getGlobalTime();
00183         stats.blockingReadTime += (t1-t0);
00184     }
00185     if (!destroyed) {
00186         if (data == NULL) {
00187                 size_left -= circularSkip(size_left);
00188         } else {
00189                 size_left -= circularLoad(data+(size_total-size_left), size_left);
00190         }
00191     }
00192     stats.bufferUsage = sizeUsed();
00193     if (stats.totalReadBytes == 0/* && inputBuffer*/) {
00194         pthread_cond_signal(&loggerCond);
00195     }
00196     stats.totalReadBytes += (size_total-size_left);
00197     pthread_mutex_unlock(&mutex);
00198     
00199     if (size_left != 0) fprintf(stderr, "readBuffer: diff len: %d %d\n", size_total-size_left, size_total);
00200     
00201     return size_total-size_left;
00202 }
00203 
00204 int Buffer2::writeBuffer(const cell_t* data, int nmemb)
00205 {
00206     pthread_mutex_lock(&mutex);
00207         if (DEBUG) printf("Buffer2::writeBuffer(%d) - buf: %d\n", nmemb, sizeUsed());
00208     int size_total = nmemb;
00209     int size_left = size_total;
00210     while ((sizeAvailable() < size_left) && !destroyed) {
00211         size_left -= circularStore(data+(size_total-size_left), sizeAvailable());
00212         float t0 = Timer::getGlobalTime();
00213         tempBlockingWriteTime = t0;
00214         pthread_cond_wait (&notFullCond, &mutex);
00215         tempBlockingWriteTime = -1;
00216         float t1 = Timer::getGlobalTime();
00217         stats.blockingWriteTime += (t1-t0);
00218     }
00219     if (!destroyed) {
00220         size_left -= circularStore(data+(size_total-size_left), size_left);
00221     }
00222     stats.bufferUsage = sizeUsed();
00223     if (stats.totalWriteBytes == 0/* && !inputBuffer*/) {
00224         pthread_cond_signal(&loggerCond);
00225     }
00226     stats.totalWriteBytes += (size_total-size_left);
00227     pthread_mutex_unlock(&mutex);
00228     if (size_left != 0) fprintf(stderr, "writeBuffer: diff len: %d %d\n", size_total-size_left, size_total);
00229 
00230     return size_total-size_left;
00231 }
00232 
00233 void *Buffer2::staticLogThread(void *arg) {
00234     Buffer2* buffer = (Buffer2*)arg;
00235     buffer->logThread();
00236     return NULL;
00237 }
00238 
00239 int Buffer2::getCapacity() {
00240         return buffer_max;
00241 }
00242 
00243 buffer2_statistics_t Buffer2::getStatistics() {
00244         buffer2_statistics_t stats = this->stats;
00245         stats.time = Timer::getGlobalTime();
00246 
00247     pthread_mutex_lock(&mutex);
00248         if (tempBlockingReadTime != -1) {
00249                 stats.blockingReadTime += stats.time - tempBlockingReadTime;
00250         }
00251         if (tempBlockingWriteTime != -1) {
00252                 stats.blockingWriteTime += stats.time - tempBlockingWriteTime;
00253         }
00254     pthread_mutex_unlock(&mutex);
00255 
00256         return stats;
00257 }
00258 
00259 void Buffer2::logThread() {
00260         BufferLogger* logger = new BufferLogger(logFile);
00261         logger->logHeader(buffer_max);
00262         while (!isDestroyed()) {
00263             timeval event;
00264             gettimeofday(&event, NULL);
00265 
00266                 struct timespec time;
00267                 time.tv_sec = event.tv_sec + (int)logInterval;
00268                 time.tv_nsec = event.tv_usec*1000 + (int)((logInterval - floor(logInterval))*1000000000);
00269                 if (time.tv_nsec >= 1000000000) {
00270                         time.tv_nsec -= 1000000000;
00271                         time.tv_sec++;
00272                 }
00273 
00274             pthread_mutex_lock(&mutex);
00275         pthread_cond_timedwait(&loggerCond, &mutex, &time);
00276             pthread_mutex_unlock(&mutex);
00277 
00278                 buffer2_statistics_t stats;
00279                 stats = getStatistics();
00280 
00281                 logger->logBuffer(stats);
00282         }
00283         delete logger;
00284 }
00285 
00286 void Buffer2::setLogFile(string logFile, float interval) {
00287         this->logFile = logFile;
00288         this->logInterval = interval;
00289         if (this->logInterval < 0.1f) {
00290                 this->logInterval = 0.1f; // minimum interval.
00291         }
00292         this->isLogging = true;
00293     int rc = pthread_create(&loggerThread, NULL, staticLogThread, (void *)this);
00294     if (rc){
00295         fprintf(stderr, "setLogFile ERROR; return code from pthread_create() is %d\n", rc);
00296         exit(-1);
00297     }
00298 }
00299