MASA-Core
AlignerPool.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "AlignerPool.hpp"
00023 #include <stdio.h>
00024 #include <stdlib.h>
00025 #include <unistd.h>
00026 #include <sys/file.h>
00027 
00028 AlignerPool::AlignerPool(string sharedPath) {
00029         this->sharedPath = sharedPath;
00030         this->crosspointIdSentCounter = 0;
00031         this->crosspointIdRecvCounter = 0;
00032         this->bestNodeScore.score = -INF;
00033         this->bestNodeScore.i = -1;
00034         this->bestNodeScore.j = -1;
00035 }
00036 
00037 AlignerPool::~AlignerPool() {
00038 
00039 }
00040 
00041 void AlignerPool::waitId(int id) {
00042         string filename = getMsgFile("stage1", id);
00043         waitSignal(filename);
00044 }
00045 
00046 void AlignerPool::dispatchScore(score_t score) {
00047         string filename = getMsgFile("stage1", right);
00048 
00049         FILE* file = fopen(filename.c_str(), "wt");
00050         fprintf(file, "%d %d %d\n", score.i, score.j, score.score);
00051         fclose(file);
00052 
00053         sendSignal(filename);
00054 
00055 }
00056 
00057 score_t AlignerPool::receiveScore() {
00058         score_t score;
00059 
00060         string filename = getMsgFile("stage1", left);
00061         waitSignal(filename);
00062 
00063         FILE* file = fopen(filename.c_str(), "rt");
00064         fscanf(file, "%d %d %d\n", &score.i, &score.j, &score.score);
00065         fclose(file);
00066         return score;
00067 
00068 }
00069 
00070 void AlignerPool::dispatchCrosspoint(crosspoint_t crosspoint, int final) {
00071         string filename = getMsgFile("stage2", left, crosspointIdSentCounter++);
00072 
00073         FILE* file = fopen(filename.c_str(), "wt");
00074         fprintf(file, "%d %d %d %d %d\n", crosspoint.i, crosspoint.j, crosspoint.score, crosspoint.type, final);
00075         fclose(file);
00076 
00077         sendSignal(filename);
00078 }
00079 
00080 crosspoint_t AlignerPool::receiveCrosspoint(int* final) {
00081         crosspoint_t c;
00082         bool _final;
00083 
00084         string filename = getMsgFile("stage2", right, crosspointIdRecvCounter++);
00085         waitSignal(filename);
00086 
00087         while (peekSignal(getMsgFile("stage2", right, crosspointIdRecvCounter))) {
00088                 filename = getMsgFile("stage2", right, crosspointIdRecvCounter++);
00089         }
00090 
00091         FILE* file = fopen(filename.c_str(), "rt");
00092         fscanf(file, "%d %d %d %d %d\n", &c.i, &c.j, &c.score, &c.type, &_final);
00093         fclose(file);
00094 
00095         if (final != NULL) {
00096                 *final = _final;
00097         }
00098 
00099         return c;
00100 }
00101 
00102 
00103 void AlignerPool::dispatchCrosspointFile(CrosspointsFile* file) {
00104         string filename = getMsgFile("stage4", left);
00105 
00106         file->writeToFile(filename.c_str());
00107         sendSignal(filename);
00108 
00109 }
00110 
00111 CrosspointsFile* AlignerPool::receiveCrosspointFile() {
00112         string filename = getMsgFile("stage4", right);
00113         waitSignal(filename);
00114 
00115         CrosspointsFile* crosspoints = new CrosspointsFile(filename);
00116         crosspoints->loadCrosspoints();
00117 
00118         return crosspoints;
00119 }
00120 
00121 void AlignerPool::registerNode(int id, int left, int right, string flushURL) {
00122         string filename = getMsgFile("register", id);
00123         this->left = left;
00124         this->right = right;
00125 
00126         printf("%d: %d,%d\n", id, left, right);
00127         FILE* file = fopen(filename.c_str(), "wt");
00128         fprintf(file, "%s\n", flushURL.c_str());
00129         fclose(file);
00130 
00131         sendSignal(filename);
00132 }
00133 
00134 string AlignerPool::getLoadURL(int id) {
00135         string filename = getMsgFile("register", id);
00136         waitSignal(filename);
00137 
00138         FILE* file = fopen(filename.c_str(), "rt");
00139         char url[200];
00140         fscanf(file, "%s\n", url);
00141         fclose(file);
00142 
00143         return string(url);
00144 }
00145 
00146 
00147 
00148 void AlignerPool::initialize() {
00149 
00150 }
00151 
00152 string AlignerPool::getMsgFile(string prefix, int id, int count) {
00153         char str[100];
00154         if (count == -1) {
00155                 sprintf(str, ".%08X", id);
00156         } else {
00157                 sprintf(str, ".%08X.%02d", id, count);
00158         }
00159         return this->sharedPath + "/" + prefix + string(str);
00160 }
00161 
00162 string AlignerPool::getSignalFile(string msgFile) {
00163         return msgFile + ".signal";
00164 }
00165 
00166 void AlignerPool::sendSignal(string msgFile) {
00167         string signalFile = getSignalFile(msgFile);
00168         FILE* file = fopen(signalFile.c_str(), "wt");
00169         fclose(file);
00170         sync();
00171         printf("[%d] Signal Sent: %s\n", getpid(), signalFile.c_str());
00172 }
00173 
00174 bool AlignerPool::isFirstNode() {
00175         return left == -1;
00176 }
00177 
00178 bool AlignerPool::isLastNode() {
00179         return right == -1;
00180 }
00181 
00182 const score_t& AlignerPool::getBestNodeScore() const {
00183         return bestNodeScore;
00184 }
00185 
00186 void AlignerPool::setBestNodeScore(const score_t& bestNodeScore) {
00187         this->bestNodeScore = bestNodeScore;
00188 }
00189 
00190 bool AlignerPool::peekSignal(string msgFile) {
00191         string signalFile = getSignalFile(msgFile);
00192         FILE* file = fopen(signalFile.c_str(), "rt");
00193         bool signalOk = false;
00194         if (file != NULL) {
00195                 signalOk = true;
00196                 fclose(file);
00197         }
00198         return signalOk;
00199 }
00200 
00201 void AlignerPool::waitSignal(string msgFile) {
00202         int count = 0;
00203         bool signalOk = false;
00204         while (!signalOk) {
00205                 if (count % 1000 == 0) {
00206                         printf("[%d] Waiting Signal for msg: %s\n", getpid(), msgFile.c_str());
00207                 }
00208                 signalOk = peekSignal(msgFile);
00209                 if (!signalOk) {
00210                         usleep(10000);
00211                         count++;
00212                 }
00213         }
00214         printf("[%d] Signal Received for msg: %s\n", getpid(), msgFile.c_str());
00215 }
00216