USRP_Server  2.0
A flexible, GPU-accelerated radio-frequency readout software.
USRP_server_network.cpp
Go to the documentation of this file.
2 
3 std::atomic<bool> reconnect_data; // Set when the async server detects a disconnection of the API, so that the sync thread reconnects.
4 std::atomic<bool> reconnect_async; // Same purpose, used when the exception is caught on the sync thread.
5 
6 Sync_server::Sync_server(rx_queue* init_stream_queue, preallocator<float2>* init_memory,bool init_passthrough){
7  passthrough = init_passthrough;
8  stream_queue = init_stream_queue;
9  if(passthrough){
10  out_queue = new rx_queue(SECONDARY_STREAM_QUEUE_LENGTH);
11  }
12  virtual_pinger_online = false;
13  memory = init_memory;
14  NET_IS_CONNECTED = false;
15  NET_IS_STREAMING = false;
16  verbose = true;
17  option = new boost::asio::socket_base::reuse_address(true);
18 }
19 
20 //update pointers in case of memory swapping in TXRX class
21 void Sync_server::update_pointers(rx_queue* init_stream_queue, preallocator<float2>* init_memory){
22  BOOST_LOG_TRIVIAL(info) << "Updating sync server memory pointers";
23  memory = init_memory;
24  stream_queue = init_stream_queue;
25 }
26 
27 void Sync_server::connect(int init_tcp_port){
28 
29  BOOST_LOG_TRIVIAL(info) << "Connecting sync protocol...";
30  boost::asio::socket_base::reuse_address ciao(true);
31  if(verbose)std::cout<<"Waiting for TCP data connection on port: "<< init_tcp_port<<" ..."<<std::endl;
32  //std::this_thread::sleep_for(std::chrono::milliseconds(3000));
33  if(io_service == nullptr){
34  io_service = new boost::asio::io_service;
35  }
36  acceptor = new tcp::acceptor(*io_service, tcp::endpoint(tcp::v4(), init_tcp_port),true);
37  acceptor->set_option(ciao);
38  socket = new tcp::socket(*io_service);
39  acceptor->accept(*socket);
40  BOOST_LOG_TRIVIAL(info) << "Connected";
41  if(verbose)std::cout<<"TCP data connection status update: Connected."<< std::endl;
42  NET_IS_CONNECTED = true;
43  NEED_RECONNECT = false;
44  if(not virtual_pinger_online){
45  BOOST_LOG_TRIVIAL(info) << "Launching virtual pinger...";
46  virtual_pinger_thread = new boost::thread(boost::bind(&Sync_server::virtual_pinger,this));
47  }else{
48  BOOST_LOG_TRIVIAL(warning) << "No virtual pinger will be launched";
49  }
50  BOOST_LOG_TRIVIAL(info) << "Sync protocol connected";
51 }
52 
53 void Sync_server::reconnect(int init_tcp_port){
54  BOOST_LOG_TRIVIAL(debug) << "debug message from Sync_server::reconnect";
55  set_this_thread_name("TCP streamer"); // Just for clarity in the logs.
56  // This function is called as a thread so it's not blocking the configuration of other stuff,
57  BOOST_LOG_TRIVIAL(debug) << "Sync protocol reconnect() fcn called (temporary thread)";
58  //std::this_thread::sleep_for(std::chrono::milliseconds(3000));
59  stop(true);
60  //delete io_service;
61  if(io_service != nullptr){
62  io_service->reset();
63  }
64  delete acceptor;
65  delete socket;
66  connect(init_tcp_port);
67  BOOST_LOG_TRIVIAL(debug) << "Reconnect() joined/terminated";
68 }
69 
70 bool Sync_server::start(param* current_settings){
71  force_close = false;
72  if (NEED_RECONNECT){
73  print_warning("Before start streaming, data soket has to be reconnected.");
74  BOOST_LOG_TRIVIAL(error) << "Sync protocol started with NEED_RECONNECT: "<< NEED_RECONNECT;
75  while(NEED_RECONNECT)std::this_thread::sleep_for(std::chrono::milliseconds(50));
76  }
77  if (NET_IS_CONNECTED){
78  BOOST_LOG_TRIVIAL(info) << "Starting TCP worker...";
79  TCP_worker = new boost::thread(boost::bind(&Sync_server::tcp_streamer,this, current_settings));
80  }else{
81  BOOST_LOG_TRIVIAL(warning) << "Cannot start sync protocol if server is not connected. NET_IS_CONNECTED state "<<NET_IS_CONNECTED ;
82  print_error("Cannot stream data without a connected socket!");
83  return false;
84  }
85  return true;
86 }
87 
88 //gracefully stop streaming or check streaming status
89 bool Sync_server::stop(bool force){
90  if(NET_IS_CONNECTED and force){
91  BOOST_LOG_TRIVIAL(warning) << "forcing stop of sync data thread";
92  force_close = true;
93  TCP_worker->interrupt();
94  TCP_worker->join();
95  delete TCP_worker;
96  TCP_worker = nullptr;
97  return NET_IS_STREAMING;
98  }else if(NET_IS_CONNECTED){
99  force_close = false;
100  TCP_worker->interrupt();
101  if(not NET_IS_STREAMING){
102  TCP_worker->join();
103  delete TCP_worker;
104  TCP_worker = nullptr;
105  }
106  return NET_IS_STREAMING;
107  }
108  print_warning("Chekcing streaming status on disconnected socket");
109  BOOST_LOG_TRIVIAL(warning) << "Chekcing sync data protocol status on disconnected socket";
110  return false;
111 }
112 
114  return NET_IS_STREAMING;
115 }
116 
117 // @brief Clean the queue using the associated memory preallocator.
118 // Also checks for remaining packets in the queue and trashes them.
119 // NOTE: does not close the preallocator.
121  BOOST_LOG_TRIVIAL(info) << "Clearing stream queue of Sync data protocol";
122  int i = 0;
123  RX_wrapper trash_packet;
124  trash_packet.buffer = nullptr;
125  while(!q->empty()){
126  i++;
127  q->pop(trash_packet);
128  memory->trash(trash_packet.buffer);
129  }
130  if (i>0){BOOST_LOG_TRIVIAL(warning) << "clearing sync data protocol queue discarded "<< i << " packets.";}
131  return i;
132 }
133 
134 // Periodically checks the reconnect_data flag to determine whether the sync data connection needs to be re-established.
// Intended to run as its own thread (it names the thread and loops until it has handled one reconnect request).
135 void Sync_server::virtual_pinger(){
136  set_this_thread_name("Virtual pinger");
137  BOOST_LOG_TRIVIAL(debug) << "Protocol synchronization thread started";
138  bool active = true;
139  virtual_pinger_online = true;
140  while(active){
 // Poll the flag every 700 ms.
141  std::this_thread::sleep_for(std::chrono::milliseconds(700));
142  try{
143  if(reconnect_data){
144  BOOST_LOG_TRIVIAL(info) << "reconnect_data state is: "<< reconnect_data;
145  std::this_thread::sleep_for(std::chrono::milliseconds(30));
146  reconnect_data = false; // cleared twice (here and below) to avoid a data race with the async thread
 // Mark the data socket as unusable until a reconnection has completed.
147  NEED_RECONNECT = true;
148  NET_IS_CONNECTED = false;
149  BOOST_LOG_TRIVIAL(info) << "launching reconnect now";
 // Clearing this flag lets the next connect() start a new pinger thread.
150  virtual_pinger_online = false;
 // NOTE(review): the message above announces a reconnect, but no statement launching it is visible here
 // (the listing jumps from embedded line 150 to 152) - confirm against the original source file.
152  reconnect_data = false;
153  active = false;
154  }
155  }catch(boost::thread_interrupted &){
156  active = false;
157  }
158  }
159 
160  BOOST_LOG_TRIVIAL(info) << "Joined";
161 }
162 //size_t ilen = 0;
163 //This function serialize a net_buffer struct into a boost buffer.
164 void Sync_server::format_net_buffer(RX_wrapper input_packet, char* __restrict__ output_buffer){
165 
166  //where to write in the buffer
167  int offset = 0;
168 
169  memcpy(&output_buffer[offset], &input_packet.usrp_number, sizeof(input_packet.usrp_number));
170  offset += sizeof(input_packet.usrp_number);
171 
172  memcpy(&output_buffer[offset], &input_packet.front_end_code, sizeof(input_packet.front_end_code));
173  offset += sizeof(input_packet.front_end_code);
174 
175  memcpy(&output_buffer[offset], &input_packet.packet_number, sizeof(input_packet.packet_number));
176  offset += sizeof(input_packet.packet_number);
177 
178  memcpy(&output_buffer[offset], &input_packet.length, sizeof(input_packet.length));
179  offset += sizeof(input_packet.length);
180 
181  memcpy(&output_buffer[offset], &input_packet.errors, sizeof(input_packet.errors));
182  offset += sizeof(input_packet.errors);
183 
184  memcpy(&output_buffer[offset], &input_packet.channels, sizeof(input_packet.channels));
185  offset += sizeof(input_packet.channels);
186 
187  memcpy(&output_buffer[offset], input_packet.buffer, input_packet.length * 2 * sizeof(float));
188  //ilen+=input_packet.length/input_packet.channels;
189  //std::cout<<"Streaming packet: "<< input_packet.packet_number <<" acc_samp: "<< ilen<<std::endl;
190 
191 }
192 
193 
194 //THIS FUNCTION IS INTENDED TO BE LUNCHED AS A SEPARATE THREAD
195 void Sync_server::tcp_streamer(param* current_settings){
196 
197  set_this_thread_name("TCP streamer");
198  BOOST_LOG_TRIVIAL(debug) << "Thread started";
199 
200  //Packet to be serialized and sent
201  RX_wrapper incoming_packet;
202 
203  //The python API will first download the header containing the metadata
204  //(including the length of the buffer). The header has fixed size.
205  int header_size = sizeof(incoming_packet.usrp_number) + sizeof(incoming_packet.front_end_code);
206  header_size += sizeof(incoming_packet.packet_number) + sizeof(incoming_packet.length);
207  header_size += sizeof(incoming_packet.errors) + sizeof(incoming_packet.channels);
208 
209 
210  //maximum transmission buffer size is only reached when no decimation is applied.
211  //To avoid error the support memory to the transmission buffer will be oversized.
212  size_t max_size = current_settings->data_mem_mult *current_settings->buffer_len * 2 * sizeof(float) + header_size;
213 
214  //buffer for serializing the network packet
215  char *fullData = ( char *)std::malloc(max_size+1);
216 
217  //additional check for debugging
218  if(current_settings->buffer_len * current_settings->data_mem_mult <= 0){
219  std::stringstream ss;
220  ss<<std::string("Sync data thread: ")<<std::string("Maximum buffer length is <= 0 in the TCP streamer. This is not allowed");
221  print_error(ss.str());
222  NEED_RECONNECT = true;
223  NET_IS_CONNECTED = false;
224  BOOST_LOG_TRIVIAL(error) << "Maximum buffer length is <= 0 in the TCP streamer. This is not allowed";
225  return;
226  }
227 
228  //some error handling
229  boost::system::error_code ignored_error;
230 
231  bool active = true;
232  bool finishing = true;
233 
234  NET_IS_STREAMING = true;
235 
236  BOOST_LOG_TRIVIAL(info) << "main loop started";
237  while(active or finishing){
238  if(reconnect_data){
239  active = false;
240  finishing = false;
241  }
242  try{
243  boost::this_thread::interruption_point();
244  if(stream_queue->pop(incoming_packet)){
245  //calculate total size to be transmitted
246  int total_size = header_size + incoming_packet.length * 2 * sizeof(float);
247  //std::cout<<"streaming "<<incoming_packet.length<<" samples"<<std::endl;
248 
249  //setrialize data structure in a char buffer
250  boost::this_thread::interruption_point();
251  format_net_buffer(incoming_packet, fullData);
252 
253  if(not NEED_RECONNECT){
254  try{
255  //send data structure
256  boost::asio::write(*socket, boost::asio::buffer(fullData,total_size),boost::asio::transfer_all(), ignored_error);
257 
258  }catch(std::exception &e){
259  std::stringstream ss;
260  ss<<"Sync data thread: "<<e.what();
261  print_error(ss.str());
262  active = false;
263  NET_IS_STREAMING = false;
264  //reconnect_data = true;
265  std::cout<<e.what()<<std::endl;
266  }
267  if (ignored_error!=boost::system::errc::success and ignored_error){
268  std::stringstream ss;
269  ss<<std::string("Sync data thread: ")<<std::string(ignored_error.message());
270  print_error(ss.str());
271  NEED_RECONNECT = true;
272  NET_IS_CONNECTED = false;
273  BOOST_LOG_TRIVIAL(warning) << "Something's wrong with the packet terminating transmission";
274  active = false;
275  }
276  }
277  if(passthrough){
278  while(not out_queue->push(incoming_packet))std::this_thread::sleep_for(std::chrono::milliseconds(5));
279  }else{
280  memory->trash(incoming_packet.buffer);
281  }
282 
283 
284  }else{
285  //else wait for packets
286  std::this_thread::sleep_for(std::chrono::milliseconds(1));
287  }
288 
289  if(not active)finishing = not stream_queue->empty();
290 
291  }catch(boost::thread_interrupted &){
292  BOOST_LOG_TRIVIAL(info) << "Interrupt called to stop thread";
293  active = false;
294  if (not force_close){
295  //finishing = not stream_queue->empty();
296  ;
297  }else{
298  finishing = false;
299  BOOST_LOG_TRIVIAL(info) << "finishing transmission";
300  }
301  }
302  }
303 
304  free(fullData);
305  NET_IS_STREAMING = false;
306  BOOST_LOG_TRIVIAL(info) << "Thread joining";
307  pLogSink->flush(); //If the sink is not flushed something bad happens when the thread joins
308 }
309 
// Placeholder: no error formatting is implemented, a null pointer is always returned.
char* format_error(){
    return nullptr;
}
314 
316  char* error = nullptr;
317  return error;
318 }
319 
320 //allocates memory for char*, print the message in it and returns the pointer
321 char* format_parameter(usrp_param *parameters, bool response){
322  char* error = nullptr;
323  return error;
324 }
325 
326 usrp_param json_2_parameters(std::string message){
327  usrp_param parameters;
328  return parameters;
329 }
330 
331 //convert (and define) the action request code into an enumerator used by the server to decide what to do
333  servr_action what_2_do;
334  switch(code){
335 
336  case 0:
337  what_2_do = START;
338  break;
339 
340  case 1:
341  what_2_do = STOP;
342  break;
343 
344  case 2:
345  what_2_do = FORCE_STOP;
346  break;
347 
348  case 3:
349  what_2_do = RESET_USRP;
350  break;
351 
352  case 4:
353  what_2_do = STATUS_REQUEST;
354  break;
355 
356  case 5:
357  what_2_do = INFO_REQUEST;
358  break;
359 
360  default:
361  what_2_do = NOTHING;
362  break;
363  }
364  return what_2_do;
365 }
366 
367 
368 
370  return ASYNC_SERVER_CONNECTED;
371 }
372 
// Set up the asynchronous (command/response) server: reset the state flags, create the
// command and response queues and start the RX and TX worker threads.
// init_verbose: stored in `verbose`, which enables the console messages printed by connect().
373 Async_server::Async_server(bool init_verbose){
374  BOOST_LOG_TRIVIAL(info) << "Initializing async protocol thread";
375  reconnect_async = false;
376  verbose = init_verbose;
377 
378  //connect the async server
379  ASYNC_SERVER_CONNECTED = false;
 // NOTE(review): no connect call is visible at this point (the listing jumps from embedded
 // line 379 to 381) - confirm against the original source file.
381 
382  //create the message queue
383  command_queue = new async_queue(0);
384  response_queue = new async_queue(0);
385 
386  //start the server
 // RX worker pushes received messages into command_queue; TX worker pops from response_queue.
387  TCP_async_worker_RX = new boost::thread(boost::bind(&Async_server::rx_async,this,command_queue));
388  TCP_async_worker_TX = new boost::thread(boost::bind(&Async_server::tx_async,this,response_queue));
389 
390 
391 }
392 
394  print_debug("async interrupt size: ",socket->available());
395  return socket->available()>0?true:false;
396 }
397 
398 //blocks until it pushes the pointer in the async transmit queue
399 void Async_server::send_async(std::string* message){
400  BOOST_LOG_TRIVIAL(info) << "Sending message to client application";
401  if(ASYNC_SERVER_CONNECTED){
402  while(not response_queue->push(message))std::this_thread::sleep_for(std::chrono::microseconds(25));
403  }else{
404  print_warning("Cannot sent async message, interface disconnected.");
405  BOOST_LOG_TRIVIAL(warning) << "Cannot sent async message, interface disconnected";
406  delete message;
407  }
408 };
409 
410 //return true if there is a message and points to it
411 bool Async_server::recv_async(usrp_param &my_parameter, bool blocking){
412  //BOOST_LOG_TRIVIAL(warning) << "Decoding async message";
413  if(not ASYNC_SERVER_CONNECTED){
414  print_warning("Async server is not connected, cannot receive messages.");
415  return false;
416  }
417  bool res = false;
418  std::string* message_string;
419  if (blocking){
420  while(not command_queue->pop(message_string)) std::this_thread::sleep_for(std::chrono::milliseconds(50));
421  res = string2param(*message_string, my_parameter);
422  // Get rid of the memory associated with the string.
423  //BOOST_LOG_TRIVIAL(warning) << "(blocking) about to delete current command string";
424  delete message_string;
425  //BOOST_LOG_TRIVIAL(warning) << "(blocking) string deleted";
426 
427  }else{
428 
429  if(command_queue->pop(message_string)){
430  //interpreter goes here
431  res = string2param(*message_string, my_parameter);
432  // Get rid of the memory associated with the string.
433  //BOOST_LOG_TRIVIAL(warning) << "(non-blocking) about to delete current command string";
434  delete message_string;
435  //BOOST_LOG_TRIVIAL(warning) << "(non-blocking) string deleted";
436  }
437  }
438 
439  return res;
440 }
441 
442 
443 
444 //connect to the async server
445 void Async_server::connect(int init_tcp_port){
446  BOOST_LOG_TRIVIAL(info) << "Connecting async service";
447  reconnect_async = false;
448  if(verbose)std::cout<<"Waiting for TCP async data connection on port: "<< init_tcp_port<<" ..."<<std::endl;;
449  boost::asio::socket_base::reuse_address ciao(true);
450  if(io_service == nullptr){
451  io_service = new boost::asio::io_service;
452  }
453  acceptor = new tcp::acceptor(*io_service, tcp::endpoint(tcp::v4(), init_tcp_port),true);
454  acceptor->set_option(ciao);
455 
456  socket = new tcp::socket(*io_service);
457 
458  acceptor->accept(*socket);
459  //socket->set_option(ciao);
460  if(verbose)std::cout<<"Async TCP connection update: Connected."<< std::endl;
461  ASYNC_SERVER_CONNECTED = true;
462  BOOST_LOG_TRIVIAL(info) << "State ASYNC_SERVER_CONNECTED is: "<< ASYNC_SERVER_CONNECTED;
463  }
464 
465 void Async_server::Disconnect(){
466  BOOST_LOG_TRIVIAL(info) << "Disconnecting async service";
467  //delete io_service;
468  if(io_service != nullptr){
469  io_service->reset();
470  }
471  delete acceptor;
472  delete socket;
473  ASYNC_SERVER_CONNECTED = false;
474 }
475 
// Restart the async service after a disconnection: raise reconnect_data (the flag polled by the
// sync side), reset the async state flags and start new RX/TX worker threads.
476 void Async_server::Reconnect(){
477  BOOST_LOG_TRIVIAL(debug) << "Reconnecting async service";
478  reconnect_data = true;
479  reconnect_async = false;
480  ASYNC_SERVER_CONNECTED = false;
 // NOTE(review): no connect call is visible at this point (the listing jumps from embedded
 // line 480 to 482) - confirm against the original source file.
 // NOTE(review): the previous thread objects held by these pointers are overwritten without
 // being deleted here - confirm whether they are released elsewhere.
482  TCP_async_worker_RX = new boost::thread(boost::bind(&Async_server::rx_async,this,command_queue));
483  TCP_async_worker_TX = new boost::thread(boost::bind(&Async_server::tx_async,this,response_queue));
484 }
485 
486 //returns the number of bytes in the next async message
487 int Async_server::check_header(char* init_header){
488  int* code_check = reinterpret_cast<int*>(init_header);
489  if(code_check[0] == 0){
490  return code_check[1];
491  }
492  return 0;
493 }
494 
495 //format the header to be coherent with msg length and initialization code
496 //returns the size in byte to send
497 void Async_server::format_header(char* header, std::string* message){
498  int *head = reinterpret_cast<int*>(header);
499  head[0]=0;
500  head[1]=message->length();
501 }
502 
503 
504 void Async_server::rx_async(async_queue* link_command_queue){
505  set_this_thread_name("Async TCP RX");
506  BOOST_LOG_TRIVIAL(debug) << "Thread started";
507  //preallocate space for the fixed header
508  char* header_buffer;
509  header_buffer = (char*)malloc(2*sizeof(int));
510  //declare the message buffer
511  char* message_buffer;
512  std::string* message_string;
513  bool active = true;
514 
515  boost::system::error_code error;
516 
517  while(active){
518  BOOST_LOG_TRIVIAL(info) << "main loop started";
519  std::this_thread::sleep_for(std::chrono::milliseconds(2));
520  try{
521  boost::this_thread::interruption_point();
522  *header_buffer = {0};
523  boost::asio::read(
524  *socket,
525  boost::asio::buffer(header_buffer,2*sizeof(int)),
526  boost::asio::transfer_all(),
527  error
528  );
529 
530  int size = error != boost::system::errc::success?0:(reinterpret_cast<int*>(header_buffer))[1];
531  if ((reinterpret_cast<int*>(header_buffer))[0]!=0){
532  print_warning("Corrupted async header detected!");
533  }
534  if (error == boost::system::errc::success){
535  //std::cout<<"Header size is "<<size<<std::endl;
536 
537  //allocates the message buffer
538  message_buffer = (char*)malloc(size);
539  //*message_buffer = {0};
540 
541  //std::cout<<message_buffer<<std::endl;
542  boost::asio::read(
543  *socket,
544  boost::asio::buffer(message_buffer,size),
545  boost::asio::transfer_all(),
546  error
547  );
548 
549  if (error == boost::system::errc::success){
550  BOOST_LOG_TRIVIAL(info) << "Succesfully received info from client application";
551  message_string = new std::string(message_buffer,size);
552 
553  // The buffer has been received, push the message in the queue
554  while(not link_command_queue->push(message_string))std::this_thread::sleep_for(std::chrono::microseconds(25));
555  BOOST_LOG_TRIVIAL(info) << "Message pushed in command queue";
556 
557 
558  }else{
559  BOOST_LOG_TRIVIAL(warning) << "Error received in boost::asio::read";
560  std::stringstream ss;
561  ss<<"Async RX server side encountered a payload problem: "<<error.message()<<std::endl;
562  reconnect_data = true;
563  print_warning(ss.str());
564  active = false;
565  ASYNC_SERVER_CONNECTED = false;
566  Disconnect();
567  Reconnect();
568 
569  }
570  }else{
571 
572  std::stringstream ss;
573  ss<<"Async RX server side encountered a header problem: "<<error.message()<<std::endl;
574  reconnect_data = true;
575  print_warning(ss.str());
576  active = false;
577  ASYNC_SERVER_CONNECTED = false;
578  Disconnect();
579  Reconnect();
580 
581  }
582  if(reconnect_async and active){
583  active = false;
584  ASYNC_SERVER_CONNECTED = false;
585  Disconnect();
586  Reconnect();
587 
588  }
589  if(not ASYNC_SERVER_CONNECTED)active = false;
590 
591  }catch(boost::thread_interrupted &){
592  active = false;
593  }
594  }
595 
596  free(header_buffer);
597 }
598 
599 void Async_server::tx_async(async_queue* response_queue_link){
600 
601  bool active = true;
602 
603  std::string* message;
604 
605  //allocate header space
606  char* header = (char*)malloc(2*sizeof(int));
607 
608  //some error handling
609  boost::system::error_code ignored_error;
610 
611  while(active){
612 
613  try{
614 
615  boost::this_thread::interruption_point();
616 
617  if(response_queue_link->pop(message)){
618 
619  //format the header
620  format_header(header, message);
621 
622  //send the header
623  boost::asio::write(*socket, boost::asio::buffer(header,2*sizeof(int)),boost::asio::transfer_all(), ignored_error);
624 
625  //send message
626  boost::asio::write(*socket, boost::asio::buffer(*message),boost::asio::transfer_all(), ignored_error);
627 
628  //check error
629  if (ignored_error!=boost::system::errc::success){
630  std::stringstream ss;
631  ss<<"Async tx error: "<<ignored_error.message()<<std::endl;
632  print_warning(ss.str());
633  }
634 
635  //release the message memory
636  delete message;
637 
638  }else{
639  std::this_thread::sleep_for(std::chrono::milliseconds(50));
640  }
641  }catch(boost::thread_interrupted &){
642  active = false;
643  }
644 
645  if(not ASYNC_SERVER_CONNECTED)active = false;
646  }
647 
648  free(header);
649 }
std::atomic< bool > reconnect_data
bool string2param(std::string data, usrp_param &my_parameter)
usrp_param json_2_parameters(std::string message)
std::atomic< bool > NET_IS_STREAMING
void trash(vector_type *trash_vector)
boost::shared_ptr< file_sink > pLogSink
Shared pointer to the logfile writer object.
char * format_parameter(usrp_param *parameters, bool response)
int TCP_SYNC_PORT
void reconnect(int init_tcp_port)
void print_error(std::string text)
std::atomic< bool > reconnect_async
boost::lockfree::queue< std::string *> async_queue
preallocator< float2 > * memory
rx_queue * out_queue
int clear_stream_queue(rx_queue *q, preallocator< float2 > *memory)
Async_server(bool init_verbose=false)
std::atomic< bool > NET_IS_CONNECTED
void set_this_thread_name(std::string thread_name)
Set the thread name reported in the logging.
bool recv_async(usrp_param &my_parameter, bool blocking=true)
void update_pointers(rx_queue *init_stream_queue, preallocator< float2 > *init_memory)
char * format_status()
rx_queue * stream_queue
servr_action code_2_server_action(int code)
Sync_server(rx_queue *init_stream_queue, preallocator< float2 > *init_memory, bool init_passthrough=false)
void print_debug(std::string text, double value)
bool stop(bool force=false)
void print_warning(std::string text)
void connect(int init_tcp_port)
int TCP_ASYNC_PORT
bool start(param *current_settings)
void send_async(std::string *message)
char * format_error()