USRP_Server  2.0
A flexible, GPU-accelerated radio-frequency readout software.
USRP_server_memory_management.hpp
Go to the documentation of this file.
1 #pragma once
2 #ifndef USRP_MEMORY_INCLUDED
3 #define USRP_MEMORY_INCLUDED
4 
7 
8 //class for controlling events (wait / release / rearm). The mutex makes it thread safe.
10  public:
12  void wait(); // NOTE(review): definition not in this header; presumably blocks until release() is called — confirm
13  void release(); // NOTE(review): presumably sets `ready` and notifies ready_cond — confirm in the .cpp
14  void rearm(); // NOTE(review): presumably resets `ready` so the event can be waited on again — confirm
15  private:
16  boost::condition_variable ready_cond;
17  boost::mutex ready_mutex; // protects `ready` (assumed; the member definitions are not visible here)
18  bool ready;
19 };
20 
21 //helper class for the VNA decimator: the VNA scan is NOT synchronous with the USRP buffer
23  public:
24  int valid_size; //size to download in the current buffer
25  int new0; //where to copy the new buffer & size of the remainder
26  int total_len; // NOTE(review): undocumented; presumably buffer length + remainder — confirm in the .cpp
28  VNA_decimator_helper(int init_ppt, int init_buffer_len);
29 
30  void update(); // NOTE(review): definition not shown; presumably recomputes the public fields for the next buffer
31 
32  private:
33 
34  int ppt,buffer_len; // presumably initialised from init_ppt / init_buffer_len (constructor body not shown)
35 
36 };
37 
38 //this class helps with the memory copying in post-demodulation decimation processes with the gp decimator (see kernels.cu)
40  public:
41  int new_0; //where to copy the new buffer and spare size
42  int out_size; //size of the output buffer
43 
44  gp_decimator_helper(int buffer_len_init, int decim_init);
45 
46 
47  //update the values. If the buffer length changes on the next iteration, pass it as new_buffer_len (NOTE(review): the default 0 presumably means "unchanged" — confirm in the .cpp)
48  void update(int new_buffer_len = 0);
49 
50  private:
51 
52  int decim; //decimation factor
53  int buffer_len; //length of the RX buffer
54  int tot_buffer_len; //length of the buffer + remainder
55 
56  int calculate_spare(); // NOTE(review): presumably computes new_0 — confirm
57 
58  int calculate_outsize(); // NOTE(review): presumably computes out_size — confirm
59 };
60 
61 //this class helps manage the decimation in the case of the PFB (polyphase filter bank)
62 //NOTE: in the future it could be merged with the other decimator helper class
64  public:
65 
66  int out_size; //in samples
67  int new_0; // in samples
68 
69  pfb_decimator_helper(int init_decim, int init_nfft);
70 
71  void update(int current_batch);
72  private:
73  int nfft;
74  int decim; //in terms of fft buffers
75  int buffer_len; //in samples
76 };
77 //helper class for moving around the buffer and the remainder buffer of the polyphase filter bank
78 //NOTE: this is exclusively for the PFB
80  public:
81 
82  int n_tones; //number of fft points.
83  int eff_length; //total length of the buffer + spare samples from the preceding buffer.
84  int buffer_len; //length of the buffer incoming from the USRP.
85  int average; //how many buffers are averaged by the polyphase filter.
86  int n_eff_tones; //how many fft points will effectively be copied to host memory.
87  int new_0; //where to copy the next buffer.
88  int copy_size; //size of the device-to-host memory transfer.
89  int current_batch; //how many batches of the fft are valid.
90  int spare_samples; //how many samples must be copied to the beginning of the fft input buffer.
91  int spare_begin; //from which sample in the buffer the spare has to be copied.
92 
93  buffer_helper (int _n_tones, int _buffer_len, int _average, int _n_eff_tones);
94 
95  void update();
96 
97  private:
98 
99  //the closed-form formula had a defect for certain number combinations, so the batching is simulated here instead
100  int simulate_batching();
101 };
102 
103 template <typename vector_type>
105 // preallocator: pool of pinned host buffers (cudaMallocHost), each holding vector_size elements of vector_type
106 
107  public:
108 
109  int vector_size, pipe_size, wait_on_full; // elements per buffer; target pool size (grown by queue_filler when prefil is set); polling interval in microseconds
110 
111  boost::lockfree::queue< intptr_t, boost::lockfree::fixed_sized<(bool)true>>* allocated; // buffers ready to be handed out by get() (pointers stored as intptr_t)
112  boost::lockfree::queue< intptr_t, boost::lockfree::fixed_sized<(bool)true>>* deallocated; // buffers returned through trash(), waiting for queue_deallocator
113 
114  bool prefil;//if false, the queue will not adjust automatically (queue_filler only allocates the initial pool)
115 
116  preallocator(int init_vector_size, int init_pipe_size, bool prefill_init = true, int core = -1){
117  counter = 0;
118  prefil = prefill_init; //controls the prefill mechanism
119  vector_size = init_vector_size;
120  wait_on_full = 5;
121  pipe_size = init_pipe_size;
122  allocated = new boost::lockfree::queue< intptr_t ,boost::lockfree::fixed_sized<(bool)true>> (init_pipe_size);
123  deallocated = new boost::lockfree::queue< intptr_t ,boost::lockfree::fixed_sized<(bool)true>>(init_pipe_size);
124  filler = new boost::thread(boost::bind(&preallocator::queue_filler,this));
125  if(core>-1)Thread_Prioriry(*filler, 40, core);
126  deallocator = new boost::thread(boost::bind(&preallocator::queue_deallocator,this));
127  //if(core>-1)Thread_Prioriry(*deallocator, 40, core);
128  while(counter<pipe_size-1)boost::this_thread::sleep_for(boost::chrono::milliseconds{200});
129 
130  }
131 
132  vector_type* get(){
133 
134 
135  intptr_t thatvalue_other;
136 
137  while(not allocated->pop(thatvalue_other))boost::this_thread::sleep_for(boost::chrono::microseconds{10});
138 
139  counter--;
140  return reinterpret_cast<vector_type*>(thatvalue_other);
141 
142  }
143 
144  void trash(vector_type* trash_vector){
145 
146  while(not deallocated->push(reinterpret_cast<intptr_t>(trash_vector)))boost::this_thread::sleep_for(boost::chrono::microseconds{1});
147  counter++;
148 
149  }
150 
151  void close(){
152  deallocator->interrupt();
153  deallocator->join();
154  delete deallocator;
155  deallocator = nullptr;
156 
157  filler->interrupt();
158  filler->join();
159  delete filler;
160  filler = nullptr;
161 
162  delete allocated;
163  delete deallocated;
164 
165 
166 
167  }
168 
169  private:
170  bool warning = (bool)true; // NOTE(review): currently unused (its only reference in queue_filler is commented out)
171  boost::thread* filler; // runs queue_filler(); interrupted, joined and deleted by close()
172  boost::thread* deallocator; // runs queue_deallocator(); interrupted, joined and deleted by close()
173  std::atomic<int> counter; // availability counter: incremented when buffers are added to the pool, decremented by get(); queue_filler compares it with pipe_size
174 
175  void queue_deallocator(){
176  set_this_thread_name("queue_deallocator");
177  bool active = (bool)true;
178  while(active){
179  try{
180  boost::this_thread::interruption_point();
181  intptr_t trash_vector = 0;
182  if(deallocated->pop(trash_vector)){
183  int err_counter = 0;
184  //try to recicle or cancel
185  intptr_t thatvalue = reinterpret_cast<intptr_t>(trash_vector);
186  while (not allocated->push(thatvalue) or err_counter){
187  err_counter++;
188  boost::this_thread::sleep_for(boost::chrono::microseconds{wait_on_full});
189  }
190  boost::this_thread::sleep_for(boost::chrono::milliseconds{3});
191 
192 
193  counter++;
194  }else boost::this_thread::sleep_for(boost::chrono::microseconds{wait_on_full});
195  }catch (boost::thread_interrupted&){active=(bool)false;}
196  }
197 
198  while(not deallocated->empty()){
199 
200  intptr_t trash_vector;
201  if(deallocated->pop(trash_vector)){
202  cudaFreeHost(static_cast<vector_type*>(reinterpret_cast<void*>(trash_vector)));
203  }else{
204  boost::this_thread::sleep_for(boost::chrono::milliseconds{3});
205  }
206 
207  }
208  }
209 
210  void queue_filler(){
211  set_this_thread_name("queue_filler");
212  bool active = (bool)true;
213  vector_type* h_aPinned=NULL;
214  intptr_t thatvalue = 1;
215  int debug_counter = 0;
216 
217  while(counter<pipe_size-1){
218 
219  cudaError_t status = cudaMallocHost((void**)&h_aPinned, vector_size*sizeof(vector_type));
220  if (status != cudaSuccess){
221  print_error("Memory manager cannot allocate pinned host memory!");
222  exit(-1);
223  }
224  thatvalue = reinterpret_cast<intptr_t>(h_aPinned);
225  while(not allocated->push(thatvalue))boost::this_thread::sleep_for(boost::chrono::microseconds{10*wait_on_full});
226  counter++;
227 
228  }
229  if(prefil){
230 
231  while(active){
232  try{
233  boost::this_thread::interruption_point();
234  if(counter < pipe_size/10.){
235  cudaError_t status = cudaMallocHost((void**)&h_aPinned, vector_size*sizeof(vector_type));
236  if (status != cudaSuccess){
237  print_error("Memory manager cannot allocate pinned host memory!");
238  exit(-1);
239  }
240  thatvalue = reinterpret_cast<intptr_t>(h_aPinned);
241  while(not allocated->push(thatvalue))boost::this_thread::sleep_for(boost::chrono::microseconds{10*wait_on_full});
242  counter++;
243  debug_counter++;
244  }else if(debug_counter>0 ){//and warning
245  pipe_size+=debug_counter;
246  print_debug("Internal memory manager had to adjust pipe size to ",pipe_size);
247 
248  debug_counter = 0;
249  }
250  boost::this_thread::sleep_for(boost::chrono::microseconds{10*wait_on_full});
251  }catch (boost::thread_interrupted&){active=(bool)false;}
252  }
253  }else{
254  while(active){
255  try{
256  boost::this_thread::interruption_point();
257  boost::this_thread::sleep_for(boost::chrono::microseconds{1000*wait_on_full});
258  }catch (boost::thread_interrupted&){active=(bool)false;}
259  }
260  }
261 
262 
263  while(not allocated->empty()){
264 
265  intptr_t trash_vector;
266  if(allocated->pop(trash_vector)){
267  cudaFreeHost(static_cast<vector_type*>(reinterpret_cast<void*>(trash_vector)));
268  }else{
269  boost::this_thread::sleep_for(boost::chrono::milliseconds{1});
270  }
271  }
272  }
273 };
274 #endif
void trash(vector_type *trash_vector)
preallocator(int init_vector_size, int init_pipe_size, bool prefill_init=true, int core=-1)
boost::lockfree::queue< intptr_t, boost::lockfree::fixed_sized<(bool) true > > * deallocated
boost::lockfree::queue< intptr_t, boost::lockfree::fixed_sized<(bool) true > > * allocated
void print_error(std::string text)
void Thread_Prioriry(boost::thread &Thread, int priority, int affinity)
void set_this_thread_name(std::string thread_name)
Set the thread name reported in the logging.
void print_debug(std::string text, double value)