# Source code for automate.extensions.arduino.arduino_service

# -*- coding: utf-8 -*-
# (c) 2015 Tuomas Airaksinen
#
# This file is part of automate-arduino.
#
# automate-arduino is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# automate-arduino is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with automate-arduino.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import unicode_literals
import os
import threading
import logging
from collections import namedtuple

from builtins import range

from automate import Lock
from traits.api import HasTraits, Any, Dict, CList, Str, Int, List
from automate.service import AbstractSystemService

# Module-level logger, used by the pyFirmata patching code in this module.
logger = logging.getLogger('automate.arduino_service')

# Record stored for each configured output pin: ``type`` is a one-letter mode
# code ('o', 'p' or 's', as assigned by the ArduinoService.setup_* methods) and
# ``pin`` is the pin object returned by the board's ``get_pin``.
PinTuple = namedtuple('PinTuple', ['type', 'pin'])

def patch_pyfirmata():
    """
        Monkey-patch pyfirmata (once) so that it cooperates with Traits.

        Two things are installed:

        * a ``Pin`` subclass that is also a ``HasTraits`` object with a dynamic
          ``value`` trait, so that listeners can be notified when a pin value
          changes;
        * an iterator thread class, published as ``pyfirmata.util.Iterator.Fixed``,
          that logs any exception escaping the stock ``run`` loop instead of
          letting it propagate out of the thread.

        Calling this function again after a successful patch does nothing.
    """

    import pyfirmata

    # A module-level flag on pyfirmata itself marks that patching is done.
    already_patched = getattr(pyfirmata, 'patched', False)
    if already_patched:
        return

    import pyfirmata.pyfirmata
    from pyfirmata.util import Iterator as StockIterator

    StockPin = pyfirmata.Pin

    class Pin(StockPin, HasTraits):
        # Rebuild the ``mode`` property from the stock accessor functions.
        mode = property(StockPin._get_mode, StockPin._set_mode)

        def __init__(self, *args, **kwargs):
            # Traits machinery first, then the dynamic 'value' trait, and only
            # then the stock pin initialiser.
            HasTraits.__init__(self)
            self.add_trait("value", Any)
            StockPin.__init__(self, *args, **kwargs)

    # Replace the class both on the package and on the implementation module.
    for target_module in (pyfirmata, pyfirmata.pyfirmata):
        target_module.Pin = Pin

    class FixedPyFirmataIterator(StockIterator):

        def run(self):
            try:
                super(FixedPyFirmataIterator, self).run()
            except Exception as e:
                logger.error('Exception %s occurred in Pyfirmata iterator, quitting now', e)
                logger.error('threads: %s', threading.enumerate())

    # Expose the wrapped thread class as an attribute of the stock iterator class.
    StockIterator.Fixed = FixedPyFirmataIterator
    pyfirmata.patched = True


