Source code for automate.statusobject

# -*- coding: utf-8 -*-
# (c) 2015 Tuomas Airaksinen
# This file is part of Automate.
# Automate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Automate is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Automate.  If not, see <>.
# ------------------------------------------------------------------
# If you like Automate, please take a look at this page:

import logging
import operator
import threading
import time
import sys
import collections

import datetime
from functools import lru_cache
from numbers import Number

from traits.api import (cached_property, Any, CBool, Instance, Dict, Str, CFloat,
                        List, Enum, Bool, Property, Event, CInt)
from traits.trait_errors import TraitError

from .common import Lock, AbstractStatusObject, CompareMixin, nomutex
from .worker import StatusWorkerTask, DummyStatusWorkerTask
from .program import ProgrammableSystemObject, DefaultProgram
from .systemobject import SystemObject

[docs]class StatusObject(AbstractStatusObject, ProgrammableSystemObject, CompareMixin): """ Baseclass for Sensors and Actuators """ editable = True #: Determines minimum time required for switching. State change is then delayed if necessary. safety_delay = CFloat(0.) #: Determines when :attr:`.safety_delay` needs to be taken into account: when status is #: rising, falling or both. safety_mode = Enum("rising", "falling", "both") #: Similar to :attr:`.safety_delay`, but just delays change to make sure that events shorter #: than change_delay are not taken into account change_delay = CFloat(0.) #: As :attr:`.safety_mode`, but for :attr:`.change_delay` change_mode = Enum("rising", "falling", "both") #: Do not emit actuator status changes into logs silent = CBool(False) #: Print more debugging information into logs debug = CBool(False) #: (property) Is delayed change taking place at the moment? changing = Property(trait=Bool, transient=True, depends_on='_timed_action, _queued_job') # Deque of history, which consists of tuples (timestamp, status), read only history = Any(transient=True) #: Transpose of history (timesstamps, statuses) history_transpose = Property(transient=True, depends_on='history, _status') #: Amount of status change events to be stored in history history_length = CInt(1000) #: How often new values are saved to history, in seconds history_frequency = CFloat(0) @cached_property def _get_changing(self): if self._queued_job or self._timed_action: return True return False # Thread of currently running action _timed_action = Instance(threading.Timer, transient=True) # Reference of status change job that is in the worker queue is saved here _queued_job = Instance(StatusWorkerTask, transient=True) # Time when status was last changed _last_changed = CFloat # The time when last change started _change_start = CFloat(transient=True) # Lock that is acquired when changing the status _status_lock = Instance(Lock, transient=True) logger = Instance(logging.Logger, transient=True) view = ["name", "status", "description", "safety_delay", "safety_mode", "change_delay", "change_mode", "history_length", 'history_frequency'] + SystemObject.view simple_view = [] # used by Web UI, for templates data_type = Str(transient=True) @cached_property def _get_history_transpose(self): return list(zip(*self.history)) @property def times(self): return self.history_transpose[0] @property def datetimes(self): return [datetime.datetime.fromtimestamp(i) for i in self.times] @property def statuses(self): return self.history_transpose[1] def status_at_time(self, T): if isinstance(T, datetime.datetime): T = T.timestamp() t_max = 0 times, statuses = self.history_transpose if T < times[0]: return 0. for t in times: if t <= T: t_max = t else: break t_index = times.index(t_max) return statuses[t_index] @staticmethod def _convert_times(t_a, t_b): if t_a is None: t_a = 0. elif isinstance(t_a, datetime.datetime): t_a = t_a.timestamp() if t_b is None: t_b = time.time() elif isinstance(t_b, datetime.datetime): t_b = t_b.timestamp() return t_a, t_b @lru_cache() def integral(self, t_a=None, t_b=None): with self._status_lock: self.logger.debug('Calculating integral for %s', self) t_a, t_b = self._convert_times(t_a, t_b) history = ((t, s) for t, s in self.history if t_a <= t <= t_b and isinstance(s, Number)) t_prev = t_a s_prev = self.status_at_time(t_a) if not isinstance(s_prev, Number): s_prev = 0. s_sum = 0. for t, s in history: s_sum += s_prev * (t-t_prev) s_prev, t_prev = s, t s_sum += s_prev * (t_b-t_prev) return s_sum def average(self, t_a=None, t_b=None): t_a, t_b = self._convert_times(t_a, t_b) if t_a == t_b: return 0. return self.integral(t_a, t_b) / (t_b-t_a) def __init__(self, *args, **kwargs): self._status_lock = Lock('statuslock') super(StatusObject, self).__init__(*args, **kwargs) def __setstate__(self, *args, **kwargs): self._status_lock = Lock('statuslock') super(StatusObject, self).__setstate__(*args, **kwargs) def _history_length_changed(self, new_value): self.history = collections.deque(list(self.history or [])[-new_value:], maxlen=new_value) @property def is_program(self): """ A property which can be used to check if StatusObject uses program features or not. """ from automate.callables import Empty return not (isinstance(self.on_activate, Empty) and isinstance(self.on_deactivate, Empty) and isinstance(self.on_update, Empty)) #: Status of the object. status = Property(depends_on="_status, _status_trigger", transient=True) #: To force trigger status change events even if status itself does not change _status_trigger = Event
[docs] def get_status_display(self, **kwargs): """ Define how status is displayed in UIs (add units etc.). """ if 'value' in kwargs: value = kwargs['value'] else: value = self.status return u"%s" % value
[docs] def get_as_datadict(self): """ Get data of this object as a data dictionary. Used by websocket service. """ d = super(StatusObject, self).get_as_datadict() d.update(dict(status=self.status, data_type=self.data_type, editable=self.editable)) return d
@cached_property def _get_status(self): return self._status def _set_status(self, value): self.set_status(value) def setup_system(self, *args, **kwargs): super(StatusObject, self).setup_system(*args, **kwargs) self.history = collections.deque(maxlen=self.history_length) self.data_type = self._status.__class__.__name__
[docs] def set_status(self, new_status, origin=None, force=False): """ For sensors, this is synonymous to:: sensor.status = new_status For (non-slave) actuators, origin argument (i.e. is the program that is changing the status) need to be given, """ raise NotImplementedError
[docs] def update_status(self): """ In sensors: implement particular value reading from device etc. here (this calls set_status(value)). In actuators: set value in particular device. Implement in subclasses. """
[docs] def activate_program(self, program): """ When program controlling this object activates, it calls this function. """
[docs] def deactivate_program(self, program): """ When program controlling this object deactivates, it calls this function. """
[docs] def get_program_status(self, program): """ Determine status of this object set by a particular program. Useful only for Actuators but defined here for interface compatibility. """ return self.status
def _set_real_status(self, status, prog): # _status_lock is acquired at StatusWorkerTask. Do not call this function from anywhere else! # This function is finally called by queue and this sets the value as desired. # Thus no need to worry that this function would be called in two threads # simultaneously (caller always WorkerThread). # prog is always None for non-actuators self.logger.debug('_set_real_status %s', status) if not self.silent: if isinstance(self, AbstractActuator):"%s %s (by %s) setting status to %s", self.__class__.__name__, self, prog, status) # if not self.slave and prog != getattr(self, 'program', None): if not getattr(self, 'slave', True) and not prog is getattr(self, 'program', None): self.logger.debug('program had changed, not changing') return self._change_start = 0. change_time = self._last_changed = time.time() try: if self._status == status: self._status_trigger = True else: if self.history: last_time, last_value = self.history[-1] if self._last_changed - last_time < self.history_frequency: self.history.pop() change_time = last_time self.history.append((change_time, status)) self.integral.cache_clear() self._status = status except TraitError as e: self.logger.warning('Wrong type of status %s was passed to %s. Error: %s', status, self, e) def _add_statuschange_to_queue(self, status, prog, with_statuslock=False): # This function is used for delayed actions if self.system.two_phase_queue: with self._status_lock if with_statuslock else nomutex: if status != self._status: self._queued_job = StatusWorkerTask(func=self._set_real_status, args=(status, prog), object=self) self.system.worker_thread.put(self._queued_job) else: self.system.worker_thread.put(DummyStatusWorkerTask(self._set_real_status, status, prog)) def _are_delays_active(self, new_status): try: mode = "rising" if new_status > self._status else "falling" except TypeError: # if we can not determine whether value is rising or falling, we'll assume it's rising mode = 'rising' safety_active = False change_active = False if self.safety_delay > 0. and self.safety_mode in [mode, "both"]: safety_active = True if self.change_delay > 0. and self.change_mode in [mode, "both"]: change_active = True return safety_active, change_active def _do_change_status(self, status, force=False): """ This function is called by - set_status - _update_program_stack if active program is being changed - thia may be launched by sensor status change. status lock is necessary because these happen from different threads. This does not directly change status, but adds change request to queue. """ self.system.worker_thread.put(DummyStatusWorkerTask(self._request_status_change_in_queue, status, force=force)) @property def next_scheduled_action(self): return getattr(self._timed_action, 'next_action', None) def _request_status_change_in_queue(self, status, force=False): def timer_func(func, *args): with self._status_lock: func(*args) self._timed_action = None with self._status_lock: timenow = time.time() logger = if self.debug else self.logger.debug logger("_do_change_status prg:%s status:%s", getattr(self, 'program', None), status) if self._timed_action: if self._timed_action.is_alive(): logger("Cancelling previous safety/change_delay action. Now changing to %s", status) self._timed_action.cancel() self._timed_action = None self._queued_job = None safetydelay_active, changedelay_active = self._are_delays_active(status) run_now = not (safetydelay_active or changedelay_active) if not self._change_start: self._change_start = timenow if status == self._status and not force: self._change_start = 0 logger('Status same %s, no need to change', status) return orig_changedelay = self.change_delay if changedelay_active else 0 safetydelay = self.safety_delay if safetydelay_active else 0 changedelay = orig_changedelay - (timenow - self._change_start) if changedelay < 0: changedelay = orig_changedelay if run_now or (changedelay <= 0. and (timenow - self._last_changed > safetydelay)): logger("Adding status change to queue, about to change status to %s", status) if self.system.two_phase_queue: self._add_statuschange_to_queue(status, getattr(self, "program", None)) else: self._set_real_status(status, getattr(self, 'program', None)) else: timesince = time.time() - self._last_changed delaytime = max(0, safetydelay - timesince, changedelay) time_after_delay = + datetime.timedelta(seconds=delaytime) logger("Scheduling safety/change_delay timer for %f sek. Now %s. Going to change to %s.", delaytime, self._status, status) self._timed_action = threading.Timer(delaytime, timer_func, args=(self._add_statuschange_to_queue, status, getattr(self, "program", None), False)) = "Safety/change_delay for %s timed at %s (%f sek)" % (, time_after_delay, delaytime) self._timed_action.next_action = time_after_delay self._timed_action.start() return False
[docs]class AbstractSensor(StatusObject): """ Base class for all sensors """ #: Is sensor user-editable in UIs. This variable is meant for per-instance tuning for Sensors, #: whereas :attr:`.editable` is for per-class adjustment. user_editable = CBool(False) #: Status type is defined as _status, but always use :attr:`.status` property to set/get values. _status = Any #: Default value for status default = Any #: If non-zero, Sensor status will be reset to default after defined time (in seconds). reset_delay = CFloat _reset_timer = Any(transient=True) #: Do not log status changes silent = CBool(False) #: Filter status with a function (lambdas are not serializable, don't use them if you are #: using system state saving) status_filter = Any view = StatusObject.view + ['default', 'name', 'tags', 'reset_delay'] simpleview = StatusObject.simple_view + ['_status'] def get_as_datadict(self): d = super(AbstractSensor, self).get_as_datadict() d.update(dict(user_editable=self.user_editable)) return d def _setup_reset_delay(self): if self.reset_delay: if self._reset_timer and self._reset_timer.is_alive(): self._reset_timer.cancel() self._reset_timer = threading.Timer(self.reset_delay, lambda: self.set_status(self.default)) self._reset_timer.start()
[docs] def set_status(self, status, origin=None, force=False): """ Compatibility to actuator class. Also :class:`~automate.callables.builtin_callables.SetStatus` callable can be used for sensors too, if so desired. """ if status != self.default: self._setup_reset_delay() if self.status_filter: status = self.status_filter(status) return self._do_change_status(status, force)
[docs] def update_status(self): """A method to read and update actual status. Implement it in subclasses, if necessary"""
def _status_changed(self): if not self.silent:"%s status changed to %s", self, repr(self.status)) def __str__(self): return def setup_system(self, system, *args, **kwargs): name, traits = self._passed_arguments default = traits.get('default', None) super(AbstractSensor, self).setup_system(system, *args, **kwargs) load_state = kwargs.get('loadstate', None) if not default is None and not load_state: self.set_status(default) elif load_state: self.history.append((time.time(), self._status))
[docs]class AbstractActuator(StatusObject): """ Base class for all actuators.""" #: Default value for status. For actuators, this is set by automatically created #: :class:`~automate.program.DefaultProgram` dp_actuatorname default = Any(False, transient=True) #: If ``True``, actual status can be set by any program anytime without restrictions. slave = CBool #: A property giving current program governing the status of this actuator (program that has the highest priority) program = Property(trait=Instance(ProgrammableSystemObject), transient=True, depends_on="program_stack[]") #: This dictionary can be used to override program priorities. #: #: .. note:: Keys here must be program names, (not :class:`~automate.program.Program` instances). priorities = Dict(key_trait=Str, value_trait=CFloat) @cached_property def _get_program(self): try: return self.program_stack[-1] except IndexError: return #: Reference to actuators :class:`~automate.program.DefaultProgram` default_program = Instance(DefaultProgram) # Locks for thread-safety _actuator_status_lock = Instance(Lock, transient=True) _program_lock = Instance(Lock, transient=True) view = StatusObject.view + ["status", "slave", "tags"] simple_view = StatusObject.simple_view + ['status'] def __init__(self, *args, **kwargs): self._actuator_status_lock = Lock("statuslock") self._program_lock = Lock("programlock") super(AbstractActuator, self).__init__(*args, **kwargs) def __setstate__(self, *args, **kwargs): self._actuator_status_lock = Lock("statuslock") self._program_lock = Lock("programlock") return super(AbstractActuator, self).__setstate__(*args, **kwargs)
[docs] def set_status(self, status, origin=None, force=False): """ For programs, to set current status of the actuator. Each active program has its status in :attr:`.program_stack` dictionary and the highest priority is realized in the actuator """ if not self.slave and origin not in self.program_stack: raise ValueError('Status cannot be changed directly') with self._actuator_status_lock: self.logger.debug("set_status got through, program: %s", origin) if self.debug:"Set_status %s %s %s",, origin, status) if self.slave: return self._do_change_status(status, force) self.logger.debug("Sets status %s for %s", status, with self._program_lock: self.program_status[origin] = status if self.program == origin: return self._do_change_status(status, force)
[docs] def activate_program(self, program): """ Called by program which desires to manipulate this actuator, when it is activated. """ self.logger.debug("activate_program %s", program) if program in self.program_stack: return with self._program_lock: self.logger.debug("activate_program got through %s", program) self.program_stack.append(program) self._update_program_stack()
[docs] def deactivate_program(self, program): """ Called by program, when it is deactivated. """ self.logger.debug("deactivate_program %s", program) with self._program_lock: self.logger.debug("deactivate_program got through %s", program) if program not in self.program_stack: import ipdb ipdb.set_trace() self.program_stack.remove(program) if program in self.program_status: del self.program_status[program] self._update_program_stack()
[docs] def update_program_stack(self): """ Update :attr:`.program_stack`. Used by programs :attr:`~automate.program.ProgrammableSystemObject._priority_changed` attribute to reset ordering. """ with self._program_lock: self._update_program_stack()
[docs] def get_program_status(self, prog): """ Give status defined by given program ``prog`` """ try: return self.program_status[prog] except KeyError: return None
def _status_changed(self): """ Override this method to whatever action you want to actualize when status is changed. This one is called in the initialization procedure as well. Usually this method is enough. Sometimes you might not want to do anything in the initialization, and then you can override __status_changed method instead. """ # Private attributes #: Use property 'status' instead in normal operations (status is set via status worker thread). _status = Any(transient=True) #: Program stack of current programs, sorted automatically by program priority. program_stack = List(trait=Instance(ProgrammableSystemObject), transient=True) #: Dictionary containing statuses set by each active program program_status = Dict(key_trait=Instance(ProgrammableSystemObject), transient=True) def setup_system(self, system, *args, **kwargs): from automate.callables import SetStatus, Value, Empty super(AbstractActuator, self).setup_system(system, *args, **kwargs) actevent = SetStatus(self, Value(self.default)) if self.slave: actevent = Empty() self.status = self.default from .program import DefaultProgram if not self.default_program: self.default_program = DefaultProgram( targets=[self], active_condition=Value(False), on_activate=actevent, priority=0, system=self.system, name='dp_%s' %, ) self.default_program.active_condition = Value(True) = "programlock " + = "statuslock " + def __str__(self): return def _update_program_stack(self): # Use of this function must be protected from outside by self.program_lock! if self.priorities: self.program_stack.sort(key=lambda x: self.priorities.get(, x.priority)) else: self.program_stack.sort(key=operator.attrgetter('priority')) if self.program in self.program_status: self._do_change_status(self.program_status[self.program])