Source code for automate.sensors.builtin_sensors

# -*- coding: utf-8 -*-
# (c) 2015 Tuomas Airaksinen
#
# This file is part of Automate.
#
# Automate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Automate is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Automate.  If not, see <http://www.gnu.org/licenses/>.
#
# ------------------------------------------------------------------
#
# If you like Automate, please take a look at this page:
# http://evankelista.net/automate/

"""
    Module for various Sensor classes.
"""
from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()

import socket
import subprocess
import types
import pyinotify
import threading
import queue
from copy import deepcopy

from datetime import datetime, timedelta

from croniter import croniter
from traits.api import Any, CInt, CFloat, Unicode, CUnicode, CBool, Instance, CStr, Int, Property

from automate.common import get_modules_all, LogicStr
from automate.common import threaded, Lock
from automate.statusobject import AbstractSensor
from automate.callables import Value
from automate.callable import AbstractCallable


class UserAnySensor(AbstractSensor):
    """Sensor that the user may edit and whose status can hold a value of any type."""

    user_editable = CBool(True)
    _status = Any
class UserBoolSensor(AbstractSensor):
    """Sensor that the user may edit and whose status is a boolean."""

    user_editable = CBool(True)
    _status = CBool
class UserEventSensor(UserBoolSensor):
    """
    User-editable boolean sensor meant for one-shot events.

    Once the status has been changed to ``True``, the sensor switches
    itself back to ``False`` automatically.
    """

    def __status_changed(self):
        # Change handler for ``_status`` (presumably wired up by the Traits
        # ``_<name>_changed`` naming convention): always fall back to False.
        self.status = False

    def get_default_callables(self):
        """Return the inherited defaults with ``active_condition`` set to ``Value(self)``."""
        defaults = super(UserEventSensor, self).get_default_callables()
        defaults['active_condition'] = Value(self)
        return defaults
class AbstractNumericSensor(AbstractSensor):
    """
    Base class for numeric sensors whose status can be restricted to a range.

    When the limits (:attr:`.value_min`, :attr:`.value_max`) are set, a status
    outside of them is clipped to the nearest limit.
    """

    #: Minimum allowed value for status
    value_min = CFloat(float('-inf'))

    #: Maximum allowed value for status
    value_max = CFloat(float('inf'))

    view = AbstractSensor.view + ['value_min', 'value_max']

    @property
    def is_finite_range(self):
        """``True`` when the width of the allowed range is less than infinity."""
        span = self.value_max - self.value_min
        return span < float('inf')

    def get_as_datadict(self):
        """Return the inherited data dictionary extended with both range limits."""
        datadict = super(AbstractNumericSensor, self).get_as_datadict()
        datadict.update(value_min=self.value_min, value_max=self.value_max)
        return datadict

    def set_status(self, status, **kwargs):
        """
        Set the status, clipping numeric values into ``[value_min, value_max]``.

        ``None`` is forwarded unchanged; anything else is converted with
        ``float`` before clipping. Extra keyword arguments go to the parent.
        """
        clipped_status = status
        if status is not None:
            upper_bounded = min(float(status), self.value_max)
            clipped_status = max(upper_bounded, self.value_min)
        super(AbstractNumericSensor, self).set_status(clipped_status, **kwargs)
class UserIntSensor(AbstractNumericSensor):
    """Sensor that the user may edit and whose status is an integer (default 0)."""

    user_editable = CBool(True)
    _status = CInt(0)
class UserFloatSensor(AbstractNumericSensor):
    """Sensor that the user may edit and whose status is a float."""

    user_editable = CBool(True)
    _status = CFloat
    silent = CBool(True)
class UserStrSensor(AbstractSensor):
    """Sensor that the user may edit and whose status is a (unicode) string."""

    user_editable = CBool(True)
    _status = CUnicode
class CroniterOn(croniter):
    """Marker subclass of ``croniter`` used for "switch on" schedules."""


class CroniterOff(croniter):
    """Marker subclass of ``croniter`` used for "switch off" schedules."""
