9 writing_in_progress =
false;
11 dimsf = (hsize_t*)malloc(
sizeof(hsize_t)*dspace_rank);
15 complex_data_type =
new CompType(
sizeof(float2));
16 complex_data_type->insertMember(
"r", 0, PredType::NATIVE_FLOAT);
17 complex_data_type->insertMember(
"i",
sizeof(
float), PredType::NATIVE_FLOAT);
20 stream_queue = init_queue;
// NOTE(review): the header of the enclosing function is not present in this listing.
// The statements below initialise the writer's state from a `streaming_server`
// object; presumably this is a constructor overload -- confirm against the full file.

// No writer thread is active yet; write_files() sets this flag to true when it
// starts and back to false when it finishes.
28 writing_in_progress =
false;
// Extent array with dspace_rank entries, filled per packet in write_files().
// NOTE(review): no matching free() is visible in this listing.
30 dimsf = (hsize_t*)malloc(
sizeof(hsize_t)*dspace_rank);
// HDF5 compound type with the size of float2 and two NATIVE_FLOAT members:
// "r" at byte offset 0 and "i" at byte offset sizeof(float).
34 complex_data_type =
new CompType(
sizeof(float2));
35 complex_data_type->insertMember(
"r", 0, PredType::NATIVE_FLOAT);
36 complex_data_type->insertMember(
"i",
sizeof(
float), PredType::NATIVE_FLOAT);
// Take the packet queue and the buffer pool from the streaming server.
39 stream_queue = streaming_server->
out_queue;
40 memory = streaming_server->
memory;
// NOTE(review): the header of the enclosing function is not present in this listing;
// the body reads `global_params`, so it is presumably
// `start(usrp_param *global_params)` -- confirm. Several closing braces of the
// inner if-blocks are also missing from the listing.

