"""Contains object representations of all the necessary devices."""
import socket
from socket import AF_INET, SOCK_STREAM
import numpy as np
import random
from typing import List, Tuple
from visa import ResourceManager
from pyvisa.resources.gpib import GPIBInstrument
# SM125 Data Constants
WAVELEN_SCALE_FACTOR = 10000.0
AMP_SCALE_FACTOR = 100.0
[docs]class SM125(socket.socket):
"""UDP socket connection for SM125 device."""
def __init__(self, address, port, use_dev):
socket.setdefaulttimeout(3)
super().__init__(AF_INET, SOCK_STREAM)
if use_dev:
super().connect((address, port))
[docs] def get_data(self, dummy_value=False, num=0) -> Tuple[List[float], List[float], List[float]]:
"""
Returns the SM125 wavelengths, amplitudes, and number of elements recorded for each channel.
:param dummy_value: If true then make up fake values, otherwise query the SM125 for wavelength and power data
:param num: number of fake values to make up
:returns: Wavelength readings, Amplitude readings, Number of readings on each channel, if using fake values then
the third parameter is empty
"""
if dummy_value:
waves = []
amps = []
wave_start_num = 0
wave_end_num = 25
amp_start_num = 0
amp_end_num = -.5
for _ in range(num):
waves.append(random.uniform(1500+wave_start_num, 1500 + wave_end_num))
wave_start_num += 25
wave_end_num += 25
amps.append(random.uniform(-10+amp_start_num, -10 + amp_end_num))
amp_start_num += .25
amp_end_num += .25
return waves, amps, []
else:
self.send(b'#GET_PEAKS_AND_LEVELS')
pre_response = self.recv(10)
response = self.recv(int(pre_response))
chan_lens = np.frombuffer(response[:20], dtype='3uint32, 4uint16')[0][1]
total_peaks = sum(chan_lens)
wave_start_idx = 32
wave_end_idx = wave_start_idx + 4 * total_peaks
wavelengths = np.frombuffer(response[wave_start_idx:wave_end_idx], dtype=(str(total_peaks) + 'int32'))
amp_start_idx = wave_end_idx
amp_end_idx = amp_start_idx + 2 * total_peaks
amplitudes = np.frombuffer(response[amp_start_idx:amp_end_idx], dtype=(str(total_peaks) + 'int16'))
wavelengths_list = [en / WAVELEN_SCALE_FACTOR for en in wavelengths]
amplitudes_list = [en / AMP_SCALE_FACTOR for en in amplitudes]
return wavelengths_list[0], amplitudes_list[0], chan_lens
[docs]class Vidia(object):
"""
Vidia-Swept laser wrapper object.
:ivar pyvisa.resources.gpib.GPIBInstrument device: PyVisa GPIB connection to the device
"""
def __init__(self, loc: str, manager: ResourceManager, use_dev: bool):
"""
Create a visa connection using loc and manager to the Vidia-Swept laser.
:param loc: the GPIB location of the laser
:param manager: the PyVisa resource manager
:param use_dev: if True connect to laser
"""
self.device = None
if use_dev:
self.device = manager.open_resource(loc) # type: GPIBInstrument
[docs] def start_scan(self):
"""Starts the scanning process for the laser."""
self.device.query(":OUTP ON")
self.device.query(":OUTP:TRAC OFF")
self.device.query(":OUTP:SCAN:STAR -1")
[docs] def wave_info(self):
"""Returns wavelength information from the laser."""
return self.device.query(":WAVE MIN?"), self.device.query(":WAVE?"), self.device.query(":WAVE MAX?")
[docs]class Oven(object):
"""
Delta oven object, uses pyvisa.
:ivar pyvisa.resources.gpib.GPIBInstrument device: PyVisa GPIB connection to the device
"""
def __init__(self, loc: str, manager: ResourceManager, use_dev: bool):
"""
Opens a GPIB connection with the device at the specified location.
:param loc: the location of the device
:param manager: the PyVisa Resource Manager
:param use_dev: if True connect to the device
"""
self.device = None
if use_dev:
self.device = manager.open_resource(loc, read_termination="\n", open_timeout=2500) # type: GPIBInstrument
[docs] def set_temp(self, temp: float):
"""
Sets set point of delta oven.
:param temp: Temperature to set the oven to
"""
self.device.query('S {}'.format(temp))
[docs] def heater_on(self):
"""Turns oven heater on."""
self.device.query('H ON')
[docs] def heater_off(self):
"""Turns oven heater off."""
self.device.query('H OFF')
[docs] def cooling_on(self):
"""Turns oven cooling on."""
self.device.query('C ON')
[docs] def cooling_off(self):
"""Turns oven cooling off."""
self.device.query('C OFF')
[docs] def close(self):
"""Closes the resource."""
if self.device is not None:
self.device.close()
[docs]class OpSwitch(socket.socket):
"""Object representation of the Optical Switch needed for the program, overrides the socket module."""
def __init__(self, addr, port, use_dev):
"""
Connects to the specified address, and port using a socket connection.
:param addr: IP address of the optical switch
:param port: Port of the optical switch
:param use_dev: If true connect to the device
"""
socket.setdefaulttimeout(3)
if use_dev:
super().__init__(AF_INET, SOCK_STREAM)
super().connect((addr, port))
[docs] def set_channel(self, chan: int):
"""
Sets the channel on the optical switch.
:param chan: channel to set the optical switch to
"""
msg = "<OSW{}_OUT_{}>".format(format(int(1), '02d'), format(int(chan), '02d'))
self.send(msg.encode())
[docs]class TempController(object):
"""
Object representation of the Temperature Controller needed for the program.
:ivar pyvisa.resources.gpib.GPIBInstrument device: PyVisa GPIB connection to the device.
"""
def __init__(self, loc, manager, use_dev):
"""
Establishes a GPIB connection with the temperature controller.
:param loc: the location of the instrument
:param manager: the PyVisa resource manager
:param use_dev: if True connect to the device
"""
self.device = None
if use_dev:
self.device = manager.open_resource(loc, read_termination='\n', open_timeout=2500) # type: GPIBInstrument
[docs] def get_temp_k(self, dummy_val=False, center_num=0):
"""
Return temperature reading in degrees Kelvin.
:param dummy_val: If true make up a temperature
:param center_num: The number the temperature is set to used for simulating the temp reading
"""
if dummy_val:
return float(random.gauss(center_num - 5, center_num + 5))
else:
query = self.device.query('KRDG? B')
return float(query[:-4])
[docs] def close(self):
"""Close the device connection."""
if self.device is not None:
self.device.close()