OMK::Svm Class Reference

Defines an abstract message passing virtual machine. More...

#include <OMKSvm.h>

Inheritance diagram for OMK::Svm:

Inheritance graph
[legend]
Collaboration diagram for OMK::Svm:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Svm (NameToPointerMap< Process > *tab, const Date &latence, const Date &timeOut, const int deadReckoningInterval, const bool yieldNeeded)
 constructor
virtual ~Svm ()
 Destructor.
virtual void init (const Date &initialSimulationDate)
 initialise the distributed virtual machine
virtual void syncDistributedSites ()
 sync all the distributed sites.
virtual void connectToDistributedSimulation ()
 connect the distributed virtual machine to its peers
virtual void createDistributedSimulation (const Date &initialSimulationDate)
 create a distributed simulation
virtual void broadcast (PvmMessage::MessageTag, PvmMulticastMessage *message=NULL)
 broadcast a message
virtual void synchronizeOn (PvmController &, PvmMessage::MessageTag tag)
 enter a synchronisation barrier, between all distributed sites except the central site this is a blocking call, but the virtual machine still answer requests.
virtual void disconnectFromDistributedSimulation (const Date &deconnexionDate)
 Disconnect the site from the distributed simulation.
virtual const NamegetSiteName () const
 get the site name in the simulation
virtual SvmLinkgetLinkToProcessNamed (const Name &processName)
 get acces to the link to a given process
virtual ProcessgetProcessDescriptorNamed (const Name &processName)
 get acces to a given process descriptor
virtual ProcesstestIfDisconnectedProcessNamed (const Name &processName)
virtual void testIfNewProcessAdded ()
virtual void processReceivedMessages (PvmController &parsingController, const PvmMessage::MessageTag tag)
 Attempt to receive and process messages from all sites, in a non blocking fashion.
virtual void waitAndProcessMessages (PvmController &parsingController, const PvmMessage::MessageTag tag)
 receive and process messages from all sites, and wait for the messages to arrive if necessary
virtual void waitForMessage (PvmController &parsingController, const PvmMessage::MessageTag tag)
 wait until a message has arrived from one site
virtual void waitForMessageFrom (PvmController &parsingController, const Name &processName, const PvmMessage::MessageTag tag)
 receive a message from a particular process
virtual void synchroniseReceiveAndProcessMessages (PvmController &parsingController, const PvmMessage::MessageTag tag)
 receive and process messages from all sites (except the central site), and wait until at least a recent (as defined by latency) message has arrived from each of these sites
virtual void relaxedSynchroniseReceiveAndProcessMessages (PvmController &parsingController, const PvmMessage::MessageTag tag)
virtual ProcesswaitForAnswerToBlockingRequest (PvmController &parsingController, PvmMessage::MessageTag tag)
 receive and process messages from all sites (except the central site), and wait until at least on message of tag tag has arrived
virtual void sendCurrentBuffersWithTag (const PvmMessage::MessageTag tag)
 send the messages contained in the send buffers of all links
virtual void timestampCurrentSendBuffers (const Date &date)
 timestamp the messages contained in the send buffers of all links
virtual const DategetSynchronisationLatency ()
 get the authorized latency for the relaxed receive
virtual void addToDisconnectedTable (Name &, Process *)
virtual void removeFromDisconnectedTable (NameToPointerMap< Process >::iterator)
virtual bool disconnectedProcess (Name &)
virtual bool getTroubleFlag ()
virtual bool disconnectedTableIsEmpty ()
virtual void addNewSiteToSimulation (const Date &)
virtual void addNewSiteRequest (const Date &date)
 added by chadi
virtual bool reconnexionEstablished ()
virtual bool testIfSiteRecovered (Process *p)=0
virtual void sendPingMessage ()
virtual void removeProcessFromActiveTable ()
virtual void removeProcessProlog ()
virtual std::list< NamegetDisconnectedProcessusList ()
virtual std::map< Name, intgetDisconnectedProcessusMap ()

Public Attributes

PvmIncomingMessagenpp
Date dateVar
bool _troubleFlag
int _simStep

Protected Member Functions

virtual void addNewWorkstation (const Name &m)=0
 add a new machine to the virtual machine
virtual void removeWorkstation (const Name &m)=0
 remove a work station from the virtual machine
virtual int spawnProcess (Process *p)=0
 spawn a copy of this proc'ess on another site
virtual SvmLinkcreateSvmLink (const int &d=0)=0
 create a link to a distant site
virtual int getParentSiteId ()=0
 get the site id having spanned this process.
virtual int getSiteId ()=0
 get the id in the svm of this site
virtual void groupBarrier (std::string groupName, int numberToJoin)=0
 block the calling process until numberToJoin processes of group groupName have called groupBarrier
virtual void broadcastToGroup (std::string groupName, PvmMessage::MessageTag)=0
 broadcast an empty tagged message to all members of a broadcast group
virtual void initBeforeMessagePacking ()=0
 do any usefull initialisations before messages are packed
virtual int nonblockingReceive (PvmMessage::MessageTag tag)=0
 try and receive message in a non-blocking fashion the received messages are lost
virtual std::pair< PvmMessage::MessageTag,
int
waitForAnyRequests (PvmIncomingMessage &receiveBuffer)=0
 wait for any message sent to this site.
virtual void joinSvmGroup (std::string &groupName)=0
 join a synchronisation group join a group of processes in the siames virtual machine
virtual void serveNameRequestsUntilEnd ()
 for the central site: go into server mode, receiving and processing any reveived request until all sites have disconnected

Protected Attributes

std::list< Name_disconnectedProcessusList
std::map< Name, int_disconnectedProcessusMap
NameToPointerMap< Process > * _processTable
 map containing all information about the different processes
std::vector< intprocessSiteIds
NameToPointerMap< Process > * _disconnectedTable
NameToPointerMap< Process > * _activeProcessTable
NameToPointerMap< Process > * _temporaryTable
NameToPointerMap< ProcessaRecevoir
bool firstTime
std::map< int, Process * > _idToProcessTable
 map storing correspondance between processId and Process
int _siteId
 workstation identifier in the svm
SvmLink_linkToCentralSite
 the link to the central site
Name _siteName
 Name of the process associated to this site.
int _numberOfSlaves
 number of slaves ( PvmControllers ) in the simulation
const Date _synchronisationLatency
 in millisecond, the authorized latency when synchronising all the sites
const Date _synchronisationTimeOut
 in millisecond, the authorized timeOut for relaxed synchronizing
const Date _deadReckoningInterval
 in int, the dead reckoning time step interval
const bool _yieldNeeded
 in int, the dead reckoning time step interval

Static Protected Attributes

static std::string _slaveGroupName
 name of the group all the slave processes join
static std::string _masterSiteName
 reserved name of the master distributed site

Detailed Description

Defines an abstract message passing virtual machine.

Author:
Siames
Version:
2.1

Definition at line 43 of file OMKSvm.h.


Constructor & Destructor Documentation

Svm::Svm ( NameToPointerMap< Process > *  tab,
const Date latence,
const Date timeOut,
const int  deadReckoningInterval,
const bool  yieldNeeded 
)

constructor

Parameters:
tab a table containing the a map between a processName and the data structure definig a process
latence the authorized synchronisation latency

Definition at line 58 of file OMKSvm.cxx.

References _activeProcessTable, _disconnectedTable, _processTable, _temporaryTable, aRecevoir, and firstTime.

00059                                                                          : 
00060 //   _processTable ( tab ),
00061 //   _activeProcessTable (tab ),
00062    _troubleFlag ( false ),
00063    _simStep ( 0 ),
00064    _siteId ( -1 ), //negative values indicates an error
00065    _numberOfSlaves ( 0 ),
00066    _synchronisationLatency (latence),
00067    _synchronisationTimeOut (timeOut),
00068    _deadReckoningInterval (deadReckoningInterval),
00069    _yieldNeeded (yieldNeeded)
00070 {
00071 _processTable = new NameToPointerMap<Process>();
00072    for ( NameToPointerMap<Process>::iterator processIterator = tab->begin () ;
00073             processIterator != tab->end () ;
00074             ++processIterator 
00075             )
00076         {
00077           _processTable->addObjectWithIndex(processIterator->first, processIterator->second );
00078  
00079         }
00080 
00081 //--------------chadi ajout 1/6 ------------------------
00082   _temporaryTable = new  NameToPointerMap<Process>();
00083   _disconnectedTable = new  NameToPointerMap<Process>();
00084   _activeProcessTable = new  NameToPointerMap<Process>();
00085   _disconnectedTable->clear();
00086    for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
00087             processIterator != _processTable->end () ;
00088             ++processIterator 
00089             )
00090         {
00091           _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second );
00092  
00093         }
00094   
00095    firstTime = true;
00096  aRecevoir.clear();
00097 //--------------fin chadi ajout 1/6 ------------------------
00098 // TDTD ajout pour simuler des défaillances réseau
00099 #ifdef _MSC_VER
00100         srand( getpid() ) ;
00101 #else
00102         srandom (getpid ()) ;
00103 #endif
00104 }

Svm::~Svm (  )  [virtual]

Destructor.

Definition at line 107 of file OMKSvm.cxx.

00108 {
00109 }


Member Function Documentation

void Svm::init ( const Date initialSimulationDate  )  [virtual]

initialise the distributed virtual machine

Definition at line 119 of file OMKSvm.cxx.

References connectToDistributedSimulation(), createDistributedSimulation(), and getParentSiteId().

Referenced by OMK::PvmController::PvmController().

00120 {
00121    if ( getParentSiteId () == 0 )  // wasn't created by a spawn
00122      {
00123        createDistributedSimulation ( initialSimulationDate ) ;
00124      }
00125    else
00126      {
00127        connectToDistributedSimulation () ;
00128      }
00129 }

void Svm::syncDistributedSites (  )  [virtual]

sync all the distributed sites.

blocks the calling process until all sites except the central site have called this member function

Definition at line 743 of file OMKSvm.cxx.

References _numberOfSlaves, _slaveGroupName, and groupBarrier().

Referenced by connectToDistributedSimulation(), and testIfNewProcessAdded().

00744 {
00745   groupBarrier( _slaveGroupName, _numberOfSlaves ) ;
00746 }

void Svm::connectToDistributedSimulation (  )  [virtual]

connect the distributed virtual machine to its peers

Definition at line 552 of file OMKSvm.cxx.

References _idToProcessTable, _linkToCentralSite, _numberOfSlaves, _processTable, _siteName, _slaveGroupName, createSvmLink(), OMK::Name::getNameServer(), getParentSiteId(), joinSvmGroup(), npp, OMK::PvmMessage::ProcessTable, OMK::Name::setNameServer(), OMK::Process::setSvmLink(), OMK::PvmMessage::SiteName, syncDistributedSites(), and OMK::SvmLink::waitForMessage().

