|
MASA-Core
|
00001 /******************************************************************************* 00002 * 00003 * Copyright (c) 2010-2015 Edans Sandes 00004 * 00005 * This file is part of MASA-Core. 00006 * 00007 * MASA-Core is free software: you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation, either version 3 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * MASA-Core is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with MASA-Core. If not, see <http://www.gnu.org/licenses/>. 00019 * 00020 ******************************************************************************/ 00021 00022 #include "Job.hpp" 00023 00024 #include <iostream> 00025 #include <sstream> 00026 using namespace std; 00027 00028 #include <stdio.h> 00029 #include <string.h> 00030 #include <algorithm> 00031 #include <sys/stat.h> 00032 #include <errno.h> 00033 #include <sys/types.h> 00034 #include <wordexp.h> 00035 #include "Properties.hpp" 00036 #include "SpecialRowWriter.hpp" 00037 #include "exceptions/exceptions.hpp" 00038 00039 #define DEBUG (0) 00040 00041 #define SRA_DECAY (1.0f) 00042 00043 Job::Job(int sequencesCount) { 00044 this->alignment_params = new AlignmentParams(); 00045 this->alignment = NULL; 00046 this->alignerPool = NULL; 00047 //this->alignment = new Alignment(alignment_params); 00048 this->special_rows_path = ""; 00049 this->aligner = NULL; 00050 this->flushIntervals = NULL; 00051 this->maxFlushDeep = 20; 00052 this->pool_wait_id = -1; 00053 this->bufferLimit = 0; 00054 } 00055 00056 Job::~Job() { 00057 //delete alignment; 00058 delete alignment_params; 00059 clearSpecialRowsAreas(); 00060 } 00061 00062 int Job::initialize() { 00063 initializeWorkPath(); 00064 SequenceInfo* seq0 = alignment_params->getSequence(0)->getInfo(); 00065 SequenceInfo* seq1 = alignment_params->getSequence(1)->getInfo(); 00066 00067 calculateFlushIntervals(maxFlushDeep, getSRALimit(), seq0->getSize(), seq1->getSize()); 00068 00069 Properties prop; 00070 if (prop.initialize(this->info_filename.c_str())) { 00071 string file0 = prop.get_property("seq0"); 00072 string file1 = prop.get_property("seq1"); 00073 int file0_mismatch = (file0.compare(seq0->getDescription())); 00074 int file1_mismatch = (file1.compare(seq1->getDescription())); 00075 if (file0_mismatch || file1_mismatch) { 00076 printf("Sequence mismatch from previous run. Try cleaning work directory (--clean)\n"); 00077 } 00078 if (file0_mismatch) { 00079 printf("%s != %s\n", file0.c_str(), seq0->getDescription().c_str()); 00080 } 00081 if (file1_mismatch) { 00082 printf("%s != %s\n", file1.c_str(), seq1->getDescription().c_str()); 00083 } 00084 if (file0_mismatch || file1_mismatch) { 00085 return 0; // TODO retornar excecao 00086 } 00087 } else { 00088 FILE* f = fopen(this->info_filename.c_str(), "wt"); 00089 fprintf(f, "seq0=%s\n", seq0->getDescription().c_str()); 00090 fprintf(f, "seq1=%s\n", seq1->getDescription().c_str()); 00091 fclose(f); 00092 } 00093 00094 //cout << this->phase << endl; 00095 //cout << this->progress << endl; 00096 cout << this->crosspoints_path << endl; 00097 cout << this->special_rows_path << endl; 00098 00099 // TODO poderia ficar fora da class Job? 00100 aligner->initialize(); 00101 00102 if (getAlignerPool() != NULL) { 00103 int j0 = getAlignmentParams()->getSequence(1)->getTrimStart()-1; 00104 int j1 = getAlignmentParams()->getSequence(1)->getTrimEnd(); 00105 int jj0 = getAlignmentParams()->getSequence(1)->getModifiers()->getTrimStart()-1; 00106 int jj1 = getAlignmentParams()->getSequence(1)->getModifiers()->getTrimEnd(); 00107 00108 getAlignerPool()->registerNode(j0, j0==jj0?-1:j0, j1==jj1?-1:j1, flush_column_url); 00109 00110 // if (/*load_column_url.length() == 0 &&*/ j0 != jj0) { 00111 // load_column_url = getAlignerPool()->getLoadURL(j0); 00112 // } 00113 } 00114 00115 return 1; // TODO remover quando for retornar excecao 00116 } 00117 00118 void Job::initializeWorkPath() { 00119 if (this->pool_shared_path.length() == 0) { 00120 this->pool_shared_path = work_path + "/shared"; 00121 } 00122 00123 if (aligner->getParameters()->getForkId() != NOT_FORKED_INSTANCE) { 00124 createPath(this->work_path); 00125 00126 char suffix[20]; 00127 sprintf ( suffix, "/FORK.%02d", aligner->getParameters()->getForkId()); 00128 work_path = work_path + "/" + suffix; 00129 } 00130 fprintf(stderr, "Work Path: %s\n", work_path.c_str()); 00131 00132 this->dump_pruning_text_filename = work_path + "/pruning_dump.txt"; 00133 this->outputBufferLogFile = work_path + "/outputBuffer.log"; 00134 this->inputBufferLogFile = work_path + "/inputBuffer.log"; 00135 this->crosspoints_path = work_path + "/crosspoints"; 00136 if (this->special_rows_path.length() == 0) { 00137 this->special_rows_path = work_path + "/special_rows"; 00138 } 00139 this->info_filename = work_path + "/info"; 00140 this->status_filename = work_path + "/status"; 00141 00142 createPath(this->work_path); 00143 createPath(this->crosspoints_path); 00144 createPath(this->special_rows_path); 00145 createPath(this->pool_shared_path); 00146 } 00147 00148 void Job::setWorkPath(string workPath) { 00149 this->work_path = resolve_env(workPath); 00150 } 00151 00152 void Job::setSpecialRowsPath(string specialRowsPath) { 00153 this->special_rows_path = resolve_env(specialRowsPath); 00154 } 00155 00156 void Job::setSharedPath(string sharedPath) { 00157 this->pool_shared_path = resolve_env(sharedPath); 00158 } 00159 00160 int Job::getSequenceCount() const { 00161 return sequences.size(); 00162 } 00163 00164 void Job::addSequence(Sequence* sequence) { 00165 sequences.push_back(sequence); 00166 } 00167 00168 Sequence* Job::getSequence(int index) { 00169 return sequences[index]; 00170 } 00171 00172 AlignmentParams* Job::getAlignmentParams() const { 00173 return alignment_params; 00174 } 00175 00176 Alignment* Job::getAlignment() const { 00177 return alignment; 00178 } 00179 00180 void Job::setAlignment(Alignment* alignment) { 00181 this->alignment = alignment; 00182 } 00183 00184 void Job::loadSequenceData(Sequence* sequence) { 00185 for (int i=0; i<sequences.size(); i++) { 00186 if (sequence->getInfo()->isEqual(sequences[i]->getInfo())) { 00187 sequence->copyData(sequences[i]); 00188 } 00189 } 00190 } 00191 00192 string Job::getCrosspointFile(int stage, int id, int deep) { 00193 char str[500]; 00194 if (deep <= -1) { 00195 sprintf(str, "%s/crosspoint_%02d.%02d", crosspoints_path.c_str(), stage, id); 00196 } else { 00197 sprintf(str, "%s/crosspoint_%02d.%02d.r%02d", crosspoints_path.c_str(), stage, id, deep); 00198 } 00199 return str; 00200 } 00201 00202 string Job::getSpecialRowsPath(int stage, int id, int deep) { 00203 char str[500]; 00204 if (deep <= -1) { 00205 sprintf(str, "%s/stage.%02d.%02d", special_rows_path.c_str(), stage, id); 00206 } else { 00207 sprintf(str, "%s/stage.%02d.%02d.r%02d", special_rows_path.c_str(), stage, id, deep); 00208 } 00209 createPath(str); 00210 return string(str); 00211 } 00212 00213 void Job::clearSpecialRowsArea(SpecialRowsArea** area) { 00214 string name = (*area)->getDirectory(); 00215 //printf("Deleting %s: %p\n", name.c_str(), *area); 00216 specialRowsAreas.erase(name); 00217 delete *area; 00218 *area = NULL; 00219 } 00220 00221 AlignerPool* Job::getAlignerPool() { 00222 if (alignerPool == NULL) { 00223 if (flush_column_url.length() > 0 || load_column_url.length() > 0) { 00224 alignerPool = new AlignerPool(pool_shared_path); 00225 alignerPool->initialize(); 00226 } 00227 } 00228 return alignerPool; 00229 } 00230 00231 void Job::calculateFlushIntervals(int max_deep, long long limit, int seq0_len, int seq1_len) { 00232 if (flushIntervals != NULL) { 00233 delete flushIntervals; 00234 } 00235 flushIntervals = new int[max_deep]; 00236 if (limit < seq1_len*sizeof(cell_t)*2) { 00237 limit = seq1_len*sizeof(cell_t)*2; 00238 } 00239 if (DEBUG) printf("calculate_intervals: limit: %d\n", limit); 00240 00241 flushIntervals[0] = (int)(((long long)seq0_len)*seq1_len*sizeof(cell_t)/limit + 1); 00242 flushIntervals[1] = (int)(((long long)flushIntervals[0])*seq1_len*sizeof(cell_t)/(limit/SRA_DECAY) + 1); 00243 00244 long long interval = flushIntervals[1]; 00245 00246 for (int i=2; i<max_deep; i++) { 00247 flushIntervals[i] = (int)(((long long)flushIntervals[i-1])*(i%2==0?seq0_len:seq1_len)*sizeof(cell_t)/(limit/SRA_DECAY) + 1); 00248 if (flushIntervals[i] > flushIntervals[i-2]/2) { 00249 flushIntervals[i] = flushIntervals[i-2]/2; // Ensure that each even step decreases at least twice 00250 } 00251 } 00252 if (DEBUG) { 00253 for (int i=0; i<max_deep; i++) { 00254 fprintf(stderr, "Interval: %d\n", flushIntervals[i]); 00255 } 00256 } 00257 } 00258 00259 long long Job::getFlushInterval(int step) { 00260 if (step < maxFlushDeep) { 00261 return flushIntervals[step]; 00262 } else { 00263 return flushIntervals[maxFlushDeep-1]; 00264 } 00265 } 00266 00267 void Job::clearSpecialRowsAreas() { 00268 for (map<string, SpecialRowsArea*>::iterator it = specialRowsAreas.begin(); it != specialRowsAreas.end(); it++) { 00269 //printf("Deleting %s: %p\n", it->second->getDirectory().c_str(), it->second); 00270 delete it->second; 00271 } 00272 specialRowsAreas.clear(); 00273 } 00274 00275 SpecialRowsArea* Job::getSpecialRowsArea(int stage, int id, int deep) { 00276 string name = getSpecialRowsPath(stage, id, deep); 00277 SpecialRowsArea* area = specialRowsAreas[name]; 00278 if (area == NULL) { 00279 createPath(name); 00280 long long disk = disk_limit; 00281 if (stage == STAGE_2) { 00282 disk = (disk + ram_limit)/SRA_DECAY - ram_limit; 00283 } else if (stage == STAGE_3) { 00284 for (int i=0; i<=deep; i++) { 00285 disk = (disk + ram_limit)/SRA_DECAY - ram_limit; 00286 } 00287 } 00288 if (disk < 0) { 00289 disk = 0; 00290 } 00291 printf(" Job::getSpecialRowsArea(%d, %d, %d) -> %d/%d\n", stage, id, deep, ram_limit, disk); 00292 area = new SpecialRowsArea(name, ram_limit, disk, aligner->getScoreParameters()); 00293 specialRowsAreas[name] = area; 00294 } else { 00295 //area->reload(); 00296 } 00297 return area; 00298 } 00299 00300 string Job::getAlignmentBinaryFile(int id) { 00301 char str[500]; 00302 sprintf(str, "%s/alignment.%02d.bin", work_path.c_str(), id); 00303 return str; 00304 } 00305 00306 string Job::getAlignmentTextFile(int id) { 00307 char str[500]; 00308 sprintf(str, "%s/alignment.%02d.txt", work_path.c_str(), id); 00309 return str; 00310 } 00311 00312 00313 string Job::getWorkPath() { 00314 return this->work_path; 00315 } 00316 00317 00318 void Job::createPath(string path) { 00319 if (mkdir(path.c_str(), 0774)) { 00320 if (errno != EEXIST) { 00321 fprintf(stderr, "Path (%s) could not be created. Try to create it manually. (errno: %d)\n", path.c_str(), errno); 00322 exit(1); 00323 } 00324 } 00325 } 00326 00327 FILE* Job::fopenStatistics(int stage, int id) { 00328 char str[500]; 00329 if (stage == STAGE_GLOBAL) { 00330 sprintf(str, "%s/statistics", this->work_path.c_str()); 00331 } else if (stage == ALIGNER_STATISTICS) { 00332 sprintf(str, "%s/statistics.ALIGNER", this->work_path.c_str()); 00333 } else { 00334 sprintf(str, "%s/statistics_%02d.%02d", this->work_path.c_str(), stage, id); 00335 } 00336 FILE * file = fopen(str, "wt"); 00337 if (file == NULL) { 00338 fprintf(stderr, "Error opening statistics file: %s\n", str); 00339 exit(1); 00340 } 00341 fprintf(file, "###############################\n"); 00342 if (stage == STAGE_GLOBAL) { 00343 fprintf(file, "# GLOBAL STATISTICS #\n", stage, id); 00344 } else if (stage == ALIGNER_STATISTICS) { 00345 fprintf(file, "# ALIGNER INITIALIZATION #\n", stage, id); 00346 } else { 00347 fprintf(file, "# STATISTICS FOR STAGE #%d.%d #\n", stage, id); 00348 } 00349 fprintf(file, "###############################\n"); 00350 fflush(file); 00351 return file; 00352 } 00353 00354 long long Job::getSRALimit() { 00355 if (disk_limit <= 0 && ram_limit <= 0) { 00356 return 0; 00357 } else if (disk_limit <= 0) { 00358 return ram_limit; 00359 } else if (ram_limit <= 0) { 00360 return disk_limit; 00361 } else { 00362 return disk_limit + ram_limit; 00363 } 00364 } 00365 00366 string Job::resolve_env(string in) { 00367 wordexp_t p; 00368 char **w; 00369 int i; 00370 00371 if (in.length() == 0) { 00372 throw IllegalArgumentException("Empty argument."); 00373 } 00374 00375 int ret = wordexp(in.c_str(), &p, WRDE_NOCMD | WRDE_UNDEF); 00376 std::stringstream var; 00377 var << "Variable: " << in; 00378 if (ret == WRDE_BADVAL) { 00379 throw IllegalArgumentException("An undefined environment variable was referenced.", var.str().c_str()); 00380 } else if (ret != 0) { 00381 throw IllegalArgumentException("Wrong environment variable expansion.", var.str().c_str()); 00382 } else if (p.we_wordc == 0) { 00383 throw IllegalArgumentException("No wildcard ('*') expansion.", var.str().c_str()); 00384 } else if (p.we_wordc > 1) { 00385 throw IllegalArgumentException("Ambiguous wildcard ('*') expansion.", var.str().c_str()); 00386 } 00387 w = p.we_wordv; 00388 string out = w[0]; 00389 wordfree(&p); 00390 00391 return out; 00392 } 00393 00394 int Job::getPoolWaitId() const { 00395 return pool_wait_id; 00396 } 00397 00398 void Job::setPoolWaitId(int id) { 00399 pool_wait_id = id; 00400 } 00401 00402 int Job::getBufferLimit() const { 00403 return bufferLimit; 00404 } 00405 00406 void Job::setBufferLimit(int bufferLimit) { 00407 this->bufferLimit = bufferLimit; 00408 }
1.7.6.1