Source code for automate.system

# -*- coding: utf-8 -*-
# (c) 2015 Tuomas Airaksinen
#
# This file is part of Automate.
#
# Automate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Automate is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Automate.  If not, see <http://www.gnu.org/licenses/>.
#
# ------------------------------------------------------------------
#
# If you like Automate, please take a look at this page:
# http://evankelista.net/automate/

from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
from past.builtins import basestring
from collections import defaultdict

from future import standard_library
from raven.handlers.logging import SentryHandler

standard_library.install_aliases()
from builtins import input
import threading
import operator
import sys
import os
import logging
import pickle
import pkg_resources
import argparse
import raven

from traits.api import (CStr, Instance, CBool, CList, Property, CInt, CUnicode, Event, CSet, Str, cached_property,
                        on_trait_change)

from .common import (SystemBase, ExitException, has_baseclass, Object)
from .namespace import Namespace
from .service import AbstractService, AbstractUserService, AbstractSystemService
from .statusobject import AbstractSensor, AbstractActuator
from .systemobject import SystemObject
from .worker import StatusWorkerThread
from .callable import AbstractCallable
from . import __version__

import sys

if sys.version_info >= (3, 0):
    TimerClass = threading.Timer
else:
    TimerClass = threading._Timer

def get_autoload_services():
    import automate.services
    return (i for i in list(automate.services.__dict__.values()) if has_baseclass(i, AbstractService) and i.autoload)


def get_service_by_name(name):
    import automate.services
    return getattr(automate.services, name)


