OMK::PvmSvm Class Reference

class encapsulating the PVM virtual machine More...

#include <OMKPvmSvm.h>

Inheritance diagram for OMK::PvmSvm:

Inheritance graph
[legend]
Collaboration diagram for OMK::PvmSvm:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 PvmSvm (NameToPointerMap< Process > *tab, const Date &latence, const Date &timeOut, const int deadReckoningInterval, const bool yieldNeeded, int argc, char *argv[])
 constructor
virtual ~PvmSvm ()
 destructor

Static Public Attributes

static int pvmDataEncoding
 define the encoding used when sending and receiveing messages
static int pvmSpawnFlags
 define the flags used when spawning a new task using PVM

Protected Member Functions

virtual void joinSvmGroup (std::string &groupName)
 joinSvmGroup.
virtual void groupBarrier (std::string groupName, int numberToJoin)
 block the calling process until numberToJoin processes of group groupName have called syncDistributedSites
virtual void broadcastToGroup (std::string groupName, PvmMessage::MessageTag tag)
 broadcast an empty tagged message to all members of a broadcast group
virtual int nonblockingReceive (PvmMessage::MessageTag tag)
 try and receive message in a non-blocking fashion the received messages are lost
virtual void initBeforeMessagePacking ()
 do any usefull initialisations before messages are packed
virtual std::pair< PvmMessage::MessageTag,
int
waitForAnyRequests (PvmIncomingMessage &receiveBuffer)
 wait for any message sent to this site.
virtual int getSiteId ()
 get the id in the svm of this site
virtual int getParentSiteId ()
 get the site id having spanned this process.
virtual void addNewWorkstation (const Name &m)
 add a new machine to the virtual machine
virtual void removeWorkstation (const Name &m)
 remove a work station from the virtual machine
virtual int spawnProcess (Process *p)
 spawn a copy of this proc'ess on another site
virtual SvmLinkcreateSvmLink (const int &d)
 create a link to a distant site
virtual bool testIfSiteRecovered (Process *p)
 CHADI.

Protected Attributes

int _argc
 remember the arguments to use when spawning: their number
char ** _argv
 remember the arguments to use when spawning: their contents

Detailed Description

class encapsulating the PVM virtual machine

Author:
Siames
Version:
2.0

Definition at line 32 of file OMKPvmSvm.h.


Constructor & Destructor Documentation

PvmSvm::PvmSvm ( NameToPointerMap< Process > *  tab,
const Date latence,
const Date timeOut,
const int  deadReckoningInterval,
const bool  yieldNeeded,
int  argc,
char *  argv[] 
)

constructor

Parameters:
tab a table containing the a map between a processName and the data structure definig a process
latence the authorized synchronisation latency
argc the number of arguments given at the command line
argv the arguments given at the command line

Definition at line 40 of file OMKPvmSvm.cxx.

References _argc, _argv, and OMK::Svm::_siteId.

00041    : Svm (tab, latence, timeOut, deadReckoningInterval, yieldNeeded),
00042      _argc ( argc)//, _argv ( argv ) // TDTD code indigent et scandaleux ?
00043 {
00044 // TDTD tentative d'initialisation du tableau de chaînes
00045    _argv = new char * [_argc + 1] ;
00046    for (int i = 0 ; i < _argc ; i ++) {
00047       _argv [i] = strdup (argv [i]) ;
00048    }
00049    _argv [_argc] = NULL ;
00050 
00051   _siteId = pvm_mytid () ;
00052 
00053   // added By Guillermo Andrade
00054   // this option allow to use direct connection between pvm tasks
00055   // using TCP instead of UDP packing message via pvmd-pvmd connection
00056   pvm_setopt(PvmRoute,PvmRouteDirect);
00057 
00058 }

PvmSvm::~PvmSvm (  )  [virtual]

destructor

Definition at line 62 of file OMKPvmSvm.cxx.

00063 {
00064    // do not wait for output of spawned tasks to be written
00065    pvm_catchout (0) ;
00066    
00067    // signal end of pvm to the pvm deamon
00068    pvm_exit () ;
00069 
00070 }


Member Function Documentation

void PvmSvm::joinSvmGroup ( std::string &  groupName  )  [protected, virtual]

joinSvmGroup.

join a group of processes in the siames virtual machine

Parameters:
groupName the name of the group of processes to join. This parameter is not const, because of the underlying C compatibility layer

Implements OMK::Svm.

Definition at line 72 of file OMKPvmSvm.cxx.

References OMASSERT.

