MASA-Core
Job.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "Job.hpp"
00023 
00024 #include <iostream>
00025 #include <sstream>
00026 using namespace std;
00027 
00028 #include <stdio.h>
00029 #include <string.h>
00030 #include <algorithm>
00031 #include <sys/stat.h>
00032 #include <errno.h>
00033 #include <sys/types.h>
00034 #include <wordexp.h>
00035 #include "Properties.hpp"
00036 #include "SpecialRowWriter.hpp"
00037 #include "exceptions/exceptions.hpp"
00038 
00039 #define DEBUG (0)
00040 
00041 #define SRA_DECAY (1.0f)
00042 
00043 Job::Job(int sequencesCount) {
00044     this->alignment_params = new AlignmentParams();
00045     this->alignment = NULL;
00046     this->alignerPool = NULL;
00047     //this->alignment = new Alignment(alignment_params);
00048     this->special_rows_path = "";
00049     this->aligner = NULL;
00050     this->flushIntervals = NULL;
00051     this->maxFlushDeep = 20;
00052     this->pool_wait_id = -1;
00053     this->bufferLimit = 0;
00054 }
00055 
00056 Job::~Job() {
00057         //delete alignment;
00058         delete alignment_params;
00059         clearSpecialRowsAreas();
00060 }
00061 
00062 int Job::initialize() {
00063     initializeWorkPath();
00064         SequenceInfo* seq0 = alignment_params->getSequence(0)->getInfo();
00065         SequenceInfo* seq1 = alignment_params->getSequence(1)->getInfo();
00066 
00067         calculateFlushIntervals(maxFlushDeep, getSRALimit(), seq0->getSize(), seq1->getSize());
00068 
00069     Properties prop;
00070     if (prop.initialize(this->info_filename.c_str())) {
00071         string file0 = prop.get_property("seq0");
00072         string file1 = prop.get_property("seq1");
00073         int file0_mismatch = (file0.compare(seq0->getDescription()));
00074         int file1_mismatch = (file1.compare(seq1->getDescription()));
00075         if (file0_mismatch || file1_mismatch) {
00076             printf("Sequence mismatch from previous run. Try cleaning work directory (--clean)\n");
00077         }
00078         if (file0_mismatch) {
00079             printf("%s != %s\n", file0.c_str(), seq0->getDescription().c_str());
00080         }
00081         if (file1_mismatch) {
00082             printf("%s != %s\n", file1.c_str(), seq1->getDescription().c_str());
00083         }
00084         if (file0_mismatch || file1_mismatch) {
00085             return 0;  // TODO retornar excecao
00086         }
00087     } else {
00088                 FILE* f = fopen(this->info_filename.c_str(), "wt");
00089         fprintf(f, "seq0=%s\n", seq0->getDescription().c_str());
00090         fprintf(f, "seq1=%s\n", seq1->getDescription().c_str());
00091         fclose(f);
00092     }
00093 
00094     //cout << this->phase << endl;
00095     //cout << this->progress << endl;
00096     cout << this->crosspoints_path << endl;
00097     cout << this->special_rows_path << endl;
00098 
00099     // TODO poderia ficar fora da class Job?
00100         aligner->initialize();
00101 
00102     if (getAlignerPool() != NULL) {
00103                 int j0 = getAlignmentParams()->getSequence(1)->getTrimStart()-1;
00104                 int j1 = getAlignmentParams()->getSequence(1)->getTrimEnd();
00105                 int jj0 = getAlignmentParams()->getSequence(1)->getModifiers()->getTrimStart()-1;
00106                 int jj1 = getAlignmentParams()->getSequence(1)->getModifiers()->getTrimEnd();
00107 
00108                 getAlignerPool()->registerNode(j0, j0==jj0?-1:j0, j1==jj1?-1:j1, flush_column_url);
00109 
00110 //              if (/*load_column_url.length() == 0 &&*/ j0 != jj0) {
00111 //                      load_column_url = getAlignerPool()->getLoadURL(j0);
00112 //              }
00113     }
00114 
00115     return 1; // TODO remover quando for retornar excecao
00116 }
00117 
00118 void Job::initializeWorkPath() {
00119     if (this->pool_shared_path.length() == 0) {
00120         this->pool_shared_path = work_path + "/shared";
00121     }
00122 
00123     if (aligner->getParameters()->getForkId() != NOT_FORKED_INSTANCE) {
00124         createPath(this->work_path);
00125 
00126         char suffix[20];
00127         sprintf ( suffix, "/FORK.%02d", aligner->getParameters()->getForkId());
00128         work_path = work_path + "/" + suffix;
00129     }
00130     fprintf(stderr, "Work Path: %s\n", work_path.c_str());
00131 
00132         this->dump_pruning_text_filename = work_path + "/pruning_dump.txt";
00133         this->outputBufferLogFile = work_path + "/outputBuffer.log";
00134         this->inputBufferLogFile = work_path + "/inputBuffer.log";
00135     this->crosspoints_path = work_path + "/crosspoints";
00136     if (this->special_rows_path.length() == 0) {
00137         this->special_rows_path = work_path + "/special_rows";
00138     }
00139     this->info_filename = work_path + "/info";
00140     this->status_filename = work_path + "/status";
00141 
00142         createPath(this->work_path);
00143     createPath(this->crosspoints_path);
00144     createPath(this->special_rows_path);
00145     createPath(this->pool_shared_path);
00146 }
00147 
00148 void Job::setWorkPath(string workPath) {
00149     this->work_path = resolve_env(workPath);
00150 }
00151 
00152 void Job::setSpecialRowsPath(string specialRowsPath) {
00153         this->special_rows_path = resolve_env(specialRowsPath);
00154 }
00155 
00156 void Job::setSharedPath(string sharedPath) {
00157     this->pool_shared_path = resolve_env(sharedPath);
00158 }
00159 
00160 int Job::getSequenceCount() const {
00161         return sequences.size();
00162 }
00163 
00164 void Job::addSequence(Sequence* sequence) {
00165         sequences.push_back(sequence);
00166 }
00167 
00168 Sequence* Job::getSequence(int index) {
00169         return sequences[index];
00170 }
00171 
00172 AlignmentParams* Job::getAlignmentParams() const {
00173         return alignment_params;
00174 }
00175 
00176 Alignment* Job::getAlignment() const {
00177         return alignment;
00178 }
00179 
00180 void Job::setAlignment(Alignment* alignment) {
00181         this->alignment = alignment;
00182 }
00183 
00184 void Job::loadSequenceData(Sequence* sequence) {
00185         for (int i=0; i<sequences.size(); i++) {
00186                 if (sequence->getInfo()->isEqual(sequences[i]->getInfo())) {
00187                         sequence->copyData(sequences[i]);
00188                 }
00189         }
00190 }
00191 
00192 string Job::getCrosspointFile(int stage, int id, int deep) {
00193     char str[500];
00194     if (deep <= -1) {
00195         sprintf(str, "%s/crosspoint_%02d.%02d", crosspoints_path.c_str(), stage, id);
00196     } else {
00197         sprintf(str, "%s/crosspoint_%02d.%02d.r%02d", crosspoints_path.c_str(), stage, id, deep);
00198     }
00199     return str;
00200 }
00201 
00202 string Job::getSpecialRowsPath(int stage, int id, int deep) {
00203     char str[500];
00204     if (deep <= -1) {
00205         sprintf(str, "%s/stage.%02d.%02d", special_rows_path.c_str(), stage, id);
00206     } else {
00207         sprintf(str, "%s/stage.%02d.%02d.r%02d", special_rows_path.c_str(), stage, id, deep);
00208     }
00209     createPath(str);
00210     return string(str);
00211 }
00212 
00213 void Job::clearSpecialRowsArea(SpecialRowsArea** area) {
00214         string name = (*area)->getDirectory();
00215         //printf("Deleting %s: %p\n", name.c_str(), *area);
00216         specialRowsAreas.erase(name);
00217         delete *area;
00218         *area = NULL;
00219 }
00220 
00221 AlignerPool* Job::getAlignerPool() {
00222         if (alignerPool == NULL) {
00223                 if (flush_column_url.length() > 0 || load_column_url.length() > 0) {
00224                         alignerPool = new AlignerPool(pool_shared_path);
00225                         alignerPool->initialize();
00226                 }
00227         }
00228         return alignerPool;
00229 }
00230 
00231 void Job::calculateFlushIntervals(int max_deep, long long limit, int seq0_len, int seq1_len) {
00232         if (flushIntervals != NULL) {
00233                 delete flushIntervals;
00234         }
00235         flushIntervals = new int[max_deep];
00236         if (limit < seq1_len*sizeof(cell_t)*2) {
00237                 limit = seq1_len*sizeof(cell_t)*2;
00238         }
00239         if (DEBUG) printf("calculate_intervals: limit: %d\n", limit);
00240 
00241         flushIntervals[0] = (int)(((long long)seq0_len)*seq1_len*sizeof(cell_t)/limit + 1);
00242         flushIntervals[1] = (int)(((long long)flushIntervals[0])*seq1_len*sizeof(cell_t)/(limit/SRA_DECAY) + 1);
00243 
00244         long long interval = flushIntervals[1];
00245 
00246     for (int i=2; i<max_deep; i++) {
00247         flushIntervals[i] = (int)(((long long)flushIntervals[i-1])*(i%2==0?seq0_len:seq1_len)*sizeof(cell_t)/(limit/SRA_DECAY) + 1);
00248         if (flushIntervals[i] > flushIntervals[i-2]/2) {
00249                 flushIntervals[i] = flushIntervals[i-2]/2; // Ensure that each even step decreases at least twice
00250         }
00251     }
00252     if (DEBUG) {
00253                 for (int i=0; i<max_deep; i++) {
00254                         fprintf(stderr, "Interval: %d\n", flushIntervals[i]);
00255                 }
00256     }
00257 }
00258 
00259 long long Job::getFlushInterval(int step) {
00260         if (step < maxFlushDeep) {
00261                 return flushIntervals[step];
00262         } else {
00263                 return flushIntervals[maxFlushDeep-1];
00264         }
00265 }
00266 
00267 void Job::clearSpecialRowsAreas() {
00268         for (map<string, SpecialRowsArea*>::iterator it = specialRowsAreas.begin(); it != specialRowsAreas.end(); it++) {
00269                 //printf("Deleting %s: %p\n", it->second->getDirectory().c_str(), it->second);
00270                 delete it->second;
00271         }
00272         specialRowsAreas.clear();
00273 }
00274 
00275 SpecialRowsArea* Job::getSpecialRowsArea(int stage, int id, int deep) {
00276     string name = getSpecialRowsPath(stage, id, deep);
00277     SpecialRowsArea* area = specialRowsAreas[name];
00278     if (area == NULL) {
00279         createPath(name);
00280         long long disk = disk_limit;
00281         if (stage == STAGE_2) {
00282                 disk = (disk + ram_limit)/SRA_DECAY - ram_limit;
00283         } else if (stage == STAGE_3) {
00284                 for (int i=0; i<=deep; i++) {
00285                         disk = (disk + ram_limit)/SRA_DECAY - ram_limit;
00286                 }
00287         }
00288         if (disk < 0) {
00289                 disk = 0;
00290         }
00291         printf(" Job::getSpecialRowsArea(%d, %d, %d) -> %d/%d\n", stage, id, deep, ram_limit, disk);
00292         area = new SpecialRowsArea(name, ram_limit, disk, aligner->getScoreParameters());
00293         specialRowsAreas[name] = area;
00294     } else {
00295         //area->reload();
00296     }
00297     return area;
00298 }
00299 
00300 string Job::getAlignmentBinaryFile(int id) {
00301     char str[500];
00302     sprintf(str, "%s/alignment.%02d.bin", work_path.c_str(), id);
00303         return str;
00304 }
00305 
00306 string Job::getAlignmentTextFile(int id) {
00307     char str[500];
00308     sprintf(str, "%s/alignment.%02d.txt", work_path.c_str(), id);
00309         return str;
00310 }
00311 
00312 
00313 string Job::getWorkPath() {
00314     return this->work_path;
00315 }
00316 
00317 
00318 void Job::createPath(string path) {
00319         if (mkdir(path.c_str(), 0774)) {
00320                 if (errno != EEXIST) {
00321                         fprintf(stderr, "Path (%s) could not be created. Try to create it manually. (errno: %d)\n", path.c_str(), errno);
00322                         exit(1);
00323                 }
00324         }
00325 }
00326 
00327 FILE* Job::fopenStatistics(int stage, int id) {
00328         char str[500];
00329         if (stage == STAGE_GLOBAL) {
00330                 sprintf(str, "%s/statistics", this->work_path.c_str());
00331         } else if (stage == ALIGNER_STATISTICS) {
00332                 sprintf(str, "%s/statistics.ALIGNER", this->work_path.c_str());
00333         } else {
00334                 sprintf(str, "%s/statistics_%02d.%02d", this->work_path.c_str(), stage, id);
00335         }
00336         FILE * file = fopen(str, "wt");
00337         if (file == NULL) {
00338                 fprintf(stderr, "Error opening statistics file: %s\n", str);
00339                 exit(1);
00340         }
00341         fprintf(file, "###############################\n");
00342         if (stage == STAGE_GLOBAL) {
00343                 fprintf(file, "#  GLOBAL STATISTICS          #\n", stage, id);
00344         } else if (stage == ALIGNER_STATISTICS) {
00345                 fprintf(file, "#  ALIGNER INITIALIZATION     #\n", stage, id);
00346         } else {
00347                 fprintf(file, "#  STATISTICS FOR STAGE #%d.%d  #\n", stage, id);
00348         }
00349         fprintf(file, "###############################\n");
00350         fflush(file);
00351         return file;
00352 }
00353 
00354 long long Job::getSRALimit() {
00355         if (disk_limit <= 0 && ram_limit <= 0) {
00356                 return 0;
00357         } else if (disk_limit <= 0) {
00358                 return ram_limit;
00359         } else if (ram_limit <= 0) {
00360                 return disk_limit;
00361         } else {
00362                 return disk_limit + ram_limit;
00363         }
00364 }
00365 
00366 string Job::resolve_env(string in) {
00367         wordexp_t p;
00368         char **w;
00369         int i;
00370 
00371         if (in.length() == 0) {
00372                 throw IllegalArgumentException("Empty argument.");
00373         }
00374 
00375         int ret = wordexp(in.c_str(), &p, WRDE_NOCMD | WRDE_UNDEF);
00376         std::stringstream var;
00377         var << "Variable: " << in;
00378         if (ret == WRDE_BADVAL) {
00379                 throw IllegalArgumentException("An undefined environment variable was referenced.", var.str().c_str());
00380         } else if (ret != 0) {
00381                 throw IllegalArgumentException("Wrong environment variable expansion.", var.str().c_str());
00382         } else if (p.we_wordc == 0) {
00383                 throw IllegalArgumentException("No wildcard ('*') expansion.", var.str().c_str());
00384         } else if (p.we_wordc > 1) {
00385                 throw IllegalArgumentException("Ambiguous wildcard ('*') expansion.", var.str().c_str());
00386         }
00387         w = p.we_wordv;
00388         string out = w[0];
00389         wordfree(&p);
00390 
00391         return out;
00392 }
00393 
00394 int Job::getPoolWaitId() const {
00395         return pool_wait_id;
00396 }
00397 
00398 void Job::setPoolWaitId(int id) {
00399         pool_wait_id = id;
00400 }
00401 
00402 int Job::getBufferLimit() const {
00403         return bufferLimit;
00404 }
00405 
00406 void Job::setBufferLimit(int bufferLimit) {
00407         this->bufferLimit = bufferLimit;
00408 }