[docs]class System(SystemBase): #: Name of the system (shown in WEB UI for example) name = CStr #: Allow referencing objects by their names in Callables. If disabled, you can still refer to objects by names #: by Object('name') allow_name_referencing = CBool(True) #: Filename to where to dump the system state filename = Str # LOGGING ########### #: Name of the file where logs are stored logfile = CUnicode #: Log level for the handler that writes to stdout print_level = CInt(logging.INFO, transient=True) @on_trait_change('print_level', post_init=True) def print_level_changed(self, new): self.print_handler.setLevel(new) #: Reference to logger instance (read-only) logger = Instance(logging.Logger) #: Sentry: Raven DSN configuration (see http://sentry.io) raven_dsn = Str #: Raven client (is created automatically if raven_dsn is set and this is left empty) raven_client = Instance(raven.Client, transient=True) #: Format string of the log handler that writes to stdout log_format = Str('%(asctime)s %(log_color)s%(name)s%(reset)s %(message)s') #: Format string of the log handler that writes to logfile logfile_format = Str('%(process)d:%(threadName)s:%(name)s:%(asctime)s:%(levelname)s:%(message)s') #: Log level of the handler that writes to logfile log_level = CInt(logging.DEBUG, transient=True) @on_trait_change('log_level', post_init=True) def log_level_changed(self, new): if not self.logfile: self.logger.error('No logfile specified') return self.log_handler.setLevel(new) # SERVICES ########### #: Add here services that you want to be added automatically. This is meant to be re-defined in subclass. default_services = CList(trait=Str) #: List of services that are loaded in the initialization of the System. services = CList(trait=Instance(AbstractService)) #: List of servicenames that are desired to be avoided (even if normally autoloaded). exclude_services = CSet(trait=Str) #: Reference to the worker thread (read-only) worker_thread = Instance(StatusWorkerThread, transient=True) #: System namespace (read-only) namespace = Instance(Namespace) # Set of all SystemObjects within the system. This is where SystemObjects are ultimately stored # in the System initialization. (read-only) objects = CSet(trait=SystemObject) #: Property giving objects sorted alphabetically (read-only) objects_sorted = Property(depends_on='objects') @cached_property def _get_objects_sorted(self): return sorted(list(self.objects), key=operator.attrgetter('_order')) #: Read-only property giving all sensors of the system sensors = Property(depends_on='objects[]') @cached_property def _get_sensors(self): return {i for i in self.objects_sorted if isinstance(i, AbstractSensor)} #: Read-only property giving all actuator of the system actuators = Property(depends_on='objects[]') @cached_property def _get_actuators(self): return {i for i in self.objects_sorted if isinstance(i, AbstractActuator)} #: Read-only property giving all objects that have program features in use programs = Property(depends_on='objects[]') @cached_property def _get_programs(self): from .program import Program, DefaultProgram return {i for i in self.objects_sorted if isinstance(i, (Program, DefaultProgram))} #: Read-only property giving all :class:`~program.Program` objects ordinary_programs = Property(depends_on='programs[]') @cached_property def _get_ordinary_programs(self): from . import program return {i for i in self.programs if isinstance(i, program.Program)} #: Start worker thread automatically after system is initialized worker_autostart = CBool(True) #: Trigger which is triggered after initialization is ready (used by Services) post_init_trigger = Event #: Trigger which is triggered before quiting (used by Services) pre_exit_trigger = Event #: Read-only property that gives list of all object tags all_tags = Property(depends_on='objects.tags[]') #: Number of state backup files num_state_backups = CInt(5) @cached_property def _get_all_tags(self): newset = set([]) for i in self.system.objects: for j in i.tags: if j: newset.add(j) return newset #: Enable experimental two-phase queue handling technique (not recommended) two_phase_queue = CBool(False) @classmethod
[docs] def load_or_create(cls, filename=None, no_input=False, **kwargs): """ Load system from a dump, if dump file exists, or create a new system if it does not exist. """ parser = argparse.ArgumentParser() parser.add_argument('--no_input', action='store_true') args = parser.parse_args() if args.no_input: print('Parameter --no_input was given') no_input = True def savefile_more_recent(): time_savefile = os.path.getmtime(filename) time_program = os.path.getmtime(sys.argv[0]) return time_savefile > time_program def load(): print('Loading %s' % filename) file = open(filename, 'rb') state = pickle.load(file) file.close() system = System(loadstate=state, filename=filename, **kwargs) return system def create(): print('Creating new system') return cls(filename=filename, **kwargs) if filename and os.path.isfile(filename): if savefile_more_recent(): return load() else: if no_input: print('Program file more recent. Loading that instead.') return create() while True: answer = input('Program file more recent. Do you want to load it? (y/n) ') if answer == 'y': return create() elif answer == 'n': return load() else: return create()
[docs] def save_state(self): """ Save state of the system to a dump file :attr:`System.filename` """ if not self.filename: self.logger.error('Filename not specified. Could not save state') return self.logger.debug('Saving system state to %s', self.filename) for i in reversed(range(self.num_state_backups)): fname = self.filename if i == 0 else '%s.%d' % (self.filename, i) new_fname = '%s.%d' % (self.filename, i+1) try: os.rename(fname, new_fname) except FileNotFoundError: pass with open(self.filename, 'wb') as file, self.worker_thread.queue.mutex: pickle.dump((list(self.objects)), file, pickle.HIGHEST_PROTOCOL)
@property def cmd_namespace(self): """ A read-only property that gives the namespace of the system for evaluating commands. """ import automate ns = dict(list(automate.__dict__.items()) + list(self.namespace.items())) return ns def __getattr__(self, item): if self.namespace and item in self.namespace: return self.namespace[item] raise AttributeError
[docs] def get_unique_name(self, obj, name='', name_from_system=''): """ Give unique name for an Sensor/Program/Actuator object """ ns = self.namespace newname = name if not newname: newname = name_from_system if not newname: newname = u"Nameless_" + obj.__class__.__name__ if not newname in ns: return newname counter = 0 while True: newname1 = u"%s_%.2d" % (newname, counter) if not newname1 in ns: return newname1 counter += 1
@property def services_by_name(self): """ A property that gives a dictionary that contains services as values and their names as keys. """ srvs = defaultdict(list) for i in self.services: srvs[i.__class__.__name__].append(i) return srvs @property def service_names(self): """ A property that gives the names of services as a list """ return set(self.services_by_name.keys())
[docs] def flush(self): """ Flush the worker queue. Usefull in unit tests. """ self.worker_thread.flush()
[docs] def name_to_system_object(self, name): """ Give SystemObject instance corresponding to the name """ if isinstance(name, basestring): if self.allow_name_referencing: name = name else: raise NameError('System.allow_name_referencing is set to False, cannot convert string to name') elif isinstance(name, Object): name = str(name) return self.namespace.get(name, None)
[docs] def eval_in_system_namespace(self, exec_str): """ Get Callable for specified string (for GUI-based editing) """ ns = self.cmd_namespace try: return eval(exec_str, ns) except Exception as e: self.logger.warning('Could not execute %s, gave error %s', exec_str, e) return None
[docs] def register_service_functions(self, *funcs): """ Register function in the system namespace. Called by Services. """ for func in funcs: self.namespace[func.__name__] = func
[docs] def register_service(self, service): """ Register service into the system. Called by Services. """ if service not in self.services: self.services.append(service)
[docs] def request_service(self, type, id=0): """ Used by Sensors/Actuators/other services that need to use other services for their operations. """ srvs = self.services_by_name.get(type) if not srvs: return ser = srvs[id] if not ser.system: ser.setup_system(self) return ser
[docs] def cleanup(self): """ Clean up before quitting """ self.pre_exit_trigger = True self.logger.info("Shutting down %s, please wait a moment.", self.name) for t in threading.enumerate(): if isinstance(t, TimerClass): t.cancel() self.logger.debug('Timers cancelled') for i in self.objects: i.cleanup() del i self.logger.debug('Sensors etc cleanups done') for ser in (i for i in self.services if isinstance(i, AbstractUserService)): ser.cleanup_system() self.logger.debug('User services cleaned up') self.worker_thread.stop() self.logger.debug('Worker thread really stopped') for ser in (i for i in self.services if isinstance(i, AbstractSystemService)): ser.cleanup_system() self.logger.debug('System services cleaned up')
[docs] def cmd_exec(self, cmd): """ Execute commands in automate namespace """ if not cmd: return ns = self.cmd_namespace import copy rval = True nscopy = copy.copy(ns) try: r = eval(cmd, ns) if isinstance(r, SystemObject) and not r.system: r.setup_system(self) if callable(r): r = r() cmd += "()" self.logger.info("Eval: %s", cmd) self.logger.info("Result: %s", r) except SyntaxError: r = {} try: exec (cmd, ns) self.logger.info("Exec: %s", cmd) except ExitException: raise except Exception as e: self.logger.info("Failed to exec cmd %s: %s.", cmd, e) rval = False for key, value in list(ns.items()): if key not in nscopy or not value is nscopy[key]: if key in self.namespace: del self.namespace[key] self.namespace[key] = value r[key] = value self.logger.info("Set items in namespace: %s", r) except ExitException: raise except Exception as e: self.logger.info("Failed to eval cmd %s: %s", cmd, e) return False return rval
def __init__(self, loadstate=None, **traits): super(System, self).__init__(**traits) if not self.name: self.name = os.path.split(sys.argv[0])[-1].replace('.py', '') self.worker_thread = StatusWorkerThread(name="Status worker thread", system=self) # Initialize Sentry / raven client, if is configured if not self.raven_client and self.raven_dsn: self.raven_client = raven.Client(self.raven_dsn, release=__version__, tags={'automate-system': self.name}) self._initialize_logging() self.logger.info('Initializing services') self._initialize_services() self.logger.info('Initializing namespace') self._initialize_namespace(loadstate) self.logger.info('Initialize user services') self._setup_user_services() if self.worker_autostart: self.logger.info('Starting worker thread') self.worker_thread.start() self.post_init_trigger = True def _initialize_logging(self): root_logger = logging.getLogger('automate') self.logger = root_logger.getChild(self.name) # Check if root level logging has been set up externally. if len(root_logger.handlers) > 0: root_logger.info('Logging has been configured already, ' 'skipping logging configuration') return root_logger.propagate = False root_logger.setLevel(logging.DEBUG) if self.raven_client: sentry_handler = SentryHandler(client=self.raven_client, level=logging.ERROR) root_logger.addHandler(sentry_handler) if self.logfile: formatter = logging.Formatter(fmt=self.logfile_format) log_handler = logging.FileHandler(self.logfile) log_handler.setLevel(self.log_level) log_handler.setFormatter(formatter) root_logger.addHandler(log_handler) stream_handler = logging.StreamHandler() stream_handler.setLevel(self.print_level) from colorlog import ColoredFormatter, default_log_colors colors = default_log_colors.copy() colors['DEBUG'] = 'purple' stream_handler.setFormatter(ColoredFormatter(self.log_format, datefmt='%H:%M:%S', log_colors=colors)) root_logger.addHandler(stream_handler) self.logger.info('Logging setup ready') def _initialize_namespace(self, loadstate=None): self.namespace = Namespace(system=self) self.namespace.set_system(loadstate) self.logger.info('Setup loggers per object') for k, v in list(self.namespace.items()): if isinstance(v, SystemObject): ctype = v.__class__.__name__ v.logger = self.logger.getChild('%s.%s' % (ctype, k)) def _initialize_services(self): # Add default_services, if not already for servname in self.default_services: if servname not in self.service_names | self.exclude_services: self.services.append(get_service_by_name(servname)()) # Add autorun services if not already for servclass in get_autoload_services(): if servclass.__name__ not in self.service_names | self.exclude_services: self.services.append(servclass()) def _setup_user_services(self): for ser in (i for i in self.services if isinstance(i, AbstractUserService)): self.logger.info('...%s', ser.__class__.__name__) ser.setup_system(self)
# Load extensions from . import services, sensors, actuators, callables print('Loading extensions') for entry_point in pkg_resources.iter_entry_points('automate.extension'): print('Trying to load extension %s' % entry_point) try: ext_classes = entry_point.load(require=False) except ImportError: print('Loading extension %s failed. Perhaps missing requirements? Skipping.' % entry_point) continue for ext_class in ext_classes: print('... %s' % ext_class.__name__) if issubclass(ext_class, AbstractService): setattr(services, ext_class.__name__, ext_class) elif issubclass(ext_class, AbstractSensor): setattr(sensors, ext_class.__name__, ext_class) elif issubclass(ext_class, AbstractActuator): setattr(actuators, ext_class.__name__, ext_class) elif issubclass(ext_class, AbstractCallable): setattr(callables, ext_class.__name__, ext_class)