Source code for NanoVNASaver.SweepWorker

#  NanoVNASaver
#
#  A python program to view and export Touchstone data from a NanoVNA
#  Copyright (C) 2019, 2020  Rune B. Broberg
#  Copyright (C) 2020,2021 NanoVNA-Saver Authors
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
import logging
from enum import Enum
from time import sleep

import numpy as np
from PySide6.QtCore import QObject, QThread, Signal, Slot

from NanoVNASaver import NanoVNASaver

from .Calibration import correct_delay
from .Hardware.VNA import VNA
from .RFTools import Datapoint
from .Settings.Sweep import Sweep, SweepMode

logger = logging.getLogger(__name__)

VALUE_MAX: float = 9.5
RETRIES_RECONNECT: int = 5
RETRIES_MAX: int = 10


[docs] def truncate(values: list[list[complex]], count: int) -> list[list[complex]]: """truncate drops extrema from data list if averaging is active""" keep = len(values) - count logger.debug("Truncating from %d values to %d", len(values), keep) if count < 1 or keep < 1: logger.info("Not doing illegal truncate") return values truncated = [] for valueset in np.swapaxes(values, 0, 1).tolist(): avg = np.average(valueset) truncated.append( sorted(valueset, key=lambda v, a=avg: abs(a - v))[:keep] ) return np.swapaxes(truncated, 0, 1).tolist()
[docs] class WorkerSignals(QObject): updated = Signal() finished = Signal() sweep_error = Signal()
# TODO do we need it for QThread?
[docs] class SweepState(Enum): STOPPED = 0 RUNNING = 1
[docs] class SweepWorker(QThread): def __init__(self, app: NanoVNASaver) -> None: super().__init__() logger.info("Initializing SweepWorker") self.signals: WorkerSignals = WorkerSignals() self.app = app self.sweep = Sweep() #self.setAutoDelete(False) self.percentage: float = 0.0 self.data11: list[Datapoint] = [] self.data21: list[Datapoint] = [] self.rawData11: list[Datapoint] = [] self.rawData21: list[Datapoint] = [] self.init_data() self.state = SweepState.STOPPED self.error_message: str = "" self.offsetDelay: float = 0.0
[docs] @Slot() def quit(self) -> None: self.state = SweepState.STOPPED super().quit()
[docs] @Slot() def run(self) -> None: try: self._run() except BaseException as exc: # pylint: disable=broad-except self.state = SweepState.STOPPED logger.exception("%s", exc) self.gui_error(f"ERROR during sweep\n\nStopped\n\n{exc}") if logger.isEnabledFor(logging.DEBUG): raise exc
def _run(self) -> None: logger.info("Initializing SweepWorker") if not self.app.vna.connected(): logger.debug( "Attempted to run without being connected to the NanoVNA" ) return self.state = SweepState.RUNNING self.percentage = 0.0 sweep = self.app.sweep.copy() if sweep != self.sweep: # parameters changed self.sweep = sweep self.init_data() self._run_loop() if sweep.segments > 1: start = sweep.start end = sweep.end logger.debug( "Resetting NanoVNA sweep to full range: %d to %d", start, end ) self.app.vna.resetSweep(start, end) self.percentage = 100.0 logger.debug('Sending "finished" signal') self.signals.finished.emit() self.state = SweepState.STOPPED def _run_loop(self) -> None: sweep = self.sweep averages = ( sweep.properties.averages[0] if sweep.properties.mode == SweepMode.AVERAGE else 1 ) logger.info("%d averages", averages) while True: for i in range(sweep.segments): logger.debug("Sweep segment no %d", i) if self.state == SweepState.STOPPED: logger.debug("Stopping sweeping as signalled") break start, stop = sweep.get_index_range(i) freq, values11, values21 = self.read_averaged_segment( start, stop, averages ) self.percentage = (i + 1) * 100 / sweep.segments self.update_data(freq, values11, values21, i) if ( sweep.properties.mode != SweepMode.CONTINOUS or self.state == SweepState.STOPPED ): break
[docs] def init_data(self) -> None: self.data11: list[Datapoint] = [] self.data21: list[Datapoint] = [] self.rawData11: list[Datapoint] = [] self.rawData21: list[Datapoint] = [] for freq in self.sweep.get_frequencies(): self.data11.append(Datapoint(freq, 0.0, 0.0)) self.data21.append(Datapoint(freq, 0.0, 0.0)) self.rawData11.append(Datapoint(freq, 0.0, 0.0)) self.rawData21.append(Datapoint(freq, 0.0, 0.0)) logger.debug("Init data length: %s", len(self.data11))
[docs] def update_data( self, frequencies: list[int], values11: list[complex], values21: list[complex], index: int, ) -> None: # Update the data from (i*101) to (i+1)*101 logger.debug( "Calculating data and inserting in existing data at index %d", index ) offset = self.sweep.points * index raw_data11 = [ Datapoint(freq, values11[i].real, values11[i].imag) for i, freq in enumerate(frequencies) ] raw_data21 = [ Datapoint(freq, values21[i].real, values21[i].imag) for i, freq in enumerate(frequencies) ] data11, data21 = self.applyCalibration(raw_data11, raw_data21) logger.debug("update Freqs: %s, Offset: %s", len(frequencies), offset) for i in range(len(frequencies)): self.data11[offset + i] = data11[i] self.data21[offset + i] = data21[i] self.rawData11[offset + i] = raw_data11[i] self.rawData21[offset + i] = raw_data21[i] logger.debug( "Saving data to application (%d and %d points)", len(self.data11), len(self.data21), ) self.app.saveData(self.data11, self.data21) logger.debug('Sending "updated" signal') self.signals.updated.emit()
[docs] def applyCalibration( self, raw_data11: list[Datapoint], raw_data21: list[Datapoint] ) -> tuple[list[Datapoint], list[Datapoint]]: data11: list[Datapoint] = [] data21: list[Datapoint] = [] if not self.app.calibration.isCalculated: data11 = raw_data11.copy() data21 = raw_data21.copy() elif self.app.calibration.isValid1Port(): data11.extend( self.app.calibration.correct11(dp) for dp in raw_data11 ) else: data11 = raw_data11.copy() if self.app.calibration.isValid2Port(): for counter, dp in enumerate(raw_data21): dp11 = raw_data11[counter] data21.append(self.app.calibration.correct21(dp, dp11)) else: data21 = raw_data21 if self.offsetDelay != 0.0: data11 = [ correct_delay(dp, self.offsetDelay, reflect=True) for dp in data11 ] data21 = [correct_delay(dp, self.offsetDelay) for dp in data21] return data11, data21
[docs] def read_averaged_segment( self, start: int, stop: int, averages: int = 1 ) -> tuple[list[int], list[complex], list[complex]]: logger.info( "Reading from %d to %d. Averaging %d values", start, stop, averages ) freq: list[int] = [] values11: list[complex] = [] values21: list[complex] = [] for i in range(averages): if self.state == SweepState.STOPPED: logger.debug("Stopping averaging as signalled.") if averages == 1: break logger.warning("Stop during average. Discarding sweep result.") return [], [], [] logger.debug("Reading average no %d / %d", i + 1, averages) retries = RETRIES_RECONNECT tmp_11: list[complex] = [] tmp_21: list[complex] = [] while retries and not tmp_11: if retries < RETRIES_RECONNECT: logger.warning("retry readSegment(%s,%s)", start, stop) sleep(0.5) retries -= 1 freq, tmp_11, tmp_21 = self.read_segment(start, stop) if not tmp_11: raise IOError("Invalid data during swwep") values11.append(tmp_11) values21.append(tmp_21) self.percentage += 100 / (self.sweep.segments * averages) self.signals.updated.emit() if not values11: raise IOError("Invalid data during swwep") truncates = self.sweep.properties.averages[1] if truncates > 0 and averages > 1: logger.debug("Truncating %d values by %d", len(values11), truncates) values11 = truncate(values11, truncates) values21 = truncate(values21, truncates) logger.debug("Averaging %d values", len(values11)) values11: list[complex] = np.average(values11, axis=0).tolist() values21: list[complex] = np.average(values21, axis=0).tolist() return freq, values11, values21
[docs] def read_segment( self, start: int, stop: int ) -> tuple[list[int], list[complex], list[complex]]: logger.debug("Setting sweep range to %d to %d", start, stop) self.app.vna.setSweep(start, stop) frequencies = self.app.vna.read_frequencies() logger.debug("Read %s frequencies", len(frequencies)) values11 = self.read_data("data 0") values21 = self.read_data("data 1") if not len(frequencies) == len(values11) == len(values21): logger.info("No valid data during this run") frequencies = [] values11 = values21 = [] return frequencies, values11, values21
[docs] def read_data(self, data) -> list[complex]: logger.debug("Reading %s", data) vna: "VNA" = self.app.vna # shortcut to device retries = RETRIES_MAX while retries: retries -= 1 try: result = vna.readValues(data) logger.debug("Read %d values", len(result)) if vna.validateInput and any( abs(v) > VALUE_MAX for v in result ): logger.error("Got a non plausible data: (%s)", data) else: return result except ValueError as exc: logger.exception( "An exception occurred reading %s: %s", data, exc ) logger.error("Re-reading %s", data) sleep(0.2) vna.reconnect() logger.critical( "Tried and failed to read %s %s times. Giving up.", data, RETRIES_MAX, ) raise IOError( f"Failed reading {data} {RETRIES_MAX} times.\n" f"Data outside expected valid ranges," f" or in an unexpected format.\n\n" f"You can disable data validation on the" f"device settings screen." )
[docs] def gui_error(self, message: str) -> None: self.error_message = message self.state = SweepState.STOPPED self.signals.sweep_error.emit()