"""API for FHEM homeautomation server, supporting telnet or HTTP/HTTPS connections with authentication and CSRF-token support."""
import datetime
import json
import logging
import re
import socket
import errno
import ssl
import threading
import time
from urllib.parse import quote
from urllib.parse import urlencode
from urllib.request import urlopen
from urllib.error import URLError
from urllib.request import HTTPSHandler
from urllib.request import HTTPPasswordMgrWithDefaultRealm
from urllib.request import HTTPBasicAuthHandler
from urllib.request import build_opener
from urllib.request import install_opener
# needs to be in sync with setup.py and documentation (conf.py, branch gh-pages)
__version__ = "0.7.0"
[docs]
class Fhem:
"""Connects to FHEM via socket communication with optional SSL and password
support"""
[docs]
def __init__(
self,
server,
port=7072,
use_ssl=False,
protocol="telnet",
username="",
password="",
csrf=True,
cafile="",
loglevel=1,
):
"""
Instantiate connector object.
:param server: address of FHEM server
:param port: telnet/http(s) port of server
:param use_ssl: boolean for SSL (TLS) [https as protocol sets use_ssl=True]
:param protocol: 'telnet', 'http' or 'https'
:param username: username for http(s) basicAuth validation
:param password: (global) telnet or http(s) password
:param csrf: (http(s)) use csrf token (FHEM 5.8 and newer), default True
:param cafile: path to public certificate of your root authority, if left empty, https protocol will ignore certificate checks.
:param loglevel: deprecated, will be removed. Please use standard python logging API with logger 'Fhem'.
"""
self.log = logging.getLogger("Fhem")
validprots = ["http", "https", "telnet"]
self.server = server
self.port = port
self.ssl = use_ssl
self.csrf = csrf
self.csrftoken = ""
self.username = username
self.password = password
self.loglevel = loglevel
self.connection = False
self.cafile = cafile
self.nolog = False
self.bsock = None
self.sock = None
self.https_handler = None
self.opener = None
# Set LogLevel
# self.set_loglevel(loglevel)
# Check if protocol is supported
if protocol in validprots:
self.protocol = protocol
else:
self.log.error("Invalid protocol: {}".format(protocol))
# Set authenticication values if#
# the protocol is http(s) or use_ssl is True
if protocol != "telnet":
tmp_protocol = "http"
if (protocol == "https") or (use_ssl is True):
self.ssl = True
tmp_protocol = "https"
self.baseurlauth = "{}://{}:{}/".format(tmp_protocol, server, port)
self.baseurltoken = "{}fhem".format(self.baseurlauth)
self.baseurl = "{}fhem?XHR=1&cmd=".format(self.baseurlauth)
self._install_opener()
[docs]
def connect(self):
"""create socket connection to server (telnet protocol only)"""
if self.protocol == "telnet":
# try:
self.log.debug("Creating socket...")
if self.ssl:
self.bsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self.sock = context.wrap_socket(self.bsock)
self.log.info(
"Connecting to {}:{} with SSL (TLS)".format(self.server, self.port)
)
else:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.log.info(
"Connecting to {}:{} without SSL".format(self.server, self.port)
)
# except Exception as e:
# self.connection = False
# self.log.error(
# "Failed to create socket to {}:{}: {}".format(self.server, self.port, e)
# )
# return
self.log.debug("pre-connect (no try/except)")
# try:
# self.sock.timeout = 5.0
self.sock.connect((self.server, self.port))
self.log.debug("post-connect")
# except Exception as e:
# self.connection = False
# self.log.error(
# "Failed to connect to {}:{}: {}".format(self.server, self.port, e)
# )
# return
self.connection = True
self.log.info("Connected to {}:{}".format(self.server, self.port))
if self.password != "":
# time.sleep(1.0)
# self.send_cmd("\n")
# prmpt = self._recv_nonblocking(4.0)
prmpt = self.sock.recv(32000)
self.log.debug("auth-prompt: {}".format(prmpt))
self.nolog = True
self.send_cmd(self.password)
self.nolog = False
time.sleep(0.1)
try:
po1 = self.sock.recv(32000)
self.log.debug("auth-repl1: {}".format(po1))
except socket.error:
self.log.error("Failed to recv auth reply")
self.connection = False
return
self.log.info("Auth password sent to {}".format(self.server))
else: # http(s)
if self.csrf:
dat = self.send("")
if dat is not None:
dat = dat.decode("UTF-8")
stp = dat.find("csrf_")
if stp != -1:
token = dat[stp:]
token = token[: token.find("'")]
self.csrftoken = token
self.connection = True
else:
self.log.error(
"CSRF token requested for server that doesn't know CSRF"
)
else:
self.log.error("No valid answer on send when expecting csrf.")
else:
self.connection = True
[docs]
def connected(self):
"""Returns True if socket/http(s) session is connected to server."""
return self.connection
[docs]
def set_loglevel(self, level):
"""Set logging level. [Deprecated, will be removed, use python logging.setLevel]
:param level: 0: critical, 1: errors, 2: info, 3: debug
"""
self.log.warning(
"Deprecation: please set logging levels using python's standard logging for logger 'Fhem'"
)
if level == 0:
self.log.setLevel(logging.CRITICAL)
elif level == 1:
self.log.setLevel(logging.ERROR)
elif level == 2:
self.log.setLevel(logging.INFO)
elif level == 3:
self.log.setLevel(logging.DEBUG)
[docs]
def close(self):
"""Closes socket connection. (telnet only)"""
if self.protocol == "telnet":
if self.connected():
time.sleep(0.2)
self.sock.close()
self.connection = False
self.log.info("Disconnected from fhem-server")
else:
self.log.error("Cannot disconnect, not connected")
else:
self.connection = False
def _install_opener(self):
self.opener = None
self.auth_handler = None
self.password_mgr = None
self.context = None
if self.username != "":
self.password_mgr = HTTPPasswordMgrWithDefaultRealm()
self.password_mgr.add_password(
None, self.baseurlauth, self.username, self.password
)
self.auth_handler = HTTPBasicAuthHandler(self.password_mgr)
if self.ssl is True:
if self.cafile == "":
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_NONE
else:
self.context = ssl.create_default_context()
self.context.load_verify_locations(cafile=self.cafile)
self.context.verify_mode = ssl.CERT_REQUIRED
self.https_handler = HTTPSHandler(context=self.context)
if self.username != "":
self.opener = build_opener(self.https_handler, self.auth_handler)
else:
self.opener = build_opener(self.https_handler)
else:
if self.username != "":
self.opener = build_opener(self.auth_handler)
# if self.opener is not None:
# self.log.debug("Setting up opener on: {}".format(self.baseurlauth))
# install_opener(self.opener)
[docs]
def send(self, buf, timeout=10):
"""Sends a buffer to server
:param buf: binary buffer"""
if len(buf) > 0:
if not self.connected():
self.log.debug("Not connected, trying to connect...")
self.connect()
if self.protocol == "telnet":
if self.connected():
self.log.debug("Connected, sending...")
try:
self.sock.sendall(buf)
self.log.info("Sent msg, len={}".format(len(buf)))
return None
except OSError as err:
self.log.error(
"Failed to send msg, len={}. Exception raised: {}".format(
len(buf), err
)
)
self.connection = None
return None
else:
self.log.error(
"Failed to send msg, len={}. Not connected.".format(len(buf))
)
return None
else: # HTTP(S)
paramdata = None
# if self.opener is not None:
# install_opener(self.opener)
if self.csrf and len(buf) > 0:
if len(self.csrftoken) == 0:
self.log.error("CSRF token not available!")
self.connection = False
else:
datas = {"fwcsrf": self.csrftoken}
paramdata = urlencode(datas).encode("UTF-8")
# try:
if len(buf) > 0:
self.log.debug("Cmd: {}".format(buf))
cmd = quote(buf)
self.log.debug("Cmd-enc: {}".format(cmd))
else:
cmd = ""
if len(cmd) > 0:
ccmd = self.baseurl + cmd
else:
ccmd = self.baseurltoken
self.log.info("Request: {}".format(ccmd))
if ccmd.lower().startswith("http"):
if self.opener is not None:
ans = self.opener.open(ccmd, paramdata, timeout=timeout)
else:
if self.context is None:
ans = urlopen(ccmd, paramdata, timeout=timeout)
else:
ans = urlopen(
ccmd, paramdata, timeout=timeout, context=self.context
)
else:
self.log.error(
"Invalid URL {}, Failed to send msg, len={}, {}".format(
ccmd, len(buf), err
)
)
return None
data = ans.read()
return data
# except URLError as err:
# self.connection = False
# self.log.error("Failed to send msg, len={}, {}".format(len(buf), err))
# return None
# except socket.timeout as err:
# # Python 2.7 fix
# self.log.error("Failed to send msg, len={}, {}".format(len(buf), err))
# return None
[docs]
def send_cmd(self, msg, timeout=10.0):
"""Sends a command to server.
:param msg: string with FHEM command, e.g. 'set lamp on'
:param timeout: timeout on send (sec).
"""
if not self.connected():
self.connect()
if not self.nolog:
self.log.debug("Sending: {}".format(msg))
if self.protocol == "telnet":
if self.connection:
msg = "{}\n".format(msg)
cmd = msg.encode("utf-8")
return self.send(cmd)
else:
self.log.error(
"Failed to send msg, len={}. Not connected.".format(len(msg))
)
return None
else:
return self.send(msg, timeout=timeout)
def _recv_nonblocking(self, timeout=0.1):
if not self.connected():
self.connect()
data = b""
if self.connection:
self.sock.setblocking(False)
data = b""
try:
data = self.sock.recv(32000)
except socket.error as err:
# Resource temporarily unavailable, operation did not complete are expected
if err.errno != errno.EAGAIN and err.errno != errno.ENOENT:
self.log.debug(
"Exception in non-blocking (1). Error: {}".format(err)
)
time.sleep(timeout)
wok = 1
while len(data) > 0 and wok > 0:
time.sleep(timeout)
datai = b""
try:
datai = self.sock.recv(32000)
if len(datai) == 0:
wok = 0
else:
data += datai
except socket.error as err:
# Resource temporarily unavailable, operation did not complete are expected
if err.errno != errno.EAGAIN and err.errno != errno.ENOENT:
self.log.debug(
"Exception in non-blocking (2). Error: {}".format(err)
)
wok = 0
self.sock.setblocking(True)
return data
[docs]
def send_recv_cmd(self, msg, timeout=0.1, blocking=False):
"""
Sends a command to the server and waits for an immediate reply.
:param msg: FHEM command (e.g. 'set lamp on')
:param timeout: waiting time for reply
:param blocking: (telnet only) on True: use blocking socket communication (bool)
"""
data = b""
if not self.connected():
self.connect()
if self.protocol == "telnet":
if self.connection:
self.send_cmd(msg)
time.sleep(timeout)
data = []
if blocking is True:
try:
# This causes failures if reply is larger!
data = self.sock.recv(64000)
except socket.error:
self.log.error("Failed to recv msg. {}".format(data))
return {}
else:
data = self._recv_nonblocking(timeout)
self.sock.setblocking(True)
else:
self.log.error(
"Failed to send msg, len={}. Not connected.".format(len(msg))
)
else:
data = self.send_cmd(msg)
if data is None:
return None
if len(data) == 0:
return {}
try:
sdata = data.decode("utf-8")
jdata = json.loads(sdata)
except Exception as err:
self.log.error(
"Failed to decode json, exception raised. {} {}".format(data, err)
)
return {}
if len(jdata["Results"]) == 0:
self.log.error("Query had no result.")
return {}
else:
self.log.info("JSON answer received.")
return jdata
def get_dev_state(self, dev, timeout=0.1):
self.log.warning(
"Deprecation: use get_device('device') instead of get_dev_state"
)
return self.get_device(dev, timeout=timeout, raw_result=True)
def get_dev_reading(self, dev, reading, timeout=0.1):
self.log.warning(
"Deprecation: use get_device_reading('device', 'reading') instead of get_dev_reading"
)
return self.get_device_reading(dev, reading, value_only=True, timeout=timeout)
def getDevReadings(self, dev, reading, timeout=0.1):
self.log.warning(
"Deprecation: use get_device_reading('device', ['reading']) instead of getDevReadings"
)
return self.get_device_reading(
dev, timeout=timeout, value_only=True, raw_result=True
)
def get_dev_readings(self, dev, readings, timeout=0.1):
self.log.warning(
"Deprecation: use get_device_reading('device', ['reading']) instead of get_dev_readings"
)
return self.get_device_reading(
dev, readings, timeout=timeout, value_only=True, raw_result=True
)
def get_dev_reading_time(self, dev, reading, timeout=0.1):
self.log.warning(
"Deprecation: use get_device_reading('device', 'reading', time_only=True) instead of get_dev_reading_time"
)
return self.get_device_reading(dev, reading, timeout=timeout, time_only=True)
def get_dev_readings_time(self, dev, readings, timeout=0.1):
self.log.warning(
"Deprecation: use get_device_reading('device', ['reading'], time_only=True) instead of get_dev_reading_time"
)
return self.get_device_reading(dev, readings, timeout=timeout, time_only=True)
def getFhemState(self, timeout=0.1):
self.log.warning(
"Deprecation: use get() without parameters instead of getFhemState"
)
return self.get(timeout=timeout, raw_result=True)
def get_fhem_state(self, timeout=0.1):
self.log.warning(
"Deprecation: use get() without parameters instead of get_fhem_state"
)
return self.get(timeout=timeout, raw_result=True)
@staticmethod
def _sand_down(value):
return value if len(value.values()) - 1 else list(value.values())[0]
@staticmethod
def _append_filter(name, value, compare, string, filter_list):
value_list = [value] if isinstance(value, str) else value
values = ",".join(value_list)
filter_list.append(string.format(name, compare, values))
def _response_filter(self, response, arg, value, value_only=None, time_only=None):
if len(arg) > 2:
self.log.error("Too many positional arguments")
return {}
result = {}
for r in (
response if "totalResultsReturned" not in response else response["Results"]
):
arg = [arg[0]] if len(arg) and isinstance(arg[0], str) else arg
if value_only:
result[r["Name"]] = {
k: v["Value"]
for k, v in r[value].items()
if "Value" in v and (not len(arg) or (len(arg) and k == arg[0]))
} # k in arg[0]))} fixes #14
elif time_only:
result[r["Name"]] = {
k: v["Time"]
for k, v in r[value].items()
if "Time" in v and (not len(arg) or (len(arg) and k == arg[0]))
} # k in arg[0]))}
else:
result[r["Name"]] = {
k: v
for k, v in r[value].items()
if (not len(arg) or (len(arg) and k == arg[0]))
} # k in arg[0]))}
if not result[r["Name"]]:
result.pop(r["Name"], None)
elif len(result[r["Name"]].values()) == 1:
result[r["Name"]] = list(result[r["Name"]].values())[0]
return result
def _parse_filters(self, name, value, not_value, filter_list, case_sensitive):
compare = "=" if case_sensitive else "~"
if value:
self._append_filter(name, value, compare, "{}{}{}", filter_list)
elif not_value:
self._append_filter(name, not_value, compare, "{}!{}{}", filter_list)
def _convert_data(self, response, k, v):
try:
test_type = unicode
except NameError:
test_type = str
if isinstance(v, test_type):
if re.findall("^[0-9]+$", v):
response[k] = int(v)
elif re.findall(r"^[0-9]+\.[0-9]+$", v):
response[k] = float(v)
elif re.findall(
"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$", v
):
response[k] = datetime.datetime.strptime(v, "%Y-%m-%d %H:%M:%S")
if isinstance(v, dict):
self._parse_data_types(response[k])
if isinstance(v, list):
self._parse_data_types(response[k])
def _parse_data_types(self, response):
if isinstance(response, dict):
for k, v in response.items():
self._convert_data(response, k, v)
if isinstance(response, list):
for i, v in enumerate(response):
self._convert_data(response, i, v)
[docs]
def get(
self,
name=None,
state=None,
group=None,
room=None,
device_type=None,
not_name=None,
not_state=None,
not_group=None,
not_room=None,
not_device_type=None,
case_sensitive=None,
filters=None,
timeout=0.1,
blocking=False,
raw_result=None,
):
"""
Get FHEM data of devices, can filter by parameters or custom defined filters.
All filters use regular expressions (except full match), so don't forget escaping.
Filters can be used by all other get functions.
For more information about filters, see https://FHEM.de/commandref.html#devspec
:param name: str or list, device name in FHEM
:param state: str or list, state in FHEM
:param group: str or list, filter FHEM groups
:param room: str or list, filter FHEM room
:param device_type: str or list, FHEM device type
:param not_name: not name
:param not_state: not state
:param not_group: not group
:param not_room: not room
:param not_device_type: not device_type
:param case_sensitive: bool, use case_sensitivity for all filter functions
:param filters: dict of filters - key=attribute/internal/reading, value=regex for value, e.g. {"battery": "ok"}
:param raw_result: On True: Don't convert to python types and send full FHEM response
:param timeout: timeout for reply
:param blocking: telnet socket mode, default blocking=False
:return: dict of FHEM devices
"""
if not self.connected():
self.connect()
if self.connected():
filter_list = []
self._parse_filters("NAME", name, not_name, filter_list, case_sensitive)
self._parse_filters("STATE", state, not_state, filter_list, case_sensitive)
self._parse_filters("group", group, not_group, filter_list, case_sensitive)
self._parse_filters("room", room, not_room, filter_list, case_sensitive)
self._parse_filters(
"TYPE", device_type, not_device_type, filter_list, case_sensitive
)
if filters:
for key, value in filters.items():
filter_list.append(
"{}{}{}".format(key, "=" if case_sensitive else "~", value)
)
cmd = "jsonlist2 {}".format(":FILTER=".join(filter_list))
if self.protocol == "telnet":
result = self.send_recv_cmd(cmd, blocking=blocking, timeout=timeout)
else:
result = self.send_recv_cmd(cmd, blocking=False, timeout=timeout)
if not result or raw_result:
return result
result = result["Results"]
self._parse_data_types(result)
return result
else:
self.log.error("Failed to get fhem state. Not connected.")
return {}
[docs]
def get_states(self, **kwargs):
"""
Return only device states, can use filters from get().
:param kwargs: Use keyword arguments from :py:meth:`Fhem.get` function
:return: dict of FHEM devices with states
"""
response = self.get(**kwargs)
if not response:
return response
return {
r["Name"]: r["Readings"]["state"]["Value"]
for r in response
if "state" in r["Readings"]
}
[docs]
def get_readings(self, *arg, **kwargs):
"""
Return readings of a device, can use filters from get().
:param arg: str, Get only a specified reading, return all readings of device when parameter not given
:param value_only: return only value of reading, not timestamp
:param time_only: return only timestamp of reading
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` function
:return: dict of FHEM devices with readings
"""
value_only = kwargs["value_only"] if "value_only" in kwargs else None
time_only = kwargs["time_only"] if "time_only" in kwargs else None
kwargs.pop("value_only", None)
kwargs.pop("time_only", None)
response = self.get(**kwargs)
return self._response_filter(
response, arg, "Readings", value_only=value_only, time_only=time_only
)
[docs]
def get_attributes(self, *arg, **kwargs):
"""
Return attributes of a device, can use filters from get()
:param arg: str, Get only specified attribute, return all attributes of device when parameter not given
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` function
:return: dict of FHEM devices with attributes
"""
response = self.get(**kwargs)
return self._response_filter(response, arg, "Attributes")
[docs]
def get_internals(self, *arg, **kwargs):
"""
Return internals of a device, can use filters from get()
:param arg: str, Get only specified internal, return all internals of device when parameter not given
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` function
:return: dict of FHEM devices with internals
"""
response = self.get(**kwargs)
return self._response_filter(response, arg, "Internals")
[docs]
def get_device(self, device, **kwargs):
"""
Get all data from a device
:param device: str or list,
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` function
:return: dict with data of specific FHEM device
"""
return self.get(name=device, **kwargs)
[docs]
def get_device_state(self, device, **kwargs):
"""
Get state of one device
:param device: str or list,
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` and :py:meth:`Fhem.get_states` functions
:return: str, int, float when only specific value requested else dict
"""
result = self.get_states(name=device, **kwargs)
return self._sand_down(result)
[docs]
def get_device_reading(self, device, *arg, **kwargs):
"""
Get reading(s) of one device
:param device: str or list,
:param arg: str for one reading, list for special readings, empty for all readings
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` and :py:meth:`Fhem.get_readings` functions
:return: str, int, float when only specific value requested else dict
"""
result = self.get_readings(*arg, name=device, **kwargs)
return self._sand_down(result)
[docs]
def get_device_attribute(self, device, *arg, **kwargs):
"""
Get attribute(s) of one device
:param device: str or list,
:param arg: str for one attribute, list for special attributes, empty for all attributes
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` function
:return: str, int, float when only specific value requested else dict
"""
result = self.get_attributes(*arg, name=device, **kwargs)
return self._sand_down(result)
[docs]
def get_device_internal(self, device, *arg, **kwargs):
"""
Get internal(s) of one device
:param device: str or list,
:param arg: str for one internal value, list for special internal values, empty for all internal values
:param kwargs: use keyword arguments from :py:meth:`Fhem.get` function
:return: str, int, float when only specific value requested else dict
"""
result = self.get_internals(*arg, name=device, **kwargs)
return self._sand_down(result)
[docs]
class FhemEventQueue:
"""Creates a thread that listens to FHEM events and dispatches them to
a Python queue."""
[docs]
def __init__(
self,
server,
que,
port=7072,
protocol="telnet",
use_ssl=False,
username="",
password="",
csrf=True,
cafile="",
filterlist=None,
timeout=0.1,
eventtimeout=60,
serverregex=None,
loglevel=1,
raw_value=False,
):
"""
Construct an event queue object, FHEM events will be queued into the queue given at initialization.
:param server: FHEM server address
:param que: Python Queue object, receives FHEM events as dictionaries
:param port: FHEM telnet port
:param protocol: 'telnet', 'http' or 'https'. NOTE: for FhemEventQueue, currently only 'telnet' is supported!
:param use_ssl: boolean for SSL (TLS)
:param username: http(s) basicAuth username
:param password: (global) telnet password or http(s) basicAuth password
:param csrf: (http(s)) use csrf token (FHEM 5.8 and newer), default True (currently not used, since telnet-only)
:param cafile: path to public certificate of your root authority, if left empty, https protocol will ignore certificate checks.
:param filterlist: array of filter dictionaires [{"dev"="lamp1"}, {"dev"="livingtemp", "reading"="temperature"}]. A filter dictionary can contain devstate (type of FHEM device), dev (FHEM device name) and/or reading conditions. The filterlist works on client side.
:param timeout: internal timeout for socket receive (should be short)
:param eventtimeout: larger timeout for server keep-alive messages
:param serverregex: FHEM regex to restrict event messages on server side.
:param loglevel: deprecated, will be removed. Use standard python logging function for logger 'FhemEventQueue', old: 0: no log, 1: errors, 2: info, 3: debug
:param raw_value: default False. On True, the value of a reading is not parsed for units, and returned as-is.
"""
# self.set_loglevel(loglevel)
self.log = logging.getLogger("FhemEventQueue")
self.informcmd = "inform timer"
self.timeout = timeout
if serverregex is not None:
self.informcmd += " " + serverregex
if protocol != "telnet":
self.log.error("ONLY TELNET is currently supported for EventQueue")
return
self.fhem = Fhem(
server=server,
port=port,
use_ssl=use_ssl,
username=username,
password=password,
cafile=cafile,
loglevel=loglevel,
)
self.fhem.connect()
time.sleep(timeout)
self.EventThread = threading.Thread(
target=self._event_worker_thread,
args=(que, filterlist, timeout, eventtimeout, raw_value),
)
self.EventThread.setDaemon(True)
self.EventThread.start()
[docs]
def set_loglevel(self, level):
"""
Set logging level, [Deprecated, will be removed, use python's logging.setLevel]
:param level: 0: critical, 1: errors, 2: info, 3: debug
"""
self.log.warning(
"Deprecation: please set logging levels using python's standard logging for logger 'FhemEventQueue'"
)
if level == 0:
self.log.setLevel(logging.CRITICAL)
elif level == 1:
self.log.setLevel(logging.ERROR)
elif level == 2:
self.log.setLevel(logging.INFO)
elif level == 3:
self.log.setLevel(logging.DEBUG)
def _event_worker_thread(
self, que, filterlist, timeout=0.1, eventtimeout=120, raw_value=False
):
self.log.debug("FhemEventQueue worker thread starting...")
if self.fhem.connected() is not True:
self.log.warning("EventQueueThread: Fhem is not connected!")
time.sleep(timeout)
self.fhem.send_cmd(self.informcmd)
data = ""
first = True
lastreceive = time.time()
self.eventThreadActive = True
while self.eventThreadActive is True:
while self.fhem.connected() is not True:
self.fhem.connect()
if self.fhem.connected():
time.sleep(timeout)
lastreceive = time.time()
self.fhem.send_cmd(self.informcmd)
else:
self.log.warning(
"Fhem is not connected in EventQueue thread, retrying!"
)
time.sleep(5.0)
if first is True:
first = False
self.log.debug("FhemEventQueue worker thread active.")
time.sleep(timeout)
if time.time() - lastreceive > eventtimeout:
self.log.debug("Event-timeout, refreshing INFORM TIMER")
self.fhem.send_cmd(self.informcmd)
if self.fhem.connected() is True:
lastreceive = time.time()
if self.fhem.connected() is True:
data = self.fhem._recv_nonblocking(timeout)
lines = data.decode("utf-8").split("\n")
for l in lines:
if len(l) > 0:
lastreceive = time.time()
li = l.split(" ")
if len(li) > 4:
dd = li[0].split("-")
tt = li[1].split(":")
try:
if "." in tt[2]:
secs = float(tt[2])
tt[2] = str(int(secs))
tt.append(str(int((secs - int(secs)) * 1000000)))
except Exception as e:
self.log.warning(
"EventQueue: us-Bugfix failed with {}".format(e)
)
try:
if len(tt) == 3:
dt = datetime.datetime(
int(dd[0]),
int(dd[1]),
int(dd[2]),
int(tt[0]),
int(tt[1]),
int(tt[2]),
)
else:
dt = datetime.datetime(
int(dd[0]),
int(dd[1]),
int(dd[2]),
int(tt[0]),
int(tt[1]),
int(tt[2]),
int(tt[3]),
)
except Exception as e:
self.log.debug(
"EventQueue: invalid date format in date={} time={}, event {} ignored: {}".format(
li[0], li[1], l, e
)
)
continue
devtype = li[2]
dev = li[3]
val = ""
for i in range(4, len(li)):
val += li[i]
if i < len(li) - 1:
val += " "
full_val = val
vl = val.split(" ")
val = ""
unit = ""
if len(vl) > 0:
if len(vl[0]) > 0 and vl[0][-1] == ":":
read = vl[0][:-1]
if len(vl) > 1:
val = vl[1]
if len(vl) > 2:
unit = vl[2]
else:
read = "STATE"
if len(vl) > 0:
val = vl[0]
if len(vl) > 1:
unit = vl[1]
adQ = True
if filterlist is not None:
adQ = False
for f in filterlist:
adQt = True
for c in f:
if c == "devtype":
if devtype != f[c]:
adQt = False
if c == "device":
if dev != f[c]:
adQt = False
if c == "reading":
if read != f[c]:
adQt = False
if adQt:
adQ = True
if adQ:
if raw_value is False:
ev = {
"timestamp": dt,
"devicetype": devtype,
"device": dev,
"reading": read,
"value": val,
"unit": unit,
}
else:
ev = {
"timestamp": dt,
"devicetype": devtype,
"device": dev,
"reading": read,
"value": full_val,
"unit": None,
}
que.put(ev)
# self.log.debug("Event queued for {}".format(ev['device']))
time.sleep(timeout)
self.fhem.close()
self.log.debug("FhemEventQueue worker thread terminated.")
return
[docs]
def close(self):
"""Stop event thread and close socket."""
self.eventThreadActive = False
time.sleep(0.5 + self.timeout)