OMK::PvmController Class Reference

Class defining a local controller for a PVM session.

#include <OMKPvmController.h>

Public Member Functions

 PvmController (ObjectDescriptor &arbre, const Date &initialSimulationDate, int argc, char *argv[])
 Constructor. Initializes the distributed virtual machine.
virtual ~PvmController ()
 Destructor.
virtual void run ()
 redefine run so that it does nothing unless the controller belongs to a spawned process
virtual void advanceSimulatedDate ()
 redefine advanceSimulatedDate to timestamp synchronisation messages. Any attempt to send data between a call to computeNextSimulationStep and advanceSimulatedDate will cause an assertion to fail
virtual void computeNextSimulationStep ()
 redefine computeNextSimulationStep to include the synchronisation primitives
virtual void finish ()
 redefine to avoid going into blocking mode when finishing the simulation
virtual const Name & getProcessName () const
 get the name of the process this controller is controlling
virtual Svm * getDistributedVirtualMachine ()
 get access to the siames distributed virtual machine
virtual void makeSynchronisationMessage (NameToPointerMap< ReferenceObjectHandle > &referenceObjects)
 make the synchronisation message to be sent to all processes
virtual void parseSynchronisationMessage (PvmIncomingMessage *message)
 parse the received synchronisation messages.
virtual void waitForAnswerToBlockingRequest (PvmMessage::MessageTag tag)
 waitForAnswerToBlockingRequest.
virtual void sendInitialValuesToMirror (PvmIncomingMessage &message)
 answer to a MirrorNeedsInitialValues message received by the distributed subsystem
virtual void init ()
 redefine for distributed initialisation
virtual std::vector< Name > getIsolatedMirrorList ()
virtual std::vector< Name > getNonIsolatedMirrorList ()
virtual bool testIfTroubleOccured ()
virtual void createNewSite (const Date &)
virtual std::list< Name > getDisconnectedProcessusList ()
virtual std::map< Name, int > getDisconnectedProcessusMap ()

Public Attributes

std::vector< Name > _publicIsolatedMirrorObjectsVector
std::vector< Name > _publicNonIsolatedMirrorObjectsVector

Protected Member Functions

virtual Date getPurgeDate ()
 redefine to take latency into account
virtual MirrorObjectHandle * createMirrorObject (ObjectDescriptor *objectDescription)
 Create a mirror object.
virtual DuplicatedObjectHandle * newOMKDuplicatedObjectHandle (SimulatedObject &object)
 create an object handle adapted to handling of duplicated objects
virtual LocalObjectHandle * newOMKLocalObjectHandle (SimulatedObject &object)
 CHADI.
virtual MirrorObjectHandle * newOMKMirrorObjectHandle (SimulatedObject &obj)
 create an object handle for objects mirrored by this controller
virtual ReferenceObjectHandle * newOMKReferenceObjectHandle (SimulatedObject &object, Controller &controller, SignalDispatcher *signalDispatcher)
 create a reference object handler appropriate for this controller
virtual int getOutputHistorySize (void)
 get the size of the history fifo needed when running a distributed simulation.
virtual ObjectHandle * removeObjectFromDataStructures (const Name &nom)
 distributed destruction management: destroys the local data structures
void synchronizedReceiveAndProcessMessages (void)
void relaxedReceiveAndProcessMessages (void)

Protected Attributes

Svm * _distributedVirtualMachine
 the distributed virtual machine used by the controller
bool _finishing
 boolean indicating the controller is going into shutdown mode.
void(PvmController::* receiveAndProcessMessages )(void)
 to manage the choice of the synchronization

Detailed Description

Class defining a local controller for a PVM session.

the local controller is in charge of

Definition at line 47 of file OMKPvmController.h.


Constructor & Destructor Documentation

PvmController::PvmController ( ObjectDescriptor &  arbre,
const Date &  initialSimulationDate,
int  argc,
char *  argv[] 
)

Constructor. Initializes the distributed virtual machine.

The created distributed virtual machine acts as a server if the process wasn't spawned. Otherwise, the controller acts as a local controller synchronized through the distributed virtual machine with the other controllers.

Definition at line 52 of file OMKPvmController.cxx.

References OMK::Controller::_computeStatePointer, _distributedVirtualMachine, OMK::SimulatedObject::_objectHandle, OMK::DistributedController::_processName, OMK::Controller::_simulationTree, OMK::NameToPointerMap< ObjectType >::addObjectWithIndex(), OMK::DuplicatedObjectHandle::addProcessOfDuplicate(), OMK::ParametersAccessor::get(), OMK::ConfigurationParameterDescriptor::getAssociatedString(), OMK::ObjectDescriptor::getDescendants(), OMK::Process::getFrequency(), OMK::Process::getMaxFrequencyOfHostedObjects(), OMK::SimulatedObject::getName(), OMK::MultipleConfigurationParameter::getNameOfSubDescriptor(), OMK::MultipleConfigurationParameter::getNumberOfSubItems(), OMK::NameToPointerMap< ObjectType >::getObjectOfIndex(), OMK::Controller::getSchedulingParametersOfObject(), OMK::Svm::getSiteName(), OMK::ConfigurationParameterDescriptor::getSubDescriptorByName(), OMK::MultipleConfigurationParameter::getSubDescriptorByName(), OMK::MultipleConfigurationParameter::getSubDescriptorByPosition(), OMK::Svm::init(), OMK::ReferenceObjectHandle::initial, OMK::Controller::lcm(), OMK::SystemEventIdentifier::MaskObjectDestroyed, newOMKDuplicatedObjectHandle(), receiveAndProcessMessages, OMK::SimulatedObject::registerForSignalBy(), relaxedReceiveAndProcessMessages(), OMK::Process::setFrequency(), OMK::Process::setMaxHostedFrequency(), OMK::Controller::setProcessOfDescriptor(), and synchronizedReceiveAndProcessMessages().

00056                                     : DistributedController (initialObjects, initialDate),
00057                                     _finishing ( false ) 
00058 {
00059 
00060   Date latence = 20 ;
00061   Date timeOut = 10 ;
00062   std::string synchro ("strong") ;
00063   int deadReckoning = 70 ;
00064   bool yieldNeeded = false ;
00065   NameToPointerMap<Process> * processTable = new NameToPointerMap<Process>() ;
00066   receiveAndProcessMessages = &PvmController::synchronizedReceiveAndProcessMessages ;
00067 
00068   MultipleConfigurationParameter * schedulingParameters = getSchedulingParametersOfObject( initialObjects ) ;
00069   if ( schedulingParameters != NULL)
00070   {
00071     ConfigurationParameterDescriptor * param = schedulingParameters->getSubDescriptorByName("Latency") ;
00072     if (param != NULL)
00073     {
00074       latence = atoi (param->getAssociatedString().c_str()) ;
00075     }
00076     std::cerr << "Latency : " << latence << std::endl ;
00077 
00078     synchro = "strong" ;
00079     if( ParametersAccessor::get( schedulingParameters, "Synchronization", synchro ) 
00080       && ( synchro == "relaxed" ) )
00081     {
00082       receiveAndProcessMessages = &PvmController::relaxedReceiveAndProcessMessages ;
00083     }
00084     std::cerr << "Synchronization : " << synchro << std::endl ;
00085 
00086     ParametersAccessor::get( schedulingParameters, "YieldNeeded", yieldNeeded ) ;
00087     std::cerr << "YieldNeeded : " << yieldNeeded << std::endl ;
00088 
00089     const ConfigurationParameterDescriptor * timeOutParamDescriptor = 
00090       schedulingParameters->getSubDescriptorByName ("TimeOut") ;
00091     if (timeOutParamDescriptor != NULL) {
00092       timeOut = atoi (timeOutParamDescriptor->getAssociatedString().c_str()) ;
00093     }
00094     std::cerr << "TimeOut : " << timeOut << std::endl ;
00095 
00096     const ConfigurationParameterDescriptor * deadReckoningParamDescriptor = 
00097       schedulingParameters->getSubDescriptorByName ("DeadReckoningStepInterval") ;
00098     if (deadReckoningParamDescriptor != NULL) {
00099       deadReckoning = atoi (deadReckoningParamDescriptor->getAssociatedString().c_str()) ;
00100     }
00101     std::cerr << "DeadReckoningStepInterval : " << deadReckoning << std::endl ;
00102 
00103     param = schedulingParameters->getSubDescriptorByName("Machines") ;
00104     MultipleConfigurationParameter * describedMachines = dynamic_cast<MultipleConfigurationParameter *>(param);
00105     if (describedMachines != NULL)
00106     {
00107       int numberOfMachines = describedMachines->getNumberOfSubItems () ;
00108       for (int machineNumber = 0 ;
00109         machineNumber != numberOfMachines ;
00110         ++machineNumber )
00111       {
00112         param = describedMachines->getSubDescriptorByPosition ( machineNumber ) ;
00113         processTable->addObjectWithIndex ( describedMachines->getNameOfSubDescriptor (machineNumber),
00114           new Process ( describedMachines->getNameOfSubDescriptor (machineNumber),
00115           param->getAssociatedString() )
00116           ) ;
00117       }
00118     }
00119   }
00120 
00121   // virtual member functions aren't initialised in constructor
00122   // therefore, recreate the correct objectHandle
00123   delete _objectHandle ;
00124 
00125   _objectHandle = NULL ;
00126 
00127   DuplicatedObjectHandle * localObjectHandle = newOMKDuplicatedObjectHandle( *this ) ;
00128   _objectHandle = localObjectHandle ;
00129   // TDTD: added to fix a memory bug: since _objectHandle has been deleted, this no longer works...
00130   _computeStatePointer = const_cast<ReferenceObjectHandle::SimulatedObjectComputingState *>(&(dynamic_cast<DuplicatedObjectHandle *>(_objectHandle)->getComputingState () )) ;
00131 
00132   for (NameToPointerMap<Process>::const_iterator i = processTable->begin() ;
00133     i != processTable->end () ;
00134     ++i) 
00135   {
00136     localObjectHandle->addProcessOfDuplicate ( (*i).second ) ;
00137   }
00138 
00139 
00140   _distributedVirtualMachine = new PvmSvm (processTable, latence, timeOut, deadReckoning, yieldNeeded, argc, argv) ;
00141 
00142   _distributedVirtualMachine->init (initialDate ) ;
00143 
00144   _processName = _distributedVirtualMachine->getSiteName () ;
00145 
00146   setProcessOfDescriptor ( initialObjects, _processName ) ;
00147 
00148   cout << "PvmController::PvmController (...)" 
00149     << "of process: " << _distributedVirtualMachine->getSiteName () << endl ;
00150 
00151   // initialise the controller state
00152   // TDTD: continuation of the memory bug fix: since _objectHandle has been deleted, this no longer works...
00153   assert ( _computeStatePointer != NULL ) ;
00154   *_computeStatePointer = ReferenceObjectHandle::initial ;
00155 
00156 
00157   //rewritten 2001/12/5 by D. Margery
00158   list < ObjectDescriptor *> * descriptorList = _simulationTree.getDescendants() ;
00159   for (list < ObjectDescriptor *>::iterator descriptorIterator = descriptorList->begin() ;
00160     descriptorIterator != descriptorList->end() ;
00161     ++ descriptorIterator )
00162   {
00163     Frequency currentFreq = (*descriptorIterator)->getFrequency() ;
00164     Name currentProcess = (*descriptorIterator)->getProcess() ;
00165     Process * currentProcessDescriptor = processTable->getObjectOfIndex( currentProcess ) ;
00166     if ( currentProcessDescriptor != NULL )
00167       // the process could have been removed by the svm
00168     {
00169       if ( currentFreq > currentProcessDescriptor->getMaxFrequencyOfHostedObjects () ) 
00170       {
00171         currentProcessDescriptor->setMaxHostedFrequency (currentFreq) ;
00172       }
00173       currentProcessDescriptor->setFrequency ( Controller::lcm(currentFreq, currentProcessDescriptor->getFrequency() ) ) ;
00174     }
00175   }
00176   delete descriptorList ;
00177 
00178   //subscribe to system signals to receive notification on what is happening on other controllers
00179   registerForSignalBy(SystemEventIdentifier::MaskObjectDestroyed, getName()) ;
00180 
00181 }
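
The descendant loop at the end of the constructor folds each hosted object's frequency into its process descriptor: it tracks the highest hosted frequency and combines frequencies through Controller::lcm. The following is a minimal sketch of that bookkeeping, not the library's code. It assumes that Controller::lcm computes a least common multiple, that frequencies are positive integers, and that a fresh process starts at frequency 1; ProcessSketch, hostObject and summarize are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <vector>

// Greatest common divisor (Euclid), used to build the least common multiple.
int gcdSketch(int a, int b) {
    while (b != 0) {
        int t = a % b;
        a = b;
        b = t;
    }
    return a;
}

// Least common multiple of two positive integers.
// Assumption: this is what Controller::lcm computes.
int lcmSketch(int a, int b) {
    return a / gcdSketch(a, b) * b;
}

// Hypothetical stand-in for OMK::Process: only the two values the loop updates.
struct ProcessSketch {
    int frequency;           // lcm of the hosted object frequencies
    int maxHostedFrequency;  // highest hosted object frequency
};

// Fold one hosted object's frequency into the process descriptor,
// mirroring the body of the descendant loop.
void hostObject(ProcessSketch &process, int objectFrequency) {
    if (objectFrequency > process.maxHostedFrequency) {
        process.maxHostedFrequency = objectFrequency;
    }
    process.frequency = lcmSketch(objectFrequency, process.frequency);
}

// Apply hostObject to every frequency, starting from an assumed frequency of 1.
ProcessSketch summarize(const std::vector<int> &objectFrequencies) {
    ProcessSketch process = {1, 0};
    for (std::vector<int>::const_iterator f = objectFrequencies.begin();
         f != objectFrequencies.end(); ++f) {
        hostObject(process, *f);
    }
    return process;
}
```

With objects at 25, 50 and 60 Hz, the sketch gives a process frequency of 300 and a maximum hosted frequency of 60.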

PvmController::~PvmController (  )  [virtual]

Destructor.

Definition at line 185 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::Svm::disconnectFromDistributedSimulation(), and OMK::Controller::getSimulatedDate().

00186 {
00187   // disconnect from the virtual machine
00188 #ifdef _DEBUGEXECPVM
00189   cout << "PvmController::~PvmController : " << endl;
00190 #endif
00191   _distributedVirtualMachine->disconnectFromDistributedSimulation ( getSimulatedDate () ) ;
00192   delete _distributedVirtualMachine ;   
00193 }


Member Function Documentation

void PvmController::run (  )  [virtual]

redefine run so that it does nothing unless the controller belongs to a spawned process

Reimplemented from OMK::Controller.

Definition at line 198 of file OMKPvmController.cxx.

References OMK::Controller::_date, _distributedVirtualMachine, OMK::SimulatedObject::_objectHandle, OMK::Controller::_referenceObjectsMap, OMK::Controller::_stepPeriod, advanceSimulatedDate(), OMK::Svm::getSynchronisationLatency(), makeSynchronisationMessage(), OMK::ReferenceObjectHandle::processEvents(), OMK::Svm::processReceivedMessages(), OMK::Controller::run(), OMK::Svm::sendCurrentBuffersWithTag(), and OMK::PvmMessage::SynchronisationMessage.

00199 {
00200   if ( pvm_parent() != PvmNoParent )
00201   {
00202     DistributedController::run() ;
00203 
00204     // the simulation is locally finished, so notify all the controllers before exiting 
00205     Date lastDate = _date + 2 * _stepPeriod + _distributedVirtualMachine->getSynchronisationLatency() ;
00206 
00207     while (_date < lastDate )
00208     {
00209 #ifdef _DEBUGPVMMESS
00210       cerr<<"PvmController::run(): trying a clean exit"<<endl;
00211 #endif
00212       ReferenceObjectHandle * objectHandle = dynamic_cast<ReferenceObjectHandle *> ( _objectHandle ) ;
00213 
00214       assert ( objectHandle != NULL ) ;
00215 
00216       //the controller has to process residual system events addressed to itself
00217       objectHandle->processEvents() ;
00218 
00219       //send enough empty signals to enable other controllers to finish their job
00220       makeSynchronisationMessage( _referenceObjectsMap ) ;
00221 
00222       // send the synchronisation messages
00223       _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ; 
00224 
00225       //wait a little, to try and get clean exit
00226 #ifdef _MSC_VER
00227       Sleep( 1000 ) ;
00228 #else
00229       sleep(1) ;
00230 #endif
00231 
00232       // for cleanup, read any arrived messages
00233       _distributedVirtualMachine->processReceivedMessages (*this, PvmMessage::SynchronisationMessage) ;
00234 
00235       advanceSimulatedDate() ;
00236     } 
00237   }
00238 
00239 }
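
The clean-exit loop in run() keeps sending synchronisation messages until the date passes lastDate = _date + 2 * _stepPeriod + latency. A rough way to see how many extra rounds this produces is sketched below. The sketch assumes that each call to advanceSimulatedDate() advances the date by exactly one step period, which the listing does not show, and that dates can be modelled as plain integers; cleanExitRounds is a hypothetical helper.

```cpp
#include <cassert>

// Count the iterations of the clean-exit loop.
// Assumptions: dates are integers, stepPeriod > 0, and each
// advanceSimulatedDate() call adds exactly stepPeriod to the date.
int cleanExitRounds(int date, int stepPeriod, int latency) {
    const int lastDate = date + 2 * stepPeriod + latency;
    int rounds = 0;
    while (date < lastDate) {
        ++rounds;            // one synchronisation message sent per round
        date += stepPeriod;  // stands in for advanceSimulatedDate()
    }
    return rounds;
}
```

Under these assumptions, a step period of 20 and a latency of 20 give three rounds, and a latency of 30 gives four.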

void PvmController::advanceSimulatedDate (  )  [virtual]

redefine advanceSimulatedDate to timestamp synchronisation messages. Any attempt to send data between a call to computeNextSimulationStep and advanceSimulatedDate will cause an assertion to fail

Reimplemented from OMK::Controller.

Definition at line 262 of file OMKPvmController.cxx.

References OMK::Controller::_date, _distributedVirtualMachine, OMK::Controller::advanceSimulatedDate(), OMK::Controller::initialSimulationDate, OMK::Svm::sendCurrentBuffersWithTag(), OMK::PvmMessage::SynchronisationMessage, and OMK::Svm::timestampCurrentSendBuffers().

Referenced by run().

00263 {
00264 #ifdef _DEBUGEXECPVM
00265   cerr << "PvmController::advanceSimulatedDate () from " <<_date<<endl;
00266 #endif
00267   //send any messages collected during init
00268   if (_date < initialSimulationDate )
00269   {
00270     _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00271   }
00272 
00273   DistributedController::advanceSimulatedDate () ;
00274 
00275   // prepare buffers to receive any message generated during this simulation step
00276   _distributedVirtualMachine->timestampCurrentSendBuffers (_date) ;   
00277 }

void PvmController::computeNextSimulationStep (  )  [virtual]

redefine computeNextSimulationStep to include the synchronisation primitives

Reimplemented from OMK::Controller.

Definition at line 304 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::Controller::_referenceObjectsMap, OMK::Controller::computeNextSimulationStep(), makeSynchronisationMessage(), receiveAndProcessMessages, OMK::Svm::sendCurrentBuffersWithTag(), and OMK::PvmMessage::SynchronisationMessage.

00304                                                {
00305 
00306 #ifdef _DEBUGEXECPVM
00307   cerr << "PvmController::computeNextSimulation : computing yet another simulation step for " 
00308     <<notreProcessus()<< endl;
00309 #endif
00310 
00311   // compute the simulation step
00312   DistributedController::computeNextSimulationStep () ;
00313 
00314   // compute the synchronisation messages
00315   makeSynchronisationMessage ( _referenceObjectsMap ) ;
00316 
00317   // send the synchronisation messages
00318   _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00319 
00320   // TDTD: to run in either synchronized or relaxed mode
00321   (this->*receiveAndProcessMessages) () ;
00322   //synchronizedReceiveAndProcessMessages () ;
00323   // TDTD: in case of desynchronisation, the flushing of useless messages must be handled... ::: more or less done...
00324 
00325   //added by chadi to test if a new process has been added dynamically
00326   // TDTD: to be restored later
00327   //_distributedVirtualMachine->testIfNewProcessAdded () ;
00328   /*
00329   if (testIfTroubleOccured())
00330   {
00331   bool tico = _distributedVirtualMachine->reconnexionEstablished() ;
00332   }
00333   */
00334 
00335   //    {
00336   //     cout<<"bien bien bien"<<endl;
00337   // makeSynchronisationMessage ( _referenceObjectsMap ) ;       
00338   // _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00339   //   }
00340 
00341   //  if (testIfTroubleOccured())
00342   //        {
00343   //     _distributedVirtualMachine->sendPingMessage() ;
00344   //        }
00345 
00346 
00347 
00348   //  if ( testIfTroubleOccured() )
00349   //        {
00350   //     NameToPointerMap<ReferenceObjectHandle>::iterator pTabRef ;
00351   //     for (pTabRef = _referenceObjectsMap.begin () ;
00352   //          pTabRef != _referenceObjectsMap.end () ;
00353   //          pTabRef ++) 
00354   //       {
00355 
00356   //         PvmReferenceObjectHandle * pvmObjectHandle = dynamic_cast<PvmReferenceObjectHandle *> ( (*pTabRef).second ) ; 
00357   //         if ( pvmObjectHandle != NULL )
00358   //           {
00359   //             pvmObjectHandle->flushDisconnectedSitesOutgoingBuffer() ;
00360   //           }
00361 
00362   //       }
00363 
00364   //        }
00365 
00366 }
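
computeNextSimulationStep() calls the receive routine through the receiveAndProcessMessages pointer to member function, which the constructor sets to the synchronized variant by default and to the relaxed variant when the "Synchronization" parameter is "relaxed". The sketch below shows that dispatch pattern in isolation. ControllerSketch and its members are hypothetical stand-ins that only record which variant ran; they are not the OpenMASK classes.

```cpp
#include <cassert>
#include <string>

class ControllerSketch {
public:
    // Select the receive strategy once, from a configuration string.
    explicit ControllerSketch(const std::string &synchro)
        : receiveAndProcessMessages(&ControllerSketch::synchronizedReceive) {
        if (synchro == "relaxed") {
            receiveAndProcessMessages = &ControllerSketch::relaxedReceive;
        }
    }

    // Each step dispatches through the stored pointer to member function.
    void computeNextSimulationStep() {
        (this->*receiveAndProcessMessages)();
    }

    std::string lastMode;  // records which variant ran, for illustration only

private:
    void synchronizedReceive() { lastMode = "strong"; }
    void relaxedReceive() { lastMode = "relaxed"; }

    // Same shape as the receiveAndProcessMessages attribute of PvmController.
    void (ControllerSketch::*receiveAndProcessMessages)();
};
```

Choosing the strategy once in the constructor keeps the per-step code free of string comparisons; any value other than "relaxed" falls back to the synchronized variant, as in the listing.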

void PvmController::finish (  )  [virtual]

redefine to avoid going into blocking mode when finishing the simulation

Reimplemented from OMK::Controller.

Definition at line 793 of file OMKPvmController.cxx.

References _finishing, and OMK::Controller::finish().

00794 {
00795   _finishing = true ;
00796   DistributedController::finish() ;
00797 }

const Name & PvmController::getProcessName (  )  const [virtual]

get the name of the process this controller is controlling

Definition at line 256 of file OMKPvmController.cxx.

References _distributedVirtualMachine, and OMK::Svm::getSiteName().

Referenced by OMK::PvmMirrorObjectHandle::registerToReferenceObject().

00257 {
00258   return (_distributedVirtualMachine->getSiteName ()) ;
00259 }

Svm * PvmController::getDistributedVirtualMachine (  )  [virtual]

get access to the siames distributed virtual machine

Definition at line 710 of file OMKPvmController.cxx.

References _distributedVirtualMachine.

Referenced by sendInitialValuesToMirror().

00711 {
00712   return _distributedVirtualMachine;
00713 }

void PvmController::makeSynchronisationMessage ( NameToPointerMap< ReferenceObjectHandle > &  referenceObjects  )  [virtual]

make the synchronisation message to be sent to all processes

Definition at line 382 of file OMKPvmController.cxx.

References OMK::Controller::_date, and OMK::PvmReferenceObjectHandle::makeSynchronisationMessage().

Referenced by computeNextSimulationStep(), and run().

00383 {
00384   static Name endMessage ("_OpenMASKEndOfSynchronisationMessage") ;
00385 
00386 #ifdef _DEBUGPVMMESS
00387   cerr<<"PvmController::makeSynchronisationMessage for "<<referenceObjects.size() <<" objects "<<endl;
00388 #endif
00389 
00390   NameToPointerMap<ReferenceObjectHandle>::iterator pTabRef ;
00391   // ask all referentials to contribute to the synchronisation message
00392   for (pTabRef = referenceObjects.begin () ;
00393     pTabRef != referenceObjects.end () ;
00394     pTabRef ++) 
00395   {
00396 #ifdef _DEBUGPVMMESS
00397     cerr<<"PvmController::makeSynchronisationMessage for object " <<(*pTabRef).second->getSimulatedObject().getName()<<endl;
00398 #endif
00399 
00400     PvmReferenceObjectHandle * pvmObjectHandle = dynamic_cast<PvmReferenceObjectHandle *> ( (*pTabRef).second ) ; 
00401     if ( pvmObjectHandle != NULL )
00402     {
00403       pvmObjectHandle->makeSynchronisationMessage ( _date ) ;
00404     }
00405     else
00406     {
00407 #ifdef _DEBUGPVMMESS
00408       cerr<<"No synchronisation message for "<<(*pTabRef).second->getSimulatedObject().getName()<<endl;
00409 #endif      
00410     }
00411 #ifdef _DEBUGPVMMESS
00412     cerr<<"PvmController::makeSynchronisationMessage of " <<(*pTabRef).second->getSimulatedObject().getName()<<" done"<<endl;
00413 #endif
00414   } 
00415 #ifdef _DEBUGPVMMESS
00416   cerr<<"PvmController::makeSynchronisationMessage done"<<endl;
00417 #endif
00418 }

void PvmController::parseSynchronisationMessage ( PvmIncomingMessage *  message  )  [virtual]

parse the received synchronisation messages.

This member function is called by the siames virtual machine when it receives synchronisation messages from remote workstations.

Parameters:
message the message to parse

Definition at line 423 of file OMKPvmController.cxx.

References OMK::DistributedController::_mirrorObjectsMap, OMK::Controller::_referenceObjectsMap, OMK::EventCreator::createEvent(), OMK::SynchronisationMessage::endOfSynchronisationFragment, OMK::Controller::error(), OMK::EventReceived, OMK::Controller::getSimulatedDate(), OMK::Registration, OMK::Event::unpack(), OMK::SimulatedObject::unpack(), OMK::Flowable::unpack(), OMK::MirrorObjectHandle::unpack(), and OMK::Name::unpack().

Referenced by OMK::Svm::processReceivedMessages(), OMK::Svm::relaxedSynchroniseReceiveAndProcessMessages(), OMK::Svm::synchroniseReceiveAndProcessMessages(), OMK::Svm::waitAndProcessMessages(), OMK::Svm::waitForAnswerToBlockingRequest(), OMK::Svm::waitForMessage(), and OMK::Svm::waitForMessageFrom().

00424 {
00425 
00426   ReferenceObjectHandle * refer ; // reference handle of the recipient object
00427   MirrorObjectHandle * miroir ; // mirror of the recipient object
00428   Name nomObj ; // name of the recipient mirror
00429   bool stillMessagesToParse = true ;
00430 
00431 #ifdef _DEBUGPVMMESS
00432   cerr << "PvmController::parseSynchronisationMessage at "<<getSimulatedDate()<<":"<< endl ;
00433 #endif
00434 
00435 
00436   while ( stillMessagesToParse ) 
00437   {
00438     // get the message recipient
00439     nomObj.unpack ( *message ) ;
00440 
00441 
00442 #ifdef _DEBUGPVMMESS
00443     cerr << "PvmController::parseSynchronisationMessage: message received for " << nomObj << endl;
00444 #endif
00445     if ( nomObj == SynchronisationMessage::endOfSynchronisationFragment )
00446       // the synchronisation message has been completely parsed
00447     {
00448       stillMessagesToParse = false ;
00449     }
00450     else 
00451     {
00452       MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.find ( nomObj ) ;
00453       if (i != _mirrorObjectsMap.end ()) 
00454       {
00455         // forward the message to it
00456 #ifdef _DEBUGPVMMESS
00457         cout << "PvmController::parseSynchronisationMessage: for miror" << endl ;
00458 #endif
00459         miroir = i->second ;
00460         miroir->unpack ( *message ) ;
00461       } 
00462       else if (_referenceObjectsMap.find (nomObj) != _referenceObjectsMap.end ()) 
00463       {
00464         // forward the message to it
00465 #ifdef _DEBUGPVMMESS
00466         std::cout << "PvmController::parseSynchronisationMessage : for referential : " << nomObj <<std::endl ;
00467         //std::cout << "message : " << *message << std::endl ;
00468 #endif
00469         refer = dynamic_cast<PvmReferenceObjectHandle *>(_referenceObjectsMap.getObjectOfIndex (nomObj) ) ;
00470         if ( refer != NULL )
00471         {
00472           refer->unpack (*message)  ;
00473         }
00474         else
00475         {
00476           dynamic_cast<PvmDuplicatedObjectHandle *>(_referenceObjectsMap.getObjectOfIndex (nomObj) )->unpack ( *message ) ;
00477         }
00478       } 
00479       else if (nomObj == "") 
00480       {
00481 #ifdef _DEBUGPVMMESS
00482         cout << "PvmController::parseSynchronisationMessage :CL : message vide d'abonnement ... pour synchro" << endl ;
00483 #endif
00484       } 
00485       else 
00486       {
00487 #ifdef _DEBUGPVMMESS
00488         cerr << "PvmController::parseSynchronisationMessage: "<< endl ;
00489         cerr << "for unknown object: <" << nomObj << "> (was probably destroyed)" << endl ;
00490         error ("PvmController::parseSynchronisationMessage : Objet ni miroir ni referentiel") ;
00491 #endif
00492         //probably a request for an object that was destroyed
00493         int typeMess ;
00494         *message >> typeMess ;
00495         // hope it is a registration for a just destroyed referential, and interpret it
00496         if ( typeMess == Registration )
00497         {
00498           Name processName ;
00499           *message >> processName ;     
00500         }
00501         else if ( typeMess == EventReceived )
00502         {
00503           Event * event ;
00504           Name classToCreate ;
00505           EventIdentifier eventId;
00506           Date eventDate;
00507           Name sender, receiver ;
00508           *message >> classToCreate >> eventId>> eventDate>>sender>>receiver ;
00509           event = EventCreator::createEvent (classToCreate, eventId, eventDate, sender, receiver ) ;
00510           event->unpack ( *message ) ;
00511           //cerr << "PvmController::parseSynchronisationMessage event "<<*event<<" undelivered"<<endl;
00512           delete event ;
00513         }
00514       }
00515     }
00516   }
00517 #ifdef _DEBUGPVMMESS
00518   cerr << "PvmController::parseSynchronisationMessage : message parsed"<< endl ;
00519 #endif
00520 }
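
The parsing loop above reads a recipient name, stops at the end-of-fragment marker, and otherwise routes the fragment to a mirror, then to a referential, and finally discards it when the recipient is unknown. The sketch below reproduces that control flow on a simplified message made of string tokens. It assumes every fragment carries exactly one payload token and leaves out the empty-name case; the sentinel value, RoutingResult and routeFragments are hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>

// Hypothetical sentinel; the listing compares against
// SynchronisationMessage::endOfSynchronisationFragment.
static const std::string kEndOfFragment = "_end";

struct RoutingResult {
    int toMirrors;
    int toReferences;
    int dropped;
};

// Route (recipient, payload) pairs until the sentinel is met.
// Mirrors are tried first, then referentials; unknown recipients are dropped
// after their payload has been consumed, so parsing stays aligned.
RoutingResult routeFragments(std::deque<std::string> message,
                             std::map<std::string, std::string> &mirrors,
                             std::map<std::string, std::string> &references) {
    RoutingResult result = {0, 0, 0};
    while (!message.empty()) {
        const std::string recipient = message.front();
        message.pop_front();
        if (recipient == kEndOfFragment) {
            break;  // the synchronisation message is completely parsed
        }
        if (message.empty()) {
            break;  // truncated message: no payload left to read
        }
        const std::string payload = message.front();
        message.pop_front();
        if (mirrors.count(recipient) != 0) {
            mirrors[recipient] = payload;
            ++result.toMirrors;
        } else if (references.count(recipient) != 0) {
            references[recipient] = payload;
            ++result.toReferences;
        } else {
            ++result.dropped;  // probably addressed to a destroyed object
        }
    }
    return result;
}
```

Consuming the payload of an unknown recipient matters for the same reason as in the listing: skipping it would leave the next read in the middle of a fragment.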

void PvmController::waitForAnswerToBlockingRequest ( PvmMessage::MessageTag  tag  )  [virtual]

waitForAnswerToBlockingRequest.

Makes the controller fall back into server mode until a message with the given tag arrives and is successfully processed.

Definition at line 784 of file OMKPvmController.cxx.

References _distributedVirtualMachine, _finishing, and OMK::Svm::waitForAnswerToBlockingRequest().

Referenced by OMK::PvmMirrorObjectHandle::registerToReferenceObject().

00785 {
00786   if ( ! _finishing )
00787     // avoid blocking the controller while terminating, because other controllers could have exited
00788   {
00789     _distributedVirtualMachine->waitForAnswerToBlockingRequest ( *this, tag ) ;      
00790   }
00791 }

void PvmController::sendInitialValuesToMirror ( PvmIncomingMessage &  message  )  [virtual]

answer to a MirrorNeedsInitialValues message received by the distributed subsystem

Definition at line 718 of file OMKPvmController.cxx.

References OMK::Controller::_date, OMK::Controller::_deletedObjectHandles, _finishing, OMK::Controller::_referenceObjectsMap, OMK::SynchronisationMessage::endOfSynchronisationFragment, getDistributedVirtualMachine(), OMK::Svm::getLinkToProcessNamed(), OMK::SvmLink::getTID(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmOutgoingMessage::insertTimeStamp(), OMK::Name::pack(), OMK::PvmReferenceObjectHandle::packInitialValues(), OMK::PvmUnicastMessage::send(), and OMK::Name::unpack().

Referenced by OMK::Svm::relaxedSynchroniseReceiveAndProcessMessages(), OMK::Svm::synchroniseReceiveAndProcessMessages(), and OMK::Svm::waitForAnswerToBlockingRequest().

00719 {
00720   if ( ! _finishing )
00721   {
00722     // first find out the name of the objects whose initial values are needed
00723     Name referentialName ;
00724     referentialName.unpack ( message ) ;
00725 
00726 
00727 #ifdef _DEBUGEXECPVM
00728     cerr<<"Looking for initial values of "<<referentialName<<endl;
00729 #endif
00730 
00731     //then find the corresponding referential
00732     NameToPointerMap<ReferenceObjectHandle>::iterator i = _referenceObjectsMap.find ( referentialName ) ;
00733 
00734     PvmReferenceObjectHandle * referential = NULL ;
00735     if ( i != _referenceObjectsMap.end() ) 
00736     {
00737       referential = dynamic_cast<PvmReferenceObjectHandle *> (i->second) ;
00738     }
00739     else
00740     {
00741       //the referential might have been destroyed: look for it in the list of deleted object handles
00742       list <pair <Date, ObjectHandle *> >::iterator i = _deletedObjectHandles.begin() ;
00743       while ( i != _deletedObjectHandles.end() )
00744       {
00745         if ( i->second->getSimulatedObject().getName() == referentialName )
00746         {
00747           referential = dynamic_cast<PvmReferenceObjectHandle *> ( i->second );
00748           i = _deletedObjectHandles.end() ;
00749         }
00750         else
00751         {
00752           ++ i ;
00753         }
00754       }
00755     }
00756 
00757     assert ( referential != NULL ) ;
00758 
00759 
00760     //find the name of the requesting process
00761     Name mirrorProcessName ;
00762     mirrorProcessName.unpack( message ) ;
00763 
00764     //then find the link to the corresponding process
00765     SvmLink * process = getDistributedVirtualMachine()->getLinkToProcessNamed ( mirrorProcessName ) ;
00766 
00767     assert ( process != NULL ) ;
00768 
00769     //build the answer
00770     PvmUnicastMessage * urgentAnswer = new PvmUnicastMessage ( process->getTID() ) ;
00771 
00772     urgentAnswer->insertTimeStamp( _date ) ;
00773 
00774     referential->packInitialValues ( * urgentAnswer ) ;
00775 
00776     SynchronisationMessage::endOfSynchronisationFragment.pack ( * urgentAnswer ) ;  
00777 
00778     // and send it
00779     urgentAnswer->send ( PvmMessage::InitialValuesForMirror ) ;
00780   }
00781 }
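
sendInitialValuesToMirror() looks for the requested referential in the live reference map first and, if it is missing, scans the list of recently deleted handles. A minimal sketch of that two-stage lookup follows. HandleSketch and findReferential are hypothetical, names are plain strings, and the sketch returns a null pointer where the listing asserts.

```cpp
#include <cassert>
#include <list>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for an object handle: only the name is kept.
struct HandleSketch {
    std::string name;
};

// Look up a referential by name: live handles first, then handles that were
// deleted but are still kept around with their deletion date.
HandleSketch *findReferential(
    const std::string &name,
    std::map<std::string, HandleSketch *> &live,
    std::list<std::pair<int, HandleSketch *> > &deleted) {
    std::map<std::string, HandleSketch *>::iterator found = live.find(name);
    if (found != live.end()) {
        return found->second;
    }
    for (std::list<std::pair<int, HandleSketch *> >::iterator i = deleted.begin();
         i != deleted.end(); ++i) {
        if (i->second->name == name) {
            return i->second;  // stop at the first match
        }
    }
    return 0;  // not found; the listing asserts instead
}
```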

void PvmController::init (  )  [virtual]

redefine for distributed initialisation

Reimplemented from OMK::Controller.

Definition at line 529 of file OMKPvmController.cxx.

References OMK::Controller::_date, _distributedVirtualMachine, OMK::Controller::init(), OMK::PvmMessage::LocalInitSuccessfull, OMK::Svm::sendCurrentBuffersWithTag(), OMK::PvmMessage::SynchronisationMessage, OMK::Svm::synchronizeOn(), OMK::DistributedController::tableDesNonInitialises, and OMK::Svm::timestampCurrentSendBuffers().

00530 {
00531 
00532   if ( pvm_parent() != PvmNoParent )
00533   {
00534     // try the normal initialisation: for each init, this initialisation calls initObjetReferentiel
00535 #ifdef _DEBUGDISTRIBUTEDINIT
00536     cerr<<"PvmController::init preparing the outgoing message buffers "<<endl;
00537 #endif   
00538     _distributedVirtualMachine->timestampCurrentSendBuffers ( _date ) ;
00539 
00540     // try a classic initialisation
00541     Controller::init () ;
00542 
00543     //initialisation shouldn't fail
00544     assert ( tableDesNonInitialises.empty () ) ;
00545 
00546     //here, as advanceSimulatedDate has been called to restore the initial simulation date, no need to timestamp messages
00547 
00548 #ifdef _DEBUGDISTRIBUTEDINIT
00549     cerr<<"PvmController::init : classic initialisation tried"<<endl;
00550 #endif
00551     // send all registrations
00552     _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;
00553 
00554     // broadcast an empty message signalling that our initialisation is finished, and wait for all processes
00555     _distributedVirtualMachine->synchronizeOn ( *this, PvmMessage::LocalInitSuccessfull ) ;
00556 
00557 
00558     //restore the state expected after a call to the ancestor init
00559     _distributedVirtualMachine->timestampCurrentSendBuffers (_date) ;
00560   }
00561 
00562 }

vector< Name > PvmController::getIsolatedMirrorList (  )  [virtual]

Reimplemented from OMK::DistributedController.

Definition at line 631 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::DistributedController::_mirrorObjectsMap, _publicIsolatedMirrorObjectsVector, and OMK::Svm::disconnectedProcess().

00632 {
00633 
00634   for ( MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.begin(); i!= _mirrorObjectsMap.end(); i++ )
00635   {
00636     Name referentialProcess = i->second->getSimulatedObject().getObjectDescriptor().getProcess();
00637     if ( _distributedVirtualMachine->disconnectedProcess( referentialProcess ) )
00638     {
00639       _publicIsolatedMirrorObjectsVector.push_back( i->first) ;
00640     }
00641   }
00642 
00643   return  _publicIsolatedMirrorObjectsVector;
00644 }

vector< Name > PvmController::getNonIsolatedMirrorList (  )  [virtual]

Reimplemented from OMK::DistributedController.

Definition at line 646 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::DistributedController::_mirrorObjectsMap, _publicIsolatedMirrorObjectsVector, _publicNonIsolatedMirrorObjectsVector, OMK::Svm::disconnectedProcess(), and OMK::Svm::disconnectedTableIsEmpty().

00647 {
00648 
00649   if (_distributedVirtualMachine->disconnectedTableIsEmpty())
00650   {
00651     _publicIsolatedMirrorObjectsVector.clear();
00652     return _publicIsolatedMirrorObjectsVector;
00653   }
00654   else
00655   {
00656 
00657     for ( MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.begin(); i!= _mirrorObjectsMap.end(); i++ )
00658     {
00659       Name referentialProcess = i->second->getSimulatedObject().getObjectDescriptor().getProcess();
00660       if ( _distributedVirtualMachine->disconnectedProcess( referentialProcess ) )
00661       {
00662 
00663       }
00664       else
00665       {
00666         _publicNonIsolatedMirrorObjectsVector.push_back( i->first) ;
00667       }
00668     }
00669 
00670     return  _publicNonIsolatedMirrorObjectsVector;
00671   }
00672 }
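
The two listings above classify each mirror object by whether the process hosting its reference object is disconnected. The following is a minimal, self-contained sketch of that classification, not the OpenMASK implementation: std::string stands in for Name, a std::set of process names stands in for Svm::disconnectedProcess(), and MirrorPartition and partitionMirrors are hypothetical names. It differs from the listings in two ways: it builds fresh vectors on every call instead of appending to member vectors, and when no process is disconnected it reports every mirror as non-isolated.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical result type: mirror names split by the state of their reference process.
struct MirrorPartition {
  std::vector<std::string> isolated;
  std::vector<std::string> nonIsolated;
};

// mirrorToProcess maps a mirror name to the name of the process hosting its
// reference object; disconnected holds the names of unreachable processes.
MirrorPartition partitionMirrors(const std::map<std::string, std::string>& mirrorToProcess,
                                 const std::set<std::string>& disconnected) {
  MirrorPartition out;  // fresh on every call, so results never accumulate
  for (const auto& entry : mirrorToProcess) {
    if (disconnected.count(entry.second) != 0) {
      out.isolated.push_back(entry.first);
    } else {
      out.nonIsolated.push_back(entry.first);
    }
  }
  // Every mirror lands in exactly one of the two lists.
  assert(out.isolated.size() + out.nonIsolated.size() == mirrorToProcess.size());
  return out;
}
```

Computing both lists in one pass keeps them consistent with each other, which two separate member vectors filled at different times cannot guarantee.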

bool PvmController::testIfTroubleOccured (  )  [virtual]

Reimplemented from OMK::DistributedController.

Definition at line 675 of file OMKPvmController.cxx.

References _distributedVirtualMachine, and OMK::Svm::disconnectedTableIsEmpty().

00676 {
00677   //  if (_distributedVirtualMachine->getTroubleFlag() )
00678   if (_distributedVirtualMachine->disconnectedTableIsEmpty() )
00679   {
00680     return false;
00681   }
00682   else
00683     return true ;
00684   // when there are more than 2 sites: if 2 sites get disconnected and one of the two recovers,
00685   // the troubleFlag is false, yet one site is still disconnected, so the trouble still exists ....
00686   /*  else if (_distributedVirtualMachine->disconnectedTableIsEmpty() )
00687   {
00688   return false;
00689   }
00690   return true;
00691   */
00692 }
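
The comment in the listing explains why the method tests the disconnected table rather than a single trouble flag. The sketch below reproduces that scenario under stated assumptions: DisconnectTracker is a hypothetical class, and the flag is assumed to be set on any disconnection and cleared on any reconnection, which is what the comment suggests but the listing does not show.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical tracker contrasting a single flag with a table of disconnected sites.
class DisconnectTracker {
public:
  void disconnect(const std::string& site) {
    assert(!site.empty());
    disconnected_.insert(site);
    troubleFlag_ = true;  // assumed: set on any disconnection
  }

  void reconnect(const std::string& site) {
    assert(!site.empty());
    disconnected_.erase(site);
    troubleFlag_ = false;  // assumed: cleared on any reconnection
  }

  // The flag alone forgets sites that are still down.
  bool troubleFlag() const { return troubleFlag_; }

  // Table-based test, as in the listing: trouble persists while any site is down.
  bool troubleOccurred() const { return !disconnected_.empty(); }

private:
  std::set<std::string> disconnected_;
  bool troubleFlag_ = false;
};
```

With two sites disconnected and one of them reconnected, troubleFlag() returns false while troubleOccurred() still returns true.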

void PvmController::createNewSite ( const Date  )  [virtual]

Reimplemented from OMK::DistributedController.

Definition at line 694 of file OMKPvmController.cxx.

References _distributedVirtualMachine, and OMK::Svm::addNewSiteRequest().

00695 {
00696   _distributedVirtualMachine->addNewSiteRequest (date);
00697 }

std::list< Name > PvmController::getDisconnectedProcessusList (  )  [virtual]

Reimplemented from OMK::Controller.

Definition at line 808 of file OMKPvmController.cxx.

References _distributedVirtualMachine, and OMK::Svm::getDisconnectedProcessusList().

00808 {
00809   return _distributedVirtualMachine->getDisconnectedProcessusList () ;
00810 }

std::map< Name, int > PvmController::getDisconnectedProcessusMap (  )  [virtual]

Reimplemented from OMK::Controller.

Definition at line 812 of file OMKPvmController.cxx.

References _distributedVirtualMachine, and OMK::Svm::getDisconnectedProcessusMap().

00812 {
00813   return _distributedVirtualMachine->getDisconnectedProcessusMap () ;
00814 }

Date PvmController::getPurgeDate (  )  [protected, virtual]

redefine to take latency into account

Reimplemented from OMK::Controller.

Definition at line 800 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::Controller::getPurgeDate(), and OMK::Svm::getSynchronisationLatency().

00801 {
00802   //in the worst case we could receive a message from an object whose simulation date is current date - _latency - stepPeriod ;
00803   // make sure by doubling latency
00804   return DistributedController::getPurgeDate() - 2 * _distributedVirtualMachine->getSynchronisationLatency() ;
00805 }
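
The arithmetic in the listing can be checked with a small worked example. This is only a sketch: Ticks is a hypothetical integer stand-in for OMK::Date in an unspecified time unit, and distributedPurgeDate is not part of the library.

```cpp
#include <cassert>

// Hypothetical stand-in for OMK::Date: an integer count in an unspecified time unit.
using Ticks = long;

// Same arithmetic as the listing: the base purge date is moved back by twice
// the synchronisation latency, so late messages still find their history.
Ticks distributedPurgeDate(Ticks basePurgeDate, Ticks synchronisationLatency) {
  assert(synchronisationLatency >= 0);
  return basePurgeDate - 2 * synchronisationLatency;
}
```

For a base purge date of 1000 and a latency of 100, the function returns 800.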

MirrorObjectHandle * PvmController::createMirrorObject ( ObjectDescriptor objectDescription  )  [protected, virtual]

Create a mirror object.

Reimplemented from OMK::DistributedController.

Definition at line 243 of file OMKPvmController.cxx.

References OMK::DistributedController::createMirrorObject(), and OMK::MirrorObjectHandle::registerToReferenceObject().

00244 {
00245   MirrorObjectHandle * mirror =  DistributedController::createMirrorObject( objectDescription ) ;
00246 
00247   // Register the mirror to the reference object for regular updates of outputs
00248   mirror->registerToReferenceObject (true) ;
00249 
00250   return mirror ;
00251 }

DuplicatedObjectHandle * PvmController::newOMKDuplicatedObjectHandle ( SimulatedObject object  )  [protected, virtual]

create an object handle adapted to handling of duplicated objects

Parameters:
object : the object the object handle will have to handle

Returns:
the adapted duplicated object handle

Implements OMK::DistributedController.

Definition at line 618 of file OMKPvmController.cxx.

Referenced by PvmController().

00619 {
00620   return new PvmDuplicatedObjectHandle( obj, *this );
00621 }

LocalObjectHandle * PvmController::newOMKLocalObjectHandle ( SimulatedObject object  )  [protected, virtual]

CHADI.

Implements OMK::DistributedController.

Definition at line 625 of file OMKPvmController.cxx.

00626 {
00627   return new PvmLocalObjectHandle( obj, *this );
00628 }

MirrorObjectHandle * PvmController::newOMKMirrorObjectHandle ( SimulatedObject obj  )  [protected, virtual]

create an object handle for objects mirrored by this controller

Parameters:
object : the object the object handle will have to handle

Returns:
the mirror object handle

Implements OMK::DistributedController.

Definition at line 612 of file OMKPvmController.cxx.

00613 {
00614   return new PvmMirrorObjectHandle(obj);
00615 }

ReferenceObjectHandle * PvmController::newOMKReferenceObjectHandle ( SimulatedObject object,
Controller controller,
SignalDispatcher signalDispatcher 
) [protected, virtual]

create a reference object handler appropriate for this controller

Parameters:
obj the object to handle

Returns:
the reference object created

Reimplemented from OMK::Controller.

Definition at line 703 of file OMKPvmController.cxx.

00706 {
00707   return new PvmReferenceObjectHandle(object, controller, signalDispatcher );
00708 }

int PvmController::getOutputHistorySize ( void   )  [protected, virtual]

get the size of the history fifo needed when running a distributed simulation.

The result depends on the latency used

Reimplemented from OMK::Controller.

Definition at line 566 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::Controller::_stepPeriod, OMK::Controller::getOutputHistorySize(), and OMK::Svm::getSynchronisationLatency().

00566 {
00567   // Size of an output's queue for a local controller
00568   // nbMinor * 4 --> to guarantee that up to 4 values can be retrieved from the queue
00569   // 1 ?? safety margin ??
00570   int nbrValeurPendantLatence = 1 ;//the minimum latency is one simulation step, in the controller's sense
00571   nbrValeurPendantLatence+=_distributedVirtualMachine->getSynchronisationLatency()/_stepPeriod;
00572   return DistributedController::getOutputHistorySize()+nbrValeurPendantLatence;
00573 }  
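
A worked example of the sizing rule in the listing, as a sketch only: distributedHistorySize is a hypothetical free function, and the latency and step period are assumed to be integers in the same time unit, so the division truncates.

```cpp
#include <cassert>

// Same rule as the listing: the base history size, plus one value for the
// minimum latency of one step, plus one value per step period covered by the
// synchronisation latency.
int distributedHistorySize(int baseHistorySize, int synchronisationLatency, int stepPeriod) {
  assert(stepPeriod > 0);
  int valuesDuringLatency = 1;  // minimum latency: one controller step
  valuesDuringLatency += synchronisationLatency / stepPeriod;  // integer division truncates
  return baseHistorySize + valuesDuringLatency;
}
```

With a base size of 4, a latency of 100 and a step period of 40, the result is 4 + 1 + 2 = 7.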

ObjectHandle * PvmController::removeObjectFromDataStructures ( const Name nom  )  [protected, virtual]

manages distributed destruction: destroys the local data structures

Reimplemented from OMK::Controller.

Definition at line 580 of file OMKPvmController.cxx.

References OMK::DistributedController::_duplicatedObjectsMap, OMK::DistributedController::_mirrorObjectsMap, OMK::Controller::_referenceObjectsMap, and OMK::Controller::removeObjectFromDataStructures().

00581 {
00582 
00583   ObjectHandle * result = NULL ;
00584 
00585   if (_referenceObjectsMap.find (nom) != _referenceObjectsMap.end ()) 
00586   {
00587     result = Controller::removeObjectFromDataStructures(nom);
00588   } 
00589   else 
00590 
00591   {
00592     MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.find (nom) ;
00593     if (i != _mirrorObjectsMap.end ()) 
00594     {
00595       MirrorObjectHandle * miroir = i->second ;
00596       _mirrorObjectsMap.erase( i );
00597       result = miroir;
00598     }
00599     else if (_duplicatedObjectsMap.find (nom) != _duplicatedObjectsMap.end()) 
00600     {
00601       //a duplicated object is a referential present in the duplicated objects table
00602       _duplicatedObjectsMap.erase(nom);
00603     }  
00604   } 
00605 
00606   return result ; 
00607 }
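
The lookup order in the listing (reference objects, then mirrors, then duplicated objects) can be sketched with plain standard containers. Handle, Tables and removeFromTables are hypothetical stand-ins, and the reference branch assumes that Controller::removeObjectFromDataStructures erases the entry and returns its handle, which the listing does not show.

```cpp
#include <cassert>
#include <map>
#include <string>

struct Handle { std::string kind; };  // hypothetical stand-in for ObjectHandle

// Hypothetical stand-ins for the three tables used by the listing.
struct Tables {
  std::map<std::string, Handle*> reference;
  std::map<std::string, Handle*> mirror;
  std::map<std::string, Handle*> duplicated;
};

// Removes `name` from the first table that holds it. Returns the removed
// handle for a reference or mirror entry, and nullptr otherwise (as in the
// listing, a duplicated-only entry is erased but no handle is returned).
Handle* removeFromTables(Tables& t, const std::string& name) {
  assert(!name.empty());
  auto ref = t.reference.find(name);
  if (ref != t.reference.end()) {
    Handle* h = ref->second;  // assumed behaviour of the base-class removal
    t.reference.erase(ref);
    return h;
  }
  auto mir = t.mirror.find(name);
  if (mir != t.mirror.end()) {
    Handle* h = mir->second;
    t.mirror.erase(mir);
    return h;
  }
  t.duplicated.erase(name);  // no-op when the name is absent
  return nullptr;
}
```

Callers that need to free a duplicated object's handle cannot rely on the return value in this design, since that branch always yields a null pointer.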

void PvmController::synchronizedReceiveAndProcessMessages ( void   )  [protected]

Definition at line 370 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::PvmMessage::SynchronisationMessage, and OMK::Svm::synchroniseReceiveAndProcessMessages().

Referenced by PvmController().

void PvmController::relaxedReceiveAndProcessMessages ( void   )  [protected]

Definition at line 376 of file OMKPvmController.cxx.

References _distributedVirtualMachine, OMK::Svm::relaxedSynchroniseReceiveAndProcessMessages(), and OMK::PvmMessage::SynchronisationMessage.

Referenced by PvmController().


Member Data Documentation

std::vector<Name> OMK::PvmController::_publicIsolatedMirrorObjectsVector

Definition at line 120 of file OMKPvmController.h.

Referenced by getIsolatedMirrorList(), and getNonIsolatedMirrorList().

std::vector<Name> OMK::PvmController::_publicNonIsolatedMirrorObjectsVector

Definition at line 122 of file OMKPvmController.h.

Referenced by getNonIsolatedMirrorList().

Svm* OMK::PvmController::_distributedVirtualMachine [protected]

the distributed virtual machine used by the controller

Definition at line 184 of file OMKPvmController.h.

Referenced by advanceSimulatedDate(), computeNextSimulationStep(), createNewSite(), getDisconnectedProcessusList(), getDisconnectedProcessusMap(), getDistributedVirtualMachine(), getIsolatedMirrorList(), getNonIsolatedMirrorList(), getOutputHistorySize(), getProcessName(), getPurgeDate(), init(), PvmController(), relaxedReceiveAndProcessMessages(), run(), synchronizedReceiveAndProcessMessages(), testIfTroubleOccured(), waitForAnswerToBlockingRequest(), and ~PvmController().

bool OMK::PvmController::_finishing [protected]

boolean indicating the controller is going into shutdown mode.

No blocking calls should be made

Definition at line 193 of file OMKPvmController.h.

Referenced by finish(), sendInitialValuesToMirror(), and waitForAnswerToBlockingRequest().

void(PvmController::* OMK::PvmController::receiveAndProcessMessages)(void) [protected]

pointer to the member function that receives and processes messages, used to manage the choice of synchronisation

Referenced by computeNextSimulationStep(), and PvmController().
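
A pointer to member function of this shape lets the constructor pick a receive strategy once and lets each simulation step call it without branching. The sketch below illustrates the C++ mechanism only: MiniController, its `relaxed` constructor flag and lastMode() are hypothetical, and how PvmController actually makes the choice is not shown on this page.

```cpp
#include <cassert>
#include <string>

// Hypothetical miniature of the dispatch pattern.
class MiniController {
public:
  // The strategy is selected once, at construction.
  explicit MiniController(bool relaxed)
      : receiveAndProcess_(relaxed ? &MiniController::relaxedReceive
                                   : &MiniController::synchronizedReceive) {}

  // Called once per step; dispatches through the stored member pointer.
  void step() {
    assert(receiveAndProcess_ != nullptr);
    (this->*receiveAndProcess_)();
  }

  const std::string& lastMode() const { return lastMode_; }

private:
  void synchronizedReceive() { lastMode_ = "synchronized"; }
  void relaxedReceive() { lastMode_ = "relaxed"; }

  void (MiniController::*receiveAndProcess_)();  // same shape as receiveAndProcessMessages
  std::string lastMode_;
};
```

The `(this->*pointer)()` call syntax is required for member pointers; a plain function-call expression on the pointer does not compile.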


Documentation generated on Mon Jun 9 11:46:03 2008

Generated with doxygen by Dimitri van Heesch, 1997-2007