Skip to content

Commit

Permalink
WIP: Vacuum support for new cli
Browse files Browse the repository at this point in the history
  • Loading branch information
yawor committed Jan 28, 2018
1 parent 64a314d commit 6ca0790
Showing 1 changed file with 130 additions and 3 deletions.
133 changes: 130 additions & 3 deletions miio/vacuum.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import logging
import os
import math
import time
from typing import List
import enum
import datetime
import json
import pathlib
import pytz

import click
from appdirs import user_cache_dir
from .vacuumcontainers import (VacuumStatus, ConsumableStatus, DNDStatus,
CleaningSummary, CleaningDetails, Timer,
SoundStatus, SoundInstallStatus)
from .device import Device, DeviceException
from .click_common import (
DeviceGroup, DeviceGroupMeta, device_command, GlobalContextObject
)

_LOGGER = logging.getLogger(__name__)

Expand All @@ -30,45 +37,57 @@ class Consumable(enum.Enum):
SensorDirty = "sensor_dirty_time"


class Vacuum(Device):
class Vacuum(Device, metaclass=DeviceGroupMeta):
"""Main class representing the vacuum."""

def __init__(self, ip: str, token: str = None, start_id: int = 0,
debug: int = 0) -> None:
super().__init__(ip, token, start_id, debug)
self.manual_seqnum = -1

@device_command()
def start(self):
"""Start cleaning."""
return self.send("app_start")

@device_command()
def stop(self):
"""Stop cleaning."""
return self.send("app_stop")

@device_command()
def spot(self):
"""Start spot cleaning."""
return self.send("app_spot")

@device_command()
def pause(self):
"""Pause cleaning."""
return self.send("app_pause")

@device_command()
def home(self):
"""Stop cleaning and return home."""
self.send("app_stop")
return self.send("app_charge")

@device_command()
def manual_start(self):
"""Start manual control mode."""
self.manual_seqnum = 0
return self.send("app_rc_start")

@device_command()
def manual_stop(self):
"""Stop manual control mode."""
self.manual_seqnum = 0
return self.send("app_rc_end")

@device_command(
click.argument("rotation", type=int),
click.argument("velocity", type=float),
click.argument("duration", type=int, required=False, default=1500)
)
def manual_control_once(
self, rotation: int, velocity: float, duration: int=1500):
"""Starts the remote control mode and executes
Expand All @@ -85,6 +104,11 @@ def manual_control_once(
time.sleep(2)
number_of_tries -= 1

@device_command(
click.argument("rotation", type=int),
click.argument("velocity", type=float),
click.argument("duration", type=int, required=False, default=1500)
)
def manual_control(self, rotation: int, velocity: float,
duration: int=1500):
"""Give a command over manual control interface."""
Expand All @@ -103,6 +127,7 @@ def manual_control(self, rotation: int, velocity: float,

self.send("app_rc_move", [params])

@device_command()
def status(self) -> VacuumStatus:
"""Return status of the vacuum."""
return VacuumStatus(self.send("get_status")[0])
Expand All @@ -111,27 +136,37 @@ def enable_log_upload(self):
raise NotImplementedError("unknown parameters")
# return self.send("enable_log_upload")

@device_command()
def log_upload_status(self):
# {"result": [{"log_upload_status": 7}], "id": 1}
return self.send("get_log_upload_status")

@device_command()
def consumable_status(self) -> ConsumableStatus:
"""Return information about consumables."""
return ConsumableStatus(self.send("get_consumable")[0])

@device_command(
click.argument("consumable", type=Consumable),
)
def consumable_reset(self, consumable: Consumable):
"""Reset consumable information."""
return self.send("reset_consumable", [consumable.value])

@device_command()
def map(self):
"""Return map token."""
# returns ['retry'] without internet
return self.send("get_map_v1")

@device_command()
def clean_history(self) -> CleaningSummary:
"""Return generic cleaning history."""
return CleaningSummary(self.send("get_clean_summary"))

@device_command(
click.argument("id_", type=int, metavar="ID"),
)
def clean_details(self, id_: int) -> List[CleaningDetails]:
"""Return details about specific cleaning."""
details = self.send("get_clean_record", [id_])
Expand All @@ -142,10 +177,12 @@ def clean_details(self, id_: int) -> List[CleaningDetails]:

return res

@device_command()
def find(self):
"""Find the robot."""
return self.send("find_me", [""])

@device_command()
def timer(self) -> List[Timer]:
"""Return a list of timers."""
timers = list()
Expand All @@ -154,6 +191,11 @@ def timer(self) -> List[Timer]:

return timers

@device_command(
click.argument("cron"),
click.argument("command", required=False, default=""),
click.argument("parameters", required=False, default=""),
)
def add_timer(self, cron: str, command: str, parameters: str):
"""Add a timer.
Expand All @@ -166,12 +208,19 @@ def add_timer(self, cron: str, command: str, parameters: str):
[str(ts), [cron, [command, parameters]]]
])

@device_command(
click.argument("timer_id", type=int),
)
def delete_timer(self, timer_id: int):
"""Delete a timer with given ID.
:param int timer_id: Timer ID"""
return self.send("del_timer", [str(timer_id)])