Referenced by init().

00553 {
00554 //TDTD
00555 //#ifdef _DEBUGPVMEXEC
00556   cerr<<"Svm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl;
00557 //#endif
00558 
00559    PvmIncomingMessage * np = NULL ;
00560 
00561    // connect to the global controller
00562    _linkToCentralSite = createSvmLink ( getParentSiteId () ) ;
00563  
00564    // join the group of slave processes
00565    joinSvmGroup (_slaveGroupName) ;
00566  
00567    // create a distributed version of the nameServer
00568    PvmNameServer * pvmNameServer = new PvmNameServer ( getParentSiteId() , *Name::getNameServer() ) ; 
00569  
00570    Name::setNameServer ( pvmNameServer ) ; 
00571  
00572    // find out the name of this site
00573    np = & _linkToCentralSite->waitForMessage (PvmMessage::SiteName) ;
00574    *np >> _siteName ;
00575    npp = np ;
00576 
00577 //    cout<<"my site name is _siteName: "<<_siteName<<endl;
00578 //    NameToPointerMap<Process>::iterator j ;
00579 //    for (j = _processTable->begin () ; j != _processTable->end () ; j ++) 
00580 //      {
00581 //        cout<<"connect: _processTable contient: "<<(*j).second->getProcessName()<<endl; 
00582 //      }
00583    
00584 
00585  
00586    // find out the adresses of the other sites 
00587    // - extract the timestamp of the communication
00588    _linkToCentralSite->waitForMessage (PvmMessage::ProcessTable) ;
00589 
00590    // - extract, for each process, it's adress in the virtual machine and create a link
00591    NameToPointerMap<Process>::iterator i ;
00592    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00593      {
00594        ++_numberOfSlaves ; //we add ourselves, but as _numberOfSlaves is initialised to 0, it's ok
00595 
00596       // Remarque : creer un canal sur soi-meme n'est pas genant,
00597       // on met juste a jour les tids dans la table des pss
00598 
00599       Process * p = (*i).second ;
00600 
00601       // creation d'un canal sans destinataire 
00602       p->setSvmLink (createSvmLink ()) ;
00603 
00604       // maj du destinataire
00605       *np  >> *p ;
00606 
00607       _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ;
00608 
00609       cout<<"le TID du process ajoute est : "<< p->getSvmLink()->getTID()<<" et son nom est :"<<p->getProcessName()<<endl; 
00610    }
00611 
00612 
00613    //wait for all other sites to be connected to the distributed simulation.
00614    //avoids sending messages to sites still connected or worse, not yet connected
00615    syncDistributedSites () ;
00616    cout<<"EEEE"<<endl;
00617 }

void Svm::createDistributedSimulation ( const Date initialSimulationDate  )  [virtual]

create a distributed simulation

Parameters:
initialSimulationDate,: the initial simualtion date needed to start timestamping mexchanged messages

add the specified machines to the siames virtual machine

Definition at line 359 of file OMKSvm.cxx.

References _idToProcessTable, _masterSiteName, _processTable, _siteName, addNewWorkstation(), createSvmLink(), OMK::SvmLink::getOutgoingBuffer(), OMK::Process::getSvmLink(), OMK::SvmLink::getTID(), OMK::PvmOutgoingMessage::insertTimeStamp(), processSiteIds, OMK::PvmMessage::ProcessTable, OMK::PvmMulticastMessage::send(), OMK::PvmOutgoingMessage::send(), serveNameRequestsUntilEnd(), OMK::Process::setSvmLink(), OMK::PvmMessage::SiteName, and spawnProcess().

Referenced by init().

00360 {
00361 #ifdef _DEBUGPVMEXEC
00362    cerr << "Svm::createDistributedSimulation : creating the distributed simulation" << endl ;
00363 #endif
00364    _siteName = _masterSiteName ;
00365 
00367      
00368      NameToPointerMap<Process>::iterator i ;
00369      
00370      for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 
00371        {
00372          addNewWorkstation ( (*i).second->getHostMachineName () ) ;
00373        }
00374 
00375      
00376      // creation des processus locaux
00377 #ifdef _DEBUGPVMEXEC
00378      cerr << "Creation des processus locaux..................." << endl ;
00379 #endif
00380      Process * processInfo ;
00381      int adrLoc ;
00382 
00383      //   std::vector<int> processSiteIds ;
00384      
00385      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00386        {
00387          processInfo = (*i).second ;
00388          
00389          // spawn a new process on the specified workstation
00390          adrLoc = spawnProcess (processInfo) ;
00391 
00392          cout<<"my TID is : "<<adrLoc<<endl ;
00393          
00394          // create a SvmLink, add it to processInfo
00395          processInfo->setSvmLink (createSvmLink (adrLoc)) ; 
00396          
00397          _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ;
00398 
00399          processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ;
00400 
00401          // send the name of the process (in the Svm) to the created process
00402          processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ;
00403          
00404          processInfo->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ;
00405 
00406          //prepare processSiteIds for the next step
00407          processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ;
00408 
00409 #ifdef _DEBUGPVMMESS
00410          cerr<<typeid((*i).first).name()<<endl;
00411          cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl;
00412 #endif
00413        }
00414      
00415 
00416      // Encodage des attributs Svm de chaque processus
00417      PvmMulticastMessage nameToIdMessage ( processSiteIds ) ;
00418      nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ;
00419      
00420      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00421        {
00422          nameToIdMessage << *((*i).second) ;
00423        }
00424      
00425      // Transfert des adresses vers chaque controleur local
00426      nameToIdMessage.send ( PvmMessage::ProcessTable ) ;
00427      
00428      serveNameRequestsUntilEnd () ;
00429 
00430 }

void Svm::broadcast ( PvmMessage::MessageTag  ,
PvmMulticastMessage message = NULL 
) [virtual]

broadcast a message

reusing mess

Definition at line 663 of file OMKSvm.cxx.

References _processTable, _siteName, and OMK::PvmOutgoingMessage::insertTimeStamp().

Referenced by synchronizeOn().

00664 {
00665   //the first implementation, commented, dies in pvm with an unexpected error
00666   //I haven't had time to trace the problem, so the second implementation is used
00667 
00668   /* first implementation
00669   initBeforeMessagePacking () ;
00670   
00671   if ( mess != NULL )
00672     {
00673       mess->packMessage() ;
00674     }
00675   else
00676     {
00677       mess = new PvmOutgoingMessage () ;
00678       mess->insertTimeStamp(0) ;
00679       *mess << "dummy";
00680       mess->packMessage() ;
00681     }
00682   
00683   broadcastToGroup ( slaveGroup, tag ) ;
00684   */
00685 
00686 #ifdef _DEBUGPVMMESS
00687   cerr<<"Svm::broadcast ("<<tag<<","<<mess<<") ; "<<endl;
00688 #endif
00689 
00690   NameToPointerMap<Process>::iterator processIter ; 
00691   
00692   if (mess == NULL )
00693     {
00694       std::vector<int> distantRecepients ;
00695       for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 
00696         {
00697           if (_siteName != (*processIter).second->getProcessName ()) 
00698             {
00699               // On connait alors le canal a observer
00700               distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ;
00701             }
00702         }
00703 
00705       mess = new PvmMulticastMessage ( distantRecepients ) ;
00706       mess->insertTimeStamp(0) ;
00707     }
00708 
00709   mess -> send ( tag ) ;
00710 
00711 }

void Svm::synchronizeOn ( PvmController ,
PvmMessage::MessageTag  tag 
) [virtual]

enter a synchronisation barrier, between all distributed sites except the central site this is a blocking call, but the virtual machine still answer requests.

Definition at line 715 of file OMKSvm.cxx.

References _numberOfSlaves, broadcast(), and waitForAnswerToBlockingRequest().

Referenced by OMK::PvmController::init().

00716 {
00717   broadcast ( tag ) ;
00718 #ifdef _DEBUGPVMSYNCHRO
00719   cerr<< "Svm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl;
00720   usleep(1000000) ;
00721 #endif
00722 
00723   int numberOfReadyProcesses = 1 ; //this process has entered the barrier
00724   
00725   while ( numberOfReadyProcesses != _numberOfSlaves )
00726      {
00727 
00728         // here, we ignore the results, because we suppose one process will only send tag once during synchronization
00729         waitForAnswerToBlockingRequest ( parsingController, tag ) ;
00730 
00731         numberOfReadyProcesses++ ;
00732 
00733 #ifdef _DEBUGPVMMESS
00734   cerr<< "Svm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl;
00735   usleep(1000000) ;
00736 #endif
00737 
00738      }
00739 }

void Svm::disconnectFromDistributedSimulation ( const Date deconnexionDate  )  [virtual]

Disconnect the site from the distributed simulation.

Parameters:
deconnexionDate. date at wich the disconnection is happening (to timestamp the disconnection message) This will trigger diconnection on a sites.

Definition at line 621 of file OMKSvm.cxx.

References _processTable, OMK::PvmMessage::EndOfSimulation, getParentSiteId(), getSiteName(), OMK::PvmOutgoingMessage::insertTimeStamp(), processSiteIds, OMK::PvmMulticastMessage::send(), and OMK::PvmUnicastMessage::send().

Referenced by serveNameRequestsUntilEnd(), and OMK::PvmController::~PvmController().

00622 {
00623 #ifdef _DEBUGPVMEXEC
00624    cout <<"Svm::disconnectFromDistributedSimulation"<< endl ;
00625 #endif
00626    int recepient = getParentSiteId () ;
00627 
00628    if ( recepient != 0 )
00629       // a local PvmController has finished. Notify the central controller
00630       {
00631          PvmUnicastMessage endMessage (recepient) ;
00632          
00633          endMessage.insertTimeStamp ( endOfSimulationDate ) ;
00634  
00635          endMessage << getSiteName () ;
00636 
00637          endMessage.send ( PvmMessage::EndOfSimulation ) ;
00638       }
00639    else
00640       //the central controller is deconnecting. Notify all local controllers
00641       {
00642          std::vector<int> processSiteIds ;
00643          for (NameToPointerMap<Process>::iterator i = _processTable->begin () ; 
00644               i != _processTable-> end () ; 
00645               i ++) 
00646             {
00647                processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ;
00648             }
00649          PvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ;             
00650 
00651          endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ;
00652 
00653          endOfSimulationMessage << getSiteName () ;
00654 
00655          endOfSimulationMessage.send ( PvmMessage::EndOfSimulation ) ;
00656       }
00657 }

const Name & Svm::getSiteName (  )  const [virtual]

get the site name in the simulation

Definition at line 2247 of file OMKSvm.cxx.

