#include <OMKSvm.h>
Inheritance diagram for OMK::Svm:
Public Member Functions | |
Svm (NameToPointerMap< Process > *tab, const Date &latence, const Date &timeOut, const int deadReckoningInterval, const bool yieldNeeded) | |
constructor | |
virtual | ~Svm () |
Destructor. | |
virtual void | init (const Date &initialSimulationDate) |
initialise the distributed virtual machine | |
virtual void | syncDistributedSites () |
sync all the distributed sites. | |
virtual void | connectToDistributedSimulation () |
connect the distributed virtual machine to its peers | |
virtual void | createDistributedSimulation (const Date &initialSimulationDate) |
create a distributed simulation | |
virtual void | broadcast (PvmMessage::MessageTag, PvmMulticastMessage *message=NULL) |
broadcast a message | |
virtual void | synchronizeOn (PvmController &, PvmMessage::MessageTag tag) |
enter a synchronisation barrier between all distributed sites except the central site. This is a blocking call, but the virtual machine still answers requests. | |
virtual void | disconnectFromDistributedSimulation (const Date &deconnexionDate) |
Disconnect the site from the distributed simulation. | |
virtual const Name & | getSiteName () const |
get the site name in the simulation | |
virtual SvmLink * | getLinkToProcessNamed (const Name &processName) |
get access to the link to a given process | |
virtual Process * | getProcessDescriptorNamed (const Name &processName) |
get access to a given process descriptor | |
virtual Process * | testIfDisconnectedProcessNamed (const Name &processName) |
virtual void | testIfNewProcessAdded () |
virtual void | processReceivedMessages (PvmController &parsingController, const PvmMessage::MessageTag tag) |
Attempt to receive and process messages from all sites, in a non blocking fashion. | |
virtual void | waitAndProcessMessages (PvmController &parsingController, const PvmMessage::MessageTag tag) |
receive and process messages from all sites, and wait for the messages to arrive if necessary | |
virtual void | waitForMessage (PvmController &parsingController, const PvmMessage::MessageTag tag) |
wait until a message has arrived from one site | |
virtual void | waitForMessageFrom (PvmController &parsingController, const Name &processName, const PvmMessage::MessageTag tag) |
receive a message from a particular process | |
virtual void | synchroniseReceiveAndProcessMessages (PvmController &parsingController, const PvmMessage::MessageTag tag) |
receive and process messages from all sites (except the central site), and wait until at least a recent (as defined by latency) message has arrived from each of these sites | |
virtual void | relaxedSynchroniseReceiveAndProcessMessages (PvmController &parsingController, const PvmMessage::MessageTag tag) |
virtual Process * | waitForAnswerToBlockingRequest (PvmController &parsingController, PvmMessage::MessageTag tag) |
receive and process messages from all sites (except the central site), and wait until at least one message with the given tag has arrived | |
virtual void | sendCurrentBuffersWithTag (const PvmMessage::MessageTag tag) |
send the messages contained in the send buffers of all links | |
virtual void | timestampCurrentSendBuffers (const Date &date) |
timestamp the messages contained in the send buffers of all links | |
virtual const Date & | getSynchronisationLatency () |
get the authorized latency for the relaxed receive | |
virtual void | addToDisconnectedTable (Name &, Process *) |
virtual void | removeFromDisconnectedTable (NameToPointerMap< Process >::iterator) |
virtual bool | disconnectedProcess (Name &) |
virtual bool | getTroubleFlag () |
virtual bool | disconnectedTableIsEmpty () |
virtual void | addNewSiteToSimulation (const Date &) |
virtual void | addNewSiteRequest (const Date &date) |
added by chadi | |
virtual bool | reconnexionEstablished () |
virtual bool | testIfSiteRecovered (Process *p)=0 |
virtual void | sendPingMessage () |
virtual void | removeProcessFromActiveTable () |
virtual void | removeProcessProlog () |
virtual std::list< Name > | getDisconnectedProcessusList () |
virtual std::map< Name, int > | getDisconnectedProcessusMap () |
Public Attributes | |
PvmIncomingMessage * | npp |
Date | dateVar |
bool | _troubleFlag |
int | _simStep |
Protected Member Functions | |
virtual void | addNewWorkstation (const Name &m)=0 |
add a new machine to the virtual machine | |
virtual void | removeWorkstation (const Name &m)=0 |
remove a work station from the virtual machine | |
virtual int | spawnProcess (Process *p)=0 |
spawn a copy of this process on another site | |
virtual SvmLink * | createSvmLink (const int &d=0)=0 |
create a link to a distant site | |
virtual int | getParentSiteId ()=0 |
get the id of the site that spawned this process. | |
virtual int | getSiteId ()=0 |
get the id in the svm of this site | |
virtual void | groupBarrier (std::string groupName, int numberToJoin)=0 |
block the calling process until numberToJoin processes of group groupName have called groupBarrier | |
virtual void | broadcastToGroup (std::string groupName, PvmMessage::MessageTag)=0 |
broadcast an empty tagged message to all members of a broadcast group | |
virtual void | initBeforeMessagePacking ()=0 |
do any useful initialisations before messages are packed | |
virtual int | nonblockingReceive (PvmMessage::MessageTag tag)=0 |
try to receive a message in a non-blocking fashion; the received messages are lost | |
virtual std::pair< PvmMessage::MessageTag, int > | waitForAnyRequests (PvmIncomingMessage &receiveBuffer)=0 |
wait for any message sent to this site. | |
virtual void | joinSvmGroup (std::string &groupName)=0 |
join a synchronisation group: join a group of processes in the siames virtual machine | |
virtual void | serveNameRequestsUntilEnd () |
for the central site: go into server mode, receiving and processing any received request until all sites have disconnected | |
Protected Attributes | |
std::list< Name > | _disconnectedProcessusList |
std::map< Name, int > | _disconnectedProcessusMap |
NameToPointerMap< Process > * | _processTable |
map containing all information about the different processes | |
std::vector< int > | processSiteIds |
NameToPointerMap< Process > * | _disconnectedTable |
NameToPointerMap< Process > * | _activeProcessTable |
NameToPointerMap< Process > * | _temporaryTable |
NameToPointerMap< Process > | aRecevoir |
bool | firstTime |
std::map< int, Process * > | _idToProcessTable |
map storing the correspondence between processId and Process | |
int | _siteId |
workstation identifier in the svm | |
SvmLink * | _linkToCentralSite |
the link to the central site | |
Name | _siteName |
Name of the process associated to this site. | |
int | _numberOfSlaves |
number of slaves ( PvmControllers ) in the simulation | |
const Date | _synchronisationLatency |
in milliseconds, the authorized latency when synchronising all the sites | |
const Date | _synchronisationTimeOut |
in milliseconds, the authorized timeOut for relaxed synchronizing | |
const Date | _deadReckoningInterval |
in int, the dead reckoning time step interval | |
const bool | _yieldNeeded |
boolean flag initialised from the constructor's yieldNeeded argument | |
Static Protected Attributes | |
static std::string | _slaveGroupName |
name of the group all the slave processes join | |
static std::string | _masterSiteName |
reserved name of the master distributed site |
Definition at line 43 of file OMKSvm.h.
Svm::Svm | ( | NameToPointerMap< Process > * | tab, | |
const Date & | latence, | |||
const Date & | timeOut, | |||
const int | deadReckoningInterval, | |||
const bool | yieldNeeded | |||
) |
constructor
tab | a table containing a map between a processName and the data structure defining a process | |
latence | the authorized synchronisation latency |
Definition at line 58 of file OMKSvm.cxx.
References _activeProcessTable, _disconnectedTable, _processTable, _temporaryTable, aRecevoir, and firstTime.
00059 : 00060 // _processTable ( tab ), 00061 // _activeProcessTable (tab ), 00062 _troubleFlag ( false ), 00063 _simStep ( 0 ), 00064 _siteId ( -1 ), //negative values indicates an error 00065 _numberOfSlaves ( 0 ), 00066 _synchronisationLatency (latence), 00067 _synchronisationTimeOut (timeOut), 00068 _deadReckoningInterval (deadReckoningInterval), 00069 _yieldNeeded (yieldNeeded) 00070 { 00071 _processTable = new NameToPointerMap<Process>(); 00072 for ( NameToPointerMap<Process>::iterator processIterator = tab->begin () ; 00073 processIterator != tab->end () ; 00074 ++processIterator 00075 ) 00076 { 00077 _processTable->addObjectWithIndex(processIterator->first, processIterator->second ); 00078 00079 } 00080 00081 //--------------chadi ajout 1/6 ------------------------ 00082 _temporaryTable = new NameToPointerMap<Process>(); 00083 _disconnectedTable = new NameToPointerMap<Process>(); 00084 _activeProcessTable = new NameToPointerMap<Process>(); 00085 _disconnectedTable->clear(); 00086 for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 00087 processIterator != _processTable->end () ; 00088 ++processIterator 00089 ) 00090 { 00091 _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second ); 00092 00093 } 00094 00095 firstTime = true; 00096 aRecevoir.clear(); 00097 //--------------fin chadi ajout 1/6 ------------------------ 00098 // TDTD ajout pour simuler des défaillances réseau 00099 #ifdef _MSC_VER 00100 srand( getpid() ) ; 00101 #else 00102 srandom (getpid ()) ; 00103 #endif 00104 }
Svm::~Svm | ( | ) | [virtual] |
void Svm::init | ( | const Date & | initialSimulationDate | ) | [virtual] |
initialise the distributed virtual machine
Definition at line 119 of file OMKSvm.cxx.
References connectToDistributedSimulation(), createDistributedSimulation(), and getParentSiteId().
Referenced by OMK::PvmController::PvmController().
00120 { 00121 if ( getParentSiteId () == 0 ) // wasn't created by a spawn 00122 { 00123 createDistributedSimulation ( initialSimulationDate ) ; 00124 } 00125 else 00126 { 00127 connectToDistributedSimulation () ; 00128 } 00129 }
void Svm::syncDistributedSites | ( | ) | [virtual] |
sync all the distributed sites.
blocks the calling process until all sites except the central site have called this member function
Definition at line 743 of file OMKSvm.cxx.
References _numberOfSlaves, _slaveGroupName, and groupBarrier().
Referenced by connectToDistributedSimulation(), and testIfNewProcessAdded().
00744 { 00745 groupBarrier( _slaveGroupName, _numberOfSlaves ) ; 00746 }
void Svm::connectToDistributedSimulation | ( | ) | [virtual] |
connect the distributed virtual machine to its peers
Definition at line 552 of file OMKSvm.cxx.
References _idToProcessTable, _linkToCentralSite, _numberOfSlaves, _processTable, _siteName, _slaveGroupName, createSvmLink(), OMK::Name::getNameServer(), getParentSiteId(), joinSvmGroup(), npp, OMK::PvmMessage::ProcessTable, OMK::Name::setNameServer(), OMK::Process::setSvmLink(), OMK::PvmMessage::SiteName, syncDistributedSites(), and OMK::SvmLink::waitForMessage().
Referenced by init().
00553 { 00554 //TDTD 00555 //#ifdef _DEBUGPVMEXEC 00556 cerr<<"Svm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl; 00557 //#endif 00558 00559 PvmIncomingMessage * np = NULL ; 00560 00561 // connect to the global controller 00562 _linkToCentralSite = createSvmLink ( getParentSiteId () ) ; 00563 00564 // join the group of slave processes 00565 joinSvmGroup (_slaveGroupName) ; 00566 00567 // create a distributed version of the nameServer 00568 PvmNameServer * pvmNameServer = new PvmNameServer ( getParentSiteId() , *Name::getNameServer() ) ; 00569 00570 Name::setNameServer ( pvmNameServer ) ; 00571 00572 // find out the name of this site 00573 np = & _linkToCentralSite->waitForMessage (PvmMessage::SiteName) ; 00574 *np >> _siteName ; 00575 npp = np ; 00576 00577 // cout<<"my site name is _siteName: "<<_siteName<<endl; 00578 // NameToPointerMap<Process>::iterator j ; 00579 // for (j = _processTable->begin () ; j != _processTable->end () ; j ++) 00580 // { 00581 // cout<<"connect: _processTable contient: "<<(*j).second->getProcessName()<<endl; 00582 // } 00583 00584 00585 00586 // find out the adresses of the other sites 00587 // - extract the timestamp of the communication 00588 _linkToCentralSite->waitForMessage (PvmMessage::ProcessTable) ; 00589 00590 // - extract, for each process, it's adress in the virtual machine and create a link 00591 NameToPointerMap<Process>::iterator i ; 00592 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00593 { 00594 ++_numberOfSlaves ; //we add ourselves, but as _numberOfSlaves is initialised to 0, it's ok 00595 00596 // Remarque : creer un canal sur soi-meme n'est pas genant, 00597 // on met juste a jour les tids dans la table des pss 00598 00599 Process * p = (*i).second ; 00600 00601 // creation d'un canal sans destinataire 00602 p->setSvmLink (createSvmLink ()) ; 00603 00604 // maj du destinataire 00605 *np >> *p ; 00606 00607 _idToProcessTable.insert ( make_pair ( 
p->getSvmLink()->getTID(),p ) ) ; 00608 00609 cout<<"le TID du process ajoute est : "<< p->getSvmLink()->getTID()<<" et son nom est :"<<p->getProcessName()<<endl; 00610 } 00611 00612 00613 //wait for all other sites to be connected to the distributed simulation. 00614 //avoids sending messages to sites still connected or worse, not yet connected 00615 syncDistributedSites () ; 00616 cout<<"EEEE"<<endl; 00617 }
void Svm::createDistributedSimulation | ( | const Date & | initialSimulationDate | ) | [virtual] |
create a distributed simulation
initialSimulationDate | the initial simulation date needed to start timestamping exchanged messages |
add the specified machines to the siames virtual machine
Definition at line 359 of file OMKSvm.cxx.
References _idToProcessTable, _masterSiteName, _processTable, _siteName, addNewWorkstation(), createSvmLink(), OMK::SvmLink::getOutgoingBuffer(), OMK::Process::getSvmLink(), OMK::SvmLink::getTID(), OMK::PvmOutgoingMessage::insertTimeStamp(), processSiteIds, OMK::PvmMessage::ProcessTable, OMK::PvmMulticastMessage::send(), OMK::PvmOutgoingMessage::send(), serveNameRequestsUntilEnd(), OMK::Process::setSvmLink(), OMK::PvmMessage::SiteName, and spawnProcess().
Referenced by init().
00360 { 00361 #ifdef _DEBUGPVMEXEC 00362 cerr << "Svm::createDistributedSimulation : creating the distributed simulation" << endl ; 00363 #endif 00364 _siteName = _masterSiteName ; 00365 00367 00368 NameToPointerMap<Process>::iterator i ; 00369 00370 for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 00371 { 00372 addNewWorkstation ( (*i).second->getHostMachineName () ) ; 00373 } 00374 00375 00376 // creation des processus locaux 00377 #ifdef _DEBUGPVMEXEC 00378 cerr << "Creation des processus locaux..................." << endl ; 00379 #endif 00380 Process * processInfo ; 00381 int adrLoc ; 00382 00383 // std::vector<int> processSiteIds ; 00384 00385 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00386 { 00387 processInfo = (*i).second ; 00388 00389 // spawn a new process on the specified workstation 00390 adrLoc = spawnProcess (processInfo) ; 00391 00392 cout<<"my TID is : "<<adrLoc<<endl ; 00393 00394 // create a SvmLink, add it to processInfo 00395 processInfo->setSvmLink (createSvmLink (adrLoc)) ; 00396 00397 _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ; 00398 00399 processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ; 00400 00401 // send the name of the process (in the Svm) to the created process 00402 processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ; 00403 00404 processInfo->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ; 00405 00406 //prepare processSiteIds for the next step 00407 processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ; 00408 00409 #ifdef _DEBUGPVMMESS 00410 cerr<<typeid((*i).first).name()<<endl; 00411 cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl; 00412 #endif 00413 } 00414 00415 00416 // Encodage des attributs Svm de chaque processus 00417 PvmMulticastMessage nameToIdMessage ( processSiteIds ) ; 00418 nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ; 00419 00420 for (i 
= _processTable->begin () ; i != _processTable->end () ; i ++) 00421 { 00422 nameToIdMessage << *((*i).second) ; 00423 } 00424 00425 // Transfert des adresses vers chaque controleur local 00426 nameToIdMessage.send ( PvmMessage::ProcessTable ) ; 00427 00428 serveNameRequestsUntilEnd () ; 00429 00430 }
void Svm::broadcast | ( | PvmMessage::MessageTag | , | |
PvmMulticastMessage * | message = NULL | |||
) | [virtual] |
broadcast a message
reusing mess
Definition at line 663 of file OMKSvm.cxx.
References _processTable, _siteName, and OMK::PvmOutgoingMessage::insertTimeStamp().
Referenced by synchronizeOn().
00664 { 00665 //the first implementation, commented, dies in pvm with an unexpected error 00666 //I haven't had time to trace the problem, so the second implementation is used 00667 00668 /* first implementation 00669 initBeforeMessagePacking () ; 00670 00671 if ( mess != NULL ) 00672 { 00673 mess->packMessage() ; 00674 } 00675 else 00676 { 00677 mess = new PvmOutgoingMessage () ; 00678 mess->insertTimeStamp(0) ; 00679 *mess << "dummy"; 00680 mess->packMessage() ; 00681 } 00682 00683 broadcastToGroup ( slaveGroup, tag ) ; 00684 */ 00685 00686 #ifdef _DEBUGPVMMESS 00687 cerr<<"Svm::broadcast ("<<tag<<","<<mess<<") ; "<<endl; 00688 #endif 00689 00690 NameToPointerMap<Process>::iterator processIter ; 00691 00692 if (mess == NULL ) 00693 { 00694 std::vector<int> distantRecepients ; 00695 for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 00696 { 00697 if (_siteName != (*processIter).second->getProcessName ()) 00698 { 00699 // On connait alors le canal a observer 00700 distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ; 00701 } 00702 } 00703 00705 mess = new PvmMulticastMessage ( distantRecepients ) ; 00706 mess->insertTimeStamp(0) ; 00707 } 00708 00709 mess -> send ( tag ) ; 00710 00711 }
void Svm::synchronizeOn | ( | PvmController & | , | |
PvmMessage::MessageTag | tag | |||
) | [virtual] |
enter a synchronisation barrier between all distributed sites except the central site. This is a blocking call, but the virtual machine still answers requests.
Definition at line 715 of file OMKSvm.cxx.
References _numberOfSlaves, broadcast(), and waitForAnswerToBlockingRequest().
Referenced by OMK::PvmController::init().
00716 { 00717 broadcast ( tag ) ; 00718 #ifdef _DEBUGPVMSYNCHRO 00719 cerr<< "Svm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl; 00720 usleep(1000000) ; 00721 #endif 00722 00723 int numberOfReadyProcesses = 1 ; //this process has entered the barrier 00724 00725 while ( numberOfReadyProcesses != _numberOfSlaves ) 00726 { 00727 00728 // here, we ignore the results, because we suppose one process will only send tag once during synchronization 00729 waitForAnswerToBlockingRequest ( parsingController, tag ) ; 00730 00731 numberOfReadyProcesses++ ; 00732 00733 #ifdef _DEBUGPVMMESS 00734 cerr<< "Svm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl; 00735 usleep(1000000) ; 00736 #endif 00737 00738 } 00739 }
void Svm::disconnectFromDistributedSimulation | ( | const Date & | deconnexionDate | ) | [virtual] |
Disconnect the site from the distributed simulation.
deconnexionDate | date at which the disconnection is happening (to timestamp the disconnection message). This will trigger disconnection on all sites. |
Definition at line 621 of file OMKSvm.cxx.
References _processTable, OMK::PvmMessage::EndOfSimulation, getParentSiteId(), getSiteName(), OMK::PvmOutgoingMessage::insertTimeStamp(), processSiteIds, OMK::PvmMulticastMessage::send(), and OMK::PvmUnicastMessage::send().
Referenced by serveNameRequestsUntilEnd(), and OMK::PvmController::~PvmController().
00622 { 00623 #ifdef _DEBUGPVMEXEC 00624 cout <<"Svm::disconnectFromDistributedSimulation"<< endl ; 00625 #endif 00626 int recepient = getParentSiteId () ; 00627 00628 if ( recepient != 0 ) 00629 // a local PvmController has finished. Notify the central controller 00630 { 00631 PvmUnicastMessage endMessage (recepient) ; 00632 00633 endMessage.insertTimeStamp ( endOfSimulationDate ) ; 00634 00635 endMessage << getSiteName () ; 00636 00637 endMessage.send ( PvmMessage::EndOfSimulation ) ; 00638 } 00639 else 00640 //the central controller is deconnecting. Notify all local controllers 00641 { 00642 std::vector<int> processSiteIds ; 00643 for (NameToPointerMap<Process>::iterator i = _processTable->begin () ; 00644 i != _processTable-> end () ; 00645 i ++) 00646 { 00647 processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ; 00648 } 00649 PvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ; 00650 00651 endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ; 00652 00653 endOfSimulationMessage << getSiteName () ; 00654 00655 endOfSimulationMessage.send ( PvmMessage::EndOfSimulation ) ; 00656 } 00657 }
const Name & Svm::getSiteName | ( | ) | const [virtual] |
get the site name in the simulation
Definition at line 2247 of file OMKSvm.cxx.
References _siteName.
Referenced by disconnectFromDistributedSimulation(), OMK::PvmController::getProcessName(), and OMK::PvmController::PvmController().
02248 { 02249 return _siteName; 02250 }
get access to the link to a given process
CHADI WARN
END CHADI
Definition at line 750 of file OMKSvm.cxx.
References _processTable.
Referenced by OMK::PvmController::sendInitialValuesToMirror(), and waitForMessageFrom().
00751 { 00752 // check that we know this process 00753 if( _processTable->find( processName ) != _processTable->end() ) 00754 { 00755 Process *pss = _processTable->getObjectOfIndex( processName ) ; 00756 return pss->getSvmLink() ; 00757 } 00758 else 00759 { 00761 // #ifdef _USESSTREAM 00762 // ostringstream o ; 00763 // o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus " 00764 // << processName << " afin de recuperer son canal !" ; 00765 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00766 // #else 00767 // ostrstream o ; 00768 // o << "Svm::getLinkToProcessNamed : Impossible de trouver le processus " 00769 // << processName << " afin de recuperer son canal !" ; 00770 // o.put ('\0') ; 00771 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00772 // delete o.str() ; 00773 // #endif 00775 return 0 ; 00776 } 00777 }
get access to a given process descriptor
CHADI WARN
CHADI
Definition at line 781 of file OMKSvm.cxx.
References _processTable.
00782 { 00783 // check that we know this process 00784 if( _processTable->find( processName ) != _processTable->end() ) 00785 { 00786 Process *pss = _processTable->getObjectOfIndex( processName ) ; 00787 return pss ; 00788 } 00789 else 00790 { 00792 // #ifdef _USESSTREAM 00793 // ostringstream o ; 00794 // o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ; 00795 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00796 // #else 00797 // ostrstream o ; 00798 // o << "Svm::getProcessDescriptorNamed: process " << processName << " unknown !" ; 00799 // o.put('\0') ; 00800 // Controller::warning (o.str(), Controller::SomeWarnings ) ; 00801 // delete o.str() ; 00802 // #endif 00804 return 0 ; 00805 } 00806 }
Definition at line 815 of file OMKSvm.cxx.
References _disconnectedTable.
00816 { 00817 // check that we know this process as a deconnected process 00818 if( _disconnectedTable->find( processName ) != _disconnectedTable->end() ) 00819 { 00820 Process *pss = _disconnectedTable->getObjectOfIndex( processName ) ; 00821 return pss ; 00822 } 00823 else 00824 { 00825 return 0 ; 00826 } 00827 }
void Svm::testIfNewProcessAdded | ( | ) | [virtual] |
Definition at line 212 of file OMKSvm.cxx.
References _idToProcessTable, _linkToCentralSite, _numberOfSlaves, _processTable, createSvmLink(), dateVar, OMK::Process::getProcessName(), OMK::PvmMessage::LocalInitSuccessfull, npp, OMK::Process::setSvmLink(), syncDistributedSites(), OMK::SvmLink::testIfRequestUpdateProcessTable(), and OMK::PvmMessage::updateProcessTable.
00213 { 00214 PvmIncomingMessage * message = NULL ; 00215 message = _linkToCentralSite->testIfRequestUpdateProcessTable (PvmMessage::updateProcessTable) ; 00216 if ( message != NULL ) 00217 { 00218 Name zico ; 00219 // *message>>zico ; 00220 // cout<<"le nom de process recu est : "<<zico<<endl; 00221 Process* p = new Process ( "visualProcessC", "maxipes.irisa.fr" ); 00222 00223 // Process *p ; 00224 // *message>> *p ; 00225 _processTable->addObjectWithIndex(p->getProcessName(),p ) ; 00226 ++_numberOfSlaves ; 00227 00228 // Process * p = (*i).second ; 00229 // creation d'un canal sans destinataire 00230 p->setSvmLink (createSvmLink ()) ; 00231 // maj du destinataire ????????????????????????????????? 00232 *npp >> *p ; 00233 00234 _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ; 00235 00236 PvmUnicastMessage initSucMessage ( p->getSvmLink()->getTID() ) ; 00237 initSucMessage.insertTimeStamp ( dateVar ) ; 00238 initSucMessage.send ( PvmMessage::LocalInitSuccessfull ) ; 00239 00240 NameToPointerMap<Process>::iterator i ; 00241 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00242 { 00243 cout<<"_processTable contient: "<<(*i).second->getProcessName()<<endl; 00244 } 00245 00246 syncDistributedSites(); 00247 } 00248 }
void Svm::processReceivedMessages | ( | PvmController & | parsingController, | |
const PvmMessage::MessageTag | tag | |||
) | [virtual] |
Attempt to receive and process messages from all sites, in a non blocking fashion.
parsingController | the controller which will parse the received messages | |
tag | the tag of the messages to receive |
Definition at line 836 of file OMKSvm.cxx.
References _processTable, _siteName, OMK::PvmIncomingMessage::getMessageDate(), OMK::PvmMessage::getSize(), OMK::PvmIncomingMessage::hasMessage(), OMK::PvmController::parseSynchronisationMessage(), and OMK::SvmLink::testForMessage().
Referenced by OMK::PvmController::run().
00837 { 00838 #ifdef _DEBUGPVMMESS 00839 cerr << "Svm::processReceivedMessages"<<endl; 00840 #endif 00841 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 00842 SvmLink * canal ; // Canal vers le precedent processus 00843 PvmIncomingMessage * messageRecu = NULL ; 00844 for (pOMKs = _processTable->begin () ; 00845 pOMKs != _processTable->end () ; 00846 pOMKs ++) 00847 { 00848 if (_siteName != (*pOMKs).second->getProcessName ()) 00849 { 00850 // On connait alors le canal a observer 00851 canal = (*pOMKs).second->getSvmLink () ; 00852 #ifdef _DEBUGPVMMESS 00853 cout << "Svm::processReceivedMessages from " 00854 << (*pOMKs).second->getProcessName () << " tag : " 00855 << tag << endl ; 00856 //canal->affiche () ; 00857 #endif 00858 // On fait une reception non bloquante 00859 messageRecu = canal->testForMessage (tag) ; 00860 while ( messageRecu->hasMessage () ) 00861 { 00862 #ifdef _DEBUGPVMMESS 00863 cerr << "Svm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl; 00864 #endif 00865 #ifdef _DEBUGPVMMESS 00866 cerr << "Svm::processReceivedMessages: extracted emission date: " <<messageRecu->getMessageDate()<< endl; 00867 #endif 00868 //cerr << "Svm::processReceivedMessages : date " << dateEmission << endl ; 00869 // On envoie le message et l'objet au controleur local 00870 parsingController.parseSynchronisationMessage (messageRecu) ; 00871 messageRecu = canal->testForMessage (tag) ; 00872 } 00873 } 00874 } 00875 }
void Svm::waitAndProcessMessages | ( | PvmController & | parsingController, | |
const PvmMessage::MessageTag | tag | |||
) | [virtual] |
receive and process messages from all sites, and wait for the messages to arrive if necessary
parsingController | the controller which will parse the received messages | |
tag | the tag of the messages to receive |
Definition at line 879 of file OMKSvm.cxx.
References _processTable, _siteName, OMK::PvmController::parseSynchronisationMessage(), OMK::SvmLink::printDebuggingInformation(), and OMK::SvmLink::waitForMessage().
00881 { 00882 #ifdef _DEBUGPVMMESS 00883 cerr<<"Svm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl; 00884 NameToPointerMap<Process>::iterator pOMK ; 00885 for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) { 00886 cerr<<(*pOMK).second->getProcessName (); 00887 } 00888 cerr<<endl; 00889 #endif 00890 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 00891 SvmLink * canal ; // Canal vers le precedent processus 00892 PvmIncomingMessage * messageRecu ; 00893 #ifdef _DEBUGPVMMESS 00894 cerr<<"Svm::waitAndProcessMessages écoute de tous les processus"<<endl; 00895 #endif 00896 for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) { 00897 #ifdef _DEBUGPVMMESS 00898 cerr<<"Svm::waitAndProcessMessages écoute d'un de plus : "<<(*pOMKs).second->getProcessName ()<<endl; 00899 #endif 00900 if (_siteName != (*pOMKs).second->getProcessName ()) { 00901 // On connait alors le canal a observer 00902 canal = (*pOMKs).second->getSvmLink () ; 00903 #ifdef _DEBUGPVMMESS 00904 cout << "OMKmSvm::waitAndProcessMessages sur " 00905 << (*pOMKs).second->getProcessName () << " tag : " 00906 << tag << endl ; 00907 canal->printDebuggingInformation () ; 00908 #endif 00909 // On recoit le message en bloquant 00910 messageRecu = & canal->waitForMessage (tag) ; 00911 #ifdef _DEBUGPVMMESS 00912 cerr << "Svm::waitAndProcessMessages : message received"<< endl ; 00913 #endif 00914 // On envoie le message et l'objet au controleur local 00915 parsingController.parseSynchronisationMessage (messageRecu) ; 00916 } 00917 } 00918 }
void Svm::waitForMessage | ( | PvmController & | parsingController, | |
const PvmMessage::MessageTag | tag | |||
) | [virtual] |
wait until a message has arrived from one site
parsingController | the controller which will parse the received messages | |
tag | the tag of the messages to receive |
Definition at line 922 of file OMKSvm.cxx.
References _linkToCentralSite, _processTable, _siteName, OMK::PvmIncomingMessage::hasMessage(), OMK::PvmController::parseSynchronisationMessage(), OMK::SvmLink::printDebuggingInformation(), and OMK::SvmLink::testForMessage().
00923 { 00924 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 00925 SvmLink * canal ; // Canal vers le precedent processus 00926 PvmIncomingMessage * messageRecu = NULL ; 00927 bool rienRecu=true; 00928 while (rienRecu) {//on ne teste pas la nullite du message, 00929 //car en cas de message ne contenant que la date, on le vide ! 00930 for (pOMKs = _processTable->begin () ; pOMKs!= _processTable->end () ; pOMKs ++) { 00931 if (_siteName != (*pOMKs).second->getProcessName ()) { 00932 // On connait alors le canal a observer 00933 canal = (*pOMKs).second->getSvmLink () ; 00934 #ifdef _DEBUGPVMMESS 00935 cout << "OMKmSvm::waitForMessage sur " 00936 << (*pOMKs).second->getProcessName () << " tag : " 00937 << tag << endl ; 00938 canal->printDebuggingInformation () ; 00939 #endif 00940 // On recoit le message en bloquant 00941 messageRecu = canal->testForMessage (tag) ; 00942 // On envoie le message et l'objet au controleur local 00943 if (messageRecu->hasMessage() ) 00944 { 00945 #ifdef _DEBUGPVMMESS 00946 cerr << "Svm::waitForMessage : message received" << endl ; 00947 #endif 00948 parsingController.parseSynchronisationMessage (messageRecu) ; 00949 rienRecu=false; 00950 } 00951 } 00952 } 00953 messageRecu = _linkToCentralSite->testForMessage (tag) ; 00954 //cerr << "Svm::waitForMessage : message recu : " << *messageRecu << endl ; 00955 if (messageRecu->hasMessage() ) 00956 { 00957 #ifdef _DEBUGPVMMESS 00958 cerr << "Svm::waitForMessage : message received" << endl ; 00959 #endif 00960 parsingController.parseSynchronisationMessage (messageRecu) ; 00961 rienRecu=false; 00962 } 00963 } 00964 }
void Svm::waitForMessageFrom | ( | PvmController & | parsingController, | |
const Name & | processName, | |||
const PvmMessage::MessageTag | tag | |||
) | [virtual] |
receive a message from a particular process
parsingController | the controller which will parse the received messages | |
tag | the tag of the messages to receive | |
processName | the name of the process to receive a message from |
Definition at line 2207 of file OMKSvm.cxx.
References getLinkToProcessNamed(), OMK::PvmController::parseSynchronisationMessage(), OMK::SvmLink::printDebuggingInformation(), and OMK::SvmLink::waitForMessage().
02210 { 02211 SvmLink * canal ; // Canal vers le precedent processus 02212 PvmIncomingMessage * messageRecu = NULL ; 02213 02214 // On recupere le canal a observer 02215 canal = getLinkToProcessNamed (processName) ; 02216 #ifdef _DEBUGPVMMESS 02217 cout << "OMKmSvm::waitForMessageFrom sur " 02218 << processName << " tag : " 02219 << tag << endl ; 02220 canal->printDebuggingInformation() ; 02221 #endif 02222 // On recoit le message en bloquant 02223 messageRecu = & canal->waitForMessage (tag) ; 02224 // On envoie le message et l'objet au controleur local 02225 parsingController.parseSynchronisationMessage (messageRecu) ; 02226 // On libere le message ??? 02227 //delete messageRecu; 02228 }
void Svm::synchroniseReceiveAndProcessMessages | ( | PvmController & | parsingController, | |
const PvmMessage::MessageTag | tag | |||
) | [virtual] |
receive and process messages from all sites (except the central site), and wait until at least one recent message (as defined by the latency) has arrived from each of these sites
parsingController | the controller which will parse the received messages | |
tag | the tag of the messages to receive |
CHADI WARN
Definition at line 2021 of file OMKSvm.cxx.
References _processTable, _siteName, _synchronisationLatency, _yieldNeeded, aRecevoir, OMK::PvmMessage::EndOfSimulation, OMK::PvmMessage::EnterBarrier, OMK::PvmMessage::ExitBarrier, OMK::Process::getProcessName(), OMK::Controller::getSimulatedDate(), OMK::Process::getSvmLink(), OMK::PvmIncomingMessage::hasMessage(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmMessage::LocalInitSuccessfull, OMK::PvmMessage::MirrorNeedsInitialValues, OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMK::PvmMessage::NameServiceVerifyResult, OMK::PvmController::parseSynchronisationMessage(), OMK::PvmMessage::ProcessTable, OMK::PvmController::sendInitialValuesToMirror(), OMK::Process::setDateOfLastMessage(), OMK::PvmMessage::SiteName, OMK::PvmMessage::SynchronisationMessage, and OMK::SvmLink::testForAnyMessage().
Referenced by OMK::PvmController::synchronizedReceiveAndProcessMessages().
02022 { 02023 //cout<<"GGGGG"<<endl; 02024 NameToPointerMap<Process> aRecevoir ; 02025 02026 //First make a list of processes that we need to receive values from 02027 for ( NameToPointerMap<Process>::iterator pOMKsE = _processTable->begin () ; 02028 pOMKsE != _processTable->end () ; 02029 pOMKsE ++) 02030 { 02031 if ( (pOMKsE->first != _siteName) && 02032 ( pOMKsE->second->getDateOfLastMessage () + 02033 pOMKsE->second->getPeriod () + 02034 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 02035 { 02036 aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 02037 #ifdef _DEBUGPVMMESS 02038 cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 02039 << pOMKsE->first << endl ; 02040 #endif 02041 } 02042 } 02043 02044 02045 PvmIncomingMessage receiveBuffer ; 02046 PvmMessage::MessageTag receivedRequest ; 02047 Process * receivingProcess ; 02048 02049 while (! aRecevoir.empty () ) 02050 { 02051 for ( NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 02052 processIterator != _processTable->end () ; 02053 ++processIterator 02054 ) 02055 { 02056 //this new implementation doesn't do blocking receives, as some processes could send urgent requests 02057 receivingProcess = processIterator->second ; 02058 02059 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 02060 02061 if ( receiveBuffer.hasMessage() ) 02062 { 02063 // TDTD ajout de trace pour debug 02064 //std::cerr << "buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate () 02065 // << " de " << receivingProcess->getProcessName() << std::endl ; 02066 switch (receivedRequest) 02067 { 02068 case PvmMessage::SynchronisationMessage : 02069 { 02070 //have the controller process the message 02071 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02072 02073 receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 02074 02075 //an additionnal test 
could be performed here to make sure correct date of last message has happened 02076 aRecevoir.erase ( receivingProcess->getProcessName() ) ; 02077 } 02078 break ; 02079 case PvmMessage::MirrorNeedsInitialValues : 02080 { 02081 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 02082 } 02083 break ; 02084 case PvmMessage::InitialValuesForMirror : 02085 { 02086 //have the controller process the message 02087 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02088 02089 } 02090 break ; 02091 case PvmMessage::LocalInitSuccessfull : 02092 case PvmMessage::ProcessTable : 02093 case PvmMessage::EndOfSimulation : 02094 case PvmMessage::EnterBarrier : 02095 case PvmMessage::SiteName : 02096 case PvmMessage::ExitBarrier : 02097 case PvmMessage::NameServiceGetId : 02098 case PvmMessage::NameServiceGetString : 02099 case PvmMessage::NameServiceReturnId : 02100 case PvmMessage::NameServiceReturnString : 02101 case PvmMessage::NameServiceVerifyLocalNameServer : 02102 case PvmMessage::NameServiceVerifyResult : 02103 default: 02104 { 02106 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 02107 /* END CHADI */ 02108 } 02109 } 02110 } 02111 } 02112 // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process 02113 if (_yieldNeeded) { 02114 #ifdef _MSC_VER 02115 Sleep( 0 ) ; 02116 #else 02117 usleep (0) ; 02118 #endif 02119 } 02120 } 02121 02122 02123 #ifdef _DEBUGPVMMESS 02124 cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 02125 #endif 02126 }
void Svm::relaxedSynchroniseReceiveAndProcessMessages | ( | PvmController & | parsingController, | |
const PvmMessage::MessageTag | tag | |||
) | [virtual] |
Definition at line 1103 of file OMKSvm.cxx.
References _activeProcessTable, _disconnectedProcessusMap, _disconnectedTable, _siteName, _synchronisationLatency, _synchronisationTimeOut, _yieldNeeded, OMK::NameToPointerMap< ObjectType >::addObjectWithIndex(), aRecevoir, OMK::PvmMessage::EndOfSimulation, OMK::PvmMessage::EnterBarrier, OMK::PvmMessage::ExitBarrier, OMK::Process::getProcessName(), OMK::Controller::getSimulatedDate(), OMK::Process::getSvmLink(), OMK::PvmIncomingMessage::hasMessage(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmMessage::LocalInitSuccessfull, OMK::PvmMessage::MirrorNeedsInitialValues, OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMK::PvmMessage::NameServiceVerifyResult, OMK::PvmController::parseSynchronisationMessage(), OMK::PvmMessage::ProcessTable, removeProcessProlog(), OMK::PvmController::sendInitialValuesToMirror(), OMK::Process::setDateOfLastMessage(), OMK::PvmMessage::SiteName, OMK::PvmMessage::SynchronisationMessage, and OMK::SvmLink::testForAnyMessage().
Referenced by OMK::PvmController::relaxedReceiveAndProcessMessages().
01103 { 01104 01105 PvmIncomingMessage receiveBuffer ; 01106 PvmMessage::MessageTag receivedRequest ; 01107 Process * receivingProcess ; 01108 static Name visualProcessA ("visualProcessA") ; 01109 static Name visualProcessB ("visualProcessB") ; 01110 static Name visualProcessC ("visualProcessC") ; 01111 static Name visualProcessD ("visualProcessD") ; 01112 01113 //std::cerr <<"Svm:: le controleur est sur le processus : " << _siteName << std::endl ; 01114 // if (_siteName == visualProcessB) { // simulation d'un pb de latence réseau 01115 // int sommeil = random () % 15000 ; 01116 // usleep (sommeil) ; 01117 // //std::cerr <<"ralentissement de : " << sommeil << std::endl ; 01118 // } else if (_siteName == visualProcessC) { // simulation d'un pb de latence réseau 01119 // int sommeil = random () % 30000 ; 01120 // usleep (sommeil) ; 01121 // //std::cerr <<"ralentissement de : " << sommeil << std::endl ; 01122 // } else if (_siteName == visualProcessD) { // simulation d'un pb de latence réseau 01123 // int sommeil = random () % 45000 ; 01124 // usleep (sommeil) ; 01125 // //std::cerr <<"ralentissement de : " << sommeil << std::endl ; 01126 // } 01127 01128 01129 // if (! 
_disconnectedTable->empty ()) { 01130 01131 // first, we try only once to receive something from the disconnected processes 01132 NameToPointerMap<Process> copyOfDisconnected ; 01133 for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 01134 prIt != _disconnectedTable->end () ; 01135 ++ prIt) { 01136 copyOfDisconnected.addObjectWithIndex (prIt->first, prIt->second) ; 01137 } 01138 for (NameToPointerMap<Process>::iterator prIt = copyOfDisconnected.begin () ; 01139 prIt != copyOfDisconnected.end () ; 01140 ++ prIt) { 01141 if (prIt->first != _siteName) { 01142 receivingProcess = prIt->second ; 01143 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ; 01144 if (receiveBuffer.hasMessage ()) { 01145 // this process has become active again, we put it in the list of processes that we need to receive values from 01146 // TDTD ajout de trace pour debug 01147 //std::cerr << " 1 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ; 01148 // TDTD essai de mise en commentaire pour voir... c'est normalement fait plus loin, avec un test... 01149 //_activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ; 01150 // fin TDTD 01151 // TDTD : essai pour voir... 
01152 _activeProcessTable->addObjectWithIndex (prIt->first, prIt->second) ; 01153 _disconnectedTable->erase (prIt->second->getProcessName ()) ; 01154 //TDTDTD enlever le processus de la map _disconnectedProcessusMap 01155 _disconnectedProcessusMap.erase (prIt->first) ; 01156 01157 // fin TDTD 01158 switch (receivedRequest) { 01159 case PvmMessage::SynchronisationMessage : { 01160 // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl; 01161 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01162 receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ; 01163 } 01164 break ; 01165 case PvmMessage::MirrorNeedsInitialValues : { 01166 parsingController.sendInitialValuesToMirror (receiveBuffer) ; 01167 } 01168 break ; 01169 case PvmMessage::InitialValuesForMirror : { 01170 //have the controller process the message 01171 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01172 } 01173 break ; 01174 case PvmMessage::LocalInitSuccessfull : 01175 // case PvmMessage::pingMessage : 01176 // { 01177 // cout<<"i got the ping message"<<endl; 01178 // _activeProcessTable->addObjectWithIndex(prIt->first, processIterator->second) ; 01179 // _disconnectedTable->erase( prIt->second->getProcessName() ); 01180 // } 01181 break; 01182 case PvmMessage::ProcessTable : 01183 case PvmMessage::EndOfSimulation : 01184 case PvmMessage::EnterBarrier : 01185 case PvmMessage::SiteName : 01186 case PvmMessage::ExitBarrier : 01187 case PvmMessage::NameServiceGetId : 01188 case PvmMessage::NameServiceGetString : 01189 case PvmMessage::NameServiceReturnId : 01190 case PvmMessage::NameServiceReturnString : 01191 case PvmMessage::NameServiceVerifyLocalNameServer : 01192 case PvmMessage::NameServiceVerifyResult : 01193 default: { 01194 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 01195 } 01196 } //end switch 01197 } else { 01198 // TDTD 01199 // if the proces was 
declared active, it is now declared inactive, not to send to him datas that will cost time to read 01200 // so that it will still be late... 01201 if (_activeProcessTable->find (prIt->second->getProcessName ()) != _activeProcessTable->end ()) { 01202 _activeProcessTable->erase (prIt->second->getProcessName ()) ; 01203 } 01204 // fin TDTD 01205 } 01206 } 01207 } //end for 01208 // cout<<"size of _activeProcessTable : "<<_activeProcessTable->size()<<endl; 01209 // } 01210 //second, make a list of processes that we need to receive values from 01211 for (NameToPointerMap<Process>::iterator pOMKsE = _activeProcessTable->begin () ; 01212 pOMKsE != _activeProcessTable->end () ; 01213 pOMKsE ++) { 01214 if ((pOMKsE->first != _siteName) && 01215 (pOMKsE->second->getDateOfLastMessage () + 01216 pOMKsE->second->getPeriod () + 01217 _synchronisationLatency < parsingController.getSimulatedDate ())) { 01218 aRecevoir.addObjectWithIndex (pOMKsE->first, pOMKsE->second) ; 01219 #ifdef _DEBUGPVMMESS 01220 cerr << "Svm::synchroniseReceiveAndProcessMessages: trying reception from: " 01221 << pOMKsE->first << endl ; 01222 #endif 01223 } 01224 } 01225 01226 struct timeval start_timer,end_timer; 01227 gettimeofday(&start_timer, NULL); 01228 double time_in_ms = 0; 01229 01230 // third, try to receive something from the active processes 01231 // valeur initiale proposée par Chadi : 200 ms 01232 while ((time_in_ms < _synchronisationTimeOut) && ! 
aRecevoir.empty ()) { // TDTD : valeur délicate à estimer pour le cas multi processus sur une seule machine 01233 // TDTD : ici je remplace processTable par activeProcesstable 01234 // for (NameToPointerMap<Process>::iterator processIterator = _processTable->begin () ; 01235 // processIterator != _processTable->end () ; 01236 // ++processIterator) { 01237 for (NameToPointerMap<Process>::iterator processIterator = _activeProcessTable->begin () ; 01238 processIterator != _activeProcessTable->end () ; 01239 ++processIterator) { 01240 // fin TDTD 01241 //this new implementation doesn't do blocking receives, as some processes could send urgent requests 01242 receivingProcess = processIterator->second ; 01243 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage (receiveBuffer) ; 01244 if (receiveBuffer.hasMessage ()) { 01245 // TDTD ajout de trace pour debug 01246 //std::cerr << " 2 buffer reçu : " << receivedRequest << " à date " << receiveBuffer.getMessageDate ()<< " de " << receivingProcess->getProcessName() << std::endl ; 01247 //---------------------------chadi ajout 4/6 --------------------- 01248 // aRecevoir.erase ( receivingProcess->getProcessName() ) ; 01249 // start = clock(); 01250 01251 // Date dlm = receivingProcess->getDateOfLastMessage(); 01252 // receivingProcess->setDateOfLastMessage( receiveBuffer.getMessageDate()) ; 01253 // cout<<"date of last message est = "<<dlm<<" et le message ke je viens de recevoir est daté: "<<receivingProcess->getDateOfLastMessage()<<"donc la difference est de l'ordre de :"<<receivingProcess->getDateOfLastMessage()-dlm<<endl; 01254 // TDTD : je mets ce code plus haut 01255 // if (_disconnectedTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) { 01256 // _activeProcessTable->addObjectWithIndex (processIterator->first, processIterator->second) ; 01257 // _disconnectedTable->erase (processIterator->second->getProcessName ()) ; 01258 // //std::cout<<" j'enleve de la liste de 
deconnecte le processus suivant: "<<processIterator->first<<std::endl; 01259 // _simStep = 0 ; 01260 // } 01261 // fin TDTD 01262 //-----------------------fin chadi ajout 4/6 ------------------------- 01263 // if (receiveBuffer.getMessageDate()+90 < parsingController.getSimulatedDate()) 01264 // { } 01265 // else { 01266 switch (receivedRequest) { 01267 case PvmMessage::SynchronisationMessage : { 01268 // cout<<"parsingController.getSimulatedDate() = "<<parsingController.getSimulatedDate()<<endl; 01269 // To filter ancient synchro messages 01270 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01271 receivingProcess->setDateOfLastMessage (receiveBuffer.getMessageDate ()) ; 01272 if (parsingController.getSimulatedDate () < receiveBuffer.getMessageDate () + 30) { 01273 // on a déjà reçu des messages assez récents de ce processus, on n'en demande pas d'autres 01274 aRecevoir.erase (receivingProcess->getProcessName ()) ; 01275 } 01276 } 01277 break ; 01278 case PvmMessage::MirrorNeedsInitialValues : { 01279 parsingController.sendInitialValuesToMirror (receiveBuffer) ; 01280 } break ; 01281 case PvmMessage::InitialValuesForMirror : { 01282 //have the controller process the message 01283 parsingController.parseSynchronisationMessage (&receiveBuffer) ; 01284 } break ; 01285 case PvmMessage::LocalInitSuccessfull : 01286 // case PvmMessage::pingMessage : 01287 // { 01288 // cout<<"i got the ping message"<<endl; 01289 // _activeProcessTable->addObjectWithIndex(processIterator->first, processIterator->second) ; 01290 // _disconnectedTable->erase( processIterator->second->getProcessName() ); 01291 // } 01292 break ; 01293 case PvmMessage::ProcessTable : 01294 case PvmMessage::EndOfSimulation : 01295 case PvmMessage::EnterBarrier : 01296 case PvmMessage::SiteName : 01297 case PvmMessage::ExitBarrier : 01298 case PvmMessage::NameServiceGetId : 01299 case PvmMessage::NameServiceGetString : 01300 case PvmMessage::NameServiceReturnId : 01301 case 
PvmMessage::NameServiceReturnString : 01302 case PvmMessage::NameServiceVerifyLocalNameServer : 01303 case PvmMessage::NameServiceVerifyResult : 01304 default: { 01305 std::cerr << "Svm::waitForAnswerToBlockingRequest(): received unexpected message" << std::endl ; 01306 } 01307 } //end switch 01308 } 01309 } //end for 01310 01311 gettimeofday (&end_timer, NULL) ; 01312 time_in_ms = (end_timer.tv_sec-start_timer.tv_sec) * 1000.0 + 01313 (end_timer.tv_usec-start_timer.tv_usec) /1000.0 ; 01314 01315 // TDTD : ajout d'un arrêt pour pouvoir travailler avec PVM sur une seule machine et plusieurs process 01316 if (_yieldNeeded) { 01317 #ifdef _MSC_VER 01318 Sleep( 0 ) ; 01319 #else 01320 usleep (0) ; 01321 #endif 01322 } 01323 } //end while 01324 01325 // fourth, remove the inactive processes from the _activeProcessTable and place them in the _disconnectedTable 01326 if (! aRecevoir.empty ()) { 01327 for (NameToPointerMap<Process>::iterator processIterator = aRecevoir.begin () ; 01328 processIterator != aRecevoir.end () ; 01329 ++ processIterator) { 01330 _disconnectedTable->addObjectWithIndex (processIterator->first, processIterator->second) ; 01331 // TDTDTD ajouter le nom du processus dans la map _disconnectedProcessusMap avec la dernière date simulée arrivée 01332 _disconnectedProcessusMap.insert (std::pair<Name, Type::IntType> (processIterator->first, 01333 (int)processIterator->second->getDateOfLastMessage ())) ; 01334 // TDTD : je ne comprends pas trop pourquoi mais ici les lignes suivantes doivent être commentéee... 01335 // if (_activeProcessTable->find (processIterator->second->getProcessName ()) != _disconnectedTable->end ()) { 01336 // _activeProcessTable->erase (processIterator->second->getProcessName ()) ; 01337 // } 01338 // fin TDTD : en fait c'était à cause de l'envoi aux seuls non inactifs (et non pas non en retard), voir la méthode send...withTag... 
01339 //std::cout << "je rajoute le processus suivant a la table des deconnectes " << processIterator->second->getProcessName () << std::endl ; 01340 } 01341 aRecevoir.clear () ; 01342 removeProcessProlog () ; 01343 } 01344 // TDTD ajout trace pour debug 01345 //std::cout<<"longueur de la table des deconnectés : "<<_disconnectedTable->size ()<<std::endl; 01346 #ifdef _DEBUGPVMMESS 01347 cerr << "Svm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 01348 #endif 01349 01350 }
Process * Svm::waitForAnswerToBlockingRequest | ( | PvmController & | parsingController, | |
PvmMessage::MessageTag | tag | |||
) | [virtual] |
receive and process messages from all sites (except the central site), and wait until at least one message with tag `tag` has arrived
parsingController | the controller which will parse the received messages | |
tag | the tag of the messages to receive |
CHADI WARN Controller::warning ("Svm::waitForAnswerToBlockingRequest(): received unexpected message", Controller::AllWarnings ) ; END CHADI
Definition at line 2254 of file OMKSvm.cxx.
References _idToProcessTable, OMK::PvmMessage::addNewSite, OMK::PvmMessage::EndOfSimulation, OMK::PvmMessage::EnterBarrier, OMK::PvmMessage::ExitBarrier, OMK::PvmIncomingMessage::getMessageDate(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmMessage::LocalInitSuccessfull, OMK::PvmMessage::MirrorNeedsInitialValues, OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMK::PvmMessage::NameServiceVerifyResult, OMK::PvmController::parseSynchronisationMessage(), OMK::PvmMessage::ProcessTable, OMK::PvmController::sendInitialValuesToMirror(), OMK::PvmMessage::SiteName, OMK::PvmMessage::SynchronisationMessage, OMK::PvmMessage::updateProcessTable, and waitForAnyRequests().
Referenced by synchronizeOn(), and OMK::PvmController::waitForAnswerToBlockingRequest().
02255 { 02256 // this implementation assumes that only one thread at a time will enter this function 02257 // numberOfThread is an unsafe method to test this 02258 static int numberOfThreads = 0 ; 02259 ++numberOfThreads ; 02260 assert ( numberOfThreads == 1) ; 02261 02262 Process * result ; 02263 02264 PvmIncomingMessage receiveBuffer ; 02265 std::pair<PvmMessage::MessageTag, int> receivedRequest ; 02266 02267 bool serving = true ; 02268 02269 while ( serving ) 02270 { 02271 02272 receivedRequest = waitForAnyRequests ( receiveBuffer ) ; 02273 02274 //find the sender process (result of this member function) 02275 map<int,Process *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ; 02276 02277 // cout<<"receivedRequest.second est egale : "<<receivedRequest.second<<endl; 02278 // cout<<"receivedRequest.first est egale : "<<receivedRequest.first<<endl; 02279 // the message should come from a known process 02280 assert ( i != _idToProcessTable.end() ) ; 02281 02282 result = i->second ; 02283 02284 switch (receivedRequest.first) 02285 { 02286 case PvmMessage::SynchronisationMessage : 02287 { 02288 //have the controller process the message 02289 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02290 02291 assert ( result != NULL ) ; 02292 02293 result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 02294 } 02295 break ; 02296 case PvmMessage::MirrorNeedsInitialValues : 02297 { 02298 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 02299 } 02300 break ; 02301 case PvmMessage::InitialValuesForMirror : 02302 { 02303 //have the controller process the message 02304 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 02305 02306 } 02307 break ; 02308 case PvmMessage::LocalInitSuccessfull : 02309 case PvmMessage::ProcessTable : 02310 case PvmMessage::EndOfSimulation : 02311 case PvmMessage::EnterBarrier : 02312 case PvmMessage::SiteName : 02313 case PvmMessage::ExitBarrier : 02314 case 
PvmMessage::NameServiceGetId : 02315 case PvmMessage::NameServiceGetString : 02316 case PvmMessage::NameServiceReturnId : 02317 case PvmMessage::NameServiceReturnString : 02318 case PvmMessage::NameServiceVerifyLocalNameServer : 02319 case PvmMessage::NameServiceVerifyResult : 02320 case PvmMessage::updateProcessTable : 02321 case PvmMessage::addNewSite : 02322 default: 02323 { 02328 } 02329 } 02330 02331 serving = ( receivedRequest.first != tag ) ; 02332 02333 } 02334 02335 --numberOfThreads ; 02336 02337 return result ; 02338 }
void Svm::sendCurrentBuffersWithTag | ( | const PvmMessage::MessageTag | tag | ) | [virtual] |
send the messages contained in the send buffers of all links
tag | the tag to associate to the message |
Definition at line 2133 of file OMKSvm.cxx.
References _activeProcessTable, _deadReckoningInterval, _processTable, _siteName, OMK::SynchronisationMessage::endOfSynchronisationFragment, OMK::PvmOutgoingMessage::flushCurrentBuffer(), OMK::SvmLink::getOutgoingBuffer(), OMK::SvmLink::sendOutgoingBuffer(), and OMK::PvmMessage::SynchronisationMessage.
Referenced by OMK::PvmController::advanceSimulatedDate(), OMK::PvmController::computeNextSimulationStep(), OMK::PvmController::init(), and OMK::PvmController::run().
02134 { 02135 NameToPointerMap<Process>::iterator pOMKs ; // Iterateur pour parcours tableau 02136 SvmLink * canal ; // Canal vers le precedent processus 02137 static int nbSteps = 1 ; 02138 #ifdef _DEBUGPVMMESS 02139 cerr<<"Svm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processus : "<<endl; 02140 NameToPointerMap<Process>::iterator pOMK ; 02141 for (pOMK = _processTable->begin () ; pOMK != _processTable->end () ; pOMK ++) 02142 { 02143 cerr<<(*pOMK).second->getProcessName (); 02144 } 02145 cerr<<endl; 02146 #endif 02147 02148 if (_deadReckoningInterval > 0) { 02149 nbSteps = (nbSteps + 1) % _deadReckoningInterval ; 02150 } 02151 02152 // TDTD on envoie seulement les synchros aux actifs... 02153 for (pOMKs = _processTable->begin () ; pOMKs != _processTable->end () ; pOMKs ++) 02154 //for (pOMKs = _activeProcessTable->begin () ; pOMKs != _activeProcessTable->end () ; pOMKs ++) 02155 // fin TDTD 02156 { 02157 #ifdef _DEBUGPVMMESS 02158 cerr<<"Traitement des emissions vers "<<(*pOMKs).second->getProcessName ()<<endl; 02159 #endif 02160 if (_siteName != (*pOMKs).second->getProcessName ()) 02161 { 02162 02163 // On connait alors le canal a observer 02164 canal = (*pOMKs).second->getSvmLink () ; 02165 02166 if ( tag == PvmMessage::SynchronisationMessage ) 02167 { 02168 (*pOMKs).second->getSvmLink ()->getOutgoingBuffer()<<SynchronisationMessage::endOfSynchronisationFragment ; 02169 02170 } 02171 02172 // On transmet le message du canal 02173 02174 // TDTD ajout pour purger les envois aux inactifs 02175 //std::cerr << "sur site : " << _siteName << " : " ; 02176 if (_activeProcessTable->find ((*pOMKs).second->getProcessName ()) != _activeProcessTable->end ()) { 02177 //std::cerr << "envoi de syncho à actif : " << (*pOMKs).second->getProcessName () ; 02178 canal->sendOutgoingBuffer ( tag ) ; 02179 } else { 02180 if (nbSteps == 0) { // un coup de dead-reckoning ? 
02181 //std::cerr << "envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ; 02182 canal->sendOutgoingBuffer ( tag ) ; 02183 } else { 02184 canal->getOutgoingBuffer().flushCurrentBuffer () ; 02185 //std::cerr << "pas d'envoi de syncho à inactif : " << (*pOMKs).second->getProcessName () ; 02186 } 02187 } 02188 //std::cerr << endl ; 02189 // fin TDTD 02190 #ifdef _DEBUGPVMMESS 02191 cerr << "OMKmSvm::transmission avec tag : " 02192 << tag << endl ; 02193 #endif 02194 } 02195 #ifdef _DEBUGPVMMESS 02196 else 02197 { 02198 cerr << "Rien à faire "<<endl ; 02199 //canal->affiche () ; 02200 } 02201 #endif 02202 } 02203 }
void Svm::timestampCurrentSendBuffers | ( | const Date & | date | ) | [virtual] |
timestamp the messages contained in the send buffers of all links
Definition at line 2234 of file OMKSvm.cxx.
References _processTable, and _siteName.
Referenced by OMK::PvmController::advanceSimulatedDate(), and OMK::PvmController::init().
02235 { 02236 NameToPointerMap<Process>::iterator i ; 02237 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 02238 { 02239 if (_siteName != (*i).second->getProcessName ()) 02240 { 02241 i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ; 02242 } 02243 } 02244 }
const Date & Svm::getSynchronisationLatency | ( | ) | [virtual] |
get the authorized latency for the relaxed receive
Definition at line 112 of file OMKSvm.cxx.
References _synchronisationLatency.
Referenced by OMK::PvmController::getOutputHistorySize(), OMK::PvmController::getPurgeDate(), and OMK::PvmController::run().
00113 { 00114 return _synchronisationLatency; 00115 }
Definition at line 2344 of file OMKSvm.cxx.
References _disconnectedTable, and _troubleFlag.
02346 { 02347 //*************************** 02348 // if ( processIterator->second->getProcessName()== "visualProcessC" ) 02349 // { } 02350 // else { 02351 //**************************** 02352 02353 02354 _disconnectedTable->addObjectWithIndex(nameOfProcess, pointerToTheProcess ); 02355 // _everybodyTable->erase(processIterator->second->getProcessName()); 02356 // } 02357 _troubleFlag = true ; 02358 }
void Svm::removeFromDisconnectedTable | ( | NameToPointerMap< Process >::iterator | ) | [virtual] |
Definition at line 2360 of file OMKSvm.cxx.
References _disconnectedTable, and _troubleFlag.
02361 { 02362 //*************************** 02363 // if ( processIterator->second->getProcessName()== "visualProcessC" ) 02364 // { } 02365 // else { 02366 //**************************** 02367 // _everybodyTable->addObjectWithIndex(processIterator->first,processIterator->second ); 02368 02369 _disconnectedTable->erase( processIterator->second->getProcessName() ); 02370 // } 02371 _troubleFlag = false ; 02372 }
Definition at line 2377 of file OMKSvm.cxx.
References _disconnectedTable.
Referenced by OMK::PvmController::getIsolatedMirrorList(), and OMK::PvmController::getNonIsolatedMirrorList().
02378 { 02379 if (_disconnectedTable->find(zico)!=_disconnectedTable->end()) 02380 { return true;} 02381 else 02382 return false; 02383 }
bool Svm::getTroubleFlag | ( | ) | [virtual] |
Definition at line 2387 of file OMKSvm.cxx.
References _troubleFlag.
02388 { 02389 if (_troubleFlag == true) 02390 { return true; } 02391 return false; 02392 }
bool Svm::disconnectedTableIsEmpty | ( | ) | [virtual] |
Definition at line 2396 of file OMKSvm.cxx.
References _disconnectedTable.
Referenced by OMK::PvmController::getNonIsolatedMirrorList(), and OMK::PvmController::testIfTroubleOccured().
02397 { 02398 if ( _disconnectedTable->empty() ) 02399 { 02400 return true; 02401 } 02402 return false; 02403 }
void Svm::addNewSiteToSimulation | ( | const Date & | ) | [virtual] |
Definition at line 136 of file OMKSvm.cxx.
References _idToProcessTable, _processTable, addNewWorkstation(), createSvmLink(), dateVar, OMK::Process::getHostMachineName(), OMK::SvmLink::getOutgoingBuffer(), OMK::Process::getProcessName(), OMK::Process::getSvmLink(), OMK::SvmLink::getTID(), OMK::PvmOutgoingMessage::insertTimeStamp(), processSiteIds, OMK::PvmMessage::ProcessTable, OMK::PvmMulticastMessage::send(), OMK::PvmUnicastMessage::send(), OMK::PvmOutgoingMessage::send(), OMK::Process::setSvmLink(), OMK::PvmMessage::SiteName, spawnProcess(), and OMK::PvmMessage::updateProcessTable.
Referenced by serveNameRequestsUntilEnd().
00137 { 00138 dateVar = date ; 00139 00140 Process * theNewProcessToAdd = new Process ( "visualProcessC", "maxipes.irisa.fr" ); 00141 00142 00144 00145 // addNewWorkstation ( "barbux.irisa.fr" ) ; 00146 addNewWorkstation ( theNewProcessToAdd->getHostMachineName () ) ; 00147 00148 _processTable->addObjectWithIndex ("visualProcessC", theNewProcessToAdd ) ; 00149 00150 00151 int adrLoc ; 00152 00153 int processSiteId ; 00154 00155 cout<<"New Process will be Spawned .....///"<<endl; 00156 cout<<"theNewProcessToAdd is named : "<<theNewProcessToAdd->getProcessName()<<endl; 00157 00158 // spawn a new process on the specified workstation 00159 adrLoc = spawnProcess (theNewProcessToAdd) ; 00160 00161 cout<<"New Process Spawned .....valeur de spawn est egale a : "<<adrLoc<<endl; 00162 00163 // create a SvmLink, add it to processInfo 00164 theNewProcessToAdd->setSvmLink (createSvmLink (adrLoc)) ; 00165 00166 _idToProcessTable.insert ( make_pair ( adrLoc,theNewProcessToAdd ) ) ; 00167 00168 theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( date ) ; 00169 00170 00171 // send the name of the process (in the Svm) to the created process 00172 theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer()<<theNewProcessToAdd->getProcessName() ; 00173 00174 theNewProcessToAdd->getSvmLink ()->getOutgoingBuffer().send (PvmMessage::SiteName) ; 00175 00176 //prepare processSiteIds for the next step 00177 processSiteId = theNewProcessToAdd->getSvmLink ()->getTID() ; 00178 // processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ; 00179 00180 00181 // Encodage des attributs Svm de chaque processus 00182 PvmUnicastMessage nameToIdMessage ( processSiteId ) ; 00183 nameToIdMessage.insertTimeStamp ( date ) ; 00184 00186 NameToPointerMap<Process>::iterator i ; 00187 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00188 { 00189 nameToIdMessage << *((*i).second) ; 00190 } 00191 00192 // Transfert des adresses vers le controleur local du 
process ajouté 00193 nameToIdMessage.send ( PvmMessage::ProcessTable ) ; 00194 00195 00196 // envoyer le processTable Update a tt le monde 00197 PvmMulticastMessage updateProcessTableMessage ( processSiteIds ) ; 00198 updateProcessTableMessage.insertTimeStamp ( date ) ; 00199 00200 updateProcessTableMessage << *theNewProcessToAdd ; 00201 // updateProcessTableMessage << (theNewProcessToAdd->getProcessName()) ; 00202 cout<<" theNewProcessToAdd->getProcessName() = "<<theNewProcessToAdd->getProcessName()<<endl; 00203 00204 updateProcessTableMessage.send ( PvmMessage::updateProcessTable ) ; 00205 00206 00207 processSiteIds.push_back ( theNewProcessToAdd->getSvmLink ()->getTID() ) ; 00208 cout<<"Finishing method addNewSiteToSimulation"<<endl; 00209 00210 }
void Svm::addNewSiteRequest | ( | const Date & | date | ) | [virtual] |
added by chadi
Definition at line 543 of file OMKSvm.cxx.
References OMK::PvmMessage::addNewSite, getParentSiteId(), OMK::PvmOutgoingMessage::insertTimeStamp(), and OMK::PvmUnicastMessage::send().
Referenced by OMK::PvmController::createNewSite().
00544 { 00545 PvmUnicastMessage mess ( getParentSiteId () ) ; 00546 mess.insertTimeStamp ( date ) ; 00547 mess.send ( PvmMessage::addNewSite ) ; 00548 }
bool Svm::reconnexionEstablished | ( | ) | [virtual] |
Definition at line 2407 of file OMKSvm.cxx.
References _disconnectedTable, and testIfSiteRecovered().
02408 { 02409 NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 02410 02411 // if ( testIfSiteRecovered( (*prIt).second->getHostMachineName() ) ) 02412 if ( testIfSiteRecovered( (*prIt).second ) ) 02413 { 02414 // cout<<" Svm::reconnexionEstablished OK"<<endl; 02415 return true ; 02416 } 02417 02418 else 02419 { 02420 // cout<<" Svm::reconnexionEstablished NOTOK"<<endl; 02421 return false ; 02422 } 02423 02424 }
void Svm::sendPingMessage | ( | ) | [virtual] |
Definition at line 2428 of file OMKSvm.cxx.
References _disconnectedTable, and OMK::PvmMessage::pingMessage.
02429 { 02430 for ( NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 02431 prIt != _disconnectedTable->end () ; 02432 ++prIt 02433 ) 02434 { 02435 ((*prIt).second)->getSvmLink()->getOutgoingBuffer()<<"ping" ; 02436 ((*prIt).second)->getSvmLink()->getOutgoingBuffer().send(PvmMessage::pingMessage) ; 02437 cout<<"message ping est fait "<<endl; 02438 } 02439 }
void Svm::removeProcessFromActiveTable | ( | ) | [virtual] |
Definition at line 1669 of file OMKSvm.cxx.
References _activeProcessTable, and _disconnectedTable.
Referenced by removeProcessProlog().
01670 { 01671 for ( NameToPointerMap<Process>::iterator processIterator = _disconnectedTable->begin () ; 01672 processIterator != _disconnectedTable->end () ; 01673 ++processIterator 01674 ) 01675 { 01676 _activeProcessTable->erase( processIterator->first ) ; 01677 // cout<<"le process "<<processIterator->first<<" a ete vire de la liste active"<<endl; 01678 } 01679 // _temporaryTable->clear() ; 01680 // _simStep = 0 ; 01681 }
void Svm::removeProcessProlog | ( | ) | [virtual] |
Definition at line 1658 of file OMKSvm.cxx.
References _simStep, and removeProcessFromActiveTable().
Referenced by relaxedSynchroniseReceiveAndProcessMessages().
01659 { 01660 _simStep++ ; 01661 //cout<<"_simStep est egale :"<<_simStep<<endl; 01662 if (_simStep == 2) { 01663 removeProcessFromActiveTable() ; 01664 // cout<<"_simStep est egale (2) :"<<_simStep<<endl; 01665 _simStep = 0 ; 01666 } 01667 }
std::list< Name > Svm::getDisconnectedProcessusList | ( | ) | [virtual] |
Definition at line 2445 of file OMKSvm.cxx.
References _disconnectedProcessusList, and _disconnectedTable.
Referenced by OMK::PvmController::getDisconnectedProcessusList().
02445 { 02446 _disconnectedProcessusList.clear () ; 02447 for (NameToPointerMap<Process>::iterator prIt = _disconnectedTable->begin () ; 02448 prIt != _disconnectedTable->end () ; 02449 prIt ++) { 02450 _disconnectedProcessusList.push_back ((*prIt).first) ; 02451 } 02452 return _disconnectedProcessusList ; 02453 }
Definition at line 2457 of file OMKSvm.cxx.
References _disconnectedProcessusMap.
Referenced by OMK::PvmController::getDisconnectedProcessusMap().
// Accessor body: returns the _disconnectedProcessusMap member (declared as
// std::map<Name, int>) unchanged; nothing is recomputed here.
// NOTE(review): the signature line is missing from this page; presumably this
// is Svm::getDisconnectedProcessusMap () - confirm against OMKSvm.cxx line 2457.
{
  return _disconnectedProcessusMap ;
}
virtual void OMK::Svm::addNewWorkstation | ( | const Name & | m | ) | [protected, pure virtual] |
add a new machine to the virtual machine
Implemented in OMK::PvmSvm.
Referenced by addNewSiteToSimulation(), and createDistributedSimulation().
virtual void OMK::Svm::removeWorkstation | ( | const Name & | m | ) | [protected, pure virtual] |
spawn a copy of this process on another site
Implemented in OMK::PvmSvm.
Referenced by addNewSiteToSimulation(), and createDistributedSimulation().
create a link to a distant site
d | the site id of the distant site |
Implemented in OMK::PvmSvm.
Referenced by addNewSiteToSimulation(), connectToDistributedSimulation(), createDistributedSimulation(), and testIfNewProcessAdded().
virtual int OMK::Svm::getParentSiteId | ( | ) | [protected, pure virtual] |
get the id of the site that spawned this process.
Implemented in OMK::PvmSvm.
Referenced by addNewSiteRequest(), connectToDistributedSimulation(), disconnectFromDistributedSimulation(), and init().
virtual int OMK::Svm::getSiteId | ( | ) | [protected, pure virtual] |
virtual void OMK::Svm::groupBarrier | ( | std::string | groupName, | |
int | numberToJoin | |||
) | [protected, pure virtual] |
block the calling process until numberToJoin processes of group groupName have called groupBarrier
numberToJoin | number of processes to wait for | |
groupName | the name of the process group |
Implemented in OMK::PvmSvm.
Referenced by syncDistributedSites().
virtual void OMK::Svm::broadcastToGroup | ( | std::string | groupName, | |
PvmMessage::MessageTag | ||||
) | [protected, pure virtual] |
virtual void OMK::Svm::initBeforeMessagePacking | ( | ) | [protected, pure virtual] |
virtual int OMK::Svm::nonblockingReceive | ( | PvmMessage::MessageTag | tag | ) | [protected, pure virtual] |
try to receive messages in a non-blocking fashion; the received messages are lost
tag | the tag of the messages to receive |
Implemented in OMK::PvmSvm.
virtual std::pair<PvmMessage::MessageTag, int> OMK::Svm::waitForAnyRequests | ( | PvmIncomingMessage & | receiveBuffer | ) | [protected, pure virtual] |
wait for any message sent to this site.
This call is blocking
receiveBuffer | the user buffer to use to receive any message |
Returns: a pair giving the tag of the received message and the siteId of the sender
Implemented in OMK::PvmSvm.
Referenced by serveNameRequestsUntilEnd(), and waitForAnswerToBlockingRequest().
virtual void OMK::Svm::joinSvmGroup | ( | std::string & | groupName | ) | [protected, pure virtual] |
join a synchronisation group join a group of processes in the siames virtual machine
groupName | the name of the group of processes to join. This parameter is not const, because of the underlying C compatibility layer |
Implemented in OMK::PvmSvm.
Referenced by connectToDistributedSimulation().
void Svm::serveNameRequestsUntilEnd | ( | ) | [protected, virtual] |
for the central site: go into server mode, receiving and processing any received request until all sites have disconnected
CHADI WARNING
END CHADI
Definition at line 434 of file OMKSvm.cxx.
References _processTable, OMK::PvmMessage::addNewSite, addNewSiteToSimulation(), disconnectFromDistributedSimulation(), OMK::PvmMessage::EndOfSimulation, OMK::NameServer::getIdentifier(), OMK::PvmIncomingMessage::getMessageDate(), OMK::Name::getNameServer(), OMK::NameServer::getStringAssociatedTo(), OMK::PvmMessage::NameServiceGetId, OMK::PvmMessage::NameServiceGetString, OMK::PvmMessage::NameServiceReturnId, OMK::PvmMessage::NameServiceReturnString, OMK::PvmMessage::NameServiceVerifyLocalNameServer, OMTRACE, OMTRACEID, OMK::PvmSvm::pvmDataEncoding, OMK::Name::setNameServer(), OMK::PvmCentralNameServer::verifyCompatibilityWithLocalNameServer(), and waitForAnyRequests().
Referenced by createDistributedSimulation().
// Body of Svm::serveNameRequestsUntilEnd ():
// request-serving loop of the central site. It installs a PvmCentralNameServer
// as the current name server, then blocks in waitForAnyRequests () and
// dispatches on the tag of each received message until the process table
// becomes empty.
{
  bool serving = true ;

  //convert the actual name Server to a PvmCentralNameServer
  // NOTE(review): the object is allocated with new and handed to
  // Name::setNameServer; no matching delete is visible in this function, so
  // ownership presumably passes to Name - confirm.
  PvmCentralNameServer * nameServer = new PvmCentralNameServer ( *Name::getNameServer() ) ;
  Name::setNameServer (nameServer ) ;


  PvmIncomingMessage receiveBuffer ;
  // first = tag of the received message, second = site id of the sender
  // (as documented for waitForAnyRequests).
  std::pair<PvmMessage::MessageTag, int> receivedRequest ;

  while ( serving )
    {

      // Blocking wait for the next request from any site.
      receivedRequest = waitForAnyRequests ( receiveBuffer ) ;

      switch (receivedRequest.first)
        {
        // A process reports that it has finished: log it, drop it from the
        // process table and, once the table is empty, disconnect and leave
        // the loop.
        case PvmMessage::EndOfSimulation :
          {
            const Date & endDate (receiveBuffer.getMessageDate() );
            Name endedProcess ;
            receiveBuffer>>endedProcess ;
            cerr<<endedProcess<<" finished at "<<endDate<<endl;
            _processTable->erase ( endedProcess ) ;
            if ( _processTable->size() == 0 )
              {
                disconnectFromDistributedSimulation ( endDate ) ;
                serving = false ;
              }
          }
          break;
        // A site asks for the identifier associated with a string: the
        // message carries the string length followed by the string itself;
        // the identifier is sent back to the requester with the
        // NameServiceReturnId tag.
        case PvmMessage::NameServiceGetId :
          {
            int stringLength ;
            receiveBuffer>>stringLength ;
            // NOTE(review): stringLength comes straight from the message and
            // is not validated; a value <= 0 makes the allocation and the
            // [stringLength - 1] access below invalid. The assert is the only
            // check that the received string is null-terminated.
            char * stringId = new char[stringLength] ;
            receiveBuffer>>stringId ;
            assert ( stringId [stringLength - 1] == 0 ) ;
            OMTRACEID("PVMSVM",
                      "Requested id for string "<<stringId<<"| of length "<<stringLength
                      <<" from "<< hex << receivedRequest.second << dec);
            // Extra traces for the length-1 case (only the terminating null fits).
            if ( stringLength ==1)
              {
                OMTRACE ("Ask for Id of " << stringId <<"|");
                Name::idType id = nameServer->getIdentifier( stringId ) ;
                OMTRACE ("id=" << id <<"|");
                OMTRACE ("StringAssociatedTo= " << nameServer->getStringAssociatedTo (id) <<"|");
              }
            Name::idType id = nameServer->getIdentifier( stringId ) ;
            OMTRACEID("PVMSVM"," sending "<<id<< "|");
            delete [] stringId ;
            // Reply directly through the PVM API to the requesting site.
            pvm_initsend( PvmSvm::pvmDataEncoding ) ;
            pvm_pklong (&id, 1, 1);
            pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnId) ;
          }
          break;
        // A site asks for the string associated with an identifier: the
        // string is sent back with the NameServiceReturnString tag.
        case PvmMessage::NameServiceGetString:
          {
            Name::idType id ;
            receiveBuffer>>id ;
            OMTRACEID("PVMSVM",
                      "Requested String for id "<<id
                      <<"| from "<< hex << receivedRequest.second<< dec );
            std::string resultingString( nameServer->getStringAssociatedTo (id) ) ;
            OMTRACEID("PVMSVM"," sending "<<resultingString<<"|");
            pvm_initsend( PvmSvm::pvmDataEncoding ) ;
            // const_cast because pvm_pkstr takes a non-const char *.
            pvm_pkstr( const_cast<char *>( resultingString.c_str() ) ) ;
            pvm_send (receivedRequest.second,PvmMessage::NameServiceReturnString) ;
          }
          break;
        // Delegated entirely to the central name server, which receives the
        // sender id and the message buffer.
        case PvmMessage::NameServiceVerifyLocalNameServer:
          {
            nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ;
          }
          break;
          //added by Chadi
        // A site requests the addition of a new site; the date of the
        // request message is forwarded to addNewSiteToSimulation.
        case PvmMessage::addNewSite:
          {
            addNewSiteToSimulation(receiveBuffer.getMessageDate());
          }
          break;
          //end added
        // Any other tag is only traced; the loop keeps serving.
        default:
          OMTRACEID("PVMSVM","Svm::serveNameRequestsUntilEnd() : unexpected request "
                    <<receivedRequest.first);
          //an unexpected request is received
          // switch (Controller::warningLevel)
          // {
          // case Controller::AllWarnings :
          // cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request "
          // <<receivedRequest.first<<endl;
          // break ;
          // case Controller::FatalWarnings :
          // cerr<<"Svm::serveNameRequestsUntilEnd() : unexpected request"
          // <<receivedRequest.first<<endl;
          // exit (2) ;
          // break;
          // default://nothing to do
          // break;
          // }
          break;
        }
    }
}
Definition at line 114 of file OMKSvm.h.
Referenced by connectToDistributedSimulation(), and testIfNewProcessAdded().
Definition at line 116 of file OMKSvm.h.
Referenced by addNewSiteToSimulation(), and testIfNewProcessAdded().
Definition at line 190 of file OMKSvm.h.
Referenced by addToDisconnectedTable(), getTroubleFlag(), and removeFromDisconnectedTable().
std::list<Name> OMK::Svm::_disconnectedProcessusList [protected] |
std::map<Name, int> OMK::Svm::_disconnectedProcessusMap [protected] |
Definition at line 212 of file OMKSvm.h.
Referenced by getDisconnectedProcessusMap(), and relaxedSynchroniseReceiveAndProcessMessages().
NameToPointerMap<Process>* OMK::Svm::_processTable [protected] |
map containing all information about the different processes
Definition at line 286 of file OMKSvm.h.
Referenced by addNewSiteToSimulation(), broadcast(), connectToDistributedSimulation(), createDistributedSimulation(), disconnectFromDistributedSimulation(), getLinkToProcessNamed(), getProcessDescriptorNamed(), processReceivedMessages(), sendCurrentBuffersWithTag(), serveNameRequestsUntilEnd(), Svm(), synchroniseReceiveAndProcessMessages(), testIfNewProcessAdded(), timestampCurrentSendBuffers(), waitAndProcessMessages(), and waitForMessage().
std::vector<int> OMK::Svm::processSiteIds [protected] |
Definition at line 290 of file OMKSvm.h.
Referenced by addNewSiteToSimulation(), createDistributedSimulation(), and disconnectFromDistributedSimulation().
NameToPointerMap<Process>* OMK::Svm::_disconnectedTable [protected] |
Definition at line 292 of file OMKSvm.h.
Referenced by addToDisconnectedTable(), disconnectedProcess(), disconnectedTableIsEmpty(), getDisconnectedProcessusList(), reconnexionEstablished(), relaxedSynchroniseReceiveAndProcessMessages(), removeFromDisconnectedTable(), removeProcessFromActiveTable(), sendPingMessage(), Svm(), and testIfDisconnectedProcessNamed().
NameToPointerMap<Process>* OMK::Svm::_activeProcessTable [protected] |
Definition at line 294 of file OMKSvm.h.
Referenced by relaxedSynchroniseReceiveAndProcessMessages(), removeProcessFromActiveTable(), sendCurrentBuffersWithTag(), and Svm().
NameToPointerMap<Process>* OMK::Svm::_temporaryTable [protected] |
NameToPointerMap<Process> OMK::Svm::aRecevoir [protected] |
Definition at line 298 of file OMKSvm.h.
Referenced by relaxedSynchroniseReceiveAndProcessMessages(), Svm(), and synchroniseReceiveAndProcessMessages().
bool OMK::Svm::firstTime [protected] |
std::map<int,Process *> OMK::Svm::_idToProcessTable [protected] |
map storing correspondance between processId and Process
Definition at line 305 of file OMKSvm.h.
Referenced by addNewSiteToSimulation(), connectToDistributedSimulation(), createDistributedSimulation(), testIfNewProcessAdded(), and waitForAnswerToBlockingRequest().
int OMK::Svm::_siteId [protected] |
workstation identifier in the svm
Definition at line 309 of file OMKSvm.h.
Referenced by OMK::PvmSvm::getSiteId(), and OMK::PvmSvm::PvmSvm().
SvmLink* OMK::Svm::_linkToCentralSite [protected] |
the link to the central site
Definition at line 313 of file OMKSvm.h.
Referenced by connectToDistributedSimulation(), testIfNewProcessAdded(), and waitForMessage().
Name OMK::Svm::_siteName [protected] |
Name of the process associated to this site.
Definition at line 317 of file OMKSvm.h.
Referenced by broadcast(), connectToDistributedSimulation(), createDistributedSimulation(), getSiteName(), processReceivedMessages(), relaxedSynchroniseReceiveAndProcessMessages(), sendCurrentBuffersWithTag(), synchroniseReceiveAndProcessMessages(), timestampCurrentSendBuffers(), waitAndProcessMessages(), and waitForMessage().
int OMK::Svm::_numberOfSlaves [protected] |
number of slaves ( PvmControllers ) in the simulation
Definition at line 322 of file OMKSvm.h.
Referenced by OMK::PvmSvm::addNewWorkstation(), connectToDistributedSimulation(), syncDistributedSites(), synchronizeOn(), and testIfNewProcessAdded().
const Date OMK::Svm::_synchronisationLatency [protected] |
in milliseconds, the authorized latency when synchronising all the sites
Definition at line 327 of file OMKSvm.h.
Referenced by getSynchronisationLatency(), relaxedSynchroniseReceiveAndProcessMessages(), and synchroniseReceiveAndProcessMessages().
const Date OMK::Svm::_synchronisationTimeOut [protected] |
in milliseconds, the authorized timeOut for relaxed synchronizing
Definition at line 331 of file OMKSvm.h.
Referenced by relaxedSynchroniseReceiveAndProcessMessages().
const Date OMK::Svm::_deadReckoningInterval [protected] |
in int, the dead reckoning time step interval
Definition at line 335 of file OMKSvm.h.
Referenced by sendCurrentBuffersWithTag().
const bool OMK::Svm::_yieldNeeded [protected] |
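boolean flag; presumably tells the synchronisation routines whether they must yield while waiting (the text previously shown here was a copy of the _deadReckoningInterval description) - to be confirmed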
Definition at line 339 of file OMKSvm.h.
Referenced by relaxedSynchroniseReceiveAndProcessMessages(), and synchroniseReceiveAndProcessMessages().
std::string Svm::_slaveGroupName [static, protected] |
name of the group all the slave processes join
Definition at line 343 of file OMKSvm.h.
Referenced by connectToDistributedSimulation(), and syncDistributedSites().
std::string Svm::_masterSiteName [static, protected] |
reserved name of the master distributed site
Definition at line 347 of file OMKSvm.h.
Referenced by createDistributedSimulation().
Documentation generated on Mon Jun 9 11:46:03 2008 |
Generated with doxygen by Dimitri van Heesch , 1997-2007 |