00073 {
00074   int resul = pvm_joingroup ( const_cast<char *> (groupName.c_str()) ) ;
00075   if ( resul < 0 )
00076     {
00077       cerr<<"PvmSvm::joinSvmGroup ERROR: ";
00078       switch ( resul )
00079         {
00080         case PvmSysErr: 
00081           cerr<<"pvmd was not started or has crashed.";
00082           break;
00083         case PvmBadParam:
00084           cerr<<"giving a NULL group name.";
00085           break;
00086         case PvmDupGroup:
00087           cerr<<"trying to join a group you are allready in.";
00088           break;
00089         default:
00090           cerr<<"unexpected error: "<<resul;
00091         }
00092       OMASSERT( false ) ;
00093       cerr<<endl;
00094     }
00095 }

void PvmSvm::groupBarrier ( std::string  groupName,
int  numberToJoin 
) [protected, virtual]

block the calling process until numberToJoin processes of group groupName have called syncDistributedSites

Parameters:
numberToJoin number of processes to wait for
groupName the name of the process group

Implements OMK::Svm.

Definition at line 98 of file OMKPvmSvm.cxx.

References OMASSERT.

00099 {
00100    //   cerr<<"PvmSvm::groupBarrier: Waiting for "<<_numberOfSlaves<<" in pvm group named _"<<groupName.c_str()<<"_"<<endl;
00101   int resul = pvm_barrier ( const_cast<char *> (groupName.c_str()), numberToJoin) ;
00102   //cerr<<"PvmSvm::groupBarrier: Waiting done"<<endl;
00103   if ( resul < 0 )
00104     {
00105       cerr<<"PvmSvm::groupBarrier ERROR: ";
00106       switch ( resul )
00107         {
00108         case PvmSysErr: 
00109           cerr<<"pvmd was not started or has crashed.";
00110           break;
00111         case PvmBadParam:
00112           cerr<<"giving a numberToJoin < 1.";
00113           break;
00114         case PvmNoGroup:
00115           cerr<<"giving a non-existent group name.";
00116           break;
00117         case PvmNotInGroup:
00118           cerr<<"calling process is not in specified group.";
00119           break;
00120         default:
00121           cerr<<"unexpected error: "<<resul;
00122         }
00123         OMASSERT( false ) ;
00124       cerr<<endl;
00125     }
00126 }

void PvmSvm::broadcastToGroup ( std::string  groupName,
PvmMessage::MessageTag  tag 
) [protected, virtual]

broadcast an empty tagged message to all members of a broadcast group

Implements OMK::Svm.

Definition at line 130 of file OMKPvmSvm.cxx.

References OMASSERT.

00131 {
00132   int resul = pvm_bcast ( const_cast<char *> (groupName.c_str()), tag ) ;
00133   if ( resul < 0 )
00134     {
00135       cerr<<"PvmSvm::broadcastToGroup ERROR: ";
00136       switch ( resul )
00137         {
00138         case PvmSysErr: 
00139           cerr<<"pvmd was not started or has crashed.";
00140           break;
00141         case PvmBadParam:
00142           cerr<<"giving a negative message tag.";
00143           break;
00144         case PvmNoGroup:
00145           cerr<<"giving a non-existent group name.";
00146           break;
00147         default:
00148           cerr<<"unexpected error: "<<resul;
00149         }
00150         OMASSERT( false ) ;
00151       cerr<<endl;
00152     }
00153 }

int PvmSvm::nonblockingReceive ( PvmMessage::MessageTag  tag  )  [protected, virtual]

try and receive message in a non-blocking fashion the received messages are lost

Parameters:
tag the tag of the messages to receive
Returns:
number of received messages with that tag

Implements OMK::Svm.

Definition at line 177 of file OMKPvmSvm.cxx.

References OMASSERT.

00178 {
00179 #ifdef _DEBUGPVMMESS
00180   cerr<<"PvmSvm::nonblockingReceive ("<<tag<<")" <<endl;
00181 #endif
00182   int res = 0 ;
00183   int bufid = pvm_nrecv (-1, tag) ;
00184   while ( bufid > 0)
00185     {
00186       ++res ;
00187       bufid = pvm_nrecv (-1, tag) ;
00188     }
00189   if ( bufid < 0 )
00190     {
00191       cerr<<"PvmSvm::nonblockingReceive ERROR: ";
00192       switch ( bufid )
00193         {
00194         case PvmBadParam:
00195           cerr<<"giving a invalid encoding tid value or msgtag.";
00196           break;
00197         case PvmSysErr:
00198           cerr<<"pvmd not responding.";
00199           break;
00200         default:
00201           cerr<<"unexpected error: "<<bufid;
00202         }
00203         OMASSERT( false ) ;
00204       cerr<<endl;
00205    }
00206   return res ;
00207 }