References _siteName.

Referenced by disconnectFromDistributedSimulation(), OMK::PvmController::getProcessName(), and OMK::PvmController::PvmController().

02248 {
02249    return _siteName;
02250 }

SvmLink * Svm::getLinkToProcessNamed ( const Name processName  )  [virtual]

get acces to the link to a given process

CHADI WARN

END CHADI

Definition at line 750 of file OMKSvm.cxx.

References _processTable.

Referenced by OMK::PvmController::sendInitialValuesToMirror(), and waitForMessageFrom().

00751 {
00752   // check that we know this process
00753   if( _processTable->find( processName ) != _processTable->end() )
00754   {
00755     Process *pss = _processTable->getObjectOfIndex( processName ) ;
00756     return pss->getSvmLink() ;
00757   }
00758   else
00759   {
00761 // #ifdef _USESSTREAM
00762 //     ostringstream o ;
00763 //     o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus "
00764 //       << processName << " afin de recuperer son canal !" ;
00765 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00766 // #else
00767 //     ostrstream o ;
00768 //     o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus "
00769 //       << processName << " afin de recuperer son canal !" ;
00770 //     o.put ('\0') ;
00771 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00772 //     delete o.str() ;
00773 // #endif
00775     return 0 ;
00776   }
00777 }

Process * Svm::getProcessDescriptorNamed ( const Name processName  )  [virtual]

get acces to a given process descriptor

CHADI WARN

CHADI

Definition at line 781 of file OMKSvm.cxx.

References _processTable.

00782 {
00783   // check that we know this process
00784   if( _processTable->find( processName ) != _processTable->end() )
00785   {
00786     Process *pss = _processTable->getObjectOfIndex( processName ) ;
00787     return pss ;
00788   }
00789   else 
00790   {
00792 // #ifdef _USESSTREAM
00793 //     ostringstream o  ;
00794 //     o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ;
00795 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00796 // #else
00797 //     ostrstream o  ;
00798 //     o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ;
00799 //     o.put('\0') ;
00800 //     Controller::warning (o.str(), Controller::SomeWarnings ) ;
00801 //     delete o.str() ;
00802 // #endif
00804     return 0 ;
00805   }
00806 }

Process * Svm::testIfDisconnectedProcessNamed ( const Name processName  )  [virtual]

Definition at line 815 of file OMKSvm.cxx.

References _disconnectedTable.

00816 {
00817   // check that we know this process as a deconnected process
00818   if( _disconnectedTable->find( processName ) != _disconnectedTable->end() )
00819   {
00820     Process *pss = _disconnectedTable->getObjectOfIndex( processName ) ;
00821     return pss ;
00822   }
00823   else 
00824   {
00825     return 0 ;
00826   }
00827 }

void Svm::testIfNewProcessAdded (  )  [virtual]

Definition at line 212 of file OMKSvm.cxx.

References _idToProcessTable, _linkToCentralSite, _numberOfSlaves, _processTable, createSvmLink(), dateVar, OMK::Process::getProcessName(), OMK::PvmMessage::LocalInitSuccessfull, npp, OMK::Process::setSvmLink(), syncDistributedSites(), OMK::SvmLink::testIfRequestUpdateProcessTable(), and OMK::PvmMessage::updateProcessTable.

00213 {
00214  PvmIncomingMessage * message = NULL ;
00215  message =  _linkToCentralSite->testIfRequestUpdateProcessTable (PvmMessage::updateProcessTable) ;
00216  if ( message != NULL )
00217    {  
00218      Name zico ;
00219     // *message>>zico ;
00220     // cout<<"le nom de process recu est :  "<<zico<<endl; 
00221      Process* p = new Process  ( "visualProcessC", "maxipes.irisa.fr" ); 
00222 
00223     // Process *p ;             
00224     // *message>> *p ;
00225      _processTable->addObjectWithIndex(p->getProcessName(),p ) ;
00226      ++_numberOfSlaves ;
00227     
00228     // Process * p = (*i).second ;
00229      // creation d'un canal sans destinataire 
00230      p->setSvmLink (createSvmLink ()) ;
00231      // maj du destinataire ?????????????????????????????????
00232      *npp  >> *p ;
00233      
00234      _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ;
00235      
00236      PvmUnicastMessage initSucMessage ( p->getSvmLink()->getTID() ) ;
00237      initSucMessage.insertTimeStamp ( dateVar ) ;
00238      initSucMessage.send ( PvmMessage::LocalInitSuccessfull ) ;
00239      
00240      NameToPointerMap<Process>::iterator i ;
00241      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00242         {
00243           cout<<"_processTable contient: "<<(*i).second->getProcessName()<<endl; 
00244         }
00245 
00246      syncDistributedSites();  
00247    }
00248 }

void Svm::processReceivedMessages ( PvmController parsingController,
const PvmMessage::MessageTag  tag 
) [virtual]

Attempt to receive and process messages from all sites, in a non blocking fashion.

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive

Definition at line 836 of file OMKSvm.cxx.

References _processTable, _siteName, OMK::PvmIncomingMessage::getMessageDate(), OMK::PvmMessage::getSize(), OMK::PvmIncomingMessage::hasMessage(), OMK::PvmController::parseSynchronisationMessage(), and OMK::SvmLink::testForMessage().

Referenced by OMK::PvmController::run().

00837 {
00838 #ifdef _DEBUGPVMMESS
00839    cerr << "Svm::processReceivedMessages"<<endl;
00840 #endif
00841    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
00842    SvmLink * canal ; // Canal vers le precedent processus
00843    PvmIncomingMessage * messageRecu = NULL ;
00844    for (pOMKs = _processTable->begin () ; 
00845         pOMKs != _processTable->end () ; 
00846         pOMKs ++) 
00847       {
00848          if (_siteName != (*pOMKs).second->getProcessName ()) 
00849             {
00850                // On connait alors le canal a observer
00851                canal = (*pOMKs).second->getSvmLink () ;
00852 #ifdef _DEBUGPVMMESS
00853                cout << "Svm::processReceivedMessages from "
00854                     << (*pOMKs).second->getProcessName () << " tag : "
00855                     << tag << endl ;
00856                //canal->affiche () ;
00857 #endif
00858                // On fait une reception non bloquante
00859                messageRecu = canal->testForMessage (tag) ;
00860                while ( messageRecu->hasMessage () ) 
00861                   {
00862 #ifdef _DEBUGPVMMESS
00863                      cerr << "Svm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl;
00864 #endif        
00865 #ifdef _DEBUGPVMMESS
00866                      cerr << "Svm::processReceivedMessages: extracted emission date: " <<messageRecu->getMessageDate()<< endl;
00867 #endif        
00868                      //cerr << "Svm::processReceivedMessages : date " << dateEmission << endl ;
00869                      // On envoie le message et l'objet au controleur local
00870                      parsingController.parseSynchronisationMessage (messageRecu) ;
00871                      messageRecu = canal->testForMessage (tag) ;
00872                   }
00873             }
00874       }
00875 }

void Svm::waitAndProcessMessages ( PvmController parsingController,
const PvmMessage::MessageTag  tag 
) [virtual]

receive and process messages from all sites, and wait for the messages to arrive if necessary

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive

Definition at line 879 of file OMKSvm.cxx.

References _processTable, _siteName, OMK::PvmController::parseSynchronisationMessage(), OMK::SvmLink::printDebuggingInformation(), and OMK::SvmLink::waitForMessage().

00881 {
00882 #ifdef _DEBUGPVMMESS
00883    cerr<<"Svm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl;
00884    NameToPointerMap<Process>::iterator pOMK ;
00885    for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) {
00886       cerr<<(*pOMK).second->getProcessName ();
00887    }
00888    cerr<<endl;
00889 #endif
00890    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
00891    SvmLink * canal ; // Canal vers le precedent processus
00892    PvmIncomingMessage * messageRecu ;
00893 #ifdef _DEBUGPVMMESS
00894    cerr<<"Svm::waitAndProcessMessages écoute de tous les processus"<<endl;
00895 #endif
00896    for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) {
00897 #ifdef _DEBUGPVMMESS
00898       cerr<<"Svm::waitAndProcessMessages écoute d'un de plus : "<<(*pOMKs).second->getProcessName ()<<endl;
00899 #endif
00900       if (_siteName != (*pOMKs).second->getProcessName ()) {
00901          // On connait alors le canal a observer
00902          canal = (*pOMKs).second->getSvmLink () ;
00903 #ifdef _DEBUGPVMMESS
00904          cout << "OMKmSvm::waitAndProcessMessages sur "
00905               << (*pOMKs).second->getProcessName () << " tag : "
00906               << tag << endl ;
00907          canal->printDebuggingInformation () ;
00908 #endif
00909          // On recoit le message en bloquant
00910          messageRecu = & canal->waitForMessage (tag) ;
00911 #ifdef _DEBUGPVMMESS
00912          cerr << "Svm::waitAndProcessMessages : message received"<< endl ;
00913 #endif
00914          // On envoie le message et l'objet au controleur local
00915          parsingController.parseSynchronisationMessage (messageRecu) ;
00916       }
00917    }
00918 }

void Svm::waitForMessage ( PvmController parsingController,
const PvmMessage::MessageTag  tag 
) [virtual]

wait until a message has arrived from one site

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive

Definition at line 922 of file OMKSvm.cxx.

References _linkToCentralSite, _processTable, _siteName, OMK::PvmIncomingMessage::hasMessage(), OMK::PvmController::parseSynchronisationMessage(), OMK::SvmLink::printDebuggingInformation(), and OMK::SvmLink::testForMessage().

00923 {
00924    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
00925    SvmLink * canal ; // Canal vers le precedent processus
00926    PvmIncomingMessage * messageRecu = NULL ;
00927    bool rienRecu=true;
00928    while (rienRecu) {//on ne teste pas la nullite du message, 
00929       //car en cas de message ne contenant que la date, on le vide !
00930       for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) {
00931          if (_siteName != (*pOMKs).second->getProcessName ()) {
00932             // On connait alors le canal a observer
00933             canal = (*pOMKs).second->getSvmLink () ;
00934 #ifdef _DEBUGPVMMESS
00935             cout << "OMKmSvm::waitForMessage sur "
00936                  << (*pOMKs).second->getProcessName () << " tag : "
00937                  << tag << endl ;
00938             canal->printDebuggingInformation () ;
00939 #endif
00940             // On recoit le message en bloquant
00941             messageRecu = canal->testForMessage (tag) ;
00942             // On envoie le message et l'objet au controleur local
00943             if (messageRecu->hasMessage() ) 
00944                {
00945 #ifdef _DEBUGPVMMESS
00946                cerr << "Svm::waitForMessage : message received" << endl ;
00947 #endif
00948                parsingController.parseSynchronisationMessage (messageRecu) ;
00949                rienRecu=false;
00950             }
00951          }
00952       }
00953       messageRecu = _linkToCentralSite->testForMessage (tag) ;
00954       //cerr << "Svm::waitForMessage : message recu : " << *messageRecu << endl ;
00955       if (messageRecu->hasMessage() ) 
00956          {
00957 #ifdef _DEBUGPVMMESS
00958          cerr << "Svm::waitForMessage : message received" << endl ;
00959 #endif
00960          parsingController.parseSynchronisationMessage (messageRecu) ;
00961          rienRecu=false;
00962       }
00963    }
00964 }

