Source code for fbgui.cal_program

"""Contains the calibration program specific logic."""

from typing import List
import math
import time
import visa
import datetime
from uuid import UUID
from fbgui import file_helper as fh
from fbgui.constants import CAL, TEMP
from fbgui.program import Program, ProgramType
from fbgui.messages import MessageType, Message
from fbgui.main_program import Application


[docs]class CalProgram(Program): """Contains the logic specific for running a calibration program.""" def __init__(self, master: Application): """ Creates a new CalProgram object that overrides the Program class with the CAl program type. :param master: Application object representing the main gui. """ cal_type = ProgramType(CAL) super().__init__(master, cal_type)
[docs] def program_loop(self, thread_id: UUID): """ Runs the calibration main loop. :param thread_id: UUID of the thread this code is running in """ temps_arr = self.options.get_target_temps() self.cal_loop(temps_arr, thread_id) if self.master.thread_map[thread_id]: self.create_excel() self.pause_program()
[docs] def sleep(self, thread_id: UUID) -> bool: """ Sleeps for the configured temp_interval on the home screen. :param thread_id: UUID of the thread this code is running in :return: False if the program was paused, True otherwise """ start_time = time.time() while time.time() - start_time < self.options.temp_interval.get(): time.sleep(.5) if not self.master.thread_map[thread_id]: return False return True
[docs] def get_temp(self, thread_id: UUID) -> float: """ Takes the configured number of temperature readings and averages them. :param thread_id: UUID of the thread the code is currently running in :return: the averaged temperature """ avg_temp = 0. temp = None for _ in range(self.options.num_temp_readings.get()): while temp is None: try: self.master.conn_dev(TEMP, thread_id=thread_id) temp = float((self.master.temp_controller.get_temp_k())) avg_temp += temp except (AttributeError, visa.VisaIOError): self.temp_controller_error() temp = None self.disconnect_devices() return avg_temp/self.options.num_temp_readings.get()
[docs] def cal_loop(self, temps: List[float], thread_id: UUID): """ Runs the main calibration loop. :param temps: the list of temperatures to set the oven to :param thread_id: UUID of the code the thread is currently running in """ last_cycle_num = fh.get_last_cycle_num(self.options.file_name.get(), CAL) if last_cycle_num == self.options.num_cal_cycles.get(): self.master.main_queue.put(Message(MessageType.WARNING, "Calibration Program Complete", "The calibration program has already completed the specified {} cycles" .format(self.options.num_cal_cycles.get()))) for cycle_num in range(self.options.num_cal_cycles.get() - last_cycle_num): cycle_num += last_cycle_num if not self.master.thread_map[thread_id]: return temp = self.get_temp(thread_id) kwargs = {"force_connect": True, "thread_id": thread_id} if temp < float(temps[0]) + 274.15 - 5: kwargs["heat"] = True else: kwargs["cooling"] = True self.master.main_queue.put(Message(MessageType.INFO, text="Initializing cycle {} to start temperature {}K." .format(cycle_num+1, temps[0]+274.15-5), title=None)) start_init_time = time.time() if not self.master.thread_map[thread_id]: return self.set_oven_temp(temps[0] - 5, **kwargs) self.disconnect_devices() kwargs["force_connect"] = False while not self.reset_temp(temps[0], thread_id): if not self.sleep(thread_id): return self.set_oven_temp(temps[0] - 5, **kwargs) self.master.main_queue.put(Message(MessageType.INFO, text="Initializing cycle {} took {}.".format( cycle_num+1, str(datetime.timedelta(seconds=int(time.time()-start_init_time)))), title=None)) self.master.main_queue.put(Message(MessageType.INFO, text="Starting cycle {}.".format(cycle_num+1), title=None)) start_cycle_time = time.time() for temp in temps: if temp >= temps[0]: kwargs = {"temp": temp, "heat": True, "force_connect": True} else: kwargs = {"temp": temp, "heat": False, "cooling": True, "force_connect": True} self.set_oven_temp(**kwargs) self.disconnect_devices() if not self.sleep(thread_id): return kwargs["force_connect"] = False while not self.check_drift_rate(thread_id, cycle_num+1): if not self.sleep(thread_id): return self.set_oven_temp(**kwargs) self.master.main_queue.put(Message(MessageType.INFO, text="Cycle {} complete it ran for {}.".format( cycle_num+1, str(datetime.timedelta(seconds=int(time.time()-start_cycle_time)))), title=None))
[docs] def reset_temp(self, start_temp: float, thread_id: UUID) -> bool: """ Checks to see if the temperature is 5K below the starting temperature. :param start_temp: The first temperature the oven is set to :param thread_id: UUID of the thread the code is currently running in """ temp = self.get_temp(thread_id) if temp <= float(start_temp + 274.15) - 5: return True return False
[docs] def check_drift_rate(self, thread_id: UUID, cycle_num: int) -> bool: """ Checks if the drift rate is below the configured drift. :param thread_id: UUID of the thread the code is curretnly running in :param cycle_num: The number of the current calibration cycle :return: True if the drift rate is below the configured drift rate, otherwise False """ while True: try: self.master.conn_dev(TEMP, thread_id=thread_id) start_time = time.time() if not self.master.thread_map[thread_id]: return False start_temp = self.get_temp(thread_id) waves, amps = self.get_wave_amp_data(thread_id) if not self.master.thread_map[thread_id]: return False curr_temp = self.get_temp(thread_id) curr_time = time.time() self.disconnect_devices() drift_rate = math.fabs(start_temp - curr_temp) / math.fabs(start_time - curr_time) drift_rate *= 60000. if not self.master.thread_map[thread_id] and math.fabs(curr_temp - self.options.set_temp.get()) < .5: return False if drift_rate <= self.options.drift_rate.get(): fh.write_db(self.options.file_name.get(), self.snums, curr_time, curr_temp, waves, amps, CAL, self.table, self.master.main_queue, drift_rate, True, cycle_num) return True if not self.master.thread_map[thread_id]: return False fh.write_db(self.options.file_name.get(), self.snums, curr_time, curr_temp, waves, amps, CAL, self.table, self.master.main_queue, drift_rate, False, cycle_num) return False except (AttributeError, visa.VisaIOError): self.temp_controller_error()