Skip to content

Commit

Permalink
revert reverted dbus stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
olethanh committed May 2, 2024
1 parent c928c5e commit 6809e34
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 201 deletions.
2 changes: 1 addition & 1 deletion packaging/aleph-vm/DEBIAN/control
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ Version: 0.1.8
Architecture: all
Maintainer: Aleph.im
Description: Aleph.im VM execution engine
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,btrfs-progs,nftables,python3-jwcrypto
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,python3-dbus,btrfs-progs,nftables,python3-jwcrypto
Section: aleph-im
Priority: Extra
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ dependencies = [
"packaging==23.2",
"jsonschema==4.19.1",
"qmp==0.0.1",
"dbus-fast==1.90.1",
"dbus-python==1.3.2",
"systemd-python==235",
"systemd-python==235",
"superfluid~=0.2.1",
"sqlalchemy[asyncio]>=2.0",
Expand Down
4 changes: 0 additions & 4 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ async def http_not_found(request: web.Request):
web.post("/control/machine/{ref}/stop", operate_stop),
web.post("/control/machine/{ref}/erase", operate_erase),
web.post("/control/machine/{ref}/reboot", operate_reboot),
# web.options(
# "/control/machine/{ref}/{view:.*}",
# allow_cors_on_endpoint,
# ),
# /status APIs are used to check that the VM Orchestrator is running properly
web.get("/status/check/fastapi", status_check_fastapi),
web.get("/status/check/fastapi/legacy", status_check_fastapi_legacy),
Expand Down
4 changes: 0 additions & 4 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
"""Endpoint defined here allow for the user launching the VM
they allow rebooting, stopping, erasing and streaming it's log via websocket"""

import logging
from datetime import timedelta

Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async def create_a_vm(

# Start VM and snapshots automatically
if execution.persistent:
await self.systemd_manager.enable_and_start(execution.controller_service)
self.systemd_manager.enable_and_start(execution.controller_service)
await execution.wait_for_init()
if execution.is_program and execution.vm:
await execution.vm.load_configuration()
Expand Down Expand Up @@ -191,7 +191,7 @@ async def stop_vm(self, vm_hash: ItemHash) -> Optional[VmExecution]:
async def stop_persistent_execution(self, execution: VmExecution):
"""Stop persistent VMs in the pool."""
assert execution.persistent, "Execution isn't persistent"
await self.systemd_manager.stop_and_disable(execution.controller_service)
self.systemd_manager.stop_and_disable(execution.controller_service)
await execution.stop()

def forget_vm(self, vm_hash: ItemHash) -> None:
Expand Down
229 changes: 40 additions & 189 deletions src/aleph/vm/systemd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,224 +2,75 @@
async SystemD Manager implementation.
"""

import enum
import logging
from typing import Literal, Optional, Protocol, runtime_checkable

from dbus_fast import BusType, DBusError
from dbus_fast.aio import MessageBus, ProxyObject
import dbus
from dbus import DBusException, SystemBus
from dbus.proxies import Interface

logger = logging.getLogger(__name__)


class UnitFileState(str, enum.Enum):
"""This StrEnum class represents the different possible states of a unit file."""

ENABLED = "enabled"
"""Indicates that a unit file is permanently enabled."""

ENABLED_RUNTIME = "enabled-runtime"
"""Indicates the unit file is only temporarily enabled and will no longer be enabled after a reboot
(that means, it is enabled via /run/ symlinks, rather than /etc/)."""

LINKED = "linked"
"""Indicates that a unit is linked into /etc/ permanently."""

LINKED_RUNTIME = "linked-runtime"
"""Indicates that a unit is linked into /run/ temporarily (until the next reboot)."""

MASKED = "masked"
"""Indicates that the unit file is masked permanently."""

MASKED_RUNTIME = "masked-runtime"
"""Indicates that it is masked in /run/ temporarily (until the next reboot)."""

STATIC = "static"
"""Indicates that the unit is statically enabled, i.e. always enabled and doesn't need to be enabled explicitly."""

DISABLED = "disabled"
"""Indicates that the unit file is not enabled."""

INVALID = "invalid"
"""Indicates that it could not be determined whether the unit file is enabled."""


UnitFileStateLiteral = Literal[
"enabled",
"enabled-runtime",
"linked",
"linked-runtime",
"masked",
"masked-runtime",
"static",
"disabled",
"invalid",
]


class Mode(str, enum.Enum):
REPLACE = "replace"
FAIL = "fail"
ISOLATE = "isolate"
IGNORE_DEPENDENCIES = "ignore-dependencies"
IGNORE_REQUIREMENTS = "ignore-requirements"


class ActiveState(str, enum.Enum):
"""
ActiveState contains a state value that reflects the unit's current status.
"""

ACTIVE = "active"
"""
The unit is active.
"""

RELOADING = "reloading"
"""
The unit is active and reloading its configuration.
"""

INACTIVE = "inactive"
"""
The unit is inactive, previous run was successful or hasn't yet occurred.
"""

FAILED = "failed"
"""
The unit is inactive, previous run was unsuccessful.
"""

ACTIVATING = "activating"
"""
The unit is transitioning from inactive to active state.
"""

DEACTIVATING = "deactivating"
"""
The unit is in the process of deactivation.
"""


ActiveStateLiteral = Literal["active", "reloading", "inactive", "failed", "activating", "deactivating"]


@runtime_checkable
class SystemdProxy(Protocol):
"""ABC for typing.
for description of methodsp
see https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html#The%20Manager%20Object"""

async def call_enable_unit_files(self, files: list[str], runtime: bool, force: bool): ...

async def call_get_unit_file_state(self, service) -> UnitFileStateLiteral: ...

async def call_start_unit(self, name, mode):
pass

async def call_stop_unit(self, name, mode): ...

async def call_restart_unit(self, name, mode): ...

async def call_disable_unit_files(self, files: list[str], runtime: bool): ...

async def call_get_unit(self, name: str) -> str: ...


@runtime_checkable
class UnitProxy(Protocol):
"""for typing.
for description of methods see
https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html#Service%20Unit%20Objects"""

async def get_active_state(self) -> ActiveStateLiteral: ...


class SystemDManager:
"""SystemD Manager class.
Used to manage the systemd services on the host on Linux.
"""

bus: Optional[MessageBus]
manager: Optional[SystemdProxy]
bus: SystemBus
manager: Interface

def __init__(self):
pass

async def connect(self):
self.bus = MessageBus(bus_type=BusType.SYSTEM)
await self.bus.connect()
path = "/org/freedesktop/systemd1"
bus_name = "org.freedesktop.systemd1"
introspect = await self.bus.introspect(bus_name, path)
systemd_proxy: ProxyObject = self.bus.get_proxy_object(bus_name, path, introspection=introspect)
interface = systemd_proxy.get_interface("org.freedesktop.systemd1.Manager")
# Check required method are implemented
assert isinstance(interface, SystemdProxy)
self.manager = interface

async def enable(self, service: str) -> None:
assert self.manager, "connect() not called"
await self.manager.call_enable_unit_files([service], False, True)
self.bus = dbus.SystemBus()
systemd = self.bus.get_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1")
self.manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager")

def stop_and_disable(self, service: str) -> None:
if self.is_service_active(service):
self.stop(service)
if self.is_service_enabled(service):
self.disable(service)

def enable(self, service: str) -> None:
self.manager.EnableUnitFiles([service], False, True)
logger.debug(f"Enabled {service} service")

async def start(self, service: str) -> None:
assert self.manager, "connect() not called"
await self.manager.call_start_unit(service, Mode.REPLACE)
def start(self, service: str) -> None:
self.manager.StartUnit(service, "replace")
logger.debug(f"Started {service} service")

async def stop(self, service: str) -> None:
assert self.manager, "connect() not called"
await self.manager.call_stop_unit(service, Mode.REPLACE)
def stop(self, service: str) -> None:
self.manager.StopUnit(service, "replace")
logger.debug(f"Stopped {service} service")

async def restart(self, service: str) -> None:
assert self.manager, "connect() not called"
await self.manager.call_restart_unit(service, Mode.REPLACE)
def restart(self, service: str) -> None:
self.manager.RestartUnit(service, "replace")
logger.debug(f"Restarted {service} service")

async def disable(self, service: str) -> None:
assert self.manager, "connect() not called"
await self.manager.call_disable_unit_files([service], False)
def disable(self, service: str) -> None:
self.manager.DisableUnitFiles([service], False)
logger.debug(f"Disabled {service} service")

async def is_service_enabled(self, service: str) -> bool:
assert self.manager, "connect() not called"
def is_service_enabled(self, service: str) -> bool:
try:
state = await self.manager.call_get_unit_file_state(service)
return state == UnitFileState.ENABLED
except DBusError as error:
return self.manager.GetUnitFileState(service) == "enabled"
except DBusException as error:
logger.error(error)
return False

async def is_service_active(self, service: str) -> bool:
assert self.manager, "connect() not called"
assert self.bus, "connect() not called"
def is_service_active(self, service: str) -> bool:
try:
path = await self.manager.call_get_unit(service)
bus_name = "org.freedesktop.systemd1"
introspect = await self.bus.introspect(bus_name, path)
systemd_service = self.bus.get_proxy_object(bus_name, path, introspection=introspect)
unit = systemd_service.get_interface("org.freedesktop.systemd1.Unit")
# Check required method are implemented
assert isinstance(unit, UnitProxy)
active_state = await unit.get_active_state()
return active_state == ActiveState.ACTIVE
except DBusError as error:
systemd_service = self.bus.get_object("org.freedesktop.systemd1", object_path=self.manager.GetUnit(service))
unit = dbus.Interface(systemd_service, "org.freedesktop.systemd1.Unit")
unit_properties = dbus.Interface(unit, "org.freedesktop.DBus.Properties")
active_state = unit_properties.Get("org.freedesktop.systemd1.Unit", "ActiveState")
return active_state == "active"
except DBusException as error:
logger.error(error)
return False

async def enable_and_start(self, service: str) -> None:
if not await self.is_service_enabled(service):
await self.enable(service)
if not await self.is_service_active(service):
await self.start(service)

async def stop_and_disable(self, service: str) -> None:
if await self.is_service_active(service):
await self.stop(service)
if await self.is_service_enabled(service):
await self.disable(service)
def enable_and_start(self, service: str) -> None:
if not self.is_service_enabled(service):
self.enable(service)
if not self.is_service_active(service):
self.start(service)

0 comments on commit 6809e34

Please sign in to comment.