DistributedNetwork.cpp

Go to the documentation of this file.
00001 #include "DistributedNetwork.h"
00002 #include "PCSIMException.h"
00003 #include "SpikeSender.h"
00004 #include "SimTime.h"
00005 #include "SimObjectRegistry.h"
00006 
00007 #include <string>
00008 using std::string;
00009 
00010 #include <boost/format.hpp>
00011 
00012 #include <mpi.h>
00013 
00014 #include <iostream>
00015 using std::cerr;
00016 using std::endl;
00017 
00018 DistributedNetwork::DistributedNetwork( int nThreads, SimParameter sp ) :
00019         SimNetwork(MPI::COMM_WORLD, sp, &globalRoundRobin ), mpi_comm(MPI::COMM_WORLD), _nThreads(nThreads)
00020 {
00021     init();
00022 }
00023 
00024 DistributedNetwork::DistributedNetwork( int nThreads, MPI::Intracomm &mpiCommunicator, SimParameter sp ) :
00025         SimNetwork(mpiCommunicator, sp, &globalRoundRobin ), mpi_comm(mpiCommunicator), _nThreads(nThreads)
00026 {
00027     init();
00028 }
00029 
00030 
00031 
00032 void DistributedNetwork::init()
00033 {
00034     setupGlEngineIDs(_nThreads);
00035     incomingConnections.resize(mpi_comm.Get_size(), false);
00036     outgoingConnections.resize(mpi_comm.Get_size(), false);
00037     objectCounter.resize( maxGlobalEngineID()+1 );
00038     spikePortCounter.resize( maxGlobalEngineID()+1 );
00039     for( size_t i=0; i<spikePortCounter.size(); i++)
00040         spikePortCounter[0] = 0;
00041     // ghostPortInfo.resize( maxGlobalEngineID()+1 );
00042     ghostFirstPortInfo.resize( maxGlobalEngineID()+1 );
00043     _mpi_rank = mpi_comm.Get_rank();
00044 
00045 }
00046 
00047 DistributedNetwork::~DistributedNetwork()
00048 {
00049     /* NOOP */
00050 }
00051 
00052 uint32 DistributedNetwork::getUniqueSeedOverMpi( uint32 seed )
00053 {
00054     uint32 seed_buffer = seed;
00055     mpi_comm.Bcast((void *)&seed_buffer, (int)1, MPI::UNSIGNED, 0);
00056     return seed_buffer;
00057 }
00058 
00059 void DistributedNetwork::seed_noise_rng( uint32 noiseRNGseed )
00060 {
00061     uint32 seed = getUniqueSeedOverMpi( noiseRNGseed );
00062     vector<uint32> global_sim_seeds( maxGlobalEngineID()+1 );
00063     fillSeedVector( seed, global_sim_seeds );
00064 
00065     vector<uint32> local_sim_seeds( _nThreads );
00066     for( int e = 0;  e < _nThreads; e++ ) {
00067         local_sim_seeds[e] = global_sim_seeds[ glengineids[ _mpi_rank ][ e ] ];
00068     }
00069     localSimEngine->seed( local_sim_seeds );
00070 
00071 
00072     seed = getUniqueSeedOverMpi( makeSeed( simParam.constructionRNGSeed ) );
00073     global_sim_seeds.resize( mpi_size() + 1 );
00074     fillSeedVector( seed, global_sim_seeds );
00075     objectVariationRNDEngine->seed( global_sim_seeds[ mpi_rank() ] );
00076 
00077 }
00078 
00079 void DistributedNetwork::setupGlEngineIDs(unsigned int numLocalEngines)
00080 {
00081     uint32 *nEngineArray  = new uint32[mpi_comm.Get_size()];
00082     // setup all the global engine ids
00083     uint32 nEngines = numLocalEngines;
00084     mpi_comm.Allgather((void*)&nEngines, (int)1, MPI::UNSIGNED, (void*)nEngineArray, (int)1, MPI::UNSIGNED);
00085     int glEngineCounter = 0;
00086     glengineids.resize(mpi_comm.Get_size());
00087     for (int i=0 ; i < mpi_comm.Get_size(); ++i) {
00088         glengineids[i].resize(nEngineArray[i]);
00089         for (unsigned int j = 0 ; j < nEngineArray[i] ; ++j) {
00090             glengineids[i][j] = glEngineCounter;
00091             location.push_back( SimEngine::ID( i, j ) );
00092             glEngineCounter++;
00093         }
00094     }
00095     //cerr << "setupGlEngineIDs rank " << mpi_comm.Get_rank() << endl ;
00096     _max_global_engine_id = glEngineCounter-1;
00097     delete [] nEngineArray;
00098 }
00099 
00100 // ----------------------------- conversion between local and global engine IDs ---------------------------//
00101 
00103 gl_engineid_t DistributedNetwork::getGlobalEngineID( const SimEngine::ID &eid ) const
00104 {
00105     if( ( eid.node < glengineids.size() ) && ( eid.engine < glengineids[ eid.node ].size() ) ) {
00106         return glengineids[ eid.node ][ eid.engine ];
00107     } else {
00108         return INVALID_GLOBAL_ENGINE_ID;
00109     }
00110 }
00111 
00113 gl_engineid_t DistributedNetwork::getGlobalEngineID( engineid_t eng ) const
00114 {
00115     return getGlobalEngineID( SimEngine::ID( mpi_comm.Get_rank(), eng) );
00116 }
00117 
00118 
00120 gl_engineid_t DistributedNetwork::maxGlobalEngineID(void) const
00121 {
00122     return _max_global_engine_id;
00123 }
00124 
00125 gl_engineid_t DistributedNetwork::maxLocalEngineID(void) const
00126 {
00127     return _nThreads-1;
00128 }
00129 
00131 const SimEngine::ID DistributedNetwork::getLocation( gl_engineid_t gEID ) const
00132 {
00133     if( gEID < location.size() ) {
00134         return location[gEID];
00135     } else {
00136         return SimEngine::ID::Invalid;
00137     }
00138 }
00139 
00140 local_objectid_t DistributedNetwork::getGhostID( gl_engineid_t gEID, SimObjectFactory const& objFactory  )
00141 {
00142     object_type_t tid = objFactory.getObjectTypeID();
00143     if( gEID >= objectCounter.size() ) {
00144         return INVALID_LOCAL_OBJECT_ID;
00145     }
00146 
00147     // resize if necessary
00148     if( tid >= objectCounter[gEID].size() ) {
00149         objectCounter[gEID].resize( tid + 1 );
00150         objectCounter[gEID][tid] = 0;
00151         //ghostPortInfo[gEID].resize( tid + 1 );
00152         ghostFirstPortInfo[gEID].resize( tid + 1 );
00153     }
00154 
00155     // cerr << "spc.sz=" << spikePortCounter.size() << endl;
00156     local_objectid_t oid  = objectCounter[gEID][tid]++;
00157 
00158     int nsop = theSimObjectRegistry->getObject( objFactory.getObjectTypeID() ).nSpikeOutputPorts();
00159     if( nsop  > 0 ) {
00160         if( oid >= ghostFirstPortInfo[gEID][tid].size() ) {
00161             ghostFirstPortInfo[gEID][tid].resize( oid + 1 );
00162         }
00163         ghostFirstPortInfo[gEID][tid][oid] = spikePortCounter[gEID];
00164     } else {
00165         // pi.first_spike_output_port_id = INVALID_SPIKE_PORT_ID;
00166     }
00167     //cerr << "-- getGhostID node " << mpi_comm.Get_rank() << ": pi(" << gEID << "," << (int)tid << ")=" << pi << endl;
00168     spikePortCounter[gEID] += nsop;
00169 
00170     return oid;
00171 
00172 }
00173 
00174 
00175 // ---------------------------- adding objects ------------------------------- //
00176 
00177 void DistributedNetwork::_addObject_( const SimObjectFactory &objFactory, const SimEngine::ID &loc, SimObject::ID &id )
00178 {
00179     gl_engineid_t gEID = getGlobalEngineID( loc );
00180     // cerr << "gEID =" << gEID << endl;
00181     if( gEID != INVALID_GLOBAL_ENGINE_ID ) {
00182         id.node = loc.node;
00183         id.eng  = loc.engine;
00184         if( loc.node == mpi_comm.Get_rank() ) {
00185             localSimEngine->addObject( objFactory, id );
00186         } else {
00187             id.type = objFactory.getObjectTypeID();
00188             id.localid = getGhostID( gEID, objFactory );
00189         }
00190     } else {
00191         id = SimObject::ID::Invalid;
00192         throw( PCSIM::ConstructionException( "DistributedSingleThreadNetwork::addObject", "Invalid sim engine id " + loc.toString() + " specified." ) );
00193     }
00194 }
00195 
00196 void DistributedNetwork::_mount_( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00197 {
00198     SimEngine::ID loc( mountpoint.node, mountpoint.eng );
00199     gl_engineid_t gEID = getGlobalEngineID( loc );
00200     if( gEID != INVALID_GLOBAL_ENGINE_ID ) {
00201         id.node = loc.node;
00202         id.eng  = loc.engine;
00203         if( loc.node == mpi_comm.Get_rank() ) {
00204             localSimEngine->mount( objFactory, mountpoint, id );
00205         } else {
00206             id.type = objFactory.getObjectTypeID();
00207             id.localid = getGhostID( gEID, objFactory );
00208         }
00209     } else {
00210         id = SimObject::ID::Invalid;
00211         throw( PCSIM::ConstructionException( "DistributedSingleThreadNetwork::mount", "Invalid mountpoint id " + mountpoint.toString() + " specified." ) );
00212     }
00213 }
00214 
00215 void DistributedNetwork::_insert_( const SimObjectFactory &objFactory, const SimObject::ID &container, SimObject::ID &id )
00216 {
00217     SimEngine::ID loc( container.node, container.eng );
00218     gl_engineid_t gEID = getGlobalEngineID( loc );
00219     if( gEID != INVALID_GLOBAL_ENGINE_ID ) {
00220         id.node = loc.node;
00221         id.eng  = loc.engine;
00222         if( loc.node == mpi_comm.Get_rank() ) {
00223             localSimEngine->insert( objFactory, container, id );
00224         } else {
00225             id.type = objFactory.getObjectTypeID();
00226             id.localid = getGhostID( gEID, objFactory );
00227         }
00228     } else {
00229         id = SimObject::ID::Invalid;
00230         throw( PCSIM::ConstructionException( "DistributedSingleThreadNetwork::insert", "Invalid container id " + container.toString() + " specified." ) );
00231     }
00232 }
00233 
00234 // -------------------------------------- connections ------------------------------------------ //
00235 
00236 void DistributedNetwork::_connect_( SimObject::ID const& sender, port_t out, const SimObject::ID &receiver, port_t in, int delay )
00237 {
00238     int delay_to_use = 0 ;
00239     if( mpi_comm.Get_rank() == receiver.node ) {
00240         SimObject *rec_obj = localSimEngine->getObject(receiver);
00241         delay_to_use = delay < 0 ? (int)( rec_obj->getManagedDelay() / get_dt().in_sec() + 0.5 ) : delay ;
00242     }
00243 
00244     if( mpi_comm.Get_rank() == sender.node || mpi_comm.Get_rank() == receiver.node ) {
00245 
00246         SimObject::PortType sender_port_type = (theSimObjectRegistry->getObject( sender.type )).outputPortType( out );
00247         SimObject::PortType receiver_port_type = (theSimObjectRegistry->getObject( receiver.type )).inputPortType( in );
00248 
00249         if( sender_port_type == SimObject::spiking && receiver_port_type == SimObject::spiking ) {
00250             _addSpikeMessage_( sender, out, getFirstSenderSpikePort( sender ) + out, receiver, in, delay_to_use );
00251         } else if( sender_port_type == SimObject::analog && receiver_port_type == SimObject::analog ) {
00252             addAnalogMessage( sender, out, receiver, in, Time::steps( delay_to_use, get_dt() ) );
00253         } else  {
00254             throw(
00255                 PCSIM::ConstructionException(
00256                     "DistributedNetwork::_connect_",
00257                     str( boost::format("Can not connect specified source (%1%) and destination (%2%) object: no matching output (%3%) and input (%4%) ports.") % sender.toString() % receiver.toString() % out % in)
00258                 )
00259             );
00260         }
00261     }
00262 }
00263 
00264 // ------------------------------------- messages ----------------------------------------- //
00265 
00266 void DistributedNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t out, const SimObject::ID &receiver, const port_t in, const Time &delay)
00267 {
00268     SimObject::PortType sender_port_type = (theSimObjectRegistry->getObject( sender.type )).outputPortType( out );
00269     SimObject::PortType receiver_port_type = (theSimObjectRegistry->getObject( receiver.type )).inputPortType( in );
00270     if( sender_port_type == SimObject::spiking && receiver_port_type == SimObject::spiking ) {
00271         _addSpikeMessage_( sender, out, getFirstSenderSpikePort(sender)+ out, receiver, in, delay.in_steps( get_dt() ) );
00272     } else {
00273         throw(
00274             PCSIM::ConstructionException(
00275                 "DistributedSingleThreadNetwork::_connect_",
00276                 str( boost::format("Can not add spike message from source (%1%) to destination (%2%) object: no matching out (%3%) and in (%4%) ports.") % sender.toString() % receiver.toString() % out % in)
00277             )
00278         );
00279     }
00280 }
00281 
00282 
00283 ostream& operator<<(ostream &s, const DistributedNetwork::ConnectInfo &ci)
00284 {
00285     return s << "("
00286            // << "md=" << (int)ci.managed_delay << ", "
00287            << "sp=" << (int)ci.first_spike_output_port_id  << ", "
00288            << "si=" << (int)ci.nSpikeInputPorts << ", "
00289            << "so=" << (int)ci.nSpikeOutputPorts << ", "
00290            << "ai=" << (int)ci.nAnalogInputPorts << ", "
00291            << "ao=" << (int)ci.nAnalogOutputPorts << ")";
00292 }
00293 
00294 spike_port_id_t DistributedNetwork::getFirstSenderSpikePort( const SimObject::ID &oid )
00295 {
00296     if ( oid.node == mpi_comm.Get_rank() ) {
00297         SimObject *src_obj = localSimEngine->getObject(oid);
00298         //ci.managed_delay = (int)( src_obj->getManagedDelay() / get_dt().in_sec() + 0.5 );
00299         //if (ci.managed_delay == 0) {
00300         //cerr << " get_dt().in_sec() = " << get_dt().in_sec() << " getManagedDelay = " << src_obj->getManagedDelay() << endl;
00301         //}
00302 
00303         SpikeSender *ss = dynamic_cast<SpikeSender*>( src_obj );
00304         if( ss != NULL && ss->getSpikePort( 0 ) != NULL ) {
00305             return ss->getSpikePort( 0 )->ID();
00306         } else {
00307             return INVALID_SPIKE_PORT_ID;
00308         }
00309     } else {
00310         SimEngine::ID loc( oid.node, oid.eng );
00311         gl_engineid_t gEID = getGlobalEngineID( loc );
00312         return ghostFirstPortInfo[gEID][oid.type][oid.localid];
00313     }
00314 }

Generated on Wed Jul 9 16:34:37 2008 for PCSIM by  doxygen 1.5.5