USRP_Server  2.0
A flexible, GPU-accelerated radio-frequency readout software.
USRP_file_writer.cpp
Go to the documentation of this file.
1 #include "USRP_file_writer.hpp"
2 
3 using namespace H5;
4 
5 //the initialization needs a rx queue to get packets and a memory to dispose of them
6 //NOTE: the file writer should always be the last element of the chain unless using a very fast storage support
7 H5_file_writer::H5_file_writer(rx_queue* init_queue, preallocator<float2>* init_memory){
8 
9  writing_in_progress = false;
10  dspace_rank = 2;
11  dimsf = (hsize_t*)malloc(sizeof(hsize_t)*dspace_rank);
12 
13  //defining a complex data type for HDF5
14  //NOTE: this is compatible with H5py/numpy interface
15  complex_data_type = new CompType(sizeof(float2));
16  complex_data_type->insertMember( "r", 0, PredType::NATIVE_FLOAT);
17  complex_data_type->insertMember( "i", sizeof(float), PredType::NATIVE_FLOAT);
18 
19 
20  stream_queue = init_queue;
21  memory = init_memory;
22 }
23 
24 
25 //initialize after a streaming server class to terminate the dsp chain
27 
28  writing_in_progress = false;
29  dspace_rank = 2;
30  dimsf = (hsize_t*)malloc(sizeof(hsize_t)*dspace_rank);
31 
32  //defining a complex data type for HDF5
33  //NOTE: this is compatible with H5py/numpy interface
34  complex_data_type = new CompType(sizeof(float2));
35  complex_data_type->insertMember( "r", 0, PredType::NATIVE_FLOAT);
36  complex_data_type->insertMember( "i", sizeof(float), PredType::NATIVE_FLOAT);
37 
38 
39  stream_queue = streaming_server->out_queue;
40  memory = streaming_server->memory;
41 }
42 void H5_file_writer::start(usrp_param* global_params){//
43 
44  if(not writing_in_progress){
45  std::stringstream group_name;
46  group_name<<"/raw_data"<<global_params->usrp_number;
47  file = new H5File(get_name(),H5F_ACC_TRUNC);
48  group = new Group( file->createGroup( group_name.str() ));
49 
50  //initialize the pointers to an invalid location by default (needed in conditions)
51  A_TXRX=NULL;
52  B_TXRX=NULL;
53  A_RX2=NULL;
54  B_RX2=NULL;
55 
56  int scalar_dspace_rank = 1;
57  hsize_t* attribute_dimension = (hsize_t*)malloc(sizeof(hsize_t)*scalar_dspace_rank);
58  attribute_dimension[0] = 1;
59  DataSpace att_space_int(scalar_dspace_rank, attribute_dimension);
60  DataSpace att_space_float(scalar_dspace_rank, attribute_dimension);
61 
62  Attribute err = group->createAttribute( "usrp_number", PredType::NATIVE_INT, att_space_int );
63  err.write(PredType::NATIVE_INT, &global_params->usrp_number);
64 
65  int n_active_channels = global_params->get_number(RX);
66 
67  Attribute rx_num = group->createAttribute( "active_rx", PredType::NATIVE_INT, att_space_int );
68  rx_num.write(PredType::NATIVE_INT, &n_active_channels);
69 
70  if(global_params->A_TXRX.mode != OFF){
71  std::stringstream sub_group_name;
72  sub_group_name<<group_name.str()<<"/A_TXRX";
73  A_TXRX = new Group( file->createGroup( sub_group_name.str() ));
74  write_properties(A_TXRX, &(global_params->A_TXRX));
75  }
76  if(global_params->B_TXRX.mode != OFF){
77  std::stringstream sub_group_name;
78  sub_group_name<<group_name.str()<<"/B_TXRX";
79  B_TXRX = new Group( file->createGroup( sub_group_name.str() ));
80  write_properties(B_TXRX, &(global_params->B_TXRX));
81  }
82  if(global_params->A_RX2.mode != OFF){
83  std::stringstream sub_group_name;
84  sub_group_name<<group_name.str()<<"/A_RX2";
85  A_RX2 = new Group( file->createGroup( sub_group_name.str() ));
86  write_properties(A_RX2, &(global_params->A_RX2));
87  }
88  if(global_params->B_RX2.mode != OFF){
89  std::stringstream sub_group_name;
90  sub_group_name<<group_name.str()<<"/B_RX2";
91  B_RX2 = new Group( file->createGroup( sub_group_name.str() ));
92  write_properties(B_RX2, &(global_params->B_RX2));
93  }
94 
95  binary_writer = new boost::thread(boost::bind(&H5_file_writer::write_files,this));
96  }else print_error("Cannot write a new binary file on disk, writing thread is still running.");
97 }
98 
99 bool H5_file_writer::stop(bool force){
100  //std::cout<<(force?"Closing HDF5 files..":"Waiting for HDF5 file(s)... ")<<std::flush;
101  if(not force){
102  force_close = false;
103  binary_writer->interrupt();
104  if (not writing_in_progress)binary_writer->join();
105  return writing_in_progress;
106  }else{
107  force_close = true;
108  binary_writer->interrupt();
109  binary_writer->join();
110  clean_queue(); //dangerous behaviour?
111  return writing_in_progress;
112  }
113 }
114 
116 
117  free(dimsf);
118 
119 }
120 
121 //in case of pointers update in the TXRX class method set() those functions have to be called
122 //this is because memory size adjustments on rx_output preallocator
123 void H5_file_writer::update_pointers(rx_queue* init_queue, preallocator<float2>* init_memory){
124  stream_queue = init_queue;
125  memory = init_memory;
126 }
128  stream_queue = streaming_server->out_queue;
129  memory = streaming_server->memory;
130 }
131 
132 void H5_file_writer::write_properties(Group *measure_group, param* parameters_group){
133 
134  //set up attribute geometry
135  int scalar_dspace_rank = 1;
136  hsize_t* attribute_dimension = (hsize_t*)malloc(sizeof(hsize_t)*scalar_dspace_rank);
137  attribute_dimension[0] = 1;
138  DataSpace att_space_int(scalar_dspace_rank, attribute_dimension);
139 
140  //string type
141  StrType str_type(PredType::C_S1, H5T_VARIABLE);
142 
143  std::vector<const char *> mode_c_str;
144  mode_c_str.push_back(ant_mode_to_str(parameters_group->mode).c_str());
145  Attribute mode(measure_group->createAttribute("mode" , str_type, att_space_int));
146  mode.write(str_type, (void*)&mode_c_str[0]);
147 
148  //write scalar attributes
149  Attribute rate = measure_group->createAttribute( "rate", PredType::NATIVE_INT, att_space_int );
150  rate.write(PredType::NATIVE_INT, &(parameters_group->rate));
151 
152  Attribute rf = measure_group->createAttribute( "rf", PredType::NATIVE_INT, att_space_int );
153  rf.write(PredType::NATIVE_INT, &(parameters_group->tone));
154 
155  Attribute gain = measure_group->createAttribute( "gain", PredType::NATIVE_INT, att_space_int );
156  gain.write(PredType::NATIVE_INT, &(parameters_group->gain));
157 
158  Attribute bw = measure_group->createAttribute( "bw", PredType::NATIVE_INT, att_space_int );
159  bw.write(PredType::NATIVE_INT, &(parameters_group->bw));
160 
161  Attribute samples = measure_group->createAttribute( "samples", PredType::NATIVE_LONG, att_space_int );
162  samples.write(PredType::NATIVE_LONG, &(parameters_group->samples));
163 
164  Attribute delay = measure_group->createAttribute( "delay", PredType::NATIVE_FLOAT, att_space_int );
165  delay.write(PredType::NATIVE_FLOAT, &(parameters_group->delay));
166 
167  Attribute burst_on = measure_group->createAttribute( "burst_on", PredType::NATIVE_FLOAT, att_space_int );
168  burst_on.write(PredType::NATIVE_FLOAT, &(parameters_group->burst_on));
169 
170  Attribute burst_off = measure_group->createAttribute( "burst_off", PredType::NATIVE_FLOAT, att_space_int );
171  burst_off.write(PredType::NATIVE_FLOAT, &(parameters_group->burst_off));
172 
173  Attribute buffer_len_att = measure_group->createAttribute( "buffer_len", PredType::NATIVE_INT, att_space_int );
174  buffer_len_att.write(PredType::NATIVE_INT, &(parameters_group->buffer_len));
175 
176  Attribute fft_tones = measure_group->createAttribute( "fft_tones", PredType::NATIVE_INT, att_space_int );
177  fft_tones.write(PredType::NATIVE_INT, &(parameters_group->fft_tones));
178 
179  Attribute pf_average = measure_group->createAttribute( "pf_average", PredType::NATIVE_INT, att_space_int );
180  pf_average.write(PredType::NATIVE_INT, &(parameters_group->pf_average));
181 
182  Attribute decim = measure_group->createAttribute( "decim", PredType::NATIVE_INT, att_space_int );
183  decim.write(PredType::NATIVE_INT, &(parameters_group->decim));
184 
185  //set the number of channels
186  int n_chan_int = (int)(parameters_group->wave_type.size());//address of temporary is a bad isea
187  Attribute n_chan = measure_group->createAttribute( "n_chan", PredType::NATIVE_INT, att_space_int );
188  n_chan.write(PredType::NATIVE_INT, &n_chan_int);
189 
190  //vectorial dataspaces geometry setup
191  hsize_t* attribute_dimension_vect = (hsize_t*)malloc(sizeof(hsize_t)*scalar_dspace_rank);
192  attribute_dimension_vect[0] = parameters_group->wave_type.size();
193  DataSpace vect_dataspace(scalar_dspace_rank, attribute_dimension_vect);
194 
195  //Convert the vector into a C string array.
196  //Because the input function ::write requires that.
197  std::vector<const char *> cStrArray;
198  for(size_t index = 0; index < parameters_group->wave_type.size(); ++index){
199  cStrArray.push_back(w_type_to_str(parameters_group->wave_type[index]).c_str());
200  }
201 
202  //write the description of the signal processing
203  Attribute wave_types(measure_group->createAttribute("wave_type" , str_type, vect_dataspace));
204  wave_types.write(str_type, (void*)&cStrArray[0]);
205 
206  Attribute freq = measure_group->createAttribute( "freq", PredType::NATIVE_INT, vect_dataspace );
207  freq.write(PredType::NATIVE_INT, (void*)&(parameters_group->freq)[0]);
208 
209  Attribute chirp_f = measure_group->createAttribute( "chirp_f", PredType::NATIVE_INT, vect_dataspace );
210  chirp_f.write(PredType::NATIVE_INT, (void*)&(parameters_group->chirp_f)[0]);
211 
212  Attribute swipe_s = measure_group->createAttribute( "swipe_s", PredType::NATIVE_INT, vect_dataspace );
213  swipe_s.write(PredType::NATIVE_INT, (void*)&(parameters_group->swipe_s)[0]);
214 
215  Attribute ampl = measure_group->createAttribute( "ampl", PredType::NATIVE_FLOAT, vect_dataspace );
216  ampl.write(PredType::NATIVE_FLOAT, (void*)&(parameters_group->ampl)[0]);
217 
218  Attribute chirp_t = measure_group->createAttribute( "chirp_t", PredType::NATIVE_FLOAT, vect_dataspace );
219  chirp_t.write(PredType::NATIVE_FLOAT, (void*)&(parameters_group->chirp_t)[0]);
220 
221 }
222 
223 std::string H5_file_writer::get_name(){
224 
225  time_t rawtime;
226  struct tm * timeinfo;
227  char buffer[100];
228  std::stringstream ss;
229 
230  time (&rawtime);
231  timeinfo = localtime(&rawtime);
232 
233  strftime(buffer,sizeof(buffer),"%d%m%Y_%I%M%S",timeinfo);
234  std::string str(buffer);
235 
236  ss << "USRP_"<<str<<".h5";
237  std::cout<<"Saving measure to file name: "<<ss.str()<<std::endl;
238  return ss.str();
239 
240 }
241 void H5_file_writer::clean_queue(){
242 
243  RX_wrapper wrapper;
244  if(not writing_in_progress){
245  while(not stream_queue->empty())if(stream_queue->pop(wrapper))memory->trash(wrapper.buffer);
246  }else print_warning("Cannot clean the binary file queue: binary file writer is still runnning.");
247 }
248 
249 void H5_file_writer::write_files(){
250 
251  bool active = true;
252  writing_in_progress = true;
253  RX_wrapper wrapper;
254 
255  //hdf5 attributes for single buffer
256  int scalar_dspace_rank = 1;
257  hsize_t* attribute_dimension = (hsize_t*)malloc(sizeof(hsize_t)*scalar_dspace_rank);
258  attribute_dimension[0] = 1;
259  DataSpace att_space(scalar_dspace_rank, attribute_dimension);
260 
261  //variable needed to
262  bool finishing = true;
263  while(active or finishing){
264  try{
265  boost::this_thread::interruption_point();
266 
267  if(stream_queue->pop(wrapper)){
268  //select subgroup where to write the packet
269  std::stringstream ss;
270  ss<<"raw_data"<<wrapper.usrp_number<<"/"<< get_front_end_name(wrapper.front_end_code) <<"/dataset_"<<wrapper.packet_number<<"/";
271 
272  // create new dspace (holds geometry info about the data)
273  //NOTR: rank is always 2.
274  dimsf[0] = wrapper.channels;
275  dimsf[1] = wrapper.length/wrapper.channels;
276  dataspace = new DataSpace(dspace_rank, dimsf);
277 
278  //data container in the group
279  DataSet dataset = file->createDataSet( ss.str(), *complex_data_type, *dataspace );
280 
281 
282  //write attributes
283  Attribute pn = dataset.createAttribute( "packet_number", PredType::NATIVE_INT, att_space );
284  pn.write(PredType::NATIVE_INT, &wrapper.packet_number);
285 
286  Attribute err = dataset.createAttribute( "errors", PredType::NATIVE_INT, att_space );
287  err.write(PredType::NATIVE_INT, &wrapper.errors);
288 
289  //write data in the dataset
290  dataset.write( wrapper.buffer, *complex_data_type );
291  dataset.flush(H5F_SCOPE_LOCAL);
292 
293  //cleanup stuff
294  delete dataspace;
295 
296  memory->trash(wrapper.buffer);
297  }else{
298  boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
299  }
300 
301  if(not active)finishing = not stream_queue->empty();
302 
303  }catch(boost::thread_interrupted &){
304  active = false;
305  if (not force_close){
306  finishing = not stream_queue->empty();
307  }else finishing = false;
308  }
309  }
310  if(A_TXRX){
311  A_TXRX->flush(H5F_SCOPE_LOCAL);
312  A_TXRX->flush(H5F_SCOPE_GLOBAL);
313  delete A_TXRX;
314  }
315  if(A_RX2){
316  A_RX2->flush(H5F_SCOPE_LOCAL);
317  A_RX2->flush(H5F_SCOPE_GLOBAL);
318  delete A_RX2;
319  }
320  if(B_TXRX){
321  B_TXRX->flush(H5F_SCOPE_LOCAL);
322  B_TXRX->flush(H5F_SCOPE_GLOBAL);
323  delete B_TXRX;
324  }
325  if(B_RX2){
326  B_RX2->flush(H5F_SCOPE_LOCAL);
327  B_RX2->flush(H5F_SCOPE_GLOBAL);
328  delete B_RX2;
329  }
330 
331 
332  group->flush(H5F_SCOPE_LOCAL);
333  group->flush(H5F_SCOPE_GLOBAL);
334  file->flush(H5F_SCOPE_LOCAL);
335  file->flush(H5F_SCOPE_GLOBAL);
336 
337  delete group;
338  delete file;
339 
340  writing_in_progress = false;
341 }
std::string ant_mode_to_str(ant_mode enumerator)
std::string get_front_end_name(char code)
std::string w_type_to_str(w_type enumerator)
void print_error(std::string text)
preallocator< float2 > * memory
bool stop(bool force=false)
rx_queue * out_queue
H5_file_writer(rx_queue *init_queue, preallocator< float2 > *init_memory)
void start(usrp_param *global_params)
void print_warning(std::string text)
void update_pointers(rx_queue *init_queue, preallocator< float2 > *init_memory)