Source code for chess_link_usb

"""
ChessLink transport implementation for USB connections.
"""
import logging
import threading
import queue
import time

import chess_link_protocol as clp

try:
    import serial
    import serial.tools.list_ports
    usb_support = True
except:
    usb_support = False


[docs]class Transport(): """ ChessLink transport implementation for USB connections. This class does automatic hardware detection of any ChessLink board connected via USB and support Linux, macOS and Windows. This transport uses an asynchronous background thread for hardware communcation. All replies are written to the python queue `que` given during initialization. """
[docs] def __init__(self, que, protocol_dbg=False): """ Initialize with python queue for event handling. Events are strings conforming to the ChessLink protocol as documented in `magic-link.md <https://github.com/domschl/python-mchess/blob/master/mchess/magic-board.md>`_. :param que: Python queue that will eceive events from chess board. :param protocol_dbg: True: byte-level ChessLink protocol debug messages """ self.log = logging.getLogger("ChessLinkUSB") if usb_support == False: self.log.error( 'Cannot communicate: PySerial module not installed.') self.init = False return self.que = que # asyncio.Queue() self.init = True self.log.debug("USB init ok") self.protocol_debug = protocol_dbg self.last_agent_state = None
[docs] def quit(self): """ Initiate worker-thread stop """ self.thread_active = False
[docs] def search_board(self, iface=None): """ Search for ChessLink connections on all USB ports. :param iface: not used for USB. :returns: Name of the port with a ChessLink board, None on failure. """ self.log.info("Searching for ChessLink boards...") self.log.info( 'Note: search can be disabled in < chess_link_config.json > by setting {"autodetect": false}') port = None ports = self.usb_port_search() if len(ports) > 0: if len(ports) > 1: self.log.warning( "Found {} Millennium boards, using first found.".format(len(ports))) port = ports[0] self.log.info( "Autodetected Millennium board at USB port: {}".format(port)) return port
[docs] def test_board(self, port): """ Test an usb port for correct answer on get version command. :returns: Version string on ok, None on failure. """ self.log.debug("Testing port: {}".format(port)) try: self.usb_dev = serial.Serial(port, 38400, timeout=2) self.usb_dev.dtr = 0 self.write_mt("V") version = self.usb_read_synchr(self.usb_dev, 'v', 7) if len(version) != 7: self.usb_dev.close() self.log.debug( "Message length {} instead of 7".format(len(version))) return None if version[0] != 'v': self.log.debug("Unexpected reply {}".format(version)) self.usb_dev.close() return None verstring = '{}.{}'.format( version[1]+version[2], version[3]+version[4]) self.log.debug("Millennium {} at {}".format(verstring, port)) self.usb_dev.close() return verstring except (OSError, serial.SerialException) as e: self.log.debug( 'Board detection on {} resulted in error {}'.format(port, e)) try: self.usb_dev.close() except Exception: pass return None
[docs] def usb_port_check(self, port): """ Check usb port for valid ChessLink connection :returns: True on success, False on failure. """ self.log.debug("Testing port: {}".format(port)) try: s = serial.Serial(port, 38400) s.close() return True except (OSError, serial.SerialException) as e: self.log.debug("Can't open port {}, {}".format(port, e)) return False
[docs] def write_mt(self, msg): """ Encode and write a message to ChessLink. :param msg: Message string. Parity will be added, and block CRC appended. """ msg = clp.add_block_crc(msg) bts = [] for c in msg: bo = clp.add_odd_par(c) bts.append(bo) try: if self.protocol_debug is True: self.log.debug('Trying write <{}>'.format(bts)) self.usb_dev.write(bts) self.usb_dev.flush() except Exception as e: self.log.error("Failed to write {}: {}".format(msg, e)) self.error_state = True return False if self.protocol_debug is True: self.log.debug("Written '{}' as < {} > ok".format(msg, bts)) return True
[docs] def usb_read_synchr(self, usbdev, cmd, num): """ Synchronous reads for initial hardware detection. """ rep = [] start = False while start is False: try: b = chr(ord(usbdev.read()) & 127) except: return [] if b == cmd: rep.append(b) start = True for _ in range(num-1): try: b = chr(ord(usbdev.read()) & 127) rep.append(b) except (Exception) as e: self.log.error("Read error {}".format(e)) break if clp.check_block_crc(rep) is False: return [] return rep
def agent_state(self, que, state, msg): if state != self.last_agent_state: self.last_agent_state = state que.put('agent-state: '+state + ' ' + msg)
[docs] def open_mt(self, port): """ Open an usb port to a connected ChessLink board. :returns: True on success. """ self.uport = port try: self.usb_dev = serial.Serial(port, 38400, timeout=0.1) self.usb_dev.dtr = 0 except Exception as e: emsg = 'USB cannot open port {}, {}'.format(port, e) self.log.error(emsg) self.agent_state(self.que, 'offline', emsg) return False self.log.debug('USB port {} open'.format(port)) self.thread_active = True self.event_thread = threading.Thread( target=self.event_worker_thread, args=(self.que,)) self.event_thread.setDaemon(True) self.event_thread.start() return True
[docs] def event_worker_thread(self, que): """ Background thread that sends data received via usb to the queue `que`. """ self.log.debug('USB worker thread started.') cmd_started = False cmd_size = 0 cmd = "" self.agent_state(self.que, 'online', 'Connected to {}'.format(self.uport)) self.error_state = False posted = False while self.thread_active: while self.error_state is True: time.sleep(1.0) try: self.usb_dev.close() except: pass try: self.usb_dev = serial.Serial( self.uport, 38400, timeout=0.1) self.usb_dev.dtr = 0 self.agent_state(self.que, 'online', 'Reconnected to {}'.format(self.uport)) self.error_state = False posted = False break except Exception as e: if posted is False: emsg = "Failed to reconnected to {}, {}".format( self.uport, e) self.log.warning(emsg) self.agent_state(self.que, 'offline', emsg) posted = True b = "" try: if cmd_started == False: self.usb_dev.timeout = None else: self.usb_dev.timeout = 0.2 by = self.usb_dev.read() if len(by) > 0: b = chr(ord(by) & 127) else: continue except Exception as e: if len(cmd) > 0: self.log.debug( "USB command '{}' interrupted: {}".format(cmd[0], e)) time.sleep(0.1) cmd_started = False cmd_size = 0 cmd = "" self.error_state = True continue if len(b) > 0: if cmd_started is False: if b in clp.protocol_replies: cmd_started = True cmd_size = clp.protocol_replies[b] cmd = b cmd_size -= 1 else: cmd += b cmd_size -= 1 if cmd_size == 0: cmd_started = False cmd_size = 0 if self.protocol_debug is True: self.log.debug("USB received cmd: {}".format(cmd)) if clp.check_block_crc(cmd): que.put(cmd) cmd = ""
[docs] def get_name(self): """ Get name of this transport. :returns: 'chess_link_usb' """ return "chess_link_usb"
[docs] def is_init(self): """ Check, if hardware connection is up. :returns: True on success. """ self.log.debug("Ask for init") return self.init