Source code for USRP_noise

########################################################################################
##                                                                                    ##
##  THIS LIBRARY IS PART OF THE SOFTWARE DEVELOPED BY THE JET PROPULSION LABORATORY   ##
##  IN THE CONTEXT OF THE GPU ACCELERATED FLEXIBLE RADIOFREQUENCY READOUT PROJECT     ##
##                                                                                    ##
########################################################################################

import numpy as np
import scipy.signal as signal
import signal as Signal
import h5py
import sys
import struct
import json
import os
import socket
import Queue
from Queue import Empty
from threading import Thread, Condition
import multiprocessing
from joblib import Parallel, delayed
from subprocess import call
import time
import gc
import datetime

# plotly stuff
from plotly.graph_objs import Scatter, Layout
from plotly import tools
import plotly.plotly as py
import plotly.graph_objs as go
import plotly
import colorlover as cl

# matplotlib stuff
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as pl
import matplotlib.patches as mpatches
from matplotlib.ticker import EngFormatter

# needed to print the data acquisition process
import progressbar

# import submodules
from USRP_low_level import *
from USRP_files import *
from USRP_delay import *
from USRP_fitting import get_fit_param
from USRP_fitting import get_fit_data

[docs]def dual_get_noise(tones_A, tones_B, measure_t, rate, decimation = None, amplitudes_A = None, amplitudes_B = None, RF_A = None, RF_B = None, tx_gain_A = 0, tx_gain_B = 0, output_filename = None, Device = None, delay = None, pf_average = None, mode = "DIRECT" ,**kwargs): ''' Perform a noise acquisition using fixed tone technique on both frontend with a symmetrical PFB setup Arguments: - tones_A/B: list of ABSOLUTE tones frequencies in Hz for frontend A/B. - measure_t: duration of the measure in seconds. - decimation: the decimation factor to use for the acquisition. Default is minimum. Note that with the PFB the decimation factor can only be >= N_tones. - amplitudes_A/B: a list of linear power to use with each tone. Must be the same length of tones arg; will be normalized. Default is equally splitted power. - RF_A/B: central up/down mixing frequency. Default is deducted by other arguments. - tx_gain_A/B: gain to use in the transmission side. - output_filename: eventual filename. default is datetime. - Device: the on-server device number to use. default is 0. - delay: delay between TX and RX processes. Default is taken from the INTERNAL_DELAY variable. - pf_average: pfb averaging factor. Default is 4 for PFB mode and 1 for DIRECT mode. - mode: noise acquisition kernels. DIRECT uses direct demodulation PFB use the polyphase filter bank technique. Note that PF average will refer to something slightly different in DIRECT mode (moving average ratio: 1 has no overlap). - kwargs: * verbose: additional prints. Default is False. * push_queue: queue for post writing samples. Note: - In the PFB acquisition scheme the decimation factor and bin width are directly correlated. This function execute a check on the input parameters to determine the number of FFT bins to use. Returns: - filename of the measure file. ''' global USRP_data_queue, REMOTE_FILENAME, END_OF_MEASURE, LINE_DELAY, USRP_power try: verbose = kwargs['verbose'] except KeyError: verbose = False if ((mode != "PFB") and (mode != "DIRECT")): error_msg = "Noise acquisition mode %s not defined" % str(mode) print_error(err_msg) raise ValueError(err_msg) if pf_average is None: if mode == "DIRECT": pf_average = 1 elif mode == "PFB": pf_average = 4 try: push_queue = kwargs['push_queue'] except KeyError: push_queue = None if output_filename is None: output_filename = "USRP_Noise_"+get_timestamp() else: output_filename = str(output_filename) print("Begin dual noise acquisition, file %s ..."%output_filename) if measure_t <= 0: print_error("Cannot execute a noise measure with "+str(measure_t)+"s duration.") return "" tx_gain_A = int(np.abs(tx_gain_A)) tx_gain_B = int(np.abs(tx_gain_B)) if tx_gain_A > 31: print_warning("Max tx_gain_A is usually 31 dB. %d dB selected"%tx_gain_A) if tx_gain_B > 31: print_warning("Max tx_gain_B is usually 31 dB. %d dB selected"%tx_gain_B) if not (int(rate) in USRP_accepted_rates): print_warning("Requested rate will cause additional CIC filtering in USRP firmware. To avoid this use one of these rates (MHz): "+str(USRP_accepted_rates)) if RF_A is None or RF_B is None: print_warning("Assuming tones are in absolute units (detector bandwith)") #calculate the splitting Frequency err_msg = 'Automatic frequency calculation has not been implemented for dual noise acquisition' print_error(err_msg) raise ValueError(err_msg) else: prev = len(tones_A) + len(tones_B) tones_A = tones_A[np.abs(tones_A) - rate/2 < 0] tones_B = tones_B[np.abs(tones_B) - rate/2 < 0] if len(tones_B) + len(tones_A) < prev: print_warning("Some tone has been discrarded because out of bandwidth") ''' common = list(set(tones_A).intersection(tones_B)) if len(common) > 0: print_warning("Some tone is compatible with both frontend tunings, see next messages.") for cc in common: if len(tones_A)>=len(tones_B): print_debug("length of tones A: %d, length of tones B: %d, removing from %s"%(len(tones_A),len(tones_B),'A')) np.delete(tones_A, np.where(tones_A == cc)[0][0]) else: print_debug("length of tones A: %d, length of tones B: %d, removing from %s"%(len(tones_A),len(tones_B),'B')) np.delete(tones_B, np.where(tones_B == cc)[0][0]) ''' print_debug("RF_A\tRF_B\t[MHz]") print_debug("%.2f\t%.2f"%(RF_A/1e6,RF_B/1e6)) print_debug("TONES_A\tTONES_B\t[MHz]") tones_strings = [] for j in range(len(tones_A)): tones_strings.append("%.2f\t"%(tones_A[j]/1e6)) for j in range(len(tones_B)): try: tones_strings[j]+="%.2f"%(tones_B[j]/1e6) except IndexError: tones_strings.append("\t%.2f"%(tones_B[j]/1e6)) if amplitudes_A is not None: if len(amplitudes_A) != len(tones_A): print_warning("Amplitudes for RF A profile length is %d and it's different from tones array length that is %d. Amplitude profile will be ignored."%(len(amplitudes_A),len(tones_A))) amplitudes_A = [1. / len(tones_A) for x in tones_A] print_debug("Using %.1f percent of A DAC range"%(np.sum(amplitudes_A)*100)) else: print_debug("Using 100 percent of RF A DAC ranges.") amplitudes_A = [1. / len(tones_A) for x in tones_A] if amplitudes_B is not None: if len(amplitudes_B) != len(tones_B): print_warning("Amplitudes for RF B profile length is %d and it's different from tones array length that is %d. Amplitude profile will be ignored."%(len(amplitudes_B),len(tones_B))) amplitudes_B = [1. / len(tones_B) for x in tones_B] print_debug("Using %.1f percent of /b DAC range"%(np.sum(amplitudes_B)*100)) else: print_debug("Using 100 percent of RF B DAC ranges.") amplitudes_B = [1. / len(tones_B) for x in tones_B] TX_frontend_A = "A_TXRX" RX_frontend_A = "A_RX2" TX_frontend_B = "B_TXRX" RX_frontend_B = "B_RX2" if delay is None: try: delay = LINE_DELAY[str(int(rate/1e6))] delay *= 1e-9 except KeyError: print_warning("Cannot find associated line delay for a rate of %d Msps. Setting to 0s."%(int(rate/1e6))) delay = 0 else: print_debug("Using a delay of %d ns" % int(delay*1e9)) if mode == "PFB": # Calculate the number of channel needed per rf frontend A if len(tones_A)>1: min_required_space_A = np.min([ x for x in np.abs([[tones_A[i]-tones_A[j] if i!=j else 1e8 for j in range(len(tones_A))] for i in range(len(tones_A))]).flatten()]) print_debug("Minimum bin width required for frontend A is %.2f MHz"%(min_required_space_A/1e6)) min_required_fft_A = int(np.ceil(float(rate) / float(min_required_space_A))) else: min_required_fft_A = 10 if decimation is not None: if decimation < min_required_fft_A: print_warning("Cannot use a decimation factor of %d as the minimum required number of bin in the PFB of frontend A is %d" % (decimation, min_required_fft_A)) final_fft_bins_A = min_required_fft_A else: final_fft_bins_A = int(decimation) else: final_fft_bins_A = int(min_required_fft_A) if final_fft_bins_A<10: # Less that 10 pfb bins cause a bottleneck in the GPU: too many instruction to fetch. final_fft_bins_A = 10 print_debug("Using %d PFB channels"%final_fft_bins_A) for i in range(len(tones_A)): if tones_A[i] > rate / 2: print_error("Out of bandwidth tone!") raise ValueError("Out of bandwidth tone requested in frontend A. %.2f MHz / %.2f MHz (Nyq)" %(tones_A[i]/1e6, rate / 2e6) ) #tones_A = tones_A - RF_A tones_A = quantize_tones(tones_A, rate, final_fft_bins_A) # Calculate the number of channel needed per rf frontend B if len(tones_B)>1: min_required_space_B = np.min([ x for x in np.abs([[tones_B[i]-tones_B[j] if i!=j else 1e8 for j in range(len(tones_B))] for i in range(len(tones_B))]).flatten()]) print_debug("Minimum bin width required for frontend B is %.2f MHz"%(min_required_space_B/1e6)) min_required_fft_B = int(np.ceil(float(rate) / float(min_required_space_B))) else: min_required_fft_B = 10 if decimation is not None: if decimation < min_required_fft_B: print_warning("Cannot use a decimation factor of %d as the minimum required number of bin in the PFB of frontend A is %d" % (decimation, min_required_fft_B)) final_fft_bins_B = min_required_fft_B else: final_fft_bins_B = int(decimation) else: final_fft_bins_B = int(min_required_fft_B) if final_fft_bins_B<10: # Less that 10 pfb bins cause a bottleneck in the GPU: too many instruction to fetch. final_fft_bins_B = 10 print_debug("Using %d PFB channels"%final_fft_bins_B) for i in range(len(tones_B)): if tones_B[i] > rate / 2: print_error("Out of bandwidth tone!") raise ValueError("Out of bandwidth tone requested in frontend A. %.2f MHz / %.2f MHz (Nyq)" %(tones_A[i]/1e6, rate / 2e6) ) #tones_B = tones_B - RF_B tones_B = quantize_tones(tones_B, rate, final_fft_bins_B) number_of_samples = rate * measure_t print_warning("overriding number of bins: calculation above it's wrong") final_fft_bins_B = int(decimation) final_fft_bins_A = int(decimation) ''' print("RF Frontend A") print("Tone [MHz]\tPower [dBm]\tOffset [MHz]") for i in range(len(tones_A)): print("%.1f\t%.2f\t%.3f" % ((RF_A + tones_A[i]) / 1e6, USRP_power + 20 * np.log10(amplitudes_A[i]) + tx_gain_A, tones_A[i] / 1e6)) print("RF Frontend B") print("Tone [MHz]\tPower [dBm]\tOffset [MHz]") for i in range(len(tones_B)): print("%.1f\t%.2f\t%.3f" % ((RF_B + tones_B[i]) / 1e6, USRP_power + 20 * np.log10(amplitudes_B[i]) + tx_gain_B, tones_B[i] / 1e6)) ''' expected_samples_B = int(number_of_samples/final_fft_bins_B) expected_samples_A = int(number_of_samples/final_fft_bins_A) noise_command = global_parameter() noise_command.set(TX_frontend_A, "mode", "TX") noise_command.set(TX_frontend_A, "buffer_len", 1e6) noise_command.set(TX_frontend_A, "gain", tx_gain_A) noise_command.set(TX_frontend_A, "delay", 1) noise_command.set(TX_frontend_A, "samples", number_of_samples) noise_command.set(TX_frontend_A, "rate", rate) noise_command.set(TX_frontend_A, "bw", 2 * rate) #noise_command.set(TX_frontend, 'tuning_mode', 0) noise_command.set(TX_frontend_A, "wave_type", ["TONES" for x in tones_A]) noise_command.set(TX_frontend_A, "ampl", amplitudes_A) noise_command.set(TX_frontend_A, "freq", tones_A) noise_command.set(TX_frontend_A, "rf", RF_A) # This parameter does not have an effect (except suppress a warning from the server) noise_command.set(TX_frontend_A, "fft_tones", 100) noise_command.set(RX_frontend_A, "mode", "RX") #noise_command.set(RX_frontend, 'tuning_mode', 0) noise_command.set(RX_frontend_A, "buffer_len", 1e6) noise_command.set(RX_frontend_A, "gain", 0) noise_command.set(RX_frontend_A, "delay", 1 + delay) noise_command.set(RX_frontend_A, "samples", number_of_samples) noise_command.set(RX_frontend_A, "rate", rate) noise_command.set(RX_frontend_A, "bw", 2 * rate) noise_command.set(RX_frontend_A, "wave_type", ["TONES" for x in tones_A]) noise_command.set(RX_frontend_A, "freq", tones_A) noise_command.set(RX_frontend_A, "rf", RF_A) noise_command.set(RX_frontend_A, "fft_tones", final_fft_bins_A) noise_command.set(RX_frontend_A, "pf_average", pf_average) # With the polyphase filter the decimation is realized increasing the number of channels. # This parameter will average in the GPU a certain amount of PFB outputs. noise_command.set(RX_frontend_A, "decim", 0) noise_command.set(TX_frontend_B, "mode", "TX") noise_command.set(TX_frontend_B, "buffer_len", 1e6) noise_command.set(TX_frontend_B, "gain", tx_gain_B) noise_command.set(TX_frontend_B, "delay", 1) noise_command.set(TX_frontend_B, "samples", number_of_samples) noise_command.set(TX_frontend_B, "rate", rate) noise_command.set(TX_frontend_B, "bw", 2 * rate) #noise_command.set(TX_frontend, 'tuning_mode', 0) noise_command.set(TX_frontend_B, "wave_type", ["TONES" for x in tones_B]) noise_command.set(TX_frontend_B, "ampl", amplitudes_B) noise_command.set(TX_frontend_B, "freq", tones_B) noise_command.set(TX_frontend_B, "rf", RF_B) # This parameter does not have an effect (except suppress a warning from the server) noise_command.set(TX_frontend_B, "fft_tones", 100) noise_command.set(RX_frontend_B, "mode", "RX") #noise_command.set(RX_frontend, 'tuning_mode', 0) noise_command.set(RX_frontend_B, "buffer_len", 1e6) noise_command.set(RX_frontend_B, "gain", 0) noise_command.set(RX_frontend_B, "delay", 1 + delay) noise_command.set(RX_frontend_B, "samples", number_of_samples) noise_command.set(RX_frontend_B, "rate", rate) noise_command.set(RX_frontend_B, "bw", 2 * rate) noise_command.set(RX_frontend_B, "wave_type", ["TONES" for x in tones_B]) noise_command.set(RX_frontend_B, "freq", tones_B) noise_command.set(RX_frontend_B, "rf", RF_B) noise_command.set(RX_frontend_B, "fft_tones", final_fft_bins_B) noise_command.set(RX_frontend_B, "pf_average", pf_average) # With the polyphase filter the decimation is realized increasing the number of channels. # This parameter will average in the GPU a certain amount of PFB outputs. noise_command.set(RX_frontend_B, "decim", 0) elif mode == "DIRECT": expected_samples_B = number_of_samples/int(decimation) expected_samples_A = expected_samples_B err_msg = "Double noise measurement not implemented yet. It's not hard to port from the single noise measure anyway." print_error(err_msg) raise(err_msg) if noise_command.self_check(): if (verbose): print_debug("Noise command successfully checked") noise_command.pprint() Async_send(noise_command.to_json()) else: err_msg = "Something went wrong in the noise acquisition command self_check()" print_error(err_msg) raise ValueError(err_msg) Packets_to_file( parameters=noise_command, timeout=None, filename=output_filename, dpc_expected=max(expected_samples_A,expected_samples_B), meas_type="Noise", push_queue = push_queue, **kwargs ) print_debug("Noise acquisition terminated.") return output_filename
[docs]def Get_noise(tones, measure_t, rate, decimation = None, amplitudes = None, RF = None, tx_gain = 0, output_filename = None, Front_end = None, Device = None, delay = None, pf_average = 4, mode = "DIRECT", trigger = None, **kwargs): ''' Perform a noise acquisition using fixed tone technique. Arguments: - tones: list of tones frequencies in Hz (absolute if RF is not given, relative to RF otherwise). - measure_t: duration of the measure in seconds. - decimation: the decimation factor to use for the acquisition. Default for PFB mode is minimum. Note that with the PFB the decimation factor can only be >= N_tones. This parameter represent actual decimation for the DIRECT mode. - amplitudes: a list of linear power to use with each tone. Must be the same length of tones arg; will be normalized. Default is equally splitted power. - RF: central up/down mixing frequency. Default is deducted by other arguments. - tx_gain: gain to use in the transmission side. - output_filename: eventual filename. default is datetime. - Front_end: the front end to be used on the USRP. default is A. - Device: the on-server device number to use. default is 0. - delay: delay between TX and RX processes. Default is taken from the INTERNAL_DELAY variable. - pf_average: pfb averaging factor. Default is 4 for PFB mode and 1 for DIRECT mode. - mode: noise acquisition kernels. DIRECT uses direct demodulation PFB use the polyphase filter bank technique. Note that PF average will refer to something slightly different in DIRECT mode (moving average ratio: 1 has no overlap). - trigger: class used for triggering. (See trigger section for more info). Default is no trigger. - kwargs: * verbose: additional prints. Default is False. * push_queue: queue for post writing samples. Note: - In the PFB acquisition scheme the decimation factor and bin width are directly correlated. This function execute a check on the input parameters to determine the number of FFT bins to use. Returns: - filename of the measure file. ''' global USRP_data_queue, REMOTE_FILENAME, END_OF_MEASURE, LINE_DELAY, USRP_power if ((mode != "PFB") and (mode != "DIRECT")): error_msg = "Noise acquisition mode %s not defined" % str(mode) print_error(err_msg) raise ValueError(err_msg) if pf_average is None: if mode == "DIRECT": pf_average = 1 elif mode == "PFB": pf_average = 4 try: verbose = kwargs['verbose'] except KeyError: verbose = False try: push_queue = kwargs['push_queue'] except KeyError: push_queue = None if output_filename is None: output_filename = "USRP_Noise_"+get_timestamp() else: output_filename = str(output_filename) print("Begin noise acquisition, file %s ..."%output_filename) if measure_t <= 0: print_error("Cannot execute a noise measure with "+str(measure_t)+"s duration.") return "" tx_gain = int(np.abs(tx_gain)) if tx_gain > 31: print_warning("Max gain is usually 31 dB. %d dB selected"%tx_gain) if not (int(rate) in USRP_accepted_rates): print_warning("Requested rate will cause additional CIC filtering in USRP firmware. To avoid this use one of these rates (MHz): "+str(USRP_accepted_rates)) if RF is None: print_warning("Assuming tones are in absolute units (detector bandwith)") # Calculate the optimal RF central frequency RF = np.mean(tones) tones = np.asarray(tones) - RF print_debug("RF central frequency will be %.2f MHz"%(RF/1e6)) if amplitudes is not None: if len(amplitudes) != len(tones): print_warning("Amplitudes profile length is %d and it's different from tones array length that is %d. Amplitude profile will be ignored."%(len(amplitudes),len(tones))) used_DAC_range = np.sum(amplitudes) print_debug("Using %.1f percent of DAC range"%(used_DAC_range*100)) else: print_debug("Using 100 percent of the DAC range.") amplitudes = [1. / len(tones) for x in tones] if Front_end is None: Front_end = 'A' if not Front_end_chk(Front_end): err_msg = "Cannot detect front_end: "+str(Front_end) print_error(err_msg) raise ValueError(err_msg) else: TX_frontend = Front_end+"_TXRX" RX_frontend = Front_end+"_RX2" if delay is None: try: delay = LINE_DELAY[str(int(rate/1e6))] delay *= 1e-9 except KeyError: print_warning("Cannot find associated line delay for a rate of %d Msps. Setting to 0s."%(int(rate/1e6))) delay = 0 else: print_debug("Using a delay of %d ns" % int(delay*1e9)) if mode == "PFB": # Calculate the number of channel needed if len(tones)>1: min_required_space = np.min([ x for x in np.abs([[i-j for j in tones] for i in tones]).flatten() if x > 0]) print_debug("Minimum bin width required is %.2f MHz"%(min_required_space/1e6)) min_required_fft = int(np.ceil(float(rate) / float(min_required_space))) else: min_required_fft = 10 if decimation is not None: if decimation < min_required_fft: print_warning("Cannot use a decimation factor of %d as the minimum required number of bin in the PFB is %d" % (decimation, min_required_fft)) final_fft_bins = min_required_fft else: final_fft_bins = int(decimation) else: final_fft_bins = int(min_required_fft) if final_fft_bins<10: # Less that 10 pfb bins cause a bottleneck in the GPU: too many instruction to fetch. final_fft_bins = 10 print_debug("Using %d PFB channels"%final_fft_bins) for i in range(len(tones)): if tones[i] > rate / 2: print_error("Out of bandwidth tone!") raise ValueError("Out of bandwidth tone requested. %.2f MHz / %.2f MHz (Nyq)" %(tones[i]/1e6, rate / 2e6) ) tones = quantize_tones(tones, rate, final_fft_bins) number_of_samples = rate * measure_t print("Tone [MHz]\tPower [dBm]\tOffset [MHz]") for i in range(len(tones)): print("%.1f\t%.2f\t%.3f" % ((RF + tones[i]) / 1e6, USRP_power + 20 * np.log10(amplitudes[i]), tones[i] / 1e6)) expected_samples = int(number_of_samples/final_fft_bins) noise_command = global_parameter() noise_command.set(TX_frontend, "mode", "TX") noise_command.set(TX_frontend, "buffer_len", 1e6) noise_command.set(TX_frontend, "gain", tx_gain) noise_command.set(TX_frontend, "delay", 1) noise_command.set(TX_frontend, "samples", number_of_samples) noise_command.set(TX_frontend, "rate", rate) noise_command.set(TX_frontend, "bw", 2 * rate) #noise_command.set(TX_frontend, 'tuning_mode', 0) noise_command.set(TX_frontend, "wave_type", ["TONES" for x in tones]) noise_command.set(TX_frontend, "ampl", amplitudes) noise_command.set(TX_frontend, "freq", tones) noise_command.set(TX_frontend, "rf", RF) # This parameter does not have an effect (except suppress a warning from the server) noise_command.set(TX_frontend, "fft_tones", 100) noise_command.set(RX_frontend, "mode", "RX") #noise_command.set(RX_frontend, 'tuning_mode', 0) noise_command.set(RX_frontend, "buffer_len", 1e6) noise_command.set(RX_frontend, "gain", 0) noise_command.set(RX_frontend, "delay", 1 + delay) noise_command.set(RX_frontend, "samples", number_of_samples) noise_command.set(RX_frontend, "rate", rate) noise_command.set(RX_frontend, "bw", 2 * rate) noise_command.set(RX_frontend, "wave_type", ["TONES" for x in tones]) noise_command.set(RX_frontend, "freq", tones) noise_command.set(RX_frontend, "rf", RF) noise_command.set(RX_frontend, "fft_tones", final_fft_bins) noise_command.set(RX_frontend, "pf_average", pf_average) # With the polyphase filter the decimation is realized increasing the number of channels. # This parameter will average in the GPU a certain amount of PFB outputs. noise_command.set(RX_frontend, "decim", 0) elif mode =="DIRECT": buffer_len = int(1e6) decimation = int(decimation) number_of_samples = rate * measure_t if decimation != 0: expected_samples = int(float(number_of_samples)/decimation) if buffer_len % decimation != 0: error_msg = "Cannot use a decimation factor of %d with a buffer len of %d as decimation % buffer_len must be 0" print_error(error_mas) raise ValueError(error_msg) else: expected_samples = int(number_of_samples) print_debug("GPU FIR filter disabled") if trigger is not None: expected_samples = None #Quantize tones to 1Hz resolution tones = [int(tt) for tt in tones] noise_command = global_parameter() noise_command.set(TX_frontend, "mode", "TX") noise_command.set(TX_frontend, "buffer_len", buffer_len) noise_command.set(TX_frontend, "gain", tx_gain) noise_command.set(TX_frontend, "delay", 1) noise_command.set(TX_frontend, "samples", number_of_samples) noise_command.set(TX_frontend, "rate", rate) noise_command.set(TX_frontend, "bw", 2 * rate) #noise_command.set(TX_frontend, 'tuning_mode', 0) noise_command.set(TX_frontend, "wave_type", ["TONES" for x in tones]) noise_command.set(TX_frontend, "ampl", amplitudes) noise_command.set(TX_frontend, "freq", tones) noise_command.set(TX_frontend, "rf", RF) # This parameter does not have an effect (except suppress a warning from the server) noise_command.set(TX_frontend, "fft_tones", 100) noise_command.set(RX_frontend, "mode", "RX") #noise_command.set(RX_frontend, 'tuning_mode', 0) noise_command.set(RX_frontend, "buffer_len", buffer_len) noise_command.set(RX_frontend, "gain", 0) noise_command.set(RX_frontend, "delay", 1 + delay) noise_command.set(RX_frontend, "samples", number_of_samples) noise_command.set(RX_frontend, "rate", rate) noise_command.set(RX_frontend, "bw", 2 * rate) noise_command.set(RX_frontend, "wave_type", ["DIRECT" for x in tones]) noise_command.set(RX_frontend, "freq", tones) noise_command.set(RX_frontend, "rf", RF) noise_command.set(RX_frontend, "fft_tones", 0) # in this case this value is discarded noise_command.set(RX_frontend, "pf_average", pf_average) noise_command.set(RX_frontend, "decim", decimation) if noise_command.self_check(): if (verbose): print_debug("Noise command successfully checked") noise_command.pprint() Async_send(noise_command.to_json()) else: err_msg = "Something went wrong in the noise acquisition command self_check()" print_error(err_msg) raise ValueError(err_msg) Packets_to_file( parameters=noise_command, timeout=None, filename=output_filename, dpc_expected=expected_samples, meas_type="Noise", push_queue = push_queue, trigger = trigger, **kwargs ) print_debug("Noise acquisition terminated.") return output_filename
[docs]def spec_from_samples(samples, sampling_rate=1, welch=None, dbc=False, rotate=True, verbose=True, clip_samples = False): ''' Calculate real and imaginary part of the spectra of a complex array using the Welch method. Arguments: - Samples: complex array representing samples. - sampling_rate: sampling_rate - welch: in how many segment to divide the samples given for applying the Welch method - dbc: scales samples to calculate dBc spectra. - rotate: if True rotate the IQ plane Returns: - Frequency array, - Imaginary spectrum array - Real spectrum array ''' if verbose: print_debug("[Welch worker]") try: L = len(samples) except TypeError: print_error("Expecting complex array for spectra calculation, got something esle.") return None, None, None if welch == None: welch = L else: welch = int(L / welch) if not clip_samples: end_clip_samples = len(samples) start_clip_samples = 0 else: end_clip_samples = int(len(samples) - clip_samples) start_clip_samples = int(clip_samples) if verbose: print_debug("Selecting from %d sample to %d sample" % (start_clip_samples, end_clip_samples)) if rotate: samples = samples * (np.abs(np.mean(samples)) / np.mean(samples)) if dbc: samples = samples / np.mean(samples) samples = samples - np.mean(samples) Frequencies, RealPart = signal.welch(samples[start_clip_samples:end_clip_samples].real, nperseg=welch, fs=sampling_rate, detrend='linear',scaling='density') Frequencies, ImaginaryPart = signal.welch(samples[start_clip_samples:end_clip_samples].imag, nperseg=welch, fs=sampling_rate, detrend='linear',scaling='density') return Frequencies, 10 * np.log10(RealPart), 10 * np.log10(ImaginaryPart)
[docs]def calculate_noise(filename, welch=None, dbc=False, rotate=True, usrp_number=0, ant=None, verbose=False, clip=0.1): ''' Generates the FFT of each channel stored in the .h5 file and stores the results in the same file. :param welch: in how many segment to divide the samples given for applying the Welch method. :param dbc: scales samples to calculate dBc spectra. :param rotate: if True rotate the IQ plane. TODO: * Default behaviour should be getting all the available RX antenna. ''' print("Calculating noise spectra for " + filename) if verbose: print_debug("Reading attributes...") filename = format_filename(filename) parameters = global_parameter() parameters.retrive_prop_from_file(filename) if ant is None: ant = parameters.get_active_rx_param() else: ant = to_list_of_str(ant) if len(ant) > 1: print_error("multiple RX devices not yet supported") return active_RX_param = parameters.parameters[ant[0]] try: if active_RX_param['wave_type'][0] == "DIRECT": if (active_RX_param['decim']>0): sampling_rate = float(active_RX_param['rate']) / active_RX_param['decim'] else: sampling_rate = float(active_RX_param['rate']) else: sampling_rate = float(active_RX_param['rate']) / active_RX_param['fft_tones'] if active_RX_param['decim']>1: sampling_rate /= float(active_RX_param['decim']) except TypeError: print_warning("Parameters passed to spectrum evaluation are not valid. Sampling rate = 1") sampling_rate = 1 except ZeroDivisionError: print_warning("Parameters passed to spectrum evaluation are not valid. Sampling rate = 1") sampling_rate = 1 if clip is not False: clip_samples = int(clip * sampling_rate) else: clip_samples = None f, samples, errors = openH5file( filename, ch_list=None, start_sample=clip_samples, # ignored last_sample=None, # ignored usrp_number=usrp_number, front_end=None, # this will change for double RX verbose=verbose, error_coord=True, big_file=True ) if len(errors) > 0: print_error("Cannot evaluate spectra of samples containing transmission error") return if verbose: print_debug("Calculating spectra...") Results = Parallel(n_jobs=min(N_CORES,3), verbose=1, backend=parallel_backend)( delayed(spec_from_samples)( i, sampling_rate=sampling_rate, welch=welch, dbc=dbc, rotate=rotate, clip_samples = clip_samples, verbose=verbose ) for i in samples ) f.close() if verbose: print_debug("Saving result on file " + filename + " ...") fv = h5py.File(filename, 'r+') noise_group_name = "Noise" + str(int(usrp_number)) try: noise_group = fv.create_group(noise_group_name) except ValueError: noise_group = fv[noise_group_name] try: noise_subgroup = noise_group.create_group(ant[0]) except ValueError: print_warning("Overwriting Noise subgroup %s in h5 file" % ant[0]) del noise_group[ant[0]] noise_subgroup = noise_group.create_group(ant[0]) if welch is None: welch = 0 noise_subgroup.attrs.create(name="welch", data=welch) noise_subgroup.attrs.create(name="dbc", data=dbc) noise_subgroup.attrs.create(name="rotate", data=rotate) noise_subgroup.attrs.create(name="rate", data=sampling_rate) noise_subgroup.attrs.create(name="n_chan", data=len(Results)) noise_subgroup.create_dataset("freq", data=Results[0][0], compression=H5PY_compression) for i in range(len(Results)): tone_freq = active_RX_param['rf'] + active_RX_param['freq'][i] ds = noise_subgroup.create_dataset("real_" + str(i), data=Results[i][1], compression=H5PY_compression, dtype=np.dtype('Float32')) ds.attrs.create(name="tone", data=tone_freq) ds = noise_subgroup.create_dataset("imag_" + str(i), data=Results[i][2], compression=H5PY_compression, dtype=np.dtype('Float32')) ds.attrs.create(name="tone", data=tone_freq) print_debug("calculate_noise_spec() done.") fv.close()
[docs]def plot_noise_spec(filenames, channel_list=None, max_frequency=None, title_info=None, backend='matplotlib', cryostat_attenuation=0, auto_open=True, output_filename=None, **kwargs): ''' Plot the noise spectra of given, pre-analized, H5 files. Arguments: - filenames: list of strings containing the filenames. - channel_list: - max_frequency: maximum frequency to plot. - title_info: add a custom line to the plot title - backend: see plotting backend section for informations. - auto_open: open the plot in default system browser if plotly backend is selected (non-blocking) or open the matplotlib figure (blocking). Default is True. - output_filename: output filename without any system extension. Default is Noise_timestamp().xx - kwargs: * usrp_number and front_end can be passed to the openH5file() function. * tx_front_end can be passed to manually determine the tx frontend to calculate the readout power. * add_info could be a list of the same length oF filenames containing additional legend information. * html will make the function retrn html code instead of saving a html file in case of plotly backend. * fig_size: matplotlib fig size in inches (xx,yy). :return the name of the file saved ''' filenames = to_list_of_str(filenames) if not (backend in ['matplotlib', 'plotly']): err_msg = "Cannot plot noise with backend \'%s\': not implemented"%backend print_error(err_msg) raise ValueError(err_msg) try: fig_size = kwargs['figsize'] except KeyError: fig_size = None try: html = kwargs['html'] except KeyError: html = False if len(filenames)>1: print("Plotting noise from files:") for f in filenames: print("\t%s"%f) else: print("Plotting noise from file %s ..."%filenames[0]) add_info_labels = None try: add_info_labels = kwargs['add_info'] if len(add_info_labels) != len(filenames): print_warning("Cannot add info labels on noise plot. len(add_info_labels)(%d)!=len(filenames)(%d)"%(len(add_info_labels),len(filenames))) add_info_labels = None except KeyError: pass if output_filename is None: output_filename = "Noise_" if len(filenames)>1: output_filename+="compare_" output_filename += get_timestamp() plot_title = 'USRP Noise spectra from ' if len(filenames) < 2: plot_title += "file: " + filenames[0] + "." else: plot_title += "multiple files." if backend == 'matplotlib': fig, ax = pl.subplots(nrows=1, ncols=1) if fig_size is None: fig_size = (16, 10) fig.set_size_inches(fig_size[0], fig_size[1]) ax.set_xlabel("Frequency [Hz]") elif backend == 'plotly': fig = tools.make_subplots(rows=1, cols=1) fig['layout']['xaxis1'].update(title="Frequency [Hz]")#), type='log') y_name_set = True rate_tag_set = True try: usrp_number = kwargs['usrp_number'] except KeyError: usrp_number = None try: front_end = kwargs['front_end'] except KeyError: front_end = None try: tx_front_end = kwargs['tx_front_end'] except KeyError: tx_front_end = None f_count = 0 for filename in filenames: info, freq, real, imag = get_noise( filename, usrp_number=usrp_number, front_end=front_end, channel_list=channel_list ) if max_frequency is not None: for ii in range(len(imag)): index_cut = find_nearest(freq, max_frequency) index_cut = np.min([len(freq),len(real[ii]),index_cut]) freq = freq[:index_cut] imag[ii] = imag[ii][:index_cut] real[ii] = real[ii][:index_cut] if y_name_set: y_name_set = False if backend == 'matplotlib': if info['dbc']: ax.set_ylabel("PSD [dBc/Hz]") else: ax.set_ylabel("PSD [dBm/Hz]") elif backend == 'plotly': if info['dbc']: fig['layout']['yaxis1'].update(title="PSD [dBc/Hz]") else: fig['layout']['yaxis1'].update(title="PSD [dBm/Hz]") if rate_tag_set: rate_tag_set = False if info['rate'] / 1e6 > 1.: plot_title += "Effective rate: %.2f Msps" % (info['rate'] / 1e6) else: plot_title += "Effective rate: %.2f ksps" % (info['rate'] / 1e3) for i in range(len(info['tones'])): readout_power = get_readout_power(filename, i, tx_front_end, usrp_number) - cryostat_attenuation label = filename+"<br>" label += "%.2f MHz" % (info['tones'][i] / 1e6) R = real[i] I = imag[i] if backend == 'matplotlib': label += "\nReadout pwr %.1f dBm" % (readout_power) if add_info_labels is not None: label += "\n" + add_info_labels[f_count] ax.semilogx(freq, R, '--', color=get_color(f_count + i), label="Real " + label) ax.semilogx(freq, I, color=get_color(f_count + i), label="Imag " + label) elif backend == 'plotly': label += "<br>Readout pwr %.1f dBm" % (readout_power) if add_info_labels is not None: label += "<br>" + add_info_labels[f_count] fig.append_trace(go.Scatter( x=freq, y=R, name="Real " + label, legendgroup="group" + str(i) + "file" + str(f_count), line=dict(color=get_color(f_count + i)), mode='lines' ), 1, 1) fig.append_trace(go.Scatter( x=freq, y=I, name="Imag " + label, legendgroup="group" + str(i) + "file" + str(f_count), line=dict(color=get_color(f_count + i), dash='dot'), mode='lines' ), 1, 1) # increase file counter f_count += 1 if backend == 'matplotlib': if title_info is not None: plot_title += "\n" + title_info fig.suptitle(plot_title) ax.legend(bbox_to_anchor=(1.04, 1), loc="upper left") formatter0 = EngFormatter(unit='Hz') ax.xaxis.set_major_formatter(formatter0) ax.grid(True) output_filename += '.png' print_debug("Saving %s..."%output_filename) fig.savefig(output_filename, bbox_inches="tight") pl.close(fig) elif backend == 'plotly': if title_info is not None: plot_title += "<br>" + title_info fig['layout'].update(title=plot_title) output_filename += ".html" if html: print_debug("Noise plotting done") return plotly.offline.plot(fig, filename=output_filename, auto_open=False, output_type = 'div') plotly.offline.plot(fig, filename=output_filename, auto_open=auto_open) print_debug("Noise plotting done") return output_filename
[docs]def calculate_frequency_timestream(noise_frequency, noise_data, fit_param): """ Convert IQ timestreams into frequency and quality factor timestreams. Derived from Albert's function to convert noise data in f0 stream data. The original function has been stripped of the matplotlib capabilities and adapted to the scope of this library. Arguments: - noise_frequency: float, Noise acquisition tone in Hz. - noise_data: list of complex, Noise data already scaled as S21 (see diagnosic() function). - fit_param: if fit parameters are given in the form (f0, A, phi, D, Qi, Qr, Qe_re, Qe_im,a, _, _, pcov), the fit won't be executed again. Returns: - X noise - Qr noise """ try: f0, A, phi, D, Qi, Qr, Qe_re, Qe_im, a = fit_param except: err_msg = "Fit parameter given to calculate_frequency_timestream() are not good." print_error(err_msg) raise ValueError(err_msg) Qe = Qe_re + 1.j*Qe_im dQe = 1./Qe f0 *= 1e6 #1. Remove the cable phase and the amplitude scaling from the time streams n_amplitude = A * np.exp(2.j*np.pi*(1e-6*D*(noise_frequency -f0) + phi)) #print "noise offet: %.2f: "%np.abs(n_amplitude) noise_data /= n_amplitude qrx_noise = dQe/(1.-noise_data) return f0*qrx_noise.imag/2., 1./qrx_noise.real
[docs]def copy_resonator_group(VNA_filename, NOISE_filename): ''' Copy the resonator groups from a VNA file to a mnoise file. Arguments: - VNA_filename: name of the file containing the resonator group (can also be an other noise file). - NOISE_filename: name of the file in which to copy the resonator group. If an other resonator group is in place, it will be rewrited. Returns: - None ''' VNA_filename = format_filename(VNA_filename) resonator_grp_name = "Resonators" VNA_fv = h5py.File(VNA_filename, 'r') if resonator_grp_name not in VNA_fv.keys(): err_msg = 'VNA file:%s does not contain the Resonators group'%VNA_filename print_error(err_msg) raise ValueError(err_msg) NOISE_filename = format_filename(NOISE_filename) NOISE_fv = h5py.File(NOISE_filename, 'r+') print_debug("Copying resonator group from \'%s\' to \'%s\' ..."%(VNA_filename,NOISE_filename)) resonator_grp = VNA_fv[resonator_grp_name] try: NOISE_fv_reso = NOISE_fv.create_group(resonator_grp_name) except ValueError: print_warning("Overwriting Noise subgroup %s in h5 file" % resonator_grp_name) del NOISE_fv[resonator_grp_name] else: del NOISE_fv[resonator_grp_name] # noise_resonator_group = noise_group.create_group(resonator_group_name) NOISE_fv.copy(resonator_grp, NOISE_fv) VNA_fv.close() NOISE_fv.close() return
[docs]def get_frequency_timestreams(NOISE_filename, start = None, end = None, channel_freq = None, frontend = None): ''' Returns the frequency and quality factor timestreams from a noise file in which a resonator group has been already copied. To copy the resonator group refer to copy_resonator_group() function. Arguments: - NOISE_filename: Name of the noise file. - start: start time in seconds. Default is from the beginning of the file. - end: end of the data in seconds. Default is up to file's end. - channel_freq: list of frequency of the channels to return. Default is all of them. - frontend: from which frontend to take the noise data. Default is A. Returns: - tuple containing frequency timestreams and quality factor timestreams. Each element of the tuple is a list of timestreams. Example: >>> frequencies, Q_factors = get_frequency_timestreams("noisefile.h5", start = 1, end = 1.5, channel_freq = 325.5): >>> # This will retrive frequency and quality factor timestreams of the 325.5 MHz channel (or closest) from the file "noisefile.h5" between 1 and 1.5 seconds of acquisition. ''' NOISE_filename = format_filename(NOISE_filename) print_debug("Opening file \'%s\'..."%NOISE_filename) if frontend is not None: if frontend == 'A': ant = "A_RX2" elif frontend == 'B': ant = "B_RX2" else: err_msg = "cannot recognize frontend code \'%s\' in get_frequency_timestreams()"%frontend print_error(err_msg) raise ValueError(err_msg) else: ant = frontend info = get_rx_info(NOISE_filename, ant=ant) last_sample = None if start is not None: if info['wave_type'] == "TONES": decimation_factor = info['fft_tones']/max(info['decim'],1.) else: decimation_factor = info['decim'] time_conv = float(info['rate'])/decimation_factor start_sample = time_conv*start if end is not None: last_sample = time_conv*end else: start_sample = 0 tones = np.asarray(info['freq'])+info['rf'] if channel_freq is not None: print_debug("Channel selected: ") numeric_channel_list = [] for x in channel_freq: j = find_nearest(tones,x) numeric_channel_list.append(j) print_debug("%d) %.2f MHz"%(len(numeric_channel_list),tones[j]/1e6)) else: numeric_channel_list = channel_freq params = get_fit_param(NOISE_filename, verbose = False) data = openH5file(NOISE_filename, ch_list=numeric_channel_list, start_sample=start_sample, last_sample=last_sample, usrp_number=None, front_end=frontend, verbose=False, error_coord=False, big_file = False) result_f = [] result_q = [] for i in range(len(params)): # f0, A, phi, D, Qi, Qr, Qe_re, Qe_im,a fit_param = (params[i]['f0'], params[i]['A'], params[i]['phi'], params[i]['D'], params[i]['Qi'], params[i]['Qr'], np.real(params[i]['Qe']),np.imag(params[i]['Qe']),params[i]['a']) f_ts, q_ts = calculate_frequency_timestream(tones[i], data[i], fit_param) result_f.append(f_ts - np.mean(f_ts)) result_q.append(q_ts - np.mean(q_ts)) return result_f, result_q
[docs]def plot_frequency_timestreams(filenames, decimation=None, displayed_samples=None, low_pass=None, backend='matplotlib', output_filename=None, channel_list=None, start_time=None, end_time=None, auto_open=True, **kwargs): ''' Plot frequency timestreams from given H5 noise files. Arguments: - a list of strings containing the files to plot. - decimation: eventually deciamte the signal before plotting. - displayed_samples: calculate decimation to display a certain number of samples. - low pass: floating point number controlling the cut-off frequency of a low pass filter that is eventually applied to the data. - backend: [string] choose the return type of the plot. Allowed backends for now are: * matplotlib: creates a matplotlib figure, plots in non-blocking mode and return the matplotlib figure object. kwargs in this case accept: - size: size of the plot in the form of a tuple (inches,inches). Default is matplotlib default. * plotly: plot using plotly and webgl interface, returns the html code descibing the plot. kwargs in this case accept: - size: size of the plot. Default is plotly default. * bokeh: use bokeh to generate an interactive html file containing the IQ plane and the magnitude/phase timestream. - output_filename: string: name of the file saved. Default is a timestamp. - channel_list: select only al list of channels to plot. - start_time: time where to start plotting. Default is 0. - end_time: time where to stop plotting. Default is end of the measure. - auto_open: open the plot in default system browser if plotly backend is selected (non-blocking). Default is True. - kwargs: * usrp_number and front_end can be passed to the openH5file() function. * fig_size: the size of matplotlib figure. * add_info: list of strings as long as the file list to add info to the legend. Returns: - the complete name of the saved file or None in case no file is saved. Note: - Possible errors are signaled on the plot. ''' downsample_warning = True overwriting_decim_waring = True try: fig_size = kwargs['figsize'] except KeyError: fig_size = None if output_filename is None: output_filename = "USRP_freq_timestream_"+get_timestamp() try: add_info_labels = kwargs['add_info'] except KeyError: add_info_labels = None plot_title = 'USRP frequency/Qr timestream ' if backend == 'matplotlib': fig, ax = pl.subplots(nrows=2, ncols=1, sharex=True) if fig_size is None: fig_size = (16, 10) fig.set_size_inches(fig_size[0], fig_size[1]) ax[1].set_xlabel("Time [s]") formatter0 = EngFormatter(unit='s') ax[1].xaxis.set_major_formatter(formatter0) formatter1 = EngFormatter(unit='') ax[0].set_ylabel("Frequency Shift [Hz]") ax[1].set_ylabel("Qr Shift [-]") ax[0].yaxis.set_major_formatter(formatter1) ax[1].yaxis.set_major_formatter(formatter1) elif backend == 'plotly': fig = tools.make_subplots(rows=2, cols=1, subplot_titles=('f0 timestream', 'Qr timestream'), shared_xaxes=True) fig['layout']['yaxis1'].update(title='Frequency Shift [Hz]') fig['layout']['yaxis2'].update(title='Qr Shift [-]') fig['layout']['xaxis'].update(exponentformat='SI') fig['layout']['xaxis1'].update(title='Time [s]') filenames = to_list_of_str(filenames) print_debug("Plotting from files:") for i in range(len(filenames)): print_debug("%d) %s" % (i, filenames[i])) try: usrp_number = kwargs['usrp_number'] except KeyError: usrp_number = None try: front_end = kwargs['front_end'] except KeyError: front_end = None file_count = 0 for filename in filenames: filename = format_filename(filename) parameters = global_parameter() parameters.retrive_prop_from_file(filename) ant = parameters.get_active_rx_param() if len(ant) > 1: print_error("multiple RX devices not yet supported") return freq = None if parameters.get(ant[0], 'wave_type')[0] == "TONES": decimation_factor = parameters.get(ant[0], 'fft_tones') freq = np.asarray(parameters.get(ant[0], 'freq')) + parameters.get(ant[0], 'rf') if parameters.get(ant[0], 'decim') != 0: decimation_factor *= parameters.get(ant[0], 'decim') effective_rate = parameters.get(ant[0], 'rate') / float(decimation_factor) elif parameters.get(ant[0], 'wave_type')[0] == "CHIRP": if parameters.get(ant[0], 'decim') != 0: effective_rate = parameters.get(ant[0], 'swipe_s')[0] / parameters.get(ant[0], 'chirp_t')[0] else: effective_rate = parameters.get(ant[0], 'rate') else: decimation_factor = max(1, parameters.get(ant[0], 'fft_tones')) if parameters.get(ant[0], 'decim') != 0: decimation_factor *= parameters.get(ant[0], 'decim') effective_rate = parameters.get(ant[0], 'rate') / float(decimation_factor) if start_time is not None: file_start_time = start_time * effective_rate else: file_start_time = 0 if end_time is not None: file_end_time = end_time * effective_rate else: file_end_time = None freq_ts, qr_ts = get_frequency_timestreams(filename, start = file_start_time, end = file_end_time, channel_freq = None, frontend = None ) # samples, errors = openH5file( # filename, # ch_list=channel_list, # start_sample=file_start_time, # last_sample=file_end_time, # usrp_number=usrp_number, # front_end=front_end, # verbose=False, # error_coord=True # ) #print_debug("plot_raw_data() found %d channels each long %d samples" % (len(samples), len(samples[0]))) if channel_list == None: ch_list = range(len(freq_ts)) else: if max(channel_list) > len(freq_ts): print_warning( "Channel list selected in plot_raw_data() is bigger than avaliable channels. plotting all available channels") ch_list = range(len(freq_ts)) else: ch_list = channel_list # prepare samples TODO for i in ch_list: Y1 = freq_ts[i] Y2 = qr_ts[i] if displayed_samples is not None: if decimation is not None and overwriting_decim_waring: print_warning("Overwriting offline decimation arguments with displayed_samples") overwriting_decim_waring = False decimation = int(len(freq_ts[i])/displayed_samples) if decimation <= 1 and downsample_warning: print_warning("Channel does not require decimation to reach the number of displayed samples") downsample_warning = False if decimation is not None and decimation > 1: decimation = int(np.abs(decimation)) Y1 = signal.decimate(Y1, decimation, ftype='fir') Y2 = signal.decimate(Y2, decimation, ftype='fir') else: decimation = 1 X = np.arange(len(Y1)) / float(effective_rate / decimation) + file_start_time / float(effective_rate) if effective_rate / 1e6 > 1: rate_tag = 'rate: %.2f Msps' % (effective_rate / 1e6) else: rate_tag = 'rate: %.2f ksps' % (effective_rate / 1e3) if freq is None: label = "Channel %d" % i else: label = "Channel %.2f MHz" % (freq[i] / 1.e6) if backend == 'matplotlib': label += "\n" + filename if add_info_labels is not None: label += "\n" + add_info_labels[file_count] ax[0].plot(X[decimation:-decimation], Y1[decimation:-decimation], color=get_color(i + file_count), label=label) ax[1].plot(X[decimation:-decimation], Y2[decimation:-decimation], color=get_color(i + file_count)) elif backend == 'plotly': label += "<br>" + filename if add_info_labels is not None: label += "<br>" + add_info_labels[file_count] fig.append_trace(go.Scatter( x=X[decimation:-decimation], y=Y1[decimation:-decimation], name=label, legendgroup="group" + str(i) + "file" + str(file_count), line=dict(color=get_color(i + file_count)), mode='lines' ), 1, 1) fig.append_trace(go.Scatter( x=X[decimation:-decimation], y=Y2[decimation:-decimation], # name = "channel %d"%i, showlegend=False, legendgroup="group" + str(i) + "file" + str(file_count), line=dict(color=get_color(i + file_count)), mode='lines' ), 2, 1) file_count += 1 final_filename = "" if backend == 'matplotlib': fig.suptitle(plot_title + "\n" + rate_tag) handles, labels = ax[0].get_legend_handles_labels() ax[0].legend(handles, labels, bbox_to_anchor=(1.04, 1), loc="upper left") ax[0].grid(True) ax[1].grid(True) final_filename = output_filename + '.png' fig.savefig(final_filename, bbox_inches="tight") pl.close(fig) if backend == 'plotly': final_filename = output_filename + ".html" fig['layout'].update(title=plot_title + "<br>" + rate_tag) plotly.offline.plot(fig, filename=final_filename, auto_open=auto_open) return final_filename
[docs]def diagnostic_VNA_noise(noise_filename, points = None, VNA_file = None, ant = "A_RX2", backend = 'matplotlib', **kwargs): ''' Plot the VNA traces and the noise (averaged or in N points) on the same plot to check for acquisition consistency. The noise file has to contain the Resonators group in order to use this function; to copy that from a VNA file use the function #copy_resonator_group(). :param noise_filename: noise acquisition filename. :param points: the default behaviour is to average every channel in a single point; if this argument is >1 the noise will be decimated in that number of points. :param VNA_file: if the fit information has to be taken from an external VNA file fill this argument with the filename. :param backend: Choose the plotting backend. Currently implemented: plotly and matplotlib :param ant: specify the antenna used to take noise data. Default is A_RX2 :param kwargs: * auto_open: plotly backend specific, determines if after saving the plot the browser is called. * figsize: matplotlib specific argument: figure size of the plot or each plot. TODO: allow this function to interpret multiple VNA sources and multiple noise files. ''' def db(value): return 20*np.log10(value) noise_filename = format_filename(noise_filename) print("Plotting diagnostic data from \'%s\'"%noise_filename) resonator_grp_name = "Resonators" info = get_rx_info(noise_filename, ant=ant) tx_info = get_tx_info(noise_filename, ant=ant.split('_')[0]+"_TXRX") noise_file = h5py.File(noise_filename, 'r') try: fig_size = kwargs['figsize'] except KeyError: fig_size = None #source fit data fit_source_filename = noise_filename extra_source_info = "" if VNA_file is not None: fit_source_filename = format_filename(VNA_file) extra_source_info = " Fit data taken from \'%s\'."%fit_source_filename print_debug(extra_source_info) fit_param = get_fit_param(fit_source_filename) fit_data = get_fit_data(fit_source_filename) #check backend existance if ((backend!='matplotlib') and (backend!='plotly')): err_msg = "backend %s not implemented in diagnostic_VNA_noise() function"%backend print_error(err_msg) raise ValueError(err_msg) #check Resonators group existance if resonator_grp_name not in noise_file.keys(): err_msg = "Cannot find the Resonator group in the file %s" % noise_filename print_error(err_msg) raise ValueError(err_msg) #check resonator group length matching if (len(fit_data) != len(info['wave_type'])): warning_msg = "The length of the resonator group (%d) does not match the number of tones (%d) in file %s."%(len(fit_data), len(info['wave_type']), fit_source_filename) print_warning(warning_msg) #check frequency matching tones = np.asarray(info['freq']) + info['rf'] fit_freqs = np.asarray([p['f0'] for p in fit_param]) #retrive calibrations calibrations = [(1./tx_info['ampl'][i])*USRP_calibration/(10**((USRP_power + tx_info['gain'])/20.)) for i in range(len(tx_info['ampl']))] #do averages print_debug("Averaging...") if points is None: noise_points = np.asarray([ np.mean(dataset) for dataset in noise_file['raw_data0'][ant]['data'] ]) else: decimation = int(np.shape(noise_file['raw_data0'][ant]['data'])[1]/noise_points) print_debug("Decimating %d"%decimation) noise_points = np.asarray([ signal.decimate(dataset,decimation,ftype="fir") for dataset in noise_file['raw_data0'][ant]['data'] ]) title = "Diagnostic plot: overlaying averaged noise acquisition and VNA traces" #plot print_debug("Plotting...") if backend == 'matplotlib': if fig_size is None: fig_size = (16, 10) diagnostic_folder_name = "Diagnostic_"+noise_filename.split(".")[0] try: os.mkdir(diagnostic_folder_name) except OSError: pass os.chdir(diagnostic_folder_name) for i in range(len(fit_data)): #Plot single channel data fig = pl.figure() fig.set_size_inches(fig_size[0], fig_size[1]) ax = fig.add_subplot(111) label = "Channel %.2f MHz"%(fit_param[i]['f0']) current_color = get_color(i) edge_color = 'k' if current_color == 'black': edge_color = 'grey' original_data = fit_data[i]['original'] ax.plot(original_data.real, original_data.imag , color = current_color, label = label, alpha = 0.4) ax.scatter(noise_points[i].real * calibrations[i], noise_points[i].imag * calibrations[i], color = current_color, edgecolors=edge_color,linewidth=2,s = 110) fig.suptitle(title) ax.set_aspect('equal','datalim') ax.legend() ax.grid() fig.savefig("diagnostic_channel_%d_IQ.png"%i) pl.close(fig) #plot phase and magnitude fig = pl.figure() fig.set_size_inches(fig_size[0], fig_size[1]) ax = fig.add_subplot(111) ax.plot(fit_data[i]["frequency"],db(np.abs(original_data)), label = "VNA data", color = current_color) if points is not None: freq_data = [tx_info["rf"] + tx_info["freq"][i] for tt in range(len(noise_points[i]))] magdata = db(np.abs(noise_points[i]) *calibrations[i]) diff_mag = np.mean(magdata) - vrms2dbm(np.abs(original_data)[find_nearest(fit_data[i]["frequency"], freq_data[0])]) else: freq_data = [tx_info["rf"] + tx_info["freq"][i]] magdata = [db(np.abs(noise_points[i]) * calibrations[i])] diff_mag = magdata[0] - db(np.abs(original_data)[find_nearest(fit_data[i]["frequency"], freq_data[0])]) ax.scatter(freq_data, magdata , label = "Averaged noise data", color = current_color, edgecolors=edge_color,linewidth=2,s = 110) ax.legend() ax.grid() fig.suptitle(label+"\n average discrepancy: %.2fdB"%diff_mag) fig.savefig("diagnostic_channel_%d_mag.png"%i) pl.close(fig) os.chdir("..") elif backend == 'plotly': pass else: err_msg = "%s backend not implemented in diagnostic function" % str(backend) print_error(err_msg) raise ValueError(err_msg) return
def calculate_NEF_spectra(): #copied from calculate_spec return def get_NEF_spec(): return def plot_NEF_spectra(): #plot_spec return