OMKPvmSvm.cxx

Go to the documentation of this file.
00001 /*
00002  * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software
00003  * 
00004  * The Software has been developed within the Siames Project. 
00005  * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights
00006  * 
00007  * The Software has been registered with the Agence pour la Protection des
00008  * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200
00009  *  
00010  * This file may be distributed under the terms of the Q Public License
00011  * version 1.0 as defined by Trolltech AS of Norway and appearing in the file
00012  * LICENSE.QPL included in the packaging of this file.
00013  *
00014  * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 
00015  * for the software may use this file in accordance with that specific license
00016  *
00017  */
00018 
#ifdef _MSC_VER
#  include <windows.h>
#else
#  include <unistd.h>
#endif
#include <cstdlib>
#include <cstring>

#include <OMKPvmSvm.h>

#include <pvm3.h>
#include <OMKProcess.h>
#include <OMKPvmSvmLink.h>
#include <OMKPvmMessage.h>
#include <OMKController.h>
00033 using namespace std ;
00034 using namespace OMK ;
00035 
00036 int PvmSvm::pvmDataEncoding = PvmDataDefault ;
00037 int PvmSvm::pvmSpawnFlags = PvmTaskHost ;
00038 //--------------------------------------------------------------------
00039 
00040 PvmSvm::PvmSvm (NameToPointerMap<Process> * tab, const Date & latence, const Date & timeOut, const int deadReckoningInterval, const bool yieldNeeded, int argc, char * argv [])
00041    : Svm (tab, latence, timeOut, deadReckoningInterval, yieldNeeded),
00042      _argc ( argc)//, _argv ( argv ) // TDTD code indigent et scandaleux ?
00043 {
00044 // TDTD tentative d'initialisation du tableau de chaînes
00045    _argv = new char * [_argc + 1] ;
00046    for (int i = 0 ; i < _argc ; i ++) {
00047       _argv [i] = strdup (argv [i]) ;
00048    }
00049    _argv [_argc] = NULL ;
00050 
00051   _siteId = pvm_mytid () ;
00052 
00053   // added By Guillermo Andrade
00054   // this option allow to use direct connection between pvm tasks
00055   // using TCP instead of UDP packing message via pvmd-pvmd connection
00056   pvm_setopt(PvmRoute,PvmRouteDirect);
00057 
00058 }
00059 
00060 //--------------------------------------------------------------------
00061 
00062 PvmSvm::~PvmSvm () 
00063 {
00064    // do not wait for output of spawned tasks to be written
00065    pvm_catchout (0) ;
00066    
00067    // signal end of pvm to the pvm deamon
00068    pvm_exit () ;
00069 
00070 }
00071 
00072 void PvmSvm::joinSvmGroup ( std::string & groupName ) 
00073 {
00074   int resul = pvm_joingroup ( const_cast<char *> (groupName.c_str()) ) ;
00075   if ( resul < 0 )
00076     {
00077       cerr<<"PvmSvm::joinSvmGroup ERROR: ";
00078       switch ( resul )
00079         {
00080         case PvmSysErr: 
00081           cerr<<"pvmd was not started or has crashed.";
00082           break;
00083         case PvmBadParam:
00084           cerr<<"giving a NULL group name.";
00085           break;
00086         case PvmDupGroup:
00087           cerr<<"trying to join a group you are allready in.";
00088           break;
00089         default:
00090           cerr<<"unexpected error: "<<resul;
00091         }
00092       OMASSERT( false ) ;
00093       cerr<<endl;
00094     }
00095 }
00096 
00097 
00098 void PvmSvm::groupBarrier (std::string groupName, int numberToJoin)
00099 {
00100    //   cerr<<"PvmSvm::groupBarrier: Waiting for "<<_numberOfSlaves<<" in pvm group named _"<<groupName.c_str()<<"_"<<endl;
00101   int resul = pvm_barrier ( const_cast<char *> (groupName.c_str()), numberToJoin) ;
00102   //cerr<<"PvmSvm::groupBarrier: Waiting done"<<endl;
00103   if ( resul < 0 )
00104     {
00105       cerr<<"PvmSvm::groupBarrier ERROR: ";
00106       switch ( resul )
00107         {
00108         case PvmSysErr: 
00109           cerr<<"pvmd was not started or has crashed.";
00110           break;
00111         case PvmBadParam:
00112           cerr<<"giving a numberToJoin < 1.";
00113           break;
00114         case PvmNoGroup:
00115           cerr<<"giving a non-existent group name.";
00116           break;
00117         case PvmNotInGroup:
00118           cerr<<"calling process is not in specified group.";
00119           break;
00120         default:
00121           cerr<<"unexpected error: "<<resul;
00122         }
00123         OMASSERT( false ) ;
00124       cerr<<endl;
00125     }
00126 }
00127 
00128 
00129 
00130 void PvmSvm::broadcastToGroup (std::string groupName, PvmMessage::MessageTag tag)
00131 {
00132   int resul = pvm_bcast ( const_cast<char *> (groupName.c_str()), tag ) ;
00133   if ( resul < 0 )
00134     {
00135       cerr<<"PvmSvm::broadcastToGroup ERROR: ";
00136       switch ( resul )
00137         {
00138         case PvmSysErr: 
00139           cerr<<"pvmd was not started or has crashed.";
00140           break;
00141         case PvmBadParam:
00142           cerr<<"giving a negative message tag.";
00143           break;
00144         case PvmNoGroup:
00145           cerr<<"giving a non-existent group name.";
00146           break;
00147         default:
00148           cerr<<"unexpected error: "<<resul;
00149         }
00150         OMASSERT( false ) ;
00151       cerr<<endl;
00152     }
00153 }
00154 
00155 void PvmSvm::initBeforeMessagePacking ()
00156 {
00157   int resul = pvm_initsend ( PvmSvm::pvmDataEncoding ) ;
00158   if ( resul < 0 )
00159     {
00160       cerr<<"PvmSvm::initBeforeMessagePacking ERROR: ";
00161       switch ( resul )
00162         {
00163         case PvmBadParam:
00164           cerr<<"giving a invalid encoding value.";
00165           break;
00166         case PvmNoMem:
00167           cerr<<"Malloc has failed. There is not enough memory to create the buffer";
00168           break;
00169         default:
00170           cerr<<"unexpected error: "<<resul;
00171         }
00172         OMASSERT( false ) ;
00173       cerr<<endl;
00174     }
00175 }
00176 
00177 int PvmSvm::nonblockingReceive (PvmMessage::MessageTag tag) 
00178 {
00179 #ifdef _DEBUGPVMMESS
00180   cerr<<"PvmSvm::nonblockingReceive ("<<tag<<")" <<endl;
00181 #endif
00182   int res = 0 ;
00183   int bufid = pvm_nrecv (-1, tag) ;
00184   while ( bufid > 0)
00185     {
00186       ++res ;
00187       bufid = pvm_nrecv (-1, tag) ;
00188     }
00189   if ( bufid < 0 )
00190     {
00191       cerr<<"PvmSvm::nonblockingReceive ERROR: ";
00192       switch ( bufid )
00193         {
00194         case PvmBadParam:
00195           cerr<<"giving a invalid encoding tid value or msgtag.";
00196           break;
00197         case PvmSysErr:
00198           cerr<<"pvmd not responding.";
00199           break;
00200         default:
00201           cerr<<"unexpected error: "<<bufid;
00202         }
00203         OMASSERT( false ) ;
00204       cerr<<endl;
00205    }
00206   return res ;
00207 }
00208 
00209 
00210 
00211 std::pair<PvmMessage::MessageTag, int> 
00212 PvmSvm::waitForAnyRequests (PvmIncomingMessage & receiveBuffer) 
00213 {
00214   std::pair<PvmMessage::MessageTag, int> result ;
00215   
00216   int bufid = pvm_recv ( -1, -1 ) ; //receive anything
00217 
00218   if ( bufid < 0 ) 
00219     {
00220       cerr<<"Svm::waitForAnyRequests ERROR ";
00221       switch (bufid)
00222         {
00223         case PvmBadParam:
00224           cerr<<"giving an invalid tid value, or msgtag < -1";
00225           break;
00226         case PvmSysErr:
00227           cerr<<"pvmd not responding";
00228           break;
00229         default:
00230           cerr<<"unexpected error";
00231         OMASSERT( false ) ;
00232         }
00233       cerr<<endl;
00234     }
00235 
00236   int bytes ;
00237   //  int senderSiteId ;
00238   int msgtag ;
00239   int info = pvm_bufinfo( bufid, &bytes , &msgtag , &result.second  );
00240   result.first = static_cast<PvmMessage::MessageTag>(msgtag) ;
00241   if ( info < 0 ) 
00242     {
00243       cerr<<"Svm::waitForAnyRequests ERROR in pvm_bufinfo ";
00244       switch (info)
00245         {
00246         case PvmBadParam:
00247           cerr<<"invalid argument";
00248           break;
00249         case PvmNoSuchBuf:
00250           cerr<<"specified buffer does not exist";
00251           break;
00252         default:
00253           cerr<<"unexpected error";
00254         OMASSERT( false ) ;
00255         }
00256       cerr<<endl;
00257     }
00258 
00259   receiveBuffer.initialise ( bufid ) ;
00260   
00261   return result ;
00262 }
00263 
00264 
00265 
00266 //--------------------------------------------------------------------
00267 
00268 int PvmSvm::getSiteId () {
00269   assert ( _siteId >= 0 ) ;
00270   return _siteId ; 
00271 }
00272 
00273 //--------------------------------------------------------------------
00274 
00275 void PvmSvm::addNewWorkstation (const Name & m) {
00276    char * mac [2] ; // Machine a ajouter
00277    int res ; // Resultat du add_host
00278    int infos [2] ; // Informations sur chaque host
00279    int tentative = 0 ; // Numero de la tentative
00280    int nbTentative = 40 ; // On essaie 40 fois
00281 
00282    // here, const casting because pvm uses a C interface without const
00283    mac [0] = const_cast<char *> (m.getCString ()) ;
00284    mac [1] = (char *)NULL ;
00285 
00286 //TDTD ajout d'une temporisation le 24/05/2006 pour atténuer les pbs lors d'ajouts de plusieurs process sur la même machine
00287 #ifdef _MSC_VER
00288                                 Sleep(1);
00289 #else
00290                                 sleep (1) ;
00291 #endif
00292    while (tentative < nbTentative) 
00293       {
00294          cout << "Tentative number " << tentative+1
00295               << " for machine " << mac [0] <<endl;
00296       res = pvm_addhosts (mac, 1, infos) ;
00297       if (res < 1) { // On a une erreur
00298          tentative++;
00299 
00300          if (infos[0] != PvmDupHost) 
00301            { 
00302              if (tentative >= nbTentative) 
00303                {
00304                  cerr << "Type d'erreur : " << res << endl;
00305                  cerr << "Infos[0] : " << infos[0] << endl;
00306                  cerr << "Machine : " << mac [0] << ", " << mac [1] << endl;
00307                  Controller::error( "PvmSvm::addNewWorkstation: pb pour ajouter une machine");
00308                } 
00309              else 
00310                {
00311 #ifdef _MSC_VER
00312                                 Sleep(5);
00313 #else
00314                                 sleep (5) ;
00315 #endif
00316                }
00317            } 
00318          else 
00319            { // workstation wasn't added because it is allready present !
00320              // no need to try more
00321              tentative = nbTentative;
00322            }
00323       } 
00324       else 
00325         { // On n'a pas d'erreur, on arrete d'essayer
00326           tentative = nbTentative;
00327           ++_numberOfSlaves ;
00328         }
00329    }
00330    cout << "Machine " << mac [0] << " added" << endl ;           
00331    //cout << "status of machine: "<<pvm_mstat(mac [0])<<endl;
00332 }
00333 
00334 //--------------------------------------------------------------------
00335 
00336 int PvmSvm::getParentSiteId () {
00337    int tidCtrlGlo = pvm_parent () ;
00338    // options PVM:
00339    //pvm_setopt (PvmSelfTraceTid, tidCtrlGlo) ; 
00340    //pvm_setopt (PvmSelfTraceCode, 666) ; 
00341    //pvm_setopt (PvmSelfOutputTid, tidCtrlGlo) ; 
00342    //pvm_setopt (PvmSelfOutputCode, 667) ; 
00343    //pvm_catchout (stdout) ;
00344    // Message au format XDR
00345    pvm_initsend (pvmDataEncoding) ;
00346    if (tidCtrlGlo == PvmNoParent ) 
00347      {
00348        return 0 ;
00349      } 
00350    else 
00351      {
00352        assert ( tidCtrlGlo != 0 ) ; //for coherance
00353        return tidCtrlGlo ;
00354      }
00355 }
00356 
00357 //--------------------------------------------------------------------
00358 
00359 int PvmSvm::spawnProcess (Process * p) 
00360 {
00361    assert ( _argv[_argc] == NULL ) ;
00362    //   char * argv [5] ;
00363    int tidCtrlLoc ;
00364 //     istrstream is (fConfig.getCString (), strlen (fConfig.getCString ())) ;
00365 //     argv [0] = new char [256] ;
00366 //     argv [1] = new char [256] ;
00367 //     argv [2] = new char [256] ;
00368 //     argv [3] = new char [256] ;
00369 //     is >> argv [0] >> argv [1] >> argv [2] >> argv [3];
00370 //     argv [4] = (char *)NULL ;
00371    std::cerr << "PvmSvm::spawnProcess : " ;
00372        for ( int i = 0 ; i < _argc ; ++i) 
00373          {
00374             std::cerr << " <" << _argv[i] << ">" ;
00375          }
00376    std::cerr << std::endl ;
00377 
00378 //    int res = pvm_spawn ("tutorialValgrind", 
00379 //                      const_cast<char **>(&_argv[1]), 
00380 //                      pvmSpawnFlags,
00381 //                      (char *)(p->getHostMachineName ().getCString ()), 
00382 //                      1, 
00383 //                      &tidCtrlLoc) ;
00384 
00385    int res = pvm_spawn (const_cast<char *>(_argv[0]), 
00386                         const_cast<char **>(&_argv[1]), 
00387                         pvmSpawnFlags,
00388                         (char *)(p->getHostMachineName ().getCString ()), 
00389                         1, 
00390                         &tidCtrlLoc) ;
00391    if (res != 1) 
00392      {
00393        cout << "Executable : " << _argv[0] << endl ;
00394        //p->Executable ().getCString ()
00395        char ** argv = &_argv[1] ;
00396        for ( int i = 0 ; i < _argc-1 ; ++i) 
00397          {
00398            cout << "_argv ["<<i<<"] : " << argv[i] << endl ;
00399          }
00400        cout << "on machine: " 
00401             << p->getHostMachineName ().getCString () << endl ;
00402        cout << "status of machine: "
00403             <<pvm_mstat(const_cast<char *>(p->getHostMachineName ().
00404                                            getCString ()))<<endl;
00405        cout << "tidCtrlLoc: " << tidCtrlLoc << endl ;
00406        cout << "Number of tasks started : " << res << endl ;
00407        Controller::error ("PvmSvm::spawnProcess: spawn problem") ;
00408      }
00409    return (int)tidCtrlLoc ;
00410 }
00411 
00412 
00413 //--------------------------------------------------------------------
00414 
00415 void PvmSvm::removeWorkstation (const Name & m) 
00416 {
00417    int res ;
00418    char * mac [2] ;
00419    mac [0]= (char *)(m.getCString ()) ;
00420    mac [1] = (char *)NULL ;
00421    
00422    pvm_delhosts (mac, 1, &res) ;
00423 }
00424 
00425 //--------------------------------------------------------------------
00426 
00427 SvmLink * PvmSvm::createSvmLink (const int & d) 
00428 {
00429    PvmSvmLink * canal = new PvmSvmLink (d) ;
00430    return (SvmLink*)canal;       
00431 }
00432 
00433 //--------------------------------------------------------------------
00434 
00436 bool PvmSvm::testIfSiteRecovered (Process * p)  //(const Name &nom)
00437 {
00438    bool flag;
00439  //  int mstat = pvm_mstat( const_cast<char *> (nom.getCString()) );
00440  //int mstat = pvm_mstat( const_cast<char *> (p->getHostMachineName().getCString()));
00441  // cout << "status of machine: "<<pvm_mstat(const_cast<char *>(nom.getCString ()))<<endl;
00442   cout << "nom de la machine: "<<p->getHostMachineName()<<endl;
00443   cout<<"TID de truc "<<p->getSvmLink()->getTID()<<endl;
00444    int pstat = pvm_pstat ( p->getSvmLink()->getTID() ) ;
00445  
00446    switch (pstat)
00447      {
00448      case PvmOk:
00449        flag = true ;
00450        cout<<"koukou"<<endl;
00451        break;
00452      case PvmSysErr:
00453        cout<<"putain de merde"<<endl;
00454        flag = false;
00455        break;
00456      case PvmNoTask:
00457        flag = false;
00458        break;
00459      default:
00460        flag = false ;
00461        cout<<"kiki"<<endl;
00462      }
00463    return flag;
00464  }
00465 

logo OpenMask

Documentation generated on Mon Jun 9 11:45:57 2008

Generated with doxygen by Dimitri van Heesch ,   1997-2007