void Svm::waitForMessageFrom ( PvmController parsingController,
const Name processName,
const PvmMessage::MessageTag  tag 
) [virtual]

receive a message from a particular process

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive
processName the name of the process to receive a message from

Definition at line 2207 of file OMKSvm.cxx.

References getLinkToProcessNamed(), OMK::PvmController::parseSynchronisationMessage(), OMK::SvmLink::printDebuggingInformation(), and OMK::SvmLink::waitForMessage().

02210 {
02211    SvmLink * canal ; // Canal vers le precedent processus
02212    PvmIncomingMessage * messageRecu = NULL ;
02213    
02214    // On recupere le canal a observer
02215    canal = getLinkToProcessNamed (processName) ; 
02216 #ifdef _DEBUGPVMMESS
02217    cout << "OMKmSvm::waitForMessageFrom sur "
02218         << processName << " tag : "
02219         << tag << endl ;
02220    canal->printDebuggingInformation() ;
02221 #endif
02222    // On recoit le message en bloquant
02223    messageRecu = & canal->waitForMessage (tag) ;
02224    // On envoie le message et l'objet au controleur local
02225    parsingController.parseSynchronisationMessage (messageRecu) ;
02226    // On libere le message ???
02227    //delete messageRecu;
02228 }

void Svm::synchroniseReceiveAndProcessMessages ( PvmController parsingController,
const PvmMessage::MessageTag  tag 
) [virtual]

receive and process messages from all sites (except the central site), and wait until at least a recent (as defined by latency) message has arrived from each of these sites

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive

CHADI WARN

Definition at line 2021 of file OMKSvm.cxx.