class CronTimerSensor(AbstractSensor):
    """
    Scheduled start/stop timer.

    Both start and stop times are configured by cron-type strings
    (see ``man 5 crontab`` for a description of the definition format).
    """

    class CronListStr(Unicode):
        """Validation class for cron-compatible strings (for timers)"""

        def validate(self, object, name, value):
            """Accept ``value`` only if every ``;``-separated part parses with croniter."""
            for cron_expr in value.split(";"):
                try:
                    croniter(cron_expr)
                except Exception:
                    self.error(object, name, value)
                    return
            return value

    _status = CBool(False)

    #: Semicolon separated lists of cron-compatible strings that indicate
    #: when to switch status to ``True``
    timer_on = CronListStr("0 0 0 0 0")

    #: Semicolon separated lists of cron-compatible strings that indicate
    #: when to switch status to ``False``
    timer_off = CronListStr("0 0 0 0 0")

    _update_timer = Any(transient=True)  # Timer object
    _timerlock = Any(transient=True)  # Lock object

    view = UserBoolSensor.view + ["timer_on", "timer_off"]

    def setup_system(self, *args, **traits):
        """Create the timer lock, run the parent setup and compute the initial status."""
        self._timerlock = Lock()
        super(CronTimerSensor, self).setup_system(*args, **traits)
        self.update_status()

    def _now(self):
        """Return the current local time (separate method so it can be overridden)."""
        return datetime.now()

    def update_status(self):
        """
        Recompute the status from the schedules and arm the timer for the next change.

        The status becomes ``True`` when the most recent past event belongs to
        :attr:`timer_on`, ``False`` when it belongs to :attr:`timer_off`.
        """
        with self._timerlock:
            now = self._now()
            next_iters = [CroniterOn(i, now) for i in self.timer_on.split(";")] + \
                         [CroniterOff(i, now) for i in self.timer_off.split(";")]

            # Advance every schedule to its next event; the earliest comes first.
            for i in next_iters:
                i.get_next(datetime)
            next_iters.sort(key=lambda x: x.get_current(datetime))

            # Step copies back by one event; the latest of those comes last.
            prev_iters = deepcopy(next_iters)
            for i in prev_iters:
                i.get_prev(datetime)
            prev_iters.sort(key=lambda x: x.get_current(datetime))

            self.status = isinstance(prev_iters[-1], CroniterOn)

            self._setup_next_update(next_iters[0].get_current(datetime))

    def _timer_on_changed(self, name, new):
        self.update_status()

    def _timer_off_changed(self, name, new):
        self.update_status()

    def _setup_next_update(self, next_update_time):
        """
        (Re)start the timer that calls :meth:`update_status` shortly after
        ``next_update_time`` (five seconds are added as a safety margin).
        """
        now = self._now()
        if self._update_timer and self._update_timer.is_alive():
            self._update_timer.cancel()

        delay = next_update_time - now + timedelta(seconds=5)
        # ``timedelta.seconds`` is only the sub-day component; use the full
        # duration so schedules more than a day away are not fired early.
        delay_seconds = delay.total_seconds()
        self.logger.info('Setting timer to %s, %s seconds, at %s', delay, delay_seconds, now + delay)
        self._update_timer = threading.Timer(delay_seconds, threaded(self.system, self.update_status,))
        self._update_timer.name = ("Timer for TimerSensor %s at %s (%s seconds)"
                                   % (self.name, now + delay, delay_seconds))
        self._update_timer.start()

    def cleanup(self):
        """Cancel the pending update timer, if there is one."""
        with self._timerlock:
            if self._update_timer:
                self._update_timer.cancel()
class FileChangeSensor(AbstractSensor):
    """
    Sensor that detects file changes on filesystem.

    Integer valued status is incremented by each change.
    """

    _status = CInt(0)

    #: Name of file or directory to monitor
    filename = CUnicode

    #: PyInotify flags to configure what file change events to monitor
    watch_flags = Int(pyinotify.IN_MODIFY | pyinotify.IN_CREATE | pyinotify.IN_DELETE)

    _notifier = Any(transient=True)

    class InotifyEventHandler(pyinotify.ProcessEvent):
        """Event handler that calls ``func`` (without arguments) for every event."""

        def __init__(self, func, *args, **kwargs):
            self.func = func
            super(FileChangeSensor.InotifyEventHandler, self).__init__(*args, **kwargs)

        def process_default(self, event):
            self.func()

    def notify(self):
        """Increment the status by one; invoked by the event handler."""
        self.status += 1

    def setup(self):
        """Stop any previous notifier and start watching :attr:`filename` recursively."""
        if self._notifier:
            self._notifier.stop()
        wm = pyinotify.WatchManager()
        handler = self.InotifyEventHandler(self.notify)
        self._notifier = pyinotify.ThreadedNotifier(wm, default_proc_fun=handler)
        wm.add_watch(self.filename, self.watch_flags, rec=True)
        self._notifier.start()

    def cleanup(self):
        """Stop the notifier thread; does nothing if :meth:`setup` never ran."""
        # Guarded like setup(): _notifier stays None until setup() has run.
        if self._notifier:
            self._notifier.stop()
            self._notifier = None
