MASA-Core
Buffer.hpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #ifndef BUFFER_H
00023 #define BUFFER_H
00024 
00025 #include <pthread.h>
00026 #include <string>
00027 using namespace std;
00028 
00029 #include "../io/CellsReader.hpp"
00030 #include "../io/CellsWriter.hpp"
00031 
/**
 * Snapshot of the counters of a buffer, as returned by
 * Buffer::getStatistics().
 *
 * Two snapshots can be subtracted with operator- to obtain the field-wise
 * difference between them.
 */
struct buffer_statistics_t {
    float time;         // NOTE(review): unit and reference point are not visible in this header - confirm
    int bufferUsage;    // presumably the number of bytes held in the buffer - confirm against getStatistics()
    int totalBytes;     // NOTE(review): plain int; confirm the byte count cannot exceed INT_MAX
    float blockingTime; // presumably milliseconds, like Buffer::getBlockingReadTime() - confirm

    /**
     * Computes the field-wise difference between two snapshots.
     *
     * @param other the snapshot to be subtracted from this one.
     * @return a snapshot in which every field is (this - other).
     */
    const buffer_statistics_t operator-(const buffer_statistics_t &other) const {
        buffer_statistics_t ret;
        ret.time = this->time - other.time;
        ret.bufferUsage = this->bufferUsage - other.bufferUsage;
        ret.totalBytes = this->totalBytes - other.totalBytes;
        ret.blockingTime = this->blockingTime - other.blockingTime;
        return ret;
    }
};
00047 
00048 
00049 
00050 class Buffer : public CellsReader, public CellsWriter
00051 {
00052 public:
00053         Buffer();
00054         virtual ~Buffer();
00055         
00056         virtual int read(cell_t* data, int nmemb);
00057         virtual int write(const cell_t* data, int nmemb);
00058         virtual int getType();
00059 
00060         int readBuffer(char* data, int size, int nmemb);
00061         int writeBuffer(const char* data, int size, int nmemb);
00062         void waitEmptyBuffer();
00063         
00064         void destroy();
00065         bool isDestroyed();
00066 
00067     virtual void autoFlushThread() = 0;
00068     virtual void autoLoadThread() = 0;
00069     
00070     virtual void initAutoFlush() = 0;
00071     virtual void initAutoLoad() = 0;
00072 
00073     /**
00074      * This method is called prior to the auto-load thread destruction.
00075      * See the Buffer::destroyAutoLoad() method description.
00076      */
00077     virtual void destroyAutoFlush() = 0;
00078 
00079     /**
00080      * This method is called prior to the auto-load thread destruction.
00081      * For example, if the auto-load thread is waiting for a socket input,
00082      * then the destroyAutoLoad must destroy the socket in order to wake
00083      * the thread execution. Note that during and after this method is called,
00084      * the Buffer::isDestroyed() method will always return true.
00085      */
00086     virtual void destroyAutoLoad() = 0;
00087     
00088         void autoFlush();
00089         void autoLoad();
00090         
00091         /**
00092          * Use this method only for statistics purpose, since the returned value
00093          * is volatile and not synchronized (without mutex).
00094          * @return the number of bytes in the buffer.
00095          */
00096         int getUsage();
00097 
00098         /**
00099          * Returns the maximum capacity in bytes of the buffer.
00100          * @return The size of the buffer in bytes.
00101          */
00102         int getCapacity();
00103 
00104         /**
00105          * Returns the number of milliseconds that was blocked waiting for data.
00106          * @return blocking time in readBuffer method.
00107          */
00108         float getBlockingReadTime();
00109 
00110         /**
00111          * Returns the number of milliseconds that was blocked waiting for space.
00112          * @return blocking time in writeBuffer method.
00113          */
00114         float getBlockingWriteTime();
00115 
00116         /**
00117          * Returns the amount of bytes written in the buffer since its creation.
00118          * @return the number of bytes written.
00119          */
00120         int getBytesWritten();
00121 
00122         /**
00123          * Returns the amount of bytes read from the buffer since its creation.
00124          * @return the number of bytes read.
00125          */
00126         int getBytesRead();
00127 
00128         /**
00129          * Returns a struct containing statistics of the buffer.
00130          * @return statistics of the buffer.
00131          */
00132         buffer_statistics_t getStatistics();
00133 
00134         /**
00135          * Defines a logfile to receive the buffer statistics in each
00136          * interval period.
00137          *
00138          * @param logFile the filename of the log file.
00139          * @param interval The period in seconds to log the statistics.
00140          */
00141     void setLogFile(string logFile, int interval);
00142 
00143         private:
00144                 float blockingReadTime;
00145                 float blockingWriteTime;
00146                 int volatileUsage;
00147                 int bytesRead;
00148                 int bytesWritten;
00149 
00150         float tempBlockingWriteTime;
00151         float tempBlockingReadTime;
00152 
00153 
00154                 pthread_mutex_t mutex;
00155                 pthread_cond_t notFullCond;
00156                 pthread_cond_t notEmptyCond;
00157                 pthread_cond_t emptyCond;
00158                 pthread_cond_t loggerCond;
00159                 
00160                 bool destroyed;
00161                 
00162                 char* buffer;
00163                 int buffer_size;
00164                 int buffer_start;
00165                 int buffer_end;
00166                 int buffer_max;
00167 
00168         pthread_t threadId;
00169         bool inputBuffer;
00170         
00171         pthread_t loggerThread;
00172         string logFile;
00173         bool isLogging;
00174         int logInterval; // in seconds
00175 
00176                 int circularLoad(char *dst, int len);
00177                 int circularStore(const char *src, int len);
00178                 int sizeUsed();
00179                 int sizeAvailable();
00180 
00181         static void* staticFlushThread(void *arg);
00182         static void* staticLoadThread(void *arg);
00183         static void* staticLogThread(void *arg);
00184 
00185         void createAutoLoadThread();
00186         void createAutoFlushThread();
00187             void logThread();
00188 
00189                 void joinThreads();
00190 };
00191 
00192 #endif // BUFFER_H