References _processTable, _siteName, _synchronisationLatency, _yieldNeeded, aRecevoir, OMK::PvmMessage::EndOfSimulation, OMK::PvmMessage::EnterBarrier, OMK::PvmMessage::ExitBarrier, OMK::Process::getProcessName(), OMK::Controller::getSimulatedDate(), OMK::Process::getSvmLink(), OMK::PvmIncomingMessage::hasMessage(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmMessage::LocalInitSuccessfull, OMK::PvmMessage::MirrorNeedsInitialValues, OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMK::PvmMessage::NameServiceVerifyResult, OMK::PvmController::parseSynchronisationMessage(), OMK::PvmMessage::ProcessTable, OMK::PvmController::sendInitialValuesToMirror(), OMK::Process::setDateOfLastMessage(), OMK::PvmMessage::SiteName, OMK::PvmMessage::SynchronisationMessage, and OMK::SvmLink::testForAnyMessage().

Referenced by OMK::PvmController::synchronizedReceiveAndProcessMessages().

02022 {
02023    //cout<<"GGGGG"<<endl;
02024    NameToPointerMap<Process> aRecevoir ;
02025 
02026    //First make a list of processes that we need to receive values from
02027    for (   NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 
02028            pOMKsE != _processTable->end () ; 
02029            pOMKsE ++) 
02030       {
02031          if ( (pOMKsE->first != _siteName) &&
02032               ( pOMKsE->second->getDateOfLastMessage () +
02033                 pOMKsE->second->getPeriod () +
02034                 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 
02035             {
02036                aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
02037 #ifdef _DEBUGPVMMESS
02038                cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
02039                     << pOMKsE->first << endl ;
02040 #endif
02041             }
02042       }
02043 
02044    
02045    PvmIncomingMessage receiveBuffer ;
02046    PvmMessage::MessageTag receivedRequest ;  
02047    Process * receivingProcess ;
02048    
02049    while (! aRecevoir.empty () ) 
02050       { 
02051          for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
02052                processIterator != _processTable->end () ;
02053                ++processIterator 
02054                )
02055             {
02056                //this new implementation doesn't do blocking receives, as some processes could send urgent requests
02057                receivingProcess = processIterator->second ;
02058 
02059                receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
02060                
02061                if ( receiveBuffer.hasMessage() )
02062                   {
02063                      // TDTD ajout de trace pour debug
02064                      //std::cerr << "buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()
02065                      //       << " de " << receivingProcess->getProcessName() << std::endl ;
02066                      switch (receivedRequest)
02067                         {
02068                         case PvmMessage::SynchronisationMessage :
02069                            {
02070                               //have the controller process the message
02071                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02072                               
02073                               receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
02074                               
02075                               //an additionnal test could be performed here to make sure correct date of last message has happened
02076                               aRecevoir.erase ( receivingProcess->getProcessName() ) ;
02077                            }
02078                         break ;
02079                         case PvmMessage::MirrorNeedsInitialValues :
02080                            {
02081                               parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
02082                            }
02083                         break ;
02084                         case PvmMessage::InitialValuesForMirror :
02085                            {
02086                               //have the controller process the message
02087                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02088                               
02089                            }
02090                         break ;
02091                         case PvmMessage::LocalInitSuccessfull :
02092                         case PvmMessage::ProcessTable :
02093                         case PvmMessage::EndOfSimulation :
02094                         case PvmMessage::EnterBarrier :
02095                         case PvmMessage::SiteName :
02096                         case PvmMessage::ExitBarrier :
02097                         case PvmMessage::NameServiceGetId :
02098                         case PvmMessage::NameServiceGetString :
02099                         case PvmMessage::NameServiceReturnId :
02100                         case PvmMessage::NameServiceReturnString :
02101                         case PvmMessage::NameServiceVerifyLocalNameServer :
02102                         case PvmMessage::NameServiceVerifyResult :
02103                         default:
02104                            {
02106                               std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
02107                               /* END CHADI */
02108                            }
02109                         }
02110                   }
02111             }
02112          // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process
02113          if (_yieldNeeded) {
02114 #ifdef _MSC_VER
02115                  Sleep( 0 ) ;
02116 #else
02117             usleep (0) ;
02118 #endif
02119          }
02120       }
02121          
02122    
02123 #ifdef _DEBUGPVMMESS
02124    cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
02125 #endif
02126 }

void Svm::relaxedSynchroniseReceiveAndProcessMessages ( PvmController parsingController,
const PvmMessage::MessageTag  tag 
) [virtual]

Definition at line 1103 of file OMKSvm.cxx.

References _activeProcessTable, _disconnectedProcessusMap, _disconnectedTable, _siteName, _synchronisationLatency, _synchronisationTimeOut, _yieldNeeded, OMK::NameToPointerMap< ObjectType >::addObjectWithIndex(), aRecevoir, OMK::PvmMessage::EndOfSimulation, OMK::PvmMessage::EnterBarrier, OMK::PvmMessage::ExitBarrier, OMK::Process::getProcessName(), OMK::Controller::getSimulatedDate(), OMK::Process::getSvmLink(), OMK::PvmIncomingMessage::hasMessage(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmMessage::LocalInitSuccessfull, OMK::PvmMessage::MirrorNeedsInitialValues, OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMK::PvmMessage::NameServiceVerifyResult, OMK::PvmController::parseSynchronisationMessage(), OMK::PvmMessage::ProcessTable, removeProcessProlog(), OMK::PvmController::sendInitialValuesToMirror(), OMK::Process::setDateOfLastMessage(), OMK::PvmMessage::SiteName, OMK::PvmMessage::SynchronisationMessage, and OMK::SvmLink::testForAnyMessage().

Referenced by OMK::PvmController::relaxedReceiveAndProcessMessages().

01103                                                                                                                         {
01104 
01105    PvmIncomingMessage receiveBuffer ;
01106    PvmMessage::MessageTag receivedRequest ;  
01107    Process * receivingProcess ;
01108    static Name visualProcessA ("visualProcessA") ;
01109    static Name visualProcessB ("visualProcessB") ;
01110    static Name visualProcessC ("visualProcessC") ;
01111    static Name visualProcessD ("visualProcessD") ;
01112 
01113    //std::cerr <<"Svm:: le controleur est sur le processus : " << _siteName << std::endl ;
01114 //    if (_siteName == visualProcessB) { // simulation d'un pb de latence réseau
01115 //       int sommeil = random () % 15000 ;
01116 //       usleep (sommeil) ;
01117 //       //std::cerr <<"ralentissement de : " << sommeil << std::endl ;
01118 //    } else if (_siteName == visualProcessC) { // simulation d'un pb de latence réseau
01119 //       int sommeil = random () % 30000 ;
01120 //       usleep (sommeil) ;
01121 //       //std::cerr <<"ralentissement de : " << sommeil << std::endl ;
01122 //    } else if (_siteName == visualProcessD) { // simulation d'un pb de latence réseau
01123 //       int sommeil = random () % 45000 ;
01124 //       usleep (sommeil) ;
01125 //       //std::cerr <<"ralentissement de : " << sommeil << std::endl ;
01126 //    }
01127 
01128 
01129 //  if (! _disconnectedTable->empty ()) {
01130 
01131    // first, we try only once to receive something from the disconnected processes
01132    NameToPointerMap<Process> copyOfDisconnected ;
01133    for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
01134         prIt != _disconnectedTable->end () ;
01135         ++ prIt) {
01136       copyOfDisconnected.addObjectWithIndex (prIt->first, prIt->second) ;
01137    }
01138    for (NameToPointerMap<Process>::iterator prIt = copyOfDisconnected.begin () ;
01139         prIt != copyOfDisconnected.end () ;
01140         ++ prIt) {
01141       if (prIt->first != _siteName) {
01142          receivingProcess = prIt->second ;       
01143          receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ;  
01144          if (receiveBuffer.hasMessage ()) {
01145             // this process has become active again, we put it in the list of processes that we need to receive values from
01146             // TDTD ajout de trace pour debug
01147             //std::cerr << " 1 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ;
01148             // TDTD essai de mise en commentaire pour voir... c'est normalement fait plus loin, avec un test...
01149             //_activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ;
01150             // fin TDTD
01151             // TDTD : essai pour voir...
01152             _activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ;
01153             _disconnectedTable->erase (prIt->second->getProcessName ()) ;
01154 //TDTDTD enlever le processus de la map _disconnectedProcessusMap
01155             _disconnectedProcessusMap.erase (prIt->first) ;
01156 
01157             // fin TDTD
01158             switch (receivedRequest) {
01159             case PvmMessage::SynchronisationMessage : {
01160                // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl;
01161                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01162                receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ;
01163             }
01164                break ;
01165             case PvmMessage::MirrorNeedsInitialValues : {
01166                parsingController.sendInitialValuesToMirror (receiveBuffer) ;      
01167             }
01168                break ;
01169             case PvmMessage::InitialValuesForMirror : {
01170                //have the controller process the message
01171                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01172             }
01173                break ;
01174             case PvmMessage::LocalInitSuccessfull :
01175                //  case PvmMessage::pingMessage :
01176 //                    {
01177 //                      cout<<"i got the ping message"<<endl;
01178 //                      _activeProcessTable->addObjectWithIndex(prIt->first, processIterator->second) ;
01179 //                      _disconnectedTable->erase( prIt->second->getProcessName() );
01180 //                    }
01181                break;
01182             case PvmMessage::ProcessTable :
01183             case PvmMessage::EndOfSimulation :
01184             case PvmMessage::EnterBarrier :
01185             case PvmMessage::SiteName :
01186             case PvmMessage::ExitBarrier :
01187             case PvmMessage::NameServiceGetId :
01188             case PvmMessage::NameServiceGetString :
01189             case PvmMessage::NameServiceReturnId :
01190             case PvmMessage::NameServiceReturnString :
01191             case PvmMessage::NameServiceVerifyLocalNameServer :
01192             case PvmMessage::NameServiceVerifyResult :
01193             default: {
01194                std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
01195             }
01196             } //end switch
01197          } else {
01198             // TDTD
01199             // if the proces was declared active, it is now declared inactive, not to send to him datas that will cost time to read
01200             // so that it will still be late...
01201             if (_activeProcessTable->find (prIt->second->getProcessName ()) != _activeProcessTable->end ()) {
01202                _activeProcessTable->erase (prIt->second->getProcessName ()) ;
01203             }
01204             // fin TDTD
01205          }                    
01206       }
01207    } //end for
01208    //     cout<<"size of _activeProcessTable  : "<<_activeProcessTable->size()<<endl;
01209 //    }
01210    //second, make a list of processes that we need to receive values from
01211    for (NameToPointerMap<Process>::iterator pOMKsE = _activeProcessTable->begin () ; 
01212         pOMKsE != _activeProcessTable->end () ; 
01213         pOMKsE ++) {
01214       if ((pOMKsE->first != _siteName) &&
01215           (pOMKsE->second->getDateOfLastMessage () +
01216            pOMKsE->second->getPeriod () +
01217            _synchronisationLatency < parsingController.getSimulatedDate ())) {
01218          aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ;
01219 #ifdef _DEBUGPVMMESS
01220          cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: "
01221               << pOMKsE->first << endl ;
01222 #endif
01223       }
01224    }
01225   
01226     struct timeval start_timer,end_timer;
01227     gettimeofday(&start_timer, NULL);
01228     double time_in_ms = 0;
01229 
01230    // third, try to receive something from the active processes
01231    // valeur initiale proposée par Chadi : 200 ms
01232    while ((time_in_ms < _synchronisationTimeOut) && ! aRecevoir.empty ()) { // TDTD : valeur délicate à estimer pour le cas multi processus sur une seule machine
01233 // TDTD : ici je remplace processTable par activeProcesstable
01234 //       for (NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ;
01235 //         processIterator != _processTable->end () ;
01236 //         ++processIterator) {
01237       for (NameToPointerMap<Process>::iterator processIterator = _activeProcessTable->begin () ;
01238            processIterator != _activeProcessTable->end () ;
01239            ++processIterator) {
01240 // fin TDTD
01241          //this new implementation doesn't do blocking receives, as some processes could send urgent requests
01242          receivingProcess = processIterator->second ;
01243          receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ;
01244          if (receiveBuffer.hasMessage ()) {
01245             // TDTD ajout de trace pour debug
01246             //std::cerr << " 2 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ;
01247             //---------------------------chadi ajout 4/6 ---------------------
01248             //  aRecevoir.erase ( receivingProcess->getProcessName() ) ;
01249             // start = clock();
01250 
01251             //  Date dlm = receivingProcess->getDateOfLastMessage();
01252 //            receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ;
01253 //            cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl;
01254 // TDTD : je mets ce code plus haut
01255 //          if (_disconnectedTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) {
01256 //             _activeProcessTable->addObjectWithIndex (processIterator->first, processIterator->second) ;
01257 //             _disconnectedTable->erase (processIterator->second->getProcessName ()) ;
01258 //             //std::cout<<" j'enleve de la liste de deconnecte le processus suivant: "<<processIterator->first<<std::endl;
01259 //             _simStep = 0 ;
01260 //          }
01261 // fin TDTD
01262             //-----------------------fin chadi ajout 4/6 -------------------------
01263 //            if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate())
01264 //              { }
01265 //            else {
01266             switch (receivedRequest) {
01267             case PvmMessage::SynchronisationMessage : {
01268                // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl;
01269                // To filter ancient synchro messages
01270                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01271                receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ;
01272                if (parsingController.getSimulatedDate () < receiveBuffer.getMessageDate () + 30) {
01273                   // on a déjà reçu des messages assez récents de ce processus, on n'en demande pas d'autres
01274                   aRecevoir.erase (receivingProcess->getProcessName ()) ;
01275                }
01276             }
01277                break ;
01278             case PvmMessage::MirrorNeedsInitialValues : {
01279                parsingController.sendInitialValuesToMirror (receiveBuffer) ;      
01280             } break ;
01281             case PvmMessage::InitialValuesForMirror : {
01282                //have the controller process the message
01283                parsingController.parseSynchronisationMessage (&receiveBuffer) ;
01284             } break ;
01285             case PvmMessage::LocalInitSuccessfull :
01286                //  case PvmMessage::pingMessage :
01287 //                  {
01288 //                    cout<<"i got the ping message"<<endl;
01289 //                    _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ;
01290 //                    _disconnectedTable->erase( processIterator->second->getProcessName() );
01291 //                  }
01292                break ;
01293             case PvmMessage::ProcessTable :
01294             case PvmMessage::EndOfSimulation :
01295             case PvmMessage::EnterBarrier :
01296             case PvmMessage::SiteName :
01297             case PvmMessage::ExitBarrier :
01298             case PvmMessage::NameServiceGetId :
01299             case PvmMessage::NameServiceGetString :
01300             case PvmMessage::NameServiceReturnId :
01301             case PvmMessage::NameServiceReturnString :
01302             case PvmMessage::NameServiceVerifyLocalNameServer :
01303             case PvmMessage::NameServiceVerifyResult :
01304             default: {
01305                std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ;
01306             }
01307             } //end switch
01308          }                
01309       } //end for
01310       
01311       gettimeofday (&end_timer, NULL) ;
01312       time_in_ms = (end_timer.tv_sec-start_timer.tv_sec) * 1000.0 +
01313          (end_timer.tv_usec-start_timer.tv_usec) /1000.0 ;
01314 
01315       // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process
01316       if (_yieldNeeded) {
01317 #ifdef _MSC_VER
01318                   Sleep( 0 ) ;
01319 #else
01320          usleep (0) ;
01321 #endif
01322       }
01323    }  //end while
01324 
01325    // fourth, remove the inactive processes from the _activeProcessTable and place them in the _disconnectedTable
01326    if  (! aRecevoir.empty ()) {
01327       for (NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ;
01328            processIterator != aRecevoir.end () ;
01329            ++ processIterator) {
01330          _disconnectedTable->addObjectWithIndex (processIterator->first, processIterator->second) ;
01331 // TDTDTD ajouter le nom du processus dans la map _disconnectedProcessusMap avec la dernière date simulée arrivée
01332            _disconnectedProcessusMap.insert (std::pair<Name, Type::IntType> (processIterator->first,
01333                                                                              (int)processIterator->second->getDateOfLastMessage ())) ;
01334          // TDTD : je ne comprends pas trop pourquoi mais ici les lignes suivantes doivent être commentéee...
01335 //       if (_activeProcessTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) {
01336 //          _activeProcessTable->erase (processIterator->second->getProcessName ()) ;
01337 //       }
01338          // fin TDTD : en fait c'était à cause de l'envoi aux seuls non inactifs (et non pas non en retard), voir la méthode send...withTag...
01339          //std::cout << "je rajoute le processus suivant a la table des deconnectes " << processIterator->second->getProcessName () << std::endl ;
01340       }
01341       aRecevoir.clear () ;
01342       removeProcessProlog () ;
01343    }
01344 // TDTD ajout trace pour debug
01345    //std::cout<<"longueur de la table des deconnectés : "<<_disconnectedTable->size ()<<std::endl;
01346 #ifdef _DEBUGPVMMESS
01347    cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
01348 #endif
01349 
01350 }

Process * Svm::waitForAnswerToBlockingRequest ( PvmController parsingController,
PvmMessage::MessageTag  tag 
) [virtual]

receive and process messages from all sites (except the central site), and wait until at least on message of tag tag has arrived

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive
Returns:
a pointer to the process having sent the unblocking message

CHADI WARN Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message", Controller::AllWarnings ) ; END CHADI

Definition at line 2254 of file OMKSvm.cxx.

References _idToProcessTable, OMK::PvmMessage::addNewSite, OMK::PvmMessage::EndOfSimulation, OMK::PvmMessage::EnterBarrier, OMK::PvmMessage::ExitBarrier, OMK::PvmIncomingMessage::getMessageDate(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmMessage::LocalInitSuccessfull, OMK::PvmMessage::MirrorNeedsInitialValues, OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMK::PvmMessage::NameServiceVerifyResult, OMK::PvmController::parseSynchronisationMessage(), OMK::PvmMessage::ProcessTable, OMK::PvmController::sendInitialValuesToMirror(), OMK::PvmMessage::SiteName, OMK::PvmMessage::SynchronisationMessage, OMK::PvmMessage::updateProcessTable, and waitForAnyRequests().

Referenced by synchronizeOn(), and OMK::PvmController::waitForAnswerToBlockingRequest().

02255 {
02256    // this implementation assumes that only one thread at a time will enter this function
02257    // numberOfThread is an unsafe method to test this
02258    static int numberOfThreads = 0 ;
02259    ++numberOfThreads ;
02260    assert ( numberOfThreads == 1) ;
02261 
02262    Process * result ;
02263 
02264    PvmIncomingMessage receiveBuffer ;
02265    std::pair<PvmMessage::MessageTag,  int> receivedRequest ;  
02266    
02267    bool serving = true ;
02268 
02269    while ( serving ) 
02270       {
02271 
02272       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
02273 
02274       //find the sender process (result of this member function)
02275       map<int,Process *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ;
02276       
02277    //   cout<<"receivedRequest.second est egale : "<<receivedRequest.second<<endl;
02278    //   cout<<"receivedRequest.first est egale : "<<receivedRequest.first<<endl;
02279       // the message should come from a known process
02280       assert ( i != _idToProcessTable.end() ) ;
02281       
02282       result = i->second ;
02283       
02284       switch (receivedRequest.first)
02285         {
02286         case PvmMessage::SynchronisationMessage :
02287           {
02288             //have the controller process the message
02289             parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02290             
02291             assert ( result != NULL ) ;
02292             
02293             result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
02294           }
02295         break ;
02296         case PvmMessage::MirrorNeedsInitialValues :
02297           {
02298             parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
02299           }
02300         break ;
02301         case PvmMessage::InitialValuesForMirror :
02302           {
02303             //have the controller process the message
02304             parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
02305             
02306           }
02307         break ;
02308         case PvmMessage::LocalInitSuccessfull :
02309         case PvmMessage::ProcessTable :
02310         case PvmMessage::EndOfSimulation :
02311         case PvmMessage::EnterBarrier :
02312         case PvmMessage::SiteName :
02313         case PvmMessage::ExitBarrier :
02314         case PvmMessage::NameServiceGetId :
02315         case PvmMessage::NameServiceGetString :
02316         case PvmMessage::NameServiceReturnId :
02317         case PvmMessage::NameServiceReturnString :
02318         case PvmMessage::NameServiceVerifyLocalNameServer :
02319         case PvmMessage::NameServiceVerifyResult :
02320         case PvmMessage::updateProcessTable :
02321         case PvmMessage::addNewSite :
02322         default:
02323           {
02328           }
02329         }
02330       
02331       serving = ( receivedRequest.first != tag ) ;
02332       
02333       }
02334    
02335    --numberOfThreads ;
02336 
02337    return result ;
02338 }

void Svm::sendCurrentBuffersWithTag ( const PvmMessage::MessageTag  tag  )  [virtual]

send the messages contained in the send buffers of all links

Parameters:
tag the tag to associate to the message

Definition at line 2133 of file OMKSvm.cxx.

References _activeProcessTable, _deadReckoningInterval, _processTable, _siteName, OMK::SynchronisationMessage::endOfSynchronisationFragment, OMK::PvmOutgoingMessage::flushCurrentBuffer(), OMK::SvmLink::getOutgoingBuffer(), OMK::SvmLink::sendOutgoingBuffer(), and OMK::PvmMessage::SynchronisationMessage.

Referenced by OMK::PvmController::advanceSimulatedDate(), OMK::PvmController::computeNextSimulationStep(), OMK::PvmController::init(), and OMK::PvmController::run().

02134 {
02135    NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau
02136    SvmLink * canal ; // Canal vers le precedent processus
02137    static int nbSteps = 1 ;
02138 #ifdef _DEBUGPVMMESS
02139    cerr<<"Svm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processus : "<<endl;
02140    NameToPointerMap<Process>::iterator pOMK ;
02141    for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) 
02142      {
02143        cerr<<(*pOMK).second->getProcessName ();
02144      }
02145    cerr<<endl;
02146 #endif
02147 
02148    if (_deadReckoningInterval > 0) {
02149       nbSteps = (nbSteps + 1) % _deadReckoningInterval ;
02150    }
02151 
02152    // TDTD on envoie seulement les synchros aux actifs...
02153    for (pOMKs = _processTable->begin () ; pOMKs != _processTable->end () ; pOMKs ++) 
02154       //for (pOMKs = _activeProcessTable->begin () ; pOMKs != _activeProcessTable->end () ; pOMKs ++) 
02155 // fin TDTD
02156      {
02157 #ifdef _DEBUGPVMMESS
02158        cerr<<"Traitement des emissions vers "<<(*pOMKs).second->getProcessName ()<<endl;
02159 #endif
02160        if (_siteName != (*pOMKs).second->getProcessName ()) 
02161          {
02162 
02163            // On connait alors le canal a observer
02164            canal = (*pOMKs).second->getSvmLink () ;
02165 
02166            if ( tag == PvmMessage::SynchronisationMessage ) 
02167              {
02168                 (*pOMKs).second->getSvmLink ()->getOutgoingBuffer()<<SynchronisationMessage::endOfSynchronisationFragment ;
02169               
02170              }
02171 
02172            // On transmet le message du canal
02173 
02174 // TDTD ajout pour purger les envois aux inactifs
02175            //std::cerr << "sur site : " << _siteName << " : " ;
02176            if (_activeProcessTable->find ((*pOMKs).second->getProcessName ()) != _activeProcessTable->end ()) {
02177               //std::cerr << "envoi de syncho à actif : " << (*pOMKs).second->getProcessName () ;
02178               canal->sendOutgoingBuffer ( tag ) ;
02179            } else {
02180               if (nbSteps == 0) { // un coup de dead-reckoning ?
02181                  //std::cerr << "envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ;
02182                  canal->sendOutgoingBuffer ( tag ) ;             
02183               } else {
02184                  canal->getOutgoingBuffer().flushCurrentBuffer () ;
02185                  //std::cerr << "pas d'envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ;
02186               }
02187                }
02188            //std::cerr << endl ;
02189 // fin TDTD
02190 #ifdef _DEBUGPVMMESS
02191            cerr << "OMKmSvm::transmission avec tag : "
02192                 << tag << endl ;
02193 #endif
02194          }
02195 #ifdef _DEBUGPVMMESS
02196        else 
02197          {
02198            cerr << "Rien à faire "<<endl ;
02199            //canal->affiche () ;
02200          }
02201 #endif
02202      }
02203 }

void Svm::timestampCurrentSendBuffers ( const Date date  )  [virtual]

timestamp the messages contained in the send buffers of all links

Definition at line 2234 of file OMKSvm.cxx.

References _processTable, and _siteName.

Referenced by OMK::PvmController::advanceSimulatedDate(), and OMK::PvmController::init().

02235 {
02236    NameToPointerMap<Process>::iterator i ;
02237    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
02238      {
02239         if (_siteName != (*i).second->getProcessName ())
02240            { 
02241               i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ;
02242            }
02243      }
02244 }

const Date & Svm::getSynchronisationLatency (  )  [virtual]

get the authorized latency for the relaxed receive

Definition at line 112 of file OMKSvm.cxx.

References _synchronisationLatency.

Referenced by OMK::PvmController::getOutputHistorySize(), OMK::PvmController::getPurgeDate(), and OMK::PvmController::run().

00113 {
00114   return _synchronisationLatency;
00115 }

void Svm::addToDisconnectedTable ( Name ,
Process  
) [virtual]

Definition at line 2344 of file OMKSvm.cxx.

References _disconnectedTable, and _troubleFlag.

02346 {
02347 //***************************
02348 //  if ( processIterator->second->getProcessName()== "visualProcessC" )
02349 //    { }
02350 //  else {
02351 //****************************
02352 
02353 
02354   _disconnectedTable->addObjectWithIndex(nameOfProcess, pointerToTheProcess );
02355  // _everybodyTable->erase(processIterator->second->getProcessName());
02356 // }
02357   _troubleFlag = true ;
02358 }

void Svm::removeFromDisconnectedTable ( NameToPointerMap< Process >::iterator   )  [virtual]

Definition at line 2360 of file OMKSvm.cxx.

References _disconnectedTable, and _troubleFlag.

02361  {
02362 //***************************
02363 //  if ( processIterator->second->getProcessName()== "visualProcessC" )
02364 //    { }
02365 //  else {
02366 //****************************
02367 //   _everybodyTable->addObjectWithIndex(processIterator->first,processIterator->second  );
02368   
02369    _disconnectedTable->erase( processIterator->second->getProcessName() );
02370 // }
02371    _troubleFlag = false ;
02372   }

bool Svm::disconnectedProcess ( Name  )  [virtual]

Definition at line 2377 of file OMKSvm.cxx.

References _disconnectedTable.

Referenced by OMK::PvmController::getIsolatedMirrorList(), and OMK::PvmController::getNonIsolatedMirrorList().

02378 {
02379   if (_disconnectedTable->find(zico)!=_disconnectedTable->end())
02380     { return true;}
02381   else
02382     return false;
02383 }

bool Svm::getTroubleFlag (  )  [virtual]

Definition at line 2387 of file OMKSvm.cxx.

References _troubleFlag.

02388 {
02389   if (_troubleFlag == true)
02390     { return true; }
02391   return false;
02392 }

bool Svm::disconnectedTableIsEmpty (  )  [virtual]

Definition at line 2396 of file OMKSvm.cxx.

References _disconnectedTable.

Referenced by OMK::PvmController::getNonIsolatedMirrorList(), and OMK::PvmController::testIfTroubleOccured().

02397 {
02398   if ( _disconnectedTable->empty() )
02399     {
02400       return true;
02401     }
02402   return false;
02403 }

void Svm::addNewSiteToSimulation ( const Date  )  [virtual]

Definition at line 136 of file OMKSvm.cxx.

References _idToProcessTable, _processTable, addNewWorkstation(), createSvmLink(), dateVar, OMK::Process::getHostMachineName(), OMK::SvmLink::getOutgoingBuffer(), OMK::Process::getProcessName(), OMK::Process::getSvmLink(), OMK::SvmLink::getTID(), OMK::PvmOutgoingMessage::insertTimeStamp(), processSiteIds, OMK::PvmMessage::ProcessTable, OMK::PvmMulticastMessage::send(), OMK::PvmUnicastMessage::send(), OMK::PvmOutgoingMessage::send(), OMK::Process::setSvmLink(), OMK::PvmMessage::SiteName, spawnProcess(), and OMK::PvmMessage::updateProcessTable.

Referenced by serveNameRequestsUntilEnd().

00137 {
00138   dateVar = date ;
00139 
00140  Process * theNewProcessToAdd = new Process ( "visualProcessC", "maxipes.irisa.fr" );
00141 
00142 
00144  
00145  // addNewWorkstation ( "barbux.irisa.fr" ) ;
00146  addNewWorkstation ( theNewProcessToAdd->getHostMachineName () ) ;
00147  
00148  _processTable->addObjectWithIndex ("visualProcessC", theNewProcessToAdd ) ;
00149  
00150  
00151  int adrLoc ;
00152  
00153  int processSiteId ;
00154 
00155  cout<<"New Process will be Spawned .....///"<<endl;
00156  cout<<"theNewProcessToAdd is named : "<<theNewProcessToAdd->getProcessName()<<endl;
00157  
00158  // spawn a new process on the specified workstation
00159  adrLoc = spawnProcess (theNewProcessToAdd) ;
00160 
00161  cout<<"New Process Spawned .....valeur de spawn est egale a : "<<adrLoc<<endl;
00162  
00163  // create a SvmLink, add it to processInfo
00164  theNewProcessToAdd->setSvmLink (createSvmLink (adrLoc)) ; 
00165  
00166  _idToProcessTable.insert ( make_pair ( adrLoc,theNewProcessToAdd ) ) ;
00167  
00168  theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( date ) ;
00169 
00170  
00171  // send the name of the process (in the Svm) to the created process
00172  theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer()<<theNewProcessToAdd->getProcessName() ;
00173  
00174  theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ;
00175  
00176  //prepare processSiteIds for the next step
00177  processSiteId =  theNewProcessToAdd->getSvmLink ()->getTID()  ;      
00178 // processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ;  
00179  
00180   
00181  // Encodage des attributs Svm de chaque processus
00182  PvmUnicastMessage nameToIdMessage ( processSiteId ) ;
00183  nameToIdMessage.insertTimeStamp ( date ) ;
00184 
00186  NameToPointerMap<Process>::iterator i ;
00187  for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00188    {
00189      nameToIdMessage << *((*i).second) ;
00190    }
00191  
00192  // Transfert des adresses vers le controleur local du process ajouté
00193  nameToIdMessage.send ( PvmMessage::ProcessTable ) ;
00194  
00195  
00196  // envoyer le processTable Update a tt le monde
00197  PvmMulticastMessage updateProcessTableMessage ( processSiteIds ) ;
00198  updateProcessTableMessage.insertTimeStamp ( date ) ;
00199 
00200  updateProcessTableMessage << *theNewProcessToAdd ;
00201 // updateProcessTableMessage << (theNewProcessToAdd->getProcessName()) ;
00202  cout<<" theNewProcessToAdd->getProcessName() = "<<theNewProcessToAdd->getProcessName()<<endl;
00203 
00204  updateProcessTableMessage.send ( PvmMessage::updateProcessTable ) ;
00205  
00206 
00207  processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ; 
00208  cout<<"Finishing method addNewSiteToSimulation"<<endl;
00209 
00210 }

void Svm::addNewSiteRequest ( const Date date  )  [virtual]

added by chadi

Definition at line 543 of file OMKSvm.cxx.

References OMK::PvmMessage::addNewSite, getParentSiteId(), OMK::PvmOutgoingMessage::insertTimeStamp(), and OMK::PvmUnicastMessage::send().

Referenced by OMK::PvmController::createNewSite().

00544 {
00545   PvmUnicastMessage mess ( getParentSiteId () ) ;
00546   mess.insertTimeStamp ( date ) ;
00547   mess.send ( PvmMessage::addNewSite ) ;
00548 }

bool Svm::reconnexionEstablished (  )  [virtual]

Definition at line 2407 of file OMKSvm.cxx.

References _disconnectedTable, and testIfSiteRecovered().

02408 {
02409  NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
02410           
02411 //       if ( testIfSiteRecovered( (*prIt).second->getHostMachineName() ) )
02412  if ( testIfSiteRecovered( (*prIt).second ) )
02413          {
02414           // cout<<" Svm::reconnexionEstablished OK"<<endl;
02415            return true ;
02416          }
02417         
02418  else 
02419    {
02420     // cout<<" Svm::reconnexionEstablished NOTOK"<<endl;
02421      return false ;
02422    }
02423          
02424 }

virtual bool OMK::Svm::testIfSiteRecovered ( Process p  )  [pure virtual]

Implemented in OMK::PvmSvm.

Referenced by reconnexionEstablished().

void Svm::sendPingMessage (  )  [virtual]

Definition at line 2428 of file OMKSvm.cxx.

References _disconnectedTable, and OMK::PvmMessage::pingMessage.

02429 {
02430  for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
02431             prIt != _disconnectedTable->end () ;
02432             ++prIt 
02433             )
02434         {
02435           ((*prIt).second)->getSvmLink()->getOutgoingBuffer()<<"ping" ;
02436           ((*prIt).second)->getSvmLink()->getOutgoingBuffer().send(PvmMessage::pingMessage) ;
02437           cout<<"message ping est fait "<<endl;
02438         }
02439 }

void Svm::removeProcessFromActiveTable (  )  [virtual]

Definition at line 1669 of file OMKSvm.cxx.

References _activeProcessTable, and _disconnectedTable.

Referenced by removeProcessProlog().

01670 {
01671 for ( NameToPointerMap<Process>::iterator processIterator = _disconnectedTable->begin () ;
01672             processIterator != _disconnectedTable->end () ;
01673             ++processIterator 
01674             )
01675         {
01676           _activeProcessTable->erase( processIterator->first ) ;
01677          // cout<<"le process "<<processIterator->first<<" a ete vire de la liste active"<<endl;
01678         }
01679 // _temporaryTable->clear() ;
01680 // _simStep = 0 ;
01681 }

void Svm::removeProcessProlog (  )  [virtual]

Definition at line 1658 of file OMKSvm.cxx.

References _simStep, and removeProcessFromActiveTable().

Referenced by relaxedSynchroniseReceiveAndProcessMessages().

01659 { 
01660   _simStep++ ;
01661   //cout<<"_simStep est egale :"<<_simStep<<endl;
01662   if (_simStep == 2) { 
01663     removeProcessFromActiveTable() ; 
01664    // cout<<"_simStep est egale (2) :"<<_simStep<<endl;
01665     _simStep = 0 ;
01666   }
01667 }

std::list< Name > Svm::getDisconnectedProcessusList (  )  [virtual]

Definition at line 2445 of file OMKSvm.cxx.

References _disconnectedProcessusList, and _disconnectedTable.

Referenced by OMK::PvmController::getDisconnectedProcessusList().

02445                                                  {
02446    _disconnectedProcessusList.clear () ;
02447    for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ;
02448         prIt != _disconnectedTable->end () ;
02449         prIt ++) {
02450       _disconnectedProcessusList.push_back ((*prIt).first) ;
02451    }
02452    return _disconnectedProcessusList ;
02453 }

