OMKPvmReferenceObjectHandle.cxx

Go to the documentation of this file.
00001 /*
00002  * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software
00003  * 
00004  * The Software has been developped within the Siames Project. 
00005  * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights
00006  * 
00007  * The Software has been registered with the Agence pour la Protection des
00008  * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200
00009  *  
00010  * This file may be distributed under the terms of the Q Public License
00011  * version 1.0 as defined by Trolltech AS of Norway and appearing in the file
00012  * LICENSE.QPL included in the packaging of this file.
00013  *
00014  * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 
00015  * for the software may use this file in accordance with that specific license
00016  *
00017  */
00018 #include "OMKTracer.h"
00019 #include <cassert>
00020 #include <OMKPvmReferenceObjectHandle.h>
00021 #include <OMKSimulatedObject.h>
00022 #include <OMKPvmMessage.h>
00023 #include <OMKObjectDescriptor.h>
00024 #include <OMKSvmLink.h>
00025 #include <OMKProcess.h>
00026 #include <OMKController.h>
00027 //#include <OMKNameServer.h>
00028 
00029 #include <OMKPvmController.h>
00030 #include "OMKSvm.h"
00031 #include "OMKEventListener.h"
00032 #include "OMKIncomingSynchronisationMessage.h"
00033 
00034 using namespace std ;
00035 using namespace OMK ;
00036 
00037 //-----------------------------------------------------------------------------
00038 
00039 PvmReferenceObjectHandle::PvmReferenceObjectHandle (SimulatedObject & object, 
00040                                                           Controller & controller)
00041   : ReferenceObjectHandle ( object, controller),
00043     morphosisPhase ( false )
00044 {
00045 }
00046 
00047 PvmReferenceObjectHandle::PvmReferenceObjectHandle (SimulatedObject & object, 
00048                                                           Controller & controller, 
00049                                                           SignalDispatcher * signalDispatcher)
00050   : ReferenceObjectHandle ( object, controller, signalDispatcher),
00052     morphosisPhase ( false )
00053 {
00054 }
00055 
00056 //-----------------------------------------------------------------------------
00057 
00058 PvmReferenceObjectHandle::~PvmReferenceObjectHandle () 
00059 {
00060 }
00061 
00062 
00063 //-----------------------------------------------------------------------------
00064 
00065 void PvmReferenceObjectHandle::emettreValAttributs (PvmOutgoingMessage *messFD,
00066                                                        PvmOutgoingMessage *messEV) 
00067 {
00068 
00069 }
00070 
00071 
00072 void PvmReferenceObjectHandle::insertInStream (std::ostream & out) const 
00073 {
00074   Controller::error ("On utilise l'insert du referentiel") ;
00075 }
00076 
00077 //-----------------------------------------------------------------------------
00078 void PvmReferenceObjectHandle::unpack (IncomingSynchronisationMessage & in) 
00079 {
00080 #if defined (_DEBUGPVMMESS)
00081   cerr << "PvmReferenceObjectHandle::unpack"<<endl;
00082 #endif
00083   // Frequency max des refs du pss distant
00084   Frequency freqOMKsDist ;
00085   // Frequency de ce pss
00086   Frequency freqLocale ;
00087   // Frequency de communication
00088   Frequency  freqComm ;
00089 
00090   RequestType typeMess ;
00091   int messageType ;
00092 
00093   in >> messageType ;
00094   typeMess = static_cast <RequestType> (messageType) ;
00095 
00096 #if defined (_DEBUGPVMMESS)
00097   cerr << "PvmReferenceObjectHandle::unpack: received a message of type " 
00098        <<typeMess<<" "<<Registration
00099        <<" at date "<<getSimulatedObject().getSimulatedDate()
00100        <<endl;
00101 #endif
00102  
00103   if (typeMess == Registration) 
00104     {
00105       // name of the process sending the message
00106       Name processName ;
00107          
00108       processName.unpack ( in ) ;
00109 
00110 #ifdef _DEBUGPVMMESS
00111       cout << "PvmReferenceObjectHandle::unpack " << processName << " registering a mirror "<< endl ;
00112 #endif
00113       SvmLink * canal = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getLinkToProcessNamed (processName) ;
00114 
00115       Process * processDescriptor = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getProcessDescriptorNamed (processName) ;
00116          
00117       //the registering process needs a complete version of the outputs of the object in the next synchronisation message
00118       _needAllOutputValues.insert ( make_pair(processName, canal) ) ;
00119 
00120       //         // On recupere la frequency max des refs du pss distant
00121       //         freqOMKsDist = processDescriptor->getMaxFrequencyOfHostedObjects () ;
00122       //         // On recupere la frequency locale
00123       //         freqLocale = _myObject. getObjectDescriptor ().getFrequency () ;
00124       //         // On calcule la frequency de communication
00125       //  #ifdef _DEBUGPVMMESS
00126       //         cout << "freqLocale : " << freqLocale << endl ;
00127       //         cout << "freqOMKsDist : " << freqOMKsDist << endl ;
00128       //  #endif
00129       //         if (freqLocale > freqOMKsDist) 
00130       //            {
00131       //               freqComm = plusPetitSSMultSup(freqLocale,
00132       //                                              freqOMKsDist) ;
00133       //  #ifdef _DEBUGPVMMESS
00134       //               cout << "freqComm : " << freqComm << endl ;
00135       //  #endif
00136       //            } 
00137       //         else 
00138       //            {
00139       //               freqComm = freqLocale ;
00140       //  #ifdef _DEBUGPVMMESS
00141       //               cout << "freqComm  : " << freqComm << endl ;
00142       //  #endif
00143       //            }
00144       //         // On la stocke
00145       //         _processFrequencies.insert (ProcessFrequenciesType::value_type(processName, freqComm)) ;
00146     } 
00147   else if (typeMess == CancelRegistration) 
00148     {
00149      OMTRACEID("DEBUGPVM", "Cancel : " << "avant unpack"); 
00150       // name of the process sending the message
00151       Name processName ;
00152       processName.unpack ( in ) ;
00153       OMTRACEID("DEBUGPVM", "Cancel : " << "avant ProcessName is" << processName); 
00154 
00155       _needUpdateOutputValues.erase (processName) ; 
00156       //_processFrequencies.erase ( processName ) ;
00157       _needAllOutputValues.erase ( processName ) ;
00158     } 
00159   else if (typeMess == EventReceived) 
00160     {
00161 #if ( defined (_DEBUGPVMMESS) || defined (_DEBUGPVMEVENTS ) )
00162       cerr << "PvmReferenceObjectHandle::unpack: received an event relayed from an mirror" << endl ;
00163 #endif
00164       Event * event ;
00165       Name classToCreate ;
00166       EventIdentifier eventId;
00167       Date eventDate;
00168       Name sender, receiver ;
00169       in >> classToCreate >> eventId>> eventDate>>sender>>receiver ;
00170       event = EventCreator::createEvent (classToCreate, eventId, eventDate, sender, receiver ) ;
00171       event->unpack ( in ) ;
00172       ReferenceObjectHandle::receiveEvent ( event ) ;
00173     } 
00174   else if (typeMess == RegisterForSignal) 
00175     {
00176 #ifdef _DEBUGPVMMESS
00177       cout << "PvmReferenceObjectHandle::unpack: RegisterForSignal" << endl ;
00178 #endif
00179       Name sigId ;
00180       Name registrant ;
00181       Name eventId ;
00182       in >> sigId >> registrant >> eventId ;
00183       receiveRegistrationForSignal (sigId, registrant, eventId );
00184     } 
00185   else if (typeMess == CancelRegistrationForSignal) 
00186     {
00187 #ifdef _DEBUGPVMMESS
00188       cout << "PvmReferenceObjectHandle::unpack: CancelRegistrationForSignal" << endl ;
00189 #endif
00190       Name sigId ;
00191       Name registrant ;
00192       in >> sigId >> registrant ;
00193       cancelRegistrationForSignal (sigId, registrant) ;
00194     }
00195   else if (typeMess == SynchronisationMessage) // ajout TDTD pour essayer de voir ce qui ne va pas...
00196     {
00197        //if (morphosisPhase) { // TDTD il me semble pourtant que ce serait bien...
00198        //cerr<<"PvmReferenceObjectHandle::unpack : TDTD : SynchronisationMessage for" << getSimulatedObject ().getName () <<endl; ;
00199           _myObject.unpack ( in ) ;
00200           // semble poser des pbs pour la suppression d'objets locaux...
00201           //_myObject.unpackAllValues ( in ) ;
00202           //}
00203     }
00204   else 
00205     {
00206       cerr<<"PvmReferenceObjectHandle::unpack : unknown request"<<typeMess<<endl; ;
00207       cerr << "messageType : " << messageType << endl ;
00208           assert( false && "You should not be there..." ) ;
00209     }
00210 
00211 #if defined (_DEBUGPVMMESS)
00212   cerr << "PvmReferenceObjectHandle::unpack finished"<<typeMess <<endl;
00213 #endif
00214 }
00215    
00216 
00217 
00218 void PvmReferenceObjectHandle::extract( istream & in ) 
00219 {
00220 #if defined (_DEBUGPVMMESS)
00221   cerr << "PvmReferenceObjectHandle::extract:"<<endl;
00222 #endif
00223   // Type de message
00224   std::string typeMess ;
00225   // Nom du processus ou se trouve le miroir (cas de l'EV)
00226   Name processName ;
00227 
00228   in >> typeMess ;
00229   //cerr << "PvmReferenceObjectHandle::extract: received a message of type " <<typeMess<<endl;
00230   if( typeMess == "EV" ) 
00231   {
00232     int typeEv ;
00233     in >> typeEv ;
00234     cerr << "PvmReferenceObjectHandle::extract: received an event message of type " <<typeEv<<endl;
00235     if (typeEv == Registration) 
00236     {
00237       // Frequency max des refs du pss distant
00238       Frequency freqOMKsDist ;
00239       // Frequency de ce pss
00240       Frequency freqLocale ;
00241       // Frequency de communication
00242       Frequency  freqComm ;
00243 
00244       // On recupere le nom du processus associe
00245       in >> processName ;
00246 #ifdef _DEBUGPVMMESS
00247       cout << "PvmReferenceObjectHandle::extract "<<processName << " registred a mirror "<< endl ;
00248 #endif
00249       // On recupere le processus associe
00250       SvmLink * canal = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getLinkToProcessNamed (processName) ;
00251 
00252       Process * processDescriptor = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getProcessDescriptorNamed (processName) ;
00253 
00254       //the registering process needs a complete version of the outputs of the object in the next synchronisation message
00255       _needAllOutputValues.insert ( make_pair (processName,canal) ) ;
00256 
00257       //            // On recupere la frequency max des refs du pss distant
00258       //            freqOMKsDist = processDescriptor->getMaxFrequencyOfHostedObjects () ;
00259       //            // On recupere la frequency locale
00260       //            freqLocale = _myObject. getObjectDescriptor ().getFrequency () ;
00261       //            // On calcule la frequency de communication
00262       //  #ifdef _DEBUGPVMMESS
00263       //            cout << "freqLocale : " << freqLocale << endl ;
00264       //            cout << "freqOMKsDist : " << freqOMKsDist << endl ;
00265       //  #endif
00266       //            if (freqLocale > freqOMKsDist) {
00267       //               freqComm = plusPetitSSMultSup(freqLocale,
00268       //                                              freqOMKsDist) ;
00269       //  #ifdef _DEBUGPVMMESS
00270       //               cout << "freqComm : " << freqComm << endl ;
00271       //  #endif
00272       //            } else {
00273       //            freqComm = freqLocale ;
00274       //  #ifdef _DEBUGPVMMESS
00275       //            cout << "freqComm  : " << freqComm << endl ;
00276       //  #endif
00277       //            }
00278       //         // On la stocke
00279       //            _processFrequencies.insert(ProcessFrequenciesType::value_type(processName, freqComm)) ;
00280     } 
00281     else if (typeEv == CancelRegistration) {
00282       //cout << "On se desabonne" << endl ;
00283     } 
00284     else if (typeEv == EventReceived) {
00285       //#ifdef DEBUG
00286       cerr << "ALERTE PvmReferenceObjectHandle::extract : On recoit un evenement en provenance d'un miroir" << endl ;
00287       //#endif
00288       assert ( false ) ;
00289       //Event evt ( in ) ;
00290       //in >> evt ;
00291       //receiveReallyEvent (evt) ;
00292     } 
00293     else if (typeEv == RegisterForSignal) 
00294     {
00295 #ifdef DEBUG
00296       cout << "On recoit un signal en provenance d'un miroir" << endl ;
00297 #endif
00298       Name sigId ;
00299       Name abonne ;
00300       Name eventId ;
00301       in >> sigId >> abonne >> eventId ;
00302       receiveRegistrationForSignal (sigId, abonne, eventId );
00303     } 
00304     else {
00305       cerr<<"PvmReferenceObjectHandle::extract : Type d'evenement incorrect"<<endl ;
00306     }
00307   }
00308   else {
00309     cerr<<"PvmReferenceObjectHandle::extract : Type d'evenement incorrect" <<endl ;
00310   }
00311 }
00312 
00313 
00314 //-----------------------------------------------------------------------------
00316 
00317 void PvmReferenceObjectHandle::makeSynchronisationMessage( const Date & date ) 
00318 {
00320   if (!morphosisPhase)
00321   {
00323 #if defined (_DEBUGEXECPVM) || defined (_DEBUGPVMMESS) 
00324     cerr<<"PvmReferenceObjectHandle::makeSynchronisationMessage ( const Date & "<<date<< ")"<<endl;
00325 #endif
00326     PvmOutgoingMessage * message ;
00327 
00328     if ( _dateOfLastActivation == date )
00329       // only send a synchronisation message if the handled object has been activated
00330     {
00331       NameToPointerMap<SvmLink>::iterator pCanal ; // iterateur
00332       //         ProcessFrequenciesType::iterator pFreq ; // iterateur
00333       //         Date dtComm ;
00334       //         Frequency freqComm ;
00335 #ifdef _DEBUGPVMMESS
00336       cout<<"makeSynchronisationMessages of "<<getSimulatedObject().getName()<<endl;
00337 #endif
00338       // On initialise l'iterateur sur la frequency
00339       //pFreq = _processFrequencies.begin () ;
00340       //Pour tous les abonnes
00341       for (pCanal = _needUpdateOutputValues.begin () ;
00342         pCanal != _needUpdateOutputValues.end ();
00343         pCanal ++) 
00344       {
00345         // On calcule le pas de temps de la communication
00346         //             freqComm = (*pFreq).second ;
00347         //             dtComm = 1000 / freqComm ;
00348 #ifdef _DEBUGPVMMESS
00349         //             cout << "dtComm : " << dtComm << endl ;
00350 #endif
00351         // On verifie qu'il faut reellement emettre
00352         // devrait maintenant etre inutile a cause des Frames majeures et mineures
00353         //             if (_controller.getSimulatedDate () % dtComm == 0) 
00354         //                {
00355         // On recupere le message ou stocker les donnees
00356         message = &(*pCanal).second->getOutgoingBuffer () ;  
00357 #ifdef _DEBUGPVMMESS
00358         cout <<"PvmReferenceObjectHandle::makeSynchronisationMessage writing to the buffer: " <<message<< endl ;
00359 #endif
00360         // On va construire le message
00361         // On ajoute le destinataire, puis le type du message puis les outputs
00362         *message << _myObject.getName ()  
00363           << static_cast<int> (SynchronisationMessage) 
00364           <<  _myObject ;
00365 
00366 #ifdef _DEBUGDISTRIBUTEDINIT
00367         std::cerr<<"PvmReferenceObjectHandle::makeSynchronisationMessage : sending a message to ";
00368         (*pCanal).second->printDebuggingInformation();
00369 #endif
00370       }
00371       // On passe a la frequency suivante
00372       //        pFreq ++ ;
00373       //        }
00374     }
00375     else
00376     {
00377 #ifdef _DEBUGPVMMESS
00378       cerr <<"PvmReferenceObjectHandle::makeSynchronisationMessage : No synchronisation needed at " 
00379         << date 
00380         <<", last activation : "<<_dateOfLastActivation<<endl ;
00381 #endif
00382     }
00383     // send complete values to the processes needing a complete list of the values with their associated production date.
00384     // this is done after updating, because processes in that list are then automatically inserted in the list of process needing only updates
00385     for (std::map<Name, SvmLink *>::iterator i = _needAllOutputValues.begin() ;
00386       i != _needAllOutputValues.end() ;
00387       ++i)
00388     {
00389       packInitialValues ( (i->second)->getOutgoingBuffer () ) ; 
00390 
00391       _needUpdateOutputValues.insert ( *i ) ;
00392     }
00393     _needAllOutputValues.clear() ;
00394 #ifdef _DEBUGPVMMESS
00395     cerr <<"PvmReferenceObjectHandle::makeSynchronisationMessage end" <<endl ;
00396 #endif 
00397 
00398   }
00400 }
00401 
00402 //-----------------------------------------------------------------------------
00403 
00404 int PvmReferenceObjectHandle::plusPetitSSMultSup (const int A, const int B) {
00405   // Fournit le plus petit sous-multiple de A plus grand que B
00406   int i = B ; // Valeur testee
00407   // Valeur max au dela de laquelle pas ss-multiple
00408   int valMax = div (A, 2).quot ; 
00409   bool ssMult = false ; // i ss-multiple de A ???
00410 
00411 #ifdef DEBUG
00412   cout << "plusPetitSSMultSup***************************************" << endl ;
00413   cout << "A : " << A << ", B : " << B << endl ;
00414   cout << "valMax : " << valMax << endl ;
00415 #endif
00416 
00417   // On verifie si i ss-multiple de A
00418   ssMult = (div (A, i).rem == 0) ;
00419   while ((! ssMult) && (i <= valMax)) {
00420     i++ ;
00421     // On verifie si i ss-multiple de A
00422     ssMult = (div (A, i).rem == 0) ;
00423   }
00424   if (i > valMax) {
00425 #ifdef DEBUG
00426     cout << "res : " << A << endl ;
00427     cout << "Fin de plusPetitSSMultSup***************************************" << endl;
00428 #endif  
00429     return A ;
00430   } else {
00431 #ifdef DEBUG
00432     cout << "res : " << i << endl ;
00433     cout << "Fin de plusPetitSSMultSup***************************************" << endl;
00434 #endif  
00435     return i ;
00436   }
00437 }
00438 
00439 
00440 void PvmReferenceObjectHandle::packInitialValues ( PvmOutgoingMessage & message ) 
00441 {
00442   _myObject.getName ().pack ( message ) ; 
00443   message << static_cast<int> (InitialValuesMessage) ;
00444   _myObject.packAllValues ( message ) ;
00445 }
00446 
00449 void PvmReferenceObjectHandle::flushDisconnectedSitesOutgoingBuffer (  ) 
00450 {
00451 
00452   NameToPointerMap<SvmLink>::iterator pCanal ;
00453   for (pCanal = _needUpdateOutputValues.begin () ;
00454     pCanal != _needUpdateOutputValues.end ();
00455     pCanal ++) 
00456   {
00457     (*pCanal).second->getOutgoingBuffer().flushCurrentBuffer() ;  
00458   }
00459 }
00460 
00461 void PvmReferenceObjectHandle::setMorphosisPhaseTrue ()
00462 {
00463   morphosisPhase = true ;
00464 }
00465 

logo OpenMask

Documentation generated on Mon Jun 9 11:45:57 2008

Generated with doxygen by Dimitri van Heesch ,   1997-2007