OMKPvmController.cxx

Go to the documentation of this file.
00001 /*
00002 * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software
00003 * 
00004 * The Software has been developed within the Siames Project. 
00005 * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights
00006 * 
00007 * The Software has been registered with the Agence pour la Protection des
00008 * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200
00009 *  
00010 * This file may be distributed under the terms of the Q Public License
00011 * version 1.0 as defined by Trolltech AS of Norway and appearing in the file
00012 * LICENSE.QPL included in the packaging of this file.
00013 *
00014 * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 
00015 * for the software may use this file in accordance with that specific license
00016 *
00017 */
00018 #include <cassert>
00019 #include <OMKPvmController.h>
00020 #include "OMKSynchronisationMessage.h"
00021 
00022 #include <OMKPvmSvm.h>
00023 #include <OMKSvmLink.h>
00024 #include "OMKPvmUnicastMessage.h"
00025 
00026 #include <OMKObjectDescriptor.h>
00027 #include <OMKSimulatedObject.h>
00028 #include <OMKProcess.h>
00029 #include "OMKParametersAccessor.h"
00030 #include "OMKParametersAccessor.inl"
00031 #include <OMKPvmMirrorObjectHandle.h>
00032 #include <OMKPvmReferenceObjectHandle.h>
00033 #include <OMKPvmDuplicatedObjectHandle.h>
00034 
00035 //-------------chadi ajout 1/2 -------------
00036 #include <OMKPvmLocalObjectHandle.h>
00037 //---------------fin 1/2 -------------------
00038 
00039 #include <OMKDoubleListElement.h>
00040 #include <OMKDoubleList.h>
00041 #include "OMKSystemEventIdentifier.h"
00042 #include "OMKMultipleConfigurationParameter.h"
00043 
00044 #include "OMKPvmException.h"
00045 #include <pvm3.h>
00046 
00047 using namespace std;
00048 using namespace OMK ;
00049 
00050 //------------------------------------------------------------------------
00051 
00052 PvmController::PvmController (ObjectDescriptor & initialObjects,
00053                                     const Date & initialDate,
00054                                     int argc,
00055                                     char * argv [])
00056                                     : DistributedController (initialObjects, initialDate),
00057                                     _finishing ( false ) 
00058 {
00059 
00060   Date latence = 20 ;
00061   Date timeOut = 10 ;
00062   std::string synchro ("strong") ;
00063   int deadReckoning = 70 ;
00064   bool yieldNeeded = false ;
00065   NameToPointerMap<Process> * processTable = new NameToPointerMap<Process>() ;
00066   receiveAndProcessMessages = &PvmController::synchronizedReceiveAndProcessMessages ;
00067 
00068   MultipleConfigurationParameter * schedulingParameters = getSchedulingParametersOfObject( initialObjects ) ;
00069   if ( schedulingParameters != NULL)
00070   {
00071     ConfigurationParameterDescriptor * param = schedulingParameters->getSubDescriptorByName("Latency") ;
00072     if (param != NULL)
00073     {
00074       latence = atoi (param->getAssociatedString().c_str()) ;
00075     }
00076     std::cerr << "Latency : " << latence << std::endl ;
00077 
00078     synchro = "strong" ;
00079     if( ParametersAccessor::get( schedulingParameters, "Synchronization", synchro ) 
00080       && ( synchro == "relaxed" ) )
00081     {
00082       receiveAndProcessMessages = &PvmController::relaxedReceiveAndProcessMessages ;
00083     }
00084     std::cerr << "Synchronization : " << synchro << std::endl ;
00085 
00086     ParametersAccessor::get( schedulingParameters, "YieldNeeded", yieldNeeded ) ;
00087     std::cerr << "YieldNeeded : " << yieldNeeded << std::endl ;
00088 
00089     const ConfigurationParameterDescriptor * timeOutParamDescriptor = 
00090       schedulingParameters->getSubDescriptorByName ("TimeOut") ;
00091     if (timeOutParamDescriptor != NULL) {
00092       timeOut = atoi (timeOutParamDescriptor->getAssociatedString().c_str()) ;
00093     }
00094     std::cerr << "TimeOut : " << timeOut << std::endl ;
00095 
00096     const ConfigurationParameterDescriptor * deadReckoningParamDescriptor = 
00097       schedulingParameters->getSubDescriptorByName ("DeadReckoningStepInterval") ;
00098     if (deadReckoningParamDescriptor != NULL) {
00099       deadReckoning = atoi (deadReckoningParamDescriptor->getAssociatedString().c_str()) ;
00100     }
00101     std::cerr << "DeadReckoningStepInterval : " << deadReckoning << std::endl ;
00102 
00103     param = schedulingParameters->getSubDescriptorByName("Machines") ;
00104     MultipleConfigurationParameter * describedMachines = dynamic_cast<MultipleConfigurationParameter *>(param);
00105     if (describedMachines != NULL)
00106     {
00107       int numberOfMachines = describedMachines->getNumberOfSubItems () ;
00108       for (int machineNumber = 0 ;
00109         machineNumber != numberOfMachines ;
00110         ++machineNumber )
00111       {
00112         param = describedMachines->getSubDescriptorByPosition ( machineNumber ) ;
00113         processTable->addObjectWithIndex ( describedMachines->getNameOfSubDescriptor (machineNumber),
00114           new Process ( describedMachines->getNameOfSubDescriptor (machineNumber),
00115           param->getAssociatedString() )
00116           ) ;
00117       }
00118     }
00119   }
00120 
00121   // virtual member functions aren't initiallised in constructor
00122   // therefore, recreate the correct objectHandle
00123   delete _objectHandle ;
00124 
00125   _objectHandle = NULL ;
00126 
00127   DuplicatedObjectHandle * localObjectHandle = newOMKDuplicatedObjectHandle( *this ) ;
00128   _objectHandle = localObjectHandle ;
00129   // TDTD ajout pour fixer bug mémoire : comme le _objectHandle a été effacé, ça ne marche plus...
00130   _computeStatePointer = const_cast<ReferenceObjectHandle::SimulatedObjectComputingState *>(&(dynamic_cast<DuplicatedObjectHandle *>(_objectHandle)->getComputingState () )) ;
00131 
00132   for (NameToPointerMap<Process>::const_iterator i = processTable->begin() ;
00133     i != processTable->end () ;
00134     ++i) 
00135   {
00136     localObjectHandle->addProcessOfDuplicate ( (*i).second ) ;
00137   }
00138 
00139 
00140   _distributedVirtualMachine = new PvmSvm (processTable, latence, timeOut, deadReckoning, yieldNeeded, argc, argv) ;
00141 
00142   _distributedVirtualMachine->init (initialDate ) ;
00143 
00144   _processName = _distributedVirtualMachine->getSiteName () ;
00145 
00146   setProcessOfDescriptor ( initialObjects, _processName ) ;
00147 
00148   cout << "PvmController::PvmController (...)" 
00149     << "of process: " << _distributedVirtualMachine->getSiteName () << endl ;
00150 
00151   // initialisation de l'état du controleur
00152   // TDTD suite ajout pour fixer bug mémoire : comme le _objectHandle a été effacé, ça ne marche plus...
00153   assert ( _computeStatePointer != NULL ) ;
00154   *_computeStatePointer = ReferenceObjectHandle::initial ;
00155 
00156 
00157   //rewritten 2001/12/5 by D. Margery
00158   list < ObjectDescriptor *> * descriptorList = _simulationTree.getDescendants() ;
00159   for (list < ObjectDescriptor *>::iterator descriptorIterator = descriptorList->begin() ;
00160     descriptorIterator != descriptorList->end() ;
00161     ++ descriptorIterator )
00162   {
00163     Frequency currentFreq = (*descriptorIterator)->getFrequency() ;
00164     Name currentProcess = (*descriptorIterator)->getProcess() ;
00165     Process * currentProcessDescriptor = processTable->getObjectOfIndex( currentProcess ) ;
00166     if ( currentProcessDescriptor != NULL )
00167       // the process could have been removed by the svm
00168     {
00169       if ( currentFreq > currentProcessDescriptor->getMaxFrequencyOfHostedObjects () ) 
00170       {
00171         currentProcessDescriptor->setMaxHostedFrequency (currentFreq) ;
00172       }
00173       currentProcessDescriptor->setFrequency ( Controller::lcm(currentFreq, currentProcessDescriptor->getFrequency() ) ) ;
00174     }
00175   }
00176   delete descriptorList ;
00177 
00178   //subscribe to system signals to receive notification on what is happening on other controllers
00179   registerForSignalBy(SystemEventIdentifier::MaskObjectDestroyed, getName()) ;
00180 
00181 }
00182 
00183 //------------------------------------------------------------------------------
00184 
00185 PvmController::~PvmController () 
00186 {
00187   // deconnexion de la machine virtuelle
00188 #ifdef _DEBUGEXECPVM
00189   cout << "PvmController::~PvmController : " << endl;
00190 #endif
00191   _distributedVirtualMachine->disconnectFromDistributedSimulation ( getSimulatedDate () ) ;
00192   delete _distributedVirtualMachine ;   
00193 }
00194 
00195 
00196 //------------------------------------------------------------------------------
00197 
00198 void PvmController::run()
00199 {
00200   if ( pvm_parent() != PvmNoParent )
00201   {
00202     DistributedController::run() ;
00203 
00204     // the simulation is locally finished, so notify all the controllers before exiting 
00205     Date lastDate = _date + 2 * _stepPeriod + _distributedVirtualMachine->getSynchronisationLatency() ;
00206 
00207     while (_date < lastDate )
00208     {
00209 #ifdef _DEBUGPVMMESS
00210       cerr<<"PvmController::run(): trying a clean exit"<<endl;
00211 #endif
00212       ReferenceObjectHandle * objectHandle = dynamic_cast<ReferenceObjectHandle *> ( _objectHandle ) ;
00213 
00214       assert ( objectHandle != NULL ) ;
00215 
00216       //the controller has to process residual system events adressed to itself
00217       objectHandle->processEvents() ;
00218 
00219       //send enought empty signals to enable other controllers to finish their job
00220       makeSynchronisationMessage( _referenceObjectsMap ) ;
00221 
00222       // send the synchronisation messages
00223       _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ; 
00224 
00225       //wait a little, to try and get clean exit
00226 #ifdef _MSC_VER
00227       Sleep( 1000 ) ;
00228 #else
00229       sleep(1) ;
00230 #endif
00231 
00232       // for cleanup, read any arrived messages
00233       _distributedVirtualMachine->processReceivedMessages (*this, PvmMessage::SynchronisationMessage) ;
00234 
00235       advanceSimulatedDate() ;
00236     } 
00237   }
00238 
00239 }
00240 
00241 
00242 
00243 MirrorObjectHandle * PvmController::createMirrorObject (ObjectDescriptor * objectDescription) 
00244 {
00245   MirrorObjectHandle * mirror =  DistributedController::createMirrorObject( objectDescription ) ;
00246 
00247   // Register the mirror to the reference object for regular updates of outputs
00248   mirror->registerToReferenceObject (true) ;
00249 
00250   return mirror ;
00251 }
00252 
00253 
00254 //------------------------------------------------------------------------------
00255 
00256 const Name & PvmController::getProcessName () const 
00257 {
00258   return (_distributedVirtualMachine->getSiteName ()) ;
00259 }
00260 
00261 //------------------------------------------------------------------------------
00262 void PvmController::advanceSimulatedDate () 
00263 {
00264 #ifdef _DEBUGEXECPVM
00265   cerr << "PvmController::advanceSimulatedDate () from " <<_date<<endl;
00266 #endif
00267   //send any messages collected during init
00268   if (_date < initialSimulationDate )
00269   {
00270     _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00271   }
00272 
00273   DistributedController::advanceSimulatedDate () ;
00274 
00275   // prepare buffers to receive any message generate during this simulation step
00276   _distributedVirtualMachine->timestampCurrentSendBuffers (_date) ;   
00277 }
00278 
00279 
00280 // void PvmController::computeNextSimulationStep () {
00281 // #ifdef _DEBUGEXECPVM
00282 //    cerr << "PvmController::computeNextSimulation : computing yet another simulation step for " 
00283 //      <<notreProcessus()<< endl;
00284 // #endif
00285 
00286 //    // compute the simulation step
00287 //    DistributedController::computeNextSimulationStep () ;
00288 
00289 //    // compute the synchronisation messages
00290 //    makeSynchronisationMessage ( _referenceObjectsMap ) ;
00291 
00292 //    // send the synchronisation messages
00293 //    _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00294 
00295 //    // proceed with a relaxed synchronisation
00296 //    _distributedVirtualMachine->synchroniseReceiveAndProcessMessages (*this, PvmMessage::SynchronisationMessage) ; 
00297 
00298 
00299 //    //added by chadi to test if a new process has been added dynamically
00300 //    _distributedVirtualMachine->testIfNewProcessAdded () ;
00301 
00302 // }
00303 
00304 void PvmController::computeNextSimulationStep () {
00305 
00306 #ifdef _DEBUGEXECPVM
00307   cerr << "PvmController::computeNextSimulation : computing yet another simulation step for " 
00308     <<notreProcessus()<< endl;
00309 #endif
00310 
00311   // compute the simulation step
00312   DistributedController::computeNextSimulationStep () ;
00313 
00314   // compute the synchronisation messages
00315   makeSynchronisationMessage ( _referenceObjectsMap ) ;
00316 
00317   // send the synchronisation messages
00318   _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00319 
00320   // TDTD : pour un fonctionnement en synchro ou relaxed
00321   (this->*receiveAndProcessMessages) () ;
00322   //synchronizedReceiveAndProcessMessages () ;
00323   // TDTD : en cas de desynchro, il faut régler le pb du flush des messages inutiles... ::: à peu près réalisé...
00324 
00325   //added by chadi to test if a new process has been added dynamically
00326   // TDTD : à remettre plus tard
00327   //_distributedVirtualMachine->testIfNewProcessAdded () ;
00328   /*
00329   if (testIfTroubleOccured())
00330   {
00331   bool tico = _distributedVirtualMachine->reconnexionEstablished() ;
00332   }
00333   */
00334 
00335   //    {
00336   //     cout<<"bien bien bien"<<endl;
00337   // makeSynchronisationMessage ( _referenceObjectsMap ) ;       
00338   // _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;  
00339   //   }
00340 
00341   //  if (testIfTroubleOccured())
00342   //        {
00343   //     _distributedVirtualMachine->sendPingMessage() ;
00344   //        }
00345 
00346 
00347 
00348   //  if ( testIfTroubleOccured() )
00349   //        {
00350   //     NameToPointerMap<ReferenceObjectHandle>::iterator pTabRef ;
00351   //     for (pTabRef = _referenceObjectsMap.begin () ;
00352   //          pTabRef != _referenceObjectsMap.end () ;
00353   //          pTabRef ++) 
00354   //       {
00355 
00356   //         PvmReferenceObjectHandle * pvmObjectHandle = dynamic_cast<PvmReferenceObjectHandle *> ( (*pTabRef).second ) ; 
00357   //         if ( pvmObjectHandle != NULL )
00358   //           {
00359   //             pvmObjectHandle->flushDisconnectedSitesOutgoingBuffer() ;
00360   //           }
00361 
00362   //       }
00363 
00364   //        }
00365 
00366 }
00367 
00368 //------------------------------------------------------------------------------
00369 
00370 void PvmController::synchronizedReceiveAndProcessMessages (void) {
00371   _distributedVirtualMachine->synchroniseReceiveAndProcessMessages (*this, PvmMessage::SynchronisationMessage) ;
00372 }
00373 
00374 //------------------------------------------------------------------------------
00375 
00376 void PvmController::relaxedReceiveAndProcessMessages (void) {
00377   _distributedVirtualMachine->relaxedSynchroniseReceiveAndProcessMessages (*this, PvmMessage::SynchronisationMessage) ;
00378 }
00379 
00380 //------------------------------------------------------------------------------
00381 
00382 void PvmController::makeSynchronisationMessage (NameToPointerMap<ReferenceObjectHandle> & referenceObjects ) 
00383 {
00384   static Name endMessage ("_OpenMASKEndOfSynchronisationMessage") ;
00385 
00386 #ifdef _DEBUGPVMMESS
00387   cerr<<"PvmController::makeSynchronisationMessage for "<<referenceObjects.size() <<" objects "<<endl;
00388 #endif
00389 
00390   NameToPointerMap<ReferenceObjectHandle>::iterator pTabRef ;
00391   // ask all referentials to contribute to the synchronisation message
00392   for (pTabRef = referenceObjects.begin () ;
00393     pTabRef != referenceObjects.end () ;
00394     pTabRef ++) 
00395   {
00396 #ifdef _DEBUGPVMMESS
00397     cerr<<"PvmController::makeSynchronisationMessage for object " <<(*pTabRef).second->getSimulatedObject().getName()<<endl;
00398 #endif
00399 
00400     PvmReferenceObjectHandle * pvmObjectHandle = dynamic_cast<PvmReferenceObjectHandle *> ( (*pTabRef).second ) ; 
00401     if ( pvmObjectHandle != NULL )
00402     {
00403       pvmObjectHandle->makeSynchronisationMessage ( _date ) ;
00404     }
00405     else
00406     {
00407 #ifdef _DEBUGPVMMESS
00408       cerr<<"No synchronisation message for "<<(*pTabRef).second->getSimulatedObject().getName()<<endl;
00409 #endif      
00410     }
00411 #ifdef _DEBUGPVMMESS
00412     cerr<<"PvmController::makeSynchronisationMessage of " <<(*pTabRef).second->getSimulatedObject().getName()<<" done"<<endl;
00413 #endif
00414   } 
00415 #ifdef _DEBUGPVMMESS
00416   cerr<<"PvmController::makeSynchronisationMessage done"<<endl;
00417 #endif
00418 }
00419 
00420 
00421 //------------------------------------------------------------------------------
00422 
00423 void PvmController::parseSynchronisationMessage (PvmIncomingMessage * message) 
00424 {
00425 
00426   ReferenceObjectHandle * refer ; // Referentiel de l'objet destinataire
00427   MirrorObjectHandle * miroir ; // Miroir de l'objet destinataire
00428   Name nomObj ; // Nom du miroir destinataire
00429   bool stillMessagesToParse = true ;
00430 
00431 #ifdef _DEBUGPVMMESS
00432   cerr << "PvmController::parseSynchronisationMessage at "<<getSimulatedDate()<<":"<< endl ;
00433 #endif
00434 
00435 
00436   while ( stillMessagesToParse ) 
00437   {
00438     // get the message recepient
00439     nomObj.unpack ( *message ) ;
00440 
00441 
00442 #ifdef _DEBUGPVMMESS
00443     cerr << "PvmController::parseSynchronisationMessage: message received for " << nomObj << endl;
00444 #endif
00445     if ( nomObj == SynchronisationMessage::endOfSynchronisationFragment )
00446       // the synchronisation message has been completely parsed
00447     {
00448       stillMessagesToParse = false ;
00449     }
00450     else 
00451     {
00452       MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.find ( nomObj ) ;
00453       if (i != _mirrorObjectsMap.end ()) 
00454       {
00455         // On lui transmet le message
00456 #ifdef _DEBUGPVMMESS
00457         cout << "PvmController::parseSynchronisationMessage: for miror" << endl ;
00458 #endif
00459         miroir = i->second ;
00460         miroir->unpack ( *message ) ;
00461       } 
00462       else if (_referenceObjectsMap.find (nomObj) != _referenceObjectsMap.end ()) 
00463       {
00464         // On lui transmet le message
00465 #ifdef _DEBUGPVMMESS
00466         std::cout << "PvmController::parseSynchronisationMessage : for referential : " << nomObj <<std::endl ;
00467         //std::cout << "message : " << *message << std::endl ;
00468 #endif
00469         refer = dynamic_cast<PvmReferenceObjectHandle *>(_referenceObjectsMap.getObjectOfIndex (nomObj) ) ;
00470         if ( refer != NULL )
00471         {
00472           refer->unpack (*message)  ;
00473         }
00474         else
00475         {
00476           dynamic_cast<PvmDuplicatedObjectHandle *>(_referenceObjectsMap.getObjectOfIndex (nomObj) )->unpack ( *message ) ;
00477         }
00478       } 
00479       else if (nomObj == "") 
00480       {
00481 #ifdef _DEBUGPVMMESS
00482         cout << "PvmController::parseSynchronisationMessage :CL : message vide d'abonnement ... pour synchro" << endl ;
00483 #endif
00484       } 
00485       else 
00486       {
00487 #ifdef _DEBUGPVMMESS
00488         cerr << "PvmController::parseSynchronisationMessage: "<< endl ;
00489         cerr << "for unknown object: <" << nomObj << "> (was probably destroyed)" << endl ;
00490         error ("PvmController::parseSynchronisationMessage : Objet ni miroir ni referentiel") ;
00491 #endif
00492         //probably a request for an object that was destroyed
00493         int typeMess ;
00494         *message >> typeMess ;
00495         // hope it is a registration for a just destroyed referential, and interprete it
00496         if ( typeMess == Registration )
00497         {
00498           Name processName ;
00499           *message >> processName ;     
00500         }
00501         else if ( typeMess == EventReceived )
00502         {
00503           Event * event ;
00504           Name classToCreate ;
00505           EventIdentifier eventId;
00506           Date eventDate;
00507           Name sender, receiver ;
00508           *message >> classToCreate >> eventId>> eventDate>>sender>>receiver ;
00509           event = EventCreator::createEvent (classToCreate, eventId, eventDate, sender, receiver ) ;
00510           event->unpack ( *message ) ;
00511           //cerr << "PvmController::parseSynchronisationMessage event "<<*event<<" undelivered"<<endl;
00512           delete event ;
00513         }
00514       }
00515     }
00516   }
00517 #ifdef _DEBUGPVMMESS
00518   cerr << "PvmController::parseSynchronisationMessage : message parsed"<< endl ;
00519 #endif
00520 }
00521 
00522 
00523 
00524 
00525 
00526 
00527 //------------------------------------------------------------------------------
00528 
00529 void PvmController::init () 
00530 {
00531 
00532   if ( pvm_parent() != PvmNoParent )
00533   {
00534     //on tente l'initialisation normale : pour chaque init, cette initialisation appelle initObjetReferentiel 
00535 #ifdef _DEBUGDISTRIBUTEDINIT
00536     cerr<<"PvmController::init preparing the outgoing message buffers "<<endl;
00537 #endif   
00538     _distributedVirtualMachine->timestampCurrentSendBuffers ( _date ) ;
00539 
00540     // try a classic initialisation
00541     Controller::init () ;
00542 
00543     //initialisation shouldn't fail
00544     assert ( tableDesNonInitialises.empty () ) ;
00545 
00546     //here, as advanceSimulatedDate has been called to restore the initial simulation date, no need to timestamp messages
00547 
00548 #ifdef _DEBUGDISTRIBUTEDINIT
00549     cerr<<"PvmController::init : classic initialisation tried"<<endl;
00550 #endif
00551     // send all registrations
00552     _distributedVirtualMachine->sendCurrentBuffersWithTag (PvmMessage::SynchronisationMessage) ;
00553 
00554     // broadcast a empty message signalling our initialisation is finished, and wait for all processes
00555     _distributedVirtualMachine->synchronizeOn ( *this, PvmMessage::LocalInitSuccessfull ) ;
00556 
00557 
00558     //restore state to the supposed state resulting of a call to the ancestor init
00559     _distributedVirtualMachine->timestampCurrentSendBuffers (_date) ;
00560   }
00561 
00562 }
00563 
00564 //------------------------------------------------------------------------------
00565 
00566 int PvmController::getOutputHistorySize(void) {
00567   // Taille d'une file d'une output pour un controleur local
00568   // nbMinor * 4 --> pour guarantir la recuperation d'au plus 4 valeurs dans la file
00569   // 1 ?? surete ??
00570   int nbrValeurPendantLatence = 1 ;//la latence minimal est un pas de simulation au sens controleur
00571   nbrValeurPendantLatence+=_distributedVirtualMachine->getSynchronisationLatency()/_stepPeriod;
00572   return DistributedController::getOutputHistorySize()+nbrValeurPendantLatence;
00573 }  
00574 
00575 
00576 
00577 //------------------------------------------------------------------------------
00578 
00579 ObjectHandle * 
00580 PvmController::removeObjectFromDataStructures(const Name & nom) 
00581 {
00582 
00583   ObjectHandle * result = NULL ;
00584 
00585   if (_referenceObjectsMap.find (nom) != _referenceObjectsMap.end ()) 
00586   {
00587     result = Controller::removeObjectFromDataStructures(nom);
00588   } 
00589   else 
00590 
00591   {
00592     MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.find (nom) ;
00593     if (i != _mirrorObjectsMap.end ()) 
00594     {
00595       MirrorObjectHandle * miroir = i->second ;
00596       _mirrorObjectsMap.erase( i );
00597       result = miroir;
00598     }
00599     else if (_duplicatedObjectsMap.find (nom) != _duplicatedObjectsMap.end()) 
00600     {
00601       //a duplicated object is a referential present in the duplicated objects table
00602       _duplicatedObjectsMap.erase(nom);
00603     }  
00604   } 
00605 
00606   return result ; 
00607 }
00608 
00609 
00610 //------------------------------------------------------------------------------
00611 
00612 MirrorObjectHandle * PvmController::newOMKMirrorObjectHandle (SimulatedObject & obj) 
00613 {
00614   return new PvmMirrorObjectHandle(obj);
00615 }
00616 
00617 
00618 DuplicatedObjectHandle * PvmController::newOMKDuplicatedObjectHandle (SimulatedObject & obj) 
00619 {
00620   return new PvmDuplicatedObjectHandle( obj, *this );
00621 }
00622 
00623 
00624 //---------chadi ajout 2/2 ------
00625 LocalObjectHandle * PvmController::newOMKLocalObjectHandle (SimulatedObject & obj) 
00626 {
00627   return new PvmLocalObjectHandle( obj, *this );
00628 }
00629 
00630 //-------------- --------
00631 vector<Name> PvmController::getIsolatedMirrorList()
00632 {
00633 
00634   for ( MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.begin(); i!= _mirrorObjectsMap.end(); i++ )
00635   {
00636     Name referentialProcess = i->second->getSimulatedObject().getObjectDescriptor().getProcess();
00637     if ( _distributedVirtualMachine->disconnectedProcess( referentialProcess ) )
00638     {
00639       _publicIsolatedMirrorObjectsVector.push_back( i->first) ;
00640     }
00641   }
00642 
00643   return  _publicIsolatedMirrorObjectsVector;
00644 }
00645 //********************************************************
00646 vector<Name> PvmController::getNonIsolatedMirrorList()
00647 {
00648 
00649   if (_distributedVirtualMachine->disconnectedTableIsEmpty())
00650   {
00651     _publicIsolatedMirrorObjectsVector.clear();
00652     return _publicIsolatedMirrorObjectsVector;
00653   }
00654   else
00655   {
00656 
00657     for ( MirrorObjectsContainerType::iterator i = _mirrorObjectsMap.begin(); i!= _mirrorObjectsMap.end(); i++ )
00658     {
00659       Name referentialProcess = i->second->getSimulatedObject().getObjectDescriptor().getProcess();
00660       if ( _distributedVirtualMachine->disconnectedProcess( referentialProcess ) )
00661       {
00662 
00663       }
00664       else
00665       {
00666         _publicNonIsolatedMirrorObjectsVector.push_back( i->first) ;
00667       }
00668     }
00669 
00670     return  _publicNonIsolatedMirrorObjectsVector;
00671   }
00672 }
00673 //**********************************************************
00674 
00675 bool PvmController::testIfTroubleOccured ()
00676 {
00677   //  if (_distributedVirtualMachine->getTroubleFlag() )
00678   if (_distributedVirtualMachine->disconnectedTableIsEmpty() )
00679   {
00680     return false;
00681   }
00682   else
00683     return true ;
00684   // dans le cas ou il y a plusieurs sites > 2 , si 2 sites sont deconnectes puis un des 2 s'est retabli alors
00685   // le troubleFlag est Faux par contre on a tjrs un site deconnecte donc le trouble existe encore ....
00686   /*  else if (_distributedVirtualMachine->disconnectedTableIsEmpty() )
00687   {
00688   return false;
00689   }
00690   return true;
00691   */
00692 }
00693 //*********************************************************
00694 void PvmController::createNewSite ( const Date & date)
00695 {
00696   _distributedVirtualMachine->addNewSiteRequest (date);
00697 }
00698 
00699 
00700 //****************** fin ***********************************
00701 
00702 
00703 ReferenceObjectHandle * PvmController::newOMKReferenceObjectHandle(SimulatedObject & object, 
00704                                                                          Controller & controller,
00705                                                                          SignalDispatcher * signalDispatcher)
00706 {
00707   return new PvmReferenceObjectHandle(object, controller, signalDispatcher );
00708 }
00709 
00710 Svm * PvmController::getDistributedVirtualMachine ()
00711 {
00712   return _distributedVirtualMachine;
00713 }
00714 
00715 //------------------------------------------------------------------------------
00716 
00717 
00718 void PvmController::sendInitialValuesToMirror(PvmIncomingMessage & message)
00719 {
00720   if ( ! _finishing )
00721   {
00722     // first find out the name of the objects whose initial values are needed
00723     Name referentialName ;
00724     referentialName.unpack ( message ) ;
00725 
00726 
00727 #ifdef _DEBUGEXECPVM
00728     cerr<<"Looking for initial values of "<<referentialName<<endl;
00729 #endif
00730 
00731     //then find the corresponding referential
00732     NameToPointerMap<ReferenceObjectHandle>::iterator i = _referenceObjectsMap.find ( referentialName ) ;
00733 
00734     PvmReferenceObjectHandle * referential = NULL ;
00735     if ( i != _referenceObjectsMap.end() ) 
00736     {
00737       referential = dynamic_cast<PvmReferenceObjectHandle *> (i->second) ;
00738     }
00739     else
00740     {
00741       //the referential might have been destroyed: look for it in the list of deleted object handles
00742       list <pair <Date, ObjectHandle *> >::iterator i = _deletedObjectHandles.begin() ;
00743       while ( i != _deletedObjectHandles.end() )
00744       {
00745         if ( i->second->getSimulatedObject().getName() == referentialName )
00746         {
00747           referential = dynamic_cast<PvmReferenceObjectHandle *> ( i->second );
00748           i = _deletedObjectHandles.end() ;
00749         }
00750         else
00751         {
00752           ++ i ;
00753         }
00754       }
00755     }
00756 
00757     assert ( referential != NULL ) ;
00758 
00759 
00760     //find the name of the requesting process
00761     Name mirrorProcessName ;
00762     mirrorProcessName.unpack( message ) ;
00763 
00764     //then find the link to the corresponding process
00765     SvmLink * process = getDistributedVirtualMachine()->getLinkToProcessNamed ( mirrorProcessName ) ;
00766 
00767     assert ( process != NULL ) ;
00768 
00769     //build the answer
00770     PvmUnicastMessage * urgentAnswer = new PvmUnicastMessage ( process->getTID() ) ;
00771 
00772     urgentAnswer->insertTimeStamp( _date ) ;
00773 
00774     referential->packInitialValues ( * urgentAnswer ) ;
00775 
00776     SynchronisationMessage::endOfSynchronisationFragment.pack ( * urgentAnswer ) ;  
00777 
00778     // and send it
00779     urgentAnswer->send ( PvmMessage::InitialValuesForMirror ) ;
00780   }
00781 }
00782 
00783 
00784 void PvmController::waitForAnswerToBlockingRequest (PvmMessage::MessageTag tag)
00785 {
00786   if ( ! _finishing )
00787     // avoid blocking the controller while terminating, because other controllers could have exited
00788   {
00789     _distributedVirtualMachine->waitForAnswerToBlockingRequest ( *this, tag ) ;      
00790   }
00791 }
00792 
00793 void PvmController::finish()
00794 {
00795   _finishing = true ;
00796   DistributedController::finish() ;
00797 }
00798 
00799 
00800 Date PvmController::getPurgeDate () 
00801 {
00802   //in the worst case we could reveive message from an object whose simulation date is current date - _latency -stepPeriod ;
00803   // make sure by doubling latency
00804   return DistributedController::getPurgeDate() - 2 * _distributedVirtualMachine->getSynchronisationLatency() ;
00805 }
00806 
00807 
00808 std::list<Name> PvmController::getDisconnectedProcessusList () {
00809   return _distributedVirtualMachine->getDisconnectedProcessusList () ;
00810 }
00811 
00812 std::map<Name, int> PvmController::getDisconnectedProcessusMap () {
00813   return _distributedVirtualMachine->getDisconnectedProcessusMap () ;
00814 }
00815 
00816 
00817 
00818 
00819 
00820 
00821 
00822 

logo OpenMask

Documentation generated on Mon Jun 9 11:45:57 2008

Generated with doxygen by Dimitri van Heesch ,   1997-2007