class AbstractPollingSensor(AbstractSensor):
    """Abstract baseclass for sensor that polls periodically its status"""

    #: How often to do polling (in seconds)
    interval = CFloat(5)

    #: This can be used to enable/disable polling
    poll_active = CBool(True)

    _stop = CBool(False, transient=True)
    _pollthread = Any(transient=True)
    view = AbstractSensor.view + ["interval"]
    silent = CBool(True)

    def setup(self):
        """Start the polling cycle."""
        self._restart()

    def _poll_active_changed(self, old, new):
        """Start polling when :attr:`poll_active` turns on, cancel the timer when it turns off."""
        if not self.traits_inited():
            return
        if new:
            self._restart()
        elif self._pollthread:
            # No timer exists yet if polling is disabled before setup() ran.
            self._pollthread.cancel()

    def _restart(self):
        """
        Poll once and schedule the next poll after :attr:`interval` seconds.

        Does nothing after :meth:`cleanup`; a still-pending timer is cancelled
        first so that only one timer is active at a time.
        """
        if self._stop:
            return
        if self._pollthread and self._pollthread.is_alive():
            self._pollthread.cancel()
        if self.poll_active:
            self.update_status()
            self._pollthread = threading.Timer(self.interval, threaded(self.system, self._restart))
            time_after_interval = datetime.now() + timedelta(seconds=self.interval)
            self._pollthread.name = "PollingSensor: %s next poll at %s (%.2f sek)" % (self.name, time_after_interval,
                                                                                     self.interval)
            self._pollthread.start()

    def update_status(self):
        """Hook called on every poll; the base implementation does nothing."""
        pass

    def cleanup(self):
        """Stop polling permanently and cancel the pending timer, if any."""
        if self._pollthread:
            self._stop = True
            self._pollthread.cancel()
class PollingSensor(AbstractPollingSensor):
    """
    Polling sensor whose status is taken from the return value of a Callable.
    """

    #: Return value of this Callable is used to set the status of the sensor when polling
    status_updater = Instance(AbstractCallable)

    status_updater_str = Property(
        depends_on="status_updater",
        trait=LogicStr,
        transient=True,
        fset=AbstractPollingSensor._str_setter,
        fget=AbstractPollingSensor._str_getter,
    )

    #: If set, typeconversion to this is used. Can be any function or type.
    type = Any

    callables = AbstractPollingSensor.callables + ['status_updater']

    def get_default_callables(self):
        """Return the inherited defaults with ``status_updater`` set to ``Empty()``."""
        from automate.callables import Empty
        defaults = super(PollingSensor, self).get_default_callables()
        defaults['status_updater'] = Empty()
        return defaults

    def setup(self):
        """Bind the updater to the system (now and whenever it is replaced), then start polling."""
        def attach_updater():
            self.status_updater.setup_callable_system(self.system)

        attach_updater()
        self.on_trait_change(attach_updater, 'status_updater')
        super(PollingSensor, self).setup()

    def update_status(self):
        """Call the updater and store its result, converted with :attr:`type` when that is set."""
        converter = self.type
        result = self.status_updater.call(self)
        self.status = result if converter is None else converter(result)
class SimplePollingSensor(AbstractPollingSensor):
    """
    Polling sensor that periodically calls its ``on_update`` callable.
    """

    def update_status(self):
        """Invoke ``on_update`` with this sensor as the caller."""
        updater = self.on_update
        updater.call(self)
class IntervalTimerSensor(AbstractPollingSensor):
    """
    Sensor whose status alternates between True and False on every poll.
    """

    _status = CFloat

    def update_status(self):
        """Flip the status: a truthy status becomes ``False``, a falsy one ``True``."""
        self.status = not self.status
