10 out_queue =
new rx_queue(SECONDARY_STREAM_QUEUE_LENGTH);
12 virtual_pinger_online =
false;
17 option =
new boost::asio::socket_base::reuse_address(
true);
22 BOOST_LOG_TRIVIAL(info) <<
"Updating sync server memory pointers";
29 BOOST_LOG_TRIVIAL(info) <<
"Connecting sync protocol...";
30 boost::asio::socket_base::reuse_address ciao(
true);
31 if(
verbose)std::cout<<
"Waiting for TCP data connection on port: "<< init_tcp_port<<
" ..."<<std::endl;
33 if(io_service ==
nullptr){
34 io_service =
new boost::asio::io_service;
36 acceptor =
new tcp::acceptor(*io_service, tcp::endpoint(tcp::v4(), init_tcp_port),
true);
37 acceptor->set_option(ciao);
38 socket =
new tcp::socket(*io_service);
39 acceptor->accept(*socket);
40 BOOST_LOG_TRIVIAL(info) <<
"Connected";
41 if(
verbose)std::cout<<
"TCP data connection status update: Connected."<< std::endl;
44 if(not virtual_pinger_online){
45 BOOST_LOG_TRIVIAL(info) <<
"Launching virtual pinger...";
46 virtual_pinger_thread =
new boost::thread(boost::bind(&Sync_server::virtual_pinger,
this));
48 BOOST_LOG_TRIVIAL(warning) <<
"No virtual pinger will be launched";
50 BOOST_LOG_TRIVIAL(info) <<
"Sync protocol connected";
54 BOOST_LOG_TRIVIAL(debug) <<
"debug message from Sync_server::reconnect";
57 BOOST_LOG_TRIVIAL(debug) <<
"Sync protocol reconnect() fcn called (temporary thread)";
61 if(io_service !=
nullptr){
67 BOOST_LOG_TRIVIAL(debug) <<
"Reconnect() joined/terminated";
73 print_warning(
"Before start streaming, data soket has to be reconnected.");
74 BOOST_LOG_TRIVIAL(error) <<
"Sync protocol started with NEED_RECONNECT: "<<
NEED_RECONNECT;
75 while(
NEED_RECONNECT)std::this_thread::sleep_for(std::chrono::milliseconds(50));
78 BOOST_LOG_TRIVIAL(info) <<
"Starting TCP worker...";
79 TCP_worker =
new boost::thread(boost::bind(&Sync_server::tcp_streamer,
this, current_settings));
81 BOOST_LOG_TRIVIAL(warning) <<
"Cannot start sync protocol if server is not connected. NET_IS_CONNECTED state "<<
NET_IS_CONNECTED ;
82 print_error(
"Cannot stream data without a connected socket!");
91 BOOST_LOG_TRIVIAL(warning) <<
"forcing stop of sync data thread";
93 TCP_worker->interrupt();
100 TCP_worker->interrupt();
104 TCP_worker =
nullptr;
108 print_warning(
"Chekcing streaming status on disconnected socket");
109 BOOST_LOG_TRIVIAL(warning) <<
"Chekcing sync data protocol status on disconnected socket";
121 BOOST_LOG_TRIVIAL(info) <<
"Clearing stream queue of Sync data protocol";
123 RX_wrapper trash_packet;
124 trash_packet.buffer =
nullptr;
127 q->pop(trash_packet);
128 memory->
trash(trash_packet.buffer);
130 if (i>0){BOOST_LOG_TRIVIAL(warning) <<
"clearing sync data protocol queue discarded "<< i <<
" packets.";}
// Thread body of the "virtual pinger". Elsewhere in this file it is started on a
// boost::thread only when virtual_pinger_online is false.
// NOTE(review): this listing omits several original lines (the loop/conditions
// around the visible statements), so the comments describe only what is visible.
135 void Sync_server::virtual_pinger(){
137 BOOST_LOG_TRIVIAL(debug) <<
"Protocol synchronization thread started";
// Mark the pinger as running so that a second one is not launched.
139 virtual_pinger_online =
true;
141 std::this_thread::sleep_for(std::chrono::milliseconds(700));
// Report the current value of the reconnect_data flag.
144 BOOST_LOG_TRIVIAL(info) <<
"reconnect_data state is: "<<
reconnect_data;
145 std::this_thread::sleep_for(std::chrono::milliseconds(30));
149 BOOST_LOG_TRIVIAL(info) <<
"launching reconnect now";
// Clear the running flag before the reconnect is launched.
// NOTE(review): the reconnect call itself is on lines not shown here.
150 virtual_pinger_online =
false;
155 }
// A boost thread interruption ends up in this handler.
catch(boost::thread_interrupted &){
160 BOOST_LOG_TRIVIAL(info) <<
"Joined";
// Serialize one RX_wrapper packet into a flat byte buffer for transmission.
//
// The fields are copied back to back with memcpy, in this order:
//   usrp_number, front_end_code, packet_number, length, errors, channels,
// followed by the payload taken from input_packet.buffer, which is
// input_packet.length * 2 * sizeof(float) bytes long.
//
// input_packet  : packet to serialize (passed by value; its buffer is only read).
// output_buffer : destination; no bounds check is visible here, so the caller
//                 must supply at least header + payload bytes (tcp_streamer
//                 allocates its buffer from data_mem_mult * buffer_len).
// NOTE(review): the declaration/initialisation of `offset` is on lines omitted
// from this listing - presumably it starts at 0; confirm.
164 void Sync_server::format_net_buffer(RX_wrapper input_packet,
char* __restrict__ output_buffer){
// Header field 1: usrp_number.
169 memcpy(&output_buffer[offset], &input_packet.usrp_number,
sizeof(input_packet.usrp_number));
170 offset +=
sizeof(input_packet.usrp_number);
// Header field 2: front_end_code.
172 memcpy(&output_buffer[offset], &input_packet.front_end_code,
sizeof(input_packet.front_end_code));
173 offset +=
sizeof(input_packet.front_end_code);
// Header field 3: packet_number.
175 memcpy(&output_buffer[offset], &input_packet.packet_number,
sizeof(input_packet.packet_number));
176 offset +=
sizeof(input_packet.packet_number);
// Header field 4: length (also used below to size the payload copy).
178 memcpy(&output_buffer[offset], &input_packet.length,
sizeof(input_packet.length));
179 offset +=
sizeof(input_packet.length);
// Header field 5: errors.
181 memcpy(&output_buffer[offset], &input_packet.errors,
sizeof(input_packet.errors));
182 offset +=
sizeof(input_packet.errors);
// Header field 6: channels.
184 memcpy(&output_buffer[offset], &input_packet.channels,
sizeof(input_packet.channels));
185 offset +=
sizeof(input_packet.channels);
// Payload: `length` elements of two floats each.
// NOTE(review): presumably interleaved complex samples (float2) - confirm.
187 memcpy(&output_buffer[offset], input_packet.buffer, input_packet.length * 2 *
sizeof(
float));
// Worker thread body that serializes packets with format_net_buffer() and
// writes them to the data socket with boost::asio::write.
//
// current_settings : supplies buffer_len and data_mem_mult, which size the
//                    transmit buffer.
// NOTE(review): many original lines (queue pops, branch/loop structure, cleanup)
// are omitted from this listing; the comments cover only the visible statements.
195 void Sync_server::tcp_streamer(param* current_settings){
198 BOOST_LOG_TRIVIAL(debug) <<
"Thread started";
201 RX_wrapper incoming_packet;
// Header size = sum of the sizes of the six fields that format_net_buffer()
// copies ahead of the payload.
205 int header_size =
sizeof(incoming_packet.usrp_number) +
sizeof(incoming_packet.front_end_code);
206 header_size +=
sizeof(incoming_packet.packet_number) +
sizeof(incoming_packet.length);
207 header_size +=
sizeof(incoming_packet.errors) +
sizeof(incoming_packet.channels);
// Largest serialized packet: the biggest payload plus the header.
212 size_t max_size = current_settings->data_mem_mult *current_settings->buffer_len * 2 *
sizeof(float) + header_size;
// Transmit buffer, one byte larger than max_size.
// NOTE(review): the allocation precedes the <= 0 sanity check below, and the
// matching free() is not visible in this listing - confirm it exists.
215 char *fullData = (
char *)std::malloc(max_size+1);
// Reject a non-positive maximum buffer length.
218 if(current_settings->buffer_len * current_settings->data_mem_mult <= 0){
219 std::stringstream ss;
220 ss<<std::string(
"Sync data thread: ")<<std::string(
"Maximum buffer length is <= 0 in the TCP streamer. This is not allowed");
224 BOOST_LOG_TRIVIAL(error) <<
"Maximum buffer length is <= 0 in the TCP streamer. This is not allowed";
// Receives the error code from boost::asio::write instead of an exception.
229 boost::system::error_code ignored_error;
// `finishing` starts true, so the loop below is entered regardless of `active`.
232 bool finishing =
true;
236 BOOST_LOG_TRIVIAL(info) <<
"main loop started";
237 while(active or finishing){
// Interruption points let TCP_worker->interrupt() stop this thread.
243 boost::this_thread::interruption_point();
// Bytes to send for this packet: header plus length * 2 floats of payload.
246 int total_size = header_size + incoming_packet.length * 2 *
sizeof(float);
250 boost::this_thread::interruption_point();
251 format_net_buffer(incoming_packet, fullData);
// transfer_all: keep writing until total_size bytes are sent or an error occurs.
256 boost::asio::write(*socket, boost::asio::buffer(fullData,total_size),boost::asio::transfer_all(), ignored_error);
258 }
catch(std::exception &e){
259 std::stringstream ss;
260 ss<<
"Sync data thread: "<<e.what();
265 std::cout<<e.what()<<std::endl;
// A set error code from the write marks the connection as needing a reconnect.
267 if (ignored_error!=boost::system::errc::success and ignored_error){
268 std::stringstream ss;
269 ss<<std::string(
"Sync data thread: ")<<std::string(ignored_error.message());
271 NEED_RECONNECT =
true;
273 BOOST_LOG_TRIVIAL(warning) <<
"Something's wrong with the packet terminating transmission";
// Retry pushing the packet to out_queue every 5 ms until the push succeeds.
// NOTE(review): the condition guarding this push is not visible here.
278 while(not
out_queue->push(incoming_packet))std::this_thread::sleep_for(std::chrono::milliseconds(5));
286 std::this_thread::sleep_for(std::chrono::milliseconds(1));
291 }
// Reached when the thread is interrupted at one of the interruption points.
catch(boost::thread_interrupted &){
292 BOOST_LOG_TRIVIAL(info) <<
"Interrupt called to stop thread";
// Unless a forced close was requested, the "finishing transmission" path runs.
294 if (not force_close){
299 BOOST_LOG_TRIVIAL(info) <<
"finishing transmission";
306 BOOST_LOG_TRIVIAL(info) <<
"Thread joining";
311 char* error =
nullptr;
316 char* error =
nullptr;
322 char* error =
nullptr;
327 usrp_param parameters;
370 return ASYNC_SERVER_CONNECTED;
374 BOOST_LOG_TRIVIAL(info) <<
"Initializing async protocol thread";
379 ASYNC_SERVER_CONNECTED =
false;
387 TCP_async_worker_RX =
new boost::thread(boost::bind(&Async_server::rx_async,
this,command_queue));
388 TCP_async_worker_TX =
new boost::thread(boost::bind(&Async_server::tx_async,
this,response_queue));
394 print_debug(
"async interrupt size: ",socket->available());
395 return socket->available()>0?
true:
false;
400 BOOST_LOG_TRIVIAL(info) <<
"Sending message to client application";
401 if(ASYNC_SERVER_CONNECTED){
402 while(not response_queue->push(message))std::this_thread::sleep_for(std::chrono::microseconds(25));
404 print_warning(
"Cannot sent async message, interface disconnected.");
405 BOOST_LOG_TRIVIAL(warning) <<
"Cannot sent async message, interface disconnected";
413 if(not ASYNC_SERVER_CONNECTED){
414 print_warning(
"Async server is not connected, cannot receive messages.");
418 std::string* message_string;
420 while(not command_queue->pop(message_string)) std::this_thread::sleep_for(std::chrono::milliseconds(50));
424 delete message_string;
429 if(command_queue->pop(message_string)){
434 delete message_string;
// Accept one TCP client for the async (command/response) channel.
//
// init_tcp_port : local IPv4 TCP port to listen on.
//
// Creates the io_service if none exists, opens an acceptor on the port with
// address reuse enabled, performs a synchronous accept into a new socket and
// then sets ASYNC_SERVER_CONNECTED to true.
// NOTE(review): lines omitted from this listing hide where the
// `if(io_service == nullptr)` block closes - confirm which statements it guards.
445 void Async_server::connect(
int init_tcp_port){
446 BOOST_LOG_TRIVIAL(info) <<
"Connecting async service";
448 if(
verbose)std::cout<<
"Waiting for TCP async data connection on port: "<< init_tcp_port<<
" ..."<<std::endl;;
// Socket option object applied to the acceptor below.
449 boost::asio::socket_base::reuse_address ciao(
true);
450 if(io_service ==
nullptr){
451 io_service =
new boost::asio::io_service;
// The third constructor argument (true) already requests address reuse;
// set_option(ciao) applies it again explicitly.
453 acceptor =
new tcp::acceptor(*io_service, tcp::endpoint(tcp::v4(), init_tcp_port),
true);
454 acceptor->set_option(ciao);
456 socket =
new tcp::socket(*io_service);
// Synchronous accept: returns once a client has connected.
458 acceptor->accept(*socket);
460 if(
verbose)std::cout<<
"Async TCP connection update: Connected."<< std::endl;
461 ASYNC_SERVER_CONNECTED =
true;
462 BOOST_LOG_TRIVIAL(info) <<
"State ASYNC_SERVER_CONNECTED is: "<< ASYNC_SERVER_CONNECTED;
// Tear down the async channel and mark it as disconnected.
// NOTE(review): the cleanup statements inside the io_service check are omitted
// from this listing; only the log, the null check and the flag reset are visible.
465 void Async_server::Disconnect(){
466 BOOST_LOG_TRIVIAL(info) <<
"Disconnecting async service";
468 if(io_service !=
nullptr){
473 ASYNC_SERVER_CONNECTED =
false;
// Restart the async channel: clear the connected flag, then start new RX and TX
// worker threads bound to the command and response queues.
// NOTE(review): the steps between the flag reset and the thread creation
// (presumably disconnect/connect calls) are omitted from this listing.
476 void Async_server::Reconnect(){
477 BOOST_LOG_TRIVIAL(debug) <<
"Reconnecting async service";
480 ASYNC_SERVER_CONNECTED =
false;
// Receiver thread: runs rx_async with command_queue.
482 TCP_async_worker_RX =
new boost::thread(boost::bind(&Async_server::rx_async,
this,command_queue));
// Transmitter thread: runs tx_async with response_queue.
483 TCP_async_worker_TX =
new boost::thread(boost::bind(&Async_server::tx_async,
this,response_queue));
// Inspect a message header, read as an array of ints.
//
// init_header : header bytes; at least two ints are read from it.
// Returns the second int when the first int is 0.
// NOTE(review): the return value for a non-zero first int is on lines omitted
// from this listing.
487 int Async_server::check_header(
char* init_header){
// View the raw header bytes as ints.
488 int* code_check =
reinterpret_cast<int*
>(init_header);
489 if(code_check[0] == 0){
490 return code_check[1];
// Fill a message header, written as an array of ints.
//
// header  : destination buffer; it must hold at least two ints.
// message : message whose length() is stored in the second int.
// NOTE(review): the write to head[0] is on a line omitted from this listing -
// presumably it is set to 0 to match check_header(); confirm.
497 void Async_server::format_header(
char* header, std::string* message){
498 int *head =
reinterpret_cast<int*
>(header);
// Second int of the header carries the payload length in bytes.
500 head[1]=message->length();
// Receiver thread body for the async channel.
//
// link_command_queue : queue that receives each message as a heap-allocated
//                      std::string*; elsewhere in this file the code that pops
//                      these pointers deletes them.
//
// Visible flow: read a header of 2*sizeof(int) bytes, take the payload size
// from its second int, read that many bytes, wrap them in a std::string and
// push it to the queue. Read errors clear ASYNC_SERVER_CONNECTED, which in
// turn clears `active`.
// NOTE(review): the loop header, the boost::asio::read calls and the buffer
// frees are on lines omitted from this listing.
504 void Async_server::rx_async(
async_queue* link_command_queue){
506 BOOST_LOG_TRIVIAL(debug) <<
"Thread started";
// Buffer for the fixed-size header (two ints).
509 header_buffer = (
char*)malloc(2*
sizeof(
int));
511 char* message_buffer;
512 std::string* message_string;
515 boost::system::error_code error;
518 BOOST_LOG_TRIVIAL(info) <<
"main loop started";
519 std::this_thread::sleep_for(std::chrono::milliseconds(2));
// Lets an interrupt() from another thread stop this thread.
521 boost::this_thread::interruption_point();
// NOTE(review): this assigns only the single element header_buffer points to,
// not the whole 2*sizeof(int) region - confirm that is intended.
522 *header_buffer = {0};
525 boost::asio::buffer(header_buffer,2*
sizeof(
int)),
526 boost::asio::transfer_all(),
// Payload size is the second header int, or 0 if the header read failed.
// NOTE(review): no range check on this size is visible before the malloc below.
530 int size = error != boost::system::errc::success?0:(
reinterpret_cast<int*
>(header_buffer))[1];
// A non-zero first header int is handled as a special case (body not shown).
531 if ((reinterpret_cast<int*>(header_buffer))[0]!=0){
534 if (error == boost::system::errc::success){
// Buffer for the payload announced by the header.
538 message_buffer = (
char*)malloc(size);
544 boost::asio::buffer(message_buffer,size),
545 boost::asio::transfer_all(),
549 if (error == boost::system::errc::success){
550 BOOST_LOG_TRIVIAL(info) <<
"Succesfully received info from client application";
// Copy the payload into a new std::string.
551 message_string =
new std::string(message_buffer,size);
// Retry the push every 25 us until the queue accepts the pointer.
554 while(not link_command_queue->push(message_string))std::this_thread::sleep_for(std::chrono::microseconds(25));
555 BOOST_LOG_TRIVIAL(info) <<
"Message pushed in command queue";
// Payload read failed: report it and mark the channel disconnected.
559 BOOST_LOG_TRIVIAL(warning) <<
"Error received in boost::asio::read";
560 std::stringstream ss;
561 ss<<
"Async RX server side encountered a payload problem: "<<error.message()<<std::endl;
565 ASYNC_SERVER_CONNECTED =
false;
// Header read failed: report it and mark the channel disconnected.
572 std::stringstream ss;
573 ss<<
"Async RX server side encountered a header problem: "<<error.message()<<std::endl;
577 ASYNC_SERVER_CONNECTED =
false;
584 ASYNC_SERVER_CONNECTED =
false;
// Clear `active` once the channel is flagged as disconnected.
589 if(not ASYNC_SERVER_CONNECTED)active =
false;
591 }
// Reached when the thread is interrupted at the interruption point above.
catch(boost::thread_interrupted &){
// Transmitter thread body for the async channel.
//
// response_queue_link : queue of std::string* messages to send to the client.
//
// Visible flow: pop a message, build its header with format_header(), write the
// header (2*sizeof(int) bytes) and then the message bytes to the socket.
// NOTE(review): the loop header, message deletion and header free are on lines
// omitted from this listing.
599 void Async_server::tx_async(
async_queue* response_queue_link){
603 std::string* message;
// Buffer for the fixed-size header (two ints).
606 char* header = (
char*)malloc(2*
sizeof(
int));
609 boost::system::error_code ignored_error;
// Lets an interrupt() from another thread stop this thread.
615 boost::this_thread::interruption_point();
617 if(response_queue_link->pop(message)){
620 format_header(header, message);
// Send the header first, then the message body.
// NOTE(review): both writes store into the same ignored_error, so unless an
// omitted line checks in between, only the second result reaches the check below.
623 boost::asio::write(*socket, boost::asio::buffer(header,2*
sizeof(
int)),boost::asio::transfer_all(), ignored_error);
626 boost::asio::write(*socket, boost::asio::buffer(*message),boost::asio::transfer_all(), ignored_error);
629 if (ignored_error!=boost::system::errc::success){
630 std::stringstream ss;
631 ss<<
"Async tx error: "<<ignored_error.message()<<std::endl;
// Sleep for 50 ms; NOTE(review): presumably the empty-queue branch - confirm.
639 std::this_thread::sleep_for(std::chrono::milliseconds(50));
641 }
// Reached when the thread is interrupted at the interruption point above.
catch(boost::thread_interrupted &){
// Clear `active` once the channel is flagged as disconnected.
645 if(not ASYNC_SERVER_CONNECTED)active =
false;
std::atomic< bool > reconnect_data
bool string2param(std::string data, usrp_param &my_parameter)
usrp_param json_2_parameters(std::string message)
std::atomic< bool > NET_IS_STREAMING
void trash(vector_type *trash_vector)
boost::shared_ptr< file_sink > pLogSink
Shared pointer to the logfile writer object.
char * format_parameter(usrp_param *parameters, bool response)
void reconnect(int init_tcp_port)
void print_error(std::string text)
std::atomic< bool > reconnect_async
boost::lockfree::queue< std::string *> async_queue
preallocator< float2 > * memory
int clear_stream_queue(rx_queue *q, preallocator< float2 > *memory)
Async_server(bool init_verbose=false)
std::atomic< bool > NET_IS_CONNECTED
void set_this_thread_name(std::string thread_name)
Set the thread name reported in the logging.
bool recv_async(usrp_param &my_parameter, bool blocking=true)
void update_pointers(rx_queue *init_stream_queue, preallocator< float2 > *init_memory)
servr_action code_2_server_action(int code)
Sync_server(rx_queue *init_stream_queue, preallocator< float2 > *init_memory, bool init_passthrough=false)
void print_debug(std::string text, double value)
bool stop(bool force=false)
void print_warning(std::string text)
void connect(int init_tcp_port)
bool start(param *current_settings)
void send_async(std::string *message)