00001 /* 00002 * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software 00003 * 00004 * The Software has been developped within the Siames Project. 00005 * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights 00006 * 00007 * The Software has been registered with the Agence pour la Protection des 00008 * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200 00009 * 00010 * This file may be distributed under the terms of the Q Public License 00011 * version 1.0 as defined by Trolltech AS of Norway and appearing in the file 00012 * LICENSE.QPL included in the packaging of this file. 00013 * 00014 * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 00015 * for the software may use this file in accordance with that specific license 00016 * 00017 */ 00018 #include <OMKSvm.h> 00019 #include <OMKSvmLink.h> 00020 #include "OMKSynchronisationMessage.h" 00021 #include <OMKPvmMessage.h> 00022 #include <OMKPvmMulticastMessage.h> 00023 #include <OMKPvmUnicastMessage.h> 00024 #include <OMKProcess.h> 00025 #include <OMKPvmController.h> 00026 #include "OMKPvmNameServer.h" 00027 #include "OMKPvmCentralNameServer.h" 00028 #include <pvm3.h> 00029 #include "OMKPvmSvm.h" 00030 #ifndef _MSC_VER 00031 #include <sys/time.h> 00032 #endif 00033 #include "OMKTracer.h" 00034 00035 using namespace std; 00036 using namespace OMK ; 00037 00038 std::string Svm::_slaveGroupName = "spawnedControllers"; 00039 std::string Svm::_masterSiteName = "OpenMaskMasterDistributedSite" ; 00040 00041 #ifdef _MSC_VER 00042 int 00043 gettimeofday(struct timeval *t, struct timezone *tzp) 00044 { 00045 struct _timeb timebuffer; 00046 00047 /* this calls the time and returns sec ** msec */ 00048 _ftime( &timebuffer ); 00049 00050 t->tv_sec=timebuffer.time; 00051 t->tv_usec=timebuffer.millitm*1000; 00052 return 1; 00053 } 00054 #endif 00055 00056 
//----------------------------------------------------------------------- 00057 00058 Svm::Svm (NameToPointerMap<Process> * tab, const Date & latence, const Date & timeOut, 00059 const int deadReckoningInterval, const bool yieldNeeded) : 00060 // _processTable ( tab ), 00061 // _activeProcessTable (tab ), 00062 _troubleFlag ( false ), 00063 _simStep ( 0 ), 00064 _siteId ( -1 ), //negative values indicates an error 00065 _numberOfSlaves ( 0 ), 00066 _synchronisationLatency (latence), 00067 _synchronisationTimeOut (timeOut), 00068 _deadReckoningInterval (deadReckoningInterval), 00069 _yieldNeeded (yieldNeeded) 00070 { 00071 _processTable = new NameToPointerMap<Process>(); 00072 for ( NameToPointerMap<Process>::iterator processIterator = tab->begin () ; 00073 processIterator != tab->end () ; 00074 ++processIterator 00075 ) 00076 { 00077 _processTable->addObjectWithIndex(processIterator->first, processIterator->second ); 00078 00079 } 00080 00081 //--------------chadi ajout 1/6 ------------------------ 00082 _temporaryTable = new NameToPointerMap<Process>(); 00083 _disconnectedTable = new NameToPointerMap<Process>(); 00084 _activeProcessTable = new NameToPointerMap<Process>(); 00085 _disconnectedTable->clear(); 00086 for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 00087 processIterator != _processTable->end () ; 00088 ++processIterator 00089 ) 00090 { 00091 _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second ); 00092 00093 } 00094 00095 firstTime = true; 00096 aRecevoir.clear(); 00097 //--------------fin chadi ajout 1/6 ------------------------ 00098 // TDTD ajout pour simuler des défaillances réseau 00099 #ifdef _MSC_VER 00100 srand( getpid() ) ; 00101 #else 00102 srandom (getpid ()) ; 00103 #endif 00104 } 00105 00106 00107 Svm::~Svm () 00108 { 00109 } 00110 00111 00112 const Date & Svm::getSynchronisationLatency() 00113 { 00114 return _synchronisationLatency; 00115 } 00116 00117 00118 00119 
void Svm::init (const Date & initialSimulationDate) 00120 { 00121 if ( getParentSiteId () == 0 ) // wasn't created by a spawn 00122 { 00123 createDistributedSimulation ( initialSimulationDate ) ; 00124 } 00125 else 00126 { 00127 connectToDistributedSimulation () ; 00128 } 00129 } 00130 00131 00132 //----------------------------------------------------------------------- 00133 00134 //*********************** chadi ***************************** 00135 00136 void Svm::addNewSiteToSimulation (const Date & date) 00137 { 00138 dateVar = date ; 00139 00140 Process * theNewProcessToAdd = new Process ( "visualProcessC", "maxipes.irisa.fr" ); 00141 00142 00144 00145 // addNewWorkstation ( "barbux.irisa.fr" ) ; 00146 addNewWorkstation ( theNewProcessToAdd->getHostMachineName () ) ; 00147 00148 _processTable->addObjectWithIndex ("visualProcessC", theNewProcessToAdd ) ; 00149 00150 00151 int adrLoc ; 00152 00153 int processSiteId ; 00154 00155 cout<<"New Process will be Spawned .....///"<<endl; 00156 cout<<"theNewProcessToAdd is named : "<<theNewProcessToAdd->getProcessName()<<endl; 00157 00158 // spawn a new process on the specified workstation 00159 adrLoc = spawnProcess (theNewProcessToAdd) ; 00160 00161 cout<<"New Process Spawned .....valeur de spawn est egale a : "<<adrLoc<<endl; 00162 00163 // create a SvmLink, add it to processInfo 00164 theNewProcessToAdd->setSvmLink (createSvmLink (adrLoc)) ; 00165 00166 _idToProcessTable.insert ( make_pair ( adrLoc,theNewProcessToAdd ) ) ; 00167 00168 theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( date ) ; 00169 00170 00171 // send the name of the process (in the Svm) to the created process 00172 theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer()<<theNewProcessToAdd->getProcessName() ; 00173 00174 theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ; 00175 00176 //prepare processSiteIds for the next step 00177 processSiteId = theNewProcessToAdd->getSvmLink ()->getTID() ; 
00178 // processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ; 00179 00180 00181 // Encodage des attributs Svm de chaque processus 00182 PvmUnicastMessage nameToIdMessage ( processSiteId ) ; 00183 nameToIdMessage.insertTimeStamp ( date ) ; 00184 00186 NameToPointerMap<Process>::iterator i ; 00187 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00188 { 00189 nameToIdMessage << *((*i).second) ; 00190 } 00191 00192 // Transfert des adresses vers le controleur local du process ajouté 00193 nameToIdMessage.send ( PvmMessage::ProcessTable ) ; 00194 00195 00196 // envoyer le processTable Update a tt le monde 00197 PvmMulticastMessage updateProcessTableMessage ( processSiteIds ) ; 00198 updateProcessTableMessage.insertTimeStamp ( date ) ; 00199 00200 updateProcessTableMessage << *theNewProcessToAdd ; 00201 // updateProcessTableMessage << (theNewProcessToAdd->getProcessName()) ; 00202 cout<<" theNewProcessToAdd->getProcessName() = "<<theNewProcessToAdd->getProcessName()<<endl; 00203 00204 updateProcessTableMessage.send ( PvmMessage::updateProcessTable ) ; 00205 00206 00207 processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ; 00208 cout<<"Finishing method addNewSiteToSimulation"<<endl; 00209 00210 } 00211 00212 void Svm::testIfNewProcessAdded () 00213 { 00214 PvmIncomingMessage * message = NULL ; 00215 message = _linkToCentralSite->testIfRequestUpdateProcessTable (PvmMessage::updateProcessTable) ; 00216 if ( message != NULL ) 00217 { 00218 Name zico ; 00219 // *message>>zico ; 00220 // cout<<"le nom de process recu est : "<<zico<<endl; 00221 Process* p = new Process ( "visualProcessC", "maxipes.irisa.fr" ); 00222 00223 // Process *p ; 00224 // *message>> *p ; 00225 _processTable->addObjectWithIndex(p->getProcessName(),p ) ; 00226 ++_numberOfSlaves ; 00227 00228 // Process * p = (*i).second ; 00229 // creation d'un canal sans destinataire 00230 p->setSvmLink (createSvmLink ()) ; 00231 // maj du destinataire 
????????????????????????????????? 00232 *npp >> *p ; 00233 00234 _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ; 00235 00236 PvmUnicastMessage initSucMessage ( p->getSvmLink()->getTID() ) ; 00237 initSucMessage.insertTimeStamp ( dateVar ) ; 00238 initSucMessage.send ( PvmMessage::LocalInitSuccessfull ) ; 00239 00240 NameToPointerMap<Process>::iterator i ; 00241 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00242 { 00243 cout<<"_processTable contient: "<<(*i).second->getProcessName()<<endl; 00244 } 00245 00246 syncDistributedSites(); 00247 } 00248 } 00249 00250 00251 00252 //**** 00253 //************************************************************ 00254 00256 /* 00257 void Svm::createDistributedSimulation(const Date & initialSimulationDate ) 00258 { 00260 Process * miaou = new Process ( "visualProcessTest", "barbux.irisa.fr" ); 00261 _processTable->addObjectWithIndex ("visualProcessTest", miaou ) ; 00263 00264 00265 #ifdef _DEBUGPVMEXEC 00266 cerr << "Svm::createDistributedSimulation : creating the distributed simulation" << endl ; 00267 #endif 00268 _siteName = _masterSiteName ; 00269 00271 00272 NameToPointerMap<Process>::iterator i ; 00273 00274 for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 00275 { 00276 addNewWorkstation ( (*i).second->getHostMachineName () ) ; 00277 } 00278 00280 // Process * miaou = new Process ( "visualProcessTest", "barbux.irisa.fr" ); 00281 00282 // addNewWorkstation ( "barbux.irisa.fr" ) ; 00283 00284 // _processTable->addObjectWithIndex ("visualProcessTest", miaou ) ; 00285 00286 for ( NameToPointerMap<Process>::iterator pi = _processTable->begin () ; 00287 pi != _processTable->end () ; 00288 ++pi 00289 ) 00290 { 00291 00292 cout<<"pour s'assurer ke le proc est ajoute koi (createDistributedSimulation) : "<<endl; 00293 cout<<"nom de proc est: "<< pi->second->getProcessName()<<" nom de host machine est: "<<pi->second->getHostMachineName()<<endl; 00294 } 00296 00297 // 
creation des processus locaux 00298 #ifdef _DEBUGPVMEXEC 00299 cerr << "Creation des processus locaux..................." << endl ; 00300 #endif 00301 Process * processInfo ; 00302 int adrLoc ; 00303 00304 std::vector<int> processSiteIds ; 00305 00306 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00307 { 00308 processInfo = (*i).second ; 00309 00310 // spawn a new process on the specified workstation 00311 adrLoc = spawnProcess (processInfo) ; 00312 00313 // create a SvmLink, add it to processInfo 00314 processInfo->setSvmLink (createSvmLink (adrLoc)) ; 00315 00316 _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ; 00317 00318 processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ; 00319 00320 // send the name of the process (in the Svm) to the created process 00321 processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ; 00322 00323 processInfo->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ; 00324 00325 //prepare processSiteIds for the next step 00326 processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ; 00327 00328 #ifdef _DEBUGPVMMESS 00329 cerr<<typeid((*i).first).name()<<endl; 00330 cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl; 00331 #endif 00332 } 00333 00334 00335 // Encodage des attributs Svm de chaque processus 00336 PvmMulticastMessage nameToIdMessage ( processSiteIds ) ; 00337 nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ; 00338 00339 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00340 { 00341 nameToIdMessage << *((*i).second) ; 00342 } 00343 00344 // Transfert des adresses vers chaque controleur local 00345 nameToIdMessage.send ( PvmMessage::ProcessTable ) ; 00346 00347 serveNameRequestsUntilEnd () ; 00348 00349 } 00350 00351 */ 00352 00354 00355 // aspects fonctionnement secondaire 00356 //**************************************************** 00357 // RK 1302 : Suppression du nom du 
pss 00358 00359 void Svm::createDistributedSimulation(const Date & initialSimulationDate ) 00360 { 00361 #ifdef _DEBUGPVMEXEC 00362 cerr << "Svm::createDistributedSimulation : creating the distributed simulation" << endl ; 00363 #endif 00364 _siteName = _masterSiteName ; 00365 00367 00368 NameToPointerMap<Process>::iterator i ; 00369 00370 for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 00371 { 00372 addNewWorkstation ( (*i).second->getHostMachineName () ) ; 00373 } 00374 00375 00376 // creation des processus locaux 00377 #ifdef _DEBUGPVMEXEC 00378 cerr << "Creation des processus locaux..................." << endl ; 00379 #endif 00380 Process * processInfo ; 00381 int adrLoc ; 00382 00383 // std::vector<int> processSiteIds ; 00384 00385 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00386 { 00387 processInfo = (*i).second ; 00388 00389 // spawn a new process on the specified workstation 00390 adrLoc = spawnProcess (processInfo) ; 00391 00392 cout<<"my TID is : "<<adrLoc<<endl ; 00393 00394 // create a SvmLink, add it to processInfo 00395 processInfo->setSvmLink (createSvmLink (adrLoc)) ; 00396 00397 _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ; 00398 00399 processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ; 00400 00401 // send the name of the process (in the Svm) to the created process 00402 processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ; 00403 00404 processInfo->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ; 00405 00406 //prepare processSiteIds for the next step 00407 processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ; 00408 00409 #ifdef _DEBUGPVMMESS 00410 cerr<<typeid((*i).first).name()<<endl; 00411 cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl; 00412 #endif 00413 } 00414 00415 00416 // Encodage des attributs Svm de chaque processus 00417 PvmMulticastMessage nameToIdMessage ( 
processSiteIds ) ; 00418 nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ; 00419 00420 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00421 { 00422 nameToIdMessage << *((*i).second) ; 00423 } 00424 00425 // Transfert des adresses vers chaque controleur local 00426 nameToIdMessage.send ( PvmMessage::ProcessTable ) ; 00427 00428 serveNameRequestsUntilEnd () ; 00429 00430 } 00431 00432 00433 00434 void Svm::serveNameRequestsUntilEnd (void) 00435 { 00436 bool serving = true ; 00437 00438 //convert the actual name Server to a PvmCentralNameServer 00439 PvmCentralNameServer * nameServer = new PvmCentralNameServer ( *Name::getNameServer() ) ; 00440 Name::setNameServer (nameServer ) ; 00441 00442 00443 PvmIncomingMessage receiveBuffer ; 00444 std::pair<PvmMessage::MessageTag, int> receivedRequest ; 00445 00446 while ( serving ) 00447 { 00448 00449 receivedRequest = waitForAnyRequests ( receiveBuffer ) ; 00450 00451 switch (receivedRequest.first) 00452 { 00453 case PvmMessage::EndOfSimulation : 00454 { 00455 const Date & endDate (receiveBuffer.getMessageDate() ); 00456 Name endedProcess ; 00457 receiveBuffer>>endedProcess ; 00458 cerr<<endedProcess<<" finished at "<<endDate<<endl; 00459 _processTable->erase ( endedProcess ) ; 00460 if ( _processTable->size() == 0 ) 00461 { 00462 disconnectFromDistributedSimulation ( endDate ) ; 00463 serving = false ; 00464 } 00465 } 00466 break; 00467 case PvmMessage::NameServiceGetId : 00468 { 00469 int stringLength ; 00470 receiveBuffer>>stringLength ; 00471 char * stringId = new char[stringLength] ; 00472 receiveBuffer>>stringId ; 00473 assert ( stringId [stringLength - 1] == 0 ) ; 00474 OMTRACEID("PVMSVM", 00475 "Requested id for string "<<stringId<<"| of length "<<stringLength 00476 <<" from "<< hex << receivedRequest.second << dec); 00477 if ( stringLength ==1) 00478 { 00479 OMTRACE ("Ask for Id of " << stringId <<"|"); 00480 Name::idType id = nameServer->getIdentifier( stringId ) ; 00481 OMTRACE 
("id=" << id <<"|"); 00482 OMTRACE ("StringAssociatedTo= " << nameServer->getStringAssociatedTo (id) <<"|"); 00483 } 00484 Name::idType id = nameServer->getIdentifier( stringId ) ; 00485 OMTRACEID("PVMSVM"," sending "<<id<< "|"); 00486 delete [] stringId ; 00487 pvm_initsend( PvmSvm::pvmDataEncoding ) ; 00488 pvm_pklong (&id, 1, 1); 00489 pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnId) ; 00490 } 00491 break; 00492 case PvmMessage::NameServiceGetString: 00493 { 00494 Name::idType id ; 00495 receiveBuffer>>id ; 00496 OMTRACEID("PVMSVM", 00497 "Requested String for id "<<id 00498 <<"| from "<< hex << receivedRequest.second<< dec ); 00499 std::string resultingString( nameServer->getStringAssociatedTo (id) ) ; 00500 OMTRACEID("PVMSVM"," sending "<<resultingString<<"|"); 00501 pvm_initsend( PvmSvm::pvmDataEncoding ) ; 00502 pvm_pkstr( const_cast<char *>( resultingString.c_str() ) ) ; 00503 pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnString) ; 00504 } 00505 break; 00506 case PvmMessage::NameServiceVerifyLocalNameServer: 00507 { 00508 nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ; 00509 } 00510 break; 00511 //added by Chadi 00512 case PvmMessage::addNewSite: 00513 { 00514 addNewSiteToSimulation(receiveBuffer.getMessageDate()); 00515 } 00516 break; 00517 //end added 00518 default: 00519 OMTRACEID("PVMSVM","Svm::serveNameRequestsUntilEnd() : unexpected request " 00520 <<receivedRequest.first); 00521 //an unexpected request is received 00523 // switch (Controller::warningLevel) 00524 // { 00525 // case Controller::AllWarnings : 00526 // cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request " 00527 // <<receivedRequest.first<<endl; 00528 // break ; 00529 // case Controller::FatalWarnings : 00530 // cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request" 00531 // <<receivedRequest.first<<endl; 00532 // exit (2) ; 00533 // break; 00534 // default://nothing to do 00535 // break; 00536 
// } 00538 break; 00539 } 00540 } 00541 } 00543 void Svm::addNewSiteRequest (const Date & date) 00544 { 00545 PvmUnicastMessage mess ( getParentSiteId () ) ; 00546 mess.insertTimeStamp ( date ) ; 00547 mess.send ( PvmMessage::addNewSite ) ; 00548 } 00550 00551 00552 void Svm::connectToDistributedSimulation () 00553 { 00554 //TDTD 00555 //#ifdef _DEBUGPVMEXEC 00556 cerr<<"Svm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl; 00557 //#endif 00558 00559 PvmIncomingMessage * np = NULL ; 00560 00561 // connect to the global controller 00562 _linkToCentralSite = createSvmLink ( getParentSiteId () ) ; 00563 00564 // join the group of slave processes 00565 joinSvmGroup (_slaveGroupName) ; 00566 00567 // create a distributed version of the nameServer 00568 PvmNameServer * pvmNameServer = new PvmNameServer ( getParentSiteId() , *Name::getNameServer() ) ; 00569 00570 Name::setNameServer ( pvmNameServer ) ; 00571 00572 // find out the name of this site 00573 np = & _linkToCentralSite->waitForMessage (PvmMessage::SiteName) ; 00574 *np >> _siteName ; 00575 npp = np ; 00576 00577 // cout<<"my site name is _siteName: "<<_siteName<<endl; 00578 // NameToPointerMap<Process>::iterator j ; 00579 // for (j = _processTable->begin () ; j != _processTable->end () ; j ++) 00580 // { 00581 // cout<<"connect: _processTable contient: "<<(*j).second->getProcessName()<<endl; 00582 // } 00583 00584 00585 00586 // find out the adresses of the other sites 00587 // - extract the timestamp of the communication 00588 _linkToCentralSite->waitForMessage (PvmMessage::ProcessTable) ; 00589 00590 // - extract, for each process, it's adress in the virtual machine and create a link 00591 NameToPointerMap<Process>::iterator i ; 00592 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00593 { 00594 ++_numberOfSlaves ; //we add ourselves, but as _numberOfSlaves is initialised to 0, it's ok 00595 00596 // Remarque : creer un canal sur soi-meme n'est pas 
genant, 00597 // on met juste a jour les tids dans la table des pss 00598 00599 Process * p = (*i).second ; 00600 00601 // creation d'un canal sans destinataire 00602 p->setSvmLink (createSvmLink ()) ; 00603 00604 // maj du destinataire 00605 *np >> *p ; 00606 00607 _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ; 00608 00609 cout<<"le TID du process ajoute est : "<< p->getSvmLink()->getTID()<<" et son nom est :"<<p->getProcessName()<<endl; 00610 } 00611 00612 00613 //wait for all other sites to be connected to the distributed simulation. 00614 //avoids sending messages to sites still connected or worse, not yet connected 00615 syncDistributedSites () ; 00616 cout<<"EEEE"<<endl; 00617 } 00618 00619 //----------------------------------------------------------------------- 00620 00621 void Svm::disconnectFromDistributedSimulation (const Date & endOfSimulationDate ) 00622 { 00623 #ifdef _DEBUGPVMEXEC 00624 cout <<"Svm::disconnectFromDistributedSimulation"<< endl ; 00625 #endif 00626 int recepient = getParentSiteId () ; 00627 00628 if ( recepient != 0 ) 00629 // a local PvmController has finished. Notify the central controller 00630 { 00631 PvmUnicastMessage endMessage (recepient) ; 00632 00633 endMessage.insertTimeStamp ( endOfSimulationDate ) ; 00634 00635 endMessage << getSiteName () ; 00636 00637 endMessage.send ( PvmMessage::EndOfSimulation ) ; 00638 } 00639 else 00640 //the central controller is deconnecting. 
Notify all local controllers 00641 { 00642 std::vector<int> processSiteIds ; 00643 for (NameToPointerMap<Process>::iterator i = _processTable->begin () ; 00644 i != _processTable-> end () ; 00645 i ++) 00646 { 00647 processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ; 00648 } 00649 PvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ; 00650 00651 endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ; 00652 00653 endOfSimulationMessage << getSiteName () ; 00654 00655 endOfSimulationMessage.send ( PvmMessage::EndOfSimulation ) ; 00656 } 00657 } 00658 00659 00660 //----------------------------------------------------------------------- 00661 00662 00663 void Svm::broadcast (PvmMessage::MessageTag tag, PvmMulticastMessage * mess) 00664 { 00665 //the first implementation, commented, dies in pvm with an unexpected error 00666 //I haven't had time to trace the problem, so the second implementation is used 00667 00668 /* first implementation 00669 initBeforeMessagePacking () ; 00670 00671 if ( mess != NULL ) 00672 { 00673 mess->packMessage() ; 00674 } 00675 else 00676 { 00677 mess = new PvmOutgoingMessage () ; 00678 mess->insertTimeStamp(0) ; 00679 *mess << "dummy"; 00680 mess->packMessage() ; 00681 } 00682 00683 broadcastToGroup ( slaveGroup, tag ) ; 00684 */ 00685 00686 #ifdef _DEBUGPVMMESS 00687 cerr<<"Svm::broadcast ("<<tag<<","<<mess<<") ; "<<endl; 00688 #endif 00689 00690 NameToPointerMap<Process>::iterator processIter ; 00691 00692 if (mess == NULL ) 00693 { 00694 std::vector<int> distantRecepients ; 00695 for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 00696 { 00697 if (_siteName != (*processIter).second->getProcessName ()) 00698 { 00699 // On connait alors le canal a observer 00700 distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ; 00701 } 00702 } 00703 00705 mess = new PvmMulticastMessage ( distantRecepients ) ; 00706 mess->insertTimeStamp(0) ; 
00707 } 00708 00709 mess -> send ( tag ) ; 00710 00711 } 00712 00713 00714 00715 void Svm::synchronizeOn ( PvmController & parsingController, PvmMessage::MessageTag tag ) 00716 { 00717 broadcast ( tag ) ; 00718 #ifdef _DEBUGPVMSYNCHRO 00719 cerr<< "Svm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl; 00720 usleep(1000000) ; 00721 #endif 00722 00723 int numberOfReadyProcesses = 1 ; //this process has entered the barrier 00724 00725 while ( numberOfReadyProcesses != _numberOfSlaves ) 00726 { 00727 00728 // here, we ignore the results, because we suppose one process will only send tag once during synchronization 00729 waitForAnswerToBlockingRequest ( parsingController, tag ) ; 00730 00731 numberOfReadyProcesses++ ; 00732 00733 #ifdef _DEBUGPVMMESS 00734 cerr<< "Svm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl; 00735 usleep(1000000) ; 00736 #endif 00737 00738 } 00739 } 00740 00741 //----------------------------------------------------------------------- 00742 00743 void Svm::syncDistributedSites() 00744 { 00745 groupBarrier( _slaveGroupName, _numberOfSlaves ) ; 00746 } 00747 00748 //----------------------------------------------------------------------- 00749 00750 SvmLink * Svm::getLinkToProcessNamed( const Name & processName ) 00751 { 00752 // check that we know this process 00753 if( _processTable->find( processName ) != _processTable->end() ) 00754 { 00755 Process *pss = _processTable->getObjectOfIndex( processName ) ; 00756 return pss->getSvmLink() ; 00757 } 00758 else 00759 { 00761 // #ifdef _USESSTREAM 00762 // ostringstream o ; 00763 // o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus " 00764 // << processName << " afin de recuperer son canal !" ; 00765 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00766 // #else 00767 // ostrstream o ; 00768 // o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus " 00769 // << processName << " afin de recuperer son canal !" 
; 00770 // o.put ('\0') ; 00771 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00772 // delete o.str() ; 00773 // #endif 00775 return 0 ; 00776 } 00777 } 00778 00779 //----------------------------------------------------------------------- 00780 00781 Process * Svm::getProcessDescriptorNamed( const Name & processName ) 00782 { 00783 // check that we know this process 00784 if( _processTable->find( processName ) != _processTable->end() ) 00785 { 00786 Process *pss = _processTable->getObjectOfIndex( processName ) ; 00787 return pss ; 00788 } 00789 else 00790 { 00792 // #ifdef _USESSTREAM 00793 // ostringstream o ; 00794 // o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ; 00795 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00796 // #else 00797 // ostrstream o ; 00798 // o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ; 00799 // o.put('\0') ; 00800 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00801 // delete o.str() ; 00802 // #endif 00804 return 0 ; 00805 } 00806 } 00807 00808 //----------------------------------------------------------------------- 00809 00810 00811 //-----------------chadi ajout 2/6--------------------------- 00812 00813 // on cherche dans la liste des deconnectes si le processus existe ou pas 00814 00815 Process * Svm::testIfDisconnectedProcessNamed( const Name & processName ) 00816 { 00817 // check that we know this process as a deconnected process 00818 if( _disconnectedTable->find( processName ) != _disconnectedTable->end() ) 00819 { 00820 Process *pss = _disconnectedTable->getObjectOfIndex( processName ) ; 00821 return pss ; 00822 } 00823 else 00824 { 00825 return 0 ; 00826 } 00827 } 00828 00829 //-------------------fin chadi ajout 2/6-------------------------- 00830 00831 00832 00833 00834 00835 00836 void Svm::processReceivedMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 00837 { 00838 #ifdef 
_DEBUGPVMMESS 00839 cerr << "Svm::processReceivedMessages"<<endl; 00840 #endif 00841 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 00842 SvmLink * canal ; // Canal vers le precedent processus 00843 PvmIncomingMessage * messageRecu = NULL ; 00844 for (pOMKs = _processTable->begin () ; 00845 pOMKs != _processTable->end () ; 00846 pOMKs ++) 00847 { 00848 if (_siteName != (*pOMKs).second->getProcessName ()) 00849 { 00850 // On connait alors le canal a observer 00851 canal = (*pOMKs).second->getSvmLink () ; 00852 #ifdef _DEBUGPVMMESS 00853 cout << "Svm::processReceivedMessages from " 00854 << (*pOMKs).second->getProcessName () << " tag : " 00855 << tag << endl ; 00856 //canal->affiche () ; 00857 #endif 00858 // On fait une reception non bloquante 00859 messageRecu = canal->testForMessage (tag) ; 00860 while ( messageRecu->hasMessage () ) 00861 { 00862 #ifdef _DEBUGPVMMESS 00863 cerr << "Svm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl; 00864 #endif 00865 #ifdef _DEBUGPVMMESS 00866 cerr << "Svm::processReceivedMessages: extracted emission date: " <<messageRecu->getMessageDate()<< endl; 00867 #endif 00868 //cerr << "Svm::processReceivedMessages : date " << dateEmission << endl ; 00869 // On envoie le message et l'objet au controleur local 00870 parsingController.parseSynchronisationMessage (messageRecu) ; 00871 messageRecu = canal->testForMessage (tag) ; 00872 } 00873 } 00874 } 00875 } 00876 00877 //----------------------------------------------------------------------- 00878 00879 void Svm::waitAndProcessMessages (PvmController & parsingController, 00880 const PvmMessage::MessageTag tag) 00881 { 00882 #ifdef _DEBUGPVMMESS 00883 cerr<<"Svm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl; 00884 NameToPointerMap<Process>::iterator pOMK ; 00885 for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) { 00886 cerr<<(*pOMK).second->getProcessName (); 00887 } 00888 
cerr<<endl; 00889 #endif 00890 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 00891 SvmLink * canal ; // Canal vers le precedent processus 00892 PvmIncomingMessage * messageRecu ; 00893 #ifdef _DEBUGPVMMESS 00894 cerr<<"Svm::waitAndProcessMessages écoute de tous les processus"<<endl; 00895 #endif 00896 for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) { 00897 #ifdef _DEBUGPVMMESS 00898 cerr<<"Svm::waitAndProcessMessages écoute d'un de plus : "<<(*pOMKs).second->getProcessName ()<<endl; 00899 #endif 00900 if (_siteName != (*pOMKs).second->getProcessName ()) { 00901 // On connait alors le canal a observer 00902 canal = (*pOMKs).second->getSvmLink () ; 00903 #ifdef _DEBUGPVMMESS 00904 cout << "OMKmSvm::waitAndProcessMessages sur " 00905 << (*pOMKs).second->getProcessName () << " tag : " 00906 << tag << endl ; 00907 canal->printDebuggingInformation () ; 00908 #endif 00909 // On recoit le message en bloquant 00910 messageRecu = & canal->waitForMessage (tag) ; 00911 #ifdef _DEBUGPVMMESS 00912 cerr << "Svm::waitAndProcessMessages : message received"<< endl ; 00913 #endif 00914 // On envoie le message et l'objet au controleur local 00915 parsingController.parseSynchronisationMessage (messageRecu) ; 00916 } 00917 } 00918 } 00919 00920 //----------------------------------------------------------------------- 00921 00922 void Svm::waitForMessage (PvmController & parsingController, const PvmMessage::MessageTag tag) 00923 { 00924 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 00925 SvmLink * canal ; // Canal vers le precedent processus 00926 PvmIncomingMessage * messageRecu = NULL ; 00927 bool rienRecu=true; 00928 while (rienRecu) {//on ne teste pas la nullite du message, 00929 //car en cas de message ne contenant que la date, on le vide ! 
00930 for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) { 00931 if (_siteName != (*pOMKs).second->getProcessName ()) { 00932 // On connait alors le canal a observer 00933 canal = (*pOMKs).second->getSvmLink () ; 00934 #ifdef _DEBUGPVMMESS 00935 cout << "OMKmSvm::waitForMessage sur " 00936 << (*pOMKs).second->getProcessName () << " tag : " 00937 << tag << endl ; 00938 canal->printDebuggingInformation () ; 00939 #endif 00940 // On recoit le message en bloquant 00941 messageRecu = canal->testForMessage (tag) ; 00942 // On envoie le message et l'objet au controleur local 00943 if (messageRecu->hasMessage() ) 00944 { 00945 #ifdef _DEBUGPVMMESS 00946 cerr << "Svm::waitForMessage : message received" << endl ; 00947 #endif 00948 parsingController.parseSynchronisationMessage (messageRecu) ; 00949 rienRecu=false; 00950 } 00951 } 00952 } 00953 messageRecu = _linkToCentralSite->testForMessage (tag) ; 00954 //cerr << "Svm::waitForMessage : message recu : " << *messageRecu << endl ; 00955 if (messageRecu->hasMessage() ) 00956 { 00957 #ifdef _DEBUGPVMMESS 00958 cerr << "Svm::waitForMessage : message received" << endl ; 00959 #endif 00960 parsingController.parseSynchronisationMessage (messageRecu) ; 00961 rienRecu=false; 00962 } 00963 } 00964 } 00965 00966 //------------------------------------------------------ 00967 /* 00968 void Svm::flushDisconnectedSitesOutgoingBuffer() 00969 { 00970 for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 00971 prIt != _disconnectedTable->end () ; 00972 ++prIt 00973 ) 00974 { 00975 prIt->second->getSvmLink ()->getOutgoingBuffer().flushCurrentBuffer(); 00976 } 00977 } 00978 */ 00979 //------------------------------------------------------ 00980 00982 00983 00984 // void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 00985 // { 00986 // NameToPointerMap<Process> aRecevoir ; 00987 // static int _mostAncientDate = 10000 ; 00988 
00989 // //First make a list of processes that we need to receive values from 00990 // for ( NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 00991 // pOMKsE != _processTable->end () ; 00992 // pOMKsE ++) 00993 // { 00994 // if ( (pOMKsE->first != _siteName) && 00995 // ( pOMKsE->second->getDateOfLastMessage () + 00996 // pOMKsE->second->getPeriod () + 00997 // _synchronisationLatency < parsingController.getSimulatedDate ())) 00998 // { 00999 // if (pOMKsE->second->getDateOfLastMessage () < _mostAncientDate) 01000 // { 01001 // _mostAncientDate = pOMKsE->second->getDateOfLastMessage (); 01002 // } 01003 // _disconnectedTable->addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 01004 // #ifdef _DEBUGPVMMESS 01005 // cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 01006 // << pOMKsE->first << endl ; 01007 // #endif 01008 // } 01009 // } 01010 01011 // PvmIncomingMessage receiveBuffer ; 01012 // PvmMessage::MessageTag receivedRequest ; 01013 // Process * receivingProcess ; 01014 01015 // struct timeval start_timer,end_timer; 01016 // gettimeofday(&start_timer, NULL); 01017 // int time_in_ms = 0; 01018 // int _simDate = parsingController.getSimulatedDate () ; 01019 // while ( _mostAncientDate + time_in_ms < _simDate ) 01020 // { 01021 // for ( NameToPointerMap<Process>::iterator processIterator = _disconnectedTable->begin () ; 01022 // processIterator != _disconnectedTable->end () ; 01023 // ++processIterator 01024 // ) 01025 // { 01026 01027 // //this new implementation doesn't do blocking receives, as some processes could send urgent requests 01028 // receivingProcess = processIterator->second ; 01029 01030 // receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01031 01032 // if ( receiveBuffer.hasMessage() ) 01033 // { 01034 01035 // switch (receivedRequest) 01036 // { 01037 // case PvmMessage::SynchronisationMessage : 01038 // { 01039 01040 // //have the controller process the 
message 01041 // parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01042 01043 // receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 01044 01045 // //an additionnal test could be performed here to make sure correct date of last message has happened 01046 // _disconnectedTable->erase ( receivingProcess->getProcessName() ) ; 01047 // // } 01048 01049 // } 01050 // break ; 01051 // case PvmMessage::MirrorNeedsInitialValues : 01052 // { 01053 // parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 01054 // } 01055 // break ; 01056 // case PvmMessage::InitialValuesForMirror : 01057 // { 01058 // //have the controller process the message 01059 // parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01060 01061 // } 01062 // break ; 01063 // case PvmMessage::LocalInitSuccessfull : 01064 // case PvmMessage::ProcessTable : 01065 // case PvmMessage::EndOfSimulation : 01066 // case PvmMessage::EnterBarrier : 01067 // case PvmMessage::SiteName : 01068 // case PvmMessage::ExitBarrier : 01069 // case PvmMessage::NameServiceGetId : 01070 // case PvmMessage::NameServiceGetString : 01071 // case PvmMessage::NameServiceReturnId : 01072 // case PvmMessage::NameServiceReturnString : 01073 // case PvmMessage::NameServiceVerifyLocalNameServer : 01074 // case PvmMessage::NameServiceVerifyResult : 01075 // default: 01076 // { 01077 // Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message", 01078 // Controller::AllWarnings ) ; 01079 // } 01080 // } 01081 // } 01082 01083 01084 // } //end for 01085 01086 // gettimeofday(&end_timer, NULL); 01087 01088 // time_in_ms = ( (end_timer.tv_sec-start_timer.tv_sec) * 1000000 + 01089 // (end_timer.tv_usec-start_timer.tv_usec) )/1000 ; 01090 01091 // // cout<<"la valeur de dif en millisec : "<<time_in_ms<<endl; 01092 // // cout<<"la valeur de start est : "<<start<<endl; 01093 // // cout<<"la valeur de end est : "<<end<<endl; 01094 01095 // } //end 
while 01096 01097 // } 01098 01099 01101 01102 01103 void Svm::relaxedSynchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) { 01104 01105 PvmIncomingMessage receiveBuffer ; 01106 PvmMessage::MessageTag receivedRequest ; 01107 Process * receivingProcess ; 01108 static Name visualProcessA ("visualProcessA") ; 01109 static Name visualProcessB ("visualProcessB") ; 01110 static Name visualProcessC ("visualProcessC") ; 01111 static Name visualProcessD ("visualProcessD") ; 01112 01113 //std::cerr <<"Svm:: le controleur est sur le processus : " << _siteName << std::endl ; 01114 // if (_siteName == visualProcessB) { // simulation d'un pb de latence réseau 01115 // int sommeil = random () % 15000 ; 01116 // usleep (sommeil) ; 01117 // //std::cerr <<"ralentissement de : " << sommeil << std::endl ; 01118 // } else if (_siteName == visualProcessC) { // simulation d'un pb de latence réseau 01119 // int sommeil = random () % 30000 ; 01120 // usleep (sommeil) ; 01121 // //std::cerr <<"ralentissement de : " << sommeil << std::endl ; 01122 // } else if (_siteName == visualProcessD) { // simulation d'un pb de latence réseau 01123 // int sommeil = random () % 45000 ; 01124 // usleep (sommeil) ; 01125 // //std::cerr <<"ralentissement de : " << sommeil << std::endl ; 01126 // } 01127 01128 01129 // if (! 
_disconnectedTable->empty ()) { 01130 01131 // first, we try only once to receive something from the disconnected processes 01132 NameToPointerMap<Process> copyOfDisconnected ; 01133 for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 01134 prIt != _disconnectedTable->end () ; 01135 ++ prIt) { 01136 copyOfDisconnected.addObjectWithIndex (prIt->first, prIt->second) ; 01137 } 01138 for (NameToPointerMap<Process>::iterator prIt = copyOfDisconnected.begin () ; 01139 prIt != copyOfDisconnected.end () ; 01140 ++ prIt) { 01141 if (prIt->first != _siteName) { 01142 receivingProcess = prIt->second ; 01143 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ; 01144 if (receiveBuffer.hasMessage ()) { 01145 // this process has become active again, we put it in the list of processes that we need to receive values from 01146 // TDTD ajout de trace pour debug 01147 //std::cerr << " 1 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ; 01148 // TDTD essai de mise en commentaire pour voir... c'est normalement fait plus loin, avec un test... 01149 //_activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ; 01150 // fin TDTD 01151 // TDTD : essai pour voir... 
01152 _activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ; 01153 _disconnectedTable->erase (prIt->second->getProcessName ()) ; 01154 //TDTDTD enlever le processus de la map _disconnectedProcessusMap 01155 _disconnectedProcessusMap.erase (prIt->first) ; 01156 01157 // fin TDTD 01158 switch (receivedRequest) { 01159 case PvmMessage::SynchronisationMessage : { 01160 // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl; 01161 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01162 receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ; 01163 } 01164 break ; 01165 case PvmMessage::MirrorNeedsInitialValues : { 01166 parsingController.sendInitialValuesToMirror (receiveBuffer) ; 01167 } 01168 break ; 01169 case PvmMessage::InitialValuesForMirror : { 01170 //have the controller process the message 01171 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01172 } 01173 break ; 01174 case PvmMessage::LocalInitSuccessfull : 01175 // case PvmMessage::pingMessage : 01176 // { 01177 // cout<<"i got the ping message"<<endl; 01178 // _activeProcessTable->addObjectWithIndex(prIt->first, processIterator->second) ; 01179 // _disconnectedTable->erase( prIt->second->getProcessName() ); 01180 // } 01181 break; 01182 case PvmMessage::ProcessTable : 01183 case PvmMessage::EndOfSimulation : 01184 case PvmMessage::EnterBarrier : 01185 case PvmMessage::SiteName : 01186 case PvmMessage::ExitBarrier : 01187 case PvmMessage::NameServiceGetId : 01188 case PvmMessage::NameServiceGetString : 01189 case PvmMessage::NameServiceReturnId : 01190 case PvmMessage::NameServiceReturnString : 01191 case PvmMessage::NameServiceVerifyLocalNameServer : 01192 case PvmMessage::NameServiceVerifyResult : 01193 default: { 01194 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 01195 } 01196 } //end switch 01197 } else { 01198 // TDTD 01199 // if the proces was 
declared active, it is now declared inactive, not to send to him datas that will cost time to read 01200 // so that it will still be late... 01201 if (_activeProcessTable->find (prIt->second->getProcessName ()) != _activeProcessTable->end ()) { 01202 _activeProcessTable->erase (prIt->second->getProcessName ()) ; 01203 } 01204 // fin TDTD 01205 } 01206 } 01207 } //end for 01208 // cout<<"size of _activeProcessTable : "<<_activeProcessTable->size()<<endl; 01209 // } 01210 //second, make a list of processes that we need to receive values from 01211 for (NameToPointerMap<Process>::iterator pOMKsE = _activeProcessTable->begin () ; 01212 pOMKsE != _activeProcessTable->end () ; 01213 pOMKsE ++) { 01214 if ((pOMKsE->first != _siteName) && 01215 (pOMKsE->second->getDateOfLastMessage () + 01216 pOMKsE->second->getPeriod () + 01217 _synchronisationLatency < parsingController.getSimulatedDate ())) { 01218 aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 01219 #ifdef _DEBUGPVMMESS 01220 cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 01221 << pOMKsE->first << endl ; 01222 #endif 01223 } 01224 } 01225 01226 struct timeval start_timer,end_timer; 01227 gettimeofday(&start_timer, NULL); 01228 double time_in_ms = 0; 01229 01230 // third, try to receive something from the active processes 01231 // valeur initiale proposée par Chadi : 200 ms 01232 while ((time_in_ms < _synchronisationTimeOut) && ! 
aRecevoir.empty ()) { // TDTD : valeur délicate à estimer pour le cas multi processus sur une seule machine 01233 // TDTD : ici je remplace processTable par activeProcesstable 01234 // for (NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 01235 // processIterator != _processTable->end () ; 01236 // ++processIterator) { 01237 for (NameToPointerMap<Process>::iterator processIterator = _activeProcessTable->begin () ; 01238 processIterator != _activeProcessTable->end () ; 01239 ++processIterator) { 01240 // fin TDTD 01241 //this new implementation doesn't do blocking receives, as some processes could send urgent requests 01242 receivingProcess = processIterator->second ; 01243 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ; 01244 if (receiveBuffer.hasMessage ()) { 01245 // TDTD ajout de trace pour debug 01246 //std::cerr << " 2 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ; 01247 //---------------------------chadi ajout 4/6 --------------------- 01248 // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01249 // start = clock(); 01250 01251 // Date dlm = receivingProcess->getDateOfLastMessage(); 01252 // receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ; 01253 // cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl; 01254 // TDTD : je mets ce code plus haut 01255 // if (_disconnectedTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) { 01256 // _activeProcessTable->addObjectWithIndex (processIterator->first, processIterator->second) ; 01257 // _disconnectedTable->erase (processIterator->second->getProcessName ()) ; 01258 // //std::cout<<" j'enleve de la liste de 
deconnecte le processus suivant: "<<processIterator->first<<std::endl; 01259 // _simStep = 0 ; 01260 // } 01261 // fin TDTD 01262 //-----------------------fin chadi ajout 4/6 ------------------------- 01263 // if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()) 01264 // { } 01265 // else { 01266 switch (receivedRequest) { 01267 case PvmMessage::SynchronisationMessage : { 01268 // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl; 01269 // To filter ancient synchro messages 01270 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01271 receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ; 01272 if (parsingController.getSimulatedDate () < receiveBuffer.getMessageDate () + 30) { 01273 // on a déjà reçu des messages assez récents de ce processus, on n'en demande pas d'autres 01274 aRecevoir.erase (receivingProcess->getProcessName ()) ; 01275 } 01276 } 01277 break ; 01278 case PvmMessage::MirrorNeedsInitialValues : { 01279 parsingController.sendInitialValuesToMirror (receiveBuffer) ; 01280 } break ; 01281 case PvmMessage::InitialValuesForMirror : { 01282 //have the controller process the message 01283 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01284 } break ; 01285 case PvmMessage::LocalInitSuccessfull : 01286 // case PvmMessage::pingMessage : 01287 // { 01288 // cout<<"i got the ping message"<<endl; 01289 // _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ; 01290 // _disconnectedTable->erase( processIterator->second->getProcessName() ); 01291 // } 01292 break ; 01293 case PvmMessage::ProcessTable : 01294 case PvmMessage::EndOfSimulation : 01295 case PvmMessage::EnterBarrier : 01296 case PvmMessage::SiteName : 01297 case PvmMessage::ExitBarrier : 01298 case PvmMessage::NameServiceGetId : 01299 case PvmMessage::NameServiceGetString : 01300 case PvmMessage::NameServiceReturnId : 01301 case 
PvmMessage::NameServiceReturnString : 01302 case PvmMessage::NameServiceVerifyLocalNameServer : 01303 case PvmMessage::NameServiceVerifyResult : 01304 default: { 01305 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 01306 } 01307 } //end switch 01308 } 01309 } //end for 01310 01311 gettimeofday (&end_timer, NULL) ; 01312 time_in_ms = (end_timer.tv_sec-start_timer.tv_sec) * 1000.0 + 01313 (end_timer.tv_usec-start_timer.tv_usec) /1000.0 ; 01314 01315 // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process 01316 if (_yieldNeeded) { 01317 #ifdef _MSC_VER 01318 Sleep( 0 ) ; 01319 #else 01320 usleep (0) ; 01321 #endif 01322 } 01323 } //end while 01324 01325 // fourth, remove the inactive processes from the _activeProcessTable and place them in the _disconnectedTable 01326 if (! aRecevoir.empty ()) { 01327 for (NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ; 01328 processIterator != aRecevoir.end () ; 01329 ++ processIterator) { 01330 _disconnectedTable->addObjectWithIndex (processIterator->first, processIterator->second) ; 01331 // TDTDTD ajouter le nom du processus dans la map _disconnectedProcessusMap avec la dernière date simulée arrivée 01332 _disconnectedProcessusMap.insert (std::pair<Name, Type::IntType> (processIterator->first, 01333 (int)processIterator->second->getDateOfLastMessage ())) ; 01334 // TDTD : je ne comprends pas trop pourquoi mais ici les lignes suivantes doivent être commentéee... 01335 // if (_activeProcessTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) { 01336 // _activeProcessTable->erase (processIterator->second->getProcessName ()) ; 01337 // } 01338 // fin TDTD : en fait c'était à cause de l'envoi aux seuls non inactifs (et non pas non en retard), voir la méthode send...withTag... 
01339 //std::cout << "je rajoute le processus suivant a la table des deconnectes " << processIterator->second->getProcessName () << std::endl ; 01340 } 01341 aRecevoir.clear () ; 01342 removeProcessProlog () ; 01343 } 01344 // TDTD ajout trace pour debug 01345 //std::cout<<"longueur de la table des deconnectés : "<<_disconnectedTable->size ()<<std::endl; 01346 #ifdef _DEBUGPVMMESS 01347 cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 01348 #endif 01349 01350 } 01351 01352 01353 01354 01355 /* TDTD : version intermédiaire de sauvegarde 01356 void Svm::relaxedSynchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 01357 { 01358 // if (firstTime) { 01359 // firstTime = false; 01360 // synchroniseReceiveAndProcessMessages( parsingController,tag); 01361 // } 01362 // else { 01363 // NameToPointerMap<Process> aRecevoir ; 01364 01365 01366 PvmIncomingMessage receiveBuffer ; 01367 PvmMessage::MessageTag receivedRequest ; 01368 Process * receivingProcess ; 01369 01370 01371 01373 01374 if (!_disconnectedTable->empty()) 01375 { 01376 01377 for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 01378 prIt != _disconnectedTable->end () ; 01379 ++prIt 01380 ) 01381 { 01382 if (prIt->first != _siteName) 01383 { 01384 receivingProcess = prIt->second ; 01385 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01386 if ( receiveBuffer.hasMessage() ) 01387 { 01388 // TDTD ajout de trace pour debug 01389 //std::cerr << " 1 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ; 01390 //cout<<"TRUC BIDULE"<<endl; 01391 _activeProcessTable->addObjectWithIndex(prIt->first, prIt->second) ; 01392 aRecevoir.addObjectWithIndex (prIt->first, prIt->second) ; 01393 // _disconnectedTable->erase( prIt->second->getProcessName() ); 01394 01395 //cout<<"DEBUT: 
je rajoute a la liste des ACTIVES le processus suivant: "<<prIt->first<<endl; 01396 //TDTD : à vérifier avec l'algorithme... 01397 01398 01399 // cout<<"DEBUT: j'enleve de la liste de deconnecte le processus suivant: "<<prIt->first<<endl; 01400 switch (receivedRequest) 01401 { 01402 case PvmMessage::SynchronisationMessage : 01403 { 01404 // To filter ancient synchro messages 01405 // if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()) 01406 // { receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; } 01407 // else { 01408 01409 //have the controller process the message 01410 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01411 01412 receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 01413 01414 //an additionnal test could be performed here to make sure correct date of last message has happened 01415 // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01416 // } 01417 01418 } 01419 break ; 01420 case PvmMessage::MirrorNeedsInitialValues : 01421 { 01422 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 01423 } 01424 break ; 01425 case PvmMessage::InitialValuesForMirror : 01426 { 01427 //have the controller process the message 01428 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01429 01430 } 01431 break ; 01432 case PvmMessage::LocalInitSuccessfull : 01433 // case PvmMessage::pingMessage : 01434 // { 01435 // cout<<"i got the ping message"<<endl; 01436 // _activeProcessTable->addObjectWithIndex(prIt->first, processIterator->second) ; 01437 // _disconnectedTable->erase( prIt->second->getProcessName() ); 01438 // } 01439 break; 01440 case PvmMessage::ProcessTable : 01441 case PvmMessage::EndOfSimulation : 01442 case PvmMessage::EnterBarrier : 01443 case PvmMessage::SiteName : 01444 case PvmMessage::ExitBarrier : 01445 case PvmMessage::NameServiceGetId : 01446 case PvmMessage::NameServiceGetString : 01447 case 
PvmMessage::NameServiceReturnId : 01448 case PvmMessage::NameServiceReturnString : 01449 case PvmMessage::NameServiceVerifyLocalNameServer : 01450 case PvmMessage::NameServiceVerifyResult : 01451 default: 01452 { 01453 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 01454 } 01455 } //end switch 01456 01457 01458 // aRecevoir.addObjectWithIndex(prIt->first, prIt->second) ; 01459 // _disconnectedTable->erase( prIt->second->getProcessName() ); 01460 // cout<<"je rajoute a la liste des actives le processus suivant: "<<prIt->first<<endl; 01461 // Date dlm = receivingProcess->getDateOfLastMessage(); 01462 // receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ; 01463 // cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl; 01464 // while ( dlm+50 < receivingProcess->getDateOfLastMessage() && receiveBuffer.hasMessage() ) 01465 // { 01466 // receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01467 // if (receiveBuffer.hasMessage()) 01468 // { 01469 // receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ; 01470 // dlm = receivingProcess->getDateOfLastMessage(); 01471 // } 01472 // receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01473 // } 01474 } 01475 01476 } 01477 } //end for 01478 01479 } 01480 01481 01482 01484 01485 // cout<<"size of _activeProcessTable : "<<_activeProcessTable->size()<<endl; 01486 01487 //First make a list of processes that we need to receive values from 01488 for ( NameToPointerMap<Process>::iterator pOMKsE = _activeProcessTable->begin () ; 01489 pOMKsE != _activeProcessTable->end () ; 01490 pOMKsE ++) 01491 { 01492 if ( (pOMKsE->first != _siteName) && 01493 ( pOMKsE->second->getDateOfLastMessage () + 01494 
pOMKsE->second->getPeriod () + 01495 _synchronisationLatency < parsingController.getSimulatedDate ())) 01496 { 01497 aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 01498 #ifdef _DEBUGPVMMESS 01499 cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 01500 << pOMKsE->first << endl ; 01501 #endif 01502 } 01503 } 01504 01505 struct timeval start_timer,end_timer; 01506 gettimeofday(&start_timer, NULL); 01507 double time_in_ms = 0; 01508 01509 01510 while ( (time_in_ms < 200) && !aRecevoir.empty() ) 01511 { 01512 for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 01513 processIterator != _processTable->end () ; 01514 ++processIterator 01515 ) 01516 { 01517 01518 //this new implementation doesn't do blocking receives, as some processes could send urgent requests 01519 receivingProcess = processIterator->second ; 01520 01521 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01522 01523 if ( receiveBuffer.hasMessage() ) 01524 { 01525 // TDTD ajout de trace pour debug 01526 //std::cerr << " 2 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ; 01527 //---------------------------chadi ajout 4/6 --------------------- 01528 // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01529 // start = clock(); 01530 01531 // Date dlm = receivingProcess->getDateOfLastMessage(); 01532 // receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ; 01533 // cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl; 01534 01535 if (_disconnectedTable->find(processIterator->second->getProcessName())!=_disconnectedTable->end()) 01536 { 01537 
_activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ; 01538 _disconnectedTable->erase( processIterator->second->getProcessName() ); 01539 // cout<<" j'enleve de la liste de deconnecte le processus suivant: "<<processIterator->first<<endl; 01540 _simStep = 0 ; 01541 } 01542 //-----------------------fin chadi ajout 4/6 ------------------------- 01543 // if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()) 01544 // { } 01545 // else { 01546 switch (receivedRequest) 01547 { 01548 case PvmMessage::SynchronisationMessage : 01549 { 01550 // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl; 01551 // To filter ancient synchro messages 01552 if (receiveBuffer.getMessageDate()+30 < parsingController.getSimulatedDate()) 01553 { //cout<<"receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()"<<endl; //receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; } 01554 01555 // TDTD : ajout d'un manque cruel : le traitement du message en retard... 
01556 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01557 // fin TDTD 01558 receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; } 01559 01560 else { 01561 01562 //have the controller process the message 01563 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01564 01565 receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 01566 01567 //an additionnal test could be performed here to make sure correct date of last message has happened 01568 aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01569 // cout<<"Je viens d'effacer le process de aRecevoir ..."<<endl; 01570 } 01571 01572 } 01573 break ; 01574 case PvmMessage::MirrorNeedsInitialValues : 01575 { 01576 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 01577 } 01578 break ; 01579 case PvmMessage::InitialValuesForMirror : 01580 { 01581 //have the controller process the message 01582 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01583 01584 } 01585 break ; 01586 case PvmMessage::LocalInitSuccessfull : 01587 // case PvmMessage::pingMessage : 01588 // { 01589 // cout<<"i got the ping message"<<endl; 01590 // _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ; 01591 // _disconnectedTable->erase( processIterator->second->getProcessName() ); 01592 // } 01593 break; 01594 case PvmMessage::ProcessTable : 01595 case PvmMessage::EndOfSimulation : 01596 case PvmMessage::EnterBarrier : 01597 case PvmMessage::SiteName : 01598 case PvmMessage::ExitBarrier : 01599 case PvmMessage::NameServiceGetId : 01600 case PvmMessage::NameServiceGetString : 01601 case PvmMessage::NameServiceReturnId : 01602 case PvmMessage::NameServiceReturnString : 01603 case PvmMessage::NameServiceVerifyLocalNameServer : 01604 case PvmMessage::NameServiceVerifyResult : 01605 default: 01606 { 01607 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl 
; 01608 } 01609 } //end switch 01610 // } 01611 // receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01612 } 01613 01614 01615 } //end for 01616 01617 gettimeofday(&end_timer, NULL); 01618 01619 time_in_ms = (end_timer.tv_sec-start_timer.tv_sec) * 1000.0 + 01620 (end_timer.tv_usec-start_timer.tv_usec) /1000.0 ; 01621 // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process 01622 usleep (1) ; 01623 01624 } //end while 01625 01626 if ( !aRecevoir.empty() ) 01627 { 01628 for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ; 01629 processIterator != aRecevoir.end () ; 01630 ++processIterator 01631 ) 01632 { 01633 _disconnectedTable->addObjectWithIndex(processIterator->first, processIterator->second ); 01634 01635 // _temporaryTable->addObjectWithIndex(processIterator->first, processIterator->second ); 01636 01637 // aRecevoir.erase ( processIterator->first ) ; 01638 // _activeProcessTable->erase( processIterator->first ) ; 01639 01640 std::cout<<"je rajoute le processus suivant a la table des deconnectes "<<processIterator->second->getProcessName()<<std::endl; 01641 } 01642 aRecevoir.clear(); 01643 removeProcessProlog () ; 01644 } 01645 // TDTD ajout trace pour debug 01646 std::cout<<"longueur de la table des deconnectés : "<<_disconnectedTable->size ()<<std::endl; 01647 01648 01649 #ifdef _DEBUGPVMMESS 01650 cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 01651 #endif 01652 01653 // } //else firsttime 01654 01655 } 01656 */ 01657 01658 void Svm::removeProcessProlog () 01659 { 01660 _simStep++ ; 01661 //cout<<"_simStep est egale :"<<_simStep<<endl; 01662 if (_simStep == 2) { 01663 removeProcessFromActiveTable() ; 01664 // cout<<"_simStep est egale (2) :"<<_simStep<<endl; 01665 _simStep = 0 ; 01666 } 01667 } 01668 01669 void Svm::removeProcessFromActiveTable() 01670 { 01671 for ( NameToPointerMap<Process>::iterator 
processIterator = _disconnectedTable->begin () ; 01672 processIterator != _disconnectedTable->end () ; 01673 ++processIterator 01674 ) 01675 { 01676 _activeProcessTable->erase( processIterator->first ) ; 01677 // cout<<"le process "<<processIterator->first<<" a ete vire de la liste active"<<endl; 01678 } 01679 // _temporaryTable->clear() ; 01680 // _simStep = 0 ; 01681 } 01682 01684 01685 01686 // void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 01687 // { 01688 // NameToPointerMap<Process> aRecevoir ; 01689 01690 01691 // //First make a list of processes that we need to receive values from// //First make a list of processes that we need to receive values from 01692 01693 // for ( NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 01694 // pOMKsE != _processTable->end () ; 01695 // pOMKsE ++) 01696 // { 01697 // if ( (pOMKsE->first != _siteName) && 01698 // ( pOMKsE->second->getDateOfLastMessage () + 01699 // pOMKsE->second->getPeriod () + 01700 // _synchronisationLatency < parsingController.getSimulatedDate ())) 01701 // { 01702 // aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 01703 // #ifdef _DEBUGPVMMESS 01704 // cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 01705 // << pOMKsE->first << endl ; 01706 // #endif 01707 // } 01708 // } 01709 01710 // PvmIncomingMessage receiveBuffer ; 01711 // PvmMessage::MessageTag receivedRequest ; 01712 // Process * receivingProcess ; 01713 01714 // struct timeval start_timer,end_timer; 01715 // gettimeofday(&start_timer, NULL); 01716 // int time_in_ms = 0; 01717 // while ( (time_in_ms < 100) && !aRecevoir.empty() ) 01718 // { 01719 // for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ; 01720 // processIterator != aRecevoir.end () ; 01721 // ++processIterator 01722 // ) 01723 // { 01724 01725 // //this new implementation doesn't do blocking receives, as some processes 
could send urgent requests 01726 // receivingProcess = processIterator->second ; 01727 01728 // receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01729 01730 // if ( receiveBuffer.hasMessage() ) 01731 // { 01732 01733 // //---------------------------chadi ajout 4/6 --------------------- 01734 // // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01735 // // start = clock(); 01736 01737 // if (_disconnectedTable->find(processIterator->second->getProcessName())!=_disconnectedTable->end()) 01738 // { 01739 // // removeFromDisconnectedTable(processIterator); 01740 // _disconnectedTable->erase( processIterator->second->getProcessName() ); 01741 // cout<<" j'enleve de la liste de deconnecte le processus suivant: "<<processIterator->first<<endl; 01742 // } 01743 // //-----------------------fin chadi ajout 4/6 ------------------------- 01744 // if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()) 01745 // { } 01746 // else { 01747 // switch (receivedRequest) 01748 // { 01749 // case PvmMessage::SynchronisationMessage : 01750 // { 01751 // // To filter ancient synchro messages 01752 // if (receiveBuffer.getMessageDate()+50 < parsingController.getSimulatedDate()) 01753 // { } 01754 // else { 01755 01756 // //have the controller process the message 01757 // parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01758 01759 // receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 01760 01761 // //an additionnal test could be performed here to make sure correct date of last message has happened 01762 // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01763 // } 01764 01765 // } 01766 // break ; 01767 // case PvmMessage::MirrorNeedsInitialValues : 01768 // { 01769 // parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 01770 // } 01771 // break ; 01772 // case PvmMessage::InitialValuesForMirror : 01773 // { 01774 // //have the controller process the 
message 01775 // parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01776 01777 // } 01778 // break ; 01779 // case PvmMessage::LocalInitSuccessfull : 01780 // case PvmMessage::ProcessTable : 01781 // case PvmMessage::EndOfSimulation : 01782 // case PvmMessage::EnterBarrier : 01783 // case PvmMessage::SiteName : 01784 // case PvmMessage::ExitBarrier : 01785 // case PvmMessage::NameServiceGetId : 01786 // case PvmMessage::NameServiceGetString : 01787 // case PvmMessage::NameServiceReturnId : 01788 // case PvmMessage::NameServiceReturnString : 01789 // case PvmMessage::NameServiceVerifyLocalNameServer : 01790 // case PvmMessage::NameServiceVerifyResult : 01791 // default: 01792 // { 01793 // Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message", 01794 // Controller::AllWarnings ) ; 01795 // } 01796 // } //end switch 01797 // } 01798 // } 01799 01800 01801 // } //end for 01802 01803 // gettimeofday(&end_timer, NULL); 01804 01805 // time_in_ms = ( (end_timer.tv_sec-start_timer.tv_sec) * 1000000 + 01806 // (end_timer.tv_usec-start_timer.tv_usec) )/1000 ; 01807 01808 // // cout<<"la valeur de dif en millisec : "<<time_in_ms<<endl; 01809 // // cout<<"la valeur de start est : "<<start<<endl; 01810 // // cout<<"la valeur de end est : "<<end<<endl; 01811 01812 // } //end while 01813 01814 // if (!aRecevoir.empty()) 01815 // { 01816 // for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ; 01817 // processIterator != aRecevoir.end () ; 01818 // ++processIterator 01819 // ) 01820 // { 01821 // _disconnectedTable->addObjectWithIndex(processIterator->first, processIterator->second ); 01822 // aRecevoir.erase ( processIterator->first ) ; 01823 // cout<<"je rajoute le processus suivant a la table des deconnectes "<<processIterator->second->getProcessName()<<endl; 01824 // } 01825 // } 01826 01827 // #ifdef _DEBUGPVMMESS 01828 // cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la 
" << endl ; 01829 // #endif 01830 // } 01831 01832 01833 01834 01835 01836 01838 //----------------------------------------------------------------------- 01839 01840 // void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 01841 // { 01842 // NameToPointerMap<Process> aRecevoir ; 01843 01844 01845 // //First make a list of processes that we need to receive values from 01846 // for ( NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 01847 // pOMKsE != _processTable->end () ; 01848 // pOMKsE ++) 01849 // { 01850 // if ( (pOMKsE->first != _siteName) && 01851 // ( pOMKsE->second->getDateOfLastMessage () + 01852 // pOMKsE->second->getPeriod () + 01853 // _synchronisationLatency < parsingController.getSimulatedDate ())) // || (pOMKsE->second->getIterationFlag()>=1000) )) 01854 // //------- chadi ajout 3/6-------)&& !(disconnectedProcess(pOMKsE))) 01855 // { 01856 // aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 01857 // #ifdef _DEBUGPVMMESS 01858 // cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 01859 // << pOMKsE->first << endl ; 01860 // #endif 01861 // } 01862 // } 01863 01864 01865 // PvmIncomingMessage receiveBuffer ; 01866 // PvmMessage::MessageTag receivedRequest ; 01867 // Process * receivingProcess ; 01868 01869 // while (! 
aRecevoir.empty () ) 01870 // { 01871 01872 // for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ; 01873 // processIterator != aRecevoir.end () ; 01874 // ++processIterator 01875 // ) 01876 // { 01877 // //*****************chadi: faux deconnexion d'un site ***************** 01878 // // if ( processIterator->second->getProcessName()== "visualProcessC" ) 01879 // // { 01880 // // addToDisconnectedTable( processIterator ); 01881 // // _disconnectedTable->addObjectWithIndex(processIterator->first,processIterator->second ); 01882 // // } 01883 // //************************************************************* 01884 // //this new implementation doesn't do blocking receives, as some processes could send urgent requests 01885 // receivingProcess = processIterator->second ; 01886 01887 // receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 01888 01889 // if ( receiveBuffer.hasMessage() ) 01890 // { 01891 01892 // //---------------------------chadi ajout 4/6 --------------------- 01893 01894 // // cout<<"parsingController.getSimulatedDate = "<<parsingController.getSimulatedDate()<<endl; 01895 // // cout<<"receiveBuffer.getMessageDate = "<<receiveBuffer.getMessageDate()<<" en provenance du processus: "<<processIterator->second->getProcessName()<<endl; 01896 // // cout<<"_synchronisationLatency = "<<_synchronisationLatency<<endl; 01897 // // cout<<"la periode de "<<processIterator->second->getProcessName()<<" est egale a "<< processIterator->second->getPeriod()<<endl; 01898 01899 // // cout<<"difference de temps ou pas : "<<receiveBuffer.getMessageDate() - receiveBuffer.getDateOfLastMessage()<<endl; 01900 01901 // // cout<<"iterateur->first : " <<processIterator->first<<" iterateur->second : "<<processIterator->second<<endl; 01902 01903 // if (_disconnectedTable->find(processIterator->second->getProcessName())!=_disconnectedTable->end()) 01904 // { 01905 // processIterator->second->resetIterationFlag(); 01906 // 
// aRecevoir.addObjectWithIndex (processIterator->first, processIterator->second) ; 01907 // cout<<"j'enleve de la liste des deconnectes le processus suivant: "<<processIterator->second->getProcessName()<<endl; 01908 // removeFromDisconnectedTable(processIterator); 01909 // } 01910 01911 01912 // //-----------------------fin chadi ajout 4/6 ------------------------- 01913 01914 // switch (receivedRequest) 01915 // { 01916 // case PvmMessage::SynchronisationMessage : 01917 // { 01918 01919 // if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()) 01920 // { 01921 // //cout<<"c pas un bon signe ..."<<endl; 01922 // } 01923 // else 01924 // { 01925 01926 // //have the controller process the message 01927 // parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01928 01929 // receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 01930 01931 // //an additionnal test could be performed here to make sure correct date of last message has happened 01932 // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01933 // } 01934 // } 01935 // break ; 01936 // case PvmMessage::MirrorNeedsInitialValues : 01937 // { 01938 // parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 01939 // } 01940 // break ; 01941 // case PvmMessage::InitialValuesForMirror : 01942 // { 01943 // //have the controller process the message 01944 // parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 01945 01946 // } 01947 // break ; 01948 // // case PvmMessage::updateProcessTable : 01949 // { 01950 // cout<<"ProcessTable Will be Updated ....."<<endl; 01951 // _processTable->addObjectWithIndex("visualProcessC", new Process ( "visualProcessTest", "maxipes.irisa.fr" )) ; 01952 // cout<<"ProcessTable Updated Succeffully ....."<<endl; 01953 // ++_numberOfSlaves ; 01954 // } 01955 // break ; 01956 // case PvmMessage::LocalInitSuccessfull : 01957 // case PvmMessage::ProcessTable : 01958 // case PvmMessage::EndOfSimulation : 
01959 // case PvmMessage::EnterBarrier : 01960 // case PvmMessage::SiteName : 01961 // case PvmMessage::ExitBarrier : 01962 // case PvmMessage::NameServiceGetId : 01963 // case PvmMessage::NameServiceGetString : 01964 // case PvmMessage::NameServiceReturnId : 01965 // case PvmMessage::NameServiceReturnString : 01966 // case PvmMessage::NameServiceVerifyLocalNameServer : 01967 // case PvmMessage::NameServiceVerifyResult : 01968 // case PvmMessage::updateProcessTable : 01969 // case PvmMessage::addNewSite : 01970 01971 // default: 01972 // { 01973 // Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message", 01974 // Controller::AllWarnings ) ; 01975 // } 01976 // } 01977 01978 // } 01979 // //-------------------------------chadi ajout 5/6 ---------------------- 01980 01981 // else 01982 01983 // { 01984 // // cout<<"nb iteration avant inc = "<<processIterator->second->getIterationFlag()<<endl; 01985 01986 // processIterator->second->incrementeIterationFlag(); 01987 01988 // // cout<<"nb iteration apres inc = "<<processIterator->second->getIterationFlag()<<endl; 01989 01990 // // if ( processIterator->second->disconnectedProcess()) 01991 // if ( processIterator->second->getIterationFlag() >= 200000) 01992 01993 // // if (receivingProcess->getDateOfLastMessage()+50 < parsingController.getSimulatedDate()) 01994 // { 01995 // // addToDisconnectedTable( processIterator ); 01996 // _disconnectedTable->addObjectWithIndex(processIterator->first,processIterator->second ); 01997 // aRecevoir.erase( processIterator->second->getProcessName()); 01998 // // cout<<"je rajoute le processus suivant a la table des deconnectes "<<processIterator->second->getProcessName()<<" car processIterator->second->getIterationFlag()="<<processIterator->second->getIterationFlag()<<endl; 01999 // // cout<<"getDateOfLastMessage()+50 = "<< receivingProcess->getDateOfLastMessage()+50<<" tandis ke parsingController.getSimulatedDate = 
"<<parsingController.getSimulatedDate()<<endl; 02000 // // cout<<"processIterator->second->getIterationFlag() ="<<processIterator->second->getIterationFlag()<<endl; 02001 // } 02002 // } 02003 02004 // //-----------------------------fin chadi ajout 5/6 ----------------------- 02005 02006 // } //end for 02007 // } //end while 02008 02009 02010 // #ifdef _DEBUGPVMMESS 02011 // cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 02012 // #endif 02013 // } 02014 02015 02016 //----------------------------------------------------------------------- 02017 02018 02019 //----------------------- The Original One ------------------------------------------------ 02020 02021 void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 02022 { 02023 //cout<<"GGGGG"<<endl; 02024 NameToPointerMap<Process> aRecevoir ; 02025 02026 //First make a list of processes that we need to receive values from 02027 for ( NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 02028 pOMKsE != _processTable->end () ; 02029 pOMKsE ++) 02030 { 02031 if ( (pOMKsE->first != _siteName) && 02032 ( pOMKsE->second->getDateOfLastMessage () + 02033 pOMKsE->second->getPeriod () + 02034 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 02035 { 02036 aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 02037 #ifdef _DEBUGPVMMESS 02038 cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 02039 << pOMKsE->first << endl ; 02040 #endif 02041 } 02042 } 02043 02044 02045 PvmIncomingMessage receiveBuffer ; 02046 PvmMessage::MessageTag receivedRequest ; 02047 Process * receivingProcess ; 02048 02049 while (! 
aRecevoir.empty () ) 02050 { 02051 for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 02052 processIterator != _processTable->end () ; 02053 ++processIterator 02054 ) 02055 { 02056 //this new implementation doesn't do blocking receives, as some processes could send urgent requests 02057 receivingProcess = processIterator->second ; 02058 02059 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 02060 02061 if ( receiveBuffer.hasMessage() ) 02062 { 02063 // TDTD ajout de trace pour debug 02064 //std::cerr << "buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate () 02065 // << " de " << receivingProcess->getProcessName() << std::endl ; 02066 switch (receivedRequest) 02067 { 02068 case PvmMessage::SynchronisationMessage : 02069 { 02070 //have the controller process the message 02071 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02072 02073 receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 02074 02075 //an additionnal test could be performed here to make sure correct date of last message has happened 02076 aRecevoir.erase ( receivingProcess->getProcessName() ) ; 02077 } 02078 break ; 02079 case PvmMessage::MirrorNeedsInitialValues : 02080 { 02081 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 02082 } 02083 break ; 02084 case PvmMessage::InitialValuesForMirror : 02085 { 02086 //have the controller process the message 02087 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02088 02089 } 02090 break ; 02091 case PvmMessage::LocalInitSuccessfull : 02092 case PvmMessage::ProcessTable : 02093 case PvmMessage::EndOfSimulation : 02094 case PvmMessage::EnterBarrier : 02095 case PvmMessage::SiteName : 02096 case PvmMessage::ExitBarrier : 02097 case PvmMessage::NameServiceGetId : 02098 case PvmMessage::NameServiceGetString : 02099 case PvmMessage::NameServiceReturnId : 02100 case 
PvmMessage::NameServiceReturnString : 02101 case PvmMessage::NameServiceVerifyLocalNameServer : 02102 case PvmMessage::NameServiceVerifyResult : 02103 default: 02104 { 02106 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 02107 /* END CHADI */ 02108 } 02109 } 02110 } 02111 } 02112 // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process 02113 if (_yieldNeeded) { 02114 #ifdef _MSC_VER 02115 Sleep( 0 ) ; 02116 #else 02117 usleep (0) ; 02118 #endif 02119 } 02120 } 02121 02122 02123 #ifdef _DEBUGPVMMESS 02124 cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 02125 #endif 02126 } 02127 02128 02129 //----------------------------------------------------------------------------- 02130 02131 02132 02133 void Svm::sendCurrentBuffersWithTag(const PvmMessage::MessageTag tag) 02134 { 02135 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 02136 SvmLink * canal ; // Canal vers le precedent processus 02137 static int nbSteps = 1 ; 02138 #ifdef _DEBUGPVMMESS 02139 cerr<<"Svm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processus : "<<endl; 02140 NameToPointerMap<Process>::iterator pOMK ; 02141 for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) 02142 { 02143 cerr<<(*pOMK).second->getProcessName (); 02144 } 02145 cerr<<endl; 02146 #endif 02147 02148 if (_deadReckoningInterval > 0) { 02149 nbSteps = (nbSteps + 1) % _deadReckoningInterval ; 02150 } 02151 02152 // TDTD on envoie seulement les synchros aux actifs... 
02153 for (pOMKs = _processTable->begin () ; pOMKs != _processTable->end () ; pOMKs ++) 02154 //for (pOMKs = _activeProcessTable->begin () ; pOMKs != _activeProcessTable->end () ; pOMKs ++) 02155 // fin TDTD 02156 { 02157 #ifdef _DEBUGPVMMESS 02158 cerr<<"Traitement des emissions vers "<<(*pOMKs).second->getProcessName ()<<endl; 02159 #endif 02160 if (_siteName != (*pOMKs).second->getProcessName ()) 02161 { 02162 02163 // On connait alors le canal a observer 02164 canal = (*pOMKs).second->getSvmLink () ; 02165 02166 if ( tag == PvmMessage::SynchronisationMessage ) 02167 { 02168 (*pOMKs).second->getSvmLink ()->getOutgoingBuffer()<<SynchronisationMessage::endOfSynchronisationFragment ; 02169 02170 } 02171 02172 // On transmet le message du canal 02173 02174 // TDTD ajout pour purger les envois aux inactifs 02175 //std::cerr << "sur site : " << _siteName << " : " ; 02176 if (_activeProcessTable->find ((*pOMKs).second->getProcessName ()) != _activeProcessTable->end ()) { 02177 //std::cerr << "envoi de syncho à actif : " << (*pOMKs).second->getProcessName () ; 02178 canal->sendOutgoingBuffer ( tag ) ; 02179 } else { 02180 if (nbSteps == 0) { // un coup de dead-reckoning ? 
02181 //std::cerr << "envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ; 02182 canal->sendOutgoingBuffer ( tag ) ; 02183 } else { 02184 canal->getOutgoingBuffer().flushCurrentBuffer () ; 02185 //std::cerr << "pas d'envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ; 02186 } 02187 } 02188 //std::cerr << endl ; 02189 // fin TDTD 02190 #ifdef _DEBUGPVMMESS 02191 cerr << "OMKmSvm::transmission avec tag : " 02192 << tag << endl ; 02193 #endif 02194 } 02195 #ifdef _DEBUGPVMMESS 02196 else 02197 { 02198 cerr << "Rien à faire "<<endl ; 02199 //canal->affiche () ; 02200 } 02201 #endif 02202 } 02203 } 02204 02205 //----------------------------------------------------------------------- 02206 02207 void Svm::waitForMessageFrom (PvmController & parsingController, 02208 const Name & processName, 02209 const PvmMessage::MessageTag tag) 02210 { 02211 SvmLink * canal ; // Canal vers le precedent processus 02212 PvmIncomingMessage * messageRecu = NULL ; 02213 02214 // On recupere le canal a observer 02215 canal = getLinkToProcessNamed (processName) ; 02216 #ifdef _DEBUGPVMMESS 02217 cout << "OMKmSvm::waitForMessageFrom sur " 02218 << processName << " tag : " 02219 << tag << endl ; 02220 canal->printDebuggingInformation() ; 02221 #endif 02222 // On recoit le message en bloquant 02223 messageRecu = & canal->waitForMessage (tag) ; 02224 // On envoie le message et l'objet au controleur local 02225 parsingController.parseSynchronisationMessage (messageRecu) ; 02226 // On libere le message ??? 
02227 //delete messageRecu; 02228 } 02229 02230 02231 //----------------------------------------------------------------------- 02233 02234 void Svm::timestampCurrentSendBuffers (const Date & date) 02235 { 02236 NameToPointerMap<Process>::iterator i ; 02237 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 02238 { 02239 if (_siteName != (*i).second->getProcessName ()) 02240 { 02241 i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ; 02242 } 02243 } 02244 } 02245 02246 02247 const Name & Svm::getSiteName() const 02248 { 02249 return _siteName; 02250 } 02251 02252 //----------------------------------------------------------------------- 02253 02254 Process * Svm::waitForAnswerToBlockingRequest (PvmController & parsingController, PvmMessage::MessageTag tag) 02255 { 02256 // this implementation assumes that only one thread at a time will enter this function 02257 // numberOfThread is an unsafe method to test this 02258 static int numberOfThreads = 0 ; 02259 ++numberOfThreads ; 02260 assert ( numberOfThreads == 1) ; 02261 02262 Process * result ; 02263 02264 PvmIncomingMessage receiveBuffer ; 02265 std::pair<PvmMessage::MessageTag, int> receivedRequest ; 02266 02267 bool serving = true ; 02268 02269 while ( serving ) 02270 { 02271 02272 receivedRequest = waitForAnyRequests ( receiveBuffer ) ; 02273 02274 //find the sender process (result of this member function) 02275 map<int,Process *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ; 02276 02277 // cout<<"receivedRequest.second est egale : "<<receivedRequest.second<<endl; 02278 // cout<<"receivedRequest.first est egale : "<<receivedRequest.first<<endl; 02279 // the message should come from a known process 02280 assert ( i != _idToProcessTable.end() ) ; 02281 02282 result = i->second ; 02283 02284 switch (receivedRequest.first) 02285 { 02286 case PvmMessage::SynchronisationMessage : 02287 { 02288 //have the controller process the message 02289 
parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02290 02291 assert ( result != NULL ) ; 02292 02293 result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 02294 } 02295 break ; 02296 case PvmMessage::MirrorNeedsInitialValues : 02297 { 02298 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 02299 } 02300 break ; 02301 case PvmMessage::InitialValuesForMirror : 02302 { 02303 //have the controller process the message 02304 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02305 02306 } 02307 break ; 02308 case PvmMessage::LocalInitSuccessfull : 02309 case PvmMessage::ProcessTable : 02310 case PvmMessage::EndOfSimulation : 02311 case PvmMessage::EnterBarrier : 02312 case PvmMessage::SiteName : 02313 case PvmMessage::ExitBarrier : 02314 case PvmMessage::NameServiceGetId : 02315 case PvmMessage::NameServiceGetString : 02316 case PvmMessage::NameServiceReturnId : 02317 case PvmMessage::NameServiceReturnString : 02318 case PvmMessage::NameServiceVerifyLocalNameServer : 02319 case PvmMessage::NameServiceVerifyResult : 02320 case PvmMessage::updateProcessTable : 02321 case PvmMessage::addNewSite : 02322 default: 02323 { 02328 } 02329 } 02330 02331 serving = ( receivedRequest.first != tag ) ; 02332 02333 } 02334 02335 --numberOfThreads ; 02336 02337 return result ; 02338 } 02339 02340 //------------------------------------------------------------------------------------------ 02341 02342 //----------------chadi ajout 6/6-------------------------------------------------------- 02343 02344 void Svm::addToDisconnectedTable( Name& nameOfProcess , Process* pointerToTheProcess ) 02345 02346 { 02347 //*************************** 02348 // if ( processIterator->second->getProcessName()== "visualProcessC" ) 02349 // { } 02350 // else { 02351 //**************************** 02352 02353 02354 _disconnectedTable->addObjectWithIndex(nameOfProcess, pointerToTheProcess ); 02355 // 
_everybodyTable->erase(processIterator->second->getProcessName()); 02356 // } 02357 _troubleFlag = true ; 02358 } 02359 02360 void Svm::removeFromDisconnectedTable( NameToPointerMap<Process>::iterator processIterator ) 02361 { 02362 //*************************** 02363 // if ( processIterator->second->getProcessName()== "visualProcessC" ) 02364 // { } 02365 // else { 02366 //**************************** 02367 // _everybodyTable->addObjectWithIndex(processIterator->first,processIterator->second ); 02368 02369 _disconnectedTable->erase( processIterator->second->getProcessName() ); 02370 // } 02371 _troubleFlag = false ; 02372 } 02373 02374 //-----------------------fin chadi ajout 6/6 --------------------------------------------- 02375 02376 02377 bool Svm::disconnectedProcess(Name& zico) 02378 { 02379 if (_disconnectedTable->find(zico)!=_disconnectedTable->end()) 02380 { return true;} 02381 else 02382 return false; 02383 } 02384 02385 //------------------------------------------------------------------------------------------- 02386 02387 bool Svm::getTroubleFlag () 02388 { 02389 if (_troubleFlag == true) 02390 { return true; } 02391 return false; 02392 } 02393 02394 //------------------------------------------------------------------- 02395 02396 bool Svm::disconnectedTableIsEmpty () 02397 { 02398 if ( _disconnectedTable->empty() ) 02399 { 02400 return true; 02401 } 02402 return false; 02403 } 02404 02405 //------------------------------------------------------------------- 02406 02407 bool Svm::reconnexionEstablished () 02408 { 02409 NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 02410 02411 // if ( testIfSiteRecovered( (*prIt).second->getHostMachineName() ) ) 02412 if ( testIfSiteRecovered( (*prIt).second ) ) 02413 { 02414 // cout<<" Svm::reconnexionEstablished OK"<<endl; 02415 return true ; 02416 } 02417 02418 else 02419 { 02420 // cout<<" Svm::reconnexionEstablished NOTOK"<<endl; 02421 return false ; 02422 } 02423 02424 } 02425 02426 
//----------------------------------------------------------------- 02427 02428 void Svm::sendPingMessage () 02429 { 02430 for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 02431 prIt != _disconnectedTable->end () ; 02432 ++prIt 02433 ) 02434 { 02435 ((*prIt).second)->getSvmLink()->getOutgoingBuffer()<<"ping" ; 02436 ((*prIt).second)->getSvmLink()->getOutgoingBuffer().send(PvmMessage::pingMessage) ; 02437 cout<<"message ping est fait "<<endl; 02438 } 02439 } 02440 02441 //----------------------------------------------------------------- 02442 // ajouts TDTD pour gestion plus précise des déconnectés 02443 //----------------------------------------------------------------- 02444 02445 std::list<Name> Svm::getDisconnectedProcessusList () { 02446 _disconnectedProcessusList.clear () ; 02447 for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 02448 prIt != _disconnectedTable->end () ; 02449 prIt ++) { 02450 _disconnectedProcessusList.push_back ((*prIt).first) ; 02451 } 02452 return _disconnectedProcessusList ; 02453 } 02454 02455 //----------------------------------------------------------------- 02456 02457 std::map<Name, int> Svm::getDisconnectedProcessusMap () { 02458 return _disconnectedProcessusMap ; 02459 } 02460 02461 //----------------------fin chadi ajout 2/3 --------------------------
// Documentation generated on Mon Jun 9 11:45:57 2008
// Generated with doxygen by Dimitri van Heesch, 1997-2007