00001 /*************************************************************************** 00002 tag: Peter Soetens Mon Jan 19 14:11:26 CET 2004 DataObjectLockFree.hpp 00003 00004 DataObjectLockFree.hpp - description 00005 ------------------- 00006 begin : Mon January 19 2004 00007 copyright : (C) 2004 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.ac.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 #pragma once 00039 00040 #include "generic/dataobjectlockfree/target.hpp" 00041 #include "generic/dataobjectlockfree/os/oro_arch.h" 00042 00043 namespace youbot { 00044 00045 /** 00046 * @brief This DataObject is a Lock-Free implementation, 00047 * such that reads and writes can happen concurrently without priority 00048 * inversions. 00049 * 00050 * When there are more writes than reads, the last write will 00051 * be returned. The internal buffer can get full if too many 00052 * concurrent reads are taking to long. In that case, each new 00053 * read will read the element the previous read returned. 00054 * 00055 * @verbatim 00056 * The following Truth table applies when a Low Priority thread is 00057 * preempted by a High Priority thread : 00058 * 00059 * L\H | Set | Get | 00060 * Set | Ok | Ok | 00061 * Get | Ok | Ok | 00062 * 00063 * legend : L : Low Priority thread 00064 * H : High Priority thread 00065 * Blk: Blocks High Priority thread (bad!) 00066 * internal::NA : Not allowed ! 00067 * @endverbatim 00068 * Further, multiple reads may occur before, during and after 00069 * a write operation simultaneously. The buffer needs readers+2*writers 00070 * elements to be guaranteed non blocking. 00071 * @ingroup PortBuffers 00072 */ 00073 template<class T> 00074 class DataObjectLockFree{ 00075 public: 00076 /** 00077 * The type of the data. 00078 */ 00079 typedef T DataType; 00080 00081 /** 00082 * @brief The maximum number of threads. 00083 * 00084 * When used in data flow, this is always 2. 00085 */ 00086 const unsigned int MAX_THREADS; // = 2 00087 private: 00088 /** 00089 * Conversion of number of threads to size of buffer. 00090 */ 00091 const unsigned int BUF_LEN; // = MAX_THREADS+2 00092 00093 /** 00094 * Internal buffer structure. 00095 * Both the read and write pointers pointing to this struct 00096 * must be declared volatile, since they are modified in other threads. 00097 * I did not declare data as volatile, 00098 * since we only read/write it in secured buffers. 00099 */ 00100 struct DataBuf { 00101 DataBuf() 00102 : data(), counter(), next() 00103 { 00104 oro_atomic_set(&counter, 0); 00105 } 00106 DataType data; mutable oro_atomic_t counter; DataBuf* next; 00107 }; 00108 00109 typedef DataBuf* volatile VolPtrType; 00110 typedef DataBuf ValueType; 00111 typedef DataBuf* PtrType; 00112 00113 VolPtrType read_ptr; 00114 VolPtrType write_ptr; 00115 00116 /** 00117 * A 3 element Data buffer 00118 */ 00119 DataBuf* data; 00120 public: 00121 00122 /** 00123 * Construct a DataObjectLockFree by name. 00124 * 00125 * @param _name The name of this DataObject. 00126 * @param initial_value The initial value of this DataObject. 00127 */ 00128 DataObjectLockFree( const T& initial_value = T(), unsigned int max_threads = 2 ) 00129 : MAX_THREADS(max_threads), BUF_LEN( max_threads + 2), 00130 read_ptr(0), 00131 write_ptr(0) 00132 { 00133 data = new DataBuf[BUF_LEN]; 00134 read_ptr = &data[0]; 00135 write_ptr = &data[1]; 00136 data_sample(initial_value); 00137 } 00138 00139 ~DataObjectLockFree() { 00140 delete[] data; 00141 } 00142 00143 /** 00144 * Get a copy of the data. 00145 * This method will allocate memory twice if data is not a value type. 00146 * Use Get(DataType&) for the non-allocating version. 00147 * 00148 * @return A copy of the data. 00149 */ 00150 virtual DataType Get() const {DataType cache; Get(cache); return cache; } 00151 00152 /** 00153 * Get a copy of the Data (non allocating). 00154 * If pull has reserved enough memory to store the copy, 00155 * no memory will be allocated. 00156 * 00157 * @param pull A copy of the data. 00158 */ 00159 virtual void Get( DataType& pull ) const 00160 { 00161 PtrType reading; 00162 // loop to combine Read/Modify of counter 00163 // This avoids a race condition where read_ptr 00164 // could become write_ptr ( then we would read corrupted data). 00165 do { 00166 reading = read_ptr; // copy buffer location 00167 oro_atomic_inc(&reading->counter); // lock buffer, no more writes 00168 // XXX smp_mb 00169 if ( reading != read_ptr ) // if read_ptr changed, 00170 oro_atomic_dec(&reading->counter); // better to start over. 00171 else 00172 break; 00173 } while ( true ); 00174 // from here on we are sure that 'reading' 00175 // is a valid buffer to read from. 00176 pull = reading->data; // takes some time 00177 // XXX smp_mb 00178 oro_atomic_dec(&reading->counter); // release buffer 00179 } 00180 00181 /** 00182 * Set the data to a certain value (non blocking). 00183 * 00184 * @param push The data which must be set. 00185 */ 00186 virtual void Set( const DataType& push ) 00187 { 00188 /** 00189 * This method can not be called concurrently (only one 00190 * producer). With a minimum of 3 buffers, if the 00191 * write_ptr+1 field is not occupied, it will remain so 00192 * because the read_ptr is at write_ptr-1 (and can 00193 * not increment the counter on write_ptr+1). Hence, no 00194 * locking is needed. 00195 */ 00196 // writeout in any case 00197 write_ptr->data = push; 00198 PtrType wrote_ptr = write_ptr; 00199 // if next field is occupied (by read_ptr or counter), 00200 // go to next and check again... 00201 while ( oro_atomic_read( &write_ptr->next->counter ) != 0 || write_ptr->next == read_ptr ) 00202 { 00203 write_ptr = write_ptr->next; 00204 if (write_ptr == wrote_ptr) 00205 return; // nothing found, to many readers ! 00206 } 00207 00208 // we will be able to move, so replace read_ptr 00209 read_ptr = wrote_ptr; 00210 write_ptr = write_ptr->next; // we checked this in the while loop 00211 } 00212 00213 virtual void data_sample( const DataType& sample ) { 00214 // prepare the buffer. 00215 for (unsigned int i = 0; i < BUF_LEN-1; ++i) { 00216 data[i].data = sample; 00217 data[i].next = &data[i+1]; 00218 } 00219 data[BUF_LEN-1].data = sample; 00220 data[BUF_LEN-1].next = &data[0]; 00221 } 00222 }; 00223 } 00224 00225