Source code for lvmecp.proxy_async

import sys
import time
import uuid

from clu.actor import AMQPClient
from cluplus.proxy import Proxy


# actor = "lvmecp"

# amqpc = AMQPClient(name=f"proxy-{uuid.uuid4().hex[:8]}")
# lvmecp = Proxy(amqpc, actor)
# lvmecp.start()


class LvmecpProxy:
    """Asynchronous convenience wrapper around the ``lvmecp`` actor commands.

    Each public coroutine creates a new, uniquely named ``AMQPClient``,
    wraps it in a ``Proxy`` bound to the ``"lvmecp"`` actor, awaits the
    proxy command with the same name as the method and returns its result.
    """

    async def _call(self, command_name: str, *args):
        """Run a single ``lvmecp`` command through a freshly created proxy.

        Parameters
        ----------
        command_name
            Name of the proxy attribute to invoke (for example ``"ping"``).
        *args
            Positional arguments forwarded unchanged to the proxy command.

        Returns
        -------
        The value obtained by awaiting the proxy command.

        Raises
        ------
        Exception
            Any error raised while constructing the client propagates
            untouched.  Errors raised while setting up the proxy or running
            the command are logged on the client's logger and re-raised.
        """
        # The client is created outside the ``try`` so that the handler below
        # can always rely on ``amqpc`` being bound when it logs.
        amqpc = AMQPClient(name=f"{sys.argv[0]}.proxy-{uuid.uuid4().hex[:8]}")
        try:
            lvmecp = Proxy(amqpc, "lvmecp")
            # NOTE(review): ``start()`` is called without ``await`` and the
            # client is never stopped, exactly as before -- confirm against
            # the installed cluplus/clu versions.
            lvmecp.start()
            return await getattr(lvmecp, command_name)(*args)
        except Exception as e:
            amqpc.log.error(f"Exception: {e}")
            # Re-raise the real error instead of continuing with unbound
            # names, which previously masked it.
            raise

    async def ping(self):
        """Send the ``ping`` command to ``lvmecp`` and return its result."""
        return await self._call("ping")

    async def telemetry(self):
        """Send the ``telemetry`` command to ``lvmecp`` and return its result."""
        return await self._call("telemetry")

    async def monitor(self):
        """Send the ``monitor`` command to ``lvmecp`` and return its result."""
        return await self._call("monitor")

    async def dome(self, command: str):
        """Send the ``dome`` command to ``lvmecp`` and return its result.

        Parameters
        ----------
        command
            Sub-command forwarded to the actor: ``enable`` or ``status``.
        """
        return await self._call("dome", command)

    async def light(self, command: str, room=None):
        """Send the ``light`` command to ``lvmecp`` and return its result.

        Parameters
        ----------
        command
            Sub-command forwarded to the actor: ``enable`` or ``status``.
        room
            Forwarded positionally as the second argument of the actor's
            ``light`` command (``None`` when omitted).  Presumably selects
            the room whose light is addressed -- confirm against the actor.
        """
        return await self._call("light", command, room)

    async def estop(self):
        """Send the ``estop`` command to ``lvmecp`` and return its result."""
        return await self._call("estop")