00001 /* 00002 * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software 00003 * 00004 * The Software has been developped within the Siames Project. 00005 * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights 00006 * 00007 * The Software has been registered with the Agence pour la Protection des 00008 * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200 00009 * 00010 * This file may be distributed under the terms of the Q Public License 00011 * version 1.0 as defined by Trolltech AS of Norway and appearing in the file 00012 * LICENSE.QPL included in the packaging of this file. 00013 * 00014 * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 00015 * for the software may use this file in accordance with that specific license 00016 * 00017 */ 00018 #include "OMKTracer.h" 00019 #include <cassert> 00020 #include <OMKPvmReferenceObjectHandle.h> 00021 #include <OMKSimulatedObject.h> 00022 #include <OMKPvmMessage.h> 00023 #include <OMKObjectDescriptor.h> 00024 #include <OMKSvmLink.h> 00025 #include <OMKProcess.h> 00026 #include <OMKController.h> 00027 //#include <OMKNameServer.h> 00028 00029 #include <OMKPvmController.h> 00030 #include "OMKSvm.h" 00031 #include "OMKEventListener.h" 00032 #include "OMKIncomingSynchronisationMessage.h" 00033 00034 using namespace std ; 00035 using namespace OMK ; 00036 00037 //----------------------------------------------------------------------------- 00038 00039 PvmReferenceObjectHandle::PvmReferenceObjectHandle (SimulatedObject & object, 00040 Controller & controller) 00041 : ReferenceObjectHandle ( object, controller), 00043 morphosisPhase ( false ) 00044 { 00045 } 00046 00047 PvmReferenceObjectHandle::PvmReferenceObjectHandle (SimulatedObject & object, 00048 Controller & controller, 00049 SignalDispatcher * signalDispatcher) 00050 : ReferenceObjectHandle ( object, controller, signalDispatcher), 00052 morphosisPhase ( false ) 00053 { 00054 } 00055 00056 //----------------------------------------------------------------------------- 00057 00058 PvmReferenceObjectHandle::~PvmReferenceObjectHandle () 00059 { 00060 } 00061 00062 00063 //----------------------------------------------------------------------------- 00064 00065 void PvmReferenceObjectHandle::emettreValAttributs (PvmOutgoingMessage *messFD, 00066 PvmOutgoingMessage *messEV) 00067 { 00068 00069 } 00070 00071 00072 void PvmReferenceObjectHandle::insertInStream (std::ostream & out) const 00073 { 00074 Controller::error ("On utilise l'insert du referentiel") ; 00075 } 00076 00077 //----------------------------------------------------------------------------- 00078 void PvmReferenceObjectHandle::unpack (IncomingSynchronisationMessage & in) 00079 { 00080 #if defined (_DEBUGPVMMESS) 00081 cerr << "PvmReferenceObjectHandle::unpack"<<endl; 00082 #endif 00083 // Frequency max des refs du pss distant 00084 Frequency freqOMKsDist ; 00085 // Frequency de ce pss 00086 Frequency freqLocale ; 00087 // Frequency de communication 00088 Frequency freqComm ; 00089 00090 RequestType typeMess ; 00091 int messageType ; 00092 00093 in >> messageType ; 00094 typeMess = static_cast <RequestType> (messageType) ; 00095 00096 #if defined (_DEBUGPVMMESS) 00097 cerr << "PvmReferenceObjectHandle::unpack: received a message of type " 00098 <<typeMess<<" "<<Registration 00099 <<" at date "<<getSimulatedObject().getSimulatedDate() 00100 <<endl; 00101 #endif 00102 00103 if (typeMess == Registration) 00104 { 00105 // name of the process sending the message 00106 Name processName ; 00107 00108 processName.unpack ( in ) ; 00109 00110 #ifdef _DEBUGPVMMESS 00111 cout << "PvmReferenceObjectHandle::unpack " << processName << " registering a mirror "<< endl ; 00112 #endif 00113 SvmLink * canal = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getLinkToProcessNamed (processName) ; 00114 00115 Process * processDescriptor = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getProcessDescriptorNamed (processName) ; 00116 00117 //the registering process needs a complete version of the outputs of the object in the next synchronisation message 00118 _needAllOutputValues.insert ( make_pair(processName, canal) ) ; 00119 00120 // // On recupere la frequency max des refs du pss distant 00121 // freqOMKsDist = processDescriptor->getMaxFrequencyOfHostedObjects () ; 00122 // // On recupere la frequency locale 00123 // freqLocale = _myObject. getObjectDescriptor ().getFrequency () ; 00124 // // On calcule la frequency de communication 00125 // #ifdef _DEBUGPVMMESS 00126 // cout << "freqLocale : " << freqLocale << endl ; 00127 // cout << "freqOMKsDist : " << freqOMKsDist << endl ; 00128 // #endif 00129 // if (freqLocale > freqOMKsDist) 00130 // { 00131 // freqComm = plusPetitSSMultSup(freqLocale, 00132 // freqOMKsDist) ; 00133 // #ifdef _DEBUGPVMMESS 00134 // cout << "freqComm : " << freqComm << endl ; 00135 // #endif 00136 // } 00137 // else 00138 // { 00139 // freqComm = freqLocale ; 00140 // #ifdef _DEBUGPVMMESS 00141 // cout << "freqComm : " << freqComm << endl ; 00142 // #endif 00143 // } 00144 // // On la stocke 00145 // _processFrequencies.insert (ProcessFrequenciesType::value_type(processName, freqComm)) ; 00146 } 00147 else if (typeMess == CancelRegistration) 00148 { 00149 OMTRACEID("DEBUGPVM", "Cancel : " << "avant unpack"); 00150 // name of the process sending the message 00151 Name processName ; 00152 processName.unpack ( in ) ; 00153 OMTRACEID("DEBUGPVM", "Cancel : " << "avant ProcessName is" << processName); 00154 00155 _needUpdateOutputValues.erase (processName) ; 00156 //_processFrequencies.erase ( processName ) ; 00157 _needAllOutputValues.erase ( processName ) ; 00158 } 00159 else if (typeMess == EventReceived) 00160 { 00161 #if ( defined (_DEBUGPVMMESS) || defined (_DEBUGPVMEVENTS ) ) 00162 cerr << "PvmReferenceObjectHandle::unpack: received an event relayed from an mirror" << endl ; 00163 #endif 00164 Event * event ; 00165 Name classToCreate ; 00166 EventIdentifier eventId; 00167 Date eventDate; 00168 Name sender, receiver ; 00169 in >> classToCreate >> eventId>> eventDate>>sender>>receiver ; 00170 event = EventCreator::createEvent (classToCreate, eventId, eventDate, sender, receiver ) ; 00171 event->unpack ( in ) ; 00172 ReferenceObjectHandle::receiveEvent ( event ) ; 00173 } 00174 else if (typeMess == RegisterForSignal) 00175 { 00176 #ifdef _DEBUGPVMMESS 00177 cout << "PvmReferenceObjectHandle::unpack: RegisterForSignal" << endl ; 00178 #endif 00179 Name sigId ; 00180 Name registrant ; 00181 Name eventId ; 00182 in >> sigId >> registrant >> eventId ; 00183 receiveRegistrationForSignal (sigId, registrant, eventId ); 00184 } 00185 else if (typeMess == CancelRegistrationForSignal) 00186 { 00187 #ifdef _DEBUGPVMMESS 00188 cout << "PvmReferenceObjectHandle::unpack: CancelRegistrationForSignal" << endl ; 00189 #endif 00190 Name sigId ; 00191 Name registrant ; 00192 in >> sigId >> registrant ; 00193 cancelRegistrationForSignal (sigId, registrant) ; 00194 } 00195 else if (typeMess == SynchronisationMessage) // ajout TDTD pour essayer de voir ce qui ne va pas... 00196 { 00197 //if (morphosisPhase) { // TDTD il me semble pourtant que ce serait bien... 00198 //cerr<<"PvmReferenceObjectHandle::unpack : TDTD : SynchronisationMessage for" << getSimulatedObject ().getName () <<endl; ; 00199 _myObject.unpack ( in ) ; 00200 // semble poser des pbs pour la suppression d'objets locaux... 00201 //_myObject.unpackAllValues ( in ) ; 00202 //} 00203 } 00204 else 00205 { 00206 cerr<<"PvmReferenceObjectHandle::unpack : unknown request"<<typeMess<<endl; ; 00207 cerr << "messageType : " << messageType << endl ; 00208 assert( false && "You should not be there..." ) ; 00209 } 00210 00211 #if defined (_DEBUGPVMMESS) 00212 cerr << "PvmReferenceObjectHandle::unpack finished"<<typeMess <<endl; 00213 #endif 00214 } 00215 00216 00217 00218 void PvmReferenceObjectHandle::extract( istream & in ) 00219 { 00220 #if defined (_DEBUGPVMMESS) 00221 cerr << "PvmReferenceObjectHandle::extract:"<<endl; 00222 #endif 00223 // Type de message 00224 std::string typeMess ; 00225 // Nom du processus ou se trouve le miroir (cas de l'EV) 00226 Name processName ; 00227 00228 in >> typeMess ; 00229 //cerr << "PvmReferenceObjectHandle::extract: received a message of type " <<typeMess<<endl; 00230 if( typeMess == "EV" ) 00231 { 00232 int typeEv ; 00233 in >> typeEv ; 00234 cerr << "PvmReferenceObjectHandle::extract: received an event message of type " <<typeEv<<endl; 00235 if (typeEv == Registration) 00236 { 00237 // Frequency max des refs du pss distant 00238 Frequency freqOMKsDist ; 00239 // Frequency de ce pss 00240 Frequency freqLocale ; 00241 // Frequency de communication 00242 Frequency freqComm ; 00243 00244 // On recupere le nom du processus associe 00245 in >> processName ; 00246 #ifdef _DEBUGPVMMESS 00247 cout << "PvmReferenceObjectHandle::extract "<<processName << " registred a mirror "<< endl ; 00248 #endif 00249 // On recupere le processus associe 00250 SvmLink * canal = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getLinkToProcessNamed (processName) ; 00251 00252 Process * processDescriptor = dynamic_cast<PvmController &>(_controller).getDistributedVirtualMachine()->getProcessDescriptorNamed (processName) ; 00253 00254 //the registering process needs a complete version of the outputs of the object in the next synchronisation message 00255 _needAllOutputValues.insert ( make_pair (processName,canal) ) ; 00256 00257 // // On recupere la frequency max des refs du pss distant 00258 // freqOMKsDist = processDescriptor->getMaxFrequencyOfHostedObjects () ; 00259 // // On recupere la frequency locale 00260 // freqLocale = _myObject. getObjectDescriptor ().getFrequency () ; 00261 // // On calcule la frequency de communication 00262 // #ifdef _DEBUGPVMMESS 00263 // cout << "freqLocale : " << freqLocale << endl ; 00264 // cout << "freqOMKsDist : " << freqOMKsDist << endl ; 00265 // #endif 00266 // if (freqLocale > freqOMKsDist) { 00267 // freqComm = plusPetitSSMultSup(freqLocale, 00268 // freqOMKsDist) ; 00269 // #ifdef _DEBUGPVMMESS 00270 // cout << "freqComm : " << freqComm << endl ; 00271 // #endif 00272 // } else { 00273 // freqComm = freqLocale ; 00274 // #ifdef _DEBUGPVMMESS 00275 // cout << "freqComm : " << freqComm << endl ; 00276 // #endif 00277 // } 00278 // // On la stocke 00279 // _processFrequencies.insert(ProcessFrequenciesType::value_type(processName, freqComm)) ; 00280 } 00281 else if (typeEv == CancelRegistration) { 00282 //cout << "On se desabonne" << endl ; 00283 } 00284 else if (typeEv == EventReceived) { 00285 //#ifdef DEBUG 00286 cerr << "ALERTE PvmReferenceObjectHandle::extract : On recoit un evenement en provenance d'un miroir" << endl ; 00287 //#endif 00288 assert ( false ) ; 00289 //Event evt ( in ) ; 00290 //in >> evt ; 00291 //receiveReallyEvent (evt) ; 00292 } 00293 else if (typeEv == RegisterForSignal) 00294 { 00295 #ifdef DEBUG 00296 cout << "On recoit un signal en provenance d'un miroir" << endl ; 00297 #endif 00298 Name sigId ; 00299 Name abonne ; 00300 Name eventId ; 00301 in >> sigId >> abonne >> eventId ; 00302 receiveRegistrationForSignal (sigId, abonne, eventId ); 00303 } 00304 else { 00305 cerr<<"PvmReferenceObjectHandle::extract : Type d'evenement incorrect"<<endl ; 00306 } 00307 } 00308 else { 00309 cerr<<"PvmReferenceObjectHandle::extract : Type d'evenement incorrect" <<endl ; 00310 } 00311 } 00312 00313 00314 //----------------------------------------------------------------------------- 00316 00317 void PvmReferenceObjectHandle::makeSynchronisationMessage( const Date & date ) 00318 { 00320 if (!morphosisPhase) 00321 { 00323 #if defined (_DEBUGEXECPVM) || defined (_DEBUGPVMMESS) 00324 cerr<<"PvmReferenceObjectHandle::makeSynchronisationMessage ( const Date & "<<date<< ")"<<endl; 00325 #endif 00326 PvmOutgoingMessage * message ; 00327 00328 if ( _dateOfLastActivation == date ) 00329 // only send a synchronisation message if the handled object has been activated 00330 { 00331 NameToPointerMap<SvmLink>::iterator pCanal ; // iterateur 00332 // ProcessFrequenciesType::iterator pFreq ; // iterateur 00333 // Date dtComm ; 00334 // Frequency freqComm ; 00335 #ifdef _DEBUGPVMMESS 00336 cout<<"makeSynchronisationMessages of "<<getSimulatedObject().getName()<<endl; 00337 #endif 00338 // On initialise l'iterateur sur la frequency 00339 //pFreq = _processFrequencies.begin () ; 00340 //Pour tous les abonnes 00341 for (pCanal = _needUpdateOutputValues.begin () ; 00342 pCanal != _needUpdateOutputValues.end (); 00343 pCanal ++) 00344 { 00345 // On calcule le pas de temps de la communication 00346 // freqComm = (*pFreq).second ; 00347 // dtComm = 1000 / freqComm ; 00348 #ifdef _DEBUGPVMMESS 00349 // cout << "dtComm : " << dtComm << endl ; 00350 #endif 00351 // On verifie qu'il faut reellement emettre 00352 // devrait maintenant etre inutile a cause des Frames majeures et mineures 00353 // if (_controller.getSimulatedDate () % dtComm == 0) 00354 // { 00355 // On recupere le message ou stocker les donnees 00356 message = &(*pCanal).second->getOutgoingBuffer () ; 00357 #ifdef _DEBUGPVMMESS 00358 cout <<"PvmReferenceObjectHandle::makeSynchronisationMessage writing to the buffer: " <<message<< endl ; 00359 #endif 00360 // On va construire le message 00361 // On ajoute le destinataire, puis le type du message puis les outputs 00362 *message << _myObject.getName () 00363 << static_cast<int> (SynchronisationMessage) 00364 << _myObject ; 00365 00366 #ifdef _DEBUGDISTRIBUTEDINIT 00367 std::cerr<<"PvmReferenceObjectHandle::makeSynchronisationMessage : sending a message to "; 00368 (*pCanal).second->printDebuggingInformation(); 00369 #endif 00370 } 00371 // On passe a la frequency suivante 00372 // pFreq ++ ; 00373 // } 00374 } 00375 else 00376 { 00377 #ifdef _DEBUGPVMMESS 00378 cerr <<"PvmReferenceObjectHandle::makeSynchronisationMessage : No synchronisation needed at " 00379 << date 00380 <<", last activation : "<<_dateOfLastActivation<<endl ; 00381 #endif 00382 } 00383 // send complete values to the processes needing a complete list of the values with their associated production date. 00384 // this is done after updating, because processes in that list are then automatically inserted in the list of process needing only updates 00385 for (std::map<Name, SvmLink *>::iterator i = _needAllOutputValues.begin() ; 00386 i != _needAllOutputValues.end() ; 00387 ++i) 00388 { 00389 packInitialValues ( (i->second)->getOutgoingBuffer () ) ; 00390 00391 _needUpdateOutputValues.insert ( *i ) ; 00392 } 00393 _needAllOutputValues.clear() ; 00394 #ifdef _DEBUGPVMMESS 00395 cerr <<"PvmReferenceObjectHandle::makeSynchronisationMessage end" <<endl ; 00396 #endif 00397 00398 } 00400 } 00401 00402 //----------------------------------------------------------------------------- 00403 00404 int PvmReferenceObjectHandle::plusPetitSSMultSup (const int A, const int B) { 00405 // Fournit le plus petit sous-multiple de A plus grand que B 00406 int i = B ; // Valeur testee 00407 // Valeur max au dela de laquelle pas ss-multiple 00408 int valMax = div (A, 2).quot ; 00409 bool ssMult = false ; // i ss-multiple de A ??? 00410 00411 #ifdef DEBUG 00412 cout << "plusPetitSSMultSup***************************************" << endl ; 00413 cout << "A : " << A << ", B : " << B << endl ; 00414 cout << "valMax : " << valMax << endl ; 00415 #endif 00416 00417 // On verifie si i ss-multiple de A 00418 ssMult = (div (A, i).rem == 0) ; 00419 while ((! ssMult) && (i <= valMax)) { 00420 i++ ; 00421 // On verifie si i ss-multiple de A 00422 ssMult = (div (A, i).rem == 0) ; 00423 } 00424 if (i > valMax) { 00425 #ifdef DEBUG 00426 cout << "res : " << A << endl ; 00427 cout << "Fin de plusPetitSSMultSup***************************************" << endl; 00428 #endif 00429 return A ; 00430 } else { 00431 #ifdef DEBUG 00432 cout << "res : " << i << endl ; 00433 cout << "Fin de plusPetitSSMultSup***************************************" << endl; 00434 #endif 00435 return i ; 00436 } 00437 } 00438 00439 00440 void PvmReferenceObjectHandle::packInitialValues ( PvmOutgoingMessage & message ) 00441 { 00442 _myObject.getName ().pack ( message ) ; 00443 message << static_cast<int> (InitialValuesMessage) ; 00444 _myObject.packAllValues ( message ) ; 00445 } 00446 00449 void PvmReferenceObjectHandle::flushDisconnectedSitesOutgoingBuffer ( ) 00450 { 00451 00452 NameToPointerMap<SvmLink>::iterator pCanal ; 00453 for (pCanal = _needUpdateOutputValues.begin () ; 00454 pCanal != _needUpdateOutputValues.end (); 00455 pCanal ++) 00456 { 00457 (*pCanal).second->getOutgoingBuffer().flushCurrentBuffer() ; 00458 } 00459 } 00460 00461 void PvmReferenceObjectHandle::setMorphosisPhaseTrue () 00462 { 00463 morphosisPhase = true ; 00464 } 00465
Documentation generated on Mon Jun 9 11:45:57 2008 |
Generated with doxygen by Dimitri van Heesch , 1997-2007 |