USRP_Server  2.0
A flexible, GPU-accelerated radio-frequency readout software.
USRP_server_network.hpp
Go to the documentation of this file.
1 #pragma once
2 #ifndef USRP_NET_INCLUDED
3 #define USRP_NET_INCLUDED
4 
9 #include <boost/asio.hpp>
10 using boost::asio::ip::tcp;
11 using boost::asio::ip::address;
12 
13 #define MSG_LENGHT 1e4 //Lenght og the buffer string of the server, should not influent since is in a loop
14 
15 extern std::atomic<bool> reconnect_data; //when th async server detects a disconnection of the API make the sync thread reconnect
16 extern std::atomic<bool> reconnect_async; // same thing used whe the exception is caught on the sync thread
17 
19 
20  public:
21 
22  //true if the udp socket is connected
23  std::atomic<bool> NET_IS_CONNECTED;
24 
25  //true when the server is streaming data
26  std::atomic<bool> NET_IS_STREAMING;
27 
28  bool NEED_RECONNECT = false;
29 
30  bool verbose;
32  rx_queue* stream_queue;
33  rx_queue* out_queue;
35 
36  Sync_server(rx_queue* init_stream_queue, preallocator<float2>* init_memory,bool init_passthrough = false);
37 
38  //update pointers in case of memory swapping in TXRX class
39  void update_pointers(rx_queue* init_stream_queue, preallocator<float2>* init_memory);
40 
41  void connect(int init_tcp_port);
42 
43  void reconnect(int init_tcp_port);
44 
45  bool start(param* current_settings);
46 
47  //gracefully stop streaming or check streaming status
48  bool stop(bool force = false);
49 
50  bool check_status();
51 
52  //Clean the queue using the associated memory preallocator. NOTE: does not close the preallocator.
53  int clear_stream_queue( rx_queue *q, preallocator<float2>* memory);
54 
55  private:
56 
57  boost::thread* TCP_worker;
58  boost::thread* reconnect_thread;
59  boost::thread* virtual_pinger_thread;
60  std::atomic<bool> virtual_pinger_online;
61 
62  boost::asio::io_service *io_service = nullptr;
63  boost::asio::socket_base::reuse_address *option;
64  tcp::acceptor *acceptor;
65  tcp::socket *socket;
66 
67  //periodically check the status of the async thread to determine if there is needing to reconnect
68  void virtual_pinger();
69  //size_t ilen = 0;
70  //This function serialize a net_buffer struct into a boost buffer.
71  void format_net_buffer(RX_wrapper input_packet, char* __restrict__ output_buffer);
72 
73  //this variable force the joining of the thread
74  std::atomic<bool> force_close;
75 
76  //THIS FUNCTION IS INTENDED TO BE LUNCHED AS A SEPARATE THREAD
77  void tcp_streamer(param* current_settings);
78 
79 };
80 
81 //typedef boost::lockfree::queue< char* > async_queue;
82 typedef boost::lockfree::queue< std::string* > async_queue;
83 
84 char* format_error();
85 
86 char* format_status();
87 
88 //allocates memory for char*, print the message in it and returns the pointer
89 char* format_parameter(usrp_param *parameters, bool response);
90 
91 usrp_param json_2_parameters(std::string message);
92 
93 //this stuff is encoding the server action with a int
95 
96 //convert (and define) the action requrest code into a enumerator used by the server to decide what to do
98 
99 //manage all the async communication between the server and the API
101 
102  public:
103 
104  //determines if the async server is connected or not
105  std::atomic<bool> ASYNC_SERVER_CONNECTED;
106 
107  bool connected();
108 
109  Async_server(bool init_verbose = false);
110 
111  bool chk_new_command();
112 
113  //blocks until it pushes the pointer in the async transmit queue
114  void send_async(std::string* message);
115 
116  //return true if there is a message and points to it
117  bool recv_async(usrp_param &my_parameter, bool blocking = true);
118 
119  private:
120 
121  //determine if the threads foreward messages to the std out
122  bool verbose;
123 
124  //relax the busy loops. Value in ms
125  int loop_delay = 50;
126 
127  //this thread will fill the command_queue
128  boost::thread* TCP_async_worker_RX;
129  boost::thread* TCP_async_worker_TX;
130 
131  async_queue* command_queue;
132  async_queue* response_queue;
133 
134  boost::asio::io_service *io_service = nullptr;
135  boost::asio::socket_base::reuse_address option;
136  tcp::acceptor *acceptor;
137  tcp::socket *socket;
138 
139  //connect to the async server
140  void connect(int init_tcp_port);
141 
142  void Disconnect();
143 
144  void Reconnect();
145 
146  //returns the number of bytes in the next async message
147  int check_header(char* init_header);
148 
149  //format the header to be coherent with msg length and initialization code
150  //returns the size in byte to send
151  void format_header(char* header, std::string* message);
152 
153  void rx_async(async_queue* link_command_queue);
154 
155  void tx_async(async_queue* response_queue_link);
156 };
157 
158 #endif
char * format_error()
char * format_parameter(usrp_param *parameters, bool response)
std::atomic< bool > NET_IS_STREAMING
char * format_status()
void reconnect(int init_tcp_port)
std::atomic< bool > reconnect_async
std::atomic< bool > reconnect_data
boost::lockfree::queue< std::string *> async_queue
preallocator< float2 > * memory
rx_queue * out_queue
std::atomic< bool > ASYNC_SERVER_CONNECTED
int clear_stream_queue(rx_queue *q, preallocator< float2 > *memory)
servr_action code_2_server_action(int code)
std::atomic< bool > NET_IS_CONNECTED
void update_pointers(rx_queue *init_stream_queue, preallocator< float2 > *init_memory)
usrp_param json_2_parameters(std::string message)
rx_queue * stream_queue
Sync_server(rx_queue *init_stream_queue, preallocator< float2 > *init_memory, bool init_passthrough=false)
bool stop(bool force=false)
void connect(int init_tcp_port)
bool start(param *current_settings)