std::map< Name, int > Svm::getDisconnectedProcessusMap (  )  [virtual]

Definition at line 2457 of file OMKSvm.cxx.

References _disconnectedProcessusMap.

Referenced by OMK::PvmController::getDisconnectedProcessusMap().

02457                                                     {
02458    return _disconnectedProcessusMap ;
02459 }

virtual void OMK::Svm::addNewWorkstation ( const Name m  )  [protected, pure virtual]

add a new machine to the virtual machine

Implemented in OMK::PvmSvm.

Referenced by addNewSiteToSimulation(), and createDistributedSimulation().

virtual void OMK::Svm::removeWorkstation ( const Name m  )  [protected, pure virtual]

remove a work station from the virtual machine

Implemented in OMK::PvmSvm.

virtual int OMK::Svm::spawnProcess ( Process p  )  [protected, pure virtual]

spawn a copy of this proc'ess on another site

Implemented in OMK::PvmSvm.

Referenced by addNewSiteToSimulation(), and createDistributedSimulation().

virtual SvmLink* OMK::Svm::createSvmLink ( const int d = 0  )  [protected, pure virtual]

create a link to a distant site

Parameters:
d the site id of the distant site

Implemented in OMK::PvmSvm.

Referenced by addNewSiteToSimulation(), connectToDistributedSimulation(), createDistributedSimulation(), and testIfNewProcessAdded().