class SocketSensor(AbstractSensor):
    """
    Sensor that reads a TCP socket.

    Over TCP port, it reads data per lines and tries to set the status of the
    sensor to the value specified by the line. If content of the line is
    'close', then connection is dropped.
    """

    #: Hostname/IP to listen. Use ``'0.0.0.0'`` to listen all interfaces.
    host = CStr('0.0.0.0')

    #: Port to listen
    port = CInt

    #: set to ``True`` to tell SocketSensor to stop listening to port
    stop = CBool(transient=True)

    _socket = Instance(socket.socket, transient=True)
    _status = CInt

    def listen_loop(self):
        """
        Accept one client at a time and serve it until :attr:`stop` is set.

        Runs in the thread started by :meth:`setup`. The listening socket is
        always closed when the loop ends, whatever the reason.
        """
        try:
            while not self.stop:
                conn = addr = None
                try:
                    self.logger.info('%s listening to connections in port %s', self.name, self.port)
                    self._socket.listen(1)
                    # Short timeouts keep the loop responsive to the stop flag.
                    self._socket.settimeout(1)
                    while not self.stop:
                        try:
                            conn, addr = self._socket.accept()
                        except socket.timeout:
                            continue
                        break
                    if conn is None:
                        # Stop was requested before any client connected.
                        break
                    self.logger.info('%s connected from %s', self.name, addr)
                    conn.settimeout(1)
                    self._serve_connection(conn)
                except socket.error as e:
                    self.logger.info("%s: Error %s caught.", self, e)
                    if conn is None:
                        # The listening socket itself failed; retrying would spin.
                        break
                except Exception:
                    if self.stop:
                        return
                    raise
                finally:
                    if conn is not None:
                        conn.close()
                        self.logger.info('%s: connection %s closed', self.name, addr)
        finally:
            self._socket.close()

    def _serve_connection(self, conn):
        """Read integers from ``conn`` into the status until EOF, 'close' or stop."""
        while not self.stop:
            data = b''
            try:
                data = conn.recv(1024)
                if not data:
                    break
                self.status = int(data.strip())
                # Sockets carry bytes; text literals fail on Python 3.
                conn.sendall(b'OK\n')
            except socket.timeout:
                continue
            except ValueError:
                if data.strip() == b'close':
                    break
                conn.sendall(b'NOK\n')

    def setup(self):
        """Bind the listening socket and start :meth:`listen_loop` in its own thread."""
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.bind((self.host, self.port))
        t = threading.Thread(target=self.listen_loop, name='SocketSensor %s' % self.name)
        t.start()

    def cleanup(self):
        """Ask the listening thread to stop."""
        self.stop = True
class ShellSensor(AbstractSensor):
    """
    Run a shell command and follow its output. Status is set according to
    output, which is filtered through custom filter function.
    """

    #: Command can be, for example, 'tail -f logfile.log', which is convenient approach to follow log files.
    cmd = CStr

    #: If this is set to true, caller object is passed to the filter function as second argument
    caller = CBool

    filter = Any
    """
    Filter function, which must be a generator, such as for example:

    .. code-block:: python

        def filter(queue):
            while True:
                line = queue.get()
                if line == 'EOF':
                    break
                yield line

    or a simple line-by-line filter::

        def filter(line):
            return processed(line)
    """

    _simple = CBool
    _stop = CBool
    _queue = Any(transient=True)
    _process = Any(transient=True)

    def cmd_loop(self):
        """Run :attr:`cmd` in bash and push each decoded output line to the queue.

        The empty string read at end of output is queued too, as an EOF marker.
        """
        p = self._process = subprocess.Popen(self.cmd, shell=True, executable='bash', stdout=subprocess.PIPE)
        while True:
            line = p.stdout.readline().decode('utf-8')
            self._queue.put(line)
            if not line:
                self.logger.debug('Process exiting (cmd_loop)')
                break

    def status_loop(self):
        """Consume queued lines and set the status from the (optionally filtered) output."""
        args = (self,) if self.caller else ()

        def default_filter(queue, *args):
            while True:
                line = queue.get()
                if not line:
                    self.logger.debug('Process exiting (status_loop)')
                    break
                yield line

        def simple_filter(line, *args):
            return line

        # Let's test if filter is 'simple' or not. The probe passes the same
        # extra arguments as the real calls, otherwise a filter that expects
        # the caller would raise TypeError here.
        if self.filter:
            tst = self.filter('test line', *args)
            if not isinstance(tst, types.GeneratorType):
                self._simple = True

        if self._simple:
            line_filter = self.filter or simple_filter
            while True:
                line = self._queue.get()
                if not line:
                    self.logger.debug('Process exiting (status_loop)')
                    break
                self.status = line_filter(line, *args)
        else:
            line_filter = self.filter or default_filter
            for s in line_filter(self._queue, *args):
                if self._stop:
                    break
                self.status = s

    def setup(self):
        """Create the queue and start the command and status threads."""
        self._queue = queue.Queue()
        t1 = threading.Thread(target=self.cmd_loop, name='ShellSensor.cmd_loop %s' % self.name)
        t1.start()
        t2 = threading.Thread(target=self.status_loop, name='ShellSensor.status_loop %s' % self.name)
        t2.start()
        self.logger.debug('Threads started')

    def cleanup(self):
        """Flag the status loop to stop and terminate the shell process, if it was started."""
        # Set the flag first so output flushed during termination is ignored.
        self._stop = True
        if self._process is not None:
            self._process.terminate()
        self.logger.debug('Process exiting (cleanup)')
# Public names of this module, computed by get_modules_all() from the names
# defined above. NOTE(review): presumably this selects the AbstractSensor
# subclasses -- confirm against automate.common.get_modules_all.
__all__ = get_modules_all(AbstractSensor, locals())