void PvmSvm::initBeforeMessagePacking (  )  [protected, virtual]

do any usefull initialisations before messages are packed

Implements OMK::Svm.

Definition at line 155 of file OMKPvmSvm.cxx.

References OMASSERT, and pvmDataEncoding.

00156 {
00157   int resul = pvm_initsend ( PvmSvm::pvmDataEncoding ) ;
00158   if ( resul < 0 )
00159     {
00160       cerr<<"PvmSvm::initBeforeMessagePacking ERROR: ";
00161       switch ( resul )
00162         {
00163         case PvmBadParam:
00164           cerr<<"giving a invalid encoding value.";
00165           break;
00166         case PvmNoMem:
00167           cerr<<"Malloc has failed. There is not enough memory to create the buffer";
00168           break;
00169         default:
00170           cerr<<"unexpected error: "<<resul;
00171         }
00172         OMASSERT( false ) ;
00173       cerr<<endl;
00174     }
00175 }

std::pair< PvmMessage::MessageTag, int > PvmSvm::waitForAnyRequests ( PvmIncomingMessage receiveBuffer  )  [protected, virtual]

wait for any message sent to this site.

This call is blocking

Parameters:
receiveBuffer : the user buffer to use to receive any message a pair giving the tag of the received message and the siteId of the sender

Implements OMK::Svm.

Definition at line 212 of file OMKPvmSvm.cxx.

References OMK::PvmIncomingMessage::initialise(), and OMASSERT.

00213 {
00214   std::pair<PvmMessage::MessageTag, int> result ;
00215   
00216   int bufid = pvm_recv ( -1, -1 ) ; //receive anything
00217 
00218   if ( bufid < 0 ) 
00219     {
00220       cerr<<"Svm::waitForAnyRequests ERROR ";
00221       switch (bufid)
00222         {
00223         case PvmBadParam:
00224           cerr<<"giving an invalid tid value, or msgtag < -1";
00225           break;
00226         case PvmSysErr:
00227           cerr<<"pvmd not responding";
00228           break;
00229         default:
00230           cerr<<"unexpected error";
00231         OMASSERT( false ) ;
00232         }
00233       cerr<<endl;
00234     }
00235 
00236   int bytes ;
00237   //  int senderSiteId ;
00238   int msgtag ;
00239   int info = pvm_bufinfo( bufid, &bytes , &msgtag , &result.second  );
00240   result.first = static_cast<PvmMessage::MessageTag>(msgtag) ;
00241   if ( info < 0 ) 
00242     {
00243       cerr<<"Svm::waitForAnyRequests ERROR in pvm_bufinfo ";
00244       switch (info)
00245         {
00246         case PvmBadParam:
00247           cerr<<"invalid argument";
00248           break;
00249         case PvmNoSuchBuf:
00250           cerr<<"specified buffer does not exist";
00251           break;
00252         default:
00253           cerr<<"unexpected error";
00254         OMASSERT( false ) ;
00255         }
00256       cerr<<endl;
00257     }
00258 
00259   receiveBuffer.initialise ( bufid ) ;
00260   
00261   return result ;
00262 }

int PvmSvm::getSiteId (  )  [protected, virtual]

get the id in the svm of this site

Implements OMK::Svm.

Definition at line 268 of file OMKPvmSvm.cxx.

References OMK::Svm::_siteId.

00268                        {
00269   assert ( _siteId >= 0 ) ;
00270   return _siteId ; 
00271 }

int PvmSvm::getParentSiteId (  )  [protected, virtual]

get the site id having spanned this process.

Returns:
the site id, 0 meaning the process wasn't spawned by the virtual machine

Implements OMK::Svm.

Definition at line 336 of file OMKPvmSvm.cxx.

References pvmDataEncoding.

00336                              {
00337    int tidCtrlGlo = pvm_parent () ;
00338    // options PVM:
00339    //pvm_setopt (PvmSelfTraceTid, tidCtrlGlo) ; 
00340    //pvm_setopt (PvmSelfTraceCode, 666) ; 
00341    //pvm_setopt (PvmSelfOutputTid, tidCtrlGlo) ; 
00342    //pvm_setopt (PvmSelfOutputCode, 667) ; 
00343    //pvm_catchout (stdout) ;
00344    // Message au format XDR
00345    pvm_initsend (pvmDataEncoding) ;
00346    if (tidCtrlGlo == PvmNoParent ) 
00347      {
00348        return 0 ;
00349      } 
00350    else 
00351      {
00352        assert ( tidCtrlGlo != 0 ) ; //for coherance
00353        return tidCtrlGlo ;
00354      }
00355 }

