DistributedSingleThreadNetwork.cpp

Go to the documentation of this file.
00001 #include "DistributedSingleThreadNetwork.h"
00002 #include "MPIAllToAllCommunicatorFactory.h"
00003 
00004 #include "SingleThreadNetwork.h"
00005 #include "SpikeSender.h"
00006 
00007 #include <boost/format.hpp>
00008 
00009 #include <iostream>
00010 #include <string>
00011 
00012 using std::string;
00013 
00014 using std::cout;
00015 using std::cerr;
00016 using std::endl;
00017 
00018 
00019 
00020 
00021 // ------------------------------ constructor  / destructor --------------------------------------- //
00022 
00023 DistributedSingleThreadNetwork::DistributedSingleThreadNetwork( SimParameter sp )
00024         : DistributedNetwork(1, MPI::COMM_WORLD, sp )
00025 {
00026     init();
00027 }
00028 
00029 DistributedSingleThreadNetwork::DistributedSingleThreadNetwork( MPI::Intracomm &mpiCommunicator, SimParameter sp )
00030         : DistributedNetwork(1, mpiCommunicator, sp )
00031 {
00032     init();
00033 }
00034 
00035 void DistributedSingleThreadNetwork::init()
00036 {
00037 
00038     spikeBuffer = new PropagatedSpikeBuffer(simParam.minDelay.in_steps( get_dt() ), simParam.maxDelay.in_steps( get_dt() ) );
00039     localDelayMap = new LocalDelayMap;
00040 
00041     stgPool = new SpikeTargetGroupPool;
00042 
00043     localSpikeScheduler = new SingleThreadSpikeScheduler(*localDelayMap, *stgPool, *spikeBuffer, simParam) ;
00044 
00045     targetNodesMap = new TargetNodesMap();
00046     globalDelayMap = new GlobalSingleTargetDelayMap();
00047 
00048     mpiOutBuffers = new MPIOutputBufferVector(mpi_comm.Get_size(), glengineids[mpi_comm.Get_rank()]);
00049 
00050     mpiInBuffers = new MPIInputBufferVector(glengineids, mpi_comm.Get_size());
00051 
00052     spikeScheduler = new DistributedSpikeScheduler(*localSpikeScheduler,
00053                      *targetNodesMap,
00054                      *mpiOutBuffers);
00055 
00056     localAnalogMsgDispatcher = new SingleThreadAnalogMsgDispatcher;
00057 
00058     // create incoming and outgoing dispatchers
00059     for (int i = 0; i < mpi_comm.Get_size(); ++i) {
00060         sources2BufPositionsMaps.push_back(new DistIncomingAnalogSources2BufPosMap);
00061         incomingDispatchers.push_back(new DistributedIncomingAnalogMsgDispatcher(
00062                                           (delay_t)simParam.minDelay.in_steps( get_dt() ),
00063                                           &(*mpiInBuffers)[i],
00064                                           sources2BufPositionsMaps[i]));
00065 
00066         outgoingDispatchers.push_back(new DistributedOutgoingAnalogMsgDispatcher(
00067                                           0,
00068                                           (delay_t)simParam.minDelay.in_steps( get_dt() ),
00069                                           &(*mpiOutBuffers)[i]));
00070     }
00071 
00072     distAnalogMsgDispatcher = new DistributedAnalogMessageDispatcher(mpi_comm.Get_size(),
00073                               simParam.minDelay.in_steps( get_dt() ),
00074                               incomingDispatchers,
00075                               outgoingDispatchers,
00076                               *localAnalogMsgDispatcher);
00077 
00078 
00079 
00080     SingleThreadSimEngine *ste = new SingleThreadSimEngine(0, *spikeScheduler, *distAnalogMsgDispatcher, *this );
00081     localSimEngine = ste;
00082 
00083     // setEnginePtr( ste );
00084 
00085     delayObjectMap = new AnalogDelayObjectMap;
00086 
00087     localAnalogMsgCreator = new STAnalogMessageCreator(
00088                                 *static_cast<SingleThreadAnalogMsgDispatcher *>(localAnalogMsgDispatcher),
00089                                 *ste,
00090                                 *delayObjectMap,
00091                                 (delay_t)simParam.minDelay.in_steps( get_dt() )) ;
00092 
00093 
00094     distAnalogMsgCreator = new DistributedAnalogMessageCreator(
00095                                *ste,
00096                                *distAnalogMsgDispatcher,
00097                                (delay_t)simParam.minDelay.in_steps( get_dt() ));
00098 
00099     distIncomingSpikeScheduler = new STDistributedIncomingSpikeScheduler(*mpiInBuffers,
00100                                  *globalDelayMap,
00101                                  *spikeBuffer,
00102                                  (delay_t)simParam.minDelay.in_steps( get_dt() ));
00103 
00104 
00105     MPIAllToAllCommunicatorFactory mpi_comm_factory(MPIAllToAllCommunicatorFactory::Default);
00106 
00107     mpiAllToAllComm = mpi_comm_factory.getCommunicator(*mpiInBuffers,
00108                       *mpiOutBuffers,
00109                       mpi_comm,
00110                       incomingConnections,
00111                       outgoingConnections);
00112 
00113     distEngine = new DistributedSimEngine(*localSimEngine,
00114                                           *distIncomingSpikeScheduler,
00115                                           *spikeScheduler,
00116                                           *distAnalogMsgDispatcher,
00117                                           *mpiAllToAllComm,
00118                                           *this );
00119 
00120     setupConstructRNGEngines();
00121     DistributedNetwork::seed_noise_rng( makeSeed( simParam.simulationRNGSeed ) );
00122 
00123     // Make sure that all net works are trhough the init befor anything else can happen
00124     mpi_comm.Barrier();
00125 
00126 }
00127 
00128 DistributedSingleThreadNetwork::~DistributedSingleThreadNetwork()
00129 {
00130     delete distEngine;
00131     delete mpiAllToAllComm;
00132     delete distIncomingSpikeScheduler;
00133     delete localAnalogMsgDispatcher;
00134     delete localSimEngine;
00135     delete spikeScheduler;
00136     delete mpiInBuffers;
00137     delete mpiOutBuffers;
00138     delete globalDelayMap;
00139     delete targetNodesMap;
00140     delete localSpikeScheduler;
00141     delete stgPool;
00142     delete localDelayMap;
00143     delete spikeBuffer;
00144     delete localAnalogMsgCreator;
00145     for (int i = 0; i < mpi_comm.Get_size() ; ++i) {
00146         delete incomingDispatchers[i];
00147         delete outgoingDispatchers[i];
00148         delete sources2BufPositionsMaps[i];
00149     }
00150     delete distAnalogMsgDispatcher;
00151     delete distAnalogMsgCreator;
00152     delete delayObjectMap;
00153 }
00154 
00155 
00156 SimObject * DistributedSingleThreadNetwork::_getObject_(const SimObject::ID &id)
00157 {
00158     if (id.node == mpi_comm.Get_rank()) {
00159         SimObject *p = localSimEngine->getObject(id);
00160         return p;
00161     }
00162     return NULL;
00163 }
00164 
00165 
00166 void DistributedSingleThreadNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t, const spike_port_id_t sender_port, const SimObject::ID &receiver, const port_t port, const step_t delay)
00167 {
00168 
00169     if (sender.node != receiver.node) {
00170         if (mpi_comm.Get_rank() == sender.node) {
00171             outgoingConnections[receiver.node] = true;
00172         } else if (mpi_comm.Get_rank() == receiver.node) {
00173             incomingConnections[sender.node] = true;
00174         }
00175     }
00176 
00177     if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
00178 
00179         if ( delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00180             throw(
00181                 PCSIM::ConstructionException(
00182                     "DistributedSingleThreadNetwork::addSpikeMessage",
00183                     str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00184                 )
00185             );
00186         }
00187 
00188         SingleThreadNetwork::addLocalSpikeMessage( localDelayMap, stgPool, localSimEngine, sender_port, receiver, port, delay );
00189 
00190     } else if (sender.node == mpi_comm.Get_rank()) {
00191         targetNodesMap->addTargetNode(sender_port, receiver.node );
00192         _nSpikeMessages++;
00193     } else if (receiver.node == mpi_comm.Get_rank()) {
00194 
00195         if( delay < simParam.minDelay.in_steps( get_dt() )  || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00196             throw(
00197                 PCSIM::ConstructionException(
00198                     "DistributedSingleThreadNetwork::addSpikeMessage",
00199                     str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00200                 )
00201             );
00202         }
00203 
00204         spikegroupid_t tg = globalDelayMap->find(glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay);
00205 
00206         if( tg == no_spikegroup ) {
00207             tg = stgPool->addSpikeTarget( localSimEngine->getObject(receiver), port );
00208             globalDelayMap->insert( glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay, tg );
00209         } else {
00210             stgPool->addSpikeTarget( tg, localSimEngine->getObject(receiver), port);
00211         }
00212         _nSpikeMessages++;
00213     }
00214 
00215 }
00216 
00217 // ------------------------------------ analog messages -------------------------------------- //
00218 template<typename analogSrcType, typename analogDestType>
00219 inline void DistributedSingleThreadNetwork::addGenericAnalogMessage(const SimObject::ID &sender, analogSrcType srcPortOrField, const SimObject::ID &receiver, analogDestType destPortOrField, delay_t delay)
00220 {
00221 
00222     if (sender.node != receiver.node) {
00223         if (mpi_comm.Get_rank() == sender.node) {
00224             outgoingConnections[receiver.node] = true;
00225         } else if (mpi_comm.Get_rank() == receiver.node) {
00226             incomingConnections[sender.node] = true;
00227         }
00228     }
00229 
00230     if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
00231 
00232         localAnalogMsgCreator->addAnalogMessage( sender, srcPortOrField, receiver, destPortOrField, delay );
00233 
00234     } else if (sender.node == mpi_comm.Get_rank()) {
00235         distAnalogMsgCreator->addOutgoingAnalogMessage( sender, srcPortOrField, receiver.node);
00236 
00237     } else if (receiver.node == mpi_comm.Get_rank()) {
00238         if ( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00239             throw(
00240                 PCSIM::ConstructionException(
00241                     "DistributedSingleThreadNetwork::addAnalogMessage",
00242                     str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00243                 )
00244             );
00245         }
00246         distAnalogMsgCreator->addIncomingAnalogMessage( sender, srcPortOrField, receiver, destPortOrField, delay);
00247     }
00248 }
00249 
00250 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, int recv_port, const Time &delay)
00251 {
00252     addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( get_dt() ));
00253 }
00254 
00255 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, string destfield, const Time &delay)
00256 {
00257     addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, destfield, (delay_t)delay.in_steps( get_dt() ));
00258 }
00259 
00260 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, int recv_port, const Time &delay)
00261 {
00262     addGenericAnalogMessage(sender, srcfield, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( get_dt() ) );
00263 }
00264 
00265 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, string destfield, const Time &delay)
00266 {
00267     addGenericAnalogMessage(sender, srcfield, receiver, destfield, (delay_t)delay.in_steps( get_dt() ) );
00268 }
00269 
00270 
00271 // -------------------------- running the simulation messages -------------------------------- //
00272 
00273 void DistributedSingleThreadNetwork::_initialize_()
00274 {
00275     if (!initialized) {
00276         localSimEngine->initialize();
00277         mpiInBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
00278         mpiOutBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
00279         distAnalogMsgDispatcher->initialize();
00280         initialized = true;
00281     }
00282 }
00283 
00284 void DistributedSingleThreadNetwork::_reset_()
00285 {
00286     if ( ! initialized )
00287         initialize();
00288     spikeScheduler->reset();
00289     distEngine->reset();
00290     distAnalogMsgDispatcher->reset(get_dt().in_sec());
00291     reseted = true;
00292 }
00293 
00294 
00295 void DistributedSingleThreadNetwork::_advance_( int nSteps )
00296 {
00297     if( ! reseted )
00298         reset();
00299     distEngine->advance( nSteps );
00300 }
00301 

Generated on Wed Jul 9 16:34:37 2008 for PCSIM by  doxygen 1.5.5