// A new file is only prepared when no write is currently in progress.
44 if(not writing_in_progress){
// Top-level group name: "/raw_data" followed by the USRP number.
45 std::stringstream group_name;
46 group_name<<
"/raw_data"<<global_params->usrp_number;
// The file name comes from get_name(); H5F_ACC_TRUNC truncates an existing
// file with the same name.
47 file =
new H5File(get_name(),H5F_ACC_TRUNC);
48 group =
new Group( file->createGroup( group_name.str() ));
// One-element, rank-1 dataspace used for the scalar attributes below.
// NOTE(review): attribute_dimension is malloc'ed and no free() is visible here.
56 int scalar_dspace_rank = 1;
57 hsize_t* attribute_dimension = (hsize_t*)malloc(
sizeof(hsize_t)*scalar_dspace_rank);
58 attribute_dimension[0] = 1;
59 DataSpace att_space_int(scalar_dspace_rank, attribute_dimension);
// NOTE(review): att_space_float is not used in any line visible in this listing.
60 DataSpace att_space_float(scalar_dspace_rank, attribute_dimension);
// Attribute "usrp_number" on the top-level group.
62 Attribute err = group->createAttribute(
"usrp_number", PredType::NATIVE_INT, att_space_int );
63 err.write(PredType::NATIVE_INT, &global_params->usrp_number);
// Attribute "active_rx": value returned by global_params->get_number(RX).
65 int n_active_channels = global_params->get_number(RX);
67 Attribute rx_num = group->createAttribute(
"active_rx", PredType::NATIVE_INT, att_space_int );
68 rx_num.write(PredType::NATIVE_INT, &n_active_channels);
// For each front end whose mode is not OFF, create a sub-group under the
// top-level group and store its parameters through write_properties().
70 if(global_params->A_TXRX.mode != OFF){
71 std::stringstream sub_group_name;
72 sub_group_name<<group_name.str()<<
"/A_TXRX";
73 A_TXRX =
new Group( file->createGroup( sub_group_name.str() ));
74 write_properties(A_TXRX, &(global_params->A_TXRX));
76 if(global_params->B_TXRX.mode != OFF){
77 std::stringstream sub_group_name;
78 sub_group_name<<group_name.str()<<
"/B_TXRX";
79 B_TXRX =
new Group( file->createGroup( sub_group_name.str() ));
80 write_properties(B_TXRX, &(global_params->B_TXRX));
82 if(global_params->A_RX2.mode != OFF){
83 std::stringstream sub_group_name;
84 sub_group_name<<group_name.str()<<
"/A_RX2";
85 A_RX2 =
new Group( file->createGroup( sub_group_name.str() ));
86 write_properties(A_RX2, &(global_params->A_RX2));
88 if(global_params->B_RX2.mode != OFF){
89 std::stringstream sub_group_name;
90 sub_group_name<<group_name.str()<<
"/B_RX2";
91 B_RX2 =
new Group( file->createGroup( sub_group_name.str() ));
92 write_properties(B_RX2, &(global_params->B_RX2));
// Launch the writer thread; it runs write_files() on this object.
95 binary_writer =
new boost::thread(boost::bind(&H5_file_writer::write_files,
this));
96 }
// A write is already in progress: report the error and start nothing new.
else print_error(
"Cannot write a new binary file on disk, writing thread is still running.");
// NOTE(review): the headers of the two enclosing functions are not present in
// this listing; both fragments stop the writer thread and report its state.

// First fragment: request interruption of the writer thread, then join it only
// if writing_in_progress is already false; returns the flag.
// NOTE(review): when the flag is still true the thread is not joined here --
// confirm that callers are expected to poll the returned value.
103 binary_writer->interrupt();
104 if (not writing_in_progress)binary_writer->join();
105 return writing_in_progress;
// Second fragment: request interruption and always wait for the thread to end
// before returning the flag.
108 binary_writer->interrupt();
109 binary_writer->join();
111 return writing_in_progress;
// NOTE(review): the headers of the two enclosing functions are not present in
// this listing; both fragments re-point the writer at a queue and a buffer pool.

// First fragment: take the queue and the pool from init_queue / init_memory.
124 stream_queue = init_queue;
125 memory = init_memory;
// Second fragment: take the queue and the pool from the streaming server.
128 stream_queue = streaming_server->
out_queue;
129 memory = streaming_server->
memory;
// Store the parameters of one front end as HDF5 attributes on measure_group.
//
// Scalar attributes (one element each): mode, rate, rf, gain, bw, samples, delay,
// burst_on, burst_off, buffer_len, fft_tones, pf_average, decim, n_chan.
// Per-channel attributes (wave_type.size() elements each): wave_type, freq,
// chirp_f, swipe_s, ampl, chirp_t.
//
// measure_group     HDF5 group that receives the attributes.
// parameters_group  parameter set whose fields are written.
//
// NOTE(review): this listing omits some original lines of the function (for
// example the closing braces), so statements may be missing between those shown.
132 void H5_file_writer::write_properties(Group *measure_group, param* parameters_group){
// One-element, rank-1 dataspace shared by all scalar attributes.
// NOTE(review): attribute_dimension is malloc'ed and no free() is visible here.
135 int scalar_dspace_rank = 1;
136 hsize_t* attribute_dimension = (hsize_t*)malloc(
sizeof(hsize_t)*scalar_dspace_rank);
137 attribute_dimension[0] = 1;
138 DataSpace att_space_int(scalar_dspace_rank, attribute_dimension);
// Variable-length C string type used for the "mode" and "wave_type" attributes.
141 StrType str_type(PredType::C_S1, H5T_VARIABLE);
// NOTE(review): the line that fills mode_c_str is not visible in this listing;
// &mode_c_str[0] below requires the vector to hold at least one element.
143 std::vector<const char *> mode_c_str;
145 Attribute mode(measure_group->createAttribute(
"mode" , str_type, att_space_int));
146 mode.write(str_type, (
void*)&mode_c_str[0]);
// Scalar numeric attributes. The in-memory type passed to write() must match
// the declared type of each field -- field declarations are not visible here.
149 Attribute rate = measure_group->createAttribute(
"rate", PredType::NATIVE_INT, att_space_int );
150 rate.write(PredType::NATIVE_INT, &(parameters_group->rate));
// The attribute named "rf" is filled from the `tone` field.
152 Attribute rf = measure_group->createAttribute(
"rf", PredType::NATIVE_INT, att_space_int );
153 rf.write(PredType::NATIVE_INT, &(parameters_group->tone));
155 Attribute gain = measure_group->createAttribute(
"gain", PredType::NATIVE_INT, att_space_int );
156 gain.write(PredType::NATIVE_INT, &(parameters_group->gain));
158 Attribute bw = measure_group->createAttribute(
"bw", PredType::NATIVE_INT, att_space_int );
159 bw.write(PredType::NATIVE_INT, &(parameters_group->bw));
161 Attribute samples = measure_group->createAttribute(
"samples", PredType::NATIVE_LONG, att_space_int );
162 samples.write(PredType::NATIVE_LONG, &(parameters_group->samples));
164 Attribute delay = measure_group->createAttribute(
"delay", PredType::NATIVE_FLOAT, att_space_int );
165 delay.write(PredType::NATIVE_FLOAT, &(parameters_group->delay));
167 Attribute burst_on = measure_group->createAttribute(
"burst_on", PredType::NATIVE_FLOAT, att_space_int );
168 burst_on.write(PredType::NATIVE_FLOAT, &(parameters_group->burst_on));
170 Attribute burst_off = measure_group->createAttribute(
"burst_off", PredType::NATIVE_FLOAT, att_space_int );
171 burst_off.write(PredType::NATIVE_FLOAT, &(parameters_group->burst_off));
173 Attribute buffer_len_att = measure_group->createAttribute(
"buffer_len", PredType::NATIVE_INT, att_space_int );
174 buffer_len_att.write(PredType::NATIVE_INT, &(parameters_group->buffer_len));
176 Attribute fft_tones = measure_group->createAttribute(
"fft_tones", PredType::NATIVE_INT, att_space_int );
177 fft_tones.write(PredType::NATIVE_INT, &(parameters_group->fft_tones));
179 Attribute pf_average = measure_group->createAttribute(
"pf_average", PredType::NATIVE_INT, att_space_int );
180 pf_average.write(PredType::NATIVE_INT, &(parameters_group->pf_average));
182 Attribute decim = measure_group->createAttribute(
"decim", PredType::NATIVE_INT, att_space_int );
183 decim.write(PredType::NATIVE_INT, &(parameters_group->decim));
// "n_chan" is the number of entries in wave_type.
186 int n_chan_int = (int)(parameters_group->wave_type.size());
187 Attribute n_chan = measure_group->createAttribute(
"n_chan", PredType::NATIVE_INT, att_space_int );
188 n_chan.write(PredType::NATIVE_INT, &n_chan_int);
// Rank-1 dataspace with wave_type.size() elements for the per-channel attributes.
// NOTE(review): attribute_dimension_vect is malloc'ed and no free() is visible here.
191 hsize_t* attribute_dimension_vect = (hsize_t*)malloc(
sizeof(hsize_t)*scalar_dspace_rank);
192 attribute_dimension_vect[0] = parameters_group->wave_type.size();
193 DataSpace vect_dataspace(scalar_dspace_rank, attribute_dimension_vect);
// Collect one C-string pointer per channel for the "wave_type" attribute.
// NOTE(review): w_type_to_str() returns std::string by value, so c_str() here
// points into a temporary that is destroyed at the end of the push_back
// statement; the pointers read by wave_types.write() below look dangling.
// Keeping the std::string objects alive until after the write would avoid this.
197 std::vector<const char *> cStrArray;
198 for(
size_t index = 0; index < parameters_group->wave_type.size(); ++index){
199 cStrArray.push_back(
w_type_to_str(parameters_group->wave_type[index]).c_str());
// NOTE(review): &cStrArray[0] requires wave_type to be non-empty.
203 Attribute wave_types(measure_group->createAttribute(
"wave_type" , str_type, vect_dataspace));
204 wave_types.write(str_type, (
void*)&cStrArray[0]);
// Per-channel numeric attributes: each write reads wave_type.size() elements
// starting at element 0 of the corresponding container.
// NOTE(review): assumes every container holds at least wave_type.size()
// elements of the stated native type -- confirm against the param declaration.
206 Attribute freq = measure_group->createAttribute(
"freq", PredType::NATIVE_INT, vect_dataspace );
207 freq.write(PredType::NATIVE_INT, (
void*)&(parameters_group->freq)[0]);
209 Attribute chirp_f = measure_group->createAttribute(
"chirp_f", PredType::NATIVE_INT, vect_dataspace );
210 chirp_f.write(PredType::NATIVE_INT, (
void*)&(parameters_group->chirp_f)[0]);
212 Attribute swipe_s = measure_group->createAttribute(
"swipe_s", PredType::NATIVE_INT, vect_dataspace );
213 swipe_s.write(PredType::NATIVE_INT, (
void*)&(parameters_group->swipe_s)[0]);
215 Attribute ampl = measure_group->createAttribute(
"ampl", PredType::NATIVE_FLOAT, vect_dataspace );
216 ampl.write(PredType::NATIVE_FLOAT, (
void*)&(parameters_group->ampl)[0]);
218 Attribute chirp_t = measure_group->createAttribute(
"chirp_t", PredType::NATIVE_FLOAT, vect_dataspace );
219 chirp_t.write(PredType::NATIVE_FLOAT, (
void*)&(parameters_group->chirp_t)[0]);
// Build the output file name "USRP_<timestamp>.h5" from the local time and print
// it to standard output.
//
// NOTE(review): the declarations of `buffer` and `rawtime`, the call that sets
// `rawtime`, and the return statement are not visible in this listing.
223 std::string H5_file_writer::get_name(){
226 struct tm * timeinfo;
228 std::stringstream ss;
// Convert the raw time to broken-down local time.
231 timeinfo = localtime(&rawtime);
// Timestamp layout: day, month, 4-digit year, '_', hour, minute, second.
// NOTE(review): %I is the 12-hour clock hour and no AM/PM marker is written,
// so two runs 12 hours apart produce the same name (%H would be unambiguous).
233 strftime(buffer,
sizeof(buffer),
"%d%m%Y_%I%M%S",timeinfo);
234 std::string str(buffer);
236 ss <<
"USRP_"<<str<<
".h5";
237 std::cout<<
"Saving measure to file name: "<<ss.str()<<std::endl;
// Discard every packet still waiting in stream_queue, handing each buffer back
// to `memory` through trash(). Does nothing except print a warning while a write
// is in progress.
//
// NOTE(review): the declaration of `wrapper` and the function's closing brace
// are not visible in this listing.
241 void H5_file_writer::clean_queue(){
244 if(not writing_in_progress){
// Keep popping until the queue reports empty; a failed pop is simply retried.
245 while(not stream_queue->empty())
if(stream_queue->pop(wrapper))memory->trash(wrapper.buffer);
246 }
else print_warning(
"Cannot clean the binary file queue: binary file writer is still runnning.");
// Writer-thread body: pops packets from stream_queue and stores each one as an
// HDF5 dataset until the loop condition `active or finishing` becomes false,
// then flushes the groups and the file.
//
// NOTE(review): this listing omits several original lines (the `try` that pairs
// with the catch below, some braces and possibly guards around the flush calls),
// so the exact nesting of the statements shown cannot be confirmed from here.
249 void H5_file_writer::write_files(){
// Mark the writer as busy; cleared again on the last visible line.
252 writing_in_progress =
true;
// One-element, rank-1 dataspace for the per-dataset scalar attributes.
// NOTE(review): attribute_dimension is malloc'ed and no free() is visible here.
256 int scalar_dspace_rank = 1;
257 hsize_t* attribute_dimension = (hsize_t*)malloc(
sizeof(hsize_t)*scalar_dspace_rank);
258 attribute_dimension[0] = 1;
259 DataSpace att_space(scalar_dspace_rank, attribute_dimension);
// `finishing` starts true so the loop is entered even if `active` is false.
262 bool finishing =
true;
263 while(active or finishing){
// Point at which an interrupt() request raises boost::thread_interrupted.
265 boost::this_thread::interruption_point();
267 if(stream_queue->pop(wrapper)){
// Dataset path: raw_data<usrp_number>/<front end name>/dataset_<packet_number>/
269 std::stringstream ss;
270 ss<<
"raw_data"<<wrapper.usrp_number<<
"/"<<
get_front_end_name(wrapper.front_end_code) <<
"/dataset_"<<wrapper.packet_number<<
"/";
// Dataset shape: wrapper.channels rows by wrapper.length/wrapper.channels columns.
// NOTE(review): a new DataSpace is allocated for every packet and no delete
// is visible in this listing.
274 dimsf[0] = wrapper.channels;
275 dimsf[1] = wrapper.length/wrapper.channels;
276 dataspace =
new DataSpace(dspace_rank, dimsf);
279 DataSet dataset = file->createDataSet( ss.str(), *complex_data_type, *dataspace );
// Per-dataset attributes taken from the packet wrapper.
283 Attribute pn = dataset.createAttribute(
"packet_number", PredType::NATIVE_INT, att_space );
284 pn.write(PredType::NATIVE_INT, &wrapper.packet_number);
286 Attribute err = dataset.createAttribute(
"errors", PredType::NATIVE_INT, att_space );
287 err.write(PredType::NATIVE_INT, &wrapper.errors);
// Store the samples and flush this dataset.
290 dataset.write( wrapper.buffer, *complex_data_type );
291 dataset.flush(H5F_SCOPE_LOCAL);
// Hand the packet buffer back to the pool.
296 memory->trash(wrapper.buffer);
// 20 ms pause. NOTE(review): presumably the branch taken when the queue is
// empty -- the line separating the two branches is not visible here.
298 boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
// Once `active` is false, keep looping only while packets remain queued.
301 if(not active)finishing = not stream_queue->empty();
303 }
// Interrupt requested: unless force_close is set, continue only while the queue
// still holds packets; with force_close set, stop immediately.
catch(boost::thread_interrupted &){
305 if (not force_close){
306 finishing = not stream_queue->empty();
307 }
else finishing =
false;
// Flush each front-end group, then the top-level group and the file.
// NOTE(review): no guards around these calls are visible in this listing; a
// group pointer that was never assigned must not be dereferenced -- confirm
// that each flush is conditional in the full file.
311 A_TXRX->flush(H5F_SCOPE_LOCAL);
312 A_TXRX->flush(H5F_SCOPE_GLOBAL);
316 A_RX2->flush(H5F_SCOPE_LOCAL);
317 A_RX2->flush(H5F_SCOPE_GLOBAL);
321 B_TXRX->flush(H5F_SCOPE_LOCAL);
322 B_TXRX->flush(H5F_SCOPE_GLOBAL);
326 B_RX2->flush(H5F_SCOPE_LOCAL);
327 B_RX2->flush(H5F_SCOPE_GLOBAL);
332 group->flush(H5F_SCOPE_LOCAL);
333 group->flush(H5F_SCOPE_GLOBAL);
334 file->flush(H5F_SCOPE_LOCAL);
335 file->flush(H5F_SCOPE_GLOBAL);
// Writing is over: allow a new start / queue cleanup.
340 writing_in_progress =
false;
std::string ant_mode_to_str(ant_mode enumerator)
std::string get_front_end_name(char code)
std::string w_type_to_str(w_type enumerator)
void print_error(std::string text)
preallocator< float2 > * memory
bool stop(bool force=false)
H5_file_writer(rx_queue *init_queue, preallocator< float2 > *init_memory)
void start(usrp_param *global_params)
void print_warning(std::string text)
void update_pointers(rx_queue *init_queue, preallocator< float2 > *init_memory)