MASA-Core
Peer.cpp
Go to the documentation of this file.
00001 /*******************************************************************************
00002  *
00003  * Copyright (c) 2010-2015   Edans Sandes
00004  *
00005  * This file is part of MASA-Core.
00006  * 
00007  * MASA-Core is free software: you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation, either version 3 of the License, or
00010  * (at your option) any later version.
00011  * 
00012  * MASA-Core is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with MASA-Core.  If not, see <http://www.gnu.org/licenses/>.
00019  *
00020  ******************************************************************************/
00021 
00022 #include "Peer.hpp"
00023 
00024 #include "command/Command.hpp"
00025 #include "MasaNet.hpp"
00026 
00027 #include <stdio.h>
00028 #include <stdlib.h>
00029 #include <string.h>
00030 #include <string>
00031 using namespace std;
00032 
00033 #include <sys/socket.h> /* for socket(), bind(), and connect() */
00034 #include <arpa/inet.h>  /* for sockaddr_in and inet_ntoa() */
00035 #include <signal.h>
00036 #include <unistd.h>
00037 #include <errno.h>
00038 
00039 #define MAGIC_STRING    "MASA_NET"
00040 #define MAGIC_ANSWER    "MASA_ACK"
00041 
00042 #define MASA_NET_VERSION_MAJOR  0
00043 #define MASA_NET_VERSION_MINOR  1
00044 
00045 #define FLAG_NONE                       0x00000000
00046 
00047 #define SEND_ERROR_MSG  ("send: socket error")
00048 #define RECV_ERROR_MSG  ("recv: socket error")
00049 
00050 map<int, cmd_creator_f> Peer::cmdCreators;
00051 bool Peer::hasStaticEvent = false;
00052 timeval Peer::staticEvent;
00053 
00054 Peer::Peer(int socket, const string& localId, bool initiator, int connectionType) {
00055         this->socket = socket;
00056         this->connectionType = connectionType;
00057         this->initiator = initiator;
00058         this->connected = false;
00059         this->serial = 0;
00060         this->localId = localId;
00061         this->remoteType = TYPE_UNKNOWN;
00062         this->localType = TYPE_UNKNOWN;
00063         this->ringType = RING_NONE;
00064         this->error = false;
00065         this->handshakeDone = false;
00066 
00067         struct sockaddr_in sin;
00068         socklen_t len = sizeof(sin);
00069         if (getsockname(socket, (struct sockaddr *)&sin, &len) == -1) {
00070             perror("getsockname");
00071         }
00072 
00073         char desc[256];
00074         sprintf(desc, "%s:%d", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
00075         localAddress = string(desc);
00076 
00077         if (getpeername(socket, (struct sockaddr *)&sin, &len) == -1) {
00078             perror("getpeername");
00079         }
00080         sprintf(desc, "%s:%d", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
00081         remoteAddress = string(desc);
00082 
00083         fprintf(stderr, "Peer::Peer(%d, %s, %d): %p [%s]\n", socket, localId.c_str(), initiator, this, desc);
00084 
00085         pthread_mutex_init(&mutex, NULL);
00086     pthread_cond_init(&responseCond, NULL);
00087     pthread_cond_init(&handshakeCond, NULL);
00088 
00089     /* This code will prevent the SIGPIPE signal from being raised, but you
00090      * will get a read / write error when trying to use the socket, so you
00091      * will need to check for that.
00092      */
00093     signal(SIGPIPE, SIG_IGN);
00094 }
00095 
00096 Peer::Peer(string remoteId, string remoteAddress,
00097                 const int remoteType, const int ringType, int connectionType) {
00098         this->remoteId = remoteId;
00099         this->remoteAddress = remoteAddress;
00100         this->remoteType = remoteType;
00101         this->ringType = ringType;
00102         this->localType = TYPE_UNKNOWN;
00103         this->connected = false;
00104         this->serial = 0;
00105         this->initiator = false;
00106         this->socket = 0;
00107         this->error = false;
00108         this->handshakeDone = false;
00109         this->connectionType = connectionType;
00110 
00111         // TODO criar mutex, conds?
00112 }
00113 
00114 Peer::~Peer() {
00115         this->connected = false;
00116         pthread_mutex_destroy(&mutex);
00117     pthread_cond_destroy(&responseCond);
00118     pthread_cond_destroy(&handshakeCond);
00119 }
00120 
00121 bool Peer::handshake() {
00122         if (initiator) {
00123                 connected = handshakeInitiator();
00124         } else {
00125                 connected = handshakeInitiated();
00126         }
00127         pthread_mutex_lock(&mutex);
00128         handshakeDone = true;
00129         pthread_cond_broadcast(&handshakeCond);
00130         pthread_mutex_unlock(&mutex);
00131 
00132         if (connected) {
00133                 if (connectionType == CONNECTION_TYPE_DATA) {
00134                         connected = callback->onConnectData(this);
00135                 } else {
00136                         connected = callback->onConnect(this);
00137                 }
00138         }
00139 
00140         if (!connected) {
00141                 finalize();
00142         }
00143         return connected;
00144 }
00145 
00146 void Peer::finalize() {
00147         if (socket != 0) {
00148                 fprintf(stderr, "Peer::finalize (%s) %d\n", remoteId.c_str(), socket);
00149                 close(socket);
00150                 socket = 0;
00151         }
00152 }
00153 
00154 bool Peer::waitHandshake() {
00155         printf("Wait 1\n");
00156         pthread_mutex_lock(&mutex);
00157         printf("Wait 2\n");
00158         while (!handshakeDone) {
00159                 printf("Wait 3\n");
00160                 pthread_cond_wait(&handshakeCond, &mutex);
00161         }
00162         printf("Wait 4\n");
00163         pthread_mutex_unlock(&mutex);
00164         return connected;
00165 }
00166 
00167 bool Peer::handshakeInitiator() {
00168         char magic[32];
00169         recv_array(magic, strlen(MAGIC_STRING));
00170         if (memcmp(magic, MAGIC_STRING, strlen(MAGIC_STRING)) != 0) {
00171         fprintf(stderr, "handshake: wrong magic string: %s/%s %d\n", MAGIC_STRING, magic, socket);
00172         return 0;
00173         }
00174 
00175         send_array(MAGIC_ANSWER, strlen(MAGIC_ANSWER));
00176 
00177         int major = recv_int8();
00178         int minor = recv_int8();
00179 
00180         if (major != MASA_NET_VERSION_MAJOR) {
00181         fprintf(stderr, "handshake: unsupported version: %d.%d\n", major, minor);
00182         return 0;
00183         }
00184 
00185         int flags = FLAG_NONE;
00186         send_int32(flags);
00187 
00188         send_int8(connectionType);
00189 
00190         send_vls8(localId);
00191         remoteId = recv_vls8();
00192 
00193         send_int16(localType);
00194         remoteType = recv_int16();
00195 
00196         send_vls8(localAddress);
00197 
00198         return 1;
00199 }
00200 
00201 string Peer::toString() {
00202     stringstream msg;
00203         msg << remoteId << "[";
00204         switch (remoteType) {
00205                 case TYPE_CLI:
00206                         msg << "CL";
00207                         break;
00208                 case TYPE_PROCESSING_NODE:
00209                         msg << "PE";
00210                         break;
00211                 case TYPE_GATEWAY:
00212                         msg << "GW";
00213                         break;
00214                 case TYPE_UNKNOWN:
00215                         msg << "??";
00216                         break;
00217         }
00218         msg << "](" << remoteAddress << ")";
00219         if (ringType == RING_RIGHT) {
00220                 msg << " *R";
00221         } else if (ringType == RING_LEFT) {
00222                 msg << " *L";
00223         }
00224         if (connectionType == CONNECTION_TYPE_CTRL) {
00225                 msg << " CTRL";
00226         } else if (connectionType == CONNECTION_TYPE_DATA) {
00227                 msg << " DATA";
00228         } else {
00229                 msg << " ??";
00230         }
00231         return msg.str();
00232 }
00233 
00234 void Peer::registerCommandCreator(int id, cmd_creator_f creator) {
00235         cmdCreators[id] = creator;
00236 }
00237 
00238 bool Peer::handshakeInitiated() {
00239         send_array(MAGIC_STRING, strlen(MAGIC_STRING));
00240 
00241         char magic[32];
00242         recv_array(magic, strlen(MAGIC_ANSWER));
00243         if (memcmp(magic, MAGIC_ANSWER, strlen(MAGIC_ANSWER)) != 0) {
00244         fprintf(stderr, "handshake: wrong magic answer: %s/%s %d\n", MAGIC_ANSWER, magic, socket);
00245         return 0;
00246         }
00247 
00248         send_int8(MASA_NET_VERSION_MAJOR);
00249         send_int8(MASA_NET_VERSION_MINOR);
00250 
00251         int flags = recv_int32();
00252 
00253         connectionType = recv_int8();
00254 
00255         remoteId = recv_vls8();
00256         send_vls8(localId);
00257 
00258         remoteType = recv_int16();
00259         send_int16(localType);
00260 
00261         remoteAddress = recv_vls8();
00262 
00263         return 1;
00264 }
00265 
00266 
00267 bool Peer::isConnected() {
00268         return connected;
00269 }
00270 
00271 void Peer::addHook(int id, int serial) {
00272         pthread_t tid = pthread_self();
00273 
00274         pthread_mutex_lock(&mutex);
00275         hookThreads[id].insert(tid);
00276         hookSerial[tid] = serial;
00277         hookCommand[tid] = NULL;
00278         pthread_mutex_unlock(&mutex);
00279 }
00280 
00281 Command* Peer::waitHook() {
00282         pthread_t tid = pthread_self();
00283         Command* ret = NULL;
00284 
00285         pthread_mutex_lock(&mutex);
00286         while (hookCommand[tid] == NULL) {
00287                 pthread_cond_wait (&responseCond, &mutex);
00288                 // TODO colocar timeout?
00289                 // FIXME tratar caso de desconexão
00290         }
00291         ret = hookCommand[tid];
00292         hookCommand[tid] = NULL;
00293         hookThreads[ret->getId()].erase(tid);
00294         pthread_mutex_unlock(&mutex);
00295         return ret;
00296 }
00297 
00298 void Peer::notifyHook(Command* cmd) {
00299         int id = cmd->getId();
00300         pthread_mutex_lock(&mutex);
00301         printf("Hook Received %d. %d\n", id, hookThreads[id].size());
00302         if (hookThreads[id].size() > 0) {
00303                 bool wakeup = false;
00304                 for (std::set<pthread_t>::iterator it=hookThreads[id].begin(); it!=hookThreads[id].end(); ++it) {
00305                         printf("Hook Received: %d %d\n", id, hookSerial[*it]);
00306                         if (hookSerial[*it] == -1 || hookSerial[*it] == cmd->getSerial()) {
00307                                 hookCommand[*it] = cmd;
00308                                 hookThreads[id].erase(*it);
00309                                 wakeup = true;
00310                         }
00311                 }
00312                 if (wakeup) {
00313                         pthread_cond_broadcast(&responseCond);
00314                 }
00315         }
00316         pthread_mutex_unlock(&mutex);
00317 }
00318 
00319 int Peer::getNextSerial() {
00320         return ++serial;
00321 }
00322 
00323 
00324 int Peer::getLocalType() const {
00325         return localType;
00326 }
00327 
00328 void Peer::setLocalType(int localType) {
00329         this->localType = localType;
00330 }
00331 
00332 int Peer::getRemoteType() const {
00333         return remoteType;
00334 }
00335 
00336 const string& Peer::getLocalAddress() const {
00337         return localAddress;
00338 }
00339 
00340 const string& Peer::getRemoteAddress() const {
00341         return remoteAddress;
00342 }
00343 
00344 const string& Peer::getLocalId() const {
00345         return localId;
00346 }
00347 
00348 const string& Peer::getRemoteId() const {
00349         return remoteId;
00350 }
00351 
00352 void Peer::setLocalAddress(const string& publicAddress) {
00353         if (publicAddress[0] != ':') {
00354                 this->localAddress = publicAddress;
00355         } else {
00356                 int port;
00357                 sscanf(publicAddress.c_str(), ":%d", &port);
00358 
00359                 struct sockaddr_in sin;
00360                 socklen_t len = sizeof(sin);
00361                 if (getsockname(socket, (struct sockaddr *)&sin, &len) == -1) {
00362                     perror("getsockname");
00363                 }
00364                 char desc[256];
00365                 sprintf(desc, "%s:%d", inet_ntoa(sin.sin_addr), port);
00366                 this->localAddress = string(desc);
00367         }
00368 }
00369 
00370 Command* Peer::sendCommand(Command* command) {
00371         fprintf(stderr, "Will Lock: %d \n", command->getId());
00372         pthread_mutex_lock(&mutex);
00373         fprintf(stderr, " Locked 1\n");
00374         int id = command->getId();
00375         fprintf(stderr, " Locked 2\n");
00376         send_int16(id);
00377         fprintf(stderr, " Locked 3\n");
00378         if (id & REQUEST_COMMAND) {
00379                 int serial = getNextSerial();
00380                 command->setSerial(serial);
00381                 send_int32(serial);
00382         } else if (id & RESPONSE_COMMAND) {
00383                 send_int32(command->getSerial());
00384         }
00385         fprintf(stderr, " Locked 4\n");
00386         command->send(this);
00387         fprintf(stderr, "sendCommand(%s->%s): %d.%d \n", localId.c_str(), remoteId.c_str(), id, command->getSerial());
00388 
00389 
00390 
00391         Command* ret = NULL;
00392         if (id & REQUEST_COMMAND) {
00393                 pthread_t tid = pthread_self();
00394                 hookThreads[(id ^ REQUEST_COMMAND) | RESPONSE_COMMAND].insert(tid);
00395                 hookSerial[tid] = serial;
00396                 hookCommand[tid] = NULL;
00397                 printf("hook: %d %d\n", (id ^ REQUEST_COMMAND) | RESPONSE_COMMAND, serial);
00398                 while (hookCommand[tid] == NULL) {
00399                         fprintf(stderr, "wait response\n");
00400                         pthread_cond_wait (&responseCond, &mutex);
00401                 }
00402                 ret = hookCommand[tid];
00403         }
00404 
00405         pthread_mutex_unlock(&mutex);
00406 
00407         return ret;
00408 
00409 }
00410 
00411 
00412 Command* Peer::recvCommand() {
00413         fprintf(stderr, "recvCommand(%s<-%s): Waiting... %p\n", localId.c_str(), remoteId.c_str(), this);
00414         int id = recv_int16();
00415         int serial = 0;
00416         if ((id & REQUEST_COMMAND) || (id & RESPONSE_COMMAND)) {
00417                 serial = recv_int32();
00418         }
00419         cmd_creator_f creator = cmdCreators[id];
00420         if (creator == NULL) {
00421                 fprintf(stderr, "cmd: unsupported command [%d]\n", id);
00422                 // TODO
00423         }
00424         Command* cmd = creator();
00425         cmd->setSerial(serial);
00426         cmd->receive(this);
00427         fprintf(stderr, "recvCommand(%s<-%s): %d.%d \n", localId.c_str(), remoteId.c_str(), id, cmd->getSerial());
00428 
00429         /*if (id & RESPONSE_COMMAND) {
00430                 pthread_mutex_lock(&mutex);
00431                 response = cmd;
00432                 pthread_cond_broadcast (&responseCond);
00433                 while (response != NULL && waitingResponse > 0) {
00434                         pthread_cond_wait (&responseCond, &mutex);
00435                         // TODO colocar timeout?
00436                         // FIXME tratar caso de disconexão
00437                 }
00438                 pthread_mutex_unlock(&mutex);
00439         }*/
00440 
00441         notifyHook(cmd);
00442 
00443         return cmd;
00444 }
00445 
00446 
00447 
00448 void Peer::handleSendError(int ret) {
00449     if (ret <= 0) {
00450         callback->onDisconnect(this);
00451         throw IOException(SEND_ERROR_MSG);
00452     }
00453 }
00454 
00455 void Peer::handleRecvError(int ret) {
00456     if (ret <= 0) {
00457         perror("Error reading from server");
00458         fprintf(stderr, "Ret: %d (%d)\n", ret, socket);
00459 
00460         callback->onDisconnect(this);
00461         throw IOException(RECV_ERROR_MSG);
00462     }
00463 }
00464 
00465 
00466 void Peer::send_int32(int value) {
00467         int msg = htonl(value);
00468         int ret = send(socket, &msg, sizeof(msg), 0);
00469         handleSendError(ret);
00470 }
00471 
00472 int Peer::recv_int32() {
00473         int msg;
00474         int ret = recv(socket, &msg, sizeof(msg), 0);
00475         handleRecvError(ret);
00476 
00477     return ntohl(msg);
00478 }
00479 
00480 void Peer::send_int16(short value) {
00481         short msg = htons(value);
00482         int ret = send(socket, &msg, sizeof(msg), 0);
00483         handleSendError(ret);
00484 }
00485 
00486 short Peer::recv_int16() {
00487         short msg;
00488         int ret = recv(socket, &msg, sizeof(msg), 0);
00489         handleRecvError(ret);
00490 
00491     return ntohs(msg);
00492 }
00493 
00494 void Peer::send_int8(char value) {
00495         char msg = value;
00496         int ret = send(socket, &msg, sizeof(msg), 0);
00497         handleSendError(ret);
00498 }
00499 
00500 char Peer::recv_int8() {
00501         char msg;
00502         int ret = recv(socket, &msg, sizeof(msg), 0);
00503         handleRecvError(ret);
00504 
00505     return msg;
00506 }
00507 
00508 void Peer::send_array(const char* value, int len) {
00509         int ret = send(socket, value, len, 0);
00510         handleSendError(ret);
00511 }
00512 
00513 void Peer::recv_array(char* value, int len) {
00514         char msg;
00515         int ret = recv(socket, value, len, 0);
00516         handleRecvError(ret);
00517 
00518 }
00519 
00520 void Peer::send_vls8(const char* value) {
00521         int len = strlen(value);
00522         if (len > 255) {
00523                 len = 255;
00524         }
00525         send_int8(len);
00526         send_array(value, len);
00527 }
00528 
00529 void Peer::recv_vls8(char* value, int max) {
00530         unsigned char len = (unsigned char)recv_int8();
00531         //fprintf(stderr, "recv_vls8: %d\n", len);
00532         if (max != -1 && len > max-1) {
00533                 recv_array(value, max-1);
00534                 value[max-1] = '\0';
00535                 recv_dummy(len-max+1);
00536         } else {
00537                 recv_array(value, len);
00538                 value[len] = '\0';
00539         }
00540 }
00541 
00542 void Peer::send_vls8(const string& value) {
00543         send_vls8(value.c_str());
00544 }
00545 
00546 string Peer::recv_vls8() {
00547         char str[256];
00548         recv_vls8(str, sizeof(str));
00549         return string(str);
00550 }
00551 
00552 void Peer::recv_dummy(int len) {
00553         char dummy[len];
00554         recv_array(dummy, len);
00555 }
00556 
00557 float Peer::getGlobalTime() {
00558         if (!hasStaticEvent) {
00559                 hasStaticEvent = true;
00560                 gettimeofday(&staticEvent, NULL);
00561         }
00562 
00563         timeval event;
00564         gettimeofday(&event, NULL);
00565 
00566         float diff = getElapsedTime(&event, &staticEvent);
00567         return diff;
00568 }
00569 
00570 void Peer::setCallback(MasaNetCallbacks* callback) {
00571         this->callback = callback;
00572 }
00573 
00574 bool Peer::isInitiator() const {
00575         return initiator;
00576 }
00577 
00578 int Peer::getSocket() const {
00579         return socket;
00580 }
00581 
00582 int Peer::getConnectionType() const {
00583         return connectionType;
00584 }
00585 
00586 float Peer::getElapsedTime(timeval *end_time, timeval *start_time) {
00587         timeval temp_diff;
00588 
00589         temp_diff.tv_sec = end_time->tv_sec - start_time->tv_sec;
00590         temp_diff.tv_usec = end_time->tv_usec - start_time->tv_usec;
00591 
00592         if (temp_diff.tv_usec < 0) {
00593                 long long nsec = temp_diff.tv_usec/1000000;
00594                 temp_diff.tv_usec += 1000000 * nsec;
00595                 temp_diff.tv_sec -= nsec;
00596         }
00597 
00598         return (1000000LL * temp_diff.tv_sec + temp_diff.tv_usec)/1000.0f;
00599 
00600 }