USRP_Server  2.0
A flexible, GPU-accelerated radio-frequency readout software.
USRP_hardware_manager.cpp
Go to the documentation of this file.
2 
3 auto start = std::chrono::system_clock::now();
4 
8 hardware_manager::hardware_manager(server_settings* settings, bool sw_loop_init, size_t usrp_number){
9 
10  BOOST_LOG_TRIVIAL(info) << "Initializing hardware manager";
11 
12 
13  //software loop mode exclude the hardware
14  sw_loop = sw_loop_init;
15 
16  if(sw_loop)BOOST_LOG_TRIVIAL(debug) << "Software loop enabled";
17 
18  //in any case a gpu is necessary
19  cudaSetDevice(settings->GPU_device_index);
20  cudaDeviceProp props;
21  cudaGetDeviceProperties(&props, settings->GPU_device_index);
22 
23 
24  if(not sw_loop){
25 
26  this_usrp_number = usrp_number;
27 ;
28 
29  //recursively look for usrps
30  dev_addrs = uhd::device::find(hint);
31  std::cout<<"Looking for USRP x300 device number "<< usrp_number << " .." <<std::flush;
32  while(dev_addrs.size()< usrp_number + 1){
33 
34  dev_addrs = uhd::device::find(hint);
35  std::cout<<"."<<std::flush;
36  usleep(1e6);
37  }
38 
39 
40  std::cout<<"Device found and assigned to GPU "<< props.name <<" ("<< settings->GPU_device_index <<")"<<std::endl;
41  //for(size_t ii = 0; ii<dev_addrs.size(); ii++){
42  // std::cout<<dev_addrs[ii].to_pp_string()<<std::endl;
43  //}
44  //assign desired address
45 
46 
47  //uhd::device_addr_t args("addr=192.168.30.2,second_addr=192.168.40.2");
48  if(device_arguments.compare("noarg")!=0){
49  uhd::device_addr_t args(device_arguments);
50  main_usrp = uhd::usrp::multi_usrp::make(args);
51  std::cout<< "Creating device with arguments: "<<device_arguments <<std::endl;
52  }else{
53  main_usrp = uhd::usrp::multi_usrp::make(dev_addrs[usrp_number]);
54  }
55  //set the clock reference
56  main_usrp->set_clock_source(settings->clock_reference);
57 
58  }else{
59  A_sw_loop_queue = new tx_queue(SW_LOOP_QUEUE_LENGTH);
60  B_sw_loop_queue = new tx_queue(SW_LOOP_QUEUE_LENGTH);
61  }
62 
63  //initialize port connection check variables
64  A_TXRX_chk = OFF;
65  B_RX2_chk = OFF;
66  B_TXRX_chk = OFF;
67  A_RX2_chk = OFF;
68 
69  //set the thread state
70  A_rx_thread_operation = false;
71  A_tx_thread_operation = false;
72  B_rx_thread_operation = false;
73  B_tx_thread_operation = false;
74 
75  //settling time for fpga register initialization
76  std::this_thread::sleep_for(std::chrono::milliseconds(800));
77 
78  //initialize transmission queues
79  A_RX_queue = new rx_queue(RX_QUEUE_LENGTH);
80  A_TX_queue = new tx_queue(TX_QUEUE_LENGTH);
81  B_RX_queue = new rx_queue(RX_QUEUE_LENGTH);
82  B_TX_queue = new tx_queue(TX_QUEUE_LENGTH);
83 
84  A_tx_error_queue = new error_queue(ERROR_QUEUE_LENGTH);
85  B_tx_error_queue = new error_queue(ERROR_QUEUE_LENGTH);
86 
87  A_rx_stream = nullptr;
88  A_tx_stream = nullptr;
89  B_rx_stream = nullptr;
90  B_tx_stream = nullptr;
91 
92  if(not sw_loop)main_usrp->set_time_now(0.);
93 
94  BOOST_LOG_TRIVIAL(info) << "Hardware manager initilaized";
95 }
96 
100 bool hardware_manager::preset_usrp(usrp_param* requested_config){
101 
102  if(not sw_loop){
103  BOOST_LOG_TRIVIAL(info) << "Presetting USRP";
104  }else{
105  BOOST_LOG_TRIVIAL(info) << "Presetting sw loop queue";
106  }
107 
108  apply(requested_config);
109  set_streams();
110  if(not sw_loop){
111  check_tuning();
112  }
113 
114  BOOST_LOG_TRIVIAL(info) << "Preset done";
115  return true;
116 
117 }
118 
120  std::this_thread::sleep_for(std::chrono::milliseconds(20));
121  bool op = A_rx_thread_operation;
122  if(verbose)print_debug("RX thread status: ",op);
123  return op;
124 }
125 
127  std::this_thread::sleep_for(std::chrono::milliseconds(20));
128  bool op = B_rx_thread_operation;
129  if(verbose)print_debug("RX thread status: ",op);
130  return op;
131 }
132 
134  std::this_thread::sleep_for(std::chrono::milliseconds(20));
136  if(verbose)print_debug("RX thread status: ",op);
137  return op;
138 }
139 
141  std::this_thread::sleep_for(std::chrono::milliseconds(20));
142  bool op = B_tx_thread_operation or A_tx_thread_operation;
143  if(verbose)print_debug("TX thread status: ",op);
144  return op;
145 }
146 
148  std::this_thread::sleep_for(std::chrono::milliseconds(20));
149  bool op = A_tx_thread_operation;
150  if(verbose)print_debug("TX thread status: ",op);
151  return op;
152 }
153 
155  std::this_thread::sleep_for(std::chrono::milliseconds(20));
156  bool op = B_tx_thread_operation;
157  if(verbose)print_debug("TX thread status: ",op);
158  return op;
159 }
160 
166  threading_condition* wait_condition, //before joining wait for that condition
167  int thread_op, //core affinity of the process
168  param *current_settings, //representative of the paramenters (must match A or B frontend description)
169  char front_end, //must be "A" or "B"
170 preallocator<float2>* memory //if the thread is transmitting a buffer that requires dynamical allocation than a pointer to custo memory manager class has to be passed
171 ){
172 
173  BOOST_LOG_TRIVIAL(debug) << "Starting tx threads";
174  bool tx_thread_operation;
175 
176  if(front_end=='A'){
177  tx_thread_operation = A_tx_thread_operation;
178  }else if(front_end=='B'){
179  tx_thread_operation = B_tx_thread_operation;
180  }else{
181  print_error("Front end code not recognised by hardware manager");
182  return;
183  }
184 
185  if(not tx_thread_operation){
186 
187  //start the thread
188  if(not sw_loop){
189  if(front_end=='A'){
190  A_tx_thread = new boost::thread(boost::bind(&hardware_manager::single_tx_thread,this,
191  current_settings,
192  wait_condition,
193  A_TX_queue,
194  A_tx_stream,
195  memory,
196  'A'
197  ));
198  SetThreadName(A_tx_thread, "A_tx_thread");
199  Thread_Prioriry(*A_tx_thread, 99, thread_op);
200 
201  }else if(front_end=='B'){
202  B_tx_thread = new boost::thread(boost::bind(&hardware_manager::single_tx_thread,this,
203  current_settings,
204  wait_condition,
205  B_TX_queue,
206  B_tx_stream,
207  memory,
208  'B'
209  ));
210 
211  SetThreadName(B_tx_thread, "B_tx_thread");
212  Thread_Prioriry(*B_tx_thread, 99, thread_op);
213 
214  }
215  }else{
216  if(front_end=='A'){
217  A_tx_thread = new boost::thread(boost::bind(&hardware_manager::software_tx_thread,this,current_settings,memory,A_TX_queue,A_sw_loop_queue,'A'));
218  Thread_Prioriry(*A_tx_thread, 99, thread_op);
219  }else if(front_end=='B'){
220  B_tx_thread = new boost::thread(boost::bind(&hardware_manager::software_tx_thread,this,current_settings,memory,B_TX_queue,B_sw_loop_queue,'B'));
221  Thread_Prioriry(*B_tx_thread, 99, thread_op);
222  }
223  }
224  }else{
225  std::stringstream ss;
226  ss << "Cannot start TX thread, a tx thread associated with USRP "<< this_usrp_number <<" is already running";
227  print_error(ss.str());
228  }
229 
230  BOOST_LOG_TRIVIAL(debug) << "tx threads started";
231 }
232 
235  int buffer_len, //length of the buffer. MUST be the same of the preallocator initialization
236  threading_condition* wait_condition, //before joining wait for that condition
237  preallocator<float2>* memory, //custom memory preallocator
238  int thread_op,
239  param *current_settings, //representative of the paramenters (must match A or B frontend description)
240  char front_end //must be "A" or "B"
241 
242  ){
243  BOOST_LOG_TRIVIAL(debug) << "Starting rx threads";
244  bool rx_thread_operation;
245 
246  if(front_end=='A'){
247  rx_thread_operation = A_rx_thread_operation;
248  }else if(front_end=='B'){
249  rx_thread_operation = B_rx_thread_operation;
250  }else{
251  print_error("Front end code not recognised by hardware manager");
252  return;
253  }
254 
255  if(not rx_thread_operation){
256  //start the thread
257  if(not sw_loop){
258  if(front_end=='A'){
259  A_rx_thread = new boost::thread(boost::bind(&hardware_manager::single_rx_thread,this,
260  current_settings,
261  A_RX_queue,
262  wait_condition,
263  memory,
264  A_rx_stream,
265  'A'));
266  Thread_Prioriry(*A_rx_thread, 99, thread_op);
267  SetThreadName(A_rx_thread, "A_rx_thread");
268  }else if(front_end=='B'){
269  B_rx_thread = new boost::thread(boost::bind(&hardware_manager::single_rx_thread,this,
270  current_settings,
271  B_RX_queue,
272  wait_condition,
273  memory,
274  B_rx_stream,
275  'B'));
276  Thread_Prioriry(*B_rx_thread, 99, thread_op);
277  SetThreadName(B_rx_thread, "B_rx_thread");
278  }
279  }else{
280  if(front_end=='A'){
281  A_rx_thread = new boost::thread(boost::bind(&hardware_manager::software_rx_thread,this,current_settings,memory,A_RX_queue,A_sw_loop_queue,'A'));
282  Thread_Prioriry(*A_rx_thread, 99, thread_op);
283  SetThreadName(A_rx_thread, "A_rx_thread");
284  }else if(front_end=='B'){
285  B_rx_thread = new boost::thread(boost::bind(&hardware_manager::software_rx_thread,this,current_settings,memory,B_RX_queue,B_sw_loop_queue,'B'));
286  Thread_Prioriry(*B_rx_thread, 99, thread_op);
287  SetThreadName(B_rx_thread, "B_rx_thread");
288  }
289  }
290 
291  }else{
292  std::stringstream ss;
293  ss << "Cannot start RX thread, a rx threead associated with USRP "<< this_usrp_number <<" is already running";
294  print_error(ss.str());
295  }
296  BOOST_LOG_TRIVIAL(debug) << "rx threads started";
297 }
300 
301  if(A_tx_thread_operation){
302  A_tx_thread->interrupt();
303  A_tx_thread->join();
304  delete A_tx_thread;
305  A_tx_thread = nullptr;
306  A_tx_thread_operation = false;
307 
308  }
309 
310  if(B_tx_thread_operation){
311  B_tx_thread->interrupt();
312  B_tx_thread->join();
313  delete B_tx_thread;
314  B_tx_thread = nullptr;
315  B_tx_thread_operation = false;
316 
317  }
318 
319 }
323 
324  A_rx_thread->interrupt();
325  A_rx_thread->join();
326  delete A_rx_thread;
327  A_rx_thread = nullptr;
328  A_rx_thread_operation = false;
329 
330  }
331 
333  B_rx_thread->interrupt();
334  B_rx_thread->join();
335  delete B_rx_thread;
336  B_rx_thread = nullptr;
337  B_rx_thread_operation = false;
338  }
339 
340 }
344 
345  BOOST_LOG_TRIVIAL(info) << "Cleaning tx queue";
346  //temporary wrapper
347  float2* buffer;
348 
349  //counter. Expected to be 0
350  int counter = 0;
351 
352  while(not TX_queue->empty() or TX_queue->pop(buffer)){
353 
354  memory->trash(buffer);
355  counter ++;
356  }
357 
358  if(counter > 0){
359  std::stringstream ss;
360  ss << "TX queue cleaned of "<< counter <<"buffer(s)";
361  print_warning(ss.str());
362  }
363  BOOST_LOG_TRIVIAL(info) << "tx queue cleaned of packets: "<< counter;
364  return counter;
365 }
369 
370  BOOST_LOG_TRIVIAL(info) << "Cleaning rx queue";
371 
372  //temporary wrapper
373  RX_wrapper warapped_buffer;
374  warapped_buffer.buffer = nullptr;
375  //counter. Expected to be 0
376  int counter = 0;
377 
378  //cannot execute when the rx thread is going
379  while(not RX_queue->empty() and RX_queue->pop(warapped_buffer)){
380  memory->trash(warapped_buffer.buffer);
381  counter ++;
382  }
383 
384  if(counter > 0){
385  std::stringstream ss;
386  ss << "RX queue cleaned of "<< counter <<"buffer(s)";
387  print_warning(ss.str());
388  }
389  BOOST_LOG_TRIVIAL(info) << "rx queue cleaned of packets: "<< counter;
390  return counter;
391 }
392 
393 
394 void hardware_manager::apply(usrp_param* requested_config){
395  BOOST_LOG_TRIVIAL(info) << "Applying USRP configuration";
396  //transfer the usrp index to the setting parameters
397  requested_config->usrp_number = this_usrp_number;
398 
399  //stack of messages
400  std::stringstream ss;
401  ss<<std::endl;
402 
403  //apply configuration to each antenna on each subdevice
404  ss<<"Hardware parameter subdevice A_TXRX: ";
405  switch(requested_config->A_TXRX.mode){
406  case RX:
407  if(not sw_loop)main_usrp->set_rx_antenna("TX/RX",0);
408  break;
409  case TX:
410  if(not sw_loop)main_usrp->set_tx_antenna("TX/RX",0);
411  break;
412  case OFF:
413  ss<<"Channel is OFF";
414  break;
415  }
416  ss<<std::endl;
417  ss<<apply_antenna_config(&(requested_config->A_TXRX), &config.A_TXRX,0);
418 
419  ss<<"Hardware parameter subdevice A_RX2: ";
420  switch(requested_config->A_RX2.mode){
421  case RX:
422  if(not sw_loop)main_usrp->set_rx_antenna("RX2",0);
423  break;
424  case TX:
425  if(not sw_loop)main_usrp->set_tx_antenna("RX2",0);
426  break;
427  case OFF:
428  ss<<"Channel is OFF";
429  break;
430  }
431  ss<<std::endl;
432  ss<<apply_antenna_config(&(requested_config->A_RX2), &config.A_RX2,0);
433 
434  //std::string subdev = "B:0";
435  //main_usrp->set_rx_subdev_spec(subdev);
436 
437  ss<<"Hardware parameter subdevice B_TXRX: ";
438  switch(requested_config->B_TXRX.mode){
439  case RX:
440  if(not sw_loop)main_usrp->set_rx_antenna("TX/RX",1);
441  break;
442  case TX:
443  if(not sw_loop)main_usrp->set_tx_antenna("TX/RX",1);
444  break;
445  case OFF:
446  ss<<"Channel is OFF";
447  break;
448  }
449  ss<<std::endl;
450  ss<<apply_antenna_config(&(requested_config->B_TXRX), &config.B_TXRX,1);
451 
452  ss<<"Hardware parameter subdevice B_RX2: ";
453  switch(requested_config->B_RX2.mode){
454  case RX:
455  if(not sw_loop)main_usrp->set_rx_antenna("RX2",1);
456  break;
457  case TX:
458  if(not sw_loop)main_usrp->set_tx_antenna("RX2",1);
459  break;
460  case OFF:
461  ss<<"Channel is OFF";
462  break;
463  }
464  ss<<std::endl;
465  ss<<apply_antenna_config(&(requested_config->B_RX2), &config.B_RX2,1);
466 
467  std::cout<<ss.str();
468  BOOST_LOG_TRIVIAL(info) << "USRP configuration applied";
469 }
470 
471 bool hardware_manager::check_tuning(){
472  BOOST_LOG_TRIVIAL(info) << "Checking tuning";
473  //both rx and tx must be locked if selected
474  bool rx = true;
475  bool tx = true;
476 
477  size_t num_rx_channels = main_usrp->get_rx_num_channels();
478  size_t num_tx_channels = main_usrp->get_tx_num_channels();
479 
480  for(size_t chan = 0; chan<num_rx_channels; chan++){
481  //check for RX locking
482  std::vector<std::string> rx_sensor_names;
483  rx_sensor_names = main_usrp->get_rx_sensor_names(chan);
484 
485  try{
486  //check only if there is a channel associated with RX.
487  if(check_global_mode_presence(RX,chan)){
488  std::cout<<"Checking RX frontend tuning for channel "<< chan <<" ... "<<std::endl;
489  if (std::find(rx_sensor_names.begin(), rx_sensor_names.end(), "lo_locked") != rx_sensor_names.end()) {
490  uhd::sensor_value_t lo_locked = main_usrp->get_rx_sensor("lo_locked",chan);
491 
492  //a settling time is normal
493  int timeout_counter = 0;
494  while (not main_usrp->get_rx_sensor("lo_locked",chan).to_bool()){
495  //sleep for a short time in milliseconds
496  timeout_counter++;
497  std::this_thread::sleep_for(std::chrono::milliseconds(20));
498  if(timeout_counter>500){
499  std::stringstream ss;
500  ss<<"Cannot tune the RX frontend of channel "<<chan;
501  print_error(ss.str());
502  return false;
503  }
504  }
505  lo_locked = main_usrp->get_rx_sensor("lo_locked",chan);
506  }
507  rx = rx and main_usrp->get_rx_sensor("lo_locked",chan).to_bool();
508  }
509  }catch (uhd::lookup_error e){
510  std::cout<<"None"<<std::endl;
511  rx = true;
512  }
513  }
514  for(size_t chan = 0; chan<num_tx_channels; chan++){
515  //check for TX locking
516  std::vector<std::string> tx_sensor_names;
517  tx_sensor_names = main_usrp->get_tx_sensor_names(chan);
518  try{
519  //check only if there is a channel associated with TX.
520  if(check_global_mode_presence(TX,chan)){
521  std::cout<<"Checking TX frontend tuning for channel "<< chan <<" ... "<<std::endl;
522  if (std::find(tx_sensor_names.begin(), tx_sensor_names.end(), "lo_locked") != tx_sensor_names.end()) {
523  uhd::sensor_value_t lo_locked = main_usrp->get_tx_sensor("lo_locked",chan);
524 
525  //a settling time is normal
526  int timeout_counter = 0;
527  while (not main_usrp->get_tx_sensor("lo_locked",chan).to_bool()){
528  //sleep for a short time in milliseconds
529  timeout_counter++;
530  std::this_thread::sleep_for(std::chrono::milliseconds(20));
531  if(timeout_counter>500){
532  std::stringstream ss;
533  ss<<"Cannot tune the TX frontend of channel "<<chan;
534  print_error(ss.str());
535  return false;
536  }
537  }
538  lo_locked = main_usrp->get_tx_sensor("lo_locked",chan);
539  }
540  tx = tx and main_usrp->get_tx_sensor("lo_locked",chan).to_bool();
541  }
542  }catch (uhd::lookup_error e){
543  std::cout<<"None"<<std::endl;
544  tx = true;
545  }
546  }
547  BOOST_LOG_TRIVIAL(info) << "Tuning checked with results tx: "<< tx << " and rx: "<< rx ;
548  return rx and tx;
549 
550 }
551 
552 void hardware_manager::set_streams(){
553 
554  BOOST_LOG_TRIVIAL(info) << "Presetting streams";
555 
556  //in this function config is an object representing the current paramenters.
557 
558  //if the stream configuration is different, reset the streams
559  clear_streams();
560 
561  if(channel_num.size()!=1)channel_num.resize(1);
562 
563  if (config.A_TXRX.mode == RX and config.A_RX2.mode == RX){
564  print_error("Currently only one receiver per front end is suppored");
565  return;
566  }
567 
568  if (config.A_TXRX.mode == TX and config.A_RX2.mode == TX){
569  print_error("Currently only one transmitter per front end is suppored");
570  return;
571  }
572 
573  //declare unit to be used
574 
575  //front_end_code0 was used in a previous version. Kept for showing the scheme
576 
577  if(config.A_TXRX.mode == RX){
578  BOOST_LOG_TRIVIAL(info) << "Config A_TXRX as RX";
579  uhd::stream_args_t stream_args("fc32");
580  front_end_code0 = 'A';
581  channel_num[0] = 0;
582  stream_args.channels = channel_num;
583  if(not sw_loop)A_rx_stream = main_usrp->get_rx_stream(stream_args);
584  BOOST_LOG_TRIVIAL(info) << "Config done";
585  }else if(config.A_RX2.mode == RX){
586  BOOST_LOG_TRIVIAL(info) << "Config A_RX2 as RX";
587  uhd::stream_args_t stream_args("fc32");
588  front_end_code0 = 'B';
589  channel_num[0] = 0;
590  stream_args.channels = channel_num;
591  if(not sw_loop)A_rx_stream = main_usrp->get_rx_stream(stream_args);
592  BOOST_LOG_TRIVIAL(info) << "Config done";
593  }
594 
595  if(config.B_TXRX.mode == RX){
596  BOOST_LOG_TRIVIAL(info) << "Config B_TXRX as RX";
597  uhd::stream_args_t stream_args("fc32");
598 
599  front_end_code0 = 'C';
600  channel_num[0] = 1;
601  stream_args.channels = channel_num;
602  if(not sw_loop)B_rx_stream = main_usrp->get_rx_stream(stream_args);
603  BOOST_LOG_TRIVIAL(info) << "Config done";
604 
605  }else if(config.B_RX2.mode == RX){
606  BOOST_LOG_TRIVIAL(info) << "Config B_RX2 as RX";
607  uhd::stream_args_t stream_args("fc32");
608  front_end_code0 = 'D';
609  channel_num[0] = 1;
610  stream_args.channels = channel_num;
611  if(not sw_loop)B_rx_stream = main_usrp->get_rx_stream(stream_args);
612  BOOST_LOG_TRIVIAL(info) << "Config done";
613  }
614 
615 
616  if(config.A_RX2.mode == TX or config.A_TXRX.mode == TX){
617  BOOST_LOG_TRIVIAL(info) << "Config A_TXRX as TX";
618  uhd::stream_args_t stream_args("fc32");
619  channel_num[0] = 0;
620  stream_args.channels = channel_num;
621  if(not sw_loop)A_tx_stream = main_usrp->get_tx_stream(stream_args);
622  BOOST_LOG_TRIVIAL(info) << "Config done";
623  }
624 
625  if(config.B_RX2.mode == TX or config.B_TXRX.mode == TX){
626  BOOST_LOG_TRIVIAL(info) << "Config B_TXRX as TX";
627  uhd::stream_args_t stream_args("fc32");
628  //declare unit to be used
629  channel_num[0] = 1;
630  stream_args.channels = channel_num;
631  if(not sw_loop)B_tx_stream = main_usrp->get_tx_stream(stream_args);
632  BOOST_LOG_TRIVIAL(info) << "Config done";
633  }
634 
635  BOOST_LOG_TRIVIAL(info) << "Stream presetting done";
636 };
637 
638 void hardware_manager::clear_streams(){
639  BOOST_LOG_TRIVIAL(info) << "Resetting streams";
640  if(A_rx_stream){
641  A_rx_stream.reset();
642  A_rx_stream = nullptr;
643  }
644  if(A_tx_stream){
645  A_tx_stream.reset();
646  A_tx_stream = nullptr;
647  }
648  if(B_rx_stream){
649  B_rx_stream.reset();
650  B_rx_stream = nullptr;
651  }
652  if(B_tx_stream){
653  B_tx_stream.reset();
654  B_tx_stream = nullptr;
655  }
656  BOOST_LOG_TRIVIAL(info) << "Streams reset";
657 }
658 
659 
660 std::string hardware_manager::apply_antenna_config(param *parameters, param *old_parameters, size_t chan){
661 
662  BOOST_LOG_TRIVIAL(info) << "Configuring antenna...";
663 
664  //handles the eventual output
665  std::stringstream ss;
666 
667  //keep track of channel changing to dispay correct message.
668  bool changed = false;
669 
670  if(parameters->mode!=OFF){
671 
672  if(old_parameters->mode == OFF or old_parameters->rate != parameters->rate){
673  changed = true;
674 
675  //check if tx or rx
676  if(not sw_loop){
677  parameters->mode == RX ?
678  main_usrp->set_rx_rate(parameters->rate,chan):
679  main_usrp->set_tx_rate(parameters->rate,chan);
680  }
681  if(parameters->mode == RX){
682  if(not sw_loop){
683  old_parameters->rate = main_usrp->get_rx_rate(chan);
684  }else old_parameters->rate = parameters->rate;
685  ss << boost::format("\tSetting RX Rate: %f Msps. ") % (parameters->rate / 1e6) << std::flush;
686  }else{
687  if(not sw_loop){
688  old_parameters->rate = main_usrp->get_tx_rate(chan);
689  }else old_parameters->rate = parameters->rate;
690  ss << boost::format("\tSetting TX Rate: %f Msps. ") % (parameters->rate / 1e6) << std::flush;
691  }
692  old_parameters->rate == parameters->rate?
693  ss<<std::endl:
694  ss<<boost::format("Effective value: %f Msps. ") % (old_parameters->rate / 1e6)<<std::endl;
695  parameters->rate = old_parameters->rate;
696  }
697  try{
698  if(old_parameters->mode == OFF or old_parameters->tone != parameters->tone or old_parameters->tuning_mode != parameters->tuning_mode){
699  changed = true;
700 
701 
702  if(parameters->mode == RX) {
703 
704  if(not sw_loop){
705  main_usrp->get_rx_sensor("lo_locked",chan).to_bool();
706  if(not parameters->tuning_mode){
707 
708  uhd::tune_request_t tune_request(parameters->tone);
709  tune_request.args = uhd::device_addr_t("mode_n=integer");
710  main_usrp->set_rx_freq(tune_request,chan);
711  }else{
712  uhd::tune_request_t tune_request(parameters->tone);
713  main_usrp->set_rx_freq(tune_request,chan);
714  }
715 
716  old_parameters->tone = main_usrp->get_rx_freq(chan);
717  } else old_parameters->tone = parameters->tone;
718  old_parameters->tuning_mode = parameters->tuning_mode;
719  ss << boost::format("\tSetting RX central frequency: %f MHz. ") % (parameters->tone / 1e6);
720  if(parameters->tuning_mode){
721  ss<<" (fractional) ";
722  }else{
723  ss<<" (integer) ";
724  }
725  ss<< std::flush;
726 
727 
728  }else{
729 
730  if(not sw_loop){
731  main_usrp->get_tx_sensor("lo_locked",chan).to_bool();
732  if(not parameters->tuning_mode){
733 
734  uhd::tune_request_t tune_request(parameters->tone);
735  tune_request.args = uhd::device_addr_t("mode_n=integer");
736  main_usrp->set_tx_freq(tune_request,chan);
737  }else{
738  uhd::tune_request_t tune_request(parameters->tone);
739  main_usrp->set_tx_freq(tune_request,chan);
740  }
741  old_parameters->tone = main_usrp->get_tx_freq(chan);
742  }else{
743  old_parameters->tone = parameters->tone;
744  }
745  old_parameters->tuning_mode = parameters->tuning_mode;
746 
747  ss << boost::format("\tSetting TX central frequency: %f MHz. ") % (parameters->tone / 1e6);
748  if(parameters->tuning_mode){
749  ss<<" (fractional) ";
750  }else{
751  ss<<" (integer) ";
752  }
753  ss<< std::flush;
754  }
755 
756  old_parameters->tone == parameters->tone?
757  ss<<std::endl:
758  ss<<boost::format("Effective value: %f MHz. ") % (old_parameters->tone / 1e6)<<std::endl;
759  parameters->tone = old_parameters->tone;
760  }
761  }catch(uhd::lookup_error e){
762  ss << boost::format("\tNo mixer detected\n");
763  }
764 
765  if(old_parameters->mode == OFF or old_parameters->gain != parameters->gain){
766  changed = true;
767 
768  //check if tx or rx
769  if(not sw_loop){
770  parameters->mode == RX ?
771  main_usrp->set_rx_gain(parameters->gain,chan):
772  main_usrp->set_tx_gain(parameters->gain,chan);
773 
774  if(parameters->mode == RX){
775  old_parameters->gain = main_usrp->get_rx_gain(chan);
776  ss << boost::format("\tSetting RX gain: %d dB. ") % (parameters->gain ) << std::flush;
777  }else{
778  old_parameters->gain = main_usrp->get_tx_gain(chan);
779  ss << boost::format("\tSetting TX gain: %d dB. ") % (parameters->gain ) << std::flush;
780  }
781  old_parameters->gain == parameters->gain?
782  ss<<std::endl:
783  ss<<boost::format("Effective value: %d dB. ") % (old_parameters->gain )<<std::endl;
784  parameters->gain = old_parameters->gain;
785  }else old_parameters->gain = parameters->gain;
786  }
787  if(old_parameters->mode == OFF or old_parameters->bw != parameters->bw){
788  changed = true;
789  //check if tx or rx
790  if(not sw_loop){
791  parameters->mode == RX ?
792  main_usrp->set_rx_bandwidth(parameters->bw,chan):
793  main_usrp->set_tx_bandwidth(parameters->bw,chan);
794 
795  if(parameters->mode == RX){
796  old_parameters->bw = main_usrp->get_rx_bandwidth(chan);
797  ss << boost::format("\tSetting RX bandwidth: %f MHz. ") % (parameters->bw/ 1e6 ) << std::flush;
798  }else{
799  old_parameters->bw = main_usrp->get_tx_bandwidth(chan);
800  ss << boost::format("\tSetting TX bandwidth: %f MHz. ") % (parameters->bw/ 1e6 ) << std::flush;
801  }
802  old_parameters->bw == parameters->bw?
803  ss<<std::endl:
804  ss<<boost::format("Effective value: %f MHz. ") % (old_parameters->gain/ 1e6 )<<std::endl;
805  parameters->bw = old_parameters->bw;
806  }else old_parameters->bw = parameters->bw;
807  }
808  if(old_parameters->mode == OFF or old_parameters->delay != parameters->delay){
809  changed = true;
810  old_parameters->delay = parameters->delay;
811  ss << boost::format("\tSetting start streaming delay: %f msec. ") % (parameters->delay*1e3 ) << std::endl;
812  }
813  if((old_parameters->mode == OFF or old_parameters->burst_off != parameters->burst_off) and parameters->burst_off != 0){
814  changed = true;
815  old_parameters->burst_off = parameters->burst_off;
816  ss << boost::format("\tSetting interval between bursts: %f msec. ") % (parameters->burst_off*1e3 ) << std::endl;
817  }
818  if((old_parameters->mode == OFF or old_parameters->burst_on != parameters->burst_on) and parameters->burst_on != 0){
819  changed = true;
820  old_parameters->burst_on = parameters->burst_on;
821  ss << boost::format("\tSetting bursts duration: %f msec. ") % (parameters->burst_on*1e3 ) << std::endl;
822  }
823  if(old_parameters->mode == OFF or old_parameters->samples != parameters->samples){
824  changed = true;
825  old_parameters->samples = parameters->samples;
826  ss << boost::format("\tSetting total samples to %f Ms") % (parameters->samples /1e6 ) << std::endl;
827  }
828  if(old_parameters->mode == OFF or old_parameters->buffer_len != parameters->buffer_len){
829  changed = true;
830  old_parameters->buffer_len = parameters->buffer_len;
831  ss << boost::format("\tSetting buffer length to %f Ms") % (parameters->buffer_len /1e6 ) << std::endl;
832  }
833  if(old_parameters->mode == OFF or old_parameters->burst_off != parameters->burst_off){
834  changed = true;
835  old_parameters->burst_off = parameters->burst_off;
836  if(old_parameters->burst_off == 0){
837  ss << boost::format("\tSetting continuous acquisition mode (no bursts)")<< std::endl;
838  }else{
839  ss << boost::format("\tSetting samples between bursts to %f msec") % (parameters->burst_off *1e3 ) << std::endl;
840  }
841  }
842  if(old_parameters->mode == OFF or old_parameters->burst_on != parameters->burst_on){
843  changed = true;
844  old_parameters->burst_on = parameters->burst_on;
845  if(old_parameters->burst_on != 0){
846  ss << boost::format("\tSetting bursts length to %f msec") % (parameters->burst_on *1e3 ) << std::endl;
847  }
848  }
849  if(not changed) ss<<"\tHardware parameters were identical to last setup"<<std::endl;
850  }
851  //last thing to do
852  old_parameters->mode = parameters->mode;
853 
854  BOOST_LOG_TRIVIAL(info) << "Antenna configured";
855 
856  return ss.str();
857 
858 }
859 
860 //check if there are more than 1 tx/rx channel
861 bool hardware_manager::check_double_txrx(ant_mode TXRX){
862  int tx = 0;
863  if(A_TXRX_chk == TXRX)tx++;
864  if(A_RX2_chk == TXRX)tx++;
865  if(B_RX2_chk == TXRX)tx++;
866  if(B_TXRX_chk == TXRX)tx++;
867  if(tx>1)return true;
868  return false;
869 }
870 
871 
872 //check if the selected mode has to be tuned
873 bool hardware_manager::check_global_mode_presence(ant_mode mode, size_t chan){
874  bool present = false;
875  if(chan == 0)present = present or (config.A_TXRX.mode == mode);
876  if(chan == 1)present = present or (config.B_TXRX.mode == mode);
877  if(chan == 0)present = present or (config.A_RX2.mode == mode);
878  if(chan == 1)present = present or (config.B_RX2.mode == mode);
879  return present;
880 }
881 
882 void hardware_manager::software_tx_thread(
883  param *current_settings, //some parameters are useful also in sw
884  preallocator<float2>* memory, //custom memory preallocator
885  tx_queue* TX_queue,
886  tx_queue* sw_loop_queue,
887  char front_end
888  ){
889  std::stringstream thread_name;
890  thread_name << "Software tx thread "<<front_end;
891  set_this_thread_name(thread_name.str());
892  BOOST_LOG_TRIVIAL(debug) << "Thread started";
893  float2* tx_buffer; //the buffer pointer
894  if(front_end == 'A'){
895  A_tx_thread_operation = true; //class variable to account for thread activity
896  }else if(front_end == 'B'){
897  B_tx_thread_operation = true;
898  }else{
899  print_error("Frontend code not recognized in software tx thread");
900  return;
901  } //class variable to account for thread activity
902  bool active = true; //local activity monitor
903  size_t sent_samp = 0; //total number of samples sent
904 
905  while(active and (sent_samp < current_settings->samples)){
906  try{
907  boost::this_thread::interruption_point();
908  if(TX_queue->pop(tx_buffer)){
909 
910  float2* tx_buffer_copy = (float2*)malloc(sizeof(float2)*current_settings->buffer_len);
911 
912  sent_samp += current_settings->buffer_len;
913 
914  memcpy(tx_buffer_copy, tx_buffer, sizeof(float2)*current_settings->buffer_len);
915 
916  while(not sw_loop_queue->push(tx_buffer_copy))std::this_thread::sleep_for(std::chrono::microseconds(1));
917 
918  if(memory)memory->trash(tx_buffer);
919 
920  }else{
921  std::this_thread::sleep_for(std::chrono::microseconds(200));
922  }
923  }catch (boost::thread_interrupted &){
924  active = false;
925 
926  }
927  }
928  if(front_end == 'A'){
929  A_tx_thread_operation = false; //class variable to account for thread activity
930  }else if(front_end == 'B'){
931  B_tx_thread_operation = false;
932  }
933  BOOST_LOG_TRIVIAL(debug) << "Thread joined";
934 }
935 
936 
937 
938 /*
939 
940 #ifndef GET_CACHE_LINE_SIZE_H_INCLUDED
941 #define GET_CACHE_LINE_SIZE_H_INCLUDED
942 
943 #include <stddef.h>
944 size_t cache_line_size();
945 
946 #if defined(__gnu_linux__)
947 
948 #include <stdio.h>
949 size_t cache_line_size() {
950  FILE * p = 0;
951  p = fopen("/sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size", "r");
952  unsigned int i = 0;
953  if (p) {
954  fscanf(p, "%d", &i);
955  fclose(p);
956  }
957  return i;
958 }
959 
960 #else
961 size_t cache_line_size() {
962  return 0;
963 }
964 #endif
965 
966 #endif
967 
968 #include <emmintrin.h>
969 
970 static inline void prefetch_range(void *addr, size_t len, size_t cache){
971  char *addr_c = reinterpret_cast<char*>(addr);
972  char *cp;
973  char *end = addr_c + len;
974 
975  for (cp = addr_c; cp < end; cp += cache*4){
976  //__builtin_prefetch(cp);
977  _mm_prefetch(cp, _MM_HINT_T1);
978  }
979 
980 }
981 inline float2* get_buffer_ready(tx_queue* __restrict__ TX_queue, size_t buffer_len_tx, size_t cache){
982  float2* tx_buffer;
983  while(not TX_queue->pop(tx_buffer))std::this_thread::sleep_for(std::chrono::nanoseconds(10));
984  //there is no need of the full buffer as the cache may be limited and the access pattern is very regular.
985  prefetch_range(tx_buffer, buffer_len_tx, cache);
986  return tx_buffer;
987 }
988 */
989 void hardware_manager::single_tx_thread(
990  param *current_settings, //(managed internally to the class) user parameter to use for rx setting
991  threading_condition* wait_condition, //before joining wait for that condition
992  tx_queue* TX_queue, //associated tx stream queue
993  uhd::tx_streamer::sptr &tx_stream, //stream to usrp
994  preallocator<float2>* memory, //custom memory preallocator
995  char front_end
996 ){
997 
998  std::stringstream thread_name;
999  thread_name << "Hardware tx thread "<<front_end;
1000  set_this_thread_name(thread_name.str());
1001 
1002  uhd::set_thread_priority_safe(1.);
1003  bool active = true;
1004  size_t sent_samp = 0; //total number of samples sent
1005  float2* tx_buffer; //the buffer pointer
1006  //float2* tx_next_buffer; //the buffer pointer next
1007 
1008 
1009  //SetThreadName(metadata_thread, "TX_metadata_thread");
1010  std::stringstream ss;
1011  ss<< "TX worker " << front_end;
1012  set_this_thread_name(ss.str());
1013 
1014  if(front_end == 'A'){
1015  A_tx_thread_operation = true; //class variable to account for thread activity
1016  }else if(front_end == 'B'){
1017  B_tx_thread_operation = true;
1018  }else{
1019  print_error("Frontend code not recognized in hardware tx thread");
1020  return;
1021  } //class variable to account for thread activity
1022 
1023 
1024  //double start_time = main_usrp->get_time_now().get_real_secs();
1025 
1026  uhd::tx_metadata_t metadata_tx;
1027 
1028  BOOST_LOG_TRIVIAL(debug) <<"Starting metadata thread";
1029  boost::thread* metadata_thread = nullptr;
1030  if(front_end == 'A'){
1031  metadata_thread = new boost::thread(boost::bind(&hardware_manager::async_stream,this,tx_stream,front_end));
1032  }
1033  metadata_tx.start_of_burst = true;
1034  metadata_tx.end_of_burst = false;
1035  metadata_tx.has_time_spec = true;
1036  metadata_tx.time_spec = uhd::time_spec_t(1.0+current_settings->delay);
1037  double timeout = 1.0+current_settings->delay + 0.1;
1038  //optimizations for tx loop
1039  size_t max_samples_tx = current_settings->samples;
1040  //double burst_off = current_settings->burst_off;
1041  size_t buffer_len_tx = current_settings->buffer_len;
1042  //size_t cache = cache_line_size();
1043 
1044 
1045  std::future<float2*> handle;
1046  //tx_next_buffer = get_buffer_ready(TX_queue, buffer_len_tx, cache);
1047  BOOST_LOG_TRIVIAL(info) <<"Starting main loop";
1048  while(active and (sent_samp < current_settings->samples)){
1049  try{
1050 
1051  boost::this_thread::interruption_point();
1052 
1053  if(sent_samp + buffer_len_tx >= max_samples_tx) metadata_tx.end_of_burst = true;
1054 
1055  //tx_buffer = tx_next_buffer;
1056 
1057  //handle = std::async(std::launch::async, get_buffer_ready, TX_queue, buffer_len_tx, cache);
1058 
1059  while(not TX_queue->pop(tx_buffer))std::this_thread::sleep_for(std::chrono::nanoseconds(500));
1060 
1061  sent_samp += tx_stream->send(tx_buffer, buffer_len_tx, metadata_tx, timeout);
1062  timeout = 0.1f;
1063  metadata_tx.start_of_burst = false;
1064  metadata_tx.has_time_spec = false;
1065 
1066  //tx_next_buffer = handle.get();
1067 
1068  if(memory)memory->trash(tx_buffer);
1069 
1070  }catch (boost::thread_interrupted &){
1071  active = false;
1072  BOOST_LOG_TRIVIAL(info) <<"Interrupt received";
1073  }
1074 
1075  }
1076  //clean the queue as it's le last consumer
1077  while(not TX_queue->empty()){
1078  TX_queue->pop(tx_buffer);
1079  if(memory)memory->trash(tx_buffer);
1080  }
1081  //something went wrong and the thread has interrupred
1082  if(not active and sent_samp < current_settings->samples){
1083  print_warning("TX thread was joined without transmitting the specified samples");
1084  std::cout<< "Missing "<< current_settings->samples - sent_samp<<" samples"<<std::endl;
1085  BOOST_LOG_TRIVIAL(info) <<"Thread was joined without transmitting "<< current_settings->samples - sent_samp<<" samples";
1086  }
1087  if(front_end == 'A'){
1088  metadata_thread->interrupt();
1089  metadata_thread->join();
1090  delete metadata_thread;
1091  metadata_thread = nullptr;
1092  }
1093  //set check the condition to false
1094  if(front_end == 'A'){
1095  A_tx_thread_operation = false; //class variable to account for thread activity
1096  }else if(front_end == 'B'){
1097  B_tx_thread_operation = false;
1098  }
1099  tx_stream.reset();
1100 
1101  BOOST_LOG_TRIVIAL(debug) << "Thread joined";
1102 }
1103 
1104 //ment to be in a thread. receive messages asyncronously on metadata
1105 void hardware_manager::async_stream(uhd::tx_streamer::sptr &tx_stream, char fornt_end){
1106  bool parent_process_active = false;
1107  bool active = true;
1108  uhd::async_metadata_t async_md;
1109  int errors;
1110  while(active){
1111  try{
1112 
1113  boost::this_thread::interruption_point();
1114 
1115  if (fornt_end == 'A'){
1116  parent_process_active = A_tx_thread_operation;
1117  }else if (fornt_end == 'B'){
1118  parent_process_active = B_tx_thread_operation;
1119  }
1120 
1121  if(parent_process_active){
1122  errors = 0;
1123  if(tx_stream->recv_async_msg(async_md)){
1124  errors = get_tx_error(&async_md,true);
1125  }
1126  if(errors>0 and parent_process_active){
1127  if (fornt_end == 'A'){
1128  A_tx_error_queue->push(1);
1129  }else if (fornt_end == 'B'){
1130  B_tx_error_queue->push(1);
1131  }
1132  }
1133 
1134  }else{ active = false; }
1135  }catch (boost::thread_interrupted &e){ active = false; }
1136  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1137 
1138  }
1139  tx_stream.reset();
1140 }
1141 
1142 void hardware_manager::software_rx_thread(
1143  param *current_settings,
1144  preallocator<float2>* memory,
1145  rx_queue* Rx_queue,
1146  tx_queue* sw_loop_queue,
1147  char front_end
1148 ){
1149  std::stringstream thread_name;
1150  thread_name << "Software rx thread "<<front_end;
1151  set_this_thread_name(thread_name.str());
1152  BOOST_LOG_TRIVIAL(debug) << "Thread started";
1153  char front_end_c;
1154  if(front_end == 'A'){
1155  A_rx_thread_operation = true; //class variable to account for thread activity
1156  front_end_c = 'B';
1157  }else if(front_end == 'B'){
1158  B_rx_thread_operation = true;
1159  front_end_c = 'D';
1160  }else{
1161  print_error("Frontend code not recognized in software rx thread");
1162  return;
1163  } //class variable to account for thread activity
1164  bool active = true; //control the interruption of the thread
1165  bool taken = true;
1166  RX_wrapper warapped_buffer;
1167  float2 *rx_buffer;
1168  float2 *rx_buffer_cpy;
1169  size_t acc_samp = 0; //total number of samples received
1170  int counter = 0;
1171  while(active and (acc_samp < current_settings->samples)){
1172 
1173  try{
1174 
1175  boost::this_thread::interruption_point();
1176 
1177  if(taken)rx_buffer_cpy = memory->get();
1178 
1179  if(sw_loop_queue->pop(rx_buffer)){
1180  taken = true;
1181  counter++;
1182  memcpy(rx_buffer_cpy, rx_buffer, current_settings->buffer_len * sizeof(float2));
1183  free(rx_buffer);
1184  warapped_buffer.buffer = rx_buffer_cpy;
1185  warapped_buffer.packet_number = counter;
1186  warapped_buffer.length = current_settings->buffer_len;
1187  warapped_buffer.errors = 0;
1188  warapped_buffer.front_end_code = front_end_c;
1189  while(not Rx_queue->push(warapped_buffer))std::this_thread::sleep_for(std::chrono::microseconds(1));
1190  acc_samp += current_settings->buffer_len;
1191  }else{
1192  taken = false;
1193  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1194 
1195  }
1196 
1197  }catch (boost::thread_interrupted &){ active = false;}
1198 
1199  }
1200  if(front_end == 'A'){
1201  A_rx_thread_operation = false; //class variable to account for thread activity
1202  }else if(front_end == 'B'){
1203  B_rx_thread_operation = false;
1204  }
1205  BOOST_LOG_TRIVIAL(debug) << "Thread joined";
1206 }
1207 
1208 
1209 void hardware_manager::single_rx_thread(
1210  param *current_settings, //(managed internally) user parameter to use for rx setting
1211  rx_queue* Rx_queue, //(managed internally)queue to use for pushing
1212  threading_condition* wait_condition, //before joining wait for that condition
1213  preallocator<float2>* memory, //custom memory preallocator
1214  uhd::rx_streamer::sptr &rx_stream , //the streamer to usrp
1215  char front_end //front end code for operation accountability
1216 
1217 ){
1218 
1219  std::stringstream thread_name;
1220  thread_name << "Hardware rx thread "<<front_end;
1221  set_this_thread_name(thread_name.str());
1222  BOOST_LOG_TRIVIAL(debug) << "Thread started";
1223  char front_end_c;
1224  if(front_end == 'A'){
1225  A_rx_thread_operation = true; //class variable to account for thread activity
1226  front_end_c = 'B';
1227  }else if(front_end == 'B'){
1228  B_rx_thread_operation = true;
1229  front_end_c = 'D';
1230  }else{
1231  print_error("Frontend code not recognized in hardware rx thread");
1232  return;
1233  }
1234  if (not uhd::set_thread_priority_safe(+1)){
1235  std::stringstream ss;
1236  ss<<"Cannot set thread priority from tx thread"<<front_end;
1237  print_warning(ss.str());
1238  }
1239  bool active = true; //control the interruption of the thread
1240 
1241  float2* rx_buffer; //pointer to the receiver buffer
1242  size_t num_rx_samps = 0; //number of samples received in a single loop
1243  size_t acc_samp = 0; //total number of samples received
1244  size_t frag_count = 0; //fragmentation count used for very long buffer
1245  float timeout = 0.1f; //timeout in seconds(will be used to sync the recv calls)
1246  size_t counter = 0; //internal packet number
1247  size_t errors = 0; //error counter (per loop)
1248  size_t samples_remaining; //internal loop samples counter
1249  size_t push_counter; //number of pushing attempts
1250  size_t push_timer = 1; //interval between queue pushing attempts
1251 
1252  //packet wrapping structure
1253  RX_wrapper warapped_buffer;
1254  warapped_buffer.usrp_number = this_usrp_number;
1255 
1256 
1257 
1258  //setting the start metadata
1259  uhd::rx_metadata_t metadata_rx;
1260  //metadata_rx.has_time_spec = true; // in this application the time specification is always set
1261  //metadata_rx.time_spec = uhd::time_spec_t(1.0 + current_settings->delay); //set the transmission delay
1262  //metadata_rx.start_of_burst = true;
1263 
1264  //setting the stream command (isn't it redundant with metadata?)
1265  uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
1266 
1267  //change the mode properly
1268  if(current_settings->burst_off == 0){
1269  stream_cmd.stream_mode = uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS ;
1270  stream_cmd.num_samps = 0;
1271  }else{
1272  stream_cmd.stream_mode = uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE;
1273  stream_cmd.num_samps = current_settings->buffer_len;
1274  }
1275  stream_cmd.stream_now = current_settings->delay == 0 ? true:false;
1276 
1277  stream_cmd.time_spec = uhd::time_spec_t(1.0+current_settings->delay);
1278 
1279  //if the number of samples to receive is smaller than the buffer the first packet is also the last one
1280  metadata_rx.end_of_burst = current_settings->samples <= current_settings->buffer_len ? true : false;
1281 
1282  //needed to change metadata only once
1283  bool first_packet = true;
1284 
1285  //needed to download the error from tx queue
1286  bool tmp;
1287 
1288  //issue the stream command (ignoring the code above @todo)
1289  stream_cmd.stream_now = false;
1290  stream_cmd.num_samps = current_settings->buffer_len;
1291  stream_cmd.time_spec = uhd::time_spec_t(1.0+current_settings->delay);
1292  timeout = 1.0+current_settings->delay;
1293  stream_cmd.stream_mode = uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS ;
1294  rx_stream->issue_stream_cmd(stream_cmd);
1295 
1296  // Just setting the stop command for later
1297  uhd::stream_cmd_t stop_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
1298  stop_cmd.stream_now = false;
1299 
1300  uhd::set_thread_priority_safe(+1);
1301  while(active and acc_samp < current_settings->samples){
1302  //std::cout<<"samples: "<<acc_samp<<"/"<< current_settings->samples<<std::endl;
1303  try{
1304 
1305  boost::this_thread::interruption_point();
1306 
1307  //get a new buffer
1308  rx_buffer = memory->get();
1309 
1310  //reset/increment counters
1311  num_rx_samps = 0;
1312  frag_count = 0;
1313  errors = 0;
1314  counter++;
1315 
1316  //issue an other command for the burst mode
1317  /*
1318  if(stream_cmd.stream_mode == uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE and not first_packet){
1319  stream_cmd.stream_now = false;
1320  stream_cmd.time_spec = uhd::time_spec_t(current_settings->burst_off);
1321  rx_stream->issue_stream_cmd(stream_cmd);
1322  std::cout<<"stream command is: "<<"STREAM_MODE_NUM_SAMPS_AND_MORE"<<std::endl;
1323  }
1324  */
1325 
1326  //this hsould be only one cycle however there are cases in which multiple recv calls are needed
1327  while(num_rx_samps<current_settings->buffer_len){
1328 
1329  //how many samples have to be received yet
1330  samples_remaining = std::min(current_settings->buffer_len,current_settings->buffer_len-num_rx_samps);
1331 
1332  //how long to wait for new samples //TODO not adapting the timeout can produce O
1333  //timeout = std::max((float)samples_remaining/(float)current_settings->rate,0.2f);
1334 
1335  //if(first_packet)std::this_thread::sleep_for(std::chrono::nanoseconds(size_t(1.e9*current_settings->delay)));
1336 
1337  //receive command
1338  num_rx_samps += rx_stream->recv(rx_buffer + num_rx_samps, samples_remaining, metadata_rx,timeout);//,0.1f
1339  timeout = 0.01f;
1340  //interpret errors
1341  if(get_rx_errors(&metadata_rx, true)>0)errors++;
1342 
1343  //get errors from tx thread if present
1344  if(front_end == 'A'){
1345  while(A_tx_error_queue->pop(tmp) and A_tx_thread_operation)errors++;
1346  }else if(front_end == 'B'){
1347  while(B_tx_error_queue->pop(tmp) and B_tx_thread_operation)errors++;
1348  }
1349 
1350  //change metadata for continuous streaming or burst mode
1351  if(first_packet){
1352  metadata_rx.start_of_burst = current_settings->samples <= current_settings->buffer_len ? true : false;
1353  metadata_rx.has_time_spec = current_settings->burst_off == 0? false:true;
1354  metadata_rx.time_spec = uhd::time_spec_t(current_settings->burst_off);
1355  first_packet = false;
1356  }
1357  if(++frag_count>1){
1358  std::cout<< "F" << frag_count<<std::flush;
1359  }
1360  //fragmentation handling (intended as: the USRP is not keeping up with the host thread)
1361  if(++frag_count>4){
1362  std::stringstream ss;
1363  ss<<"RX Fragmentation too high: "<< frag_count <<" calls to recv to reach "<<num_rx_samps<<" /"<< current_settings->buffer_len<<" samples.";
1364  print_warning(ss.str());
1365  if(frag_count>8){
1366  print_error("RX thread got stuck: USRP is not streaming any sample.");
1367  active = false;
1368  //exit this loop
1369  num_rx_samps = current_settings->buffer_len;
1370  }
1371  }
1372  }
1373 
1374 
1375  //update total number of accumulated samples
1376  acc_samp += num_rx_samps;
1377 
1378  //update metadata for last packet
1379  if(current_settings->samples < acc_samp + current_settings->buffer_len) metadata_rx.end_of_burst = true;
1380 
1381  //wrap the buffer
1382 
1383  warapped_buffer.buffer = rx_buffer;
1384  warapped_buffer.packet_number = counter;
1385  warapped_buffer.length = num_rx_samps;
1386  warapped_buffer.errors = errors;
1387  warapped_buffer.front_end_code = front_end_c;
1388 
1389  //insist in pushing the buffer
1390  push_counter = 0;
1391  while(not Rx_queue->push(warapped_buffer)){
1392  std::this_thread::sleep_for(std::chrono::microseconds(push_timer));
1393  if(push_counter>1)print_warning("RX queue is experencing some delay. This may cause troubles in real time acquisition.");
1394  push_counter++;
1395  if(push_timer * push_counter > 1e3*current_settings->buffer_len/current_settings->rate){
1396  print_warning("RX queue is experencing some delay. This may cause troubles in real time acquisition.");
1397  }
1398  }
1399 
1400  }catch (boost::thread_interrupted &){ active = false; }
1401  }
1402 
1403  rx_stream->issue_stream_cmd(stop_cmd);
1404 
1405  flush_rx_streamer(rx_stream); // flush the cache
1406  rx_stream.reset();
1407 
1408  //something went wrong and the thread has interrupred
1409  if(not active){
1410  print_warning("RX thread was taken down without receiving the specified samples");
1411  }
1412 
1413  // Class atomic variable to account for thread activity:
1414  // Result in the check function to return to false.
1415  if(front_end == 'A'){
1416  A_rx_thread_operation = false;
1417  }else if(front_end == 'B'){
1418  B_rx_thread_operation = false;
1419  }
1420 
1421  BOOST_LOG_TRIVIAL(debug) << "Thread joined";
1422 }
1423 
1424 void hardware_manager::flush_rx_streamer(uhd::rx_streamer::sptr &rx_streamer) {
1425  constexpr double timeout { 0.010 }; // 10ms
1426  constexpr size_t size { 1048576 };
1427  static float2 dummy_buffer[size];
1428  static uhd::rx_metadata_t dummy_meta { };
1429  while (rx_streamer->recv(dummy_buffer, size, dummy_meta, timeout)) {}
1430 }
std::atomic< bool > B_rx_thread_operation
std::string device_arguments
void start_rx(int buffer_len, threading_condition *wait_condition, preallocator< float2 > *memory, int thread_op, param *current_settings, char front_end)
@ brief start a rx thread.
bool check_tx_status(bool verbose=false)
Check the status of every tx operations. Returns the status of A or B.
bool check_rx_status(bool verbose=false)
Check the status of every rx operations. Returns the status of A or B.
void trash(vector_type *trash_vector)
std::atomic< bool > A_rx_thread_operation
void SetThreadName(boost::thread *thread, const char *threadName)
bool check_A_rx_status(bool verbose=false)
Check the status of A rx operations.
void print_error(std::string text)
auto start
int clean_rx_queue(rx_queue *RX_queue, preallocator< float2 > *memory)
Release the memory associated with pointers holded by a rx queue using the respective memory allocato...
bool check_B_tx_status(bool verbose=false)
Check the status of B tx operations.
tx_queue * B_TX_queue
Queue accessed to stream data from B frontend.
void Thread_Prioriry(boost::thread &Thread, int priority, int affinity)
tx_queue * A_TX_queue
Queue accessed to stream data from A frontend.
hardware_manager(server_settings *settings, bool sw_loop_init, size_t usrp_number=0)
The initializer of the class can be used to select which usrp is controlled by the class Default call...
void set_this_thread_name(std::string thread_name)
Set the htread name reported in the logging.
int clean_tx_queue(tx_queue *TX_queue, preallocator< float2 > *memory)
Release the memory associated with pointers holded by a tx queue using the respective memory allocato...
bool preset_usrp(usrp_param *requested_config)
Set the USRP device with user parameters.
bool check_B_rx_status(bool verbose=false)
Check the status of B rx operations.
int get_rx_errors(uhd::rx_metadata_t *metadata, bool verbose)
void close_tx()
Close all the tx streamer threads.
void start_tx(threading_condition *wait_condition, int thread_op, param *current_settings, char front_end, preallocator< float2 > *memory=NULL)
Start a transmission thread. The threads started by this function do two things: pop a packet from th...
void close_rx()
Close all the rx streamer threads.
bool check_A_tx_status(bool verbose=false)
Check the status of A tx operations.
void print_debug(std::string text, double value)
void print_warning(std::string text)
rx_queue * A_RX_queue
Queue accessed to retrive data from A frontend.
int get_tx_error(uhd::async_metadata_t *async_md, bool verbose)
Interpret tx errors from the async usrp comunication.
uhd::usrp::multi_usrp::sptr main_usrp
rx_queue * B_RX_queue
Queue accessed to retrive data from B frontend.