class ArduinoService(AbstractSystemService):

    """
        Service that provides interface to Arduino devices via
        `pyFirmata library <https://github.com/tino/pyFirmata>`_.

        Devices are addressed by their index (``dev``) in :attr:`arduino_devs`.
        A device that cannot be opened is stored as ``None`` ("mocked"); every
        pin operation on such a device silently does nothing.
    """

    #: Arduino devices to use, as a list
    arduino_devs = CList(Str, ["/dev/ttyUSB0"])

    #: Arduino device board types, as a list of strings. Choices are defined by pyFirmata board
    #: class names, i.e. allowed values are "Arduino", "ArduinoMega", "ArduinoDue".
    arduino_dev_types = CList(Str, ["Arduino"])

    #: Arduino device sampling rates, as a list (in milliseconds).
    arduino_dev_sampling = CList(Int, [500])

    # (dev, pin_nr) -> (sensor, pin) for subscribed analog / digital inputs
    _sens_analog = Dict
    _sens_digital = Dict
    # (dev, pin_nr) -> PinTuple(type, pin) for configured outputs
    _act_digital = Dict
    # One entry per configured device: a pyfirmata board, or None when mocked
    _boards = List
    # One lock per configured device, guarding access to the board
    _locks = List
    # Iterator thread of the most recently initialised board
    _iterator_thread = Any

    class FileNotReadableError(Exception):
        """ Raised internally by ``setup`` when a device path is not readable. """
        pass

    def setup(self):
        """
            Open every configured board and start its pyFirmata iterator thread.

            If pyfirmata is not installed, an error is logged and nothing else
            happens. A device that is unreadable or missing is logged as a
            warning and mocked with ``None``; any other ``OSError`` is re-raised.
        """
        # ``os.errno`` was an undocumented alias that no longer exists on
        # Python >= 3.7, so the errno module is imported explicitly.
        import errno

        self.logger.debug("Initializing Arduino subsystem")
        try:
            import pyfirmata
        except ImportError:
            self.logger.error("Please install pyfirmata if you want to use Arduino interface")
            return

        patch_pyfirmata()

        from pyfirmata.util import Iterator, to_two_bytes

        # Initialize configured self.boards
        ard_devs = self.arduino_devs
        ard_types = self.arduino_dev_types
        samplerates = self.arduino_dev_sampling

        assert len(ard_devs) == len(ard_types) == len(samplerates), 'Arduino configuration invalid!'

        for dev_path, board_type, samplerate in zip(ard_devs, ard_types, samplerates):
            try:
                if not os.access(dev_path, os.R_OK):
                    raise self.FileNotReadableError
                cls = getattr(pyfirmata, board_type)
                board = cls(dev_path)
                board.send_sysex(pyfirmata.SAMPLING_INTERVAL, to_two_bytes(samplerate))
                self._iterator_thread = it = Iterator.Fixed(board)
                it.daemon = True
                it.name = "PyFirmata thread for {dev}".format(dev=dev_path)
                # Keep the thread on the board so cleanup() can stop each one.
                board._iter = it
                it.start()
                self._boards.append(board)
            except (self.FileNotReadableError, OSError) as e:
                if isinstance(e, self.FileNotReadableError) or e.errno == errno.ENOENT:
                    self.logger.warning('Your arduino device %s is not available. Arduino will be mocked.',
                                        dev_path)
                    self._boards.append(None)
                else:
                    raise
            # Every device slot, real or mocked, gets its own lock.
            self._locks.append(Lock())

    @staticmethod
    def _stop_iterator(thread):
        """ Stop and join an iterator thread, if it is set and still alive. """
        if thread and thread.is_alive():
            # Same shutdown sequence for every thread: detach the board from
            # the iterator, then wait for the thread to finish.
            thread.board = None
            thread.join()

    def cleanup(self):
        """
            Close all boards and stop their iterator threads.

            Boards are removed from ``_boards`` as they are closed; mocked
            (``None``) entries are simply discarded.
        """
        self.logger.debug("Cleaning up Arduino subsystem. ")
        while self._boards:
            board = self._boards.pop()
            if board:
                board.exit()
                # setup() starts one thread per board; stop each of them, not
                # only the one remembered in _iterator_thread.
                self._stop_iterator(getattr(board, '_iter', None))
        self._stop_iterator(self._iterator_thread)
        self._iterator_thread = None

    def reload(self):
        """
            Reload the service while preserving pin configuration.

            Current sensor subscriptions and output pins are remembered and
            released, the base-class ``reload`` is run, and then subscriptions,
            digital outputs and PWM outputs are set up again. Servo outputs are
            not restored yet.
        """
        digital_sensors = list(self._sens_digital.items())
        analog_sensors = list(self._sens_analog.items())
        digital_actuators = list(self._act_digital.items())

        for (dev, pin_nr), (_type, pin) in digital_actuators:
            self.cleanup_digital_actuator(dev, pin_nr)

        for (dev, pin_nr), (sens, pin) in digital_sensors:
            self.unsubscribe_digital(dev, pin_nr)

        for (dev, pin_nr), (sens, pin) in analog_sensors:
            self.unsubscribe_analog(dev, pin_nr)

        super(ArduinoService, self).reload()

        # Restore subscriptions
        for (dev, pin_nr), (sens, pin) in digital_sensors:
            self.subscribe_digital(dev, pin_nr, sens)

        for (dev, pin_nr), (sens, pin) in analog_sensors:
            self.subscribe_analog(dev, pin_nr, sens)

        # setup_digital records type 'o' and setup_pwm records 'p'; 'd' is
        # accepted as well so that entries using the older code still work.
        setup_funcs = {
            'p': self.setup_pwm,
            'o': self.setup_digital,
            'd': self.setup_digital,
        }
        for (dev, pin_nr), (_type, pin) in digital_actuators:
            setup_func = setup_funcs.get(_type)
            if setup_func:
                setup_func(dev, pin_nr)
        # TODO: servo reload!

    def setup_digital(self, dev, pin_nr):
        """ Configure pin ``pin_nr`` of device ``dev`` as a digital output. """
        if not self._boards[dev]:
            return
        with self._locks[dev]:
            pin = self._boards[dev].get_pin("d:{pin}:o".format(pin=pin_nr))
            self._act_digital[(dev, pin_nr)] = PinTuple('o', pin)

    def setup_pwm(self, dev, pin_nr):
        """ Configure pin ``pin_nr`` of device ``dev`` as a PWM output. """
        if not self._boards[dev]:
            return
        with self._locks[dev]:
            pin = self._boards[dev].get_pin("d:{pin}:p".format(pin=pin_nr))
            self._act_digital[(dev, pin_nr)] = PinTuple('p', pin)

    def setup_servo(self, dev, pin_nr, min_pulse, max_pulse, angle):
        """
            Configure pin ``pin_nr`` of device ``dev`` as a servo output and pass
            ``min_pulse``, ``max_pulse`` and ``angle`` on to the board's
            ``servo_config``.
        """
        if not self._boards[dev]:
            return
        with self._locks[dev]:
            pin = self._boards[dev].get_pin("d:{pin}:s".format(pin=pin_nr))
            self._act_digital[(dev, pin_nr)] = PinTuple('s', pin)
            self._boards[dev].servo_config(pin_nr, min_pulse, max_pulse, angle)

    def change_digital(self, dev, pin_nr, value):
        """ Change digital Pin value (boolean). Also PWM supported(float)"""
        if not self._boards[dev]:
            return
        with self._locks[dev]:
            self._act_digital[(dev, pin_nr)].pin.write(value)

    # Functions for input signals
    def handle_analog(self, obj, name, old, new):
        """ Trait-change handler: forward a new analog pin value to its sensor. """
        # ``__dev_id`` is name-mangled inside this class body; subscribe_analog
        # writes it with the same mangled name, so the pair stays consistent.
        dev = obj.__dev_id
        pin = obj.pin_number
        if not self._boards[dev]:
            return
        self._sens_analog[(dev, pin)][0].set_status(new)

    def handle_digital(self, obj, name, old, new):
        """ Trait-change handler: forward a new digital pin value to its sensor. """
        dev = obj.__dev_id
        pin = obj.pin_number
        if not self._boards[dev]:
            return
        self._sens_digital[(dev, pin)][0].set_status(new)

    def subscribe_analog(self, dev, pin_nr, sens):
        """
            Register ``sens`` to follow analog input ``pin_nr`` of device ``dev``.

            The sensor immediately receives the current reading when one is
            available, and afterwards every change of the pin's ``value`` trait.
        """
        if not self._boards[dev]:
            return
        with self._locks[dev]:
            pin = self._boards[dev].get_pin("a:{pin}:i".format(pin=pin_nr))
            # Remember which device the pin belongs to for handle_analog.
            pin.__dev_id = dev
            self._sens_analog[(dev, pin_nr)] = (sens, pin)
            s = pin.read()
        if s is not None:
            sens.set_status(s)
        pin.on_trait_change(self.handle_analog, "value")

    def cleanup_digital_actuator(self, dev, pin_nr):
        """ Forget the output configured on ``(dev, pin_nr)``, if there is one. """
        self._act_digital.pop((dev, pin_nr), None)

    def unsubscribe_digital(self, dev, pin_nr):
        """ Drop the digital subscription on ``(dev, pin_nr)``, if there is one. """
        entry = self._sens_digital.pop((dev, pin_nr), None)
        if entry:
            # entry is (sensor, pin); removing the trait ends notifications.
            entry[1].remove_trait('value')

    def unsubscribe_analog(self, dev, pin_nr):
        """ Drop the analog subscription on ``(dev, pin_nr)``, if there is one. """
        entry = self._sens_analog.pop((dev, pin_nr), None)
        if entry:
            entry[1].remove_trait('value')

    def subscribe_digital(self, dev, pin_nr, sens):
        """
            Register ``sens`` to follow digital input ``pin_nr`` of device ``dev``.

            The sensor immediately receives the current reading when one is
            available, and afterwards every change of the pin's ``value`` trait.
        """
        if not self._boards[dev]:
            return
        with self._locks[dev]:
            pin = self._boards[dev].get_pin("d:{pin}:i".format(pin=pin_nr))
            # Remember which device the pin belongs to for handle_digital.
            pin.__dev_id = dev
            self._sens_digital[(dev, pin_nr)] = (sens, pin)
            s = pin.read()
        if s is not None:
            sens.set_status(s)
        pin.on_trait_change(self.handle_digital, "value")