void PvmSvm::addNewWorkstation ( const Name m  )  [protected, virtual]

add a new machine to the virtual machine

Implements OMK::Svm.

Definition at line 275 of file OMKPvmSvm.cxx.

References OMK::Svm::_numberOfSlaves, OMK::Controller::error(), and OMK::Name::getCString().

00275                                               {
00276    char * mac [2] ; // Machine a ajouter
00277    int res ; // Resultat du add_host
00278    int infos [2] ; // Informations sur chaque host
00279    int tentative = 0 ; // Numero de la tentative
00280    int nbTentative = 40 ; // On essaie 40 fois
00281 
00282    // here, const casting because pvm uses a C interface without const
00283    mac [0] = const_cast<char *> (m.getCString ()) ;
00284    mac [1] = (char *)NULL ;
00285 
00286 //TDTD ajout d'une temporisation le 24/05/2006 pour atténuer les pbs lors d'ajouts de plusieurs process sur la même machine
00287 #ifdef _MSC_VER
00288                                 Sleep(1);
00289 #else
00290                                 sleep (1) ;
00291 #endif
00292    while (tentative < nbTentative) 
00293       {
00294          cout << "Tentative number " << tentative+1
00295               << " for machine " << mac [0] <<endl;
00296       res = pvm_addhosts (mac, 1, infos) ;
00297       if (res < 1) { // On a une erreur
00298          tentative++;
00299 
00300          if (infos[0] != PvmDupHost) 
00301            { 
00302              if (tentative >= nbTentative) 
00303                {
00304                  cerr << "Type d'erreur : " << res << endl;
00305                  cerr << "Infos[0] : " << infos[0] << endl;
00306                  cerr << "Machine : " << mac [0] << ", " << mac [1] << endl;
00307                  Controller::error( "PvmSvm::addNewWorkstation: pb pour ajouter une machine");
00308                } 
00309              else 
00310                {
00311 #ifdef _MSC_VER
00312                                 Sleep(5);
00313 #else
00314                                 sleep (5) ;
00315 #endif
00316                }
00317            } 
00318          else 
00319            { // workstation wasn't added because it is allready present !
00320              // no need to try more
00321              tentative = nbTentative;
00322            }
00323       } 
00324       else 
00325         { // On n'a pas d'erreur, on arrete d'essayer
00326           tentative = nbTentative;
00327           ++_numberOfSlaves ;
00328         }
00329    }
00330    cout << "Machine " << mac [0] << " added" << endl ;           
00331    //cout << "status of machine: "<<pvm_mstat(mac [0])<<endl;
00332 }

void PvmSvm::removeWorkstation ( const Name m  )  [protected, virtual]

remove a work station from the virtual machine

Implements OMK::Svm.

Definition at line 415 of file OMKPvmSvm.cxx.

References OMK::Name::getCString().

00416 {
00417    int res ;
00418    char * mac [2] ;
00419    mac [0]= (char *)(m.getCString ()) ;
00420    mac [1] = (char *)NULL ;
00421    
00422    pvm_delhosts (mac, 1, &res) ;
00423 }

int PvmSvm::spawnProcess ( Process p  )  [protected, virtual]

spawn a copy of this proc'ess on another site

Implements OMK::Svm.

Definition at line 359 of file OMKPvmSvm.cxx.

References _argc, _argv, OMK::Controller::error(), OMK::Name::getCString(), OMK::Process::getHostMachineName(), and pvmSpawnFlags.

