sandbox_v2: unix socket transport (transport T3)
Add an opt-in unix-domain-socket control-channel transport alongside the default stdio transport. The manager opens a listening unix socket, passes its path to the subprocess as --url unix://<path>, and the runtime dials back; the manager is the server. Both transports reuse StreamTransport's length-prefixed framing, so no dedicated unix transport class is needed. * Manager: SandboxManager(transport="stdio"|"unix") (default stdio, unchanged behavior). _run_one splits into stdio/unix paths sharing a _supervise_until_exit helper; the unix path creates the socket in a short per-attempt tempdir (sidesteps the ~108-char sun_path limit), races accept against early exit, and force-closes lingering accepted connections (server.close_clients) so wait_closed cannot hang. * CommandFactory is now (group, url) -> argv; the manager owns the transport and hands the factory the right --url. * Runtime: --url scheme selects the transport — stdio:// (default / absent), unix://<path>, or ws://|wss:// (reserved, rejected with a clear not-implemented error). New _transport_scheme + _open_unix_channel. * Tests: unix round-trip + socket cleanup (core), scheme selection + ws rejection + unix round-trip (client); existing factories updated to the (group, url) signature. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,10 +6,21 @@ sandbox group (``main`` / ``built-in`` / ``custom``); higher phases call
|
||||
|
||||
The contract between manager and runtime is:
|
||||
|
||||
* the manager launches ``python -m hass_client.sandbox_v2``
|
||||
* the manager launches ``python -m hass_client.sandbox_v2`` and tells it
|
||||
which control-channel transport to use via ``--url``
|
||||
* the runtime opens the control channel and sends a :data:`MSG_READY`
|
||||
frame as its first message once it is up (no stdout text marker)
|
||||
* on ``SIGTERM`` the runtime exits cleanly
|
||||
|
||||
Two transports are supported (selected by :class:`SandboxManager`'s
|
||||
``transport`` option, defaulting to ``stdio``):
|
||||
|
||||
* **stdio** — frames ride the subprocess's stdin/stdout pipes
|
||||
(``--url stdio://``); the default, unchanged from earlier phases.
|
||||
* **unix** — the manager opens a unix-domain socket, passes its path as
|
||||
``--url unix://<path>``, and the runtime dials back; the manager is the
|
||||
server. Both transports share :class:`~.channel.StreamTransport`'s
|
||||
length-prefixed framing, so there is no dedicated unix transport class.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -18,7 +29,10 @@ from collections.abc import Awaitable, Callable
|
||||
import contextlib
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
@@ -36,8 +50,16 @@ DEFAULT_RESTART_BACKOFF = 1.0
|
||||
DEFAULT_READY_TIMEOUT = 30.0
|
||||
DEFAULT_SHUTDOWN_GRACE = 10.0
|
||||
|
||||
CommandFactory = Callable[[str], list[str]]
|
||||
# A command factory receives ``(group, url)`` — the manager decides the
|
||||
# control-channel URL from its transport and hands it to the factory so the
|
||||
# spawned argv carries the right ``--url``.
|
||||
CommandFactory = Callable[[str, str], list[str]]
|
||||
TokenFactory = Callable[[str], Awaitable[str]]
|
||||
|
||||
# Supported control-channel transports.
|
||||
TRANSPORT_STDIO = "stdio"
|
||||
TRANSPORT_UNIX = "unix"
|
||||
_TRANSPORTS = (TRANSPORT_STDIO, TRANSPORT_UNIX)
|
||||
# The reply is a protobuf ``ShutdownResult``; typed loosely to keep the
|
||||
# manager free of a proto import.
|
||||
ShutdownReplyCallback = Callable[[str, Any], Awaitable[None]]
|
||||
@@ -76,15 +98,20 @@ class SandboxProcess:
|
||||
def __init__(
|
||||
self,
|
||||
group: str,
|
||||
command_factory: Callable[[], list[str]],
|
||||
command_factory: Callable[[str], list[str]],
|
||||
config: SandboxConfig,
|
||||
*,
|
||||
transport: str = TRANSPORT_STDIO,
|
||||
on_failed: Callable[[str], None] | None = None,
|
||||
on_channel_ready: Callable[[str, Channel], None] | None = None,
|
||||
on_shutdown_reply: ShutdownReplyCallback | None = None,
|
||||
) -> None:
|
||||
"""Initialise a supervised sandbox subprocess.
|
||||
|
||||
``command_factory`` is called with the control-channel URL the
|
||||
chosen ``transport`` requires (``stdio://`` or ``unix://<path>``)
|
||||
and returns the argv to spawn.
|
||||
|
||||
``on_channel_ready`` is invoked with the live :class:`Channel` as
|
||||
soon as it is opened — before the runtime's :data:`MSG_READY`
|
||||
frame arrives — so its handlers are in place before the runtime's
|
||||
@@ -98,6 +125,7 @@ class SandboxProcess:
|
||||
self.group = group
|
||||
self._command_factory = command_factory
|
||||
self._config = config
|
||||
self._transport = transport
|
||||
self._on_failed = on_failed
|
||||
self._on_channel_ready = on_channel_ready
|
||||
self._on_shutdown_reply = on_shutdown_reply
|
||||
@@ -309,9 +337,98 @@ class SandboxProcess:
|
||||
|
||||
async def _run_one(self) -> None:
|
||||
"""Spawn one process attempt and wait for it to exit."""
|
||||
command = self._command_factory()
|
||||
if self._transport == TRANSPORT_UNIX:
|
||||
await self._run_one_unix()
|
||||
else:
|
||||
await self._run_one_stdio()
|
||||
|
||||
async def _run_one_stdio(self) -> None:
|
||||
"""Spawn over stdio: the channel rides the subprocess's pipes."""
|
||||
proc = await self._spawn(self._command_factory("stdio://"))
|
||||
if proc is None:
|
||||
return
|
||||
self._process = proc
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
# Open the channel up front — stdout carries nothing but frames
|
||||
# now. Handlers go on before the reader starts so the runtime's
|
||||
# warm-load round-trip (and any early push) is never dropped.
|
||||
assert proc.stdout is not None
|
||||
assert proc.stdin is not None
|
||||
self._channel = self._build_channel(proc.stdout, proc.stdin)
|
||||
await self._supervise_until_exit(proc, self._channel, drain_stdout=False)
|
||||
finally:
|
||||
self._process = None
|
||||
|
||||
async def _run_one_unix(self) -> None:
|
||||
"""Spawn over a unix socket: the manager listens, runtime dials back.
|
||||
|
||||
The socket lives in a short-lived per-attempt tempdir rather than
|
||||
under the (possibly long) config dir, sidestepping the ~108-char
|
||||
``sun_path`` limit on Linux. It is unlinked when the server closes
|
||||
and the tempdir is removed on the way out — no leaked socket file.
|
||||
"""
|
||||
socket_dir = tempfile.mkdtemp(prefix=f"sandbox_v2_{self.group}_")
|
||||
socket_path = os.path.join(socket_dir, "control.sock")
|
||||
loop = asyncio.get_running_loop()
|
||||
connected: asyncio.Future[tuple[asyncio.StreamReader, asyncio.StreamWriter]] = (
|
||||
loop.create_future()
|
||||
)
|
||||
|
||||
def _on_connect(
|
||||
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
) -> None:
|
||||
if connected.done():
|
||||
# Only the first (runtime) connection is honoured.
|
||||
writer.close()
|
||||
return
|
||||
connected.set_result((reader, writer))
|
||||
|
||||
server = await asyncio.start_unix_server(_on_connect, path=socket_path)
|
||||
try:
|
||||
proc = await self._spawn(self._command_factory(f"unix://{socket_path}"))
|
||||
if proc is None:
|
||||
return
|
||||
self._process = proc
|
||||
try:
|
||||
# The runtime connects back as part of its startup; race the
|
||||
# accept against an early exit so a crash-before-connect does
|
||||
# not hang here forever.
|
||||
exit_task = asyncio.create_task(proc.wait())
|
||||
waiters: set[asyncio.Future[Any]] = {connected, exit_task}
|
||||
try:
|
||||
await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
|
||||
finally:
|
||||
if not exit_task.done():
|
||||
exit_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await exit_task
|
||||
if not connected.done():
|
||||
_LOGGER.warning(
|
||||
"Sandbox %s exited before connecting to its control socket",
|
||||
self.group,
|
||||
)
|
||||
return
|
||||
reader, writer = connected.result()
|
||||
self._channel = self._build_channel(reader, writer)
|
||||
await self._supervise_until_exit(proc, self._channel, drain_stdout=True)
|
||||
finally:
|
||||
self._process = None
|
||||
finally:
|
||||
server.close()
|
||||
# The accepted connection may linger in the server's client set:
|
||||
# when the runtime exits, the channel's read loop sees EOF and
|
||||
# marks the channel closed, so the later ``channel.close()`` is a
|
||||
# no-op that never closes the accepted transport. Force-close any
|
||||
# such leftover so ``wait_closed()`` cannot block forever.
|
||||
server.close_clients()
|
||||
with contextlib.suppress(Exception):
|
||||
await server.wait_closed()
|
||||
shutil.rmtree(socket_dir, ignore_errors=True)
|
||||
|
||||
async def _spawn(self, command: list[str]) -> asyncio.subprocess.Process | None:
|
||||
"""Spawn the subprocess, returning ``None`` if it cannot start."""
|
||||
try:
|
||||
return await asyncio.create_subprocess_exec(
|
||||
*command,
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
@@ -321,14 +438,23 @@ class SandboxProcess:
|
||||
_LOGGER.exception(
|
||||
"Sandbox %s could not be spawned (%s)", self.group, command
|
||||
)
|
||||
return
|
||||
return None
|
||||
|
||||
self._process = proc
|
||||
# Open the channel up front — stdout carries nothing but frames now.
|
||||
# Handlers go on before the reader starts so the runtime's warm-load
|
||||
# round-trip (and any early push) is never dropped.
|
||||
channel = self._open_channel(proc)
|
||||
self._channel = channel
|
||||
async def _supervise_until_exit(
|
||||
self,
|
||||
proc: asyncio.subprocess.Process,
|
||||
channel: Channel,
|
||||
*,
|
||||
drain_stdout: bool,
|
||||
) -> None:
|
||||
"""Wire the ready handshake, run until the process exits, clean up.
|
||||
|
||||
Shared by both transports — they reach here with a live channel and
|
||||
a running process; only how the channel's byte pipe was obtained
|
||||
differs. ``drain_stdout`` is set for the unix transport, where the
|
||||
subprocess's stdout pipe is unused (frames ride the socket) and must
|
||||
still be drained so its buffer never fills.
|
||||
"""
|
||||
ready_frame = asyncio.Event()
|
||||
|
||||
async def _on_ready(_payload: object) -> None:
|
||||
@@ -346,7 +472,11 @@ class SandboxProcess:
|
||||
|
||||
ready_task = asyncio.create_task(ready_frame.wait())
|
||||
exit_task = asyncio.create_task(proc.wait())
|
||||
stderr_task = asyncio.create_task(self._drain_stream(proc.stderr, "stderr"))
|
||||
drain_tasks = [asyncio.create_task(self._drain_stream(proc.stderr, "stderr"))]
|
||||
if drain_stdout:
|
||||
drain_tasks.append(
|
||||
asyncio.create_task(self._drain_stream(proc.stdout, "stdout"))
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.wait(
|
||||
@@ -358,31 +488,27 @@ class SandboxProcess:
|
||||
# Hold here until the process exits.
|
||||
await exit_task
|
||||
finally:
|
||||
for task in (ready_task, exit_task):
|
||||
for task in (ready_task, exit_task, *drain_tasks):
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
if not stderr_task.done():
|
||||
stderr_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await stderr_task
|
||||
if self._channel is not None:
|
||||
await self._channel.close()
|
||||
self._channel = None
|
||||
self._process = None
|
||||
self._ready.clear()
|
||||
|
||||
def _open_channel(self, proc: asyncio.subprocess.Process) -> Channel:
|
||||
"""Wrap the subprocess pipes in a :class:`Channel`.
|
||||
def _build_channel(
|
||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
) -> Channel:
|
||||
"""Wrap a reader/writer pair in a :class:`Channel`.
|
||||
|
||||
``proc.stdout`` is a :class:`~asyncio.StreamReader`; ``proc.stdin``
|
||||
is a :class:`~asyncio.StreamWriter`. Both carry length-prefixed
|
||||
channel frames end-to-end — there is no text preamble.
|
||||
Length-prefixed channel frames cross end-to-end — there is no text
|
||||
preamble. The pair comes from the subprocess's stdout/stdin (stdio)
|
||||
or from the accepted unix-socket connection (unix); the channel core
|
||||
is identical either way.
|
||||
"""
|
||||
assert proc.stdout is not None
|
||||
assert proc.stdin is not None
|
||||
return Channel(proc.stdout, proc.stdin, name=self.group, codec=ProtobufCodec())
|
||||
return Channel(reader, writer, name=self.group, codec=ProtobufCodec())
|
||||
|
||||
async def _drain_stream(
|
||||
self, stream: asyncio.StreamReader | None, name: str
|
||||
@@ -412,13 +538,20 @@ class SandboxManager:
|
||||
on_channel_ready: Callable[[str, Channel], None] | None = None,
|
||||
on_shutdown_reply: ShutdownReplyCallback | None = None,
|
||||
token_factory: TokenFactory | None = None,
|
||||
transport: str = TRANSPORT_STDIO,
|
||||
) -> None:
|
||||
"""Initialise the manager.
|
||||
|
||||
``command_factory`` lets tests substitute the spawned command; the
|
||||
default builds the ``python -m hass_client.sandbox_v2`` argv that
|
||||
``command_factory`` lets tests substitute the spawned command; it is
|
||||
called with ``(group, url)`` and the default builds the
|
||||
``python -m hass_client.sandbox_v2`` argv that
|
||||
:class:`hass_client.sandbox.SandboxRuntime` consumes.
|
||||
|
||||
``transport`` selects the control-channel transport for every
|
||||
spawned sandbox: ``"stdio"`` (default — unchanged behavior) or
|
||||
``"unix"`` (the manager opens a unix socket and the runtime dials
|
||||
back). Unix is opt-in so existing deployments keep using stdio.
|
||||
|
||||
``on_channel_ready`` is invoked once a sandbox's control channel is
|
||||
live; Phase 4's router uses it to register inbound flow handlers
|
||||
(e.g., ``sandbox_v2/notify_flow_changed``).
|
||||
@@ -436,6 +569,12 @@ class SandboxManager:
|
||||
self._on_channel_ready = on_channel_ready
|
||||
self._on_shutdown_reply = on_shutdown_reply
|
||||
self._token_factory = token_factory
|
||||
if transport not in _TRANSPORTS:
|
||||
raise ValueError(
|
||||
f"unknown sandbox transport {transport!r}; expected one of "
|
||||
f"{_TRANSPORTS}"
|
||||
)
|
||||
self._transport = transport
|
||||
self._tokens: dict[str, str] = {}
|
||||
self._sandboxes: dict[str, SandboxProcess] = {}
|
||||
self._locks: dict[str, asyncio.Lock] = {}
|
||||
@@ -478,13 +617,14 @@ class SandboxManager:
|
||||
# Keeping the SandboxProcess in the map after a failed start lets
|
||||
# callers observe its state — ensure_started won't try to
|
||||
# restart a failed sandbox.
|
||||
def make_command() -> list[str]:
|
||||
return self._command_factory(group)
|
||||
def make_command(url: str) -> list[str]:
|
||||
return self._command_factory(group, url)
|
||||
|
||||
process = SandboxProcess(
|
||||
group,
|
||||
make_command,
|
||||
self._config,
|
||||
transport=self._transport,
|
||||
on_failed=self._on_failed,
|
||||
on_channel_ready=self._on_channel_ready,
|
||||
on_shutdown_reply=self._on_shutdown_reply,
|
||||
@@ -527,13 +667,14 @@ class SandboxManager:
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
def _default_command(self, group: str) -> list[str]:
|
||||
def _default_command(self, group: str, url: str) -> list[str]:
|
||||
"""Argv for ``python -m hass_client.sandbox_v2``.
|
||||
|
||||
Phase 7 plugs the scoped sandbox access token into the CLI; the
|
||||
runtime does not yet open the websocket but carries the token so
|
||||
future phases can. The URL still defaults to localhost because
|
||||
the runtime does not consume it today.
|
||||
``url`` is the control-channel URL the manager's transport requires
|
||||
(``stdio://`` or ``unix://<path>``) — the runtime reads its scheme
|
||||
to pick the transport. Phase 7's scoped sandbox access token is
|
||||
still passed for the deferred websocket transport, which is the only
|
||||
path that consumes it.
|
||||
"""
|
||||
token = self._tokens.get(group, "sandbox_v2_placeholder")
|
||||
return [
|
||||
@@ -543,13 +684,15 @@ class SandboxManager:
|
||||
"--name",
|
||||
group,
|
||||
"--url",
|
||||
"ws://localhost:8123/api/websocket",
|
||||
url,
|
||||
"--token",
|
||||
token,
|
||||
]
|
||||
|
||||
|
||||
__all__ = [
|
||||
"TRANSPORT_STDIO",
|
||||
"TRANSPORT_UNIX",
|
||||
"CommandFactory",
|
||||
"SandboxConfig",
|
||||
"SandboxFailedError",
|
||||
|
||||
@@ -221,7 +221,24 @@ class SandboxRuntime:
|
||||
return 0
|
||||
|
||||
async def _default_channel_factory(self) -> Channel:
|
||||
"""Open a :class:`Channel` over stdin/stdout for the manager."""
|
||||
"""Open the control channel selected by the runtime's ``--url`` scheme.
|
||||
|
||||
* ``stdio://`` (or empty) — frames ride the process's stdin/stdout.
|
||||
* ``unix://<path>`` — dial back to the manager's unix socket.
|
||||
* ``ws://`` / ``wss://`` — reserved for the deferred websocket
|
||||
transport; rejected here with a clear error (this build ships
|
||||
stdio + unix only).
|
||||
"""
|
||||
kind = _transport_scheme(self.url)
|
||||
if kind == "unix":
|
||||
return await _open_unix_channel(
|
||||
self.url.removeprefix("unix://"), name=self.group
|
||||
)
|
||||
if kind == "ws":
|
||||
raise NotImplementedError(
|
||||
"websocket transport is not implemented in this build; it is "
|
||||
"reserved for the share-states work — use stdio:// or unix://"
|
||||
)
|
||||
return await _open_stdio_channel(name=self.group)
|
||||
|
||||
async def _handle_shutdown(self, _payload: object) -> pb.ShutdownResult:
|
||||
@@ -333,6 +350,38 @@ async def _load_restore_state(hass: Any) -> None:
|
||||
_LOGGER.exception("sandbox: failed to pre-load core.restore_state")
|
||||
|
||||
|
||||
def _transport_scheme(url: str) -> str:
|
||||
"""Map a ``--url`` to its transport kind.
|
||||
|
||||
Returns ``"stdio"`` (empty / ``stdio://``), ``"unix"``
|
||||
(``unix://<path>``) or ``"ws"`` (``ws://`` / ``wss://``, reserved for
|
||||
the deferred websocket transport). Raises :class:`ValueError` for any
|
||||
other scheme.
|
||||
"""
|
||||
if not url:
|
||||
return "stdio"
|
||||
scheme = url.split("://", 1)[0] if "://" in url else url
|
||||
if scheme in ("", "stdio"):
|
||||
return "stdio"
|
||||
if scheme == "unix":
|
||||
return "unix"
|
||||
if scheme in ("ws", "wss"):
|
||||
return "ws"
|
||||
raise ValueError(f"unsupported sandbox transport url: {url!r}")
|
||||
|
||||
|
||||
async def _open_unix_channel(path: str, *, name: str) -> Channel:
|
||||
"""Connect to the manager's unix socket and wrap it in a :class:`Channel`.
|
||||
|
||||
The manager is the unix server; the runtime dials back here. Framing is
|
||||
the same length-prefixed :class:`~.channel.StreamTransport` the stdio
|
||||
path uses — a unix socket is just a different byte pipe under it, so no
|
||||
dedicated transport class is needed.
|
||||
"""
|
||||
reader, writer = await asyncio.open_unix_connection(path)
|
||||
return Channel(reader, writer, name=name, codec=ProtobufCodec())
|
||||
|
||||
|
||||
async def _open_stdio_channel(*, name: str) -> Channel:
|
||||
"""Wrap the runtime's stdin/stdout into a :class:`Channel`."""
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
@@ -25,8 +25,12 @@ def _build_parser() -> argparse.ArgumentParser:
|
||||
)
|
||||
parser.add_argument(
|
||||
"--url",
|
||||
required=True,
|
||||
help="Websocket URL of the main Home Assistant instance.",
|
||||
default="stdio://",
|
||||
help=(
|
||||
"Control-channel URL selecting the transport: stdio:// (default), "
|
||||
"unix://<path>, or ws://… (reserved — not implemented in this "
|
||||
"build)."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--token",
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
"""Runtime-side transport selection + unix channel (transport T3).
|
||||
|
||||
The HA Core suite owns the manager-driven subprocess coverage. These
|
||||
tests pin the runtime side: ``--url`` scheme → transport kind, the
|
||||
websocket rejection, and that :func:`_open_unix_channel` round-trips a
|
||||
call over a real unix socket.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
from hass_client._proto import sandbox_v2_pb2 as pb
|
||||
from hass_client.channel import Channel
|
||||
from hass_client.codec_protobuf import ProtobufCodec
|
||||
from hass_client.sandbox import SandboxRuntime, _open_unix_channel, _transport_scheme
|
||||
from hass_client.sandbox_v2.__main__ import _build_parser
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("url", "expected"),
|
||||
[
|
||||
("", "stdio"),
|
||||
("stdio://", "stdio"),
|
||||
("unix:///tmp/sandbox.sock", "unix"),
|
||||
("ws://localhost:8123/api/websocket", "ws"),
|
||||
("wss://example.test/ws", "ws"),
|
||||
],
|
||||
)
|
||||
def test_transport_scheme_selection(url: str, expected: str) -> None:
|
||||
"""Each ``--url`` maps to its transport kind."""
|
||||
assert _transport_scheme(url) == expected
|
||||
|
||||
|
||||
def test_transport_scheme_rejects_unknown() -> None:
|
||||
"""An unrecognised scheme raises rather than silently defaulting."""
|
||||
with pytest.raises(ValueError, match="unsupported sandbox transport url"):
|
||||
_transport_scheme("amqp://broker/queue")
|
||||
|
||||
|
||||
def test_cli_url_defaults_to_stdio() -> None:
|
||||
"""Omitting ``--url`` selects the stdio transport."""
|
||||
args = _build_parser().parse_args(["--name", "built-in", "--token", "t"])
|
||||
assert args.url == "stdio://"
|
||||
|
||||
|
||||
async def test_websocket_transport_rejected() -> None:
|
||||
"""A ``ws://`` URL is rejected with a clear not-implemented error."""
|
||||
runtime = SandboxRuntime(url="ws://localhost:8123/api/websocket", token="t", group="g")
|
||||
with pytest.raises(NotImplementedError, match="not implemented in this build"):
|
||||
await runtime._default_channel_factory() # noqa: SLF001
|
||||
|
||||
|
||||
async def test_open_unix_channel_round_trips() -> None:
|
||||
"""A call round-trips over a real unix socket via the runtime opener.
|
||||
|
||||
The server side stands in for the manager: it accepts the connection,
|
||||
registers a ``ping`` handler returning a typed proto result, and the
|
||||
client opened by :func:`_open_unix_channel` calls it.
|
||||
"""
|
||||
server_channels: list[Channel] = []
|
||||
|
||||
async def _ping(_payload: object) -> pb.PingResult:
|
||||
return pb.PingResult(pong="pong-unix")
|
||||
|
||||
def _on_connect(
|
||||
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
) -> None:
|
||||
channel = Channel(reader, writer, name="server", codec=ProtobufCodec())
|
||||
channel.register("sandbox_v2/ping", _ping)
|
||||
channel.start()
|
||||
server_channels.append(channel)
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="sandbox_v2_test_") as socket_dir:
|
||||
socket_path = str(Path(socket_dir) / "control.sock")
|
||||
server = await asyncio.start_unix_server(_on_connect, path=socket_path)
|
||||
client = await _open_unix_channel(socket_path, name="client")
|
||||
try:
|
||||
client.start()
|
||||
result = await asyncio.wait_for(
|
||||
client.call("sandbox_v2/ping", None), timeout=5.0
|
||||
)
|
||||
assert result.pong == "pong-unix"
|
||||
finally:
|
||||
await client.close()
|
||||
for channel in server_channels:
|
||||
await channel.close()
|
||||
server.close()
|
||||
server.close_clients()
|
||||
with contextlib.suppress(Exception):
|
||||
await server.wait_closed()
|
||||
@@ -68,7 +68,7 @@ async def test_crash_restart_budget(hass: HomeAssistant) -> None:
|
||||
"""A sandbox that crashes 3 times in the window is marked failed."""
|
||||
spawn_times: list[float] = []
|
||||
|
||||
def failing_factory(group: str) -> list[str]:
|
||||
def failing_factory(group: str, url: str) -> list[str]:
|
||||
spawn_times.append(time.monotonic())
|
||||
return _FAILING_CMD
|
||||
|
||||
@@ -99,7 +99,7 @@ async def test_crash_restart_budget(hass: HomeAssistant) -> None:
|
||||
async def test_multiple_groups_independent(hass: HomeAssistant) -> None:
|
||||
"""A failed group does not stop a healthy group from running."""
|
||||
|
||||
def mixed_factory(group: str) -> list[str]:
|
||||
def mixed_factory(group: str, url: str) -> list[str]:
|
||||
if group == "broken":
|
||||
return _FAILING_CMD
|
||||
return [
|
||||
@@ -109,7 +109,7 @@ async def test_multiple_groups_independent(hass: HomeAssistant) -> None:
|
||||
"--name",
|
||||
group,
|
||||
"--url",
|
||||
"ws://localhost:8123/api/websocket",
|
||||
url,
|
||||
"--token",
|
||||
"test-token",
|
||||
]
|
||||
@@ -160,10 +160,11 @@ async def test_default_command_includes_token(
|
||||
# Prime the token cache without actually launching a subprocess.
|
||||
mgr._tokens["built-in"] = "token-built-in"
|
||||
|
||||
builtin_argv = mgr._default_command("built-in")
|
||||
builtin_argv = mgr._default_command("built-in", "stdio://")
|
||||
assert "token-built-in" in builtin_argv
|
||||
assert "--name" in builtin_argv
|
||||
assert "built-in" in builtin_argv
|
||||
assert "stdio://" in builtin_argv
|
||||
|
||||
|
||||
async def test_ensure_started_awaits_token_factory(hass: HomeAssistant) -> None:
|
||||
@@ -174,7 +175,7 @@ async def test_ensure_started_awaits_token_factory(hass: HomeAssistant) -> None:
|
||||
calls.append(group)
|
||||
return f"token-{group}"
|
||||
|
||||
def quick_command(group: str) -> list[str]:
|
||||
def quick_command(group: str, url: str) -> list[str]:
|
||||
return _FAILING_CMD # Force a fast failure so the test runs fast.
|
||||
|
||||
mgr = SandboxManager(
|
||||
|
||||
@@ -31,7 +31,7 @@ FAST_CONFIG = SandboxConfig(
|
||||
async def _manager_fixture(hass: HomeAssistant):
|
||||
"""Manager + tighter timings; tears every sandbox down on exit."""
|
||||
|
||||
def _factory(group: str) -> list[str]:
|
||||
def _factory(group: str, url: str) -> list[str]:
|
||||
return [
|
||||
sys.executable,
|
||||
"-m",
|
||||
@@ -39,7 +39,7 @@ async def _manager_fixture(hass: HomeAssistant):
|
||||
"--name",
|
||||
group,
|
||||
"--url",
|
||||
"ws://localhost:8123/api/websocket",
|
||||
url,
|
||||
"--token",
|
||||
"phase4-test-token",
|
||||
]
|
||||
|
||||
@@ -37,7 +37,7 @@ FAST_CONFIG = SandboxConfig(
|
||||
async def _manager_fixture(hass: HomeAssistant) -> AsyncIterator[SandboxManager]:
|
||||
"""Manager that spawns the real runtime; cleans up on teardown."""
|
||||
|
||||
def _factory(group: str) -> list[str]:
|
||||
def _factory(group: str, url: str) -> list[str]:
|
||||
return [
|
||||
sys.executable,
|
||||
"-m",
|
||||
@@ -45,7 +45,7 @@ async def _manager_fixture(hass: HomeAssistant) -> AsyncIterator[SandboxManager]
|
||||
"--name",
|
||||
group,
|
||||
"--url",
|
||||
"ws://localhost:8123/api/websocket",
|
||||
url,
|
||||
"--token",
|
||||
"phase9-test-token",
|
||||
]
|
||||
@@ -89,7 +89,7 @@ async def test_graceful_shutdown_falls_through_to_sigterm_on_timeout(
|
||||
SIGTERM / SIGKILL.
|
||||
"""
|
||||
|
||||
def _hung_factory(group: str) -> list[str]:
|
||||
def _hung_factory(group: str, url: str) -> list[str]:
|
||||
return [
|
||||
sys.executable,
|
||||
"-c",
|
||||
@@ -137,7 +137,7 @@ async def test_graceful_shutdown_on_no_channel_is_noop(
|
||||
) -> None:
|
||||
"""A sandbox without a live channel reports failure and stays up."""
|
||||
|
||||
def _factory(group: str) -> list[str]:
|
||||
def _factory(group: str, url: str) -> list[str]:
|
||||
# Failing argv — supervisor records a failed attempt then dies.
|
||||
return [sys.executable, "-c", "import sys; sys.exit(1)"]
|
||||
|
||||
@@ -179,7 +179,7 @@ async def test_on_shutdown_reply_callback_is_invoked(
|
||||
async def _on_shutdown_reply(group: str, reply: pb.ShutdownResult) -> None:
|
||||
replies.append((group, reply))
|
||||
|
||||
def _factory(group: str) -> list[str]:
|
||||
def _factory(group: str, url: str) -> list[str]:
|
||||
return [
|
||||
sys.executable,
|
||||
"-m",
|
||||
@@ -187,7 +187,7 @@ async def test_on_shutdown_reply_callback_is_invoked(
|
||||
"--name",
|
||||
group,
|
||||
"--url",
|
||||
"ws://localhost:8123/api/websocket",
|
||||
url,
|
||||
"--token",
|
||||
"phase9-reply-test",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
"""Unix-socket control-channel transport (transport T3).
|
||||
|
||||
Spawns the real ``python -m hass_client.sandbox_v2`` runtime with the
|
||||
manager configured for the unix-socket transport: the manager opens a
|
||||
listening unix socket, passes ``--url unix://<path>`` to the subprocess,
|
||||
the runtime dials back, and a ``ping`` round-trips over the socket. Also
|
||||
covers manager-side transport selection and socket cleanup on shutdown.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.sandbox_v2.manager import (
|
||||
TRANSPORT_UNIX,
|
||||
SandboxConfig,
|
||||
SandboxManager,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
FAST_CONFIG = SandboxConfig(
|
||||
restart_limit=2,
|
||||
restart_window=30.0,
|
||||
restart_backoff=0.05,
|
||||
ready_timeout=20.0,
|
||||
shutdown_grace=5.0,
|
||||
)
|
||||
|
||||
|
||||
def _runtime_factory(seen: dict[str, str]) -> object:
|
||||
"""Command factory that records the URL the manager hands it."""
|
||||
|
||||
def _factory(group: str, url: str) -> list[str]:
|
||||
seen["url"] = url
|
||||
return [
|
||||
sys.executable,
|
||||
"-m",
|
||||
"hass_client.sandbox_v2",
|
||||
"--name",
|
||||
group,
|
||||
"--url",
|
||||
url,
|
||||
"--token",
|
||||
"t3-unix-token",
|
||||
]
|
||||
|
||||
return _factory
|
||||
|
||||
|
||||
async def test_unix_socket_round_trip(hass: HomeAssistant) -> None:
|
||||
"""Manager opens a unix socket; runtime connects; ping round-trips."""
|
||||
seen: dict[str, str] = {}
|
||||
mgr = SandboxManager(
|
||||
hass,
|
||||
command_factory=_runtime_factory(seen),
|
||||
config=FAST_CONFIG,
|
||||
transport=TRANSPORT_UNIX,
|
||||
)
|
||||
try:
|
||||
sandbox = await mgr.ensure_started("built-in")
|
||||
assert sandbox.state == "running"
|
||||
|
||||
# The manager selected the unix transport and handed the runtime a
|
||||
# unix:// socket path that exists while the sandbox is running.
|
||||
assert seen["url"].startswith("unix://")
|
||||
socket_path = seen["url"].removeprefix("unix://")
|
||||
assert os.path.exists(socket_path)
|
||||
|
||||
channel = sandbox.channel
|
||||
assert channel is not None
|
||||
result = await asyncio.wait_for(
|
||||
channel.call("sandbox_v2/ping", None), timeout=5.0
|
||||
)
|
||||
assert result.pong == "sandbox_v2"
|
||||
finally:
|
||||
await mgr.async_stop_all()
|
||||
|
||||
assert sandbox.state == "stopped"
|
||||
|
||||
# No leaked socket file or tempdir after shutdown.
|
||||
socket_path = seen["url"].removeprefix("unix://")
|
||||
assert not os.path.exists(socket_path)
|
||||
assert not os.path.exists(os.path.dirname(socket_path))
|
||||
|
||||
|
||||
async def test_unknown_transport_rejected(hass: HomeAssistant) -> None:
|
||||
"""An unknown transport name is rejected at construction time."""
|
||||
with pytest.raises(ValueError, match="unknown sandbox transport"):
|
||||
SandboxManager(hass, transport="carrier-pigeon")
|
||||
Reference in New Issue
Block a user