virtual int OMK::Svm::getParentSiteId (  )  [protected, pure virtual]

get the site id having spanned this process.

Returns:
the site id, 0 meaning the process wasn't spawned by the virtual machine

Implemented in OMK::PvmSvm.

Referenced by addNewSiteRequest(), connectToDistributedSimulation(), disconnectFromDistributedSimulation(), and init().

virtual int OMK::Svm::getSiteId (  )  [protected, pure virtual]

get the id in the svm of this site

Implemented in OMK::PvmSvm.

virtual void OMK::Svm::groupBarrier ( std::string  groupName,
int  numberToJoin 
) [protected, pure virtual]

block the calling process until numberToJoin processes of group groupName have called groupBarrier

Parameters:
numberToJoin number of processes to wait for
groupName the name of the process group

Implemented in OMK::PvmSvm.

Referenced by syncDistributedSites().

virtual void OMK::Svm::broadcastToGroup ( std::string  groupName,
PvmMessage::MessageTag   
) [protected, pure virtual]

broadcast an empty tagged message to all members of a broadcast group

Implemented in OMK::PvmSvm.

virtual void OMK::Svm::initBeforeMessagePacking (  )  [protected, pure virtual]

do any usefull initialisations before messages are packed

Implemented in OMK::PvmSvm.

virtual int OMK::Svm::nonblockingReceive ( PvmMessage::MessageTag  tag  )  [protected, pure virtual]

try and receive message in a non-blocking fashion the received messages are lost

Parameters:
tag the tag of the messages to receive
Returns:
number of received messages with that tag

Implemented in OMK::PvmSvm.

virtual std::pair<PvmMessage::MessageTag, int> OMK::Svm::waitForAnyRequests ( PvmIncomingMessage receiveBuffer  )  [protected, pure virtual]

wait for any message sent to this site.

This call is blocking

Parameters:
receiveBuffer : the user buffer to use to receive any message a pair giving the tag of the received message and the siteId of the sender

Implemented in OMK::PvmSvm.

Referenced by serveNameRequestsUntilEnd(), and waitForAnswerToBlockingRequest().

virtual void OMK::Svm::joinSvmGroup ( std::string &  groupName  )  [protected, pure virtual]

join a synchronisation group join a group of processes in the siames virtual machine

Parameters:
groupName the name of the group of processes to join. This parameter is not const, because of the underlying C compatibility layer

Implemented in OMK::PvmSvm.

Referenced by connectToDistributedSimulation().

void Svm::serveNameRequestsUntilEnd (  )  [protected, virtual]

for the central site: go into server mode, receiving and processing any reveived request until all sites have disconnected

CHADI WARNING

END CHADI

Definition at line 434 of file OMKSvm.cxx.

References _processTable, OMK::PvmMessage::addNewSite, addNewSiteToSimulation(), disconnectFromDistributedSimulation(), OMK::PvmMessage::EndOfSimulation, OMK::NameServer::getIdentifier(), OMK::PvmIncomingMessage::getMessageDate(), OMK::Name::getNameServer(), OMK::NameServer::getStringAssociatedTo(), OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMTRACE, OMTRACEID, OMK::PvmSvm::pvmDataEncoding, OMK::Name::setNameServer(), OMK::PvmCentralNameServer::verifyCompatibilityWithLocalNameServer(), and waitForAnyRequests().

