00001 /* 00002 * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software 00003 * 00004 * The Software has been developped within the Siames Project. 00005 * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights 00006 * 00007 * The Software has been registered with the Agence pour la Protection des 00008 * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200 00009 * 00010 * This file may be distributed under the terms of the Q Public License 00011 * version 1.0 as defined by Trolltech AS of Norway and appearing in the file 00012 * LICENSE.QPL included in the packaging of this file. 00013 * 00014 * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 00015 * for the software may use this file in accordance with that specific license 00016 * 00017 */ 00018 00019 #ifdef _MSC_VER 00020 # include <windows.h> 00021 #else 00022 # include <unistd.h> 00023 #endif 00024 #include <cstring> 00025 00026 #include <OMKPvmSvm.h> 00027 00028 #include <pvm3.h> 00029 #include <OMKProcess.h> 00030 #include <OMKPvmSvmLink.h> 00031 #include <OMKPvmMessage.h> 00032 #include <OMKController.h> 00033 using namespace std ; 00034 using namespace OMK ; 00035 00036 int PvmSvm::pvmDataEncoding = PvmDataDefault ; 00037 int PvmSvm::pvmSpawnFlags = PvmTaskHost ; 00038 //-------------------------------------------------------------------- 00039 00040 PvmSvm::PvmSvm (NameToPointerMap<Process> * tab, const Date & latence, const Date & timeOut, const int deadReckoningInterval, const bool yieldNeeded, int argc, char * argv []) 00041 : Svm (tab, latence, timeOut, deadReckoningInterval, yieldNeeded), 00042 _argc ( argc)//, _argv ( argv ) // TDTD code indigent et scandaleux ? 
00043 { 00044 // TDTD tentative d'initialisation du tableau de chaînes 00045 _argv = new char * [_argc + 1] ; 00046 for (int i = 0 ; i < _argc ; i ++) { 00047 _argv [i] = strdup (argv [i]) ; 00048 } 00049 _argv [_argc] = NULL ; 00050 00051 _siteId = pvm_mytid () ; 00052 00053 // added By Guillermo Andrade 00054 // this option allow to use direct connection between pvm tasks 00055 // using TCP instead of UDP packing message via pvmd-pvmd connection 00056 pvm_setopt(PvmRoute,PvmRouteDirect); 00057 00058 } 00059 00060 //-------------------------------------------------------------------- 00061 00062 PvmSvm::~PvmSvm () 00063 { 00064 // do not wait for output of spawned tasks to be written 00065 pvm_catchout (0) ; 00066 00067 // signal end of pvm to the pvm deamon 00068 pvm_exit () ; 00069 00070 } 00071 00072 void PvmSvm::joinSvmGroup ( std::string & groupName ) 00073 { 00074 int resul = pvm_joingroup ( const_cast<char *> (groupName.c_str()) ) ; 00075 if ( resul < 0 ) 00076 { 00077 cerr<<"PvmSvm::joinSvmGroup ERROR: "; 00078 switch ( resul ) 00079 { 00080 case PvmSysErr: 00081 cerr<<"pvmd was not started or has crashed."; 00082 break; 00083 case PvmBadParam: 00084 cerr<<"giving a NULL group name."; 00085 break; 00086 case PvmDupGroup: 00087 cerr<<"trying to join a group you are allready in."; 00088 break; 00089 default: 00090 cerr<<"unexpected error: "<<resul; 00091 } 00092 OMASSERT( false ) ; 00093 cerr<<endl; 00094 } 00095 } 00096 00097 00098 void PvmSvm::groupBarrier (std::string groupName, int numberToJoin) 00099 { 00100 // cerr<<"PvmSvm::groupBarrier: Waiting for "<<_numberOfSlaves<<" in pvm group named _"<<groupName.c_str()<<"_"<<endl; 00101 int resul = pvm_barrier ( const_cast<char *> (groupName.c_str()), numberToJoin) ; 00102 //cerr<<"PvmSvm::groupBarrier: Waiting done"<<endl; 00103 if ( resul < 0 ) 00104 { 00105 cerr<<"PvmSvm::groupBarrier ERROR: "; 00106 switch ( resul ) 00107 { 00108 case PvmSysErr: 00109 cerr<<"pvmd was not started or has crashed."; 00110 
break; 00111 case PvmBadParam: 00112 cerr<<"giving a numberToJoin < 1."; 00113 break; 00114 case PvmNoGroup: 00115 cerr<<"giving a non-existent group name."; 00116 break; 00117 case PvmNotInGroup: 00118 cerr<<"calling process is not in specified group."; 00119 break; 00120 default: 00121 cerr<<"unexpected error: "<<resul; 00122 } 00123 OMASSERT( false ) ; 00124 cerr<<endl; 00125 } 00126 } 00127 00128 00129 00130 void PvmSvm::broadcastToGroup (std::string groupName, PvmMessage::MessageTag tag) 00131 { 00132 int resul = pvm_bcast ( const_cast<char *> (groupName.c_str()), tag ) ; 00133 if ( resul < 0 ) 00134 { 00135 cerr<<"PvmSvm::broadcastToGroup ERROR: "; 00136 switch ( resul ) 00137 { 00138 case PvmSysErr: 00139 cerr<<"pvmd was not started or has crashed."; 00140 break; 00141 case PvmBadParam: 00142 cerr<<"giving a negative message tag."; 00143 break; 00144 case PvmNoGroup: 00145 cerr<<"giving a non-existent group name."; 00146 break; 00147 default: 00148 cerr<<"unexpected error: "<<resul; 00149 } 00150 OMASSERT( false ) ; 00151 cerr<<endl; 00152 } 00153 } 00154 00155 void PvmSvm::initBeforeMessagePacking () 00156 { 00157 int resul = pvm_initsend ( PvmSvm::pvmDataEncoding ) ; 00158 if ( resul < 0 ) 00159 { 00160 cerr<<"PvmSvm::initBeforeMessagePacking ERROR: "; 00161 switch ( resul ) 00162 { 00163 case PvmBadParam: 00164 cerr<<"giving a invalid encoding value."; 00165 break; 00166 case PvmNoMem: 00167 cerr<<"Malloc has failed. 
There is not enough memory to create the buffer"; 00168 break; 00169 default: 00170 cerr<<"unexpected error: "<<resul; 00171 } 00172 OMASSERT( false ) ; 00173 cerr<<endl; 00174 } 00175 } 00176 00177 int PvmSvm::nonblockingReceive (PvmMessage::MessageTag tag) 00178 { 00179 #ifdef _DEBUGPVMMESS 00180 cerr<<"PvmSvm::nonblockingReceive ("<<tag<<")" <<endl; 00181 #endif 00182 int res = 0 ; 00183 int bufid = pvm_nrecv (-1, tag) ; 00184 while ( bufid > 0) 00185 { 00186 ++res ; 00187 bufid = pvm_nrecv (-1, tag) ; 00188 } 00189 if ( bufid < 0 ) 00190 { 00191 cerr<<"PvmSvm::nonblockingReceive ERROR: "; 00192 switch ( bufid ) 00193 { 00194 case PvmBadParam: 00195 cerr<<"giving a invalid encoding tid value or msgtag."; 00196 break; 00197 case PvmSysErr: 00198 cerr<<"pvmd not responding."; 00199 break; 00200 default: 00201 cerr<<"unexpected error: "<<bufid; 00202 } 00203 OMASSERT( false ) ; 00204 cerr<<endl; 00205 } 00206 return res ; 00207 } 00208 00209 00210 00211 std::pair<PvmMessage::MessageTag, int> 00212 PvmSvm::waitForAnyRequests (PvmIncomingMessage & receiveBuffer) 00213 { 00214 std::pair<PvmMessage::MessageTag, int> result ; 00215 00216 int bufid = pvm_recv ( -1, -1 ) ; //receive anything 00217 00218 if ( bufid < 0 ) 00219 { 00220 cerr<<"Svm::waitForAnyRequests ERROR "; 00221 switch (bufid) 00222 { 00223 case PvmBadParam: 00224 cerr<<"giving an invalid tid value, or msgtag < -1"; 00225 break; 00226 case PvmSysErr: 00227 cerr<<"pvmd not responding"; 00228 break; 00229 default: 00230 cerr<<"unexpected error"; 00231 OMASSERT( false ) ; 00232 } 00233 cerr<<endl; 00234 } 00235 00236 int bytes ; 00237 // int senderSiteId ; 00238 int msgtag ; 00239 int info = pvm_bufinfo( bufid, &bytes , &msgtag , &result.second ); 00240 result.first = static_cast<PvmMessage::MessageTag>(msgtag) ; 00241 if ( info < 0 ) 00242 { 00243 cerr<<"Svm::waitForAnyRequests ERROR in pvm_bufinfo "; 00244 switch (info) 00245 { 00246 case PvmBadParam: 00247 cerr<<"invalid argument"; 00248 break; 00249 case 
PvmNoSuchBuf: 00250 cerr<<"specified buffer does not exist"; 00251 break; 00252 default: 00253 cerr<<"unexpected error"; 00254 OMASSERT( false ) ; 00255 } 00256 cerr<<endl; 00257 } 00258 00259 receiveBuffer.initialise ( bufid ) ; 00260 00261 return result ; 00262 } 00263 00264 00265 00266 //-------------------------------------------------------------------- 00267 00268 int PvmSvm::getSiteId () { 00269 assert ( _siteId >= 0 ) ; 00270 return _siteId ; 00271 } 00272 00273 //-------------------------------------------------------------------- 00274 00275 void PvmSvm::addNewWorkstation (const Name & m) { 00276 char * mac [2] ; // Machine a ajouter 00277 int res ; // Resultat du add_host 00278 int infos [2] ; // Informations sur chaque host 00279 int tentative = 0 ; // Numero de la tentative 00280 int nbTentative = 40 ; // On essaie 40 fois 00281 00282 // here, const casting because pvm uses a C interface without const 00283 mac [0] = const_cast<char *> (m.getCString ()) ; 00284 mac [1] = (char *)NULL ; 00285 00286 //TDTD ajout d'une temporisation le 24/05/2006 pour atténuer les pbs lors d'ajouts de plusieurs process sur la même machine 00287 #ifdef _MSC_VER 00288 Sleep(1); 00289 #else 00290 sleep (1) ; 00291 #endif 00292 while (tentative < nbTentative) 00293 { 00294 cout << "Tentative number " << tentative+1 00295 << " for machine " << mac [0] <<endl; 00296 res = pvm_addhosts (mac, 1, infos) ; 00297 if (res < 1) { // On a une erreur 00298 tentative++; 00299 00300 if (infos[0] != PvmDupHost) 00301 { 00302 if (tentative >= nbTentative) 00303 { 00304 cerr << "Type d'erreur : " << res << endl; 00305 cerr << "Infos[0] : " << infos[0] << endl; 00306 cerr << "Machine : " << mac [0] << ", " << mac [1] << endl; 00307 Controller::error( "PvmSvm::addNewWorkstation: pb pour ajouter une machine"); 00308 } 00309 else 00310 { 00311 #ifdef _MSC_VER 00312 Sleep(5); 00313 #else 00314 sleep (5) ; 00315 #endif 00316 } 00317 } 00318 else 00319 { // workstation wasn't added because it is 
allready present ! 00320 // no need to try more 00321 tentative = nbTentative; 00322 } 00323 } 00324 else 00325 { // On n'a pas d'erreur, on arrete d'essayer 00326 tentative = nbTentative; 00327 ++_numberOfSlaves ; 00328 } 00329 } 00330 cout << "Machine " << mac [0] << " added" << endl ; 00331 //cout << "status of machine: "<<pvm_mstat(mac [0])<<endl; 00332 } 00333 00334 //-------------------------------------------------------------------- 00335 00336 int PvmSvm::getParentSiteId () { 00337 int tidCtrlGlo = pvm_parent () ; 00338 // options PVM: 00339 //pvm_setopt (PvmSelfTraceTid, tidCtrlGlo) ; 00340 //pvm_setopt (PvmSelfTraceCode, 666) ; 00341 //pvm_setopt (PvmSelfOutputTid, tidCtrlGlo) ; 00342 //pvm_setopt (PvmSelfOutputCode, 667) ; 00343 //pvm_catchout (stdout) ; 00344 // Message au format XDR 00345 pvm_initsend (pvmDataEncoding) ; 00346 if (tidCtrlGlo == PvmNoParent ) 00347 { 00348 return 0 ; 00349 } 00350 else 00351 { 00352 assert ( tidCtrlGlo != 0 ) ; //for coherance 00353 return tidCtrlGlo ; 00354 } 00355 } 00356 00357 //-------------------------------------------------------------------- 00358 00359 int PvmSvm::spawnProcess (Process * p) 00360 { 00361 assert ( _argv[_argc] == NULL ) ; 00362 // char * argv [5] ; 00363 int tidCtrlLoc ; 00364 // istrstream is (fConfig.getCString (), strlen (fConfig.getCString ())) ; 00365 // argv [0] = new char [256] ; 00366 // argv [1] = new char [256] ; 00367 // argv [2] = new char [256] ; 00368 // argv [3] = new char [256] ; 00369 // is >> argv [0] >> argv [1] >> argv [2] >> argv [3]; 00370 // argv [4] = (char *)NULL ; 00371 std::cerr << "PvmSvm::spawnProcess : " ; 00372 for ( int i = 0 ; i < _argc ; ++i) 00373 { 00374 std::cerr << " <" << _argv[i] << ">" ; 00375 } 00376 std::cerr << std::endl ; 00377 00378 // int res = pvm_spawn ("tutorialValgrind", 00379 // const_cast<char **>(&_argv[1]), 00380 // pvmSpawnFlags, 00381 // (char *)(p->getHostMachineName ().getCString ()), 00382 // 1, 00383 // &tidCtrlLoc) ; 00384 00385 int 
res = pvm_spawn (const_cast<char *>(_argv[0]), 00386 const_cast<char **>(&_argv[1]), 00387 pvmSpawnFlags, 00388 (char *)(p->getHostMachineName ().getCString ()), 00389 1, 00390 &tidCtrlLoc) ; 00391 if (res != 1) 00392 { 00393 cout << "Executable : " << _argv[0] << endl ; 00394 //p->Executable ().getCString () 00395 char ** argv = &_argv[1] ; 00396 for ( int i = 0 ; i < _argc-1 ; ++i) 00397 { 00398 cout << "_argv ["<<i<<"] : " << argv[i] << endl ; 00399 } 00400 cout << "on machine: " 00401 << p->getHostMachineName ().getCString () << endl ; 00402 cout << "status of machine: " 00403 <<pvm_mstat(const_cast<char *>(p->getHostMachineName (). 00404 getCString ()))<<endl; 00405 cout << "tidCtrlLoc: " << tidCtrlLoc << endl ; 00406 cout << "Number of tasks started : " << res << endl ; 00407 Controller::error ("PvmSvm::spawnProcess: spawn problem") ; 00408 } 00409 return (int)tidCtrlLoc ; 00410 } 00411 00412 00413 //-------------------------------------------------------------------- 00414 00415 void PvmSvm::removeWorkstation (const Name & m) 00416 { 00417 int res ; 00418 char * mac [2] ; 00419 mac [0]= (char *)(m.getCString ()) ; 00420 mac [1] = (char *)NULL ; 00421 00422 pvm_delhosts (mac, 1, &res) ; 00423 } 00424 00425 //-------------------------------------------------------------------- 00426 00427 SvmLink * PvmSvm::createSvmLink (const int & d) 00428 { 00429 PvmSvmLink * canal = new PvmSvmLink (d) ; 00430 return (SvmLink*)canal; 00431 } 00432 00433 //-------------------------------------------------------------------- 00434 00436 bool PvmSvm::testIfSiteRecovered (Process * p) //(const Name &nom) 00437 { 00438 bool flag; 00439 // int mstat = pvm_mstat( const_cast<char *> (nom.getCString()) ); 00440 //int mstat = pvm_mstat( const_cast<char *> (p->getHostMachineName().getCString())); 00441 // cout << "status of machine: "<<pvm_mstat(const_cast<char *>(nom.getCString ()))<<endl; 00442 cout << "nom de la machine: "<<p->getHostMachineName()<<endl; 00443 cout<<"TID de truc 
"<<p->getSvmLink()->getTID()<<endl; 00444 int pstat = pvm_pstat ( p->getSvmLink()->getTID() ) ; 00445 00446 switch (pstat) 00447 { 00448 case PvmOk: 00449 flag = true ; 00450 cout<<"koukou"<<endl; 00451 break; 00452 case PvmSysErr: 00453 cout<<"putain de merde"<<endl; 00454 flag = false; 00455 break; 00456 case PvmNoTask: 00457 flag = false; 00458 break; 00459 default: 00460 flag = false ; 00461 cout<<"kiki"<<endl; 00462 } 00463 return flag; 00464 } 00465
// Documentation generated on Mon Jun 9 11:45:57 2008
// Generated with doxygen by Dimitri van Heesch, 1997-2007