@device_command(
click.argument("timer_id", type=int),
click.argument("mode", type=TimerState),
)
def update_timer(self, timer_id: int, mode: TimerState):
"""Update a timer with given ID.
Expand All @@ -181,12 +230,19 @@ def update_timer(self, timer_id: int, mode: TimerState):
raise DeviceException("Only 'On' or 'Off' are allowed")
return self.send("upd_timer", [str(timer_id), mode.value])

@device_command()
def dnd_status(self):
"""Returns do-not-disturb status."""
# {'result': [{'enabled': 1, 'start_minute': 0, 'end_minute': 0,
# 'start_hour': 22, 'end_hour': 8}], 'id': 1}
return DNDStatus(self.send("get_dnd_timer")[0])

@device_command(
click.argument("start_hr", type=int),
click.argument("start_min", type=int),
click.argument("end_hr", type=int),
click.argument("end_min", type=int),
)
def set_dnd(self, start_hr: int, start_min: int,
end_hr: int, end_min: int):
"""Set do-not-disturb.
Expand All @@ -198,25 +254,36 @@ def set_dnd(self, start_hr: int, start_min: int,
return self.send("set_dnd_timer",
[start_hr, start_min, end_hr, end_min])

@device_command()
def disable_dnd(self):
"""Disable do-not-disturb."""
return self.send("close_dnd_timer", [""])

@device_command(
click.argument("speed", type=int),
)
def set_fan_speed(self, speed: int):
"""Set fan speed.
:param int speed: Fan speed to set"""
# speed = [38, 60 or 77]
return self.send("set_custom_mode", [speed])

@device_command()
def fan_speed(self):
"""Return fan speed."""
return self.send("get_custom_mode")[0]

@device_command()
def sound_info(self):
"""Get voice settings."""
return SoundStatus(self.send("get_current_sound")[0])

@device_command(
click.argument("url"),
click.argument("md5sum"),
click.argument("sound_id", type=int),
)
def install_sound(self, url: str, md5sum: str, sound_id: int):
"""Install sound from the given url."""
payload = {
Expand All @@ -226,29 +293,38 @@ def install_sound(self, url: str, md5sum: str, sound_id: int):
}
return SoundInstallStatus(self.send("dnld_install_sound", payload)[0])

@device_command()
def sound_install_progress(self):
"""Get sound installation progress."""
return SoundInstallStatus(self.send("get_sound_progress")[0])

@device_command()
def sound_volume(self) -> int:
"""Get sound volume."""
return self.send("get_sound_volume")[0]

@device_command(
click.argument("vol", type=int),
)
def set_sound_volume(self, vol: int):
"""Set sound volume [0-100]."""
return self.send("change_sound_volume", [vol])

@device_command()
def test_sound_volume(self):
"""Test current sound volume."""
return self.send("test_sound_volume")

@device_command()
def serial_number(self):
"""Get serial number."""
return self.send("get_serial_number")[0]["serial_number"]

@device_command()
def timezone(self):
"""Get the timezone."""
return self.send("get_timezone")[0]
# return self.send("get_timezone")[0]
return None

def set_timezone(self, new_zone):
"""Set the timezone."""
Expand All @@ -268,3 +344,54 @@ def configure_wifi(self, ssid, password, uid=0, timezone=None):
def raw_command(self, cmd, params):
"""Send a raw command to the robot."""
return self.send(cmd, params)

@classmethod
def get_device_group(cls):

@click.pass_context
def callback(ctx, *args, id_file, **kwargs):
gco = ctx.find_object(GlobalContextObject)
if gco:
kwargs['debug'] = gco.debug

start_id = manual_seq = 0
try:
with open(id_file, 'r') as f:
x = json.load(f)
start_id = x.get("seq", 0)
manual_seq = x.get("manual_seq", 0)
_LOGGER.debug("Read stored sequence ids: %s", x)
except (FileNotFoundError, TypeError, ValueError):
pass

ctx.obj = cls(*args, start_id=start_id, **kwargs)
ctx.obj.manual_seqnum = manual_seq

dg = DeviceGroup(cls, params=DeviceGroup.DEFAULT_PARAMS + [
click.Option(
['--id-file'], type=click.Path(dir_okay=False, writable=True),
default=os.path.join(
user_cache_dir('python-miio'),
'python-mirobo.seq'
)
),
], callback=callback)

@dg.resultcallback()
@dg.device_pass
def cleanup(vac: Vacuum, **kwargs):
if vac.ip is None: # dummy Device for discovery, skip teardown
return
id_file = kwargs['id_file']
seqs = {'seq': vac.raw_id, 'manual_seq': vac.manual_seqnum}
_LOGGER.debug("Writing %s to %s", seqs, id_file)
path_obj = pathlib.Path(id_file)
cache_dir = path_obj.parents[0]
try:
cache_dir.mkdir(parents=True)
except FileExistsError:
pass # after dropping py3.4 support, use exist_ok for mkdir
with open(id_file, 'w') as f:
json.dump(seqs, f)

return dg

0 comments on commit 6ca0790

Please sign in to comment.