class DomeController(PLCModule[DomeStatus]):
    """Controller for the rolling dome.

    Reads the ``dome`` register group from the PLC, translates it into
    ``DomeStatus`` flags, and exposes coroutines to open, close, stop and
    reset the dome drive.
    """

    flag = DomeStatus
    interval = 15.0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Timestamps (``time.time()``) of every call to ``open()``.
        # Used by the anti-flap mechanism in ``allowed_to_open()``.
        self._open_attempt_times: list[float] = []

    async def _update_internal(self, use_cache: bool = True, **kwargs):
        """Build the dome status flags from the PLC registers.

        Parameters
        ----------
        use_cache
            Forwarded to ``read_group`` when reading the ``dome`` registers.

        Returns
        -------
        tuple
            ``(new_status, extra_info)`` where ``extra_info`` contains the
            key ``dome_percent_open`` (0-100, rounded to one decimal).

        """

        dome_registers = await self.plc.modbus.read_group("dome", use_cache=use_cache)
        dome_status = SimpleNamespace(**dome_registers)

        assert self.flag

        new_status = self.flag(0)

        # The variable that would determine whether the drive is available
        # does not exist any more, so we assume it is.
        new_status |= self.flag.DRIVE_AVAILABLE

        if dome_status.dome_error or dome_status.drive_status1 > 0:
            new_status |= self.flag.DRIVE_ERROR

        if dome_status.drive_enabled:
            new_status |= self.flag.DRIVE_ENABLED
            new_status |= self.flag.MOVING

            # NOTE(review): the direction flags are reported only while the
            # drive is enabled; _move() only inspects them in that case.
            if dome_status.motor_direction:
                new_status |= self.flag.MOTOR_OPENING
            else:
                new_status |= self.flag.MOTOR_CLOSING

        if dome_status.dome_open is True:
            new_status |= self.flag.OPEN
        elif dome_status.dome_closed is True:
            new_status |= self.flag.CLOSED
        else:
            new_status |= self.flag.POSITION_UNKNOWN

        # Defensive fallback; DRIVE_AVAILABLE is always set above, so this
        # only triggers if that flag has a zero value.
        if new_status.value == 0:
            new_status = self.flag(self.flag.__unknown__)

        if dome_status.dome_open:
            percent_open = 1
        elif dome_status.dome_closed:
            percent_open = 0
        else:
            # Intermediate position: scale the reported position by the
            # configured full-open travel and clamp to [0, 1].
            full_open = config["dome.full_open_mm"]
            percent_open = numpy.clip(dome_status.dome_position / full_open, 0, 1)

        extra_info = {
            "dome_percent_open": round(float(percent_open) * 100, 1),
        }

        return new_status, extra_info

    async def set_direction(self, open: bool):
        """Sets the motor direction (`True` means open, `False` close).

        Refreshes the module status (bypassing the cache) after writing.
        """

        # NOTE(review): this writes ``drive_direction`` whereas _move() writes
        # ``motor_direction`` -- confirm both registers exist and are intended.
        await self.modbus["drive_direction"].write(open)
        await self.update(use_cache=False)

    async def _move(
        self,
        open: bool,
        force: bool = False,
        mode: DRIVE_MODE_TYPE = "normal",
    ):
        """Moves the dome to open/close position.

        Parameters
        ----------
        open
            `True` to open the dome, `False` to close it.
        force
            If the dome already reports being at the requested position,
            command the move anyway instead of returning.
        mode
            Drive mode. ``"overcurrent"`` is only accepted when closing.

        Raises
        ------
        DomeError
            If the request is invalid, a safety or drive check fails, or
            the movement does not complete (see
            ``_wait_until_movement_done``).

        """

        if mode == "overcurrent" and open:
            raise DomeError("Cannot open dome in overcurrent mode.")

        await self.update(use_cache=False)
        await self.plc.safety.update(use_cache=False)

        # Check safety flags.
        if not (await self.plc.safety.is_remote()):
            raise DomeError("Cannot move dome while in local mode.")

        if not self.plc.safety.status or self.plc.safety.status & SafetyStatus.E_STOP:
            raise DomeError("E-stops are pressed.")

        assert self.status is not None and self.flag is not None

        if self.status & self.flag.UNKNOWN:
            raise DomeError("Dome is in unknown state.")

        if self.status & self.flag.NODRIVE:
            raise DomeError("Dome drive is not available.")

        # Check drive errors.
        if self.status & self.flag.DRIVE_ERROR:
            raise DomeError(
                "Dome drive is in error state. Please check "
                "the drive and reset the error state if appropriate."
            )

        if self.status & self.flag.DRIVE_ENABLED:
            # Dome is moving.
            opening = bool(self.status & self.flag.MOTOR_OPENING)
            closing = bool(self.status & self.flag.MOTOR_CLOSING)

            # If the dome is moving in the right direction, just wait for it.
            if (open and opening) or (not open and closing):
                log.info("Dome is already moving in the commanded direction.")
                await self._wait_until_movement_done(open)
                return

            # Otherwise we stop the move and wait a bit for things to clear.
            log.warning("Stopping the dome before moving to the commanded position.")
            await self.stop()

        already_at_position = False
        if (self.status & self.flag.OPEN) and open:
            already_at_position = True
        elif (self.status & self.flag.CLOSED) and not open:
            already_at_position = True
        else:
            log.warning("Dome in unknown or intermediate position.")

        if already_at_position:
            if force is False:
                return
            log.warning("Dome already at position but forcing.")

        if mode == "normal":
            log.debug("Setting drive mode to normal.")
            await self.modbus["drive_mode_overcurrent"].write(0)
        elif mode == "overcurrent":
            log.debug("Setting drive mode to overcurrent.")
            await self.modbus["drive_mode_overcurrent"].write(1)

        try:
            log.debug("Setting motor_direction.")
            await self.modbus["motor_direction"].write(open)
            await asyncio.sleep(0.1)

            log.debug("Setting drive_enabled.")
            await self.modbus["drive_enabled"].write(True)
            await asyncio.sleep(0.1)

            # Wait until the dome finishes to move, with a timeout.
            await self._wait_until_movement_done(open)
        finally:
            # Reset drive_mode_overcurrent. Done in ``finally`` so that a
            # timeout or a disabled drive does not leave the drive in
            # overcurrent mode.
            await self.modbus["drive_mode_overcurrent"].write(0)

        await self.update(use_cache=False)

    async def _wait_until_movement_done(self, open: bool, timeout: float = 300):
        """Blocks until the dome has finished moving.

        Polls ``drive_enabled`` and the relevant end-of-travel register
        (``dome_open`` or ``dome_closed``) every ``MOVE_CHECK_INTERVAL``.

        Parameters
        ----------
        open
            Whether the dome is expected to end up open (`True`) or closed.
        timeout
            Maximum accumulated polling delay before giving up.

        Raises
        ------
        DomeError
            On timeout, or if the drive stays disabled for more than five
            seconds without the movement having completed.

        """

        elapsed: float = 0.0

        # Start the "drive disabled" grace period now. Starting from zero
        # would make the very first poll with the drive not yet enabled fail.
        last_enabled: float = time()

        while True:
            await asyncio.sleep(MOVE_CHECK_INTERVAL)
            elapsed += MOVE_CHECK_INTERVAL

            if elapsed >= timeout:
                raise DomeError("Timeout waiting for dome to finish moving.")

            drive_enabled = await self.modbus["drive_enabled"].read(use_cache=False)

            move_done_register = self.modbus["dome_open" if open else "dome_closed"]
            move_done = await move_done_register.read(use_cache=False)

            if drive_enabled:
                last_enabled = time()

            if not drive_enabled and move_done:
                break

            # Check if the drive is not enabled for more than 5 seconds without
            # the movement being done. This usually means the dome has been
            # manually stopped.
            if not drive_enabled and (time() - last_enabled) > 5:
                raise DomeError("Dome drive has been disabled.")

    async def open(self, force: bool = False):
        """Open the dome.

        The attempt is recorded before the permission checks run, so rejected
        attempts also count towards the anti-flap limit.

        Raises
        ------
        DomeError
            If opening is not allowed or the move fails.

        """

        self._open_attempt_times.append(time())

        if not await self.allowed_to_open():
            raise DomeError("Dome is not allowed to open.")

        await self._move(True, force=force)

    async def close(self, force: bool = False, mode: DRIVE_MODE_TYPE = "normal"):
        """Close the dome, optionally using the overcurrent drive mode."""

        await self._move(False, force=force, mode=mode)

    async def stop(self):
        """Stops the dome.

        Does nothing if the drive is not enabled. Otherwise disables the
        drive, waits ``AFTER_STOP_DELAY`` and refreshes the status.

        Raises
        ------
        RuntimeError
            If the dome status cannot be retrieved.

        """

        status = await self.update(use_cache=False)
        if status is None or self.flag is None:
            raise RuntimeError("Failed retrieving dome status.")

        drive_enabled = bool(status & self.flag.DRIVE_ENABLED)
        if not drive_enabled:
            return

        await self.plc.modbus["drive_enabled"].write(False)
        await asyncio.sleep(AFTER_STOP_DELAY)

        await self.update(use_cache=False)

    async def reset(self):
        """Resets the roll-off error state."""

        await self.modbus["dome_error_reset"].write(True)
        await asyncio.sleep(0.5)

    async def allowed_to_open(self):
        """Checks whether the dome is allowed to open.

        Returns
        -------
        bool
            `True` when opening is allowed. The method never returns
            `False`; every failed check raises instead.

        Raises
        ------
        DomeError
            If it is daytime and ``dome.daytime_allowed`` is not set, or if
            too many open attempts happened within the anti-flap interval.

        """

        if await self.plc.safety.engineering_mode_active():
            if actor := self.plc._actor:
                actor.write("w", text="Engineering mode active. Skipping dome checks.")
            return True

        if not config["dome.daytime_allowed"] and self.is_daytime():
            raise DomeError("Dome is not allowed to open during daytime.")

        anti_flap_n, anti_flap_interval = config["dome.anti_flap_tolerance"] or [3, 600]

        # Walk the attempts from newest to oldest and stop at the first one
        # that falls outside the interval.
        now = time()
        attempts_in_interval: list[float] = []
        for tt in reversed(self._open_attempt_times):
            if now - tt >= anti_flap_interval:
                break
            attempts_in_interval.append(tt)

        if len(attempts_in_interval) >= anti_flap_n:
            raise DomeError(
                "Too many open attempts in a short interval. "
                f"Wait {anti_flap_interval} seconds before trying again."
            )

        return True

    def is_daytime(self):  # pragma: no cover
        """Returns whether it is daytime.

        It is daytime when the current JD is before the (tolerance-adjusted)
        sunset or after the (tolerance-adjusted) sunrise.
        """

        # The tolerance is divided by 86400 before being applied to the
        # ephemeris values -- presumably seconds converted to days; confirm.
        daytime_tolerance = config["dome.daytime_tolerance"] or 0.0

        ephemeris = get_ephemeris_summary()
        sunset = ephemeris["sunset"] - daytime_tolerance / 86400
        sunrise = ephemeris["sunrise"] + daytime_tolerance / 86400

        now = Time.now().jd
        assert isinstance(now, float)

        return now < sunset or now > sunrise