#include <OMKPvmSvm.h>
Inheritance diagram for OMK::PvmSvm:
Public Member Functions | |
PvmSvm (NameToPointerMap< Process > *tab, const Date &latence, const Date &timeOut, const int deadReckoningInterval, const bool yieldNeeded, int argc, char *argv[]) | |
constructor | |
virtual | ~PvmSvm () |
destructor | |
Static Public Attributes | |
static int | pvmDataEncoding |
define the encoding used when sending and receiving messages | |
static int | pvmSpawnFlags |
define the flags used when spawning a new task using PVM | |
Protected Member Functions | |
virtual void | joinSvmGroup (std::string &groupName) |
joinSvmGroup. | |
virtual void | groupBarrier (std::string groupName, int numberToJoin) |
block the calling process until numberToJoin processes of group groupName have called syncDistributedSites | |
virtual void | broadcastToGroup (std::string groupName, PvmMessage::MessageTag tag) |
broadcast an empty tagged message to all members of a broadcast group | |
virtual int | nonblockingReceive (PvmMessage::MessageTag tag) |
try to receive messages in a non-blocking fashion; the received messages are discarded and only their count is returned | |
virtual void | initBeforeMessagePacking () |
do any useful initialisations before messages are packed | |
virtual std::pair< PvmMessage::MessageTag, int > | waitForAnyRequests (PvmIncomingMessage &receiveBuffer) |
wait for any message sent to this site. | |
virtual int | getSiteId () |
get the id in the svm of this site | |
virtual int | getParentSiteId () |
get the id of the site that spawned this process. | |
virtual void | addNewWorkstation (const Name &m) |
add a new machine to the virtual machine | |
virtual void | removeWorkstation (const Name &m) |
remove a workstation from the virtual machine | |
virtual int | spawnProcess (Process *p) |
spawn a copy of this process on another site | |
virtual SvmLink * | createSvmLink (const int &d) |
create a link to a distant site | |
virtual bool | testIfSiteRecovered (Process *p) |
CHADI: test whether the PVM task linked to process p is currently running | |
Protected Attributes | |
int | _argc |
remember the arguments to use when spawning: their number | |
char ** | _argv |
remember the arguments to use when spawning: their contents |
Definition at line 32 of file OMKPvmSvm.h.
PvmSvm::PvmSvm | ( | NameToPointerMap< Process > * | tab, | |
const Date & | latence, | |||
const Date & | timeOut, | |||
const int | deadReckoningInterval, | |||
const bool | yieldNeeded, | |||
int | argc, | |||
char * | argv[] | |||
) |
constructor
tab | a table containing a map between a process name and the data structure defining a process | |
latence | the authorized synchronisation latency | |
argc | the number of arguments given at the command line | |
argv | the arguments given at the command line |
Definition at line 40 of file OMKPvmSvm.cxx.
References _argc, _argv, and OMK::Svm::_siteId.
00041 : Svm (tab, latence, timeOut, deadReckoningInterval, yieldNeeded), 00042 _argc ( argc)//, _argv ( argv ) // TDTD code indigent et scandaleux ? 00043 { 00044 // TDTD tentative d'initialisation du tableau de chaînes 00045 _argv = new char * [_argc + 1] ; 00046 for (int i = 0 ; i < _argc ; i ++) { 00047 _argv [i] = strdup (argv [i]) ; 00048 } 00049 _argv [_argc] = NULL ; 00050 00051 _siteId = pvm_mytid () ; 00052 00053 // added By Guillermo Andrade 00054 // this option allow to use direct connection between pvm tasks 00055 // using TCP instead of UDP packing message via pvmd-pvmd connection 00056 pvm_setopt(PvmRoute,PvmRouteDirect); 00057 00058 }
PvmSvm::~PvmSvm | ( | ) | [virtual] |
destructor
Definition at line 62 of file OMKPvmSvm.cxx.
00063 { 00064 // do not wait for output of spawned tasks to be written 00065 pvm_catchout (0) ; 00066 00067 // signal end of pvm to the pvm deamon 00068 pvm_exit () ; 00069 00070 }
void PvmSvm::joinSvmGroup | ( | std::string & | groupName | ) | [protected, virtual] |
joinSvmGroup.
join a group of processes in the siames virtual machine
groupName | the name of the group of processes to join. This parameter is not const, because of the underlying C compatibility layer |
Implements OMK::Svm.
Definition at line 72 of file OMKPvmSvm.cxx.
References OMASSERT.
00073 { 00074 int resul = pvm_joingroup ( const_cast<char *> (groupName.c_str()) ) ; 00075 if ( resul < 0 ) 00076 { 00077 cerr<<"PvmSvm::joinSvmGroup ERROR: "; 00078 switch ( resul ) 00079 { 00080 case PvmSysErr: 00081 cerr<<"pvmd was not started or has crashed."; 00082 break; 00083 case PvmBadParam: 00084 cerr<<"giving a NULL group name."; 00085 break; 00086 case PvmDupGroup: 00087 cerr<<"trying to join a group you are allready in."; 00088 break; 00089 default: 00090 cerr<<"unexpected error: "<<resul; 00091 } 00092 OMASSERT( false ) ; 00093 cerr<<endl; 00094 } 00095 }
void PvmSvm::groupBarrier | ( | std::string | groupName, | |
int | numberToJoin | |||
) | [protected, virtual] |
block the calling process until numberToJoin processes of group groupName have called syncDistributedSites
numberToJoin | number of processes to wait for | |
groupName | the name of the process group |
Implements OMK::Svm.
Definition at line 98 of file OMKPvmSvm.cxx.
References OMASSERT.
00099 { 00100 // cerr<<"PvmSvm::groupBarrier: Waiting for "<<_numberOfSlaves<<" in pvm group named _"<<groupName.c_str()<<"_"<<endl; 00101 int resul = pvm_barrier ( const_cast<char *> (groupName.c_str()), numberToJoin) ; 00102 //cerr<<"PvmSvm::groupBarrier: Waiting done"<<endl; 00103 if ( resul < 0 ) 00104 { 00105 cerr<<"PvmSvm::groupBarrier ERROR: "; 00106 switch ( resul ) 00107 { 00108 case PvmSysErr: 00109 cerr<<"pvmd was not started or has crashed."; 00110 break; 00111 case PvmBadParam: 00112 cerr<<"giving a numberToJoin < 1."; 00113 break; 00114 case PvmNoGroup: 00115 cerr<<"giving a non-existent group name."; 00116 break; 00117 case PvmNotInGroup: 00118 cerr<<"calling process is not in specified group."; 00119 break; 00120 default: 00121 cerr<<"unexpected error: "<<resul; 00122 } 00123 OMASSERT( false ) ; 00124 cerr<<endl; 00125 } 00126 }
void PvmSvm::broadcastToGroup | ( | std::string | groupName, | |
PvmMessage::MessageTag | tag | |||
) | [protected, virtual] |
broadcast an empty tagged message to all members of a broadcast group
Implements OMK::Svm.
Definition at line 130 of file OMKPvmSvm.cxx.
References OMASSERT.
00131 { 00132 int resul = pvm_bcast ( const_cast<char *> (groupName.c_str()), tag ) ; 00133 if ( resul < 0 ) 00134 { 00135 cerr<<"PvmSvm::broadcastToGroup ERROR: "; 00136 switch ( resul ) 00137 { 00138 case PvmSysErr: 00139 cerr<<"pvmd was not started or has crashed."; 00140 break; 00141 case PvmBadParam: 00142 cerr<<"giving a negative message tag."; 00143 break; 00144 case PvmNoGroup: 00145 cerr<<"giving a non-existent group name."; 00146 break; 00147 default: 00148 cerr<<"unexpected error: "<<resul; 00149 } 00150 OMASSERT( false ) ; 00151 cerr<<endl; 00152 } 00153 }
int PvmSvm::nonblockingReceive | ( | PvmMessage::MessageTag | tag | ) | [protected, virtual] |
try to receive messages in a non-blocking fashion; the received messages are discarded and only their count is returned
tag | the tag of the messages to receive |
Implements OMK::Svm.
Definition at line 177 of file OMKPvmSvm.cxx.
References OMASSERT.
00178 { 00179 #ifdef _DEBUGPVMMESS 00180 cerr<<"PvmSvm::nonblockingReceive ("<<tag<<")" <<endl; 00181 #endif 00182 int res = 0 ; 00183 int bufid = pvm_nrecv (-1, tag) ; 00184 while ( bufid > 0) 00185 { 00186 ++res ; 00187 bufid = pvm_nrecv (-1, tag) ; 00188 } 00189 if ( bufid < 0 ) 00190 { 00191 cerr<<"PvmSvm::nonblockingReceive ERROR: "; 00192 switch ( bufid ) 00193 { 00194 case PvmBadParam: 00195 cerr<<"giving a invalid encoding tid value or msgtag."; 00196 break; 00197 case PvmSysErr: 00198 cerr<<"pvmd not responding."; 00199 break; 00200 default: 00201 cerr<<"unexpected error: "<<bufid; 00202 } 00203 OMASSERT( false ) ; 00204 cerr<<endl; 00205 } 00206 return res ; 00207 }
void PvmSvm::initBeforeMessagePacking | ( | ) | [protected, virtual] |
do any useful initialisations before messages are packed
Implements OMK::Svm.
Definition at line 155 of file OMKPvmSvm.cxx.
References OMASSERT, and pvmDataEncoding.
00156 { 00157 int resul = pvm_initsend ( PvmSvm::pvmDataEncoding ) ; 00158 if ( resul < 0 ) 00159 { 00160 cerr<<"PvmSvm::initBeforeMessagePacking ERROR: "; 00161 switch ( resul ) 00162 { 00163 case PvmBadParam: 00164 cerr<<"giving a invalid encoding value."; 00165 break; 00166 case PvmNoMem: 00167 cerr<<"Malloc has failed. There is not enough memory to create the buffer"; 00168 break; 00169 default: 00170 cerr<<"unexpected error: "<<resul; 00171 } 00172 OMASSERT( false ) ; 00173 cerr<<endl; 00174 } 00175 }
std::pair< PvmMessage::MessageTag, int > PvmSvm::waitForAnyRequests | ( | PvmIncomingMessage & | receiveBuffer | ) | [protected, virtual] |
wait for any message sent to this site.
This call is blocking
receiveBuffer | the user buffer to use to receive any message |
Returns a pair giving the tag of the received message and the siteId of the sender
Implements OMK::Svm.
Definition at line 212 of file OMKPvmSvm.cxx.
References OMK::PvmIncomingMessage::initialise(), and OMASSERT.
00213 { 00214 std::pair<PvmMessage::MessageTag, int> result ; 00215 00216 int bufid = pvm_recv ( -1, -1 ) ; //receive anything 00217 00218 if ( bufid < 0 ) 00219 { 00220 cerr<<"Svm::waitForAnyRequests ERROR "; 00221 switch (bufid) 00222 { 00223 case PvmBadParam: 00224 cerr<<"giving an invalid tid value, or msgtag < -1"; 00225 break; 00226 case PvmSysErr: 00227 cerr<<"pvmd not responding"; 00228 break; 00229 default: 00230 cerr<<"unexpected error"; 00231 OMASSERT( false ) ; 00232 } 00233 cerr<<endl; 00234 } 00235 00236 int bytes ; 00237 // int senderSiteId ; 00238 int msgtag ; 00239 int info = pvm_bufinfo( bufid, &bytes , &msgtag , &result.second ); 00240 result.first = static_cast<PvmMessage::MessageTag>(msgtag) ; 00241 if ( info < 0 ) 00242 { 00243 cerr<<"Svm::waitForAnyRequests ERROR in pvm_bufinfo "; 00244 switch (info) 00245 { 00246 case PvmBadParam: 00247 cerr<<"invalid argument"; 00248 break; 00249 case PvmNoSuchBuf: 00250 cerr<<"specified buffer does not exist"; 00251 break; 00252 default: 00253 cerr<<"unexpected error"; 00254 OMASSERT( false ) ; 00255 } 00256 cerr<<endl; 00257 } 00258 00259 receiveBuffer.initialise ( bufid ) ; 00260 00261 return result ; 00262 }
int PvmSvm::getSiteId | ( | ) | [protected, virtual] |
get the id in the svm of this site
Implements OMK::Svm.
Definition at line 268 of file OMKPvmSvm.cxx.
References OMK::Svm::_siteId.
int PvmSvm::getParentSiteId | ( | ) | [protected, virtual] |
get the id of the site that spawned this process (0 if this process has no parent).
Implements OMK::Svm.
Definition at line 336 of file OMKPvmSvm.cxx.
References pvmDataEncoding.
00336 { 00337 int tidCtrlGlo = pvm_parent () ; 00338 // options PVM: 00339 //pvm_setopt (PvmSelfTraceTid, tidCtrlGlo) ; 00340 //pvm_setopt (PvmSelfTraceCode, 666) ; 00341 //pvm_setopt (PvmSelfOutputTid, tidCtrlGlo) ; 00342 //pvm_setopt (PvmSelfOutputCode, 667) ; 00343 //pvm_catchout (stdout) ; 00344 // Message au format XDR 00345 pvm_initsend (pvmDataEncoding) ; 00346 if (tidCtrlGlo == PvmNoParent ) 00347 { 00348 return 0 ; 00349 } 00350 else 00351 { 00352 assert ( tidCtrlGlo != 0 ) ; //for coherance 00353 return tidCtrlGlo ; 00354 } 00355 }
void PvmSvm::addNewWorkstation | ( | const Name & | m | ) | [protected, virtual] |
add a new machine to the virtual machine
Implements OMK::Svm.
Definition at line 275 of file OMKPvmSvm.cxx.
References OMK::Svm::_numberOfSlaves, OMK::Controller::error(), and OMK::Name::getCString().
00275 { 00276 char * mac [2] ; // Machine a ajouter 00277 int res ; // Resultat du add_host 00278 int infos [2] ; // Informations sur chaque host 00279 int tentative = 0 ; // Numero de la tentative 00280 int nbTentative = 40 ; // On essaie 40 fois 00281 00282 // here, const casting because pvm uses a C interface without const 00283 mac [0] = const_cast<char *> (m.getCString ()) ; 00284 mac [1] = (char *)NULL ; 00285 00286 //TDTD ajout d'une temporisation le 24/05/2006 pour atténuer les pbs lors d'ajouts de plusieurs process sur la même machine 00287 #ifdef _MSC_VER 00288 Sleep(1); 00289 #else 00290 sleep (1) ; 00291 #endif 00292 while (tentative < nbTentative) 00293 { 00294 cout << "Tentative number " << tentative+1 00295 << " for machine " << mac [0] <<endl; 00296 res = pvm_addhosts (mac, 1, infos) ; 00297 if (res < 1) { // On a une erreur 00298 tentative++; 00299 00300 if (infos[0] != PvmDupHost) 00301 { 00302 if (tentative >= nbTentative) 00303 { 00304 cerr << "Type d'erreur : " << res << endl; 00305 cerr << "Infos[0] : " << infos[0] << endl; 00306 cerr << "Machine : " << mac [0] << ", " << mac [1] << endl; 00307 Controller::error( "PvmSvm::addNewWorkstation: pb pour ajouter une machine"); 00308 } 00309 else 00310 { 00311 #ifdef _MSC_VER 00312 Sleep(5); 00313 #else 00314 sleep (5) ; 00315 #endif 00316 } 00317 } 00318 else 00319 { // workstation wasn't added because it is allready present ! 00320 // no need to try more 00321 tentative = nbTentative; 00322 } 00323 } 00324 else 00325 { // On n'a pas d'erreur, on arrete d'essayer 00326 tentative = nbTentative; 00327 ++_numberOfSlaves ; 00328 } 00329 } 00330 cout << "Machine " << mac [0] << " added" << endl ; 00331 //cout << "status of machine: "<<pvm_mstat(mac [0])<<endl; 00332 }
void PvmSvm::removeWorkstation | ( | const Name & | m | ) | [protected, virtual] |
remove a workstation from the virtual machine
Implements OMK::Svm.
Definition at line 415 of file OMKPvmSvm.cxx.
References OMK::Name::getCString().
00416 { 00417 int res ; 00418 char * mac [2] ; 00419 mac [0]= (char *)(m.getCString ()) ; 00420 mac [1] = (char *)NULL ; 00421 00422 pvm_delhosts (mac, 1, &res) ; 00423 }
spawn a copy of this process on another site
Implements OMK::Svm.
Definition at line 359 of file OMKPvmSvm.cxx.
References _argc, _argv, OMK::Controller::error(), OMK::Name::getCString(), OMK::Process::getHostMachineName(), and pvmSpawnFlags.
00360 { 00361 assert ( _argv[_argc] == NULL ) ; 00362 // char * argv [5] ; 00363 int tidCtrlLoc ; 00364 // istrstream is (fConfig.getCString (), strlen (fConfig.getCString ())) ; 00365 // argv [0] = new char [256] ; 00366 // argv [1] = new char [256] ; 00367 // argv [2] = new char [256] ; 00368 // argv [3] = new char [256] ; 00369 // is >> argv [0] >> argv [1] >> argv [2] >> argv [3]; 00370 // argv [4] = (char *)NULL ; 00371 std::cerr << "PvmSvm::spawnProcess : " ; 00372 for ( int i = 0 ; i < _argc ; ++i) 00373 { 00374 std::cerr << " <" << _argv[i] << ">" ; 00375 } 00376 std::cerr << std::endl ; 00377 00378 // int res = pvm_spawn ("tutorialValgrind", 00379 // const_cast<char **>(&_argv[1]), 00380 // pvmSpawnFlags, 00381 // (char *)(p->getHostMachineName ().getCString ()), 00382 // 1, 00383 // &tidCtrlLoc) ; 00384 00385 int res = pvm_spawn (const_cast<char *>(_argv[0]), 00386 const_cast<char **>(&_argv[1]), 00387 pvmSpawnFlags, 00388 (char *)(p->getHostMachineName ().getCString ()), 00389 1, 00390 &tidCtrlLoc) ; 00391 if (res != 1) 00392 { 00393 cout << "Executable : " << _argv[0] << endl ; 00394 //p->Executable ().getCString () 00395 char ** argv = &_argv[1] ; 00396 for ( int i = 0 ; i < _argc-1 ; ++i) 00397 { 00398 cout << "_argv ["<<i<<"] : " << argv[i] << endl ; 00399 } 00400 cout << "on machine: " 00401 << p->getHostMachineName ().getCString () << endl ; 00402 cout << "status of machine: " 00403 <<pvm_mstat(const_cast<char *>(p->getHostMachineName (). 00404 getCString ()))<<endl; 00405 cout << "tidCtrlLoc: " << tidCtrlLoc << endl ; 00406 cout << "Number of tasks started : " << res << endl ; 00407 Controller::error ("PvmSvm::spawnProcess: spawn problem") ; 00408 } 00409 return (int)tidCtrlLoc ; 00410 }
create a link to a distant site
d | the site id of the distant site |
Implements OMK::Svm.
Definition at line 427 of file OMKPvmSvm.cxx.
00428 { 00429 PvmSvmLink * canal = new PvmSvmLink (d) ; 00430 return (SvmLink*)canal; 00431 }
CHADI.
Queries pvm_pstat() for the task id of p's SvmLink and returns true only when PVM reports PvmOk.
Implements OMK::Svm.
Definition at line 436 of file OMKPvmSvm.cxx.
References OMK::Process::getHostMachineName(), OMK::Process::getSvmLink(), and OMK::SvmLink::getTID().
00437 { 00438 bool flag; 00439 // int mstat = pvm_mstat( const_cast<char *> (nom.getCString()) ); 00440 //int mstat = pvm_mstat( const_cast<char *> (p->getHostMachineName().getCString())); 00441 // cout << "status of machine: "<<pvm_mstat(const_cast<char *>(nom.getCString ()))<<endl; 00442 cout << "nom de la machine: "<<p->getHostMachineName()<<endl; 00443 cout<<"TID de truc "<<p->getSvmLink()->getTID()<<endl; 00444 int pstat = pvm_pstat ( p->getSvmLink()->getTID() ) ; 00445 00446 switch (pstat) 00447 { 00448 case PvmOk: 00449 flag = true ; 00450 cout<<"koukou"<<endl; 00451 break; 00452 case PvmSysErr: 00453 cout<<"putain de merde"<<endl; 00454 flag = false; 00455 break; 00456 case PvmNoTask: 00457 flag = false; 00458 break; 00459 default: 00460 flag = false ; 00461 cout<<"kiki"<<endl; 00462 } 00463 return flag; 00464 }
int PvmSvm::pvmDataEncoding [static] |
define the encoding used when sending and receiving messages
Definition at line 117 of file OMKPvmSvm.h.
Referenced by OMK::PvmNameServer::getIdentifierAsFrom(), getParentSiteId(), OMK::PvmNameServer::getStringAssociatedToOnCentralServer(), initBeforeMessagePacking(), OMK::PvmNameServer::PvmNameServer(), OMK::PvmOutgoingMessage::PvmOutgoingMessage(), OMK::PvmOutgoingMessage::reinitAndRevertPvmContext(), OMK::Svm::serveNameRequestsUntilEnd(), and OMK::PvmCentralNameServer::verifyCompatibilityWithLocalNameServer().
int PvmSvm::pvmSpawnFlags [static] |
define the flags used when spawning a new task using PVM
Definition at line 121 of file OMKPvmSvm.h.
Referenced by spawnProcess().
int OMK::PvmSvm::_argc [protected] |
remember the arguments to use when spawning: their number
Definition at line 126 of file OMKPvmSvm.h.
Referenced by PvmSvm(), and spawnProcess().
char** OMK::PvmSvm::_argv [protected] |
remember the arguments to use when spawning: their contents
Definition at line 130 of file OMKPvmSvm.h.
Referenced by PvmSvm(), and spawnProcess().
Documentation generated on Mon Jun 9 11:46:03 2008 |
Generated with doxygen by Dimitri van Heesch , 1997-2007 |