"""
ChessLink transport implementation for Bluetooth LE connections using `bluepy`.
"""
import logging
import threading
import queue
import time
import os
import chess_link_protocol as clp
try:
import bluepy
from bluepy.btle import Scanner, DefaultDelegate, Peripheral
bluepy_ble_support = True
except:
bluepy_ble_support = False
[docs]class Transport():
"""
ChessLink transport implementation for Bluetooth LE connections using `bluepy`.
This class does automatic hardware detection of any ChessLink board using bluetooth LE
and supports Linux and Raspberry Pi.
This transport uses an asynchronous background thread for hardware communcation.
All replies are written to the python queue `que` given during initialization.
For the details of the Chess Link protocol, please refer to:
`magic-link.md <https://github.com/domschl/python-mchess/blob/master/mchess/magic-board.md>`_.
"""
[docs] def __init__(self, que, protocol_dbg=False):
"""
Initialize with python queue for event handling.
Events are strings conforming to the ChessLink protocol as documented in
`magic-link.md <https://github.com/domschl/python-mchess/blob/master/mchess/magic-board.md>`_.
:param que: Python queue that will eceive events from chess board.
:param protocol_dbg: True: byte-level ChessLink protocol debug messages
"""
if bluepy_ble_support == False:
self.init = False
return
self.wrque = queue.Queue()
self.log = logging.getLogger("ChessLinkBluePy")
self.que = que # asyncio.Queue()
self.init = True
self.log.debug("bluepy_ble init ok")
self.protocol_debug = protocol_dbg
self.bp_path=os.path.dirname(os.path.abspath(bluepy.__file__))
self.bp_helper=os.path.join(self.bp_path,'bluepy-helper')
if not os.path.exists(self.bp_helper):
self.log.warning(f'Unexpected: {self.bp_helper} does not exists!')
self.fix_cmd="sudo setcap 'cap_net_raw,cap_net_admin+eip' "+self.bp_helper
[docs] def quit(self):
"""
Initiate worker-thread stop
"""
self.worker_thread_active = False
[docs] def search_board(self, iface=0):
"""
Search for ChessLink connections using Bluetooth LE.
:param iface: interface number of bluetooth adapter, default 1.
:returns: Bluetooth address of ChessLink board, or None on failure.
"""
self.log.debug("bluepy_ble: searching for boards")
class ScanDelegate(DefaultDelegate):
def __init__(self, log):
self.log = log
DefaultDelegate.__init__(self)
def handleDiscovery(self, dev, isNewDev, isNewData):
if isNewDev:
self.log.debug("Discovered device {}".format(dev.addr))
elif isNewData:
self.log.debug(
"Received new data from {}".format(dev.addr))
scanner = Scanner(iface=iface).withDelegate(ScanDelegate(self.log))
try:
devices = scanner.scan(20.0)
except Exception as e:
self.log.error(f"BLE scanning failed. {e}")
self.log.error(f"excecute: {self.fix_cmd}")
self.log.error(f"or (if that fails) start ONCE with: `sudo python mchess.py` (fix ownership of chess_link_config.json afterwards)")
return None
devs = sorted(devices, key=lambda x: x.rssi, reverse=True)
print("Sorted:")
for b in devs:
self.log.debug('sorted by rssi {} {}'.format(b.addr, b.rssi))
for bledev in devs:
self.log.debug("Device {} ({}), RSSI={} dB".format(
bledev.addr, bledev.addrType, bledev.rssi))
for (adtype, desc, value) in bledev.getScanData():
self.log.debug(" {} ({}) = {}".format(desc, adtype, value))
if desc == "Complete Local Name":
if "MILLENNIUM CHESS" in value:
self.log.info(
"Autodetected Millennium Chess Link board at Bluetooth LE address: {}, signal strength (rssi): {}".format(
bledev.addr, bledev.rssi))
return bledev.addr
return None
[docs] def test_board(self, address):
"""
Test dummy.
:returns: Version string "1.0" always.
"""
# self.open_mt(address)
return "1.0"
[docs] def open_mt(self, address):
"""
Open a bluetooth LE connection to ChessLink board.
:param address: bluetooth address
:returns: True on success.
"""
self.log.debug('Starting worker-thread for bluepy ble')
self.worker_thread_active = True
self.worker_threader = threading.Thread(
target=self.worker_thread, args=(self.log, address, self.wrque, self.que))
self.worker_threader.setDaemon(True)
self.worker_threader.start()
timer = time.time()
self.conn_state = None
while self.conn_state is None and time.time()-timer < 5.0:
time.sleep(0.1)
if self.conn_state is None:
return False
return self.conn_state
[docs] def write_mt(self, msg):
"""
Encode and asynchronously write a message to ChessLink.
:param msg: Message string. Parity will be added, and block CRC appended.
"""
if self.protocol_debug is True:
self.log.debug('write-que-entry {}'.format(msg))
self.wrque.put(msg)
[docs] def get_name(self):
"""
Get name of this transport.
:returns: 'chess_link_bluepy'
"""
return "chess_link_bluepy"
[docs] def is_init(self):
"""
Check, if hardware connection is up.
:returns: True on success.
"""
return self.init
def agent_state(self, que, state, msg):
que.put('agent-state: '+state + ' ' + msg)
def mil_open(self, address, mil, que, log):
class PeriDelegate(DefaultDelegate):
def __init__(self, log, que):
self.log = log
self.log.debug("Init delegate for peri")
self.chunks = ""
DefaultDelegate.__init__(self)
def handleNotification(self, cHandle, data):
self.log.debug(
"BLE: Handle: {}, data: {}".format(cHandle, data))
rcv = ""
for b in data:
rcv += chr(b & 127)
self.log.debug('BLE received [{}]'.format(rcv))
self.chunks += rcv
if self.chunks[0] not in clp.protocol_replies:
self.log.warning(
"Illegal reply start '{}' received, discarding".format(self.chunks[0]))
while len(self.chunks) > 0 and self.chunks[0] not in clp.protocol_replies:
self.chunks = self.chunks[1:]
if len(self.chunks) > 0:
mlen = clp.protocol_replies[self.chunks[0]]
if len(self.chunks) >= mlen:
valmsg = self.chunks[:mlen]
self.log.debug(
'bluepy_ble received complete msg: {}'.format(valmsg))
if clp.check_block_crc(valmsg):
que.put(valmsg)
self.chunks = self.chunks[mlen:]
rx = None
tx = None
log.debug('Peripheral generated {}'.format(address))
try:
services = mil.getServices()
except Exception as e:
emsg = 'Failed to enumerate services for {}, {}'.format(address, e)
log.error(emsg)
self.agent_state(que, 'offline', emsg)
return None, None
# time.sleep(0.1)
log.debug("services: {}".format(len(services)))
for ser in services:
log.debug('Service: {}'.format(ser))
chrs = ser.getCharacteristics()
for chri in chrs:
if chri.uuid == "49535343-1e4d-4bd9-ba61-23c647249616": # TX char, rx for us
rx = chri
rxh = chri.getHandle()
# Enable notification magic:
log.debug('Enabling notifications')
mil.writeCharacteristic(
rxh+1, (1).to_bytes(2, byteorder='little'))
if chri.uuid == "49535343-8841-43f4-a8d4-ecbe34729bb3": # RX char, tx for us
tx = chri
# txh = chri.getHandle()
if chri.supportsRead():
log.debug(" {} UUID={} {} -> {}".format(chri, chri.uuid,
chri.propertiesToString(), chri.read()))
else:
log.debug(" {} UUID={}{}".format(
chri, chri.uuid, chri.propertiesToString()))
try:
log.debug('Installing peripheral delegate')
delegate = PeriDelegate(log, que)
delegate.que = que
mil.withDelegate(delegate)
except Exception as e:
emsg = 'Bluetooth LE: Failed to install peripheral delegate! {}'.format(
e)
log.error(emsg)
self.agent_state(que, 'offline', emsg)
return None, None
self.agent_state(que, 'online', 'Connected to ChessLink board via BLE')
return (rx, tx)
[docs] def worker_thread(self, log, address, wrque, que):
"""
Background thread that handles bluetooth sending and forwards data received via
bluetooth to the queue `que`.
"""
mil = None
message_delta_time = 0.1 # least 0.1 sec between outgoing btle messages
rx = None
tx = None
log.debug("bluepy_ble open_mt {}".format(address))
# time.sleep(0.1)
try:
log.debug("per1")
mil = Peripheral(address)
log.debug("per2")
except Exception as e:
log.debug("per3")
emsg = 'Failed to create BLE peripheral at {}, {}'.format(
address, e)
log.error(emsg)
self.agent_state(que, 'offline', '{}'.format(e))
self.conn_state = False
return
rx, tx = self.mil_open(address, mil, que, log)
time_last_out = time.time()+0.2
if rx is None or tx is None:
bt_error = True
self.conn_state = False
else:
bt_error = False
self.conn_state = True
while self.worker_thread_active is True:
rep_err = False
while bt_error is True:
time.sleep(1)
bt_error = False
self.init = False
try:
mil.connect(address)
except Exception as e:
if rep_err is False:
self.log.warning(
"Reconnect failed: {} [Local bluetooth problem?]".format(e))
rep_err = True
bt_error = True
if bt_error is False:
self.log.info(
"Bluetooth reconnected to {}".format(address))
rx, tx = self.mil_open(address, mil, que, log)
time_last_out = time.time()+0.2
self.init = True
if wrque.empty() is False and time.time()-time_last_out > message_delta_time:
msg = wrque.get()
gpar = 0
for b in msg:
gpar = gpar ^ ord(b)
msg = msg+clp.hex2(gpar)
if self.protocol_debug is True:
log.debug("blue_ble write: <{}>".format(msg))
bts = ""
for c in msg:
bo = chr(clp.add_odd_par(c))
bts += bo
btsx = bts.encode('latin1')
if self.protocol_debug is True:
log.debug("Sending: <{}>".format(btsx))
try:
tx.write(btsx, withResponse=True)
time_last_out = time.time()
except Exception as e:
log.error(
"bluepy_ble: failed to write {}: {}".format(msg, e))
bt_error = True
self.agent_state(
que, 'offline', 'Connected to Bluetooth peripheral lost: {}'.format(e))
wrque.task_done()
try:
rx.read()
mil.waitForNotifications(0.05)
# time.sleep(0.1)
except Exception as e:
self.log.warning("Bluetooth error {}".format(e))
bt_error = True
self.agent_state(
que, 'offline', 'Connected to Bluetooth peripheral lost: {}'.format(e))
continue
log.debug('wt-end')