6 BOOST_LOG_TRIVIAL(info) <<
"Initializing thread link class";
18 hardware = init_hardware;
21 A_rx_buffer_len = settings->default_rx_buffer_len;
22 A_tx_buffer_len = settings->default_tx_buffer_len;
23 B_rx_buffer_len = settings->default_rx_buffer_len;
24 B_tx_buffer_len = settings->default_tx_buffer_len;
39 output_memory_size = 0;
41 std::cout<<
"Memory initialization done."<<std::endl;
56 A_current_tx_param =
nullptr;
57 A_current_rx_param =
nullptr;
58 B_current_tx_param =
nullptr;
59 B_current_rx_param =
nullptr;
62 A_tx_memory =
nullptr;
63 A_rx_memory =
nullptr;
64 B_tx_memory =
nullptr;
65 B_rx_memory =
nullptr;
69 BOOST_LOG_TRIVIAL(info) <<
"Thread link class initialized";
75 BOOST_LOG_TRIVIAL(info) <<
"Setting thread link class";
77 std::vector<param*> modes(4);
78 modes[0] = &global_param->A_TXRX;
79 modes[1] = &global_param->A_RX2;
80 modes[2] = &global_param->B_TXRX;
81 modes[3] = &global_param->B_RX2;
88 for(
size_t i = 0; i < modes.size(); i++ ){
90 if(modes[i]->mode != OFF){
91 if((modes[i]->burst_on != 0) and (modes[i]->burst_on == 0)){
92 print_error(
"one parameter has burst_off != 0 and burst_on == 0");
95 if((modes[i]->burst_on == 0) and (modes[i]->burst_on != 0)){
96 print_error(
"one parameter has burst_on != 0 and burst_off == 0");
99 if(modes[i]->burst_on != 0){
100 modes[i]->buffer_len = modes[i]->burst_on * modes[i]->rate;
101 std::cout<<
"Resizing buffer length to match burst length to: "<< modes[i]->buffer_len<<
" samples."<<std::endl;
104 int this_thread_n = thread_counter*2;
105 switch(modes[i]->mode){
110 std::cout<<
"Allocating RF frontend "<<(char)(i<2?
'A':
'B')<<
" RX memory buffer: "<< (modes[i]->buffer_len *
sizeof(float2))/(1024.*1024.)<<
" MB per buffer..."<<std::endl;
112 if(A_rx_buffer_len != modes[i]->buffer_len or not A_rx_memory){
113 if(A_rx_memory)A_rx_memory->close();
116 std::cout<<
"(already allocated).."<<std::endl;
118 A_rx_buffer_len = modes[i]->buffer_len;
123 A_current_rx_param = modes[i];
127 if(B_rx_buffer_len != modes[i]->buffer_len or not B_rx_memory){
128 if(B_rx_memory)B_rx_memory->close();
131 std::cout<<
"(already allocated).."<<std::endl;
133 B_rx_buffer_len = modes[i]->buffer_len;
138 B_current_rx_param = modes[i];
141 rx_thread_n.push_back(this_thread_n);
143 mem_mult_tmp = std::max((
int)(modes[i]->data_mem_mult),1);
144 print_debug(
"memory expansion multiplier: ",mem_mult_tmp);
145 if ((output_memory_size>modes[i]->buffer_len * mem_mult_tmp) or not
rx_output_memory){
146 std::cout<<
"Allocating RX output memory buffer: "<< (mem_mult_tmp * modes[i]->buffer_len *
sizeof(float2))/(1024.*1024.)<<
" MB per buffer..."<<std::endl;
147 if(modes[i]->buffer_len>output_memory_size and output_memory_size>0)std::cout<<
" (updating buffer size)"<<std::endl;
148 if(rx_output_memory) rx_output_memory->close();
149 output_memory_size = modes[i]->buffer_len * mem_mult_tmp;
153 std::cout<<
" RX output memory buffer requirements already satisfaid."<<std::endl;
170 std::cout<<
"Allocating RF frontend "<<(char)(i<2?
'A':
'B')<<
" TX memory buffer: "<< (modes[i]->buffer_len *
sizeof(float2))/(1024.*1024.)<<
" MB per buffer..."<<std::endl;
174 if(modes[i]->dynamic_buffer()){
175 if(not A_tx_memory or modes[i]->buffer_len != A_tx_buffer_len){
177 A_tx_memory->
close();
179 A_tx_memory =
new preallocator<float2>(modes[i]->buffer_len,TX_QUEUE_LENGTH,
false,this_thread_n);
180 A_tx_buffer_len = modes[i]->buffer_len;
182 std::cout<<
"(already allocated).."<<std::flush;
186 A_tx_memory->
close();
188 A_tx_memory =
nullptr;
191 A_tx_gen =
new TX_buffer_generator(modes[i]);
195 A_current_tx_param = modes[i];
198 if(modes[i]->dynamic_buffer()){
199 if(not B_tx_memory or modes[i]->buffer_len != B_tx_buffer_len){
200 B_tx_memory =
new preallocator<float2>(modes[i]->buffer_len,TX_QUEUE_LENGTH,
false,this_thread_n);
201 B_tx_buffer_len = modes[i]->buffer_len;
203 std::cout<<
"(already allocated).."<<std::flush;
207 B_tx_memory->
close();
209 B_tx_memory =
nullptr;
212 B_tx_gen =
new TX_buffer_generator(modes[i]);
218 B_current_tx_param = modes[i];
221 tx_thread_n.push_back(this_thread_n);
230 std::cout<<
"\033[1;32mSetting USRP hardware:\033[0m"<<std::endl;
233 BOOST_LOG_TRIVIAL(info) <<
"Thread link class set";
240 BOOST_LOG_TRIVIAL(info) <<
"Starting thread link";
246 if(not hardware->
sw_loop)hardware->
main_usrp->set_time_unknown_pps(uhd::time_spec_t(0.0));
249 if(global_param->A_TXRX.mode!=OFF){
253 A_TX_worker =
new boost::thread(boost::bind(&TXRX::tx_single_link,
this,
257 global_param->A_TXRX.samples,
258 global_param->A_TXRX.dynamic_buffer(),
268 tx_conditional_waiting,
270 &(global_param->A_TXRX),
277 std::stringstream ss;
278 ss <<
"Cannot start a new measurement on usrp "<< hardware->
this_usrp_number <<
" if there is already one running on TX";
282 }
else A_TX_worker =
nullptr;
284 if(global_param->B_TXRX.mode!=OFF){
288 B_TX_worker =
new boost::thread(boost::bind(&TXRX::tx_single_link,
this,
292 global_param->B_TXRX.samples,
293 global_param->B_TXRX.dynamic_buffer(),
304 tx_conditional_waiting,
306 &(global_param->B_TXRX),
313 std::stringstream ss;
314 ss <<
"Cannot start a new measurement on usrp "<< hardware->
this_usrp_number <<
" if there is already one running on TX";
318 }
else B_TX_worker =
nullptr;
321 if(global_param->A_RX2.mode!=OFF){
325 A_RX_worker =
new boost::thread(boost::bind(&TXRX::rx_single_link,
this,
330 global_param->A_RX2.samples,
341 rx_conditional_waiting,
344 &(global_param->A_RX2),
352 std::stringstream ss;
353 ss <<
"Cannot start a new measurement on usrp "<< hardware->
this_usrp_number <<
" if there is already one running on RX";
357 }
else A_RX_worker =
nullptr;
359 if(global_param->B_RX2.mode!=OFF){
363 B_RX_worker =
new boost::thread(boost::bind(&TXRX::rx_single_link,
this,
368 global_param->B_RX2.samples,
379 rx_conditional_waiting,
382 &(global_param->B_RX2),
392 std::stringstream ss;
393 ss <<
"Cannot start a new measurement on usrp "<< hardware->
this_usrp_number <<
" if there is already one running on RX";
397 }
else B_RX_worker =
nullptr;
399 if ((global_param->A_RX2.mode!=OFF) and (global_param->B_RX2.mode!=OFF)){
401 H5_writer->
start(global_param);
405 if(not TCP_streamer->
start(&(global_param->A_RX2))){
409 }
else if (global_param->B_RX2.mode!=OFF){
411 H5_writer->
start(global_param);
414 if(not TCP_streamer->
start(&(global_param->B_RX2))){
418 }
else if (global_param->A_RX2.mode!=OFF){
420 H5_writer->
start(global_param);
423 if(not TCP_streamer->
start(&(global_param->A_RX2))){
429 BOOST_LOG_TRIVIAL(info) <<
"Thread link started";
441 if(A_current_rx_param or B_current_rx_param){
443 bool data_output_status;
449 bool tcp_status =
tcp_streaming?(hw_status?
true:TCP_streamer->
stop(force)):
false;
451 bool wrt_status =
file_writing?(tcp_status?
true:(hw_status?
true:H5_writer->
stop())):
false;
453 data_output_status = tcp_status or wrt_status;
464 if(((not RX_status) and (not hardware->
check_rx_status()) and (not data_output_status)) or force){
469 A_RX_worker->interrupt();
472 A_RX_worker =
nullptr;
474 A_current_rx_param =
nullptr;
478 B_RX_worker->interrupt();
481 B_RX_worker =
nullptr;
483 B_current_rx_param =
nullptr;
490 }
else{status = status and
false;}
493 if(A_current_tx_param or B_current_tx_param){
504 A_TX_worker->interrupt();
511 A_current_tx_param =
nullptr;
514 B_TX_worker->interrupt();
517 B_TX_worker =
nullptr;
522 B_current_tx_param =
nullptr;
527 }
else{status = status and
false;}
531 if(status)thread_counter = 0;
533 BOOST_LOG_TRIVIAL(info) <<
"Operations concluded? "<<status;
540 void TXRX::tx_single_link(
542 TX_buffer_generator* generator,
544 size_t total_samples,
550 std::stringstream thread_name;
551 thread_name <<
"tx single link "<<front_end;
554 BOOST_LOG_TRIVIAL(info) <<
"Thread started";
556 if(front_end!=
'A' and front_end!=
'B'){
557 print_error(
"Frontend code not recognised in transmission link thread");
569 size_t tx_buffer_len = front_end==
'A'?A_tx_buffer_len:B_tx_buffer_len;
572 size_t sent_samples = preallocated*tx_buffer_len;
577 while((sent_samples < total_samples) and active){
579 boost::this_thread::interruption_point();
580 sent_samples+=tx_buffer_len;
581 if(dynamic)tx_vector = memory->
get();
582 generator->get(&tx_vector);
585 insert = queue_tx->push(tx_vector);
586 boost::this_thread::sleep_for(boost::chrono::microseconds{1});
589 boost::this_thread::sleep_for(boost::chrono::microseconds{1});
590 }
catch(boost::thread_interrupted &){ active =
false;
598 BOOST_LOG_TRIVIAL(info) <<
"Thread joining";
603 void TXRX::rx_single_link(
613 std::stringstream thread_name;
614 thread_name <<
"rx single link "<<front_end;
617 BOOST_LOG_TRIVIAL(info) <<
"Thread started";
619 if(front_end!=
'A' and front_end!=
'B'){
620 print_error(
"Frontend code not recognised in receiver link thread");
628 RX_wrapper rx_buffer;
631 float2* output_buffer;
637 size_t recv_samples = 0;
640 bool queue_saturnation_warning =
true;
645 while(active and (recv_samples < max_samples)){
649 boost::this_thread::interruption_point();
652 if(RX_queue->pop(rx_buffer)){
655 rx_buffer.channels = demodulator->
parameters->wave_type.size();
658 recv_samples += rx_buffer.length;
661 output_buffer = output_memory->
get();
664 rx_buffer.length = demodulator->
process(&rx_buffer.buffer, &output_buffer);
667 input_memory->
trash(rx_buffer.buffer);
670 rx_buffer.buffer = output_buffer;
674 while (not stream_q->push(rx_buffer)){
676 boost::this_thread::sleep_for(boost::chrono::microseconds{5});
677 if(queue_saturnation_warning and att>10){
679 queue_saturnation_warning =
false;
684 }
else{boost::this_thread::sleep_for(boost::chrono::milliseconds{2});}
687 }
catch (boost::thread_interrupted &){ active =
false; }
690 RX_wrapper rx_buffer_dummy;
691 while(not RX_queue->empty()){
692 RX_queue->pop(rx_buffer_dummy);
693 input_memory->
trash(rx_buffer_dummy.buffer);
699 BOOST_LOG_TRIVIAL(info) <<
"Thread joining";
void start(usrp_param *global_param)
Start the threads.
void start_rx(int buffer_len, threading_condition *wait_condition, preallocator< float2 > *memory, int thread_op, param *current_settings, char front_end)
@ brief start a rx thread.
bool check_tx_status(bool verbose=false)
Check the status of every tx operations. Returns the status of A or B.
bool check_rx_status(bool verbose=false)
Check the status of every rx operations. Returns the status of A or B.
void trash(vector_type *trash_vector)
void set(usrp_param *global_param)
Launches the setting functions for the required signals, antennas...
bool tcp_streaming
Enables server tcp streaming.
void SetThreadName(boost::thread *thread, const char *threadName)
bool check_A_rx_status(bool verbose=false)
Check the status of A rx operations.
void print_error(std::string text)
bool check_B_tx_status(bool verbose=false)
Check the status of B tx operations.
tx_queue * B_TX_queue
Queue accessed to stream data from B frontend.
preallocator< float2 > * memory
bool stop(bool force=false)
void Thread_Prioriry(boost::thread &Thread, int priority, int affinity)
This class handles the DSP of the buffer coming from the the SDR. This is the class to implement to a...
tx_queue * A_TX_queue
Queue accessed to stream data from A frontend.
int process(float2 **__restrict__ in, float2 **__restrict__ out)
PAcket handler for DSP class. This method process information pointed by the in parameter and write t...
rx_queue * stream_queue
Pointer to the TCP streaming queue, initialized with the class. Output of every frontend rx dsp proce...
void set_this_thread_name(std::string thread_name)
Set the htread name reported in the logging.
preallocator< float2 > * rx_output_memory
Pointer to the output memory allocator of all frontends.
bool preset_usrp(usrp_param *requested_config)
Set the USRP device with user parameters.
bool file_writing
Enables server local file writing.
bool check_B_rx_status(bool verbose=false)
Check the status of B rx operations.
void update_pointers(rx_queue *init_stream_queue, preallocator< float2 > *init_memory)
bool diagnostic
Enables diagnostic info on output.
void close_tx()
Close all the tx streamer threads.
void start_tx(threading_condition *wait_condition, int thread_op, param *current_settings, char front_end, preallocator< float2 > *memory=NULL)
Start a transmission thread. The threads started by this function do two things: pop a packet from th...
void close_rx()
Close all the rx streamer threads.
bool stop(bool force=false)
bool check_A_tx_status(bool verbose=false)
Check the status of A tx operations.
void print_debug(std::string text, double value)
void start(usrp_param *global_params)
bool stop(bool force=false)
void print_warning(std::string text)
void update_pointers(rx_queue *init_queue, preallocator< float2 > *init_memory)
void connect(int init_tcp_port)
rx_queue * A_RX_queue
Queue accessed to retrive data from A frontend.
TXRX(server_settings *settings, hardware_manager *init_hardware, bool diagnostic_init=false)
Initialization method requires an already initialized hardware manager class and an already initializ...
Manages the hardware I/O of one usrp unit.
bool start(param *current_settings)
uhd::usrp::multi_usrp::sptr main_usrp
rx_queue * B_RX_queue
Queue accessed to retrive data from B frontend.