MPIOutputBuffer.cpp

Go to the documentation of this file.
00001 #include "MPIOutputBuffer.h"
00002 
00003 #include <cassert>
00004 
00005 #include <cstring>
00006 
00007 
00008 
00009 MPIOutputBuffer::MPIOutputBuffer(vector<gl_engineid_t> &engIDs)
00010         : initialized(false), mpiOutputSpikeBuffer(engIDs), slicer(MPIBufferSlicer::tpOutputBuffer)
00011 {
00012     analogMsgCounters.resize(engIDs.size(), 0);
00013 }
00014 
00015 
00016 MPIOutputBuffer::~MPIOutputBuffer()
00017 {
00018         if (initialized)
00019         if (slicer.thereIsMixedDataType)
00020                 mixedMPIDataType.Free();
00021 }
00022 
00023 void MPIOutputBuffer::startNewMPIExchange()
00024 {
00025     slicer.reset();
00026 }
00027 
00028 void MPIOutputBuffer::initialize(MPIMessageSpec msgSpec, size_t spikeBufferSize, size_t maxMPIMsgSize,
00029                                  void *baseBufferPtr, void *analogBuffer, void *spikeBuffer)
00030 {
00031     this->baseBufferPtr = baseBufferPtr;
00032     currentMsgInfo = msgSpec;
00033     analog_buf = analogBuffer;
00034     spike_buf = spikeBuffer;
00035 
00036     slicer.initialize((char *)spikeBuffer - (char *)analogBuffer,
00037                       maxMPIMsgSize, spikeBufferSize);
00038 
00039     // prepare mixed
00040     size_t analog_buffer_size_elements = (double *)spikeBuffer - (double *)analogBuffer;
00041 
00042     spike_buffer_size_elements = spikeBufferSize / sizeof(MPIOutputSpikeBuffer<>::coding_element_type);
00043 
00044     mpiOutputSpikeBuffer.initialize((MPIOutputSpikeBuffer<>::coding_element_type *)spikeBuffer, spike_buffer_size_elements);
00045 
00046     slicer.initialize(analog_buffer_size_elements,
00047                       maxMPIMsgSize, spikeBufferSize);
00048 
00049     analogMPIDatatype = MPI::DOUBLE;
00050     if (sizeof(MPIOutputSpikeBuffer<>::coding_element_type) == 4)
00051         spikingMPIDatatype = MPI::LONG;
00052     else
00053         spikingMPIDatatype = MPI::SHORT;
00054 
00055     // initialize the mixed mpi data type
00056     if (slicer.thereIsMixedDataType) {
00057         // set the  data types comprising the mixed data type        
00058         mixed_data_types[0] = analogMPIDatatype;
00059         mixed_data_types[1] = spikingMPIDatatype;
00060         // set the mixed counts and displacements
00061         if (maxMPIMsgSize == 0)
00062             mixedCounts[0] = analog_buffer_size_elements;
00063         else
00064             mixedCounts[0] = analog_buffer_size_elements % (maxMPIMsgSize / sizeof(double));
00065         mixedCounts[1] = spikeBufferSize / sizeof(MPIOutputSpikeBuffer<>::coding_element_type);
00066 
00067         mixedDisplacements[0] = 0;
00068         mixedDisplacements[1] = mixedCounts[0] * sizeof(double);
00069         
00070         // initialize mixed data type to some dummy initial value, so that free will work afterwards
00071         mixedMPIDataType = MPI::Datatype::Create_struct(2, mixedCounts, mixedDisplacements, mixed_data_types);
00072     }
00073 
00074     initialized = true;
00075 }
00076 
00077 bool MPIOutputBuffer::hasNextBufferSlice()
00078 {
00079     if (slicer.currentSliceType == MPIBufferSlicer::sliceAnalog ||
00080             slicer.currentSliceType == MPIBufferSlicer::sliceUndefined ) {
00081         if (totalAnalogMsgCounter)
00082             return true;
00083         else {     
00084             return mpiOutputSpikeBuffer.hasNextSerializedMPIBuffer();
00085         }
00086     }    
00087     return mpiOutputSpikeBuffer.hasNextSerializedMPIBuffer();
00088 }
00089 
00090 MPIMessageSpec & MPIOutputBuffer::prepareNextBufferSlice()
00091 {
00092     currentMsgInfo.hasContent = hasNextBufferSlice();
00093 
00094     slicer.calcNextBufferSliceDimensions();
00095 
00096     // setup currentMsgInfo
00097     switch (slicer.currentSliceType) {
00098     case MPIBufferSlicer::sliceAnalog :
00099         *currentMsgInfo.displacement = ((char *)analog_buf + slicer.currentSlicePos) - (char *)baseBufferPtr;
00100         *currentMsgInfo.buffer = (void *)((char *)analog_buf + slicer.currentSlicePos);
00101         *currentMsgInfo.count = slicer.currentAnalogSliceSize / sizeof(double);
00102         *currentMsgInfo.datatype = analogMPIDatatype;
00103         currentMsgInfo.content_type = MPIMessageSpec::contentAnalog;
00104         break;
00105     case MPIBufferSlicer::sliceMixed :
00106         mixedCounts[1] = mpiOutputSpikeBuffer.prepareNextSerializedMPIBuffer(slicer.allowedSpikeSliceSize/sizeof(MPIOutputSpikeBuffer<>::coding_element_type));
00107         // free previous mixed data type before creating new one
00108         mixedMPIDataType.Free();
00109         mixedMPIDataType = MPI::Datatype::Create_struct(2, mixedCounts, mixedDisplacements, mixed_data_types);
00110         mixedMPIDataType.Commit();
00111         *currentMsgInfo.displacement = (char *)( (double *)spike_buf - mixedCounts[0] ) - (char *)baseBufferPtr;
00112         *currentMsgInfo.buffer = (void *)( (double *)spike_buf - mixedCounts[0] ) ;
00113         *currentMsgInfo.datatype = mixedMPIDataType;
00114         *currentMsgInfo.count = 1;
00115         currentMsgInfo.content_type = MPIMessageSpec::contentMixed;        
00116         break;
00117     case MPIBufferSlicer::sliceSpiking :        
00118         *currentMsgInfo.displacement = (char *)spike_buf - (char *)baseBufferPtr;
00119         *currentMsgInfo.buffer = spike_buf;
00120         *currentMsgInfo.count = mpiOutputSpikeBuffer.prepareNextSerializedMPIBuffer(slicer.allowedSpikeSliceSize/sizeof(MPIOutputSpikeBuffer<>::coding_element_type));;
00121         *currentMsgInfo.datatype = spikingMPIDatatype;
00122         currentMsgInfo.content_type = MPIMessageSpec::contentSpiking;
00123         break;
00124     case MPIBufferSlicer::sliceUndefined :
00125         assert( 0 );
00126     }   
00127     return currentMsgInfo;
00128 }
00129 
00130 MPIMessageSpec& MPIOutputBuffer::getCurrentMPIMsgSpecHolder()
00131 {
00132     return currentMsgInfo;
00133 }
00134 
00135 void *MPIOutputBuffer::getBuffer()
00136 {
00137     return (void *)analog_buf;
00138 }
00139 
00140 void MPIOutputBuffer::setFinishedFlag(bool finished)
00141 {
00142     mpiOutputSpikeBuffer.setFinishedFlag(finished);
00143 }
00144 
00145 void MPIOutputBuffer::nextCycle()
00146 {
00147     mpiOutputSpikeBuffer.nextCycle();
00148 }
00149 
00150 MPIOutputSpikeBuffer<> & MPIOutputBuffer::mpiOutputSpikingBuffer()
00151 {
00152     return mpiOutputSpikeBuffer;
00153 }
00154 
00155 
00156 MPIOutputBufferVector::MPIOutputBufferVector(int numBuffers, vector<gl_engineid_t> &engIDs)
00157         : initialized(false), nNodes(numBuffers), mpiExchBlocksInfo(numBuffers)
00158 {
00159     _buffers.resize(nNodes, MPIOutputBuffer(engIDs));
00160 }
00161 
00162 MPIOutputBufferVector::~MPIOutputBufferVector()
00163 {
00164         if (initialized)
00165         delete [] memoryPool;
00166 }
00167 
00168 
00169 void MPIOutputBufferVector::initialize(int minDelay, size_t maxMPIMessageSize,
00170                                        size_t spikeBufferSize)
00171 {
00172     // Calculate the size of the memory pool
00173     spike_buffer_size = spikeBufferSize; // in bytes
00174     int pool_size = 0;
00175     for (int i = 0; i < nNodes; ++i) {
00176         _buffers[i].calculateTotalAnalogMsgCounter();
00177         // for each buffer allocate memory for the analog messages and one slice for the spiking messages
00178         pool_size += _buffers[i].totalAnalogMsgCounter * sizeof(double) * minDelay + spikeBufferSize;
00179     }
00180 
00181     memoryPool = new char[pool_size];
00182     
00183     memset(memoryPool, 0, pool_size);
00184 
00185     //  Setup buffer segment pointers for the individual input buffers
00186     void * analogBufPtr = (void *)memoryPool;
00187     for (int i = 0; i < nNodes; ++i) {
00188         void *spikeBufPtr = (void *)((char *)analogBufPtr + _buffers[i].totalAnalogMsgCounter * sizeof(double) * minDelay);
00189         _buffers[i].initialize(mpiExchBlocksInfo.getMsgSpec(i), spike_buffer_size, maxMPIMessageSize,
00190                                memoryPool, analogBufPtr, spikeBufPtr);
00191         analogBufPtr = (char *)analogBufPtr + _buffers[i].totalAnalogMsgCounter * sizeof(double) * minDelay + spike_buffer_size;
00192     }
00193 
00194     nextCycle();
00195     
00196     initialized = true;
00197 }
00198 
00199 void MPIOutputBufferVector::startNewMPIExchange()
00200 {
00201     vector<MPIOutputBuffer>::iterator buf_it;
00202     for (buf_it = _buffers.begin() ; buf_it != _buffers.end() ; ++buf_it)
00203         buf_it->startNewMPIExchange();
00204 
00205 }
00206 
00207 void MPIOutputBufferVector::prepareNextBufferSlices()
00208 {
00209     vector<MPIOutputBuffer>::iterator buf_it;
00210     for (buf_it = _buffers.begin() ; buf_it != _buffers.end() ; ++buf_it)
00211         buf_it->prepareNextBufferSlice();
00212 }
00213 
00214 
00215 MPIExchangeBlocksInfo & MPIOutputBufferVector::getMPIExchangeBlocksInfo()
00216 {
00217         return mpiExchBlocksInfo;       
00218 }

Generated on Wed Jul 9 16:34:38 2008 for PCSIM by  doxygen 1.5.5