MPIOutputSpikeBuffer.h

Go to the documentation of this file.
00001 #ifndef MPIOutputSpikeBuffer_H_
00002 #define MPIOutputSpikeBuffer_H_
00003 
00004 #include "globaldefinitions.h"
00005 #include <vector>
00006 #include <algorithm>
00007 #include <cstddef>
00008 
00009 using std::vector;
00010 using std::copy;
00011 
00012 #include <iostream>
00013 using std::cout;
00014 using std::endl;
00015 
00016 
// Forward declaration of the container template defined further down in this
// header. It is needed here because MPIOutputSpikeBuffer<T> names
// MPIOutputSpikeBufferVector<T> as a friend before the full definition appears.
00017 template<typename T>
00018 class MPIOutputSpikeBufferVector;
00019 
00021 
00030 template<typename T = unsigned short>
00031 class MPIOutputSpikeBuffer
00032 {
00033 public:
00035 
00042     MPIOutputSpikeBuffer(vector<gl_engineid_t> &engIDs);
00043 
00044 
00046 
00049     typedef T coding_element_type ;
00050 
00052     T guard_value;
00053 
00054     virtual ~MPIOutputSpikeBuffer();
00055     
00056     void initialize(void * spikeBuffer, size_t spikeBufferSize);
00057 
00059     void appendSpike(local_objectid_t oid, int timestamp, engineid_t eng = 0);
00060 
00062     void nextCycle();
00063 
00065 
00068     int prepareNextSerializedMPIBuffer(size_t currentBufferSize);
00069     
00070     int prepareNextSerializedMPIBuffer()
00071     {
00072         return prepareNextSerializedMPIBuffer(buf_size);        
00073     };
00074 
00076     bool hasNextSerializedMPIBuffer();
00077 
00079     const T* const getBuffer();
00080 
00082     void setFinishedFlag(bool flag);
00083 
00084 protected:
00085     
00086 
00088     int nLocalEngines;
00089 
00091     size_t buf_size;
00092 
00094 
00098     vector<vector<T> > _buffers;
00099 
00101     vector<gl_engineid_t> *gl_engineids;
00102 
00104     T* serialBuffer;
00105 
00107 
00112     vector<int> timestamps;
00113 
00115     bool isInit;
00116 
00118     void initSerialization();
00119 
00121     gl_engineid_t s_curr_eng;
00122 
00124     int s_curr_timestamp;
00125 
00126     typedef typename vector<T>::const_iterator vector_iterator;
00127 
00129     vector_iterator s_curr_src_pos;
00130 
00132     vector_iterator s_src_end_pos;
00133 
00135     bool serializationFinished;
00136 
00137     friend class MPIOutputSpikeBufferVector<T>;
00138 
00139 };
00140 
00142 
00150 template<typename T = unsigned short>
00151 class MPIOutputSpikeBufferVector
00152 {
00153 public:
00154 
00163     MPIOutputSpikeBufferVector(int numBuffers, vector<gl_engineid_t> &engIDs,
00164                           size_t buffer_size = MPIBUFFER_BLOCK_SIZE);
00165 
00166     ~MPIOutputSpikeBufferVector();
00167 
00169     T *getBuffersPool()
00170     {
00171         return serialBuffersPool;
00172     }
00173 
00175     void nextCycle()
00176     {
00177         for (int i = 0; i < nNodes; ++i) {
00178             _outputbuffers[i]->nextCycle();
00179         }
00180     }
00181 
00183     void setFinishedFlag(bool flag)
00184     {
00185         for (int i = 0; i < nNodes; ++i) {
00186             _outputbuffers[i]->setFinishedFlag(flag);
00187         }
00188     }
00189 
00191     MPIOutputSpikeBuffer<T> & operator[] (int idx)
00192     {
00193         return *_outputbuffers[idx];
00194     }
00195 
00196 private:
00197 
00199     int nNodes;
00200 
00202     size_t buf_size;
00203 
00205     vector<MPIOutputSpikeBuffer<T>*> _outputbuffers;
00206 
00208 
00211     T *serialBuffersPool;
00212 };
00213 
00214 template<typename T>
00215 MPIOutputSpikeBuffer<T>::MPIOutputSpikeBuffer(vector<gl_engineid_t> &engIDs) :
00216                                                           gl_engineids(&engIDs)
00217 {
00218     guard_value = 0xFF;
00219     for (unsigned int i = 1; i < sizeof(T); ++i)
00220         guard_value = (guard_value << 8) + 0xFF;
00221 
00222     nLocalEngines = gl_engineids->size();
00223 
00224     _buffers.resize(nLocalEngines);
00225     timestamps.resize(nLocalEngines, int(-1));
00226     for (int j = 0 ; j < nLocalEngines ; ++j) {
00227         _buffers[j].push_back((*gl_engineids)[j]);
00228     }
00229 
00230     serializationFinished = false;
00231 }
00232 
00233 template<typename T>
00234 void MPIOutputSpikeBuffer<T>::initialize(void * spikeBuffer, 
00235                                                                         size_t spikeBufferSize)
00236 {
00237         serialBuffer = (T *)spikeBuffer;
00238         buf_size = spikeBufferSize;     
00239 }
00240 
00241 template<typename T>
00242 MPIOutputSpikeBuffer<T>::~MPIOutputSpikeBuffer()
00243 {}
00244 
00245 template<typename T>
00246 void MPIOutputSpikeBuffer<T>::appendSpike(local_objectid_t oid,
00247                                      int timestamp, engineid_t eng)
00248 {
00249     if (timestamps[eng] < timestamp) {
00250         timestamps[eng] = timestamp;
00251         _buffers[eng].push_back(guard_value);
00252         _buffers[eng].push_back(timestamp);
00253     }
00254     _buffers[eng].push_back(oid);
00255 }
00256 
00257 template<typename T>
00258 void MPIOutputSpikeBuffer<T>::nextCycle()
00259 {
00260     for (int i  = 0 ; i < nLocalEngines ; ++i) {
00261         _buffers[i].resize(1);
00262         timestamps[i] = -1;
00263     }
00264     isInit = false;
00265 }
00266 
00267 template<typename T>
00268 void MPIOutputSpikeBuffer<T>::initSerialization()
00269 {
00270 
00271     serializationFinished = false;
00272     engineid_t first_non_empty = 0;
00273     for (int j = 0 ; j < nLocalEngines; ++j) {
00274         _buffers[j].push_back(guard_value);
00275     }
00276     while (first_non_empty < nLocalEngines && _buffers[first_non_empty].size() == 2)
00277         first_non_empty++;
00278     if (first_non_empty == nLocalEngines) {
00279         serializationFinished = true;
00280         s_curr_timestamp = -1;
00281     } else {
00282         s_curr_eng = first_non_empty;
00283         s_curr_timestamp = -1;
00284         s_curr_src_pos = _buffers[first_non_empty].begin();
00285         s_src_end_pos = _buffers[first_non_empty].end();
00286     }
00287 }
00288 
00289 template<typename T>
00290 const T* const MPIOutputSpikeBuffer<T>::getBuffer()
00291 {
00292     return serialBuffer;
00293 }
00294 
00295 template<typename T>
00296 void MPIOutputSpikeBuffer<T>::setFinishedFlag(bool flag)
00297 {
00298     *(serialBuffer+1) = flag;
00299 }
00300 
00301 
// Writes the next serialized block into serialBuffer.
//
// Block layout produced here:
//   [0]  number of elements written (including elements 0 and 1)
//   [1]  finished flag (value of serializationFinished)
//   [2+] engine data: engine id, then groups of guard, timestamp, object ids;
//        a fully copied engine vector is followed by one extra guard, a
//        partially copied one by two guards.
//
// currentBufferSize is the number of elements of T that may be written,
// counted from the start of serialBuffer.
// Returns the number of elements written (the same value stored in [0]).
00302 template<typename T>
00303 int MPIOutputSpikeBuffer<T>::prepareNextSerializedMPIBuffer(size_t currentBufferSize)
00304 {
    // First call since nextCycle(): terminate the engine vectors and set up
    // the cursor members.
00305     if (!isInit) {
00306         initSerialization();
00307         isInit = true;
00308     }
00309 
00310     T *dest_pos = serialBuffer;
00311     T *dest_end_pos = serialBuffer + currentBufferSize;
    // true while the header (engine id, guard, timestamp) of the current
    // engine has not been written into this block
00312     bool withoutHeader = true;
00313 
00314     // jump two positions ahead, to leave space for the "finished" indicator
00315     // and the length of the buffer
00316     dest_pos+=2;
00317 
    // Nothing left to send: emit a block that consists of the two header
    // elements only.
00318     if (serializationFinished) {
00319         int length = 2;
00320         *serialBuffer = length ;
00321         *(serialBuffer + 1) = (T)serializationFinished;
00322         return length;
00323     }
00324 
00325 
    // s_curr_timestamp is set when a previous call copied only part of an
    // engine vector; repeat engine id, guard and timestamp so this block can
    // be read without the previous one.
00326     if (s_curr_timestamp != -1) { // we are not at the beginning of a buffer, so fill up the header
00327         *(dest_pos++) = (*gl_engineids)[s_curr_eng];
00328         *(dest_pos++) = guard_value;
00329         *(dest_pos++) = s_curr_timestamp;
00330         withoutHeader = false;
00331     }
00332     // transfer whole engine-buffers in the serialBuffer while there is space
    // (the "-1" keeps one free slot for the guard appended after each
    // engine vector)
00333     while (!serializationFinished &&
00334             (s_src_end_pos - s_curr_src_pos) <= ((dest_end_pos - dest_pos)-1) ) {
00335         copy(s_curr_src_pos, s_src_end_pos, dest_pos);
00336         dest_pos += s_src_end_pos - s_curr_src_pos;
00337         // go to the next src buffer (skip the empty ones)
00338         s_curr_eng++;
00339         while (s_curr_eng < nLocalEngines && _buffers[s_curr_eng].size() == 2)
00340             s_curr_eng++;
00341 
00342         // check if we are finished. If we are not, then setup the pointers
00343         if (s_curr_eng == nLocalEngines) {
00344             serializationFinished = true;
00345         } else {
00346             s_curr_timestamp = -1;
00347             s_curr_src_pos = _buffers[s_curr_eng].begin();
00348             s_src_end_pos = _buffers[s_curr_eng].end();
00349         }
00350         *(dest_pos++) = guard_value;
00351         withoutHeader = true;
00352     }
00353 
00354     if (!serializationFinished) {
00355 
00356         // there is only space to copy a part of the current buffer, so try to copy as much as possible
        // (the "-2" keeps two free slots for the two guards written at the
        // end of a partial copy)
        // NOTE(review): with fewer than two free slots left,
        // last_possible_pos lies before s_curr_src_pos and none of the
        // branches below copies anything.
00357         typename vector<T>::const_iterator last_possible_pos = s_curr_src_pos + ((dest_end_pos - dest_pos) -2);
00358         // adjust the last_possible_pos since we may be on a guard or a timestamp
        // i.e. move the cut back so the copied range does not end with a
        // guard, or with a guard followed by its timestamp
00359         if (last_possible_pos - s_curr_src_pos >= 2) {
00360             if (*(last_possible_pos-2) == guard_value )
00361                 last_possible_pos-=2;
00362             else
00363                 if (*(last_possible_pos-1) == guard_value )
00364                     last_possible_pos-- ;
00365         }
00366 
00367         if (withoutHeader) {
00368             // have we enough space to put the header and at least one oid?
            // The source cursor is at the start of the engine vector here,
            // so position +2 holds the first timestamp; the three leading
            // source elements are consumed by writing the header.
00369             if (last_possible_pos - s_curr_src_pos >= 4) {
00370                 *(dest_pos++) = (*gl_engineids)[s_curr_eng];
00371                 *(dest_pos++) = guard_value;
00372                 *(dest_pos++) = *(s_curr_src_pos + 2);
00373                 s_curr_src_pos += 3;
00374             }
00375         }
00376 
00377         if (last_possible_pos - s_curr_src_pos > 0) {
00378             copy(s_curr_src_pos, last_possible_pos, dest_pos);
00379             dest_pos += last_possible_pos - s_curr_src_pos;
00380             s_curr_src_pos = last_possible_pos;
            // If the cut falls on a guard, skip it and its timestamp; the
            // next call re-emits them from s_curr_timestamp in its header.
00381             if (*s_curr_src_pos == guard_value)
00382                 s_curr_src_pos+=2;
            // Walk back to the closest guard at or before the cut; the
            // element after it is the timestamp to resume with.
00383             typename vector<T>::const_iterator timestamp_pos = last_possible_pos;
00384             while (*timestamp_pos != guard_value)
00385                 timestamp_pos--;
00386             s_curr_timestamp = *(timestamp_pos+1);
            // two guards close a partially copied engine vector
00387             *(dest_pos++) = guard_value;
00388             *(dest_pos++) = guard_value;
00389         }
00390     }
00391 
00392 
    // Fill in the two header elements now that the final length is known.
00393     int length = dest_pos - serialBuffer;
00394     *serialBuffer = length ;     
00395     *(serialBuffer+1) = (T)serializationFinished;
00396 
00397     return length;
00398 }
00399 
00400 template<typename T>
00401 bool MPIOutputSpikeBuffer<T>::hasNextSerializedMPIBuffer()
00402 {
00403     return !serializationFinished;
00404 }
00405 
00406 
00407 template<typename T>
00408 MPIOutputSpikeBufferVector<T>::MPIOutputSpikeBufferVector(int numBuffers,
00409         vector<gl_engineid_t> &engIDs,
00410         size_t buffer_size) :
00411         nNodes(numBuffers), buf_size(buffer_size)
00412 {
00413 
00414     serialBuffersPool = new T[nNodes*buf_size];
00415 
00416     _outputbuffers.resize(nNodes);
00417 
00418     for (int i = 0; i < nNodes; ++i) {
00419         _outputbuffers[i] = new MPIOutputSpikeBuffer<T>(engIDs);
00420         _outputbuffers[i]->initialize(serialBuffersPool + i*buf_size, buf_size);        
00421     }
00422 
00423     nextCycle();
00424 }
00425 
00426 
00427 template<typename T>
00428 MPIOutputSpikeBufferVector<T>::~MPIOutputSpikeBufferVector()
00429 {
00430     for (int i = 0; i < nNodes; ++i) {
00431         delete _outputbuffers[i];
00432     }
00433 
00434     delete [] serialBuffersPool;
00435 }
00436 
00437 
00438 #endif /*MPIOutputSpikeBuffer_H_*/

Generated on Wed Jul 9 16:34:38 2008 for PCSIM by  doxygen 1.5.5