OMKSvm.cxx

Go to the documentation of this file.
00001 /*
00002  * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software
00003  * 
00004  * The Software has been developped within the Siames Project. 
00005  * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights
00006  * 
00007  * The Software has been registered with the Agence pour la Protection des
00008  * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200
00009  *  
00010  * This file may be distributed under the terms of the Q Public License
00011  * version 1.0 as defined by Trolltech AS of Norway and appearing in the file
00012  * LICENSE.QPL included in the packaging of this file.
00013  *
00014  * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 
00015  * for the software may use this file in accordance with that specific license
00016  *
00017  */
00018 #include <OMKSvm.h>
00019 #include <OMKSvmLink.h>
00020 #include "OMKSynchronisationMessage.h"
00021 #include <OMKPvmMessage.h>
00022 #include <OMKPvmMulticastMessage.h>
00023 #include <OMKPvmUnicastMessage.h>
00024 #include <OMKProcess.h>
00025 #include <OMKPvmController.h>
00026 #include "OMKPvmNameServer.h"
00027 #include "OMKPvmCentralNameServer.h"
00028 #include <pvm3.h>
00029 #include "OMKPvmSvm.h"
00030 #ifndef _MSC_VER
00031 #include <sys/time.h>
00032 #endif
00033 #include "OMKTracer.h"
00034 
00035 using namespace std;
00036 using namespace OMK ;
00037 
00038 std::string Svm::_slaveGroupName = "spawnedControllers";
00039 std::string Svm::_masterSiteName = "OpenMaskMasterDistributedSite" ;
00040 
00041 #ifdef _MSC_VER
00042 int
00043 gettimeofday(struct timeval *t, struct timezone *tzp)
00044 {
00045         struct _timeb timebuffer;
00046 
00047         /* this calls the time and returns sec ** msec */
00048         _ftime( &timebuffer );
00049 
00050         t->tv_sec=timebuffer.time;
00051         t->tv_usec=timebuffer.millitm*1000;
00052         return 1;
00053 }
00054 #endif
00055 
00056 //-----------------------------------------------------------------------
00057 
00058 Svm::Svm (NameToPointerMap<Process> * tab, const Date & latence, const Date & timeOut,
00059                 const int deadReckoningInterval, const bool yieldNeeded) : 
00060 //   _processTable ( tab ),
00061 //   _activeProcessTable (tab ),
00062    _troubleFlag ( false ),
00063    _simStep ( 0 ),
00064    _siteId ( -1 ), //negative values indicates an error
00065    _numberOfSlaves ( 0 ),
00066    _synchronisationLatency (latence),
00067    _synchronisationTimeOut (timeOut),
00068    _deadReckoningInterval (deadReckoningInterval),
00069    _yieldNeeded (yieldNeeded)
00070 {
00071 _processTable = new NameToPointerMap<Process>();
00072    for ( NameToPointerMap<Process>::iterator processIterator = tab->begin () ;
00073             processIterator != tab->end () ;
00074             ++processIterator 
00075             )
00076         {
00077           _processTable->addObjectWithIndex(processIterator->first, processIterator->second );
00078  
00079         }
00080 
00081 //--------------chadi ajout 1/6 ------------------------
00082   _temporaryTable = new  NameToPointerMap<Process>();
00083   _disconnectedTable = new  NameToPointerMap<Process>();
00084   _activeProcessTable = new  NameToPointerMap<Process>();
00085   _disconnectedTable->clear();
00086    for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
00087             processIterator != _processTable->end () ;
00088             ++processIterator 
00089             )
00090         {
00091           _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second );
00092  
00093         }
00094   
00095    firstTime = true;
00096  aRecevoir.clear();
00097 //--------------fin chadi ajout 1/6 ------------------------
00098 // TDTD ajout pour simuler des défaillances réseau
00099 #ifdef _MSC_VER
00100         srand( getpid() ) ;
00101 #else
00102         srandom (getpid ()) ;
00103 #endif
00104 }
00105 
00106 
00107 Svm::~Svm () 
00108 {
00109 }
00110 
00111 
00112 const Date & Svm::getSynchronisationLatency() 
00113 {
00114   return _synchronisationLatency;
00115 }
00116 
00117 
00118 
00119 void Svm::init (const Date & initialSimulationDate) 
00120 {
00121    if ( getParentSiteId () == 0 )  // wasn't created by a spawn
00122      {
00123        createDistributedSimulation ( initialSimulationDate ) ;
00124      }
00125    else
00126      {
00127        connectToDistributedSimulation () ;
00128      }
00129 }
00130 
00131 
00132 //-----------------------------------------------------------------------
00133 
00134 //***********************  chadi *****************************
00135 
00136 void Svm::addNewSiteToSimulation (const Date & date)
00137 {
00138   dateVar = date ;
00139 
00140  Process * theNewProcessToAdd = new Process ( "visualProcessC", "maxipes.irisa.fr" );
00141 
00142 
00144  
00145  // addNewWorkstation ( "barbux.irisa.fr" ) ;
00146  addNewWorkstation ( theNewProcessToAdd->getHostMachineName () ) ;
00147  
00148  _processTable->addObjectWithIndex ("visualProcessC", theNewProcessToAdd ) ;
00149  
00150  
00151  int adrLoc ;
00152  
00153  int processSiteId ;
00154 
00155  cout<<"New Process will be Spawned .....///"<<endl;
00156  cout<<"theNewProcessToAdd is named : "<<theNewProcessToAdd->getProcessName()<<endl;
00157  
00158  // spawn a new process on the specified workstation
00159  adrLoc = spawnProcess (theNewProcessToAdd) ;
00160 
00161  cout<<"New Process Spawned .....valeur de spawn est egale a : "<<adrLoc<<endl;
00162  
00163  // create a SvmLink, add it to processInfo
00164  theNewProcessToAdd->setSvmLink (createSvmLink (adrLoc)) ; 
00165  
00166  _idToProcessTable.insert ( make_pair ( adrLoc,theNewProcessToAdd ) ) ;
00167  
00168  theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( date ) ;
00169 
00170  
00171  // send the name of the process (in the Svm) to the created process
00172  theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer()<<theNewProcessToAdd->getProcessName() ;
00173  
00174  theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ;
00175  
00176  //prepare processSiteIds for the next step
00177  processSiteId =  theNewProcessToAdd->getSvmLink ()->getTID()  ;      
00178 // processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ;  
00179  
00180   
00181  // Encodage des attributs Svm de chaque processus
00182  PvmUnicastMessage nameToIdMessage ( processSiteId ) ;
00183  nameToIdMessage.insertTimeStamp ( date ) ;
00184 
00186  NameToPointerMap<Process>::iterator i ;
00187  for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00188    {
00189      nameToIdMessage << *((*i).second) ;
00190    }
00191  
00192  // Transfert des adresses vers le controleur local du process ajouté
00193  nameToIdMessage.send ( PvmMessage::ProcessTable ) ;
00194  
00195  
00196  // envoyer le processTable Update a tt le monde
00197  PvmMulticastMessage updateProcessTableMessage ( processSiteIds ) ;
00198  updateProcessTableMessage.insertTimeStamp ( date ) ;
00199 
00200  updateProcessTableMessage << *theNewProcessToAdd ;
00201 // updateProcessTableMessage << (theNewProcessToAdd->getProcessName()) ;
00202  cout<<" theNewProcessToAdd->getProcessName() = "<<theNewProcessToAdd->getProcessName()<<endl;
00203 
00204  updateProcessTableMessage.send ( PvmMessage::updateProcessTable ) ;
00205  
00206 
00207  processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ; 
00208  cout<<"Finishing method addNewSiteToSimulation"<<endl;
00209 
00210 }
00211 
00212 void Svm::testIfNewProcessAdded () 
00213 {
00214  PvmIncomingMessage * message = NULL ;
00215  message =  _linkToCentralSite->testIfRequestUpdateProcessTable (PvmMessage::updateProcessTable) ;
00216  if ( message != NULL )
00217    {  
00218      Name zico ;
00219     // *message>>zico ;
00220     // cout<<"le nom de process recu est :  "<<zico<<endl; 
00221      Process* p = new Process  ( "visualProcessC", "maxipes.irisa.fr" ); 
00222 
00223     // Process *p ;             
00224     // *message>> *p ;
00225      _processTable->addObjectWithIndex(p->getProcessName(),p ) ;
00226      ++_numberOfSlaves ;
00227     
00228     // Process * p = (*i).second ;
00229      // creation d'un canal sans destinataire 
00230      p->setSvmLink (createSvmLink ()) ;
00231      // maj du destinataire ?????????????????????????????????
00232      *npp  >> *p ;
00233      
00234      _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ;
00235      
00236      PvmUnicastMessage initSucMessage ( p->getSvmLink()->getTID() ) ;
00237      initSucMessage.insertTimeStamp ( dateVar ) ;
00238      initSucMessage.send ( PvmMessage::LocalInitSuccessfull ) ;
00239      
00240      NameToPointerMap<Process>::iterator i ;
00241      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00242         {
00243           cout<<"_processTable contient: "<<(*i).second->getProcessName()<<endl; 
00244         }
00245 
00246      syncDistributedSites();  
00247    }
00248 }
00249 
00250 
00251 
00252 //****
00253 //************************************************************
00254 
00256 /*
00257 void Svm::createDistributedSimulation(const Date & initialSimulationDate )
00258 {
00260   Process * miaou = new Process ( "visualProcessTest", "barbux.irisa.fr" );
00261  _processTable->addObjectWithIndex ("visualProcessTest", miaou ) ;
00263 
00264 
00265 #ifdef _DEBUGPVMEXEC
00266    cerr << "Svm::createDistributedSimulation : creating the distributed simulation" << endl ;
00267 #endif
00268    _siteName = _masterSiteName ;
00269 
00271      
00272      NameToPointerMap<Process>::iterator i ;
00273      
00274      for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 
00275        {
00276          addNewWorkstation ( (*i).second->getHostMachineName () ) ;
00277        }
00278 
00280   //   Process * miaou = new Process ( "visualProcessTest", "barbux.irisa.fr" );
00281                 
00282   //   addNewWorkstation ( "barbux.irisa.fr" ) ;
00283 
00284   //   _processTable->addObjectWithIndex ("visualProcessTest", miaou ) ;
00285 
00286  for ( NameToPointerMap<Process>::iterator pi = _processTable->begin () ;
00287                pi != _processTable->end () ;
00288                ++pi 
00289                )
00290             {
00291               
00292               cout<<"pour s'assurer ke le proc est ajoute koi (createDistributedSimulation) : "<<endl;
00293               cout<<"nom de proc est: "<< pi->second->getProcessName()<<" nom de host machine est: "<<pi->second->getHostMachineName()<<endl;
00294             }
00296 
00297      // creation des processus locaux
00298 #ifdef _DEBUGPVMEXEC
00299      cerr << "Creation des processus locaux..................." << endl ;
00300 #endif
00301      Process * processInfo ;
00302      int adrLoc ;
00303 
00304      std::vector<int> processSiteIds ;
00305      
00306      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00307        {
00308          processInfo = (*i).second ;
00309          
00310          // spawn a new process on the specified workstation
00311          adrLoc = spawnProcess (processInfo) ;
00312          
00313          // create a SvmLink, add it to processInfo
00314          processInfo->setSvmLink (createSvmLink (adrLoc)) ; 
00315          
00316          _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ;
00317 
00318          processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ;
00319 
00320          // send the name of the process (in the Svm) to the created process
00321          processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ;
00322          
00323          processInfo->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ;
00324 
00325          //prepare processSiteIds for the next step
00326          processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ;
00327 
00328 #ifdef _DEBUGPVMMESS
00329          cerr<<typeid((*i).first).name()<<endl;
00330          cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl;
00331 #endif
00332        }
00333      
00334 
00335      // Encodage des attributs Svm de chaque processus
00336      PvmMulticastMessage nameToIdMessage ( processSiteIds ) ;
00337      nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ;
00338      
00339      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00340        {
00341          nameToIdMessage << *((*i).second) ;
00342        }
00343      
00344      // Transfert des adresses vers chaque controleur local
00345      nameToIdMessage.send ( PvmMessage::ProcessTable ) ;
00346      
00347      serveNameRequestsUntilEnd () ;
00348 
00349 }
00350 
00351 */
00352 
00354 
00355 // aspects fonctionnement secondaire
00356 //****************************************************
00357 // RK 1302 : Suppression du nom du pss
00358 
00359 void Svm::createDistributedSimulation(const Date & initialSimulationDate )
00360 {
00361 #ifdef _DEBUGPVMEXEC
00362    cerr << "Svm::createDistributedSimulation : creating the distributed simulation" << endl ;
00363 #endif
00364    _siteName = _masterSiteName ;
00365 
00367      
00368      NameToPointerMap<Process>::iterator i ;
00369      
00370      for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 
00371        {
00372          addNewWorkstation ( (*i).second->getHostMachineName () ) ;
00373        }
00374 
00375      
00376      // creation des processus locaux
00377 #ifdef _DEBUGPVMEXEC
00378      cerr << "Creation des processus locaux..................." << endl ;
00379 #endif
00380      Process * processInfo ;
00381      int adrLoc ;
00382 
00383      //   std::vector<int> processSiteIds ;
00384      
00385      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00386        {
00387          processInfo = (*i).second ;
00388          
00389          // spawn a new process on the specified workstation
00390          adrLoc = spawnProcess (processInfo) ;
00391 
00392          cout<<"my TID is : "<<adrLoc<<endl ;
00393          
00394          // create a SvmLink, add it to processInfo
00395          processInfo->setSvmLink (createSvmLink (adrLoc)) ; 
00396          
00397          _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ;
00398 
00399          processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ;
00400 
00401          // send the name of the process (in the Svm) to the created process
00402          processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ;
00403          
00404          processInfo->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ;
00405 
00406          //prepare processSiteIds for the next step
00407          processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ;
00408 
00409 #ifdef _DEBUGPVMMESS
00410          cerr<<typeid((*i).first).name()<<endl;
00411          cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl;
00412 #endif
00413        }
00414      
00415 
00416      // Encodage des attributs Svm de chaque processus
00417      PvmMulticastMessage nameToIdMessage ( processSiteIds ) ;
00418      nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ;
00419      
00420      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00421        {
00422          nameToIdMessage << *((*i).second) ;
00423        }
00424      
00425      // Transfert des adresses vers chaque controleur local
00426      nameToIdMessage.send ( PvmMessage::ProcessTable ) ;
00427      
00428      serveNameRequestsUntilEnd () ;
00429 
00430 }
00431 
00432 
00433 
00434 void Svm::serveNameRequestsUntilEnd (void) 
00435 {
00436   bool serving = true ;
00437 
00438   //convert the actual name Server to a PvmCentralNameServer
00439   PvmCentralNameServer * nameServer = new PvmCentralNameServer ( *Name::getNameServer() ) ;
00440   Name::setNameServer (nameServer ) ;
00441 
00442 
00443   PvmIncomingMessage receiveBuffer ;
00444   std::pair<PvmMessage::MessageTag,  int> receivedRequest ;  
00445 
00446   while ( serving )
00447     {
00448 
00449       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
00450 
00451       switch (receivedRequest.first)
00452         {
00453         case PvmMessage::EndOfSimulation :
00454            {
00455               const Date & endDate (receiveBuffer.getMessageDate() );
00456               Name endedProcess ;
00457               receiveBuffer>>endedProcess ;
00458               cerr<<endedProcess<<" finished at "<<endDate<<endl;
00459               _processTable->erase ( endedProcess ) ;
00460               if ( _processTable->size() == 0 ) 
00461                  {  
00462                     disconnectFromDistributedSimulation ( endDate ) ;
00463                     serving = false ;
00464                  }
00465            }
00466            break;
00467         case PvmMessage::NameServiceGetId :
00468           {
00469             int stringLength ;
00470             receiveBuffer>>stringLength ;
00471             char * stringId = new char[stringLength] ;
00472             receiveBuffer>>stringId ;
00473             assert ( stringId [stringLength - 1] == 0 ) ;
00474             OMTRACEID("PVMSVM",
00475           "Requested id for string "<<stringId<<"| of length "<<stringLength
00476                 <<" from "<< hex << receivedRequest.second << dec);
00477             if ( stringLength ==1)
00478       {
00479         OMTRACE ("Ask for Id of " << stringId <<"|");
00480         Name::idType id = nameServer->getIdentifier( stringId ) ;
00481         OMTRACE ("id=" << id  <<"|");
00482         OMTRACE ("StringAssociatedTo=  " << nameServer->getStringAssociatedTo (id) <<"|");
00483       }
00484       Name::idType id = nameServer->getIdentifier( stringId ) ;
00485             OMTRACEID("PVMSVM"," sending "<<id<< "|");
00486             delete [] stringId ;
00487             pvm_initsend( PvmSvm::pvmDataEncoding ) ;
00488             pvm_pklong (&id, 1, 1);
00489             pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnId) ;  
00490           }
00491           break;
00492         case PvmMessage::NameServiceGetString:
00493           {
00494             Name::idType id ;
00495             receiveBuffer>>id ;
00496             OMTRACEID("PVMSVM",
00497             "Requested String for id "<<id
00498                 <<"| from "<< hex << receivedRequest.second<< dec );
00499       std::string resultingString( nameServer->getStringAssociatedTo (id) ) ;
00500             OMTRACEID("PVMSVM"," sending "<<resultingString<<"|");
00501             pvm_initsend( PvmSvm::pvmDataEncoding ) ;
00502             pvm_pkstr( const_cast<char *>( resultingString.c_str() ) ) ;
00503             pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnString) ;  
00504           }
00505           break;
00506         case PvmMessage::NameServiceVerifyLocalNameServer:
00507           {
00508             nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ;
00509           }
00510           break;
00511           //added by Chadi
00512         case PvmMessage::addNewSite:
00513           {
00514             addNewSiteToSimulation(receiveBuffer.getMessageDate());
00515           }
00516           break;
00517           //end added
00518         default:
00519             OMTRACEID("PVMSVM","Svm::serveNameRequestsUntilEnd() : unexpected request "
00520                                               <<receivedRequest.first);
00521           //an unexpected request is received
00523 //        switch (Controller::warningLevel)
00524 //          {
00525 //          case Controller::AllWarnings :
00526 //            cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request "
00527 //                <<receivedRequest.first<<endl;
00528 //            break ;
00529 //          case Controller::FatalWarnings :
00530 //            cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request"
00531 //                <<receivedRequest.first<<endl;
00532 //            exit (2) ;              
00533 //            break;
00534 //          default://nothing to do
00535 //            break;
00536 //          }
00538           break;
00539         }
00540     }
00541 }
00543 void Svm::addNewSiteRequest (const Date & date)
00544 {
00545   PvmUnicastMessage mess ( getParentSiteId () ) ;
00546   mess.insertTimeStamp ( date ) ;
00547   mess.send ( PvmMessage::addNewSite ) ;
00548 }
00550 
00551 
00552 void Svm::connectToDistributedSimulation () 
00553 {
00554 //TDTD
00555 //#ifdef _DEBUGPVMEXEC
00556   cerr<<"Svm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl;
00557 //#endif
00558 
00559    PvmIncomingMessage * np = NULL ;
00560 
00561    // connect to the global controller
00562    _linkToCentralSite = createSvmLink ( getParentSiteId () ) ;
00563  
00564    // join the group of slave processes
00565    joinSvmGroup (_slaveGroupName) ;
00566  
00567    // create a distributed version of the nameServer
00568    PvmNameServer * pvmNameServer = new PvmNameServer ( getParentSiteId() , *Name::getNameServer() ) ; 
00569  
00570    Name::setNameServer ( pvmNameServer ) ; 
00571  
00572    // find out the name of this site
00573    np = & _linkToCentralSite->waitForMessage (PvmMessage::SiteName) ;
00574    *np >> _siteName ;
00575    npp = np ;
00576 
00577 //    cout<<"my site name is _siteName: "<<_siteName<<endl;
00578 //    NameToPointerMap<Process>::iterator j ;
00579 //    for (j = _processTable->begin () ; j != _processTable->end () ; j ++) 
00580 //      {
00581 //        cout<<"connect: _processTable contient: "<<(*j).second->getProcessName()<<endl; 
00582 //      }
00583    
00584 
00585  
00586    // find out the adresses of the other sites 
00587    // - extract the timestamp of the communication
00588    _linkToCentralSite->waitForMessage (PvmMessage::ProcessTable) ;
00589 
00590    // - extract, for each process, it's adress in the virtual machine and create a link
00591    NameToPointerMap<Process>::iterator i ;
00592    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00593      {
00594        ++_numberOfSlaves ; //we add ourselves, but as _numberOfSlaves is initialised to 0, it's ok
00595 
00596       // Remarque : creer un canal sur soi-meme n'est pas genant,
00597       // on met juste a jour les tids dans la table des pss
00598 
00599       Process * p = (*i).second ;
00600 
00601       // creation d'un canal sans destinataire 
00602       p->setSvmLink (createSvmLink ()) ;
00603 
00604       // maj du destinataire
00605       *np  >> *p ;
00606 
00607       _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ;
00608 
00609       cout<<"le TID du process ajoute est : "<< p->getSvmLink()->getTID()<<" et son nom est :"<<p->getProcessName()<<endl; 
00610    }
00611 
00612 
00613    //wait for all other sites to be connected to the distributed simulation.
00614    //avoids sending messages to sites still connected or worse, not yet connected
00615    syncDistributedSites () ;
00616    cout<<"EEEE"<<endl;
00617 }
00618 
00619 //-----------------------------------------------------------------------
00620 
00621 void Svm::disconnectFromDistributedSimulation (const Date & endOfSimulationDate ) 
00622 {
00623 #ifdef _DEBUGPVMEXEC
00624    cout <<"Svm::disconnectFromDistributedSimulation"<< endl ;
00625 #endif
00626    int recepient = getParentSiteId () ;
00627 
00628    if ( recepient != 0 )
00629       // a local PvmController has finished. Notify the central controller
00630       {
00631          PvmUnicastMessage endMessage (recepient) ;
00632          
00633          endMessage.insertTimeStamp ( endOfSimulationDate ) ;
00634  
00635          endMessage << getSiteName () ;
00636 
00637          endMessage.send ( PvmMessage::EndOfSimulation ) ;
00638       }
00639    else
00640       //the central controller is deconnecting. Notify all local controllers
00641       {
00642          std::vector<int> processSiteIds ;
00643          for (NameToPointerMap<Process>::iterator i = _processTable->begin () ; 
00644               i != _processTable-> end () ; 
00645               i ++) 
00646             {
00647                processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ;
00648             }
00649          PvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ;             
00650 
00651          endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ;
00652 
00653          endOfSimulationMessage << getSiteName () ;
00654 
00655          endOfSimulationMessage.send ( PvmMessage::EndOfSimulation ) ;
00656       }
00657 }
00658 
00659 
00660 //-----------------------------------------------------------------------
00661 
00662 
00663 void Svm::broadcast (PvmMessage::MessageTag tag, PvmMulticastMessage * mess)
00664 {
00665   //the first implementation, commented, dies in pvm with an unexpected error
00666   //I haven't had time to trace the problem, so the second implementation is used
00667 
00668   /* first implementation
00669   initBeforeMessagePacking () ;
00670   
00671   if ( mess != NULL )
00672     {
00673       mess->packMessage() ;
00674     }
00675   else
00676     {
00677       mess = new PvmOutgoingMessage () ;
00678       mess->insertTimeStamp(0) ;
00679       *mess << "dummy";
00680       mess->packMessage() ;
00681     }
00682   
00683   broadcastToGroup ( slaveGroup, tag ) ;
00684   */
00685 
00686 #ifdef _DEBUGPVMMESS
00687   cerr<<"Svm::broadcast ("<<tag<<","<<mess<<") ; "<<endl;
00688 #endif
00689 
00690   NameToPointerMap<Process>::iterator processIter ; 
00691   
00692   if (mess == NULL )
00693     {
00694       std::vector<int> distantRecepients ;
00695       for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 
00696         {
00697           if (_siteName != (*processIter).second->getProcessName ()) 
00698             {
00699               // On connait alors le canal a observer
00700               distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ;
00701             }
00702         }
00703 
00705       mess = new PvmMulticastMessage ( distantRecepients ) ;
00706       mess->insertTimeStamp(0) ;
00707     }
00708 
00709   mess -> send ( tag ) ;
00710 
00711 }
00712 
00713 
00714 
00715 void Svm::synchronizeOn ( PvmController & parsingController, PvmMessage::MessageTag tag )
00716 {
00717   broadcast ( tag ) ;
00718 #ifdef _DEBUGPVMSYNCHRO
00719   cerr<< "Svm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl;
00720   usleep(1000000) ;
00721 #endif
00722 
00723   int numberOfReadyProcesses = 1 ; //this process has entered the barrier
00724   
00725   while ( numberOfReadyProcesses != _numberOfSlaves )
00726      {
00727 
00728         // here, we ignore the results, because we suppose one process will only send tag once during synchronization
00729         waitForAnswerToBlockingRequest ( parsingController, tag ) ;
00730 
00731         numberOfReadyProcesses++ ;
00732 
00733 #ifdef _DEBUGPVMMESS
00734   cerr<< "Svm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl;
00735   usleep(1000000) ;
00736 #endif
00737 
00738      }
00739 }
00740 
00741 //-----------------------------------------------------------------------
00742 
00743 void Svm::syncDistributedSites()
00744 {
00745   groupBarrier( _slaveGroupName, _numberOfSlaves ) ;
00746 }
00747 
00748 //-----------------------------------------------------------------------
00749 
00750 SvmLink * Svm::getLinkToProcessNamed( const Name & processName )
00751 {
00752   // check that we know this process
00753   if( _processTable->find( processName ) != _processTable->end() )
00754   {
00755     Process *pss = _processTable->getObjectOfIndex( processName ) ;
00756     return pss->getSvmLink() ;
00757   }
00758   else
00759   {
00761 // #ifdef _USESSTREAM
00762 //     ostringstream o ;
00763 //     o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus "
00764 //       << processName << " afin de recuperer son canal !" ;
00765 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00766 // #else
00767 //     ostrstream o ;
00768 //     o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus "
00769 //       << processName << " afin de recuperer son canal !" ;
00770 //     o.put ('\0') ;
00771 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00772 //     delete o.str() ;
00773 // #endif
00775     return 0 ;
00776   }
00777 }
00778 
00779 //-----------------------------------------------------------------------
00780 
00781 Process * Svm::getProcessDescriptorNamed( const Name & processName )
00782 {
00783   // check that we know this process
00784   if( _processTable->find( processName ) != _processTable->end() )
00785   {
00786     Process *pss = _processTable->getObjectOfIndex( processName ) ;
00787     return pss ;
00788   }
00789   else 
00790   {
00792 // #ifdef _USESSTREAM
00793 //     ostringstream o  ;
00794 //     o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ;
00795 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00796 // #else
00797 //     ostrstream o  ;
00798 //     o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ;
00799 //     o.put('\0') ;
00800 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00801 //     delete o.str() ;
00802 // #endif
00804     return 0 ;
00805   }
00806 }
00807 
00808 //-----------------------------------------------------------------------
00809 
00810 
00811 //-----------------chadi ajout 2/6---------------------------
00812 
00813 // on cherche dans la liste des deconnectes si le processus existe ou pas
00814 
00815 Process * Svm::testIfDisconnectedProcessNamed( const Name & processName )
00816 {
00817   // check that we know this process as a deconnected process
00818   if( _disconnectedTable->find( processName ) != _disconnectedTable->end() )
00819   {
00820     Process *pss = _disconnectedTable->getObjectOfIndex( processName ) ;
00821     return pss ;
00822   }
00823   else 
00824   {
00825     return 0 ;
00826   }
00827 }
00828 
00829 //-------------------fin chadi ajout 2/6--------------------------
00830 
00831 
00832 
00833 
00834 
00835 
00836 void Svm::processReceivedMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 
00837 {
00838 #ifdef _DEBUGPVMMESS
00839    cerr << "Svm::processReceivedMessages"<<endl;
00840 #endif
00841    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
00842    SvmLink * canal ; // Canal vers le precedent processus
00843    PvmIncomingMessage * messageRecu = NULL ;
00844    for (pOMKs = _processTable->begin () ; 
00845         pOMKs != _processTable->end () ; 
00846         pOMKs ++) 
00847       {
00848          if (_siteName != (*pOMKs).second->getProcessName ()) 
00849             {
00850                // On connait alors le canal a observer
00851                canal = (*pOMKs).second->getSvmLink () ;
00852 #ifdef _DEBUGPVMMESS
00853                cout << "Svm::processReceivedMessages from "
00854                     << (*pOMKs).second->getProcessName () << " tag : "
00855                     << tag << endl ;
00856                //canal->affiche () ;
00857 #endif
00858                // On fait une reception non bloquante
00859                messageRecu = canal->testForMessage (tag) ;
00860                while ( messageRecu->hasMessage () ) 
00861                   {
00862 #ifdef _DEBUGPVMMESS
00863                      cerr << "Svm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl;
00864 #endif        
00865 #ifdef _DEBUGPVMMESS
00866                      cerr << "Svm::processReceivedMessages: extracted emission date: " <<messageRecu->getMessageDate()<< endl;
00867 #endif        
00868                      //cerr << "Svm::processReceivedMessages : date " << dateEmission << endl ;
00869                      // On envoie le message et l'objet au controleur local
00870                      parsingController.parseSynchronisationMessage (messageRecu) ;
00871                      messageRecu = canal->testForMessage (tag) ;
00872                   }
00873             }
00874       }
00875 }
00876 
00877 //-----------------------------------------------------------------------
00878 
00879 void Svm::waitAndProcessMessages (PvmController & parsingController, 
00880                                      const PvmMessage::MessageTag tag) 
00881 {
00882 #ifdef _DEBUGPVMMESS
00883    cerr<<"Svm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl;
00884    NameToPointerMap<Process>::iterator pOMK ;
00885    for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) {
00886       cerr<<(*pOMK).second->getProcessName ();
00887    }
00888    cerr<<endl;
00889 #endif
00890    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
00891    SvmLink * canal ; // Canal vers le precedent processus
00892    PvmIncomingMessage * messageRecu ;
00893 #ifdef _DEBUGPVMMESS
00894    cerr<<"Svm::waitAndProcessMessages écoute de tous les processus"<<endl;
00895 #endif
00896    for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) {
00897 #ifdef _DEBUGPVMMESS
00898       cerr<<"Svm::waitAndProcessMessages écoute d'un de plus : "<<(*pOMKs).second->getProcessName ()<<endl;
00899 #endif
00900       if (_siteName != (*pOMKs).second->getProcessName ()) {
00901          // On connait alors le canal a observer
00902          canal = (*pOMKs).second->getSvmLink () ;
00903 #ifdef _DEBUGPVMMESS
00904          cout << "OMKmSvm::waitAndProcessMessages sur "
00905               << (*pOMKs).second->getProcessName () << " tag : "
00906               << tag << endl ;
00907          canal->printDebuggingInformation () ;
00908 #endif
00909          // On recoit le message en bloquant
00910          messageRecu = & canal->waitForMessage (tag) ;
00911 #ifdef _DEBUGPVMMESS
00912          cerr << "Svm::waitAndProcessMessages : message received"<< endl ;
00913 #endif
00914          // On envoie le message et l'objet au controleur local
00915          parsingController.parseSynchronisationMessage (messageRecu) ;
00916       }
00917    }
00918 }
00919 
00920 //-----------------------------------------------------------------------
00921 
00922 void Svm::waitForMessage (PvmController & parsingController, const PvmMessage::MessageTag tag) 
00923 {
00924    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
00925    SvmLink * canal ; // Canal vers le precedent processus
00926    PvmIncomingMessage * messageRecu = NULL ;
00927    bool rienRecu=true;
00928    while (rienRecu) {//on ne teste pas la nullite du message, 
00929       //car en cas de message ne contenant que la date, on le vide !
00930       for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) {
00931          if (_siteName != (*pOMKs).second->getProcessName ()) {
00932             // On connait alors le canal a observer
00933             canal = (*pOMKs).second->getSvmLink () ;
00934 #ifdef _DEBUGPVMMESS
00935             cout << "OMKmSvm::waitForMessage sur "
00936                  << (*pOMKs).second->getProcessName () << " tag : "
00937                  << tag << endl ;
00938             canal->printDebuggingInformation () ;
00939 #endif
00940             // On recoit le message en bloquant
00941             messageRecu = canal->testForMessage (tag) ;
00942             // On envoie le message et l'objet au controleur local
00943             if (messageRecu->hasMessage() ) 
00944                {
00945 #ifdef _DEBUGPVMMESS
00946                cerr << "Svm::waitForMessage : message received" << endl ;
00947 #endif
00948                parsingController.parseSynchronisationMessage (messageRecu) ;
00949                rienRecu=false;
00950             }
00951          }
00952       }
00953       messageRecu = _linkToCentralSite->testForMessage (tag) ;
00954       //cerr << "Svm::waitForMessage : message recu : " << *messageRecu << endl ;
00955       if (messageRecu->hasMessage() ) 
00956          {
00957 #ifdef _DEBUGPVMMESS
00958          cerr << "Svm::waitForMessage : message received" << endl ;
00959 #endif
00960          parsingController.parseSynchronisationMessage (messageRecu) ;
00961          rienRecu=false;
00962       }
00963    }
00964 }
00965 
00966 //------------------------------------------------------
00967 /*
00968 void Svm::flushDisconnectedSitesOutgoingBuffer()
00969 {
00970   for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
00971         prIt != _disconnectedTable->end () ;
00972         ++prIt 
00973         )
00974     {
00975       prIt->second->getSvmLink ()->getOutgoingBuffer().flushCurrentBuffer();    
00976     }
00977 }
00978 */
00979 //------------------------------------------------------
00980 
00982 
00983 
00984 // void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 
00985 // {
00986 //   NameToPointerMap<Process> aRecevoir ;
00987 //   static int _mostAncientDate = 10000 ;
00988   
00989 //   //First make a list of processes that we need to receive values from
00990 //   for (   NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 
00991 //        pOMKsE != _processTable->end () ; 
00992 //        pOMKsE ++) 
00993 //     {
00994 //       if ( (pOMKsE->first != _siteName) &&
00995 //         ( pOMKsE->second->getDateOfLastMessage () +
00996 //           pOMKsE->second->getPeriod () +
00997 //           _synchronisationLatency < parsingController.getSimulatedDate ()))
00998 //      {
00999 //        if (pOMKsE->second->getDateOfLastMessage () < _mostAncientDate)
01000 //          {
01001 //            _mostAncientDate = pOMKsE->second->getDateOfLastMessage ();
01002 //          }
01003 //        _disconnectedTable->addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
01004 // #ifdef _DEBUGPVMMESS
01005 //        cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
01006 //             << pOMKsE->first << endl ;
01007 // #endif
01008 //      }
01009 //     }
01010   
01011 //   PvmIncomingMessage receiveBuffer ;
01012 //   PvmMessage::MessageTag receivedRequest ;  
01013 //   Process * receivingProcess ;
01014   
01015 //   struct timeval start_timer,end_timer;
01016 //   gettimeofday(&start_timer, NULL);
01017 //   int time_in_ms = 0;
01018 //   int _simDate =  parsingController.getSimulatedDate () ;
01019 //   while ( _mostAncientDate + time_in_ms  < _simDate ) 
01020 //     { 
01021 //       for ( NameToPointerMap<Process>::iterator processIterator = _disconnectedTable->begin () ;
01022 //          processIterator != _disconnectedTable->end () ;
01023 //          ++processIterator 
01024 //          )
01025 //      {
01026           
01027 //        //this new implementation doesn't do blocking receives, as some processes could send urgent requests
01028 //        receivingProcess = processIterator->second ;
01029          
01030 //        receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01031          
01032 //        if ( receiveBuffer.hasMessage() )
01033 //          {
01034               
01035 //            switch (receivedRequest)
01036 //              {
01037 //              case PvmMessage::SynchronisationMessage :
01038 //                {
01039                     
01040 //                              //have the controller process the message
01041 //                      parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01042                         
01043 //                      receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
01044                         
01045 //                              //an additionnal test could be performed here to make sure correct date of last message has happened
01046 //                      _disconnectedTable->erase ( receivingProcess->getProcessName() ) ;
01047 //                  //  }
01048                     
01049 //                }
01050 //              break ;
01051 //              case PvmMessage::MirrorNeedsInitialValues :
01052 //                {
01053 //                  parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
01054 //                }
01055 //              break ;
01056 //              case PvmMessage::InitialValuesForMirror :
01057 //                {
01058 //                  //have the controller process the message
01059 //                  parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01060                     
01061 //                }
01062 //              break ;
01063 //              case PvmMessage::LocalInitSuccessfull :
01064 //              case PvmMessage::ProcessTable :
01065 //              case PvmMessage::EndOfSimulation :
01066 //              case PvmMessage::EnterBarrier :
01067 //              case PvmMessage::SiteName :
01068 //              case PvmMessage::ExitBarrier :
01069 //              case PvmMessage::NameServiceGetId :
01070 //              case PvmMessage::NameServiceGetString :
01071 //              case PvmMessage::NameServiceReturnId :
01072 //              case PvmMessage::NameServiceReturnString :
01073 //              case PvmMessage::NameServiceVerifyLocalNameServer :
01074 //              case PvmMessage::NameServiceVerifyResult :
01075 //              default:
01076 //                {
01077 //                  Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message",
01078 //                                         Controller::AllWarnings ) ;
01079 //                }
01080 //              }
01081 //          }     
01082           
01083           
01084 //      } //end for
01085       
01086 //       gettimeofday(&end_timer, NULL);
01087       
01088 //       time_in_ms = ( (end_timer.tv_sec-start_timer.tv_sec) * 1000000 +
01089 //                   (end_timer.tv_usec-start_timer.tv_usec) )/1000 ;
01090       
01091 //   //    cout<<"la valeur de dif en millisec : "<<time_in_ms<<endl;
01092 //       //      cout<<"la valeur de start est : "<<start<<endl;
01093 //       //      cout<<"la valeur de end est : "<<end<<endl;
01094       
01095 //     }  //end while
01096   
01097 // }
01098 
01099 
01101 
01102 
01103 void Svm::relaxedSynchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) {
01104 
01105    PvmIncomingMessage receiveBuffer ;
01106    PvmMessage::MessageTag receivedRequest ;  
01107    Process * receivingProcess ;
01108    static Name visualProcessA ("visualProcessA") ;
01109    static Name visualProcessB ("visualProcessB") ;
01110    static Name visualProcessC ("visualProcessC") ;
01111    static Name visualProcessD ("visualProcessD") ;
01112 
01113    //std::cerr <<"Svm:: le controleur est sur le processus : " << _siteName << std::endl ;
01114 //    if (_siteName == visualProcessB) { // simulation d'un pb de latence réseau
01115 //       int sommeil = random () % 15000 ;
01116 //       usleep (sommeil) ;
01117 //       //std::cerr <<"ralentissement de : " << sommeil << std::endl ;
01118 //    } else if (_siteName == visualProcessC) { // simulation d'un pb de latence réseau
01119 //       int sommeil = random () % 30000 ;
01120 //       usleep (sommeil) ;
01121 //       //std::cerr <<"ralentissement de : " << sommeil << std::endl ;
01122 //    } else if (_siteName == visualProcessD) { // simulation d'un pb de latence réseau
01123 //       int sommeil = random () % 45000 ;
01124 //       usleep (sommeil) ;
01125 //       //std::cerr <<"ralentissement de : " << sommeil << std::endl ;
01126 //    }
01127 
01128 
01129 //  if (! _disconnectedTable->empty ()) {
01130 
01131    // first, we try only once to receive something from the disconnected processes
01132    NameToPointerMap<Process> copyOfDisconnected ;
01133    for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
01134         prIt != _disconnectedTable->end () ;
01135         ++ prIt) {
01136       copyOfDisconnected.addObjectWithIndex (prIt->first, prIt->second) ;
01137    }
01138    for (NameToPointerMap<Process>::iterator prIt = copyOfDisconnected.begin () ;
01139         prIt != copyOfDisconnected.end () ;
01140         ++ prIt) {
01141       if (prIt->first != _siteName) {
01142          receivingProcess = prIt->second ;       
01143          receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ;  
01144          if (receiveBuffer.hasMessage ()) {
01145             // this process has become active again, we put it in the list of processes that we need to receive values from
01146             // TDTD ajout de trace pour debug
01147             //std::cerr << " 1 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ;
01148             // TDTD essai de mise en commentaire pour voir... c'est normalement fait plus loin, avec un test...
01149             //_activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ;
01150             // fin TDTD
01151             // TDTD : essai pour voir...
01152             _activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ;
01153             _disconnectedTable->erase (prIt->second->getProcessName ()) ;
01154 //TDTDTD enlever le processus de la map _disconnectedProcessusMap
01155             _disconnectedProcessusMap.erase (prIt->first) ;
01156 
01157             // fin TDTD
01158             switch (receivedRequest) {
01159             case PvmMessage::SynchronisationMessage : {
01160                // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl;
01161                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01162                receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ;
01163             }
01164                break ;
01165             case PvmMessage::MirrorNeedsInitialValues : {
01166                parsingController.sendInitialValuesToMirror (receiveBuffer) ;      
01167             }
01168                break ;
01169             case PvmMessage::InitialValuesForMirror : {
01170                //have the controller process the message
01171                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01172             }
01173                break ;
01174             case PvmMessage::LocalInitSuccessfull :
01175                //  case PvmMessage::pingMessage :
01176 //                    {
01177 //                      cout<<"i got the ping message"<<endl;
01178 //                      _activeProcessTable->addObjectWithIndex(prIt->first, processIterator->second) ;
01179 //                      _disconnectedTable->erase( prIt->second->getProcessName() );
01180 //                    }
01181                break;
01182             case PvmMessage::ProcessTable :
01183             case PvmMessage::EndOfSimulation :
01184             case PvmMessage::EnterBarrier :
01185             case PvmMessage::SiteName :
01186             case PvmMessage::ExitBarrier :
01187             case PvmMessage::NameServiceGetId :
01188             case PvmMessage::NameServiceGetString :
01189             case PvmMessage::NameServiceReturnId :
01190             case PvmMessage::NameServiceReturnString :
01191             case PvmMessage::NameServiceVerifyLocalNameServer :
01192             case PvmMessage::NameServiceVerifyResult :
01193             default: {
01194                std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
01195             }
01196             } //end switch
01197          } else {
01198             // TDTD
01199             // if the proces was declared active, it is now declared inactive, not to send to him datas that will cost time to read
01200             // so that it will still be late...
01201             if (_activeProcessTable->find (prIt->second->getProcessName ()) != _activeProcessTable->end ()) {
01202                _activeProcessTable->erase (prIt->second->getProcessName ()) ;
01203             }
01204             // fin TDTD
01205          }                    
01206       }
01207    } //end for
01208    //     cout<<"size of _activeProcessTable  : "<<_activeProcessTable->size()<<endl;
01209 //    }
01210    //second, make a list of processes that we need to receive values from
01211    for (NameToPointerMap<Process>::iterator pOMKsE = _activeProcessTable->begin () ; 
01212         pOMKsE != _activeProcessTable->end () ; 
01213         pOMKsE ++) {
01214       if ((pOMKsE->first != _siteName) &&
01215           (pOMKsE->second->getDateOfLastMessage () +
01216            pOMKsE->second->getPeriod () +
01217            _synchronisationLatency < parsingController.getSimulatedDate ())) {
01218          aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
01219 #ifdef _DEBUGPVMMESS
01220          cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
01221               << pOMKsE->first << endl ;
01222 #endif
01223       }
01224    }
01225   
01226     struct timeval start_timer,end_timer;
01227     gettimeofday(&start_timer, NULL);
01228     double time_in_ms = 0;
01229 
01230    // third, try to receive something from the active processes
01231    // valeur initiale proposée par Chadi : 200 ms
01232    while ((time_in_ms < _synchronisationTimeOut) && ! aRecevoir.empty ()) { // TDTD : valeur délicate à estimer pour le cas multi processus sur une seule machine
01233 // TDTD : ici je remplace processTable par activeProcesstable
01234 //       for (NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
01235 //         processIterator != _processTable->end () ;
01236 //         ++processIterator) {
01237       for (NameToPointerMap<Process>::iterator processIterator = _activeProcessTable->begin () ;
01238            processIterator != _activeProcessTable->end () ;
01239            ++processIterator) {
01240 // fin TDTD
01241          //this new implementation doesn't do blocking receives, as some processes could send urgent requests
01242          receivingProcess = processIterator->second ;
01243          receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ;
01244          if (receiveBuffer.hasMessage ()) {
01245             // TDTD ajout de trace pour debug
01246             //std::cerr << " 2 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ;
01247             //---------------------------chadi ajout 4/6 ---------------------
01248             //  aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01249             // start = clock();
01250 
01251             //  Date dlm = receivingProcess->getDateOfLastMessage();
01252 //            receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ;
01253 //            cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl;
01254 // TDTD : je mets ce code plus haut
01255 //          if (_disconnectedTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) {
01256 //             _activeProcessTable->addObjectWithIndex (processIterator->first, processIterator->second) ;
01257 //             _disconnectedTable->erase (processIterator->second->getProcessName ()) ;
01258 //             //std::cout<<" j'enleve de la liste de deconnecte le processus suivant: "<<processIterator->first<<std::endl;
01259 //             _simStep = 0 ;
01260 //          }
01261 // fin TDTD
01262             //-----------------------fin chadi ajout 4/6 -------------------------
01263 //            if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate())
01264 //              { }
01265 //            else {
01266             switch (receivedRequest) {
01267             case PvmMessage::SynchronisationMessage : {
01268                // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl;
01269                // To filter ancient synchro messages
01270                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01271                receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ;
01272                if (parsingController.getSimulatedDate () < receiveBuffer.getMessageDate () + 30) {
01273                   // on a déjà reçu des messages assez récents de ce processus, on n'en demande pas d'autres
01274                   aRecevoir.erase (receivingProcess->getProcessName ()) ;
01275                }
01276             }
01277                break ;
01278             case PvmMessage::MirrorNeedsInitialValues : {
01279                parsingController.sendInitialValuesToMirror (receiveBuffer) ;      
01280             } break ;
01281             case PvmMessage::InitialValuesForMirror : {
01282                //have the controller process the message
01283                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01284             } break ;
01285             case PvmMessage::LocalInitSuccessfull :
01286                //  case PvmMessage::pingMessage :
01287 //                  {
01288 //                    cout<<"i got the ping message"<<endl;
01289 //                    _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ;
01290 //                    _disconnectedTable->erase( processIterator->second->getProcessName() );
01291 //                  }
01292                break ;
01293             case PvmMessage::ProcessTable :
01294             case PvmMessage::EndOfSimulation :
01295             case PvmMessage::EnterBarrier :
01296             case PvmMessage::SiteName :
01297             case PvmMessage::ExitBarrier :
01298             case PvmMessage::NameServiceGetId :
01299             case PvmMessage::NameServiceGetString :
01300             case PvmMessage::NameServiceReturnId :
01301             case PvmMessage::NameServiceReturnString :
01302             case PvmMessage::NameServiceVerifyLocalNameServer :
01303             case PvmMessage::NameServiceVerifyResult :
01304             default: {
01305                std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
01306             }
01307             } //end switch
01308          }                
01309       } //end for
01310       
01311       gettimeofday (&end_timer, NULL) ;
01312       time_in_ms = (end_timer.tv_sec-start_timer.tv_sec) * 1000.0 +
01313          (end_timer.tv_usec-start_timer.tv_usec) /1000.0 ;
01314 
01315       // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process
01316       if (_yieldNeeded) {
01317 #ifdef _MSC_VER
01318                   Sleep( 0 ) ;
01319 #else
01320          usleep (0) ;
01321 #endif
01322       }
01323    }  //end while
01324 
01325    // fourth, remove the inactive processes from the _activeProcessTable and place them in the _disconnectedTable
01326    if  (! aRecevoir.empty ()) {
01327       for (NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ;
01328            processIterator != aRecevoir.end () ;
01329            ++ processIterator) {
01330          _disconnectedTable->addObjectWithIndex (processIterator->first, processIterator->second) ;
01331 // TDTDTD ajouter le nom du processus dans la map _disconnectedProcessusMap avec la dernière date simulée arrivée
01332            _disconnectedProcessusMap.insert (std::pair<Name, Type::IntType> (processIterator->first,
01333                                                                              (int)processIterator->second->getDateOfLastMessage ())) ;
01334          // TDTD : je ne comprends pas trop pourquoi mais ici les lignes suivantes doivent être commentéee...
01335 //       if (_activeProcessTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) {
01336 //          _activeProcessTable->erase (processIterator->second->getProcessName ()) ;
01337 //       }
01338          // fin TDTD : en fait c'était à cause de l'envoi aux seuls non inactifs (et non pas non en retard), voir la méthode send...withTag...
01339          //std::cout << "je rajoute le processus suivant a la table des deconnectes " << processIterator->second->getProcessName () << std::endl ;
01340       }
01341       aRecevoir.clear () ;
01342       removeProcessProlog () ;
01343    }
01344 // TDTD ajout trace pour debug
01345    //std::cout<<"longueur de la table des deconnectés : "<<_disconnectedTable->size ()<<std::endl;
01346 #ifdef _DEBUGPVMMESS
01347    cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
01348 #endif
01349 
01350 }
01351 
01352 
01353 
01354 
01355  /* TDTD : version intermédiaire de sauvegarde
01356 void Svm::relaxedSynchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 
01357 {
01358  //  if (firstTime) {
01359 //     firstTime = false;
01360 //     synchroniseReceiveAndProcessMessages( parsingController,tag);
01361 //       }
01362 //   else {
01363  // NameToPointerMap<Process> aRecevoir ;
01364 
01365 
01366   PvmIncomingMessage receiveBuffer ;
01367   PvmMessage::MessageTag receivedRequest ;  
01368   Process * receivingProcess ;
01369   
01370  
01371 
01373 
01374   if (!_disconnectedTable->empty())
01375     {
01376       
01377       for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
01378             prIt != _disconnectedTable->end () ;
01379             ++prIt 
01380             )
01381         {
01382           if (prIt->first != _siteName)
01383             {
01384               receivingProcess = prIt->second ;  
01385               receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;   
01386               if ( receiveBuffer.hasMessage() )
01387                 {
01388                      // TDTD ajout de trace pour debug
01389                    //std::cerr << " 1 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ;
01390                    //cout<<"TRUC BIDULE"<<endl;
01391                   _activeProcessTable->addObjectWithIndex(prIt->first, prIt->second) ;
01392                    aRecevoir.addObjectWithIndex (prIt->first, prIt->second) ;
01393         //         _disconnectedTable->erase( prIt->second->getProcessName() );
01394 
01395                    //cout<<"DEBUT: je rajoute a la liste des ACTIVES le processus suivant: "<<prIt->first<<endl;
01396                    //TDTD : à vérifier avec l'algorithme...
01397 
01398 
01399         //        cout<<"DEBUT: j'enleve de la liste de deconnecte le processus suivant: "<<prIt->first<<endl;
01400                   switch (receivedRequest)
01401                     {
01402                     case PvmMessage::SynchronisationMessage :
01403                       {
01404                         // To filter ancient synchro messages
01405                 //      if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate())
01406                 //        { receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; }
01407                 //      else {
01408                           
01409                                 //have the controller process the message
01410                           parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01411                           
01412                           receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
01413                           
01414                                 //an additionnal test could be performed here to make sure correct date of last message has happened
01415                 //        aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01416                 //      }
01417                         
01418                       }
01419                     break ;
01420                     case PvmMessage::MirrorNeedsInitialValues :
01421                       {
01422                         parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
01423                       }
01424                     break ;
01425                     case PvmMessage::InitialValuesForMirror :
01426                       {
01427                         //have the controller process the message
01428                         parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01429                         
01430                       }
01431                     break ;
01432                     case PvmMessage::LocalInitSuccessfull :
01433                    //  case PvmMessage::pingMessage :
01434 //                    {
01435 //                      cout<<"i got the ping message"<<endl;
01436 //                      _activeProcessTable->addObjectWithIndex(prIt->first, processIterator->second) ;
01437 //                      _disconnectedTable->erase( prIt->second->getProcessName() );
01438 //                    }
01439                     break;
01440                     case PvmMessage::ProcessTable :
01441                     case PvmMessage::EndOfSimulation :
01442                     case PvmMessage::EnterBarrier :
01443                     case PvmMessage::SiteName :
01444                     case PvmMessage::ExitBarrier :
01445                     case PvmMessage::NameServiceGetId :
01446                     case PvmMessage::NameServiceGetString :
01447                     case PvmMessage::NameServiceReturnId :
01448                     case PvmMessage::NameServiceReturnString :
01449                     case PvmMessage::NameServiceVerifyLocalNameServer :
01450                     case PvmMessage::NameServiceVerifyResult :
01451                     default:
01452                       {
01453                          std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
01454                       }
01455                     } //end switch
01456                   
01457                   
01458                   //  aRecevoir.addObjectWithIndex(prIt->first, prIt->second) ;
01459                   //  _disconnectedTable->erase( prIt->second->getProcessName() );
01460         //        cout<<"je rajoute a la liste des actives le processus suivant: "<<prIt->first<<endl;
01461                   //       Date dlm = receivingProcess->getDateOfLastMessage();
01462                   //      receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ;
01463                   //        cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl;
01464                   //  while ( dlm+50 < receivingProcess->getDateOfLastMessage() && receiveBuffer.hasMessage() )
01465                   //                {
01466                   //                  receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01467                   //                  if (receiveBuffer.hasMessage())
01468                   //                    {
01469                   //                      receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ;
01470                   //                      dlm = receivingProcess->getDateOfLastMessage();
01471                   //                    }
01472                    //                 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01473                   //                }
01474                 }         
01475               
01476             }
01477         } //end for
01478       
01479     }
01480 
01481 
01482   
01484   
01485   //      cout<<"size of _activeProcessTable  : "<<_activeProcessTable->size()<<endl;
01486   
01487   //First make a list of processes that we need to receive values from
01488   for (   NameToPointerMap<Process>::iterator pOMKsE = _activeProcessTable->begin () ; 
01489           pOMKsE != _activeProcessTable->end () ; 
01490           pOMKsE ++) 
01491     {
01492       if ( (pOMKsE->first != _siteName) &&
01493            ( pOMKsE->second->getDateOfLastMessage () +
01494              pOMKsE->second->getPeriod () +
01495              _synchronisationLatency < parsingController.getSimulatedDate ()))
01496         {
01497           aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
01498 #ifdef _DEBUGPVMMESS
01499           cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
01500                << pOMKsE->first << endl ;
01501 #endif
01502         }
01503     }
01504   
01505   struct timeval start_timer,end_timer;
01506   gettimeofday(&start_timer, NULL);
01507   double time_in_ms = 0;
01508   
01509   
01510   while ( (time_in_ms < 200) && !aRecevoir.empty() ) 
01511     { 
01512       for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
01513             processIterator != _processTable->end () ;
01514             ++processIterator 
01515             )
01516         {
01517           
01518           //this new implementation doesn't do blocking receives, as some processes could send urgent requests
01519           receivingProcess = processIterator->second ;
01520           
01521           receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01522           
01523           if ( receiveBuffer.hasMessage() )
01524             {
01525                      // TDTD ajout de trace pour debug
01526                //std::cerr << " 2 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ;
01527               //---------------------------chadi ajout 4/6 ---------------------
01528               //  aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01529               // start = clock();
01530 
01531              //  Date dlm = receivingProcess->getDateOfLastMessage();
01532 //            receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ;
01533 //            cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl;
01534 
01535               if (_disconnectedTable->find(processIterator->second->getProcessName())!=_disconnectedTable->end())
01536                 {
01537                   _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ;
01538                   _disconnectedTable->erase( processIterator->second->getProcessName() );
01539                 //  cout<<" j'enleve de la liste de deconnecte le processus suivant: "<<processIterator->first<<endl;
01540                    _simStep = 0 ;
01541                 }
01542               //-----------------------fin chadi ajout 4/6 -------------------------
01543 //            if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate())
01544 //              { }
01545 //            else {
01546                 switch (receivedRequest)
01547                   {
01548                   case PvmMessage::SynchronisationMessage :
01549                     {
01550                      // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl;
01551                       // To filter ancient synchro messages
01552                        if (receiveBuffer.getMessageDate()+30 < parsingController.getSimulatedDate())
01553                          { //cout<<"receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()"<<endl; //receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; }
01554 
01555                             // TDTD : ajout d'un manque cruel : le traitement du message en retard...
01556                         parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01557                             // fin TDTD
01558                        receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; }
01559  
01560                       else {
01561                         
01562                                 //have the controller process the message
01563                         parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01564                         
01565                         receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
01566                         
01567                                 //an additionnal test could be performed here to make sure correct date of last message has happened
01568                         aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01569                 //      cout<<"Je viens d'effacer le process de aRecevoir ..."<<endl;
01570                       }
01571                       
01572                     }
01573                   break ;
01574                   case PvmMessage::MirrorNeedsInitialValues :
01575                     {
01576                       parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
01577                     }
01578                   break ;
01579                   case PvmMessage::InitialValuesForMirror :
01580                     {
01581                       //have the controller process the message
01582                       parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01583                       
01584                     }
01585                   break ;
01586                   case PvmMessage::LocalInitSuccessfull :
01587                  //  case PvmMessage::pingMessage :
01588 //                  {
01589 //                    cout<<"i got the ping message"<<endl;
01590 //                    _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ;
01591 //                    _disconnectedTable->erase( processIterator->second->getProcessName() );
01592 //                  }
01593                   break;
01594                   case PvmMessage::ProcessTable :
01595                   case PvmMessage::EndOfSimulation :
01596                   case PvmMessage::EnterBarrier :
01597                   case PvmMessage::SiteName :
01598                   case PvmMessage::ExitBarrier :
01599                   case PvmMessage::NameServiceGetId :
01600                   case PvmMessage::NameServiceGetString :
01601                   case PvmMessage::NameServiceReturnId :
01602                   case PvmMessage::NameServiceReturnString :
01603                   case PvmMessage::NameServiceVerifyLocalNameServer :
01604                   case PvmMessage::NameServiceVerifyResult :
01605                   default:
01606                     {
01607                        std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
01608                     }
01609                   } //end switch
01610 //            }
01611         //     receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01612             }     
01613           
01614           
01615         } //end for
01616       
01617       gettimeofday(&end_timer, NULL);
01618       
01619       time_in_ms =  (end_timer.tv_sec-start_timer.tv_sec) * 1000.0 +
01620                      (end_timer.tv_usec-start_timer.tv_usec) /1000.0 ;
01621       // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process
01622       usleep (1) ;
01623 
01624     }  //end while
01625   
01626   if  ( !aRecevoir.empty() )  
01627     {
01628       for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ;
01629             processIterator != aRecevoir.end () ;
01630             ++processIterator 
01631             )
01632         {
01633           _disconnectedTable->addObjectWithIndex(processIterator->first, processIterator->second );
01634 
01635 //        _temporaryTable->addObjectWithIndex(processIterator->first, processIterator->second );
01636 
01637 //        aRecevoir.erase ( processIterator->first ) ;
01638 //        _activeProcessTable->erase( processIterator->first ) ;
01639         
01640           std::cout<<"je rajoute le processus suivant a la table des deconnectes "<<processIterator->second->getProcessName()<<std::endl;
01641         }
01642       aRecevoir.clear();
01643       removeProcessProlog () ;
01644     }
01645 // TDTD ajout trace pour debug
01646           std::cout<<"longueur de la table des deconnectés : "<<_disconnectedTable->size ()<<std::endl;
01647 
01648   
01649 #ifdef _DEBUGPVMMESS
01650   cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
01651 #endif
01652   
01653 //  } //else firsttime
01654 
01655 }
01656 */
01657 
01658 void Svm::removeProcessProlog ()
01659 { 
01660   _simStep++ ;
01661   //cout<<"_simStep est egale :"<<_simStep<<endl;
01662   if (_simStep == 2) { 
01663     removeProcessFromActiveTable() ; 
01664    // cout<<"_simStep est egale (2) :"<<_simStep<<endl;
01665     _simStep = 0 ;
01666   }
01667 }
01668 
01669 void Svm::removeProcessFromActiveTable()
01670 {
01671 for ( NameToPointerMap<Process>::iterator processIterator = _disconnectedTable->begin () ;
01672             processIterator != _disconnectedTable->end () ;
01673             ++processIterator 
01674             )
01675         {
01676           _activeProcessTable->erase( processIterator->first ) ;
01677          // cout<<"le process "<<processIterator->first<<" a ete vire de la liste active"<<endl;
01678         }
01679 // _temporaryTable->clear() ;
01680 // _simStep = 0 ;
01681 }
01682 
01684 
01685 
01686 // void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 
01687 // {
01688 //   NameToPointerMap<Process> aRecevoir ;
01689   
01690   
01691 //   //First make a list of processes that we need to receive values from//   //First make a list of processes that we need to receive values from
01692 
01693 //   for (   NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 
01694 //        pOMKsE != _processTable->end () ; 
01695 //        pOMKsE ++) 
01696 //     {
01697 //       if ( (pOMKsE->first != _siteName) &&
01698 //         ( pOMKsE->second->getDateOfLastMessage () +
01699 //           pOMKsE->second->getPeriod () +
01700 //           _synchronisationLatency < parsingController.getSimulatedDate ()))
01701 //      {
01702 //        aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
01703 // #ifdef _DEBUGPVMMESS
01704 //        cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
01705 //             << pOMKsE->first << endl ;
01706 // #endif
01707 //      }
01708 //     }
01709   
01710 //   PvmIncomingMessage receiveBuffer ;
01711 //   PvmMessage::MessageTag receivedRequest ;  
01712 //   Process * receivingProcess ;
01713   
01714 //   struct timeval start_timer,end_timer;
01715 //   gettimeofday(&start_timer, NULL);
01716 //   int time_in_ms = 0;
01717 //   while ( (time_in_ms < 100) && !aRecevoir.empty() ) 
01718 //     { 
01719 //       for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ;
01720 //          processIterator != aRecevoir.end () ;
01721 //          ++processIterator 
01722 //          )
01723 //      {
01724           
01725 //        //this new implementation doesn't do blocking receives, as some processes could send urgent requests
01726 //        receivingProcess = processIterator->second ;
01727          
01728 //        receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01729          
01730 //        if ( receiveBuffer.hasMessage() )
01731 //          {
01732               
01733 //            //---------------------------chadi ajout 4/6 ---------------------
01734 //            //  aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01735 //            // start = clock();
01736               
01737 //            if (_disconnectedTable->find(processIterator->second->getProcessName())!=_disconnectedTable->end())
01738 //              {
01739 //                //    removeFromDisconnectedTable(processIterator);
01740 //                _disconnectedTable->erase( processIterator->second->getProcessName() );
01741 //                cout<<" j'enleve de la liste de deconnecte le processus suivant: "<<processIterator->first<<endl;
01742 //              }
01743 //            //-----------------------fin chadi ajout 4/6 -------------------------
01744 //  if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate())
01745 //    { }
01746 //  else {
01747 //             switch (receivedRequest)
01748 //              {
01749 //              case PvmMessage::SynchronisationMessage :
01750 //                {
01751 //                  // To filter ancient synchro messages
01752 //                   if (receiveBuffer.getMessageDate()+50 < parsingController.getSimulatedDate())
01753 //                     { }
01754 //                   else {
01755 
01756 //                                      //have the controller process the message
01757 //                      parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01758                         
01759 //                      receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
01760                         
01761 //                              //an additionnal test could be performed here to make sure correct date of last message has happened
01762 //                      aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01763 //                    }
01764                     
01765 //                }
01766 //              break ;
01767 //              case PvmMessage::MirrorNeedsInitialValues :
01768 //                {
01769 //                  parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
01770 //                }
01771 //              break ;
01772 //              case PvmMessage::InitialValuesForMirror :
01773 //                {
01774 //                  //have the controller process the message
01775 //                  parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01776                     
01777 //                }
01778 //              break ;
01779 //              case PvmMessage::LocalInitSuccessfull :
01780 //              case PvmMessage::ProcessTable :
01781 //              case PvmMessage::EndOfSimulation :
01782 //              case PvmMessage::EnterBarrier :
01783 //              case PvmMessage::SiteName :
01784 //              case PvmMessage::ExitBarrier :
01785 //              case PvmMessage::NameServiceGetId :
01786 //              case PvmMessage::NameServiceGetString :
01787 //              case PvmMessage::NameServiceReturnId :
01788 //              case PvmMessage::NameServiceReturnString :
01789 //              case PvmMessage::NameServiceVerifyLocalNameServer :
01790 //              case PvmMessage::NameServiceVerifyResult :
01791 //              default:
01792 //                {
01793 //                  Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message",
01794 //                                         Controller::AllWarnings ) ;
01795 //                }
01796 //              } //end switch
01797 //    }
01798 //          }     
01799           
01800           
01801 //      } //end for
01802       
01803 //       gettimeofday(&end_timer, NULL);
01804       
01805 //       time_in_ms = ( (end_timer.tv_sec-start_timer.tv_sec) * 1000000 +
01806 //                   (end_timer.tv_usec-start_timer.tv_usec) )/1000 ;
01807       
01808 //   //    cout<<"la valeur de dif en millisec : "<<time_in_ms<<endl;
01809 //       //      cout<<"la valeur de start est : "<<start<<endl;
01810 //       //      cout<<"la valeur de end est : "<<end<<endl;
01811       
01812 //     }  //end while
01813   
01814 //   if (!aRecevoir.empty())   
01815 //     {
01816 //       for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ;
01817 //          processIterator != aRecevoir.end () ;
01818 //          ++processIterator 
01819 //          )
01820 //      {
01821 //        _disconnectedTable->addObjectWithIndex(processIterator->first, processIterator->second );
01822 //        aRecevoir.erase ( processIterator->first ) ;
01823 //        cout<<"je rajoute le processus suivant a la table des deconnectes "<<processIterator->second->getProcessName()<<endl;
01824 //      }        
01825 //     }
01826   
01827 // #ifdef _DEBUGPVMMESS
01828 //   cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
01829 // #endif
01830 // }
01831 
01832 
01833 
01834 
01835 
01836 
01838 //-----------------------------------------------------------------------
01839 
01840 // void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 
01841 // {
01842 //    NameToPointerMap<Process> aRecevoir ;
01843 
01844 
01845 //    //First make a list of processes that we need to receive values from
01846 //    for (   NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 
01847 //         pOMKsE != _processTable->end () ; 
01848 //         pOMKsE ++) 
01849 //       {
01850 //       if ( (pOMKsE->first != _siteName) &&
01851 //            ( pOMKsE->second->getDateOfLastMessage () +
01852 //              pOMKsE->second->getPeriod () +
01853 //               _synchronisationLatency < parsingController.getSimulatedDate ())) // || (pOMKsE->second->getIterationFlag()>=1000) )) 
01854 // //------- chadi ajout 3/6-------)&&  !(disconnectedProcess(pOMKsE)))
01855 //          {
01856 //             aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
01857 // #ifdef _DEBUGPVMMESS
01858 //             cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
01859 //                  << pOMKsE->first << endl ;
01860 // #endif
01861 //          }
01862 //       }
01863 
01864    
01865 //    PvmIncomingMessage receiveBuffer ;
01866 //    PvmMessage::MessageTag receivedRequest ;  
01867 //    Process * receivingProcess ;
01868    
01869 //    while (! aRecevoir.empty () ) 
01870 //       { 
01871 
01872 //       for ( NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ;
01873 //             processIterator != aRecevoir.end () ;
01874 //             ++processIterator 
01875 //             )
01876 //          {
01877 //            //*****************chadi: faux deconnexion d'un site *****************
01878 //            //  if ( processIterator->second->getProcessName()== "visualProcessC" )
01879 //            //     {
01880 //               //  addToDisconnectedTable( processIterator );
01881 //            // _disconnectedTable->addObjectWithIndex(processIterator->first,processIterator->second  );
01882 //            //     }
01883 //            //*************************************************************
01884 //             //this new implementation doesn't do blocking receives, as some processes could send urgent requests
01885 //             receivingProcess = processIterator->second ;
01886 
01887 //             receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
01888                
01889 //             if ( receiveBuffer.hasMessage() )
01890 //                {
01891 
01892 // //---------------------------chadi ajout 4/6 ---------------------
01893                    
01894 //      //          cout<<"parsingController.getSimulatedDate = "<<parsingController.getSimulatedDate()<<endl;
01895 //      //          cout<<"receiveBuffer.getMessageDate = "<<receiveBuffer.getMessageDate()<<" en provenance du processus: "<<processIterator->second->getProcessName()<<endl;
01896 //      //          cout<<"_synchronisationLatency = "<<_synchronisationLatency<<endl;
01897 //      //          cout<<"la periode de "<<processIterator->second->getProcessName()<<" est egale a "<< processIterator->second->getPeriod()<<endl;
01898 
01899 //              //    cout<<"difference de temps ou pas : "<<receiveBuffer.getMessageDate() - receiveBuffer.getDateOfLastMessage()<<endl;
01900 
01901 //      //          cout<<"iterateur->first : " <<processIterator->first<<" iterateur->second : "<<processIterator->second<<endl;
01902                     
01903 //                  if (_disconnectedTable->find(processIterator->second->getProcessName())!=_disconnectedTable->end())
01904 //                    {
01905 //                      processIterator->second->resetIterationFlag();
01906 //                   // aRecevoir.addObjectWithIndex (processIterator->first, processIterator->second) ;
01907 //                      cout<<"j'enleve de la liste des deconnectes le processus suivant: "<<processIterator->second->getProcessName()<<endl;
01908 //                      removeFromDisconnectedTable(processIterator);
01909 //                    }
01910 
01911                      
01912 // //-----------------------fin chadi ajout 4/6 -------------------------
01913                      
01914 //                   switch (receivedRequest)
01915 //                      {
01916 //                      case PvmMessage::SynchronisationMessage :
01917 //                        {
01918 
01919 //                           if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate())
01920 //                          {
01921 //                          //cout<<"c pas un bon signe ..."<<endl;
01922 //                          }
01923 //                          else
01924 //                           {
01925 
01926 //                              //have the controller process the message
01927 //                              parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01928                               
01929 //                              receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
01930                               
01931 //                              //an additionnal test could be performed here to make sure correct date of last message has happened
01932 //                              aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01933 //                             }
01934 //                        }
01935 //                      break ;
01936 //                      case PvmMessage::MirrorNeedsInitialValues :
01937 //                         {
01938 //                            parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
01939 //                         }
01940 //                      break ;
01941 //                      case PvmMessage::InitialValuesForMirror :
01942 //                         {
01943 //                            //have the controller process the message
01944 //                            parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
01945                               
01946 //                         }
01947 //                      break ;
01948 //              //      case PvmMessage::updateProcessTable :
01949 //                        {
01950 //                          cout<<"ProcessTable Will be Updated ....."<<endl;
01951 //                          _processTable->addObjectWithIndex("visualProcessC", new Process ( "visualProcessTest", "maxipes.irisa.fr" )) ;
01952 //                          cout<<"ProcessTable Updated Succeffully ....."<<endl;
01953 //                          ++_numberOfSlaves ;
01954 //                        }
01955 //                      break ;
01956 //                      case PvmMessage::LocalInitSuccessfull :
01957 //                      case PvmMessage::ProcessTable :
01958 //                      case PvmMessage::EndOfSimulation :
01959 //                      case PvmMessage::EnterBarrier :
01960 //                      case PvmMessage::SiteName :
01961 //                      case PvmMessage::ExitBarrier :
01962 //                      case PvmMessage::NameServiceGetId :
01963 //                      case PvmMessage::NameServiceGetString :
01964 //                      case PvmMessage::NameServiceReturnId :
01965 //                      case PvmMessage::NameServiceReturnString :
01966 //                      case PvmMessage::NameServiceVerifyLocalNameServer :
01967 //                      case PvmMessage::NameServiceVerifyResult :
01968 //                      case PvmMessage::updateProcessTable :
01969 //                      case PvmMessage::addNewSite :
01970                 
01971 //                      default:
01972 //                         {
01973 //                            Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message",
01974 //                                                   Controller::AllWarnings ) ;
01975 //                         }
01976 //                      }
01977                   
01978 //                }
01979 // //-------------------------------chadi ajout 5/6 ----------------------
01980 
01981 //             else
01982 
01983 //                  {
01984 //                    //   cout<<"nb iteration avant inc = "<<processIterator->second->getIterationFlag()<<endl;
01985                       
01986 //                    processIterator->second->incrementeIterationFlag();
01987                       
01988 //                    //   cout<<"nb iteration apres inc = "<<processIterator->second->getIterationFlag()<<endl;
01989                       
01990 //                    // if ( processIterator->second->disconnectedProcess())
01991 //                    if ( processIterator->second->getIterationFlag() >= 200000)
01992 
01993 //                  //  if (receivingProcess->getDateOfLastMessage()+50 < parsingController.getSimulatedDate())
01994 //                      {
01995 //                      //  addToDisconnectedTable( processIterator );
01996 //                        _disconnectedTable->addObjectWithIndex(processIterator->first,processIterator->second  );
01997 //                        aRecevoir.erase( processIterator->second->getProcessName());
01998 //                       // cout<<"je rajoute le processus suivant a la table des deconnectes "<<processIterator->second->getProcessName()<<" car processIterator->second->getIterationFlag()="<<processIterator->second->getIterationFlag()<<endl;
01999 //                      //  cout<<"getDateOfLastMessage()+50 = "<< receivingProcess->getDateOfLastMessage()+50<<" tandis ke parsingController.getSimulatedDate = "<<parsingController.getSimulatedDate()<<endl;
02000 //                      //  cout<<"processIterator->second->getIterationFlag() ="<<processIterator->second->getIterationFlag()<<endl;
02001 //                      }
02002 //                  }
02003                
02004 // //-----------------------------fin chadi ajout 5/6 -----------------------
02005                
02006 //          } //end for
02007 //       }  //end while
02008    
02009    
02010 // #ifdef _DEBUGPVMMESS
02011 //    cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
02012 // #endif
02013 // }
02014 
02015 
02016 //-----------------------------------------------------------------------
02017 
02018 
02019 //----------------------- The Original One ------------------------------------------------
02020 
02021 void Svm::synchroniseReceiveAndProcessMessages (PvmController & parsingController, const PvmMessage::MessageTag tag) 
02022 {
02023    //cout<<"GGGGG"<<endl;
02024    NameToPointerMap<Process> aRecevoir ;
02025 
02026    //First make a list of processes that we need to receive values from
02027    for (   NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 
02028            pOMKsE != _processTable->end () ; 
02029            pOMKsE ++) 
02030       {
02031          if ( (pOMKsE->first != _siteName) &&
02032               ( pOMKsE->second->getDateOfLastMessage () +
02033                 pOMKsE->second->getPeriod () +
02034                 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 
02035             {
02036                aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
02037 #ifdef _DEBUGPVMMESS
02038                cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
02039                     << pOMKsE->first << endl ;
02040 #endif
02041             }
02042       }
02043 
02044    
02045    PvmIncomingMessage receiveBuffer ;
02046    PvmMessage::MessageTag receivedRequest ;  
02047    Process * receivingProcess ;
02048    
02049    while (! aRecevoir.empty () ) 
02050       { 
02051          for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
02052                processIterator != _processTable->end () ;
02053                ++processIterator 
02054                )
02055             {
02056                //this new implementation doesn't do blocking receives, as some processes could send urgent requests
02057                receivingProcess = processIterator->second ;
02058 
02059                receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
02060                
02061                if ( receiveBuffer.hasMessage() )
02062                   {
02063                      // TDTD ajout de trace pour debug
02064                      //std::cerr << "buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()
02065                      //       << " de " << receivingProcess->getProcessName() << std::endl ;
02066                      switch (receivedRequest)
02067                         {
02068                         case PvmMessage::SynchronisationMessage :
02069                            {
02070                               //have the controller process the message
02071                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02072                               
02073                               receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
02074                               
02075                               //an additionnal test could be performed here to make sure correct date of last message has happened
02076                               aRecevoir.erase ( receivingProcess->getProcessName() ) ;
02077                            }
02078                         break ;
02079                         case PvmMessage::MirrorNeedsInitialValues :
02080                            {
02081                               parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
02082                            }
02083                         break ;
02084                         case PvmMessage::InitialValuesForMirror :
02085                            {
02086                               //have the controller process the message
02087                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02088                               
02089                            }
02090                         break ;
02091                         case PvmMessage::LocalInitSuccessfull :
02092                         case PvmMessage::ProcessTable :
02093                         case PvmMessage::EndOfSimulation :
02094                         case PvmMessage::EnterBarrier :
02095                         case PvmMessage::SiteName :
02096                         case PvmMessage::ExitBarrier :
02097                         case PvmMessage::NameServiceGetId :
02098                         case PvmMessage::NameServiceGetString :
02099                         case PvmMessage::NameServiceReturnId :
02100                         case PvmMessage::NameServiceReturnString :
02101                         case PvmMessage::NameServiceVerifyLocalNameServer :
02102                         case PvmMessage::NameServiceVerifyResult :
02103                         default:
02104                            {
02106                               std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
02107                               /* END CHADI */
02108                            }
02109                         }
02110                   }
02111             }
02112          // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process
02113          if (_yieldNeeded) {
02114 #ifdef _MSC_VER
02115                  Sleep( 0 ) ;
02116 #else
02117             usleep (0) ;
02118 #endif
02119          }
02120       }
02121          
02122    
02123 #ifdef _DEBUGPVMMESS
02124    cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
02125 #endif
02126 }
02127 
02128 
02129 //-----------------------------------------------------------------------------
02130 
02131 
02132 
02133 void Svm::sendCurrentBuffersWithTag(const PvmMessage::MessageTag tag) 
02134 {
02135    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
02136    SvmLink * canal ; // Canal vers le precedent processus
02137    static int nbSteps = 1 ;
02138 #ifdef _DEBUGPVMMESS
02139    cerr<<"Svm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processus : "<<endl;
02140    NameToPointerMap<Process>::iterator pOMK ;
02141    for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) 
02142      {
02143        cerr<<(*pOMK).second->getProcessName ();
02144      }
02145    cerr<<endl;
02146 #endif
02147 
02148    if (_deadReckoningInterval > 0) {
02149       nbSteps = (nbSteps + 1) % _deadReckoningInterval ;
02150    }
02151 
02152    // TDTD on envoie seulement les synchros aux actifs...
02153    for (pOMKs = _processTable->begin () ; pOMKs != _processTable->end () ; pOMKs ++) 
02154       //for (pOMKs = _activeProcessTable->begin () ; pOMKs != _activeProcessTable->end () ; pOMKs ++) 
02155 // fin TDTD
02156      {
02157 #ifdef _DEBUGPVMMESS
02158        cerr<<"Traitement des emissions vers "<<(*pOMKs).second->getProcessName ()<<endl;
02159 #endif
02160        if (_siteName != (*pOMKs).second->getProcessName ()) 
02161          {
02162 
02163            // On connait alors le canal a observer
02164            canal = (*pOMKs).second->getSvmLink () ;
02165 
02166            if ( tag == PvmMessage::SynchronisationMessage ) 
02167              {
02168                 (*pOMKs).second->getSvmLink ()->getOutgoingBuffer()<<SynchronisationMessage::endOfSynchronisationFragment ;
02169               
02170              }
02171 
02172            // On transmet le message du canal
02173 
02174 // TDTD ajout pour purger les envois aux inactifs
02175            //std::cerr << "sur site : " << _siteName << " : " ;
02176            if (_activeProcessTable->find ((*pOMKs).second->getProcessName ()) != _activeProcessTable->end ()) {
02177               //std::cerr << "envoi de syncho à actif : " << (*pOMKs).second->getProcessName () ;
02178               canal->sendOutgoingBuffer ( tag ) ;
02179            } else {
02180               if (nbSteps == 0) { // un coup de dead-reckoning ?
02181                  //std::cerr << "envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ;
02182                  canal->sendOutgoingBuffer ( tag ) ;             
02183               } else {
02184                  canal->getOutgoingBuffer().flushCurrentBuffer () ;
02185                  //std::cerr << "pas d'envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ;
02186               }
02187                }
02188            //std::cerr << endl ;
02189 // fin TDTD
02190 #ifdef _DEBUGPVMMESS
02191            cerr << "OMKmSvm::transmission avec tag : "
02192                 << tag << endl ;
02193 #endif
02194          }
02195 #ifdef _DEBUGPVMMESS
02196        else 
02197          {
02198            cerr << "Rien à faire "<<endl ;
02199            //canal->affiche () ;
02200          }
02201 #endif
02202      }
02203 }
02204 
02205 //-----------------------------------------------------------------------
02206 
02207 void Svm::waitForMessageFrom (PvmController & parsingController,
02208                                  const Name & processName,
02209                                  const PvmMessage::MessageTag tag) 
02210 {
02211    SvmLink * canal ; // Canal vers le precedent processus
02212    PvmIncomingMessage * messageRecu = NULL ;
02213    
02214    // On recupere le canal a observer
02215    canal = getLinkToProcessNamed (processName) ; 
02216 #ifdef _DEBUGPVMMESS
02217    cout << "OMKmSvm::waitForMessageFrom sur "
02218         << processName << " tag : "
02219         << tag << endl ;
02220    canal->printDebuggingInformation() ;
02221 #endif
02222    // On recoit le message en bloquant
02223    messageRecu = & canal->waitForMessage (tag) ;
02224    // On envoie le message et l'objet au controleur local
02225    parsingController.parseSynchronisationMessage (messageRecu) ;
02226    // On libere le message ???
02227    //delete messageRecu;
02228 }
02229 
02230 
02231 //-----------------------------------------------------------------------
02233 
02234 void Svm::timestampCurrentSendBuffers (const Date & date) 
02235 {
02236    NameToPointerMap<Process>::iterator i ;
02237    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
02238      {
02239         if (_siteName != (*i).second->getProcessName ())
02240            { 
02241               i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ;
02242            }
02243      }
02244 }
02245 
02246 
02247 const Name & Svm::getSiteName() const
02248 {
02249    return _siteName;
02250 }
02251 
02252 //-----------------------------------------------------------------------
02253 
02254 Process * Svm::waitForAnswerToBlockingRequest (PvmController & parsingController, PvmMessage::MessageTag tag)
02255 {
02256    // this implementation assumes that only one thread at a time will enter this function
02257    // numberOfThread is an unsafe method to test this
02258    static int numberOfThreads = 0 ;
02259    ++numberOfThreads ;
02260    assert ( numberOfThreads == 1) ;
02261 
02262    Process * result ;
02263 
02264    PvmIncomingMessage receiveBuffer ;
02265    std::pair<PvmMessage::MessageTag,  int> receivedRequest ;  
02266    
02267    bool serving = true ;
02268 
02269    while ( serving ) 
02270       {
02271 
02272       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
02273 
02274       //find the sender process (result of this member function)
02275       map<int,Process *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ;
02276       
02277    //   cout<<"receivedRequest.second est egale : "<<receivedRequest.second<<endl;
02278    //   cout<<"receivedRequest.first est egale : "<<receivedRequest.first<<endl;
02279       // the message should come from a known process
02280       assert ( i != _idToProcessTable.end() ) ;
02281       
02282       result = i->second ;
02283       
02284       switch (receivedRequest.first)
02285         {
02286         case PvmMessage::SynchronisationMessage :
02287           {
02288             //have the controller process the message
02289             parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02290             
02291             assert ( result != NULL ) ;
02292             
02293             result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
02294           }
02295         break ;
02296         case PvmMessage::MirrorNeedsInitialValues :
02297           {
02298             parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
02299           }
02300         break ;
02301         case PvmMessage::InitialValuesForMirror :
02302           {
02303             //have the controller process the message
02304             parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02305             
02306           }
02307         break ;
02308         case PvmMessage::LocalInitSuccessfull :
02309         case PvmMessage::ProcessTable :
02310         case PvmMessage::EndOfSimulation :
02311         case PvmMessage::EnterBarrier :
02312         case PvmMessage::SiteName :
02313         case PvmMessage::ExitBarrier :
02314         case PvmMessage::NameServiceGetId :
02315         case PvmMessage::NameServiceGetString :
02316         case PvmMessage::NameServiceReturnId :
02317         case PvmMessage::NameServiceReturnString :
02318         case PvmMessage::NameServiceVerifyLocalNameServer :
02319         case PvmMessage::NameServiceVerifyResult :
02320         case PvmMessage::updateProcessTable :
02321         case PvmMessage::addNewSite :
02322         default:
02323           {
02328           }
02329         }
02330       
02331       serving = ( receivedRequest.first != tag ) ;
02332       
02333       }
02334    
02335    --numberOfThreads ;
02336 
02337    return result ;
02338 }
02339 
02340 //------------------------------------------------------------------------------------------
02341 
02342 //----------------chadi ajout 6/6--------------------------------------------------------
02343 
02344 void Svm::addToDisconnectedTable( Name& nameOfProcess , Process* pointerToTheProcess )
02345   
02346 {
02347 //***************************
02348 //  if ( processIterator->second->getProcessName()== "visualProcessC" )
02349 //    { }
02350 //  else {
02351 //****************************
02352 
02353 
02354   _disconnectedTable->addObjectWithIndex(nameOfProcess, pointerToTheProcess );
02355  // _everybodyTable->erase(processIterator->second->getProcessName());
02356 // }
02357   _troubleFlag = true ;
02358 }
02359 
02360 void Svm::removeFromDisconnectedTable( NameToPointerMap<Process>::iterator processIterator )
02361  {
02362 //***************************
02363 //  if ( processIterator->second->getProcessName()== "visualProcessC" )
02364 //    { }
02365 //  else {
02366 //****************************
02367 //   _everybodyTable->addObjectWithIndex(processIterator->first,processIterator->second  );
02368   
02369    _disconnectedTable->erase( processIterator->second->getProcessName() );
02370 // }
02371    _troubleFlag = false ;
02372   }
02373 
02374 //-----------------------fin chadi ajout 6/6 ---------------------------------------------
02375 
02376 
02377 bool Svm::disconnectedProcess(Name& zico)
02378 {
02379   if (_disconnectedTable->find(zico)!=_disconnectedTable->end())
02380     { return true;}
02381   else
02382     return false;
02383 }
02384 
02385 //-------------------------------------------------------------------------------------------
02386 
02387 bool Svm::getTroubleFlag ()
02388 {
02389   if (_troubleFlag == true)
02390     { return true; }
02391   return false;
02392 }
02393 
02394 //-------------------------------------------------------------------
02395 
02396 bool Svm::disconnectedTableIsEmpty ()
02397 {
02398   if ( _disconnectedTable->empty() )
02399     {
02400       return true;
02401     }
02402   return false;
02403 }
02404 
02405 //-------------------------------------------------------------------
02406 
02407 bool Svm::reconnexionEstablished ()
02408 {
02409  NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
02410           
02411 //       if ( testIfSiteRecovered( (*prIt).second->getHostMachineName() ) )
02412  if ( testIfSiteRecovered( (*prIt).second ) )
02413          {
02414           // cout<<" Svm::reconnexionEstablished OK"<<endl;
02415            return true ;
02416          }
02417         
02418  else 
02419    {
02420     // cout<<" Svm::reconnexionEstablished NOTOK"<<endl;
02421      return false ;
02422    }
02423          
02424 }
02425 
02426 //-----------------------------------------------------------------
02427 
02428 void Svm::sendPingMessage ()
02429 {
02430  for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
02431             prIt != _disconnectedTable->end () ;
02432             ++prIt 
02433             )
02434         {
02435           ((*prIt).second)->getSvmLink()->getOutgoingBuffer()<<"ping" ;
02436           ((*prIt).second)->getSvmLink()->getOutgoingBuffer().send(PvmMessage::pingMessage) ;
02437           cout<<"message ping est fait "<<endl;
02438         }
02439 }
02440 
02441 //-----------------------------------------------------------------
02442 // ajouts TDTD pour gestion plus précise des déconnectés
02443 //-----------------------------------------------------------------
02444 
02445 std::list<Name> Svm::getDisconnectedProcessusList () {
02446    _disconnectedProcessusList.clear () ;
02447    for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
02448         prIt != _disconnectedTable->end () ;
02449         prIt ++) {
02450       _disconnectedProcessusList.push_back ((*prIt).first) ;
02451    }
02452    return _disconnectedProcessusList ;
02453 }
02454 
02455 //-----------------------------------------------------------------
02456 
02457 std::map<Name, int> Svm::getDisconnectedProcessusMap () {
02458    return _disconnectedProcessusMap ;
02459 }
02460 
02461 //----------------------fin chadi ajout 2/3 --------------------------

logo OpenMask

Documentation generated on Mon Jun 9 11:45:57 2008

Generated with doxygen by Dimitri van Heesch ,   1997-2007