MultiThreadNetwork.cpp

Go to the documentation of this file.
00001 #include "MultiThreadNetwork.h"
00002 #include "SpikeSender.h"
00003 #include "SingleThreadNetwork.h"
00004 #include "PCSIMException.h"
00005 
00006 #include <mpi.h>
00007 
00008 #include <string>
00009 using std::string;
00010 
00011 #include <boost/format.hpp>
00012 
00013 // ------------------------------ constructor  / destructor --------------------------------------- //
00014 
00015 MultiThreadNetwork::MultiThreadNetwork(int nT, SimParameter sp ) :
00016 //        SimNetwork( MPI::COMM_WORLD, sp, localRoundRobin ), nThreads(nT), thrPool( nThreads-1 )
00017         SimNetwork( MPI::COMM_WORLD, sp, &localRoundRobin ), nThreads(nT), thrPool( nThreads-1 )
00018 {
00019     init();
00020 }
00021 
00022 MultiThreadNetwork::MultiThreadNetwork(int nT, MPI::Intracomm &comm, SimParameter sp ) :
00023 //        SimNetwork( comm, sp, localRoundRobin ), nThreads(nT), thrPool( nThreads-1 )
00024         SimNetwork( comm, sp, &localRoundRobin ), nThreads(nT), thrPool( nThreads-1 )
00025 {
00026     init();
00027 }
00028 
00029 void MultiThreadNetwork::init()
00030 {
00031         if (simParam.minDelay.in_sec() < simParam.dt.in_sec())
00032                 throw(
00033                     PCSIM::ConstructionException("MultiThreadNetwork::init",
00034                 str(boost::format("minDelay (%1%) smaller than dt (%2%)") 
00035                     % simParam.minDelay.in_sec() % simParam.dt.in_sec() )
00036             ) 
00037         );
00038 
00039     tables = new MTSpikeRoutingTables(nThreads);
00040 
00041     for (int i = 0; i < nThreads; ++i) {
00042         PropagatedSpikeBuffer *buf = new PropagatedSpikeBuffer(simParam.minDelay.in_steps( simParam.dt ),
00043                                      simParam.maxDelay.in_steps( simParam.dt ));
00044         STBuffers.push_back(buf);
00045     }
00046 
00047     spikeScheduler = new MultiThreadSpikeScheduler(nThreads, *tables, STBuffers, simParam );
00048 
00049     analogDelayObjectsMap = new AnalogDelayObjectMap;
00050 
00051     for (int i = 0 ; i < nThreads; ++i)
00052         analogMsgDispatchers.push_back(new MultiThreadAnalogMsgDispatcher( (delay_t)simParam.minDelay.in_steps( simParam.dt) ) );
00053 
00054 
00055 
00056     AnalogMessageDispatcherVectorImpl<MultiThreadAnalogMsgDispatcher>
00057     dispatchersVector(analogMsgDispatchers);
00058 
00059     simEngine = new MultiThreadSimEngine(0, nThreads, thrPool, *spikeScheduler,
00060                                          dispatchersVector,
00061                                          *this );
00062 
00063     // ----- create the analog message creators ----------//
00064     for (int i = 0; i < nThreads ; ++i) {
00065         stAnalogMsgCreators.push_back(new STAnalogMessageCreator(analogMsgDispatchers[i]->sTLocalDispatcher(),
00066                                       simEngine->getSTEngine(i), *analogDelayObjectsMap,
00067                                       (delay_t)simParam.minDelay.in_steps( simParam.dt) ) );
00068     }
00069 
00070     mtAnalogMsgCreator = new MTAnalogMessageCreator(analogMsgDispatchers, *simEngine, *analogDelayObjectsMap,
00071                          (delay_t)simParam.minDelay.in_steps( simParam.dt));
00072 
00073     initialized = false;
00074 
00075     setupConstructRNGEngines();
00076     MultiThreadNetwork::seed_noise_rng( makeSeed( simParam.simulationRNGSeed ) );
00077 
00078 }
00079 
00080 MultiThreadNetwork::~MultiThreadNetwork()
00081 {
00082     delete simEngine;
00083     delete spikeScheduler;
00084     delete tables;
00085     delete analogDelayObjectsMap;
00086     for (int i = 0; i < nThreads; ++i) {
00087         delete STBuffers[i];
00088         delete stAnalogMsgCreators[i];
00089         delete analogMsgDispatchers[i];
00090     }
00091     delete mtAnalogMsgCreator;
00092 }
00093 
00094 void MultiThreadNetwork::seed_noise_rng( uint32 noiseRNGseed )
00095 {
00096     vector<uint32> sim_seeds(nThreads);
00097     fillSeedVector( noiseRNGseed, sim_seeds );
00098     simEngine->seed( sim_seeds );
00099     objectVariationRNDEngine->seed( makeSeed( simParam.constructionRNGSeed ) ) ;
00100 }
00101 
00102 // ------------------------- adding / getting objects ----------------------- //
00103 
00104 void MultiThreadNetwork::_addObject_( const SimObjectFactory &objFactory, const SimEngine::ID &loc, SimObject::ID &id )
00105 {
00106     if( loc.node == _mpi_rank && loc.engine < this->nThreads ) {
00107         id.node = _mpi_rank;
00108         id.eng  = loc.engine;
00109         simEngine->addObject( objFactory, id );
00110     } else {
00111         id = SimObject::ID::Invalid;
00112         if( loc.node != _mpi_rank )
00113             throw(
00114                 PCSIM::ConstructionException(
00115                     "MultiThreadNetwork::addObject",
00116                     str( boost::format("Specified node (%1%) not equal to mpi rank (%2%)" ) % loc.node % _mpi_rank )
00117                 )
00118             );
00119         else
00120             throw(
00121                 PCSIM::ConstructionException(
00122                     "MultiThreadNetwork::addObject",
00123                     str( boost::format("Specified engine (%1%) out of range (0,%2%)" ) % 0 % this->nThreads ) 
00124                 )
00125             );
00126     }
00127 }
00128 
00129 void MultiThreadNetwork::_addObject_( const SimObjectFactory &objFactory, SimObject::ID &id )
00130 {
00131 //    SimEngine::ID loc = distributionStrategy( this );
00132     SimEngine::ID loc = (*distributionStrategy)( this );
00133     return addObject( objFactory, loc, id );
00134 }
00135 
00136 void MultiThreadNetwork::_mount_( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00137 {
00138     if( mountpoint.node == _mpi_rank && mountpoint.eng < this->nThreads ) {
00139         id.node = _mpi_rank;
00140         id.eng  = mountpoint.eng;
00141         simEngine->mount( objFactory, mountpoint, id );
00142     } else {
00143         id = SimObject::ID::Invalid;
00144         if( mountpoint.node != _mpi_rank )
00145             throw(
00146                 PCSIM::ConstructionException(
00147                     "MultiThreadNetwork::mount",
00148                     str( boost::format("Specified node (%1%) not equal to mpi rank (%2%)" ) % mountpoint.node % _mpi_rank )
00149                 )
00150             );
00151         else
00152             throw(
00153                 PCSIM::ConstructionException(
00154                     "MultiThreadNetwork::mount",
00155                     str( boost::format("Specified engine (%1%) out of range (0,%2%)" ) % 0 % this->nThreads )
00156                 )
00157             );
00158     }
00159 }
00160 
00161 void MultiThreadNetwork::_insert_( const SimObjectFactory &objFactory, const SimObject::ID &container, SimObject::ID &id )
00162 {
00163     if( container.node == _mpi_rank && container.eng < this->nThreads ) {
00164         id.node = _mpi_rank;
00165         id.eng  = container.eng;
00166         simEngine->insert( objFactory, container, id );
00167     } else {
00168         id = SimObject::ID::Invalid;
00169         if( container.node != _mpi_rank )
00170             throw(
00171                 PCSIM::ConstructionException(
00172                     "MultiThreadNetwork::mount",
00173                     str( boost::format("Specified node (%1%) not equal to mpi rank (%2%)") % container.node % _mpi_rank ) 
00174                 )
00175             );
00176         else
00177             throw(
00178                 PCSIM::ConstructionException(
00179                     "MultiThreadNetwork::mount",
00180                     str( boost::format("Specified engine (%1%) out of range (0,%2%)")  % 0 % this->nThreads ) 
00181                 )
00182             );
00183     }
00184 }
00185 
00186 
00187 SimObject * MultiThreadNetwork::_getObject_(const SimObject::ID &id)
00188 {
00189     return simEngine->getObject(id);
00190 }
00191 
00192 // -------------------------------------- connections ------------------------------------------ //
00193 
00194 void MultiThreadNetwork::_connect_( SimObject::ID const& src, port_t out, const SimObject::ID &dst, port_t in, int delay )
00195 {
00196     SimObject *src_obj = simEngine->getObject(src);
00197     SimObject *dst_obj = simEngine->getObject(dst);
00198 
00199     Time delay_to_use;
00200     if( delay < 0 ) {
00201         delay_to_use = Time::sec( dst_obj->getManagedDelay() );
00202     } else {
00203         delay_to_use = Time::steps( delay, get_dt() );
00204     }
00205 
00206     if( src_obj->outputPortType(out) == SimObject::spiking && dst_obj->inputPortType( in ) == SimObject::spiking ) {
00207         addSpikeMessage( src, out, dst, in, delay_to_use );
00208     } else if ( src_obj->outputPortType(out) == SimObject::analog && dst_obj->inputPortType( in ) == SimObject::analog ) {
00209         addAnalogMessage( src, out, dst, in, delay_to_use );
00210     } else {
00211         throw(
00212             PCSIM::ConstructionException(
00213                 "MultiThreadNetwork::_connect_",
00214                 str( boost::format("Can not connect specified source (%1%) and destination (%2%) object: no matching out (%3%) and in (%4%) ports.") % src.toString() % dst.toString() % out % in )
00215             )
00216         );
00217     }
00218 
00219 }
00220 
00221 // ---------------------------------- spike messages -------------------------------------- //
00222 
00223 void MultiThreadNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t out, const SimObject::ID &receiver, const port_t in, const Time &delay)
00224 {               
00225     if (  ( sender.eng != receiver.eng && delay < simParam.minDelay) || delay > simParam.maxDelay ) {
00226         throw(
00227             PCSIM::ConstructionException(
00228                 "MultiThreadNetwork::addSpikeMessage",
00229                 str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms)." ) % delay.in_ms() % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() ) 
00230             )
00231         );
00232     }
00233     SimObject *src_obj = simEngine->getObject(sender);
00234     SpikeSender *ss = dynamic_cast<SpikeSender*>( src_obj );
00235     if( ss == NULL ) {
00236         throw( PCSIM::ConstructionException( "MultiThreadNetwork::addSpikeMessage", 
00237                 str( boost::format("Specified simulation object (%1%, %2%) is not a spike sender.") % typeid(*src_obj).name() % sender.toString() ) ) ) ;
00238     }
00239     if( ss->getSpikePort( out ) != NULL ) {
00240         MultiThreadNetwork::addLocalSpikeMessage( tables, simEngine, simParam, sender, ss->getSpikePort( out )->ID(), receiver, in, delay.in_steps( get_dt() ) );
00241         _nSpikeMessages++;
00242     } else {
00243         throw( PCSIM::ConstructionException( "MultiThreadNetwork::addSpikeMessage", str( boost::format("Specified spike output port (%1%) does not exist") % out ) ) );
00244     }
00245 }
00246 
00247 void MultiThreadNetwork::addLocalSpikeMessage(
00248     MTSpikeRoutingTables *arg_tables, SimEngine *arg_simEngine, const  SimParameter &SP,
00249     const SimObject::ID &sender, spike_port_id_t sender_port, const SimObject::ID &receiver, const port_t in_port, step_t delay )
00250 {
00251     if ( sender.eng == receiver.eng ) {
00252         SingleThreadNetwork::addLocalSpikeMessage( &(arg_tables->localDelayMaps[sender.eng]), &(arg_tables->stgPool), arg_simEngine, sender_port, receiver, in_port, delay );
00253     } else {
00254         if( (step_t)delay < SP.minDelay.in_steps( SP.dt ) ) {
00255             throw( PCSIM::ConstructionException( "MultiThreadNetwork::addSpikeMessage", 
00256                         str( boost::format("Specified delay (%1%) smaller than minDelay (%2% ms).") % delay % SP.minDelay.in_ms() ) ) );
00257         }
00258         spikegroupid_t tg = arg_tables->mTDelayMap.find(sender.eng, sender_port, receiver.eng, (delaystep_t)delay);
00259         if( tg == no_spikegroup ) {
00260             tg = arg_tables->stgPool.addSpikeTarget( arg_simEngine->getObject( receiver ), in_port); // receiver, port
00261             arg_tables->mTDelayMap.insert( sender.eng, sender_port, receiver.eng, (delaystep_t)delay, tg ); // sender_type, sender, delay, targetgroup
00262         } else {
00263             arg_tables->stgPool.addSpikeTarget( tg, arg_simEngine->getObject( receiver ), in_port);
00264         }
00265 
00266     }
00267 }
00268 
00269 // ------------------------------------ analog messages -------------------------------------- //
00270 
00271 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, int recv_port, const Time &delay)
00272 {
00273     _nAnalogMessages++;
00274     addGenericAnalogMessage(sender, sender_port, receiver, recv_port, (delay_t)delay.in_steps( get_dt() ) );
00275 }
00276 
00277 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, string destfield, const Time &delay)
00278 {
00279     _nAnalogMessages++;
00280     addGenericAnalogMessage(sender, sender_port, receiver, destfield, (delay_t)delay.in_steps( get_dt() ) );
00281 }
00282 
00283 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, int recv_port, const Time &delay)
00284 {
00285     _nAnalogMessages++;
00286     addGenericAnalogMessage(sender, srcfield, receiver, recv_port, (delay_t)delay.in_steps( get_dt() ) );
00287 }
00288 
00289 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, string destfield, const Time &delay)
00290 {
00291     _nAnalogMessages++;
00292     addGenericAnalogMessage(sender, srcfield, receiver, destfield, (delay_t)delay.in_steps( get_dt() ) );
00293 }
00294 
00295 // ------------------------------------------------------------------------------------------- //
00296 
00297 gl_engineid_t MultiThreadNetwork::maxLocalEngineID(void) const
00298 {
00299     return nThreads-1;
00300 }
00301 
00302 // -------------------------- running the simulation messages -------------------------------- //
00303 
00304 void MultiThreadNetwork::_initialize_()
00305 {
00306     if (!initialized) {
00307         simEngine->initialize();
00308         vector<MultiThreadAnalogMsgDispatcher *>::const_iterator it;
00309         for ( it = analogMsgDispatchers.begin(); it != analogMsgDispatchers.end(); ++it) {
00310             if (simParam.minDelay.in_steps( simParam.dt ) == 1)
00311                 (*it)->setMinDelay(0);
00312             else
00313                 (*it)->setMinDelay((delay_t)simParam.minDelay.in_steps(simParam.dt));
00314         }
00315         initialized = true;
00316     }
00317 }
00318 
00319 void MultiThreadNetwork::_reset_()
00320 {
00321     if( ! initialized )
00322         initialize();
00323     spikeScheduler->reset();
00324     simEngine->reset();
00325     vector<MultiThreadAnalogMsgDispatcher *>::const_iterator it;
00326     for ( it = analogMsgDispatchers.begin(); it != analogMsgDispatchers.end(); ++it)
00327         (*it)->reset(get_dt().in_sec());
00328     reseted = true;
00329 }
00330 
00331 void MultiThreadNetwork::_advance_( int nSteps )
00332 {
00333     if( ! reseted )
00334         reset();
00335     simEngine->advance( nSteps );
00336 }

Generated on Wed Jul 9 16:34:38 2008 for PCSIM by  doxygen 1.5.5