USRP_Server  2.0
A flexible, GPU-accelerated radio-frequency readout software.
USRP_server_link_threads.cpp
Go to the documentation of this file.
2 
3 
4 //the initialization method requires an already initialized hardware manager class and an already initialized streaming queue (output of analysis)
5 TXRX::TXRX(server_settings* settings, hardware_manager* init_hardware, bool diagnostic_init){
6  BOOST_LOG_TRIVIAL(info) << "Initializing thread link class";
7  //set the streamin/writing options
8  tcp_streaming = settings->TCP_streaming;
9  file_writing = settings->FILE_writing;
10 
11  //assign the streaming queue
12  stream_queue = new rx_queue(STREAM_QUEUE_LENGTH);
13 
14  //set diagnostic info
15  diagnostic = diagnostic_init;
16 
17  //import the hardware pointer
18  hardware = init_hardware;
19 
20  //memory initialization (can be changed on the fly)
21  A_rx_buffer_len = settings->default_rx_buffer_len;
22  A_tx_buffer_len = settings->default_tx_buffer_len;
23  B_rx_buffer_len = settings->default_rx_buffer_len;
24  B_tx_buffer_len = settings->default_tx_buffer_len;
25 
26  //build the streaming/ writing chain
28  TCP_streamer = new Sync_server(stream_queue, rx_output_memory, true);
29  H5_writer = new H5_file_writer(TCP_streamer);
30  }
31 
32  if(tcp_streaming and (not file_writing))TCP_streamer = new Sync_server(stream_queue, rx_output_memory, false);
33 
34  if((not tcp_streaming) and file_writing)H5_writer = new H5_file_writer(stream_queue, rx_output_memory);
35 
36  if(tcp_streaming)TCP_streamer->connect(TCP_SYNC_PORT);
37 
38  //allocation depends on parameters
39  output_memory_size = 0;
40 
41  std::cout<<"Memory initialization done."<<std::endl;
42 
43  //initialize the conditional waiting classes
44  rx_conditional_waiting = new threading_condition();
45  tx_conditional_waiting = new threading_condition();
46 
47  //set the initial thread status
48  RX_status = false;
49  TX_status = false;
50 
51  //what follows is the initialization of the memory allocator pointers.
52  //do NOT skip that part as the system will use the pointers to detect memory allocation status
53  //and unallocated memory will result in if(pointer) to be compiled as if(true)
54 
55  //set the pointer to current parameter configuration
56  A_current_tx_param = nullptr;
57  A_current_rx_param = nullptr;
58  B_current_tx_param = nullptr;
59  B_current_rx_param = nullptr;
60 
61  //initializing memory pointers
62  A_tx_memory = nullptr;
63  A_rx_memory = nullptr;
64  B_tx_memory = nullptr;
65  B_rx_memory = nullptr;
66 
67  rx_output_memory = nullptr;
68 
69  BOOST_LOG_TRIVIAL(info) << "Thread link class initialized";
70 }
71 
72 //launches the setting functions for the required signals, antennas...
73 void TXRX::set(usrp_param* global_param){
74 
75  BOOST_LOG_TRIVIAL(info) << "Setting thread link class";
76 
77  std::vector<param*> modes(4);
78  modes[0] = &global_param->A_TXRX;
79  modes[1] = &global_param->A_RX2;
80  modes[2] = &global_param->B_TXRX;
81  modes[3] = &global_param->B_RX2;
82 
83  //reset thread counting mechanism
84  thread_counter = 0;
85  rx_thread_n.clear();
86  tx_thread_n.clear();
87 
88  for(size_t i = 0; i < modes.size(); i++ ){
89 
90  if(modes[i]->mode != OFF){
91  if((modes[i]->burst_on != 0) and (modes[i]->burst_on == 0)){
92  print_error("one parameter has burst_off != 0 and burst_on == 0");
93  exit(-1);
94  }
95  if((modes[i]->burst_on == 0) and (modes[i]->burst_on != 0)){
96  print_error("one parameter has burst_on != 0 and burst_off == 0");
97  exit(-1);
98  }
99  if(modes[i]->burst_on != 0){
100  modes[i]->buffer_len = modes[i]->burst_on * modes[i]->rate;
101  std::cout<<"Resizing buffer length to match burst length to: "<< modes[i]->buffer_len<<" samples."<<std::endl;
102  }
103  }
104  int this_thread_n = thread_counter*2;
105  switch(modes[i]->mode){
106  case OFF:
107  break;
108 
109  case RX:
110  std::cout<<"Allocating RF frontend "<<(char)(i<2?'A':'B')<<" RX memory buffer: "<< (modes[i]->buffer_len * sizeof(float2))/(1024.*1024.)<< " MB per buffer..."<<std::endl;
111  if(i<2){
112  if(A_rx_buffer_len != modes[i]->buffer_len or not A_rx_memory){
113  if(A_rx_memory)A_rx_memory->close();
114  A_rx_memory = new preallocator<float2>(A_rx_buffer_len,RX_QUEUE_LENGTH,this_thread_n);
115  }else{
116  std::cout<<"(already allocated).."<<std::endl;
117  }
118  A_rx_buffer_len = modes[i]->buffer_len;
119 
120  //initialize the demodulator class
121  A_rx_dem = new RX_buffer_demodulator(modes[i]);
122 
123  A_current_rx_param = modes[i];
124 
125  }else{
126 
127  if(B_rx_buffer_len != modes[i]->buffer_len or not B_rx_memory){
128  if(B_rx_memory)B_rx_memory->close();
129  B_rx_memory = new preallocator<float2>(B_rx_buffer_len,RX_QUEUE_LENGTH,this_thread_n);
130  }else{
131  std::cout<<"(already allocated).."<<std::endl;
132  }
133  B_rx_buffer_len = modes[i]->buffer_len;
134 
135  //initialize the demodulator class
136  B_rx_dem = new RX_buffer_demodulator(modes[i]);
137 
138  B_current_rx_param = modes[i];
139  }
140 
141  rx_thread_n.push_back(this_thread_n);
142  thread_counter +=1;
143  mem_mult_tmp = std::max((int)(modes[i]->data_mem_mult),1);
144  print_debug("memory expansion multiplier: ",mem_mult_tmp);
145  if ((output_memory_size>modes[i]->buffer_len * mem_mult_tmp) or not rx_output_memory){
146  std::cout<<"Allocating RX output memory buffer: "<< (mem_mult_tmp * modes[i]->buffer_len * sizeof(float2))/(1024.*1024.)<< " MB per buffer..."<<std::endl;
147  if(modes[i]->buffer_len>output_memory_size and output_memory_size>0)std::cout<<" (updating buffer size)"<<std::endl;
148  if(rx_output_memory) rx_output_memory->close();
149  output_memory_size = modes[i]->buffer_len * mem_mult_tmp;
150  rx_output_memory = new preallocator<float2>(output_memory_size,RX_QUEUE_LENGTH);
151 
152  }else{
153  std::cout<<" RX output memory buffer requirements already satisfaid."<<std::endl;
154  }
155 
156 
157  //update pointers
158  if(tcp_streaming)TCP_streamer->update_pointers(stream_queue, rx_output_memory);
159  if(file_writing){
161  H5_writer->update_pointers(TCP_streamer->out_queue, TCP_streamer->memory):
162  H5_writer->update_pointers(stream_queue, rx_output_memory);
163  }
164 
165  break;
166 
167  case TX:
168 
169  //adjust the memory buffer in case of custom buffer length or pint the memori to nullptr
170  std::cout<<"Allocating RF frontend "<<(char)(i<2?'A':'B')<<" TX memory buffer: "<< (modes[i]->buffer_len * sizeof(float2))/(1024.*1024.)<< " MB per buffer..."<<std::endl;
171  //NOTE: this queue doesn't autorefill
172 
173  if(i<2){
174  if(modes[i]->dynamic_buffer()){
175  if(not A_tx_memory or modes[i]->buffer_len != A_tx_buffer_len){
176  if(A_tx_memory){
177  A_tx_memory->close();
178  }
179  A_tx_memory = new preallocator<float2>(modes[i]->buffer_len,TX_QUEUE_LENGTH,false,this_thread_n);
180  A_tx_buffer_len = modes[i]->buffer_len;
181  }else{
182  std::cout<<"(already allocated).."<<std::flush;
183  }
184  }else{
185  if(A_tx_memory){
186  A_tx_memory->close();
187  }
188  A_tx_memory = nullptr;
189  }
190 
191  A_tx_gen = new TX_buffer_generator(modes[i]);
192 
193  A_preallocated = 0;
194 
195  A_current_tx_param = modes[i];
196 
197  }else{
198  if(modes[i]->dynamic_buffer()){
199  if(not B_tx_memory or modes[i]->buffer_len != B_tx_buffer_len){
200  B_tx_memory = new preallocator<float2>(modes[i]->buffer_len,TX_QUEUE_LENGTH,false,this_thread_n);
201  B_tx_buffer_len = modes[i]->buffer_len;
202  }else{
203  std::cout<<"(already allocated).."<<std::flush;
204  }
205  }else{
206  if(B_tx_memory){
207  B_tx_memory->close();
208  }
209  B_tx_memory = nullptr;
210  }
211 
212  B_tx_gen = new TX_buffer_generator(modes[i]);
213 
214  //prefill the queue and save the number of packets
215  //WARNING: something was wrong with that func. see the TX_buffer_generator class
216  B_preallocated = 0;// tx_gen->prefill_queue(hardware->TX_queue, tx_memory, modes[i]);
217 
218  B_current_tx_param = modes[i];
219 
220  }
221  tx_thread_n.push_back(this_thread_n);
222 
223  thread_counter+=1;
224 
225  break;
226  }
227  }
228 
229 
230  std::cout<<"\033[1;32mSetting USRP hardware:\033[0m"<<std::endl;
231  hardware->preset_usrp(global_param);
232 
233  BOOST_LOG_TRIVIAL(info) << "Thread link class set";
234 
235 }
236 
237 //start the threads
238 void TXRX::start(usrp_param* global_param){
239 
240  BOOST_LOG_TRIVIAL(info) << "Starting thread link";
241 
242  //counts the started threads
243  int rx_threads = 0;
244  int tx_threads = 0;
245 
246  if(not hardware->sw_loop)hardware->main_usrp->set_time_unknown_pps(uhd::time_spec_t(0.0));
247 
248  //current_tx_param is nullptr if no param struct in global_param has TX mode
249  if(global_param->A_TXRX.mode!=OFF){
250  if(not hardware->check_A_tx_status()){
251 
252  //start the TX worker: this thread produces the samples and push them in a queue read by the next thread
253  A_TX_worker = new boost::thread(boost::bind(&TXRX::tx_single_link,this,
254  A_tx_memory, //memory preallocator
255  A_tx_gen, //signal generator class
256  hardware->A_TX_queue, //has the queue to the tx loader thread
257  global_param->A_TXRX.samples,
258  global_param->A_TXRX.dynamic_buffer(),
259  A_preallocated,
260  'A' ));
261 
262  SetThreadName(A_TX_worker, "A_TX_worker");
263 
264  Thread_Prioriry(*A_TX_worker, 1, 6);//tx_thread_n[tx_threads]+1);
265 
266  //start the TX loader: this thrad takes samples form the other thread and stream them on the USRP
267  hardware->start_tx(
268  tx_conditional_waiting,
269  6,//tx_thread_n[tx_threads]+1,
270  &(global_param->A_TXRX),
271  'A',
272  A_tx_memory
273  );
274  tx_threads += 1;
275 
276  }else{
277  std::stringstream ss;
278  ss << "Cannot start a new measurement on usrp "<< hardware->this_usrp_number <<" if there is already one running on TX";
279  print_error(ss.str());
280  return;
281  }
282  }else A_TX_worker = nullptr;
283 
284  if(global_param->B_TXRX.mode!=OFF){
285  if(not hardware->check_B_tx_status()){
286 
287  //start the TX worker: this thread produces the samples and push them in a queue read by the next thread
288  B_TX_worker = new boost::thread(boost::bind(&TXRX::tx_single_link,this,
289  B_tx_memory, //memory preallocator
290  B_tx_gen, //signal generator class
291  hardware->B_TX_queue, //has the queue to the tx loader thread
292  global_param->B_TXRX.samples,
293  global_param->B_TXRX.dynamic_buffer(),
294  B_preallocated,
295  'B' ));
296 
297  SetThreadName(B_TX_worker, "B_TX_worker");
298 
299  Thread_Prioriry(*B_TX_worker, 1, 4);//tx_thread_n[tx_threads]+1);
300 
301 
302  //start the TX loader: this thrad takes samples form the other thread and stream them on the USRP
303  hardware->start_tx(
304  tx_conditional_waiting,
305  4,//tx_thread_n[tx_threads]+1,
306  &(global_param->B_TXRX),
307  'B',
308  B_tx_memory
309  );
310  tx_threads += 1;
311 
312  }else{
313  std::stringstream ss;
314  ss << "Cannot start a new measurement on usrp "<< hardware->this_usrp_number <<" if there is already one running on TX";
315  print_error(ss.str());
316  return;
317  }
318  }else B_TX_worker = nullptr;
319 
320  //current_rx_param is nullptr if no param struct in global_param has RX mode
321  if(global_param->A_RX2.mode!=OFF){
322  if(not hardware->check_A_rx_status()){
323 
324  //start the RX worker: takes samples from the queue of the next thread, analyze them and push the result in the streaming queue
325  A_RX_worker = new boost::thread(boost::bind(&TXRX::rx_single_link,this,
326  A_rx_memory,
328  A_rx_dem,
329  hardware,
330  global_param->A_RX2.samples,
331  stream_queue,
332  'A' ));
333 
334  SetThreadName(A_RX_worker, "A_RX_worker");
335 
336  Thread_Prioriry(*A_RX_worker, 1, 0);//rx_thread_n[rx_threads]*2+1);
337 
338  //start the RX thread: interfaces with the USRP receiving samples and pushing them in a queue read by the thread launched above.
339  hardware->start_rx(
340  A_rx_buffer_len,
341  rx_conditional_waiting,
342  A_rx_memory,
343  0,//rx_thread_n[rx_threads]*2+1,
344  &(global_param->A_RX2),
345  'A'
346  );
347 
348  rx_threads += 1;
349 
350 
351  }else{
352  std::stringstream ss;
353  ss << "Cannot start a new measurement on usrp "<< hardware->this_usrp_number <<" if there is already one running on RX";
354  print_error(ss.str());
355  return;
356  }
357  }else A_RX_worker = nullptr;
358 
359  if(global_param->B_RX2.mode!=OFF){
360  if(not hardware->check_B_rx_status()){
361 
362  //start the RX worker: takes samples from the queue of the next thread, analyze them and push the result in the streaming queue
363  B_RX_worker = new boost::thread(boost::bind(&TXRX::rx_single_link,this,
364  B_rx_memory,
366  B_rx_dem,
367  hardware,
368  global_param->B_RX2.samples,
369  stream_queue,
370  'B' ));
371 
372  SetThreadName(B_RX_worker, "B_RX_worker");
373 
374  Thread_Prioriry(*B_RX_worker, 1, 2);//rx_thread_n[rx_threads]*2+1);
375 
376  //start the RX thread: interfaces with the USRP receiving samples and pushing them in a queue read by the thread launched above.
377  hardware->start_rx(
378  B_rx_buffer_len,
379  rx_conditional_waiting,
380  B_rx_memory,
381  2,//rx_thread_n[rx_threads]*2,
382  &(global_param->B_RX2),
383  'B'
384  );
385 
386  rx_threads += 1;
387 
388  //eventually start streaming and writing
389 
390 
391  }else{
392  std::stringstream ss;
393  ss << "Cannot start a new measurement on usrp "<< hardware->this_usrp_number <<" if there is already one running on RX";
394  print_error(ss.str());
395  return;
396  }
397  }else B_RX_worker = nullptr;
398 
399  if ((global_param->A_RX2.mode!=OFF) and (global_param->B_RX2.mode!=OFF)){
400  if(file_writing){
401  H5_writer->start(global_param);
402  }
403  if(tcp_streaming){
405  if(not TCP_streamer->start(&(global_param->A_RX2))){
406  stop(true);
407  }
408  }
409  }else if (global_param->B_RX2.mode!=OFF){
410  if(file_writing){
411  H5_writer->start(global_param);
412  }
413  if(tcp_streaming){
414  if(not TCP_streamer->start(&(global_param->B_RX2))){
415  stop(true);
416  }
417  }
418  }else if (global_param->A_RX2.mode!=OFF){
419  if(file_writing){
420  H5_writer->start(global_param);
421  }
422  if(tcp_streaming){
423  if(not TCP_streamer->start(&(global_param->A_RX2))){
424  stop(true);
425  }
426  }
427  }
428 
429  BOOST_LOG_TRIVIAL(info) << "Thread link started";
430 
431 }
432 //check if the streamer can take a new command and clean the threads for it.
433 //in case the force option is true, force close the threads and cleans the queues
434 // NOTE: with force option true this call is blocking
435 bool TXRX::stop(bool force){
436  if(tcp_streaming)if (TCP_streamer->NEED_RECONNECT == true)force = true;
437 
438  bool status = true;
439 
440 
441  if(A_current_rx_param or B_current_rx_param){
442 
443  bool data_output_status;
444  data_output_status = (file_writing or tcp_streaming)?true:false;
445 
446  //give the stop command to the filewriter and streamer only if the hardware and dsp are done
447  bool hw_status = RX_status or hardware->check_rx_status();
448 
449  bool tcp_status = tcp_streaming?(hw_status?true:TCP_streamer->stop(force)):false;
450 
451  bool wrt_status = file_writing?(tcp_status?true:(hw_status?true:H5_writer->stop())):false;
452 
453  data_output_status = tcp_status or wrt_status;
454  if(diagnostic){
455  print_debug("sw_rx is_active: ",RX_status);
456  print_debug("hw_rx is_active: ",hardware->check_rx_status());
457  print_debug("sw_tx is_active: ",TX_status);
458  print_debug("hw_tx is_active: ",hardware->check_tx_status());
459  if(file_writing)print_debug("sw_wrt is_active: ",wrt_status);
460  if(tcp_streaming)print_debug("sw_stream is_active: ",tcp_status);
461  }
462 
463  hardware->check_rx_status();
464  if(((not RX_status) and (not hardware->check_rx_status()) and (not data_output_status)) or force){
465  hardware->close_rx();
466 
467  if (A_RX_worker){
468  //close the rx worker
469  A_RX_worker->interrupt();
470  A_RX_worker->join();
471  delete A_RX_worker;
472  A_RX_worker = nullptr;
473  //reset the parameter pointer
474  A_current_rx_param = nullptr;
475  }
476  if (B_RX_worker){
477  //close the rx worker
478  B_RX_worker->interrupt();
479  B_RX_worker->join();
480  delete B_RX_worker;
481  B_RX_worker = nullptr;
482  //reset the parameter pointer
483  B_current_rx_param = nullptr;
484  }
485 
486  //force close data output threads
487  if(file_writing)H5_writer->stop(true);
488  //if(tcp_streaming)TCP_streamer->stop(true);
489  //if the threads are still running
490  }else{status = status and false;}
491  }
492 
493  if(A_current_tx_param or B_current_tx_param){
494  if(diagnostic){
495  print_debug("TX_status ",TX_status);
496  print_debug("hardware->check_tx_status() ",hardware->check_tx_status());
497  }
498  if((not TX_status and not hardware->check_tx_status()) or force){
499  //close the rx interface thread
500  hardware->close_tx();
501 
502  //close the rx worker
503  if (A_TX_worker){
504  A_TX_worker->interrupt();
505  A_TX_worker->join();
506  delete A_TX_worker;
507  A_TX_worker=nullptr;
508  A_tx_gen->close();
509 
510  //reset the parameter pointer
511  A_current_tx_param = nullptr;
512  }
513  if (B_TX_worker){
514  B_TX_worker->interrupt();
515  B_TX_worker->join();
516  delete B_TX_worker;
517  B_TX_worker = nullptr;
518 
519  B_tx_gen->close();
520 
521  //reset the parameter pointer
522  B_current_tx_param = nullptr;
523 
524  }
525 
526  //if the threads are still running
527  }else{status = status and false;}
528  }
529 
530  //reset the thread counter
531  if(status)thread_counter = 0;
532 
533  BOOST_LOG_TRIVIAL(info) << "Operations concluded? "<<status;
534 
535  return status;
536 }
537 
538 //thread for loading a packet from the buffer generator into the transmit thread
539 //assuming a single TX generator and a single TX loader
540 void TXRX::tx_single_link(
541  preallocator<float2>* memory, //the custom memory allocator to use in case of dynamically denerated buffer
542  TX_buffer_generator* generator, //source of the buffer
543  tx_queue* queue_tx, //holds the pointer to the queue
544  size_t total_samples, //how many sample to produce and push
545  bool dynamic, //true if the preallocation requires dynamic memory
546  int preallocated, // how many samples have been preallocate
547  char front_end
548 ){
549 
550  std::stringstream thread_name;
551  thread_name << "tx single link "<<front_end;
552  set_this_thread_name(thread_name.str());
553 
554  BOOST_LOG_TRIVIAL(info) << "Thread started";
555 
556  if(front_end!='A' and front_end!='B'){
557  print_error("Frontend code not recognised in transmission link thread");
558  return;
559  }
560 
561 
562 
563  //notify that the tx worker is on
564  TX_status = true;
565 
566  //pointer to the transmission buffer
567  float2* tx_vector;
568 
569  size_t tx_buffer_len = front_end=='A'?A_tx_buffer_len:B_tx_buffer_len;
570 
571  //number of samples "sent" in to the tx queue
572  size_t sent_samples = preallocated*tx_buffer_len;
573 
574  //thread loop controller
575  bool active = true;
576  //main loading loop
577  while((sent_samples < total_samples) and active){
578  try{
579  boost::this_thread::interruption_point();
580  sent_samples+=tx_buffer_len;
581  if(dynamic)tx_vector = memory->get();
582  generator->get(&tx_vector);
583  bool insert = false;
584  while(not insert){
585  insert = queue_tx->push(tx_vector);
586  boost::this_thread::sleep_for(boost::chrono::microseconds{1});
587  }
588  //std::cout<<"Pushing buffer"<<std::endl;
589  boost::this_thread::sleep_for(boost::chrono::microseconds{1});
590  }catch(boost::thread_interrupted &){ active = false;
591 
592 
593  }
594  }
595  //notify that the tx worker is off
596  TX_status = false;
597 
598  BOOST_LOG_TRIVIAL(info) << "Thread joining";
599 
600 }
601 
602 //thread for taking a packet from the receive queue and pushing it into the analysis queue
603 void TXRX::rx_single_link(
604  preallocator<float2>* input_memory,
605  preallocator<float2>* output_memory,
606  RX_buffer_demodulator* demodulator,
607  hardware_manager* rx_thread,
608  size_t max_samples,
609  rx_queue* stream_q, //pointer to the queue to transport the buffer wrapper structure from the analysis to the streaming thread
610  char front_end
611 ){
612 
613  std::stringstream thread_name;
614  thread_name << "rx single link "<<front_end;
615  set_this_thread_name(thread_name.str());
616 
617  BOOST_LOG_TRIVIAL(info) << "Thread started";
618 
619  if(front_end!='A' and front_end!='B'){
620  print_error("Frontend code not recognised in receiver link thread");
621  return;
622  }
623 
624  //notify that the rx worker is on
625  RX_status = true;
626 
627  //wrapper to the buffer
628  RX_wrapper rx_buffer;
629 
630  //pointer to the new buffer
631  float2* output_buffer;
632 
633  //controls thread loop
634  bool active = true;
635 
636  //counter of samples to acquire
637  size_t recv_samples = 0;
638 
639  //saturation warning will only be displayed once.
640  bool queue_saturnation_warning = true;
641 
642  rx_queue* RX_queue = front_end=='A'?rx_thread->A_RX_queue:rx_thread->B_RX_queue;
643 
644  //analysis cycle
645  while(active and (recv_samples < max_samples)){
646  try{
647 
648  //look for abrupt interruptions
649  boost::this_thread::interruption_point();
650 
651  //check if a new packet is in the queue
652  if(RX_queue->pop(rx_buffer)){
653 
654  //the number of channels will be the same for all the measure
655  rx_buffer.channels = demodulator->parameters->wave_type.size();
656 
657  //update the counter
658  recv_samples += rx_buffer.length;
659 
660  //get the output memory available
661  output_buffer = output_memory->get();
662 
663  //analyze the packet and change the valid length of the packet
664  rx_buffer.length = demodulator->process(&rx_buffer.buffer, &output_buffer);
665 
666  //recycle the input buffer
667  input_memory->trash(rx_buffer.buffer);
668 
669  //point to the right buffer in the wrapper
670  rx_buffer.buffer = output_buffer;
671 
672  //push the buffer in the output queue
673  short att = 0;
674  while (not stream_q->push(rx_buffer)){
675  att++;
676  boost::this_thread::sleep_for(boost::chrono::microseconds{5});
677  if(queue_saturnation_warning and att>10){
678  print_warning("Network streaming queue saturated");
679  queue_saturnation_warning = false;
680  }
681  }
682 
683  //if no packet is present sleep for some time, there is no criticality here
684  }else{boost::this_thread::sleep_for(boost::chrono::milliseconds{2});}
685 
686  //chatch the interruption of the thread
687  }catch (boost::thread_interrupted &){ active = false; }
688  }
689  //exit operation is clean the rx queue to avoid memory leak is a interrupted measure situation
690  RX_wrapper rx_buffer_dummy;
691  while(not RX_queue->empty()){
692  RX_queue->pop(rx_buffer_dummy);
693  input_memory->trash(rx_buffer_dummy.buffer);
694  }
695 
696  //notify that the rx worker is off
697  RX_status = false;
698 
699  BOOST_LOG_TRIVIAL(info) << "Thread joining";
700 }
void start(usrp_param *global_param)
Start the threads.
void start_rx(int buffer_len, threading_condition *wait_condition, preallocator< float2 > *memory, int thread_op, param *current_settings, char front_end)
Start an rx thread.
bool check_tx_status(bool verbose=false)
Check the status of every tx operations. Returns the status of A or B.
bool check_rx_status(bool verbose=false)
Check the status of every rx operations. Returns the status of A or B.
void trash(vector_type *trash_vector)
void set(usrp_param *global_param)
Launches the setting functions for the required signals, antennas...
bool tcp_streaming
Enables server tcp streaming.
int TCP_SYNC_PORT
void SetThreadName(boost::thread *thread, const char *threadName)
bool check_A_rx_status(bool verbose=false)
Check the status of A rx operations.
void print_error(std::string text)
bool check_B_tx_status(bool verbose=false)
Check the status of B tx operations.
tx_queue * B_TX_queue
Queue accessed to stream data from B frontend.
preallocator< float2 > * memory
bool stop(bool force=false)
void Thread_Prioriry(boost::thread &Thread, int priority, int affinity)
This class handles the DSP of the buffer coming from the SDR. This is the class to implement to a...
tx_queue * A_TX_queue
Queue accessed to stream data from A frontend.
rx_queue * out_queue
int process(float2 **__restrict__ in, float2 **__restrict__ out)
Packet handler for DSP class. This method processes information pointed by the in parameter and write t...
rx_queue * stream_queue
Pointer to the TCP streaming queue, initialized with the class. Output of every frontend rx dsp proce...
void set_this_thread_name(std::string thread_name)
Set the thread name reported in the logging.
preallocator< float2 > * rx_output_memory
Pointer to the output memory allocator of all frontends.
bool preset_usrp(usrp_param *requested_config)
Set the USRP device with user parameters.
bool file_writing
Enables server local file writing.
bool check_B_rx_status(bool verbose=false)
Check the status of B rx operations.
void update_pointers(rx_queue *init_stream_queue, preallocator< float2 > *init_memory)
bool diagnostic
Enables diagnostic info on output.
void close_tx()
Close all the tx streamer threads.
void start_tx(threading_condition *wait_condition, int thread_op, param *current_settings, char front_end, preallocator< float2 > *memory=NULL)
Start a transmission thread. The threads started by this function do two things: pop a packet from th...
void close_rx()
Close all the rx streamer threads.
bool stop(bool force=false)
bool check_A_tx_status(bool verbose=false)
Check the status of A tx operations.
void print_debug(std::string text, double value)
void start(usrp_param *global_params)
bool stop(bool force=false)
void print_warning(std::string text)
void update_pointers(rx_queue *init_queue, preallocator< float2 > *init_memory)
void connect(int init_tcp_port)
rx_queue * A_RX_queue
Queue accessed to retrieve data from A frontend.
TXRX(server_settings *settings, hardware_manager *init_hardware, bool diagnostic_init=false)
Initialization method requires an already initialized hardware manager class and an already initializ...
Manages the hardware I/O of one usrp unit.
bool start(param *current_settings)
uhd::usrp::multi_usrp::sptr main_usrp
rx_queue * B_RX_queue
Queue accessed to retrieve data from B frontend.