Referenced by createDistributedSimulation().

00435 {
00436   bool serving = true ;
00437 
00438   //convert the actual name Server to a PvmCentralNameServer
00439   PvmCentralNameServer * nameServer = new PvmCentralNameServer ( *Name::getNameServer() ) ;
00440   Name::setNameServer (nameServer ) ;
00441 
00442 
00443   PvmIncomingMessage receiveBuffer ;
00444   std::pair<PvmMessage::MessageTag,  int> receivedRequest ;  
00445 
00446   while ( serving )
00447     {
00448 
00449       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
00450 
00451       switch (receivedRequest.first)
00452         {
00453         case PvmMessage::EndOfSimulation :
00454            {
00455               const Date & endDate (receiveBuffer.getMessageDate() );
00456               Name endedProcess ;
00457               receiveBuffer>>endedProcess ;
00458               cerr<<endedProcess<<" finished at "<<endDate<<endl;
00459               _processTable->erase ( endedProcess ) ;
00460               if ( _processTable->size() == 0 ) 
00461                  {  
00462                     disconnectFromDistributedSimulation ( endDate ) ;
00463                     serving = false ;
00464                  }
00465            }
00466            break;
00467         case PvmMessage::NameServiceGetId :
00468           {
00469             int stringLength ;
00470             receiveBuffer>>stringLength ;
00471             char * stringId = new char[stringLength] ;
00472             receiveBuffer>>stringId ;
00473             assert ( stringId [stringLength - 1] == 0 ) ;
00474             OMTRACEID("PVMSVM",
00475           "Requested id for string "<<stringId<<"| of length "<<stringLength
00476                 <<" from "<< hex << receivedRequest.second << dec);
00477             if ( stringLength ==1)
00478       {
00479         OMTRACE ("Ask for Id of " << stringId <<"|");
00480         Name::idType id = nameServer->getIdentifier( stringId ) ;
00481         OMTRACE ("id=" << id  <<"|");
00482         OMTRACE ("StringAssociatedTo=  " << nameServer->getStringAssociatedTo (id) <<"|");
00483       }
00484       Name::idType id = nameServer->getIdentifier( stringId ) ;
00485             OMTRACEID("PVMSVM"," sending "<<id<< "|");
00486             delete [] stringId ;
00487             pvm_initsend( PvmSvm::pvmDataEncoding ) ;
00488             pvm_pklong (&id, 1, 1);
00489             pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnId) ;  
00490           }
00491           break;
00492         case PvmMessage::NameServiceGetString:
00493           {
00494             Name::idType id ;
00495             receiveBuffer>>id ;
00496             OMTRACEID("PVMSVM",
00497             "Requested String for id "<<id
00498                 <<"| from "<< hex << receivedRequest.second<< dec );
00499       std::string resultingString( nameServer->getStringAssociatedTo (id) ) ;
00500             OMTRACEID("PVMSVM"," sending "<<resultingString<<"|");
00501             pvm_initsend( PvmSvm::pvmDataEncoding ) ;
00502             pvm_pkstr( const_cast<char *>( resultingString.c_str() ) ) ;
00503             pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnString) ;  
00504           }
00505           break;
00506         case PvmMessage::NameServiceVerifyLocalNameServer:
00507           {
00508             nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ;
00509           }
00510           break;
00511           //added by Chadi
00512         case PvmMessage::addNewSite:
00513           {
00514             addNewSiteToSimulation(receiveBuffer.getMessageDate());
00515           }
00516           break;
00517           //end added
00518         default:
00519             OMTRACEID("PVMSVM","Svm::serveNameRequestsUntilEnd() : unexpected request "
00520                                               <<receivedRequest.first);
00521           //an unexpected request is received
00523 //        switch (Controller::warningLevel)
00524 //          {
00525 //          case Controller::AllWarnings :
00526 //            cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request "
00527 //                <<receivedRequest.first<<endl;
00528 //            break ;
00529 //          case Controller::FatalWarnings :
00530 //            cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request"
00531 //                <<receivedRequest.first<<endl;
00532 //            exit (2) ;              
00533 //            break;
00534 //          default://nothing to do
00535 //            break;
00536 //          }
00538           break;
00539         }
00540     }
00541 }


Member Data Documentation

PvmIncomingMessage* OMK::Svm::npp

Definition at line 114 of file OMKSvm.h.

Referenced by connectToDistributedSimulation(), and testIfNewProcessAdded().

Date OMK::Svm::dateVar

Definition at line 116 of file OMKSvm.h.

Referenced by addNewSiteToSimulation(), and testIfNewProcessAdded().

bool OMK::Svm::_troubleFlag

Definition at line 190 of file OMKSvm.h.

Referenced by addToDisconnectedTable(), getTroubleFlag(), and removeFromDisconnectedTable().

int OMK::Svm::_simStep

Definition at line 201 of file OMKSvm.h.

Referenced by removeProcessProlog().

std::list<Name> OMK::Svm::_disconnectedProcessusList [protected]

Definition at line 211 of file OMKSvm.h.

Referenced by getDisconnectedProcessusList().

std::map<Name, int> OMK::Svm::_disconnectedProcessusMap [protected]

Definition at line 212 of file OMKSvm.h.

Referenced by getDisconnectedProcessusMap(), and relaxedSynchroniseReceiveAndProcessMessages().

NameToPointerMap<Process>* OMK::Svm::_processTable [protected]

map containing all information about the different processes

Definition at line 286 of file OMKSvm.h.

Referenced by addNewSiteToSimulation(), broadcast(), connectToDistributedSimulation(), createDistributedSimulation(), disconnectFromDistributedSimulation(), getLinkToProcessNamed(), getProcessDescriptorNamed(), processReceivedMessages(), sendCurrentBuffersWithTag(), serveNameRequestsUntilEnd(), Svm(), synchroniseReceiveAndProcessMessages(), testIfNewProcessAdded(), timestampCurrentSendBuffers(), waitAndProcessMessages(), and waitForMessage().

std::vector<int> OMK::Svm::processSiteIds [protected]

Definition at line 290 of file OMKSvm.h.

Referenced by addNewSiteToSimulation(), createDistributedSimulation(), and disconnectFromDistributedSimulation().

NameToPointerMap<Process>* OMK::Svm::_disconnectedTable [protected]

Definition at line 292 of file OMKSvm.h.

Referenced by addToDisconnectedTable(), disconnectedProcess(), disconnectedTableIsEmpty(), getDisconnectedProcessusList(), reconnexionEstablished(), relaxedSynchroniseReceiveAndProcessMessages(), removeFromDisconnectedTable(), removeProcessFromActiveTable(), sendPingMessage(), Svm(), and testIfDisconnectedProcessNamed().

NameToPointerMap<Process>* OMK::Svm::_activeProcessTable [protected]

Definition at line 294 of file OMKSvm.h.

Referenced by relaxedSynchroniseReceiveAndProcessMessages(), removeProcessFromActiveTable(), sendCurrentBuffersWithTag(), and Svm().

NameToPointerMap<Process>* OMK::Svm::_temporaryTable [protected]

Definition at line 296 of file OMKSvm.h.

Referenced by Svm().

NameToPointerMap<Process> OMK::Svm::aRecevoir [protected]

Definition at line 298 of file OMKSvm.h.

Referenced by relaxedSynchroniseReceiveAndProcessMessages(), Svm(), and synchroniseReceiveAndProcessMessages().

bool OMK::Svm::firstTime [protected]

Definition at line 300 of file OMKSvm.h.

Referenced by Svm().

std::map<int,Process *> OMK::Svm::_idToProcessTable [protected]

map storing correspondance between processId and Process

Definition at line 305 of file OMKSvm.h.

Referenced by addNewSiteToSimulation(), connectToDistributedSimulation(), createDistributedSimulation(), testIfNewProcessAdded(), and waitForAnswerToBlockingRequest().

int OMK::Svm::_siteId [protected]

workstation identifier in the svm

Definition at line 309 of file OMKSvm.h.

Referenced by OMK::PvmSvm::getSiteId(), and OMK::PvmSvm::PvmSvm().

SvmLink* OMK::Svm::_linkToCentralSite [protected]

the link to the central site

Definition at line 313 of file OMKSvm.h.

Referenced by connectToDistributedSimulation(), testIfNewProcessAdded(), and waitForMessage().

Name OMK::Svm::_siteName [protected]

Name of the process associated to this site.

Definition at line 317 of file OMKSvm.h.

Referenced by broadcast(), connectToDistributedSimulation(), createDistributedSimulation(), getSiteName(), processReceivedMessages(), relaxedSynchroniseReceiveAndProcessMessages(), sendCurrentBuffersWithTag(), synchroniseReceiveAndProcessMessages(), timestampCurrentSendBuffers(), waitAndProcessMessages(), and waitForMessage().

int OMK::Svm::_numberOfSlaves [protected]

number of slaves ( PvmControllers ) in the simulation

Definition at line 322 of file OMKSvm.h.

Referenced by OMK::PvmSvm::addNewWorkstation(), connectToDistributedSimulation(), syncDistributedSites(), synchronizeOn(), and testIfNewProcessAdded().

const Date OMK::Svm::_synchronisationLatency [protected]

in millisecond, the authorized latency when synchronising all the sites

Definition at line 327 of file OMKSvm.h.

Referenced by getSynchronisationLatency(), relaxedSynchroniseReceiveAndProcessMessages(), and synchroniseReceiveAndProcessMessages().

const Date OMK::Svm::_synchronisationTimeOut [protected]

in millisecond, the authorized timeOut for relaxed synchronizing

Definition at line 331 of file OMKSvm.h.

Referenced by relaxedSynchroniseReceiveAndProcessMessages().

const Date OMK::Svm::_deadReckoningInterval [protected]

in int, the dead reckoning time step interval

Definition at line 335 of file OMKSvm.h.

Referenced by sendCurrentBuffersWithTag().

const bool OMK::Svm::_yieldNeeded [protected]

in int, the dead reckoning time step interval

Definition at line 339 of file OMKSvm.h.

Referenced by relaxedSynchroniseReceiveAndProcessMessages(), and synchroniseReceiveAndProcessMessages().

std::string Svm::_slaveGroupName [static, protected]

name of the group all the slave processes join

Definition at line 343 of file OMKSvm.h.

Referenced by connectToDistributedSimulation(), and syncDistributedSites().

std::string Svm::_masterSiteName [static, protected]

reserved name of the master distributed site

Definition at line 347 of file OMKSvm.h.

Referenced by createDistributedSimulation().


logo OpenMask

Documentation generated on Mon Jun 9 11:46:03 2008

Generated with doxygen by Dimitri van Heesch ,   1997-2007