#include <OMKPvmController.h>
Inheritance diagram for OMK::PvmController:
Public Member Functions | |
PvmController (ObjectDescriptor &arbre, const Date &initialSimulationDate, int argc, char *argv[]) | |
Constructor. Initializes the distributed virtual machine. | |
virtual | ~PvmController () |
Destructor. | |
virtual void | run () |
redefine run so that it does nothing unless the controller belongs to a spawned process | |
virtual void | advanceSimulatedDate () |
redefine advanceSimulatedDate to timestamp synchronisation messages; any attempt to send data between a call to computeNextSimulationStep and advanceSimulatedDate will cause an assertion to fail | |
virtual void | computeNextSimulationStep () |
redefine computeNextSimulationStep to include the synchronisation primitives | |
virtual void | finish () |
redefine to avoid going into blocking mode when finishing the simulation | |
virtual const Name & | getProcessName () const |
get the name of the process this controller is controlling | |
virtual Svm * | getDistributedVirtualMachine () |
get access to the siames distributed virtual machine | |
virtual void | makeSynchronisationMessage (NameToPointerMap< ReferenceObjectHandle > &referenceObjects) |
make the synchronisation message to be sent to all processes | |
virtual void | parseSynchronisationMessage (PvmIncomingMessage *message) |
parse the received synchronisation messages. | |
virtual void | waitForAnswerToBlockingRequest (PvmMessage::MessageTag tag) |
waitForAnswerToBlockingRequest. | |
virtual void | sendInitialValuesToMirror (PvmIncomingMessage &message) |
answer to a MirrorNeedsInitialValues message received by the distributed subsystem | |
virtual void | init () |
redefine for distributed initialisation | |
virtual std::vector< Name > | getIsolatedMirrorList () |
virtual std::vector< Name > | getNonIsolatedMirrorList () |
virtual bool | testIfTroubleOccured () |
virtual void | createNewSite (const Date &) |
virtual std::list< Name > | getDisconnectedProcessusList () |
virtual std::map< Name, int > | getDisconnectedProcessusMap () |
Public Attributes | |
std::vector< Name > | _publicIsolatedMirrorObjectsVector |
std::vector< Name > | _publicNonIsolatedMirrorObjectsVector |
Protected Member Functions | |
virtual Date | getPurgeDate () |
redefine to take latency into account | |
virtual MirrorObjectHandle * | createMirrorObject (ObjectDescriptor *objectDescription) |
Create a mirror object. | |
virtual DuplicatedObjectHandle * | newOMKDuplicatedObjectHandle (SimulatedObject &object) |
create an object handle adapted to handling of duplicated objects | |
virtual LocalObjectHandle * | newOMKLocalObjectHandle (SimulatedObject &object) |
CHADI. | |
virtual MirrorObjectHandle * | newOMKMirrorObjectHandle (SimulatedObject &obj) |
create an object handle for objects mirrored by this controller | |
virtual ReferenceObjectHandle * | newOMKReferenceObjectHandle (SimulatedObject &object, Controller &controller, SignalDispatcher *signalDispatcher) |
create a reference object handler appropriate for this controller | |
virtual int | getOutputHistorySize (void) |
get the size of the history fifo needed when running a distributed simulation. | |
virtual ObjectHandle * | removeObjectFromDataStructures (const Name &nom) |
distributed destruction management: destroys the local data structures | |
void | synchronizedReceiveAndProcessMessages (void) |
void | relaxedReceiveAndProcessMessages (void) |
Protected Attributes | |
Svm * | _distributedVirtualMachine |
the distributed virtual machine used by the controller | |
bool | _finishing |
boolean indicating the controller is going into shutdown mode. | |
void(PvmController::* | receiveAndProcessMessages )(void) |
selects the synchronisation mode (synchronised or relaxed) |
the local controller is in charge of
Definition at line 47 of file OMKPvmController.h.
PvmController::PvmController | ( | ObjectDescriptor & | arbre, | |
const Date & | initialSimulationDate, | |||
int | argc, | |||
char * | argv[] | |||
) |
Constructor. Initializes the distributed virtual machine.
The created distributed virtual machine acts as a server if the process wasn't spawned. Otherwise, the controller acts as a local controller synchronized through the distributed virtual machine with the other controllers.
Definition at line 52 of file OMKPvmController.cxx.
References OMK::Controller::_computeStatePointer, _distributedVirtualMachine, OMK::SimulatedObject::_objectHandle, OMK::DistributedController::_processName, OMK::Controller::_simulationTree, OMK::NameToPointerMap< ObjectType >::addObjectWithIndex(), OMK::DuplicatedObjectHandle::addProcessOfDuplicate(), OMK::ParametersAccessor::get(), OMK::ConfigurationParameterDescriptor::getAssociatedString(), OMK::ObjectDescriptor::getDescendants(), OMK::Process::getFrequency(), OMK::Process::getMaxFrequencyOfHostedObjects(), OMK::SimulatedObject::getName(), OMK::MultipleConfigurationParameter::getNameOfSubDescriptor(), OMK::MultipleConfigurationParameter::getNumberOfSubItems(), OMK::NameToPointerMap< ObjectType >::getObjectOfIndex(), OMK::Controller::getSchedulingParametersOfObject(), OMK::Svm::getSiteName(), OMK::ConfigurationParameterDescriptor::getSubDescriptorByName(), OMK::MultipleConfigurationParameter::getSubDescriptorByName(), OMK::MultipleConfigurationParameter::getSubDescriptorByPosition(), OMK::Svm::init(), OMK::ReferenceObjectHandle::initial, OMK::Controller::lcm(), OMK::SystemEventIdentifier::MaskObjectDestroyed, newOMKDuplicatedObjectHandle(), receiveAndProcessMessages, OMK::SimulatedObject::registerForSignalBy(), relaxedReceiveAndProcessMessages(), OMK::Process::setFrequency(), OMK::Process::setMaxHostedFrequency(), OMK::Controller::setProcessOfDescriptor(), and synchronizedReceiveAndProcessMessages().
  : DistributedController (initialObjects, initialDate),
    _finishing ( false )
{

  Date latence = 20 ;
  Date timeOut = 10 ;
  std::string synchro ("strong") ;
  int deadReckoning = 70 ;
  bool yieldNeeded = false ;
  NameToPointerMap<Process> * processTable = new NameToPointerMap<Process>() ;
  receiveAndProcessMessages = &PvmController::synchronizedReceiveAndProcessMessages ;

  MultipleConfigurationParameter * schedulingParameters = getSchedulingParametersOfObject( initialObjects ) ;
  if ( schedulingParameters != NULL)
  {
    ConfigurationParameterDescriptor * param = schedulingParameters->getSubDescriptorByName("Latency") ;
    if (param != NULL)
    {
      latence = atoi (param->getAssociatedString().c_str()) ;
    }
    std::cerr << "Latency : " << latence << std::endl ;

    synchro = "strong" ;
    if( ParametersAccessor::get( schedulingParameters, "Synchronization", synchro )
        && ( synchro == "relaxed" ) )
    {
      receiveAndProcessMessages = &PvmController::relaxedReceiveAndProcessMessages ;
    }
    std::cerr << "Synchronization : " << synchro << std::endl ;

    ParametersAccessor::get( schedulingParameters, "YieldNeeded", yieldNeeded ) ;
    std::cerr << "YieldNeeded : " << yieldNeeded << std::endl ;

    const ConfigurationParameterDescriptor * timeOutParamDescriptor =
      schedulingParameters->getSubDescriptorByName ("TimeOut") ;
    if (timeOutParamDescriptor != NULL) {
      timeOut = atoi (timeOutParamDescriptor->getAssociatedString().c_str()) ;
    }
    std::cerr << "TimeOut : " << timeOut << std::endl ;

    const ConfigurationParameterDescriptor * deadReckoningParamDescriptor =
      schedulingParameters->getSubDescriptorByName ("DeadReckoningStepInterval") ;
    if (deadReckoningParamDescriptor != NULL) {
      deadReckoning = atoi (deadReckoningParamDescriptor->getAssociatedString().c_str()) ;
    }
    std::cerr << "DeadReckoningStepInterval : " << deadReckoning << std::endl ;

    param = schedulingParameters->getSubDescriptorByName("Machines") ;
    MultipleConfigurationParameter * describedMachines = dynamic_cast<MultipleConfigurationParameter *>(param);
    if (describedMachines != NULL)
    {
      int numberOfMachines = describedMachines->getNumberOfSubItems () ;
      for (int machineNumber = 0 ;
           machineNumber != numberOfMachines ;
           ++machineNumber )
      {
        param = describedMachines->getSubDescriptorByPosition ( machineNumber ) ;
        processTable->addObjectWithIndex ( describedMachines->getNameOfSubDescriptor (machineNumber),
                                           new Process ( describedMachines->getNameOfSubDescriptor (machineNumber),
                                                         param->getAssociatedString() )
                                           ) ;
      }
    }
  }

  // virtual member functions aren't initialised in constructor
  // therefore, recreate the correct objectHandle
  delete _objectHandle ;

  _objectHandle = NULL ;

  DuplicatedObjectHandle * localObjectHandle = newOMKDuplicatedObjectHandle( *this ) ;
  _objectHandle = localObjectHandle ;
  // TDTD: added to fix a memory bug: since _objectHandle was deleted, this no longer works...
  _computeStatePointer = const_cast<ReferenceObjectHandle::SimulatedObjectComputingState *>(&(dynamic_cast<DuplicatedObjectHandle *>(_objectHandle)->getComputingState () )) ;

  for (NameToPointerMap<Process>::const_iterator i = processTable->begin() ;
       i != processTable->end () ;
       ++i)
  {
    localObjectHandle->addProcessOfDuplicate ( (*i).second ) ;
  }


  _distributedVirtualMachine = new PvmSvm (processTable, latence, timeOut, deadReckoning, yieldNeeded, argc, argv) ;

  _distributedVirtualMachine->init (initialDate ) ;

  _processName = _distributedVirtualMachine->getSiteName () ;

  setProcessOfDescriptor ( initialObjects, _processName ) ;

  cout << "PvmController::PvmController (...)"
       << "of process: " << _distributedVirtualMachine->getSiteName () << endl ;

  // initialise the controller state
  // TDTD: continuation of the memory bug fix: since _objectHandle was deleted, this no longer works...
  assert ( _computeStatePointer != NULL ) ;
  *_computeStatePointer = ReferenceObjectHandle::initial ;


  //rewritten 2001/12/5 by D. Margery
  list < ObjectDescriptor *> * descriptorList = _simulationTree.getDescendants() ;
  for (list < ObjectDescriptor *>::iterator descriptorIterator = descriptorList->begin() ;
       descriptorIterator != descriptorList->end() ;
       ++ descriptorIterator )
  {
    Frequency currentFreq = (*descriptorIterator)->getFrequency() ;
    Name currentProcess = (*descriptorIterator)->getProcess() ;
    Process * currentProcessDescriptor = processTable->getObjectOfIndex( currentProcess ) ;
    if ( currentProcessDescriptor != NULL )
    // the process could have been removed by the svm
    {
      if ( currentFreq > currentProcessDescriptor->getMaxFrequencyOfHostedObjects () )
      {
        currentProcessDescriptor->setMaxHostedFrequency (currentFreq) ;
      }
      currentProcessDescriptor->setFrequency ( Controller::lcm(currentFreq, currentProcessDescriptor->getFrequency() ) ) ;
    }
  }
  delete descriptorList ;

  //subscribe to system signals to receive notification on what is happening on other controllers
  registerForSignalBy(SystemEventIdentifier::MaskObjectDestroyed, getName()) ;

}
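The constructor listing reads optional scheduling parameters, falls back to built-in defaults (Latency 20, TimeOut 10, DeadReckoningStepInterval 70, Synchronization "strong"), and binds a pointer to member function that selects the receive strategy. The sketch below illustrates that pattern in isolation. `Params`, `SyncSettings`, `readInt`, `readSettings` and `MiniController` are hypothetical names introduced for illustration and are not part of OpenMASK; a plain `std::map` stands in for `MultipleConfigurationParameter`.

```cpp
#include <cassert>
#include <cstdlib>
#include <map>
#include <string>

// Hypothetical stand-in for the scheduling parameters: a flat key/value map.
using Params = std::map<std::string, std::string>;

// Defaults mirror the values hard-coded at the top of the constructor listing.
struct SyncSettings {
    int latency = 20;
    int timeOut = 10;
    int deadReckoning = 70;
    std::string synchro = "strong";
};

// Return the integer stored under key, or fallback when the key is absent.
inline int readInt(const Params& p, const std::string& key, int fallback) {
    auto it = p.find(key);
    return it == p.end() ? fallback : std::atoi(it->second.c_str());
}

inline SyncSettings readSettings(const Params& p) {
    SyncSettings s;
    s.latency = readInt(p, "Latency", s.latency);
    s.timeOut = readInt(p, "TimeOut", s.timeOut);
    s.deadReckoning = readInt(p, "DeadReckoningStepInterval", s.deadReckoning);
    auto it = p.find("Synchronization");
    if (it != p.end()) {
        s.synchro = it->second;
    }
    return s;
}

// The strategy is chosen once, then invoked through a pointer to member.
class MiniController {
public:
    explicit MiniController(const SyncSettings& s)
        : receive_(s.synchro == "relaxed" ? &MiniController::relaxedReceive
                                          : &MiniController::synchronizedReceive) {}

    // Same call shape as (this->*receiveAndProcessMessages)() in the listing.
    std::string receive() { return (this->*receive_)(); }

private:
    std::string synchronizedReceive() { return "synchronized"; }
    std::string relaxedReceive() { return "relaxed"; }

    std::string (MiniController::*receive_)();
};
```

Any value other than "relaxed" keeps the synchronized strategy, which matches the condition tested in the listing. Binding the member pointer once avoids re-testing the mode string at every simulation step.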
PvmController::~PvmController | ( | ) | [virtual] |
Destructor.
Definition at line 185 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::Svm::disconnectFromDistributedSimulation(), and OMK::Controller::getSimulatedDate().
{
  // disconnect from the virtual machine
#ifdef _DEBUGEXECPVM
  cout << "PvmController::~PvmController : " << endl;
#endif
  _distributedVirtualMachine->disconnectFromDistributedSimulation ( getSimulatedDate () ) ;
  delete _distributedVirtualMachine ;
}
void PvmController::run | ( | ) | [virtual] |
redefine run so that it does nothing unless the controller belongs to a spawned process
Reimplemented from OMK::Controller.
Definition at line 198 of file OMKPvmController.cxx.
References OMK::Controller::_date, _distributedVirtualMachine, OMK::SimulatedObject::_objectHandle, OMK::Controller::_referenceObjectsMap, OMK::Controller::_stepPeriod, advanceSimulatedDate(), OMK::Svm::getSynchronisationLatency(), makeSynchronisationMessage(), OMK::ReferenceObjectHandle::processEvents(), OMK::Svm::processReceivedMessages(), OMK::Controller::run(), OMK::Svm::sendCurrentBuffersWithTag(), and OMK::PvmMessage::SynchronisationMessage.
{
  if ( pvm_parent() != PvmNoParent )
  {
    DistributedController::run() ;

    // the simulation is locally finished, so notify all the controllers before exiting
    Date lastDate = _date + 2 * _stepPeriod + _distributedVirtualMachine->getSynchronisationLatency() ;

    while (_date < lastDate )
    {
#ifdef _DEBUGPVMMESS
      cerr<<"PvmController::run(): trying a clean exit"<<endl;
#endif
      ReferenceObjectHandle * objectHandle = dynamic_cast<ReferenceObjectHandle *> ( _objectHandle ) ;

      assert ( objectHandle != NULL ) ;

      // the controller has to process residual system events addressed to itself
      objectHandle->processEvents() ;

      // send enough empty signals to enable other controllers to finish their job
      makeSynchronisationMessage( _referenceObjectsMap ) ;

      // send the synchronisation messages
      _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;

      // wait a little, to try and get a clean exit
#ifdef _MSC_VER
      Sleep( 1000 ) ;
#else
      sleep(1) ;
#endif

      // for cleanup, read any arrived messages
      _distributedVirtualMachine->processReceivedMessages (*this, PvmMessage::SynchronisationMessage) ;

      advanceSimulatedDate() ;
    }
  }

}
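The clean-exit loop in the listing keeps sending empty synchronisation messages until the date reaches `_date + 2 * _stepPeriod + latency`. A minimal sketch of how many iterations that implies follows, assuming each call to `advanceSimulatedDate()` advances the date by exactly one step period (an assumption, not confirmed by the listing). `drainIterations` and the integer `Date` alias are hypothetical.

```cpp
#include <cassert>

// Hypothetical integer dates in arbitrary time units.
using Date = long;

// Count the iterations of the clean-exit loop.
// Assumptions: stepPeriod > 0, and each iteration advances the date by
// exactly stepPeriod.
inline int drainIterations(Date now, Date stepPeriod, Date latency) {
    const Date lastDate = now + 2 * stepPeriod + latency;
    int iterations = 0;
    while (now < lastDate) {
        // Here the real loop sends an empty synchronisation message,
        // sleeps for a second, then reads any pending messages.
        now += stepPeriod;
        ++iterations;
    }
    return iterations;
}
```

Under these assumptions, a step period of 20 and a latency of 20 give three iterations, and since each iteration sleeps for one second the shutdown takes about three seconds of wall-clock time.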
void PvmController::advanceSimulatedDate | ( | ) | [virtual] |
redefine advanceSimulatedDate to timestamp synchronisation messages; any attempt to send data between a call to computeNextSimulationStep and advanceSimulatedDate will cause an assertion to fail
Reimplemented from OMK::Controller.
Definition at line 262 of file OMKPvmController.cxx.
References OMK::Controller::_date, _distributedVirtualMachine, OMK::Controller::advanceSimulatedDate(), OMK::Controller::initialSimulationDate, OMK::Svm::sendCurrentBuffersWithTag(), OMK::PvmMessage::SynchronisationMessage, and OMK::Svm::timestampCurrentSendBuffers().
Referenced by run().
{
#ifdef _DEBUGEXECPVM
  cerr << "PvmController::advanceSimulatedDate () from " <<_date<<endl;
#endif
  // send any messages collected during init
  if (_date < initialSimulationDate )
  {
    _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;
  }

  DistributedController::advanceSimulatedDate () ;

  // prepare buffers to receive any message generated during this simulation step
  _distributedVirtualMachine->timestampCurrentSendBuffers (_date) ;
}
void PvmController::computeNextSimulationStep | ( | ) | [virtual] |
redefine computeNextSimulationStep to include the synchronisation primitives
Reimplemented from OMK::Controller.
Definition at line 304 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::Controller::_referenceObjectsMap, OMK::Controller::computeNextSimulationStep(), makeSynchronisationMessage(), receiveAndProcessMessages, OMK::Svm::sendCurrentBuffersWithTag(), and OMK::PvmMessage::SynchronisationMessage.
{

#ifdef _DEBUGEXECPVM
  cerr << "PvmController::computeNextSimulation : computing yet another simulation step for "
       <<notreProcessus()<< endl;
#endif

  // compute the simulation step
  DistributedController::computeNextSimulationStep () ;

  // compute the synchronisation messages
  makeSynchronisationMessage ( _referenceObjectsMap ) ;

  // send the synchronisation messages
  _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;

  // TDTD: to run in either synchronised or relaxed mode
  (this->*receiveAndProcessMessages) () ;
  //synchronizedReceiveAndProcessMessages () ;
  // TDTD: when desynchronised, the flushing of useless messages has to be sorted out... more or less done...

  //added by chadi to test if a new process has been added dynamically
  // TDTD: to be restored later
  //_distributedVirtualMachine->testIfNewProcessAdded () ;
  /*
  if (testIfTroubleOccured())
  {
    bool tico = _distributedVirtualMachine->reconnexionEstablished() ;
  }
  */

  // {
  //   cout<<"bien bien bien"<<endl;
  //   makeSynchronisationMessage ( _referenceObjectsMap ) ;
  //   _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;
  // }

  // if (testIfTroubleOccured())
  // {
  //   _distributedVirtualMachine->sendPingMessage() ;
  // }



  // if ( testIfTroubleOccured() )
  // {
  //   NameToPointerMap<ReferenceObjectHandle>::iterator pTabRef ;
  //   for (pTabRef = _referenceObjectsMap.begin () ;
  //        pTabRef != _referenceObjectsMap.end () ;
  //        pTabRef ++)
  //   {

  //     PvmReferenceObjectHandle * pvmObjectHandle = dynamic_cast<PvmReferenceObjectHandle *> ( (*pTabRef).second ) ;
  //     if ( pvmObjectHandle != NULL )
  //     {
  //       pvmObjectHandle->flushDisconnectedSitesOutgoingBuffer() ;
  //     }

  //   }

  // }

}
void PvmController::finish | ( | ) | [virtual] |
redefine to avoid going into blocking mode when finishing the simulation
Reimplemented from OMK::Controller.
Definition at line 793 of file OMKPvmController.cxx.
References _finishing, and OMK::Controller::finish().
{
  _finishing = true ;
  DistributedController::finish() ;
}
const Name & PvmController::getProcessName | ( | ) | const [virtual] |
get the name of the process this controller is controlling
Definition at line 256 of file OMKPvmController.cxx.
References _distributedVirtualMachine, and OMK::Svm::getSiteName().
Referenced by OMK::PvmMirrorObjectHandle::registerToReferenceObject().
{
  return (_distributedVirtualMachine->getSiteName ()) ;
}
Svm * PvmController::getDistributedVirtualMachine | ( | ) | [virtual] |
get access to the siames distributed virtual machine
Definition at line 710 of file OMKPvmController.cxx.
References _distributedVirtualMachine.
Referenced by sendInitialValuesToMirror().
{
  return _distributedVirtualMachine;
}
void PvmController::makeSynchronisationMessage | ( | NameToPointerMap< ReferenceObjectHandle > & | referenceObjects | ) | [virtual] |
make the synchronisation message to be sent to all processes
Definition at line 382 of file OMKPvmController.cxx.
References OMK::Controller::_date, and OMK::PvmReferenceObjectHandle::makeSynchronisationMessage().
Referenced by computeNextSimulationStep(), and run().
{
  static Name endMessage ("_OpenMASKEndOfSynchronisationMessage") ;

#ifdef _DEBUGPVMMESS
  cerr<<"PvmController::makeSynchronisationMessage for "<<referenceObjects.size() <<" objects "<<endl;
#endif

  NameToPointerMap<ReferenceObjectHandle>::iterator pTabRef ;
  // ask all referentials to contribute to the synchronisation message
  for (pTabRef = referenceObjects.begin () ;
       pTabRef != referenceObjects.end () ;
       pTabRef ++)
  {
#ifdef _DEBUGPVMMESS
    cerr<<"PvmController::makeSynchronisationMessage for object " <<(*pTabRef).second->getSimulatedObject().getName()<<endl;
#endif

    PvmReferenceObjectHandle * pvmObjectHandle = dynamic_cast<PvmReferenceObjectHandle *> ( (*pTabRef).second ) ;
    if ( pvmObjectHandle != NULL )
    {
      pvmObjectHandle->makeSynchronisationMessage ( _date ) ;
    }
    else
    {
#ifdef _DEBUGPVMMESS
      cerr<<"No synchronisation message for "<<(*pTabRef).second->getSimulatedObject().getName()<<endl;
#endif
    }
#ifdef _DEBUGPVMMESS
    cerr<<"PvmController::makeSynchronisationMessage of " <<(*pTabRef).second->getSimulatedObject().getName()<<" done"<<endl;
#endif
  }
#ifdef _DEBUGPVMMESS
  cerr<<"PvmController::makeSynchronisationMessage done"<<endl;
#endif
}
void PvmController::parseSynchronisationMessage | ( | PvmIncomingMessage * | message | ) | [virtual] |
parse the received synchronisation messages.
This member function is called by the siames virtual machine when it receives synchronisation messages from remote workstations.
message | the message to parse |
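The listing for this function reads recipient names from the message until it meets the end-of-fragment marker, dispatching each payload to a mirror, then to a referential, and otherwise decoding and dropping it so that the stream stays aligned. A minimal sketch of that loop is given here under simplifying assumptions: the message is a flat queue of strings, every record carries exactly one payload token, and the empty-name case is omitted. `kEndOfFragment`, `ParseResult` and `parseFragment` are hypothetical names, not OpenMASK API.

```cpp
#include <cassert>
#include <deque>
#include <set>
#include <string>
#include <vector>

// Hypothetical end-of-fragment marker; the real one is
// SynchronisationMessage::endOfSynchronisationFragment.
const std::string kEndOfFragment = "_end";

struct ParseResult {
    std::vector<std::string> toMirrors;       // payloads delivered to mirrors
    std::vector<std::string> toReferentials;  // payloads delivered to referentials
    int discarded = 0;                        // payloads for unknown recipients
};

// Parse a flat stream of (recipient, payload) pairs terminated by the marker.
// Assumption: every record carries exactly one payload token.
inline ParseResult parseFragment(std::deque<std::string> message,
                                 const std::set<std::string>& mirrors,
                                 const std::set<std::string>& referentials) {
    ParseResult result;
    while (!message.empty()) {
        const std::string recipient = message.front();
        message.pop_front();
        if (recipient == kEndOfFragment || message.empty()) {
            break;  // fragment completely parsed, or input truncated
        }
        const std::string payload = message.front();
        message.pop_front();
        if (mirrors.count(recipient) != 0) {
            result.toMirrors.push_back(payload);
        } else if (referentials.count(recipient) != 0) {
            result.toReferentials.push_back(payload);
        } else {
            // Unknown (probably destroyed) object: the payload is still
            // consumed so the next recipient name is read from the right place.
            ++result.discarded;
        }
    }
    return result;
}
```

Consuming the payload of an unknown recipient is the important design point: skipping only the name would leave the reader in the middle of a record and corrupt every following lookup.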
Definition at line 423 of file OMKPvmController.cxx.
References OMK::DistributedController::_mirrorObjectsMap, OMK::Controller::_referenceObjectsMap, OMK::EventCreator::createEvent(), OMK::SynchronisationMessage::endOfSynchronisationFragment, OMK::Controller::error(), OMK::EventReceived, OMK::Controller::getSimulatedDate(), OMK::Registration, OMK::Event::unpack(), OMK::SimulatedObject::unpack(), OMK::Flowable::unpack(), OMK::MirrorObjectHandle::unpack(), and OMK::Name::unpack().
Referenced by OMK::Svm::processReceivedMessages(), OMK::Svm::relaxedSynchroniseReceiveAndProcessMessages(), OMK::Svm::synchroniseReceiveAndProcessMessages(), OMK::Svm::waitAndProcessMessages(), OMK::Svm::waitForAnswerToBlockingRequest(), OMK::Svm::waitForMessage(), and OMK::Svm::waitForMessageFrom().
{

  ReferenceObjectHandle * refer ; // referential of the recipient object
  MirrorObjectHandle * miroir ; // mirror of the recipient object
  Name nomObj ; // name of the recipient mirror
  bool stillMessagesToParse = true ;

#ifdef _DEBUGPVMMESS
  cerr << "PvmController::parseSynchronisationMessage at "<<getSimulatedDate()<<":"<< endl ;
#endif


  while ( stillMessagesToParse )
  {
    // get the message recipient
    nomObj.unpack ( *message ) ;


#ifdef _DEBUGPVMMESS
    cerr << "PvmController::parseSynchronisationMessage: message received for " << nomObj << endl;
#endif
    if ( nomObj == SynchronisationMessage::endOfSynchronisationFragment )
    // the synchronisation message has been completely parsed
    {
      stillMessagesToParse = false ;
    }
    else
    {
      MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.find ( nomObj ) ;
      if (i != _mirrorObjectsMap.end ())
      {
        // forward the message to it
#ifdef _DEBUGPVMMESS
        cout << "PvmController::parseSynchronisationMessage: for miror" << endl ;
#endif
        miroir = i->second ;
        miroir->unpack ( *message ) ;
      }
      else if (_referenceObjectsMap.find (nomObj) != _referenceObjectsMap.end ())
      {
        // forward the message to it
#ifdef _DEBUGPVMMESS
        std::cout << "PvmController::parseSynchronisationMessage : for referential : " << nomObj <<std::endl ;
        //std::cout << "message : " << *message << std::endl ;
#endif
        refer = dynamic_cast<PvmReferenceObjectHandle *>(_referenceObjectsMap.getObjectOfIndex (nomObj) ) ;
        if ( refer != NULL )
        {
          refer->unpack (*message) ;
        }
        else
        {
          dynamic_cast<PvmDuplicatedObjectHandle *>(_referenceObjectsMap.getObjectOfIndex (nomObj) )->unpack ( *message ) ;
        }
      }
      else if (nomObj == "")
      {
#ifdef _DEBUGPVMMESS
        cout << "PvmController::parseSynchronisationMessage :CL : message vide d'abonnement ... pour synchro" << endl ;
#endif
      }
      else
      {
#ifdef _DEBUGPVMMESS
        cerr << "PvmController::parseSynchronisationMessage: "<< endl ;
        cerr << "for unknown object: <" << nomObj << "> (was probably destroyed)" << endl ;
        error ("PvmController::parseSynchronisationMessage : Objet ni miroir ni referentiel") ;
#endif
        // probably a request for an object that was destroyed
        int typeMess ;
        *message >> typeMess ;
        // hope it is a registration for a just destroyed referential, and interpret it
        if ( typeMess == Registration )
        {
          Name processName ;
          *message >> processName ;
        }
        else if ( typeMess == EventReceived )
        {
          Event * event ;
          Name classToCreate ;
          EventIdentifier eventId;
          Date eventDate;
          Name sender, receiver ;
          *message >> classToCreate >> eventId>> eventDate>>sender>>receiver ;
          event = EventCreator::createEvent (classToCreate, eventId, eventDate, sender, receiver ) ;
          event->unpack ( *message ) ;
          //cerr << "PvmController::parseSynchronisationMessage event "<<*event<<" undelivered"<<endl;
          delete event ;
        }
      }
    }
  }
#ifdef _DEBUGPVMMESS
  cerr << "PvmController::parseSynchronisationMessage : message parsed"<< endl ;
#endif
}
void PvmController::waitForAnswerToBlockingRequest | ( | PvmMessage::MessageTag | tag | ) | [virtual] |
waitForAnswerToBlockingRequest.
Makes the controller fall back into server mode until a message with the given tag arrives and is successfully processed.
Definition at line 784 of file OMKPvmController.cxx.
References _distributedVirtualMachine, _finishing, and OMK::Svm::waitForAnswerToBlockingRequest().
Referenced by OMK::PvmMirrorObjectHandle::registerToReferenceObject().
{
  if ( ! _finishing )
  // avoid blocking the controller while terminating, because other controllers could have exited
  {
    _distributedVirtualMachine->waitForAnswerToBlockingRequest ( *this, tag ) ;
  }
}
void PvmController::sendInitialValuesToMirror | ( | PvmIncomingMessage & | message | ) | [virtual] |
answer to a MirrorNeedsInitialValues message received by the distributed subsystem
Definition at line 718 of file OMKPvmController.cxx.
References OMK::Controller::_date, OMK::Controller::_deletedObjectHandles, _finishing, OMK::Controller::_referenceObjectsMap, OMK::SynchronisationMessage::endOfSynchronisationFragment, getDistributedVirtualMachine(), OMK::Svm::getLinkToProcessNamed(), OMK::SvmLink::getTID(), OMK::PvmMessage::InitialValuesForMirror, OMK::PvmOutgoingMessage::insertTimeStamp(), OMK::Name::pack(), OMK::PvmReferenceObjectHandle::packInitialValues(), OMK::PvmUnicastMessage::send(), and OMK::Name::unpack().
Referenced by OMK::Svm::relaxedSynchroniseReceiveAndProcessMessages(), OMK::Svm::synchroniseReceiveAndProcessMessages(), and OMK::Svm::waitForAnswerToBlockingRequest().
{
  if ( ! _finishing )
  {
    // first find out the name of the objects whose initial values are needed
    Name referentialName ;
    referentialName.unpack ( message ) ;


#ifdef _DEBUGEXECPVM
    cerr<<"Looking for initial values of "<<referentialName<<endl;
#endif

    // then find the corresponding referential
    NameToPointerMap<ReferenceObjectHandle>::iterator i = _referenceObjectsMap.find ( referentialName ) ;

    PvmReferenceObjectHandle * referential = NULL ;
    if ( i != _referenceObjectsMap.end() )
    {
      referential = dynamic_cast<PvmReferenceObjectHandle *> (i->second) ;
    }
    else
    {
      // the referential might have been destroyed: look for it in the list of deleted object handles
      list <pair <Date, ObjectHandle *> >::iterator i = _deletedObjectHandles.begin() ;
      while ( i != _deletedObjectHandles.end() )
      {
        if ( i->second->getSimulatedObject().getName() == referentialName )
        {
          referential = dynamic_cast<PvmReferenceObjectHandle *> ( i->second );
          i = _deletedObjectHandles.end() ;
        }
        else
        {
          ++ i ;
        }
      }
    }

    assert ( referential != NULL ) ;


    // find the name of the requesting process
    Name mirrorProcessName ;
    mirrorProcessName.unpack( message ) ;

    // then find the link to the corresponding process
    SvmLink * process = getDistributedVirtualMachine()->getLinkToProcessNamed ( mirrorProcessName ) ;

    assert ( process != NULL ) ;

    // build the answer
    PvmUnicastMessage * urgentAnswer = new PvmUnicastMessage ( process->getTID() ) ;

    urgentAnswer->insertTimeStamp( _date ) ;

    referential->packInitialValues ( * urgentAnswer ) ;

    SynchronisationMessage::endOfSynchronisationFragment.pack ( * urgentAnswer ) ;

    // and send it
    urgentAnswer->send ( PvmMessage::InitialValuesForMirror ) ;
  }
}
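The listing above looks the referential up in the live map first and, failing that, scans the list of recently deleted handles, because a mirror may ask for initial values of an object that has just been destroyed. The two-stage lookup can be sketched as follows; `Handle` and `findReferential` are hypothetical stand-ins for `ObjectHandle` and the inline search, and dates are plain integers.

```cpp
#include <cassert>
#include <list>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for an object handle: only the name matters here.
struct Handle {
    std::string name;
};

// Look for a handle by name in the live map first, then in the list of
// (deletion date, handle) pairs. Returns nullptr when it is in neither.
inline Handle* findReferential(const std::map<std::string, Handle*>& live,
                               const std::list<std::pair<long, Handle*> >& deleted,
                               const std::string& name) {
    std::map<std::string, Handle*>::const_iterator it = live.find(name);
    if (it != live.end()) {
        return it->second;
    }
    for (std::list<std::pair<long, Handle*> >::const_iterator d = deleted.begin();
         d != deleted.end(); ++d) {
        if (d->second->name == name) {
            return d->second;  // first match wins, as in the listing
        }
    }
    return nullptr;
}
```

The original asserts that the result is non-null; a caller of this sketch would need the same check, or a graceful refusal, before dereferencing the returned pointer.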
void PvmController::init | ( | ) | [virtual] |
redefine for distributed initialisation
Reimplemented from OMK::Controller.
Definition at line 529 of file OMKPvmController.cxx.
References OMK::Controller::_date, _distributedVirtualMachine, OMK::Controller::init(), OMK::PvmMessage::LocalInitSuccessfull, OMK::Svm::sendCurrentBuffersWithTag(), OMK::PvmMessage::SynchronisationMessage, OMK::Svm::synchronizeOn(), OMK::DistributedController::tableDesNonInitialises, and OMK::Svm::timestampCurrentSendBuffers().
{

  if ( pvm_parent() != PvmNoParent )
  {
    // attempt the normal initialisation: for each init, this initialisation calls initObjetReferentiel
#ifdef _DEBUGDISTRIBUTEDINIT
    cerr<<"PvmController::init preparing the outgoing message buffers "<<endl;
#endif
    _distributedVirtualMachine->timestampCurrentSendBuffers ( _date ) ;

    // try a classic initialisation
    Controller::init () ;

    // initialisation shouldn't fail
    assert ( tableDesNonInitialises.empty () ) ;

    // here, as advanceSimulatedDate has been called to restore the initial simulation date, no need to timestamp messages

#ifdef _DEBUGDISTRIBUTEDINIT
    cerr<<"PvmController::init : classic initialisation tried"<<endl;
#endif
    // send all registrations
    _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;

    // broadcast an empty message signalling our initialisation is finished, and wait for all processes
    _distributedVirtualMachine->synchronizeOn ( *this, PvmMessage::LocalInitSuccessfull ) ;


    // restore state to the supposed state resulting from a call to the ancestor init
    _distributedVirtualMachine->timestampCurrentSendBuffers (_date) ;
  }

}
vector< Name > PvmController::getIsolatedMirrorList | ( | ) | [virtual] |
Reimplemented from OMK::DistributedController.
Definition at line 631 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::DistributedController::_mirrorObjectsMap, _publicIsolatedMirrorObjectsVector, and OMK::Svm::disconnectedProcess().
{

  for ( MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.begin(); i!= _mirrorObjectsMap.end(); i++ )
  {
    Name referentialProcess = i->second->getSimulatedObject().getObjectDescriptor().getProcess();
    if ( _distributedVirtualMachine->disconnectedProcess( referentialProcess ) )
    {
      _publicIsolatedMirrorObjectsVector.push_back( i->first) ;
    }
  }

  return _publicIsolatedMirrorObjectsVector;
}
vector< Name > PvmController::getNonIsolatedMirrorList | ( | ) | [virtual] |
Reimplemented from OMK::DistributedController.
Definition at line 646 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::DistributedController::_mirrorObjectsMap, _publicIsolatedMirrorObjectsVector, _publicNonIsolatedMirrorObjectsVector, OMK::Svm::disconnectedProcess(), and OMK::Svm::disconnectedTableIsEmpty().
{

  if (_distributedVirtualMachine->disconnectedTableIsEmpty())
  {
    _publicIsolatedMirrorObjectsVector.clear();
    return _publicIsolatedMirrorObjectsVector;
  }
  else
  {

    for ( MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.begin(); i!= _mirrorObjectsMap.end(); i++ )
    {
      Name referentialProcess = i->second->getSimulatedObject().getObjectDescriptor().getProcess();
      if ( _distributedVirtualMachine->disconnectedProcess( referentialProcess ) )
      {

      }
      else
      {
        _publicNonIsolatedMirrorObjectsVector.push_back( i->first) ;
      }
    }

    return _publicNonIsolatedMirrorObjectsVector;
  }
}
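As listed, getIsolatedMirrorList and getNonIsolatedMirrorList append to public member vectors without clearing them first, so repeated calls would appear to accumulate duplicate names, and the empty-table branch of getNonIsolatedMirrorList returns the (cleared) isolated vector. Whether that is intended cannot be told from the listing alone. The sketch below shows one possible alternative that classifies mirrors into fresh vectors on every call. `partitionMirrors` and its map and set arguments are hypothetical and are not the library's API.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using Names = std::vector<std::string>;

// Split mirror names into (isolated, nonIsolated).
// mirrorToProcess maps a mirror name to the process hosting its referential;
// disconnected holds the names of processes currently unreachable.
inline std::pair<Names, Names> partitionMirrors(
        const std::map<std::string, std::string>& mirrorToProcess,
        const std::set<std::string>& disconnected) {
    Names isolated;
    Names nonIsolated;
    for (std::map<std::string, std::string>::const_iterator it = mirrorToProcess.begin();
         it != mirrorToProcess.end(); ++it) {
        if (disconnected.count(it->second) != 0) {
            isolated.push_back(it->first);     // referential is unreachable
        } else {
            nonIsolated.push_back(it->first);  // referential is still connected
        }
    }
    return std::make_pair(isolated, nonIsolated);
}
```

Because the result is rebuilt from scratch, calling the function twice yields the same lists, and with no disconnected process every mirror is reported as non-isolated.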
bool PvmController::testIfTroubleOccured ( ) [virtual]
Reimplemented from OMK::DistributedController.
Definition at line 675 of file OMKPvmController.cxx.
References _distributedVirtualMachine, and OMK::Svm::disconnectedTableIsEmpty().
{
  // if (_distributedVirtualMachine->getTroubleFlag() )
  if (_distributedVirtualMachine->disconnectedTableIsEmpty() )
  {
    return false;
  }
  else
    return true ;
  // when there are more than 2 sites, if 2 sites are disconnected and one of the 2 then recovers,
  // the troubleFlag is false although one site is still disconnected, so the trouble still exists
  /* else if (_distributedVirtualMachine->disconnectedTableIsEmpty() )
  {
    return false;
  }
  return true;
  */
}
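The comment in the listing explains why the test relies on the disconnected-site table rather than on a single trouble flag. The sketch below illustrates that reasoning with a hypothetical `DisconnectionTracker` class, which is not part of the `Svm` interface: a flag that is cleared on any reconnection loses track of the sites that are still down.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical tracker keeping both a single flag and a table of
// disconnected sites, so that the two approaches can be compared.
class DisconnectionTracker {
public:
    void siteDisconnected(const std::string& site) {
        disconnected_.insert(site);
        troubleFlag_ = true;
    }
    void siteReconnected(const std::string& site) {
        disconnected_.erase(site);
        troubleFlag_ = false;  // naive: ignores sites that are still down
    }
    bool troubleFlag() const { return troubleFlag_; }
    bool troubleOccurred() const { return !disconnected_.empty(); }

private:
    std::set<std::string> disconnected_;
    bool troubleFlag_ = false;
};

// Usage example: two sites fail, one recovers, trouble still exists.
void disconnectionTrackerExample() {
    DisconnectionTracker tracker;
    tracker.siteDisconnected("siteA");
    tracker.siteDisconnected("siteB");
    tracker.siteReconnected("siteA");
    assert(!tracker.troubleFlag());     // the flag wrongly reports no trouble
    assert(tracker.troubleOccurred());  // the table still lists siteB
}
```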
void PvmController::createNewSite ( const Date & ) [virtual]
Reimplemented from OMK::DistributedController.
Definition at line 694 of file OMKPvmController.cxx.
References _distributedVirtualMachine, and OMK::Svm::addNewSiteRequest().
{
  _distributedVirtualMachine->addNewSiteRequest (date);
}
std::list< Name > PvmController::getDisconnectedProcessusList ( ) [virtual]
Reimplemented from OMK::Controller.
Definition at line 808 of file OMKPvmController.cxx.
References _distributedVirtualMachine, and OMK::Svm::getDisconnectedProcessusList().
{
  return _distributedVirtualMachine->getDisconnectedProcessusList () ;
}
Reimplemented from OMK::Controller.
Definition at line 812 of file OMKPvmController.cxx.
References _distributedVirtualMachine, and OMK::Svm::getDisconnectedProcessusMap().
{
  return _distributedVirtualMachine->getDisconnectedProcessusMap () ;
}
Date PvmController::getPurgeDate ( ) [protected, virtual]
redefine to take latency into account
Reimplemented from OMK::Controller.
Definition at line 800 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::Controller::getPurgeDate(), and OMK::Svm::getSynchronisationLatency().
{
  // in the worst case we could receive a message from an object whose simulation date is current date - _latency - stepPeriod ;
  // make sure by doubling latency
  return DistributedController::getPurgeDate() - 2 * _distributedVirtualMachine->getSynchronisationLatency() ;
}
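As a worked example of the arithmetic in this listing, the sketch below assumes dates and latencies are integer milliseconds, which is an assumption made for illustration and not the documented definition of `Date`. The function name `distributedPurgeDate` is hypothetical.

```cpp
#include <cassert>

// Hypothetical: dates and durations expressed as integer milliseconds.
using Date = long;

// Moves the purge date back by twice the synchronisation latency, so that
// values a late message may still refer to are not purged too early.
Date distributedPurgeDate(Date basePurgeDate, Date synchronisationLatency) {
    return basePurgeDate - 2 * synchronisationLatency;
}

// Usage example: a base purge date of 1000 ms with a 40 ms latency.
void distributedPurgeDateExample() {
    assert(distributedPurgeDate(1000, 40) == 920);
}
```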
MirrorObjectHandle * PvmController::createMirrorObject ( ObjectDescriptor * objectDescription ) [protected, virtual]
Create a mirror object.
Reimplemented from OMK::DistributedController.
Definition at line 243 of file OMKPvmController.cxx.
References OMK::DistributedController::createMirrorObject(), and OMK::MirrorObjectHandle::registerToReferenceObject().
{
  MirrorObjectHandle * mirror = DistributedController::createMirrorObject( objectDescription ) ;

  // Register the mirror to the reference object for regular updates of outputs
  mirror->registerToReferenceObject (true) ;

  return mirror ;
}
DuplicatedObjectHandle * PvmController::newOMKDuplicatedObjectHandle ( SimulatedObject & object ) [protected, virtual]
create an object handle adapted to handling of duplicated objects
Parameters: object : the object the object handle will have to handle
Returns: the adapted duplicated object handle
Implements OMK::DistributedController.
Definition at line 618 of file OMKPvmController.cxx.
Referenced by PvmController().
{
  return new PvmDuplicatedObjectHandle( obj, *this );
}
LocalObjectHandle * PvmController::newOMKLocalObjectHandle ( SimulatedObject & object ) [protected, virtual]
create an object handle adapted to handling of local objects
Implements OMK::DistributedController.
Definition at line 625 of file OMKPvmController.cxx.
{
  return new PvmLocalObjectHandle( obj, *this );
}
MirrorObjectHandle * PvmController::newOMKMirrorObjectHandle ( SimulatedObject & obj ) [protected, virtual]
create an object handle for objects mirrored by this controller
Parameters: object : the object the object handle will have to handle
Returns: the mirror object handle
Implements OMK::DistributedController.
Definition at line 612 of file OMKPvmController.cxx.
{
  return new PvmMirrorObjectHandle(obj);
}
ReferenceObjectHandle * PvmController::newOMKReferenceObjectHandle ( SimulatedObject & object, Controller & controller, SignalDispatcher * signalDispatcher ) [protected, virtual]
create a reference object handler appropriate for this controller
Parameters: obj : the object to handle
Returns: the reference object created
Reimplemented from OMK::Controller.
Definition at line 703 of file OMKPvmController.cxx.
{
  return new PvmReferenceObjectHandle(object, controller, signalDispatcher );
}
int PvmController::getOutputHistorySize ( void ) [protected, virtual]
get the size of the history fifo needed when running a distributed simulation.
The result depends on the latency used
Reimplemented from OMK::Controller.
Definition at line 566 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::Controller::_stepPeriod, OMK::Controller::getOutputHistorySize(), and OMK::Svm::getSynchronisationLatency().
{
  // Size of an output queue for a local controller
  // nbMinor * 4 --> to guarantee that up to 4 values can be retrieved from the queue
  // 1 ?? safety ??
  int nbrValeurPendantLatence = 1 ; // the minimal latency is one simulation step in the controller's sense
  nbrValeurPendantLatence+=_distributedVirtualMachine->getSynchronisationLatency()/_stepPeriod;
  return DistributedController::getOutputHistorySize()+nbrValeurPendantLatence;
}
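The history size computed above grows with the number of simulation steps that fit into the synchronisation latency. The following sketch reproduces that arithmetic under the assumption that latency and step period are positive integer durations in the same unit; the name `distributedHistorySize` and the integer types are hypothetical.

```cpp
#include <cassert>

// Returns the base history size plus one slot for the minimal latency of
// one controller step, plus one slot per full step period contained in
// the synchronisation latency (integer division).
int distributedHistorySize(int baseHistorySize, long latency, long stepPeriod) {
    assert(stepPeriod > 0);
    int valuesDuringLatency = 1;
    valuesDuringLatency += static_cast<int>(latency / stepPeriod);
    return baseHistorySize + valuesDuringLatency;
}

// Usage example: base size 4, latency 100, step period 40.
// 100 / 40 is 2 in integer division, so the result is 4 + 1 + 2.
void distributedHistorySizeExample() {
    assert(distributedHistorySize(4, 100, 40) == 7);
}
```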
ObjectHandle * PvmController::removeObjectFromDataStructures ( const Name & nom ) [protected, virtual]
handle distributed destruction: destroy the local data structures
Reimplemented from OMK::Controller.
Definition at line 580 of file OMKPvmController.cxx.
References OMK::DistributedController::_duplicatedObjectsMap, OMK::DistributedController::_mirrorObjectsMap, OMK::Controller::_referenceObjectsMap, and OMK::Controller::removeObjectFromDataStructures().
{

  ObjectHandle * result = NULL ;

  if (_referenceObjectsMap.find (nom) != _referenceObjectsMap.end ())
  {
    result = Controller::removeObjectFromDataStructures(nom);
  }
  else

  {
    MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.find (nom) ;
    if (i != _mirrorObjectsMap.end ())
    {
      MirrorObjectHandle * miroir = i->second ;
      _mirrorObjectsMap.erase( i );
      result = miroir;
    }
    else if (_duplicatedObjectsMap.find (nom) != _duplicatedObjectsMap.end())
    {
      // a duplicated object is a referential present in the duplicated objects table
      _duplicatedObjectsMap.erase(nom);
    }
  }

  return result ;
}
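The lookup order in this listing (reference objects first, then mirrors, then duplicated objects) can be sketched with plain maps. The `Handle`, `Tables` and `removeFromTables` names below are hypothetical stand-ins, and the reference case simply erases the entry instead of delegating to the base class as the real code does.

```cpp
#include <cassert>
#include <map>
#include <string>

struct Handle { std::string kind; };  // hypothetical handle type
using HandleMap = std::map<std::string, Handle*>;

struct Tables {                       // hypothetical controller state
    HandleMap reference;
    HandleMap mirror;
    HandleMap duplicated;
};

// Removes `name` from whichever table holds it. Returns the detached handle
// for reference and mirror objects, and nullptr for duplicated or unknown
// names, which matches the return value of the listing in those cases.
Handle* removeFromTables(Tables& t, const std::string& name) {
    auto ref = t.reference.find(name);
    if (ref != t.reference.end()) {
        Handle* h = ref->second;
        t.reference.erase(ref);
        return h;
    }
    auto mir = t.mirror.find(name);
    if (mir != t.mirror.end()) {
        Handle* h = mir->second;
        t.mirror.erase(mir);
        return h;
    }
    t.duplicated.erase(name);  // no-op when the name is absent
    return nullptr;
}

// Usage example: removing a mirror returns its handle and empties the table.
void removeFromTablesExample() {
    Handle mirrorHandle{"mirror"};
    Tables t;
    t.mirror["robot"] = &mirrorHandle;
    assert(removeFromTables(t, "robot") == &mirrorHandle);
    assert(t.mirror.empty());
}
```

The caller receives ownership information only through the returned pointer, so a duplicated object removed this way leaves nothing for the caller to delete.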
void PvmController::synchronizedReceiveAndProcessMessages ( void ) [protected]
Definition at line 370 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::PvmMessage::SynchronisationMessage, and OMK::Svm::synchroniseReceiveAndProcessMessages().
Referenced by PvmController().
{
  _distributedVirtualMachine->synchroniseReceiveAndProcessMessages (*this, PvmMessage::SynchronisationMessage) ;
}
void PvmController::relaxedReceiveAndProcessMessages ( void ) [protected]
Definition at line 376 of file OMKPvmController.cxx.
References _distributedVirtualMachine, OMK::Svm::relaxedSynchroniseReceiveAndProcessMessages(), and OMK::PvmMessage::SynchronisationMessage.
Referenced by PvmController().
{
  _distributedVirtualMachine->relaxedSynchroniseReceiveAndProcessMessages (*this, PvmMessage::SynchronisationMessage) ;
}
Definition at line 120 of file OMKPvmController.h.
Referenced by getIsolatedMirrorList(), and getNonIsolatedMirrorList().
Svm* OMK::PvmController::_distributedVirtualMachine [protected]
the distributed virtual machine used by the controller
Definition at line 184 of file OMKPvmController.h.
Referenced by advanceSimulatedDate(), computeNextSimulationStep(), createNewSite(), getDisconnectedProcessusList(), getDisconnectedProcessusMap(), getDistributedVirtualMachine(), getIsolatedMirrorList(), getNonIsolatedMirrorList(), getOutputHistorySize(), getProcessName(), getPurgeDate(), init(), PvmController(), relaxedReceiveAndProcessMessages(), run(), synchronizedReceiveAndProcessMessages(), testIfTroubleOccured(), waitForAnswerToBlockingRequest(), and ~PvmController().
bool OMK::PvmController::_finishing [protected]
boolean indicating the controller is going into shutdown mode.
No blocking calls should be made
Definition at line 193 of file OMKPvmController.h.
Referenced by finish(), sendInitialValuesToMirror(), and waitForAnswerToBlockingRequest().
void(PvmController::* OMK::PvmController::receiveAndProcessMessages)(void) [protected]
pointer to the member function used to receive and process messages; it selects the synchronisation mode
Referenced by computeNextSimulationStep(), and PvmController().
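A pointer to member function lets the synchronisation mode be chosen once and then called without further branching. The sketch below shows the general C++ technique with a hypothetical `SyncChooser` class; how `PvmController` actually selects between its synchronized and relaxed variants is not shown on this page, so the constructor argument here is an assumption.

```cpp
#include <cassert>
#include <string>

// Hypothetical class illustrating dispatch through a pointer to member
// function, chosen once at construction time.
class SyncChooser {
public:
    explicit SyncChooser(bool relaxed)
        : receive_(relaxed ? &SyncChooser::relaxedReceive
                           : &SyncChooser::synchronizedReceive) {}

    // Calls whichever variant was selected in the constructor.
    void step() { (this->*receive_)(); }

    const std::string& lastMode() const { return lastMode_; }

private:
    void synchronizedReceive() { lastMode_ = "synchronized"; }
    void relaxedReceive() { lastMode_ = "relaxed"; }

    void (SyncChooser::*receive_)();
    std::string lastMode_;
};

// Usage example: the relaxed variant is selected and then invoked by step().
void syncChooserExample() {
    SyncChooser chooser(true);
    chooser.step();
    assert(chooser.lastMode() == "relaxed");
}
```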
Documentation generated on Mon Jun 9 11:46:03 2008
Generated with doxygen by Dimitri van Heesch, 1997-2007