from pymodbus.client.sync import ModbusTcpClient
import utils as ut
[docs]class Manifold:
"""
Provides a wrapper for the manifold, which is controlled by the Wago nModbus
"""
def __init__(self, ip_address, valvemap_path, read_offset=512):
self.client = ModbusTcpClient(ip_address)
self.read_offset = read_offset
self.valvemap = None
self.load_valvemap(valvemap_path)
[docs] def load_valvemap(self, valvemap_path):
"""
Stores valvemap.
To work with open/close, valvemap should have one column named 'valve'.
:param valvemap_path: (str) path to valvemap
"""
self.valvemap = ut.read_delim_pd(valvemap_path)
[docs] def read_valve(self, valve_num):
"""
Reads the state of the register associated with the specified valve.
:param valve_num: (int) register number to read
:return: () state of the register (True: depressurized, False: pressurized)
"""
register_num = valve_num + self.read_offset
return self.client.read_coils(register_num, 1).bits[0]
[docs] def pressurize(self, valve_num):
"""
Pressurizes valve at the specified register.
:param valve_num: (int) valve to pressurize
"""
state = self.read_valve(valve_num)
if state:
self.client.write_coil(valve_num, False)
[docs] def depressurize(self, valve_num):
"""
Depressurizes valve at the specified register.
:param valve_num: (int) valve to depressurize
"""
state = self.read_valve(valve_num)
if not state:
self.client.write_coil(valve_num, True)
[docs] def close(self, lookup_cols, lookup_vals):
"""
Finds lookup_vals in lookup_cols of valvemap; retrieves corresponding valve_num.
Closes valve_num.
:param lookup_cols: (str | list) column(s) to search in valvemap
:param lookup_vals: (val | list) value(s) to find in lookup_cols
"""
# TODO: default lookup_cols
valve_num = ut.lookup(self.valvemap, lookup_cols, lookup_vals)[['valve']].iloc[0]
self.pressurize(valve_num)
[docs] def open(self, lookup_cols, lookup_vals):
"""
Finds lookup_vals in lookup_cols of valvemap; retrieves corresponding valve_num.
Opens valve_num.
:param lookup_cols: (str | list) column(s) to search in valvemap
:param lookup_vals: (val | list) value(s) to find in lookup_cols
"""
valve_num = ut.lookup(self.valvemap, lookup_cols, lookup_vals)[['valve']].iloc[0]
self.depressurize(valve_num)
[docs] def exit(self):
"""
Closes the device's serial connection.
"""
self.client.close()