Source code for landroidcc

import json
import logging
import os
import tempfile
import uuid
import base64
from collections import namedtuple
from threading import Event

import OpenSSL
import paho.mqtt.client as mqtt
import requests

logging.basicConfig(format='%(asctime)s %(module)-8s %(funcName)-10s %(levelname)-8s %(message)s',
                    level=logging.INFO,
                    datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger(__name__)


[docs]class Landroid(object): _username = None _api_user = None _mqtt_endpoint = None _mqtt_topic_in = None _mqtt_topic_out = None _api_product_items = None _api_boards = None _api_products = None _mower_product = None # holds the current mower from _api_products _api_certificate = None _status = None # type: LandroidStatus _mqtt_client = None _cachedir = None _cache = {} def __init__(self): """ Class to communicate with the Landroid cloud using REST for user information and MQTT for getting status updates and send them to the mower """ self._statuscallback = None self._eventmessage = Event() self._eventconnect = Event()
[docs] def connect(self, username, password): """ Connect to the cloud with the given credentials. :param username: Username for the cloud login :param password: Password for the login :return: None """ self._username = username self._cachedir = os.path.join(tempfile.gettempdir(), "landroidcc", self._username) self._initcache() self._api_authentificate(username, password) self._api_user = self._apicall_rest("users/me") self._mqtt_endpoint = self._api_user["mqtt_endpoint"] self._api_product_items = self._apicall_rest("product-items") self._mqtt_topic_out = self._api_product_items[0]["mqtt_topics"]["command_out"] self._mqtt_topic_in = self._api_product_items[0]["mqtt_topics"]["command_in"] self._api_boards = self._apicall_rest("boards") self._api_products = self._apicall_rest("products") product_id = self._api_product_items[0]["product_id"] for product in self._api_products: if product["id"] == product_id: self._mower_product = product break self._api_certificate = self._apicall_rest("users/certificate") self._writecache() self._connectmqtt()
[docs] def disconnect(self): """ Disconnects from the cloud :return: None """ self._mqtt_client.disconnect()
[docs] def start(self): """ Sent the mower the command to start mowing :return: None """ self._send_command('{"cmd": 1}') log.info("Command sent: Start Mowing")
[docs] def pause(self): """ Sent the mower the command to pause mowing :return: None """ self._send_command('{"cmd": 2}') log.info("Command sent: Pause Mowing")
[docs] def go_home(self): """ Sent the mower the command to go home mowing :return: None """ self._send_command('{"cmd": 3}') log.info("Command sent: Go Home")
def _send_command(self, cmd): self._mqtt_client.publish(self._mqtt_topic_in, cmd) def _connectmqtt(self): # Callback for connect def on_connect(client, userdata, flags, rc): log.debug("MQTT onnected with result code " + str(rc)) client.subscribe(self._mqtt_topic_out) log.info("Successfully connected to the cloud") self._eventconnect.set() # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): log.debug("MQTT Msg Received: " + msg.topic + " " + str(msg.payload)) payload = msg.payload.decode('utf-8') if isinstance(msg.payload, (bytes, bytearray)) else msg.payload status = LandroidStatus(payload) self._status = status self._eventmessage.set() if self._statuscallback: self._statuscallback(status) def on_log(client, userdata, level, buf): log.debug("MQTT Library Log: {}".format(buf)) try: pkcs12 = base64.decodebytes(self._api_certificate["pkcs12"].encode()) except AttributeError: pkcs12 = base64.decodestring(self._api_certificate["pkcs12"].encode()) p12 = OpenSSL.crypto.load_pkcs12(pkcs12) pem_filename = os.path.join(self._cachedir, "auth.pem") with open(pem_filename, "wb") as f_pem: f_pem.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, p12.get_privatekey())) f_pem.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, p12.get_certificate())) ca = p12.get_ca_certificates() if ca is not None: for cert in ca: f_pem.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)) self._mqtt_client = mqtt.Client(client_id="android-" + str(uuid.uuid4()), userdata=self) self._mqtt_client.tls_set(certfile=pem_filename, keyfile=pem_filename) self._mqtt_client.on_connect = on_connect self._mqtt_client.on_message = on_message self._mqtt_client.on_log = on_log self._mqtt_client.connect(self._mqtt_endpoint, 8883, keepalive=300) self._mqtt_client.loop_start() self._eventconnect.wait(30) return self._status
[docs] def set_statuscallback(self, func): """ Sets a callback function which will be called for any status update from the mower:: def callback(status): # type: (LandroidStatus) -> None print (status) landroid = Landroid() landroid.connect("", "") landroid.set_statuscallback(callback) :param func: The callback :return: None """ self._statuscallback = func
[docs] def get_status(self, refresh=True): """ Returns the last retrieved status from the mower. If refresh is True an update is requested from the mower and the call will block until an update is received. Once connected the status will automatically updated once the mower sent an automatic update message. This happens every 2-15 minutes and for all state changes. :param refresh: Force an update or only return the cached last status :rtype: LandroidStatus :return: The status of the mower. """ if refresh: if not self._apicall_mqtt("{}"): log.warning("Timeout while trying to get a new status") return self._status
def _apicall_mqtt(self, content, blocking=True): log.debug("MQTT call with: '{}'".format(content)) self._eventmessage.clear() self._mqtt_client.publish(self._mqtt_topic_in, content) result = True if blocking: result = self._eventmessage.wait(10) log.debug("MQTT call finished") return result def _initcache(self): cachedir = os.path.join(self._cachedir, self._username) cachefilename = os.path.join(cachedir, "cache.json") if not os.path.isfile(cachefilename): return with open(cachefilename) as fptr: try: self._cache = json.load(fptr) return except ValueError as ve: log.debug("Failed to parse cache file: {}".format(ve)) def _writecache(self): cachedir = os.path.join(self._cachedir, self._username) if not os.path.isdir(cachedir): os.makedirs(cachedir) with open(os.path.join(cachedir, "cache.json"), "w") as fptr: json.dump(self._cache, fptr) def _apicall_rest(self, url, postdata=None, set_headers=True, allow_cached=True): if allow_cached and not postdata and url in self._cache: log.debug("API Call form Cache: '{}': {}".format(url, self._cache[url])) return self._cache[url] headers = None if set_headers: headers = {"Content-Type": "application/json", "Authorization": self._accessTokenType + " " + self._accessToken} if postdata: response_plain = requests.post('https://api.worxlandroid.com/api/v2/' + url, data=postdata, headers=headers) else: response_plain = requests.get('https://api.worxlandroid.com/api/v2/' + url, headers=headers) response_plain.raise_for_status() response = response_plain.json() log.debug("API Call '{}': {}".format(url, response)) self._cache[url] = response return response def _api_authentificate(self, username, password): post_json = { "username": username, "password": password, "grant_type": "password", "client_id": 1, "type": "app", "client_secret": "nCH3A0WvMYn66vGorjSrnGZ2YtjQWDiCvjg7jNxK", "scope": "*" } response = self._apicall_rest('oauth/token', postdata=post_json, set_headers=False) self._accessToken = response["access_token"] self._accessTokenType = response["token_type"] log.info("Successfully logged in") def __str__(self): if not self._api_user: return "API not connected" return "landroid info\n" \ "#############\n" \ "Name: {name}\n" \ "Serial: {serial}\n" \ "Type: {code}\n".format(name=self._api_product_items[0]["name"], serial=self._api_product_items[0]["serial_number"], code=self._mower_product["code"])
[docs]class LandroidStatus(object): BatteryStatus = namedtuple("BatteryStatus", "percent,charges,volts,temperature,charging") Orientation = namedtuple("Orientation", "heading,pitch,roll") Statistics = namedtuple("Statistics", "distance,running,mowing") _lastStateDict = { 0: "Idle", 1: "Home", 2: "Start sequence", 3: "Leaving home", 4: "Follow wire", 5: "Searching home", 6: "Searching wire", 7: "Mowing", 8: "Lifted", 9: "Trapped", 10: "Blade blocked", 11: "Debug", 12: "Remote control", 30: "Going home", 32: "Border Cut", 33: "Searching zone", 34: "Pause" } _lastErrorDict = { 0: "No error", 1: "Trapped", 2: "Lifted", 3: "Wire missing", 4: "Outside wire", 5: "Raining", 6: "Close door to mow", 7: "Close door to go home", 8: "Blade motor blocked", 9: "Wheel motor blocked", 10: "Trapped timeout", 11: "Upside down", 12: "Battery low", 13: "Reverse wire", 14: "Charge error", 15: "Timeout finding home" } def __init__(self, inputraw): self._battery = None self._orientation = None self._statistics = None self._error = None self._state = None self._updated = None self._raw = inputraw # Raw string as received from the mower self._updatestatus(inputraw)
[docs] def get_battery(self): """ :return: :rtype: BatteryStatus """ return self._battery
[docs] def get_orientation(self): return self._orientation
[docs] def get_statistics(self): return self._statistics
[docs] def get_updated(self): return self._updated
[docs] def get_error(self): """ Returns the error as string. If there is no error, "No Error" is returned :return: The error as text :rtype: str """ return self._error
[docs] def get_state(self): """ Returns the state as string :return: The state as text :rtype: str """ return self._state
[docs] def get_raw(self): """ Returns the status update as received directly from MQTT/mower. :return: Raw status message :rtype: dict """ return self._raw
def _updatestatus(self, inputraw): self._raw = inputraw api_response = json.loads(inputraw) self._battery = self.BatteryStatus(api_response["dat"]["bt"]["p"], api_response["dat"]["bt"]["nr"], api_response["dat"]["bt"]["v"], api_response["dat"]["bt"]["t"], api_response["dat"]["bt"]["c"] != "0") self._orientation = self.Orientation(api_response["dat"]["dmp"][2], api_response["dat"]["dmp"][0], api_response["dat"]["dmp"][1]) self._statistics = self.Statistics(api_response["dat"]["st"]["d"], api_response["dat"]["st"]["wt"], api_response["dat"]["st"]["b"]) self._state = self._lastStateDict[api_response["dat"]["ls"]] self._error = self._lastErrorDict[api_response["dat"]["le"]] self._updated = api_response["cfg"]["tm"] + " " + api_response["cfg"]["dt"] def __str__(self): return "landroid status\n" \ "###############\n" \ "LastUpdate: {updated}\n" \ "State: {state}\n" \ "Error: {error}\n" \ "Battery: {percent}%/{temp}C/{voltage}v" \ "".format(updated=self._updated, state=self._state, error=self._error, percent=self._battery.percent, temp=self._battery.temperature, voltage=self._battery.volts)