#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Mingyeong Yang (mingyeong@khu.ac.kr)
# @Date: 2021-10-03
# @Filename: Controller.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
"""
isort:skip_file
"""
from __future__ import annotations
import datetime
import warnings
from pymodbus.client.asynchronous.async_io import AsyncioModbusTcpClient as ModbusClient
from sdsstools.logger import SDSSLogger
from lvmecp.exceptions import LvmecpControllerError, LvmecpControllerWarning
__all__ = ["PlcController", "Module"]
[docs]class PlcController:
"""Talks to an Plc controller over TCP/IP.
Parameters
----------
name
A name identifying this controller.
config
The configuration defined on the .yaml file under /etc/lvmecp.yml
log
The logger for logging
"""
def __init__(self, name: str, config: [], log: SDSSLogger):
self.name = name
self.log = log
self.config = config
modules = self.config_get("modules")
modules_list = list(modules.keys())
self.modules = [
Module(
name,
config,
self.config_get(f"modules.{module}.name"),
self.config_get(f"modules.{module}.mode"),
self.config_get(f"modules.{module}.channels"),
self.config_get(f"modules.{module}.description"),
)
for module in modules_list
]
self.host = self.config_get("host")
self.port = self.config_get("port")
self.addr = {}
self.unit = {}
for module in self.modules:
self.addr[module.name] = module.get_address()
if module.name == "hvac":
self.unit[module.name] = module.get_unit()
# print(self.addr)
# self.Client = None
[docs] async def start(self, *argv):
"""open the ModbusTCP connection with PLC"""
# connection
self.Client = None
try:
self.Client = ModbusClient(self.host, self.port)
await self.Client.connect()
connection = self.Client.protocol_made_connection
assert connection
print(connection)
except LvmecpControllerError:
raise LvmecpControllerError(f"fail to open connection with {self.host}")
self.log.error(f"fail to open connection with {self.host}")
self.log.info("Client made a connection properly.")
return connection
[docs] async def stop(self):
"""close the ModbusTCP connection with PLC"""
try:
if self.Client.protocol:
close_connection = self.Client.protocol.close()
else:
await self.Client.connect()
close_connection = self.Client.protocol.close()
except LvmecpControllerError:
raise LvmecpControllerError(f"fail to close connection with {self.host}")
self.log.error(f"fail to close connection with {self.host}")
self.log.info("Client close a connection properly.")
return close_connection
[docs] async def write(self, mode: str, addr: int, data):
"""write the data to devices
parameters
------------
mode
coil or holding_registers
addr
modbus address
data
ON 0xFF00
OFF 0x0000
"""
try:
if mode == "coil":
await self.Client.protocol.write_coil(addr, data)
elif mode == "holding_registers":
await self.Client.protocol.write_register(addr, data)
else:
raise LvmecpControllerError(f"{mode} is a wrong value")
except LvmecpControllerError:
raise LvmecpControllerError(f"fail to write coil to {addr}")
self.log.error(f"fail to write coil to {addr}")
[docs] async def read(self, mode: str, addr: int):
"""read the data from devices
parameters
------------
mode
coil or holding_registers
addr
modbus address
"""
try:
if mode == "coil":
# assert self.Client
if self.Client.protocol:
reply = await self.Client.protocol.read_coils(addr, 1, unit=0x01)
return reply.bits[0]
else:
raise LvmecpControllerError("protocol returns no values.")
self.log.warning(f"protocol returns {self.Client.protocol}")
elif mode == "holding_registers":
# assert self.Client
if self.Client.protocol:
reply = await self.Client.protocol.read_holding_registers(
addr, 1, unit=1
)
return reply.registers[0]
else:
raise LvmecpControllerError("protocol returns no values.")
self.log.warning(f"protocol returns {self.Client.protocol}")
else:
raise LvmecpControllerError(f"{mode} is a wrong value")
except LvmecpControllerError:
raise LvmecpControllerError(f"fail to read coils to {addr}")
self.log.warning(f"fail to read coils to {addr}")
[docs] async def send_command(self, module: str, element: str, command: str):
"""send command to PLC
Parameters
-----------
module
The devices controlled by lvmecp
which are "interlocks", "light", "shutter" and "emergengy".
element
The elements contained by the module
command
on/off/status/trigger
"""
self.result = {}
try:
# module "interlocks" -> 0
if module == "interlocks":
elements = self.modules[0].get_element()
if element in elements:
if command == "status":
self.result[element] = await self.get_status(
self.modules[0].mode, self.addr[module][element]
)
elif command == "trigger":
await self.write(
self.modules[0].mode, self.addr[module][element], 0xFF00
)
else:
raise LvmecpControllerError(f"{command} is not correct")
else:
raise LvmecpControllerError(f"{element} is not correct")
# module "lights" -> 1
# 0x0000 off
# 0xff00 on
if module == "lights":
elements = self.modules[1].get_element()
if command == "status":
if element in elements:
self.result[element] = await self.get_status(
self.modules[1].mode, self.addr[module][element]
)
elif element == "all":
for element in elements:
self.result[element] = await self.get_status(
self.modules[1].mode, self.addr[module][element]
)
else:
raise LvmecpControllerError(f"{element} is not correct")
elif command == "on":
if element in elements:
await self.write(
self.modules[1].mode, self.addr[module][element], 0xFF00
)
else:
raise LvmecpControllerError(f"{element} is not correct")
elif command == "off":
if element in elements:
await self.write(
self.modules[1].mode, self.addr[module][element], 0x0000
)
else:
raise LvmecpControllerError(f"{element} is not correct")
else:
raise LvmecpControllerError(f"{command} is not correct")
# module "dome" -> 2, 3
if module == "shutter1":
elements = self.modules[2].get_element()
if command == "status":
if element in elements:
self.result[element] = await self.get_status(
self.modules[2].mode, self.addr[module][element]
)
elif element == "all":
for element in elements:
self.result[element] = await self.get_status(
self.modules[2].mode, self.addr[module][element]
)
else:
raise LvmecpControllerError(f"{element} is not correct")
elif command == "on":
if element in elements:
await self.write(
self.modules[2].mode, self.addr[module][element], 0xFF00
)
elif element == "all":
for element in elements:
await self.write(
self.modules[2].mode, self.addr[module][element], 0xFF00
)
else:
raise LvmecpControllerError(f"{element} is not correct")
elif command == "off":
if element in elements:
await self.write(
self.modules[2].mode, self.addr[module][element], 0x0000
)
elif element == "all":
for element in elements:
await self.write(
self.modules[2].mode, self.addr[module][element], 0x0000
)
else:
raise LvmecpControllerError(f"{element} is not correct")
else:
raise LvmecpControllerError(f"{command} is not correct")
if module == "shutter2":
elements = self.modules[3].get_element()
if command == "status":
if element in elements:
self.result[element] = await self.get_status(
self.modules[3].mode, self.addr[module][element]
)
elif element == "all":
for element in elements:
self.result[element] = await self.get_status(
self.modules[3].mode, self.addr[module][element]
)
else:
raise LvmecpControllerError(f"{element} is not correct")
elif command == "on":
if element in elements:
await self.write(
self.modules[3].mode, self.addr[module][element], 0xFF00
)
elif element == "all":
for element in elements:
await self.write(
self.modules[3].mode, self.addr[module][element], 0xFF00
)
else:
raise LvmecpControllerError(f"{element} is not correct")
elif command == "off":
if element in elements:
await self.write(
self.modules[3].mode, self.addr[module][element], 0x0000
)
elif element == "all":
for element in elements:
await self.write(
self.modules[3].mode, self.addr[module][element], 0x0000
)
else:
raise LvmecpControllerError(f"{element} is not correct")
else:
raise LvmecpControllerError(f"{command} is not correct")
# module "emergengy_stop" -> 4
if module == "emergency":
if element == "0":
if command == "status":
elements = self.modules[4].get_element()
for element in elements:
self.result[element] = await self.get_status(
self.modules[3].mode, self.addr[module][element]
)
else:
raise LvmecpControllerError(f"{command} is not correct")
else:
raise LvmecpControllerError(f"{element} is not correct")
# module "hvac" -> 0
if module == "hvac":
if command == "status":
elements = self.modules[0].get_element()
if element in elements:
self.result[element] = await self.get_status(
self.modules[0].mode, self.addr[module][element]
)
self.result["unit"] = self.unit[module][element]
elif element == "all":
for element in elements:
see = {}
see["value"] = await self.get_status(
self.modules[0].mode, self.addr[module][element]
)
see["unit"] = self.unit[module][element]
self.result[element] = see
else:
raise LvmecpControllerError(f"{element} is not correct")
else:
raise LvmecpControllerError(f"{command} is not correct")
return self.result
except LvmecpControllerError:
raise LvmecpControllerError(f"We cannot send command to the PLC {module}")
self.log.error(f"We cannot send command to the PLC {module}")
[docs] async def get_status(self, mode: str, addr: int):
"""get the status of the device
parameters
------------
mode
coil or holding_registers
addr
modbus address
"""
if mode == "coil":
reply = await self.read("coil", addr)
status = await self.parse(reply)
elif mode == "holding_registers":
reply = await self.read("holding_registers", addr)
status = reply
else:
raise LvmecpControllerError(f"{mode} is not correct")
return status
[docs] @staticmethod
async def parse(value):
"""Parse the input data for ON/OFF."""
if value in ["off", "OFF", "0", 0, False]:
return 0
if value in ["on", "ON", "1", 1, True]:
return 1
return -1
[docs] def config_get(self, key, default=None):
"""Read the configuration and extract the data as a structure that we want.
Notice: DOESNT work for keys with dots !!!
Parameters
----------
key
The tree structure as a string to extract the data.
For example, if the configuration structure is
ports;
1;
desc; "Hg-Ar spectral callibration lamp"
You can input the key as
"ports.1.desc" to take the information "Hg-Ar spectral callibration lamp"
"""
def g(config, key, d=None):
"""Internal function for parsing the key from the configuration.
Parameters
----------
config
config from the class member, which is saved from the class instance
key
The tree structure as a string to extract the data.
For example, if the configuration structure is
ports:
num:1
1:
desc: "Hg-Ar spectral callibration lamp"
You can input the key as
"ports.1.desc" to take the information "Hg-Ar spectral callibration lamp"
"""
k = key.split(".", maxsplit=1)
c = config.get(
k[0] if not k[0].isnumeric() else int(k[0])
) # keys can be numeric
return (
d
if c is None
else c
if len(k) < 2
else g(c, k[1], d)
if type(c) is dict
else d
)
return g(self.config, key, default)
[docs]class Module:
"""Defines modules connected with an PLC.
Parameters
----------
plcname
A plc name connected with the module.
config
The configuration defined on the .yaml file under /etc/lvmecp.yml
name
Module name
mode
Modbus memory type
coil/register
channels
Number of elements in the module
description
An explanation of the module
"""
def __init__(
self,
plcname: str,
config: [],
name: str,
mode: str,
channels: int,
description: str,
*args,
**kwargs,
):
self.plc = plcname
self.config = config
self.name = name
self.mode = mode
self.description = description
self.channels = channels
[docs] def get_address(self):
"""return a dictionary about modbus address of each element in module."""
addr = {}
elements = self.config_get(f"modules.{self.name}.elements")
elements_list = list(elements.keys())
for element in elements_list:
addr[element] = elements[element]["address"]
return addr
[docs] def get_unit(self):
"""return a dictionary about units of each element in module."""
unit = {}
elements = self.config_get(f"modules.{self.name}.elements")
elements_list = list(elements.keys())
for element in elements_list:
unit[element] = elements[element]["units"]
return unit
[docs] def get_element(self):
"""return a list of elements in module."""
elements = self.config_get(f"modules.{self.name}.elements")
elements_list = list(elements.keys())
return elements_list
[docs] def config_get(self, key, default=None):
"""Read the configuration and extract the data as a structure that we want.
Notice: DOESNT work for keys with dots !!!
Parameters
----------
key
The tree structure as a string to extract the data.
For example, if the configuration structure is
ports;
1;
desc; "Hg-Ar spectral callibration lamp"
You can input the key as
"ports.1.desc" to take the information "Hg-Ar spectral callibration lamp"
"""
def g(config, key, d=None):
"""Internal function for parsing the key from the configuration.
Parameters
----------
config
config from the class member, which is saved from the class instance
key
The tree structure as a string to extract the data.
For example, if the configuration structure is
ports:
num:1
1:
desc: "Hg-Ar spectral callibration lamp"
You can input the key as
"ports.1.desc" to take the information "Hg-Ar spectral callibration lamp"
"""
k = key.split(".", maxsplit=1)
c = config.get(
k[0] if not k[0].isnumeric() else int(k[0])
) # keys can be numeric
return (
d
if c is None
else c
if len(k) < 2
else g(c, k[1], d)
if type(c) is dict
else d
)
return g(self.config, key, default)