00360 {
00361    assert ( _argv[_argc] == NULL ) ;
00362    //   char * argv [5] ;
00363    int tidCtrlLoc ;
00364 //     istrstream is (fConfig.getCString (), strlen (fConfig.getCString ())) ;
00365 //     argv [0] = new char [256] ;
00366 //     argv [1] = new char [256] ;
00367 //     argv [2] = new char [256] ;
00368 //     argv [3] = new char [256] ;
00369 //     is >> argv [0] >> argv [1] >> argv [2] >> argv [3];
00370 //     argv [4] = (char *)NULL ;
00371    std::cerr << "PvmSvm::spawnProcess : " ;
00372        for ( int i = 0 ; i < _argc ; ++i) 
00373          {
00374             std::cerr << " <" << _argv[i] << ">" ;
00375          }
00376    std::cerr << std::endl ;
00377 
00378 //    int res = pvm_spawn ("tutorialValgrind", 
00379 //                      const_cast<char **>(&_argv[1]), 
00380 //                      pvmSpawnFlags,
00381 //                      (char *)(p->getHostMachineName ().getCString ()), 
00382 //                      1, 
00383 //                      &tidCtrlLoc) ;
00384 
00385    int res = pvm_spawn (const_cast<char *>(_argv[0]), 
00386                         const_cast<char **>(&_argv[1]), 
00387                         pvmSpawnFlags,
00388                         (char *)(p->getHostMachineName ().getCString ()), 
00389                         1, 
00390                         &tidCtrlLoc) ;
00391    if (res != 1) 
00392      {
00393        cout << "Executable : " << _argv[0] << endl ;
00394        //p->Executable ().getCString ()
00395        char ** argv = &_argv[1] ;
00396        for ( int i = 0 ; i < _argc-1 ; ++i) 
00397          {
00398            cout << "_argv ["<<i<<"] : " << argv[i] << endl ;
00399          }
00400        cout << "on machine: " 
00401             << p->getHostMachineName ().getCString () << endl ;
00402        cout << "status of machine: "
00403             <<pvm_mstat(const_cast<char *>(p->getHostMachineName ().
00404                                            getCString ()))<<endl;
00405        cout << "tidCtrlLoc: " << tidCtrlLoc << endl ;
00406        cout << "Number of tasks started : " << res << endl ;
00407        Controller::error ("PvmSvm::spawnProcess: spawn problem") ;
00408      }
00409    return (int)tidCtrlLoc ;
00410 }

SvmLink * PvmSvm::createSvmLink ( const int d  )  [protected, virtual]

create a link to a distant site

Parameters:
d the site id of the distant site

Implements OMK::Svm.

Definition at line 427 of file OMKPvmSvm.cxx.

00428 {
00429    PvmSvmLink * canal = new PvmSvmLink (d) ;
00430    return (SvmLink*)canal;       
00431 }

bool PvmSvm::testIfSiteRecovered ( Process p  )  [protected, virtual]

CHADI.

Implements OMK::Svm.

Definition at line 436 of file OMKPvmSvm.cxx.

References OMK::Process::getHostMachineName(), OMK::Process::getSvmLink(), and OMK::SvmLink::getTID().

00437 {
00438    bool flag;
00439  //  int mstat = pvm_mstat( const_cast<char *> (nom.getCString()) );
00440  //int mstat = pvm_mstat( const_cast<char *> (p->getHostMachineName().getCString()));
00441  // cout << "status of machine: "<<pvm_mstat(const_cast<char *>(nom.getCString ()))<<endl;
00442   cout << "nom de la machine: "<<p->getHostMachineName()<<endl;
00443   cout<<"TID de truc "<<p->getSvmLink()->getTID()<<endl;
00444    int pstat = pvm_pstat ( p->getSvmLink()->getTID() ) ;
00445  
00446    switch (pstat)
00447      {
00448      case PvmOk:
00449        flag = true ;
00450        cout<<"koukou"<<endl;
00451        break;
00452      case PvmSysErr:
00453        cout<<"putain de merde"<<endl;
00454        flag = false;
00455        break;
00456      case PvmNoTask:
00457        flag = false;
00458        break;
00459      default:
00460        flag = false ;
00461        cout<<"kiki"<<endl;
00462      }
00463    return flag;
00464  }


Member Data Documentation

int PvmSvm::pvmDataEncoding [static]

define the encoding used when sending and receiveing messages

Definition at line 117 of file OMKPvmSvm.h.

Referenced by OMK::PvmNameServer::getIdentifierAsFrom(), getParentSiteId(), OMK::PvmNameServer::getStringAssociatedToOnCentralServer(), initBeforeMessagePacking(), OMK::PvmNameServer::PvmNameServer(), OMK::PvmOutgoingMessage::PvmOutgoingMessage(), OMK::PvmOutgoingMessage::reinitAndRevertPvmContext(), OMK::Svm::serveNameRequestsUntilEnd(), and OMK::PvmCentralNameServer::verifyCompatibilityWithLocalNameServer().

int PvmSvm::pvmSpawnFlags [static]

define the flags used when spawning a new task using PVM

Definition at line 121 of file OMKPvmSvm.h.

Referenced by spawnProcess().

int OMK::PvmSvm::_argc [protected]

remember the arguments to use when spawning: their number

Definition at line 126 of file OMKPvmSvm.h.

Referenced by PvmSvm(), and spawnProcess().

char** OMK::PvmSvm::_argv [protected]

remember the arguments to use when spawning: their contents

Definition at line 130 of file OMKPvmSvm.h.

Referenced by PvmSvm(), and spawnProcess().


logo OpenMask

Documentation generated on Mon Jun 9 11:46:03 2008

Generated with doxygen by Dimitri van Heesch ,   1997-2007