Source code for lvmecp.actor.actor
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Mingyeong Yang (mingyeong@khu.ac.kr)
# @Date: 2021-09-30
# @Filename: actor.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import os
import warnings
from typing import ClassVar, Dict
import click
from clu.actor import AMQPActor
from lvmecp import __version__
from lvmecp.controller.controller import PlcController
from lvmecp.exceptions import LvmecpUserWarning
from .commands import parser as lvmecp_command_parser
__all__ = ["LvmecpActor"]
[docs]class LvmecpActor(AMQPActor):
"""Lvmecp controller actor.
Parameters
--------------
controllers
The list of `.PlcController` instances to manage.
"""
parser: ClassVar[click.Group] = lvmecp_command_parser
BASE_CONFIG: ClassVar[str | Dict | None] = None
def __init__(self, *args, controllers: tuple[PlcController, ...] = (), **kwargs):
#: dict[str, PlcController]: A mapping of controller name to controller.
self.controllers = {c.name: c for c in controllers}
if "schema" not in kwargs:
kwargs["schema"] = os.path.join(
os.path.dirname(__file__),
"../etc/schema.json",
)
super().__init__(*args, **kwargs)
self.version = __version__
[docs] async def start(self):
"""Start the actor and connect the controllers."""
connect_timeout = self.config["timeouts"]["controller_connect"]
print(self.parser_args[0])
for plc in self.parser_args[0]:
try:
self.log.debug(f"Start {plc.name} ...")
await asyncio.wait_for(plc.start(), timeout=connect_timeout)
except asyncio.TimeoutError:
warnings.warn(
f"Timeout out connecting to {plc.name!r}.",
LvmecpUserWarning,
)
raise LvmecpUserWarning(f"We cannot connect with {plc.name}")
await super().start()
self.log.debug("Start done")
[docs] async def stop(self):
"""Stop the actor and disconnect the controllers."""
await super().stop()
for plc in self.parser_args[0]:
await plc.stop()
[docs] @classmethod
def from_config(cls, config, *args, **kwargs):
"""Creates an actor from a configuration file."""
if config is None:
if cls.BASE_CONFIG is None:
raise RuntimeError("The class does not have a base configuration.")
config = cls.BASE_CONFIG
instance = super(LvmecpActor, cls).from_config(config, *args, **kwargs)
assert isinstance(instance, LvmecpActor)
assert isinstance(instance.config, dict)
if "plcs" in instance.config:
plcs = []
for (name, config) in instance.config["plcs"].items():
instance.log.info(f"Instance {name}: {config}")
try:
plcs.append(PlcController(name, config, instance.log))
except Exception as ex:
instance.log.error(f"Error in {type(ex)}: {ex}")
instance.plcs = plcs
instance.parser_args = [instance.plcs]
return instance