from acqpack import utils as ut
import time
import yaml
from ctypes import *
DLL_PATH = 'mfcs_64.dll'
ALPHA_DEFAULT = 1
# TODO:
# - config file vs chanmap_path
# - which device
# - kwargs/channel default in read, set, pid?
[docs]class Mfcs:
"""
Class to control the MFCS-EZ.
"""
def __init__(self, config_file, chanmap_path):
with open(config_file) as file:
self.config = yaml.load(file)
self.config['conversion_to_mbar'] = float(self.config['conversion_to_mbar']) # ensure conversion factor is float
self.dll = cdll.LoadLibrary(DLL_PATH) # load dll (i.e. MFCS API)
self.c_status = c_char() # placeholder for status
self.c_serial = c_ushort(0) # placeholder for serial number
self.detect()
self.connect()
self.load_chanmap(chanmap_path)
def __del__(self):
self.exit()
[docs] def detect(self):
"""
Detects up to 8 connected MFCS devices; returns serial numbers of connected devices.
:return: (list) detected MFCS serial numbers as ints
"""
c_table = (c_ushort*8)(0,0,0,0,0,0,0,0)
c_error = self.dll.mfcsez_detect(byref(c_table))
return [sn for sn in c_table if sn != 0]
[docs] def connect(self):
"""
Initializes the MFCS.
Makes connection, checks status, and sets the PID alpha parameter of all channels to 2.
"""
self.handle = self.dll.mfcsez_initialisation(self.config['serial_number'])
time.sleep(0.6) # wait to check serial
c_error = self.dll.mfcs_get_serial(self.handle, byref(self.c_serial))
if self.c_serial.value == 0: # serial==0 means no connection
print('Error: Could not connect to MFCS')
self.exit()
else:
print('MFCS initialized. SN: {}'.format(self.c_serial.value))
self.pid(0, ALPHA_DEFAULT)
s, status = self.status()
time.sleep(0.1)
if s != 1:
print('Warning: Connected to MFCS, but status not normal. Status {}: {}'.format(s, status))
[docs] def status(self):
"""
Gets and returns status of the MFCS.
0: 'MFCS is reset - press "Play"'
1: 'normal'
2: 'overpressure'
3: 'need to rearm'
:return: (tup) status int [0-3], status string
"""
statuses = {0: 'MFCS is reset - press "Play"',
1: 'normal',
2: 'overpressure',
3: 'need to rearm'}
c_error = self.dll.mfcs_get_status(self.handle, byref(self.c_status))
k = ord(self.c_status.value)
return k, statuses[k]
[docs] def pid(self, chan, a):
"""
Sets alpha parameter of the PID controller for the given channel.
Lower values of alpha (1-2) are typically more stable at lower pressures, but take slightly
longer to equilibrate.
For some reason, the python kernel would crash when 'channel' and 'alpha' were used
as keywords. C-types...
:param chan: (int) channel [1-4] to set; 0 sets for all channels
:param a: (int) desired alpha value for PID
"""
c_error = self.dll.mfcs_set_alpha(self.handle, chan, a)
[docs] def load_chanmap(self, chanmap_path):
"""
Stores channel map.
:param chanmap_path: (str) path to chanmap
"""
self.chanmap = ut.read_delim_pd(chanmap_path)
[docs] def exit(self):
"""
Safely closes the MFCS.
First closes device connection, then releases the DLL.
"""
try:
c_error = self.dll.mfcs_get_serial(self.handle, byref(self.c_serial))
if self.dll.mfcs_close(self.handle): # Close communication port
print('Closed connection to device with SN {}'.format(self.c_serial.value))
else:
print('Failed to close connection to device with SN {}'.format(self.c_serial.value))
except IOError:
print('MFCS connection error: {}'.format(c_error))
finally:
windll.kernel32.FreeLibrary(self.dll._handle) # Release the DLL
del self.dll
print('MFCS library released')
[docs] def set(self, lookup_cols, lookup_vals, pressure=0.0):
"""
Sets pressure of specified channel.
:param lookup_cols: (str | list) column(s) to search in chanmap
:param lookup_vals: (val | list) value(s) to find in lookup_cols
:param pressure: (float) desired pressure; units specified in config file
"""
channel = ut.lookup(self.chanmap, lookup_cols, lookup_vals)[['channel']].iloc[0]
channel = int(channel)
mbar = pressure * self.config['conversion_to_mbar']
c_error = self.dll.mfcs_set_auto(self.handle, channel, c_float(mbar))
[docs] def read(self, lookup_cols, lookup_vals):
"""
Reads current pressure of the channel.
:param lookup_cols: (str | list) column(s) to search in chanmap
:param lookup_vals: (val | list) value(s) to find in lookup_cols
:return: (float) current pressure; units specified in config file
"""
pressure = c_float()
timer = c_ushort()
channel = ut.lookup(self.chanmap, lookup_cols, lookup_vals)[['channel']].iloc[0]
channel = int(channel)
c_error = self.dll.mfcs_read_chan(self.handle, channel, pointer(pressure), pointer(timer))
mbar = pressure.value
return mbar/self.config['conversion_to_mbar']