sandbox_v2: unix socket transport (transport T3)

Add an opt-in unix-domain-socket control-channel transport alongside the
default stdio transport. The manager opens a listening unix socket, passes
its path to the subprocess as --url unix://<path>, and the runtime dials
back; the manager is the server. Both transports reuse StreamTransport's
length-prefixed framing, so no dedicated unix transport class is needed.

* Manager: SandboxManager(transport="stdio"|"unix") (default stdio,
  unchanged behavior). _run_one splits into stdio/unix paths sharing a
  _supervise_until_exit helper; the unix path creates the socket in a
  short per-attempt tempdir (sidesteps the ~108-char sun_path limit),
  races accept against early exit, and force-closes lingering accepted
  connections (server.close_clients) so wait_closed cannot hang.
* CommandFactory is now (group, url) -> argv; the manager owns the
  transport and hands the factory the right --url.
* Runtime: --url scheme selects the transport — stdio:// (default /
  absent), unix://<path>, or ws://|wss:// (reserved, rejected with a
  clear not-implemented error). New _transport_scheme + _open_unix_channel.
* Tests: unix round-trip + socket cleanup (core), scheme selection + ws
  rejection + unix round-trip (client); existing factories updated to the
  (group, url) signature.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Paulus Schoutsen
2026-06-03 09:03:36 -04:00
parent f03474c029
commit 1eaa79d261
8 changed files with 434 additions and 53 deletions
+180 -37
View File
@@ -6,10 +6,21 @@ sandbox group (``main`` / ``built-in`` / ``custom``); higher phases call
The contract between manager and runtime is:
* the manager launches ``python -m hass_client.sandbox_v2``
* the manager launches ``python -m hass_client.sandbox_v2`` and tells it
which control-channel transport to use via ``--url``
* the runtime opens the control channel and sends a :data:`MSG_READY`
frame as its first message once it is up (no stdout text marker)
* on ``SIGTERM`` the runtime exits cleanly
Two transports are supported (selected by :class:`SandboxManager`'s
``transport`` option, defaulting to ``stdio``):
* **stdio** — frames ride the subprocess's stdin/stdout pipes
(``--url stdio://``); the default, unchanged from earlier phases.
* **unix** — the manager opens a unix-domain socket, passes its path as
``--url unix://<path>``, and the runtime dials back; the manager is the
server. Both transports share :class:`~.channel.StreamTransport`'s
length-prefixed framing, so there is no dedicated unix transport class.
"""
import asyncio
@@ -18,7 +29,10 @@ from collections.abc import Awaitable, Callable
import contextlib
from dataclasses import dataclass
import logging
import os
import shutil
import sys
import tempfile
import time
from typing import Any
@@ -36,8 +50,16 @@ DEFAULT_RESTART_BACKOFF = 1.0
DEFAULT_READY_TIMEOUT = 30.0
DEFAULT_SHUTDOWN_GRACE = 10.0
CommandFactory = Callable[[str], list[str]]
# A command factory receives ``(group, url)`` — the manager decides the
# control-channel URL from its transport and hands it to the factory so the
# spawned argv carries the right ``--url``.
CommandFactory = Callable[[str, str], list[str]]
TokenFactory = Callable[[str], Awaitable[str]]
# Supported control-channel transports.
TRANSPORT_STDIO = "stdio"
TRANSPORT_UNIX = "unix"
_TRANSPORTS = (TRANSPORT_STDIO, TRANSPORT_UNIX)
# The reply is a protobuf ``ShutdownResult``; typed loosely to keep the
# manager free of a proto import.
ShutdownReplyCallback = Callable[[str, Any], Awaitable[None]]
@@ -76,15 +98,20 @@ class SandboxProcess:
def __init__(
self,
group: str,
command_factory: Callable[[], list[str]],
command_factory: Callable[[str], list[str]],
config: SandboxConfig,
*,
transport: str = TRANSPORT_STDIO,
on_failed: Callable[[str], None] | None = None,
on_channel_ready: Callable[[str, Channel], None] | None = None,
on_shutdown_reply: ShutdownReplyCallback | None = None,
) -> None:
"""Initialise a supervised sandbox subprocess.
``command_factory`` is called with the control-channel URL the
chosen ``transport`` requires (``stdio://`` or ``unix://<path>``)
and returns the argv to spawn.
``on_channel_ready`` is invoked with the live :class:`Channel` as
soon as it is opened — before the runtime's :data:`MSG_READY`
frame arrives — so its handlers are in place before the runtime's
@@ -98,6 +125,7 @@ class SandboxProcess:
self.group = group
self._command_factory = command_factory
self._config = config
self._transport = transport
self._on_failed = on_failed
self._on_channel_ready = on_channel_ready
self._on_shutdown_reply = on_shutdown_reply
@@ -309,9 +337,98 @@ class SandboxProcess:
async def _run_one(self) -> None:
"""Spawn one process attempt and wait for it to exit."""
command = self._command_factory()
if self._transport == TRANSPORT_UNIX:
await self._run_one_unix()
else:
await self._run_one_stdio()
async def _run_one_stdio(self) -> None:
"""Spawn over stdio: the channel rides the subprocess's pipes."""
proc = await self._spawn(self._command_factory("stdio://"))
if proc is None:
return
self._process = proc
try:
proc = await asyncio.create_subprocess_exec(
# Open the channel up front — stdout carries nothing but frames
# now. Handlers go on before the reader starts so the runtime's
# warm-load round-trip (and any early push) is never dropped.
assert proc.stdout is not None
assert proc.stdin is not None
self._channel = self._build_channel(proc.stdout, proc.stdin)
await self._supervise_until_exit(proc, self._channel, drain_stdout=False)
finally:
self._process = None
async def _run_one_unix(self) -> None:
    """Spawn over a unix socket: the manager listens, runtime dials back.

    The socket lives in a short-lived per-attempt tempdir rather than
    under the (possibly long) config dir, sidestepping the ~108-char
    ``sun_path`` limit on Linux. The tempdir is removed on every way out
    of this method — including a failure to start the listening server —
    so no socket file or directory is leaked.
    """
    socket_dir = tempfile.mkdtemp(prefix=f"sandbox_v2_{self.group}_")
    try:
        socket_path = os.path.join(socket_dir, "control.sock")
        loop = asyncio.get_running_loop()
        connected: asyncio.Future[
            tuple[asyncio.StreamReader, asyncio.StreamWriter]
        ] = loop.create_future()

        def _on_connect(
            reader: asyncio.StreamReader, writer: asyncio.StreamWriter
        ) -> None:
            if connected.done():
                # Only the first (runtime) connection is honoured.
                writer.close()
                return
            connected.set_result((reader, writer))

        # If this raises (e.g. the path cannot be bound) the outer
        # ``finally`` still removes the tempdir created above.
        server = await asyncio.start_unix_server(_on_connect, path=socket_path)
        try:
            proc = await self._spawn(self._command_factory(f"unix://{socket_path}"))
            if proc is None:
                return
            self._process = proc
            try:
                # The runtime connects back as part of its startup; race the
                # accept against an early exit so a crash-before-connect does
                # not hang here forever.
                exit_task = asyncio.create_task(proc.wait())
                waiters: set[asyncio.Future[Any]] = {connected, exit_task}
                try:
                    await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
                finally:
                    if not exit_task.done():
                        exit_task.cancel()
                        with contextlib.suppress(asyncio.CancelledError):
                            await exit_task
                if not connected.done():
                    _LOGGER.warning(
                        "Sandbox %s exited before connecting to its control socket",
                        self.group,
                    )
                    return
                reader, writer = connected.result()
                self._channel = self._build_channel(reader, writer)
                # stdout is unused on this transport (frames ride the
                # socket), so ask the supervisor to drain it.
                await self._supervise_until_exit(
                    proc, self._channel, drain_stdout=True
                )
            finally:
                self._process = None
        finally:
            server.close()
            # The accepted connection may linger in the server's client set:
            # when the runtime exits, the channel's read loop sees EOF and
            # marks the channel closed, so the later ``channel.close()`` is a
            # no-op that never closes the accepted transport. Force-close any
            # such leftover so ``wait_closed()`` cannot block forever.
            server.close_clients()
            with contextlib.suppress(Exception):
                await server.wait_closed()
    finally:
        # Runs after the server teardown above, matching the original
        # cleanup order, and also when the server never started.
        shutil.rmtree(socket_dir, ignore_errors=True)
async def _spawn(self, command: list[str]) -> asyncio.subprocess.Process | None:
"""Spawn the subprocess, returning ``None`` if it cannot start."""
try:
return await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
@@ -321,14 +438,23 @@ class SandboxProcess:
_LOGGER.exception(
"Sandbox %s could not be spawned (%s)", self.group, command
)
return
return None
self._process = proc
# Open the channel up front — stdout carries nothing but frames now.
# Handlers go on before the reader starts so the runtime's warm-load
# round-trip (and any early push) is never dropped.
channel = self._open_channel(proc)
self._channel = channel
async def _supervise_until_exit(
self,
proc: asyncio.subprocess.Process,
channel: Channel,
*,
drain_stdout: bool,
) -> None:
"""Wire the ready handshake, run until the process exits, clean up.
Shared by both transports — they reach here with a live channel and
a running process; only how the channel's byte pipe was obtained
differs. ``drain_stdout`` is set for the unix transport, where the
subprocess's stdout pipe is unused (frames ride the socket) and must
still be drained so its buffer never fills.
"""
ready_frame = asyncio.Event()
async def _on_ready(_payload: object) -> None:
@@ -346,7 +472,11 @@ class SandboxProcess:
ready_task = asyncio.create_task(ready_frame.wait())
exit_task = asyncio.create_task(proc.wait())
stderr_task = asyncio.create_task(self._drain_stream(proc.stderr, "stderr"))
drain_tasks = [asyncio.create_task(self._drain_stream(proc.stderr, "stderr"))]
if drain_stdout:
drain_tasks.append(
asyncio.create_task(self._drain_stream(proc.stdout, "stdout"))
)
try:
await asyncio.wait(
@@ -358,31 +488,27 @@ class SandboxProcess:
# Hold here until the process exits.
await exit_task
finally:
for task in (ready_task, exit_task):
for task in (ready_task, exit_task, *drain_tasks):
if not task.done():
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
if not stderr_task.done():
stderr_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await stderr_task
if self._channel is not None:
await self._channel.close()
self._channel = None
self._process = None
self._ready.clear()
def _open_channel(self, proc: asyncio.subprocess.Process) -> Channel:
"""Wrap the subprocess pipes in a :class:`Channel`.
def _build_channel(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> Channel:
"""Wrap a reader/writer pair in a :class:`Channel`.
``proc.stdout`` is a :class:`~asyncio.StreamReader`; ``proc.stdin``
is a :class:`~asyncio.StreamWriter`. Both carry length-prefixed
channel frames end-to-end — there is no text preamble.
Length-prefixed channel frames cross end-to-end — there is no text
preamble. The pair comes from the subprocess's stdout/stdin (stdio)
or from the accepted unix-socket connection (unix); the channel core
is identical either way.
"""
assert proc.stdout is not None
assert proc.stdin is not None
return Channel(proc.stdout, proc.stdin, name=self.group, codec=ProtobufCodec())
return Channel(reader, writer, name=self.group, codec=ProtobufCodec())
async def _drain_stream(
self, stream: asyncio.StreamReader | None, name: str
@@ -412,13 +538,20 @@ class SandboxManager:
on_channel_ready: Callable[[str, Channel], None] | None = None,
on_shutdown_reply: ShutdownReplyCallback | None = None,
token_factory: TokenFactory | None = None,
transport: str = TRANSPORT_STDIO,
) -> None:
"""Initialise the manager.
``command_factory`` lets tests substitute the spawned command; the
default builds the ``python -m hass_client.sandbox_v2`` argv that
``command_factory`` lets tests substitute the spawned command; it is
called with ``(group, url)`` and the default builds the
``python -m hass_client.sandbox_v2`` argv that
:class:`hass_client.sandbox.SandboxRuntime` consumes.
``transport`` selects the control-channel transport for every
spawned sandbox: ``"stdio"`` (default — unchanged behavior) or
``"unix"`` (the manager opens a unix socket and the runtime dials
back). Unix is opt-in so existing deployments keep using stdio.
``on_channel_ready`` is invoked once a sandbox's control channel is
live; Phase 4's router uses it to register inbound flow handlers
(e.g., ``sandbox_v2/notify_flow_changed``).
@@ -436,6 +569,12 @@ class SandboxManager:
self._on_channel_ready = on_channel_ready
self._on_shutdown_reply = on_shutdown_reply
self._token_factory = token_factory
if transport not in _TRANSPORTS:
raise ValueError(
f"unknown sandbox transport {transport!r}; expected one of "
f"{_TRANSPORTS}"
)
self._transport = transport
self._tokens: dict[str, str] = {}
self._sandboxes: dict[str, SandboxProcess] = {}
self._locks: dict[str, asyncio.Lock] = {}
@@ -478,13 +617,14 @@ class SandboxManager:
# Keeping the SandboxProcess in the map after a failed start lets
# callers observe its state — ensure_started won't try to
# restart a failed sandbox.
def make_command() -> list[str]:
return self._command_factory(group)
def make_command(url: str) -> list[str]:
return self._command_factory(group, url)
process = SandboxProcess(
group,
make_command,
self._config,
transport=self._transport,
on_failed=self._on_failed,
on_channel_ready=self._on_channel_ready,
on_shutdown_reply=self._on_shutdown_reply,
@@ -527,13 +667,14 @@ class SandboxManager:
return_exceptions=True,
)
def _default_command(self, group: str) -> list[str]:
def _default_command(self, group: str, url: str) -> list[str]:
"""Argv for ``python -m hass_client.sandbox_v2``.
Phase 7 plugs the scoped sandbox access token into the CLI; the
runtime does not yet open the websocket but carries the token so
future phases can. The URL still defaults to localhost because
the runtime does not consume it today.
``url`` is the control-channel URL the manager's transport requires
(``stdio://`` or ``unix://<path>``) — the runtime reads its scheme
to pick the transport. Phase 7's scoped sandbox access token is
still passed for the deferred websocket transport, which is the only
path that consumes it.
"""
token = self._tokens.get(group, "sandbox_v2_placeholder")
return [
@@ -543,13 +684,15 @@ class SandboxManager:
"--name",
group,
"--url",
"ws://localhost:8123/api/websocket",
url,
"--token",
token,
]
__all__ = [
"TRANSPORT_STDIO",
"TRANSPORT_UNIX",
"CommandFactory",
"SandboxConfig",
"SandboxFailedError",
+50 -1
View File
@@ -221,7 +221,24 @@ class SandboxRuntime:
return 0
async def _default_channel_factory(self) -> Channel:
"""Open a :class:`Channel` over stdin/stdout for the manager."""
"""Open the control channel selected by the runtime's ``--url`` scheme.
* ``stdio://`` (or empty) — frames ride the process's stdin/stdout.
* ``unix://<path>`` — dial back to the manager's unix socket.
* ``ws://`` / ``wss://`` — reserved for the deferred websocket
transport; rejected here with a clear error (this build ships
stdio + unix only).
"""
kind = _transport_scheme(self.url)
if kind == "unix":
return await _open_unix_channel(
self.url.removeprefix("unix://"), name=self.group
)
if kind == "ws":
raise NotImplementedError(
"websocket transport is not implemented in this build; it is "
"reserved for the share-states work — use stdio:// or unix://"
)
return await _open_stdio_channel(name=self.group)
async def _handle_shutdown(self, _payload: object) -> pb.ShutdownResult:
@@ -333,6 +350,38 @@ async def _load_restore_state(hass: Any) -> None:
_LOGGER.exception("sandbox: failed to pre-load core.restore_state")
def _transport_scheme(url: str) -> str:
"""Map a ``--url`` to its transport kind.
Returns ``"stdio"`` (empty / ``stdio://``), ``"unix"``
(``unix://<path>``) or ``"ws"`` (``ws://`` / ``wss://``, reserved for
the deferred websocket transport). Raises :class:`ValueError` for any
other scheme.
"""
if not url:
return "stdio"
scheme = url.split("://", 1)[0] if "://" in url else url
if scheme in ("", "stdio"):
return "stdio"
if scheme == "unix":
return "unix"
if scheme in ("ws", "wss"):
return "ws"
raise ValueError(f"unsupported sandbox transport url: {url!r}")
async def _open_unix_channel(path: str, *, name: str) -> Channel:
    """Dial the manager's unix socket at ``path`` and return a :class:`Channel`.

    The runtime is the client here; the manager owns the listening socket.
    The resulting stream pair is handed to :class:`Channel` with a
    :class:`ProtobufCodec`, exactly like any other reader/writer pair — no
    dedicated unix transport class is involved.
    """
    stream_pair = await asyncio.open_unix_connection(path)
    return Channel(*stream_pair, name=name, codec=ProtobufCodec())
async def _open_stdio_channel(*, name: str) -> Channel:
"""Wrap the runtime's stdin/stdout into a :class:`Channel`."""
loop = asyncio.get_running_loop()
@@ -25,8 +25,12 @@ def _build_parser() -> argparse.ArgumentParser:
)
parser.add_argument(
"--url",
required=True,
help="Websocket URL of the main Home Assistant instance.",
default="stdio://",
help=(
"Control-channel URL selecting the transport: stdio:// (default), "
"unix://<path>, or ws://… (reserved — not implemented in this "
"build)."
),
)
parser.add_argument(
"--token",
@@ -0,0 +1,93 @@
"""Runtime-side transport selection + unix channel (transport T3).
The HA Core suite owns the manager-driven subprocess coverage. These
tests pin the runtime side: ``--url`` scheme → transport kind, the
websocket rejection, and that :func:`_open_unix_channel` round-trips a
call over a real unix socket.
"""
import asyncio
import contextlib
from pathlib import Path
import tempfile
from hass_client._proto import sandbox_v2_pb2 as pb
from hass_client.channel import Channel
from hass_client.codec_protobuf import ProtobufCodec
from hass_client.sandbox import SandboxRuntime, _open_unix_channel, _transport_scheme
from hass_client.sandbox_v2.__main__ import _build_parser
import pytest
@pytest.mark.parametrize(
    ("url", "expected"),
    [
        ("", "stdio"),
        ("stdio://", "stdio"),
        ("unix:///tmp/sandbox.sock", "unix"),
        ("ws://localhost:8123/api/websocket", "ws"),
        ("wss://example.test/ws", "ws"),
    ],
)
def test_transport_scheme_selection(url: str, expected: str) -> None:
    """Every supported ``--url`` form resolves to the expected transport kind."""
    kind = _transport_scheme(url)
    assert kind == expected
def test_transport_scheme_rejects_unknown() -> None:
    """A scheme outside the supported set raises instead of falling back."""
    unknown_url = "amqp://broker/queue"
    with pytest.raises(ValueError, match="unsupported sandbox transport url"):
        _transport_scheme(unknown_url)
def test_cli_url_defaults_to_stdio() -> None:
    """Parsing an argv without ``--url`` yields the ``stdio://`` default."""
    parser = _build_parser()
    parsed = parser.parse_args(["--name", "built-in", "--token", "t"])
    assert parsed.url == "stdio://"
async def test_websocket_transport_rejected() -> None:
    """Opening the default channel for a ``ws://`` URL raises NotImplementedError."""
    ws_runtime = SandboxRuntime(
        url="ws://localhost:8123/api/websocket",
        token="t",
        group="g",
    )
    with pytest.raises(NotImplementedError, match="not implemented in this build"):
        await ws_runtime._default_channel_factory()  # noqa: SLF001
async def test_open_unix_channel_round_trips() -> None:
    """A call round-trips over a real unix socket via the runtime opener.

    The server side stands in for the manager: it accepts the connection,
    registers a ``ping`` handler returning a typed proto result, and the
    client opened by :func:`_open_unix_channel` calls it.

    Cleanup is nested so the listening server (and any accepted channel)
    is torn down even when the client fails to connect.
    """
    server_channels: list[Channel] = []

    async def _ping(_payload: object) -> pb.PingResult:
        return pb.PingResult(pong="pong-unix")

    def _on_connect(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        channel = Channel(reader, writer, name="server", codec=ProtobufCodec())
        channel.register("sandbox_v2/ping", _ping)
        channel.start()
        server_channels.append(channel)

    with tempfile.TemporaryDirectory(prefix="sandbox_v2_test_") as socket_dir:
        socket_path = str(Path(socket_dir) / "control.sock")
        server = await asyncio.start_unix_server(_on_connect, path=socket_path)
        try:
            client = await _open_unix_channel(socket_path, name="client")
            try:
                client.start()
                result = await asyncio.wait_for(
                    client.call("sandbox_v2/ping", None), timeout=5.0
                )
                assert result.pong == "pong-unix"
            finally:
                await client.close()
        finally:
            # Same teardown order as before: accepted channels, then the
            # server itself, force-closing leftovers so wait_closed() returns.
            for channel in server_channels:
                await channel.close()
            server.close()
            server.close_clients()
            with contextlib.suppress(Exception):
                await server.wait_closed()
+6 -5
View File
@@ -68,7 +68,7 @@ async def test_crash_restart_budget(hass: HomeAssistant) -> None:
"""A sandbox that crashes 3 times in the window is marked failed."""
spawn_times: list[float] = []
def failing_factory(group: str) -> list[str]:
def failing_factory(group: str, url: str) -> list[str]:
spawn_times.append(time.monotonic())
return _FAILING_CMD
@@ -99,7 +99,7 @@ async def test_crash_restart_budget(hass: HomeAssistant) -> None:
async def test_multiple_groups_independent(hass: HomeAssistant) -> None:
"""A failed group does not stop a healthy group from running."""
def mixed_factory(group: str) -> list[str]:
def mixed_factory(group: str, url: str) -> list[str]:
if group == "broken":
return _FAILING_CMD
return [
@@ -109,7 +109,7 @@ async def test_multiple_groups_independent(hass: HomeAssistant) -> None:
"--name",
group,
"--url",
"ws://localhost:8123/api/websocket",
url,
"--token",
"test-token",
]
@@ -160,10 +160,11 @@ async def test_default_command_includes_token(
# Prime the token cache without actually launching a subprocess.
mgr._tokens["built-in"] = "token-built-in"
builtin_argv = mgr._default_command("built-in")
builtin_argv = mgr._default_command("built-in", "stdio://")
assert "token-built-in" in builtin_argv
assert "--name" in builtin_argv
assert "built-in" in builtin_argv
assert "stdio://" in builtin_argv
async def test_ensure_started_awaits_token_factory(hass: HomeAssistant) -> None:
@@ -174,7 +175,7 @@ async def test_ensure_started_awaits_token_factory(hass: HomeAssistant) -> None:
calls.append(group)
return f"token-{group}"
def quick_command(group: str) -> list[str]:
def quick_command(group: str, url: str) -> list[str]:
return _FAILING_CMD # Force a fast failure so the test runs fast.
mgr = SandboxManager(
@@ -31,7 +31,7 @@ FAST_CONFIG = SandboxConfig(
async def _manager_fixture(hass: HomeAssistant):
"""Manager + tighter timings; tears every sandbox down on exit."""
def _factory(group: str) -> list[str]:
def _factory(group: str, url: str) -> list[str]:
return [
sys.executable,
"-m",
@@ -39,7 +39,7 @@ async def _manager_fixture(hass: HomeAssistant):
"--name",
group,
"--url",
"ws://localhost:8123/api/websocket",
url,
"--token",
"phase4-test-token",
]
@@ -37,7 +37,7 @@ FAST_CONFIG = SandboxConfig(
async def _manager_fixture(hass: HomeAssistant) -> AsyncIterator[SandboxManager]:
"""Manager that spawns the real runtime; cleans up on teardown."""
def _factory(group: str) -> list[str]:
def _factory(group: str, url: str) -> list[str]:
return [
sys.executable,
"-m",
@@ -45,7 +45,7 @@ async def _manager_fixture(hass: HomeAssistant) -> AsyncIterator[SandboxManager]
"--name",
group,
"--url",
"ws://localhost:8123/api/websocket",
url,
"--token",
"phase9-test-token",
]
@@ -89,7 +89,7 @@ async def test_graceful_shutdown_falls_through_to_sigterm_on_timeout(
SIGTERM / SIGKILL.
"""
def _hung_factory(group: str) -> list[str]:
def _hung_factory(group: str, url: str) -> list[str]:
return [
sys.executable,
"-c",
@@ -137,7 +137,7 @@ async def test_graceful_shutdown_on_no_channel_is_noop(
) -> None:
"""A sandbox without a live channel reports failure and stays up."""
def _factory(group: str) -> list[str]:
def _factory(group: str, url: str) -> list[str]:
# Failing argv — supervisor records a failed attempt then dies.
return [sys.executable, "-c", "import sys; sys.exit(1)"]
@@ -179,7 +179,7 @@ async def test_on_shutdown_reply_callback_is_invoked(
async def _on_shutdown_reply(group: str, reply: pb.ShutdownResult) -> None:
replies.append((group, reply))
def _factory(group: str) -> list[str]:
def _factory(group: str, url: str) -> list[str]:
return [
sys.executable,
"-m",
@@ -187,7 +187,7 @@ async def test_on_shutdown_reply_callback_is_invoked(
"--name",
group,
"--url",
"ws://localhost:8123/api/websocket",
url,
"--token",
"phase9-reply-test",
]
@@ -0,0 +1,91 @@
"""Unix-socket control-channel transport (transport T3).
Spawns the real ``python -m hass_client.sandbox_v2`` runtime with the
manager configured for the unix-socket transport: the manager opens a
listening unix socket, passes ``--url unix://<path>`` to the subprocess,
the runtime dials back, and a ``ping`` round-trips over the socket. Also
covers manager-side transport selection and socket cleanup on shutdown.
"""
import asyncio
import os
import sys
import pytest
from homeassistant.components.sandbox_v2.manager import (
TRANSPORT_UNIX,
SandboxConfig,
SandboxManager,
)
from homeassistant.core import HomeAssistant
FAST_CONFIG = SandboxConfig(
restart_limit=2,
restart_window=30.0,
restart_backoff=0.05,
ready_timeout=20.0,
shutdown_grace=5.0,
)
def _runtime_factory(seen: dict[str, str]) -> object:
"""Command factory that records the URL the manager hands it."""
def _factory(group: str, url: str) -> list[str]:
seen["url"] = url
return [
sys.executable,
"-m",
"hass_client.sandbox_v2",
"--name",
group,
"--url",
url,
"--token",
"t3-unix-token",
]
return _factory
async def test_unix_socket_round_trip(hass: HomeAssistant) -> None:
    """Manager opens a unix socket; runtime connects; ping round-trips.

    Post-shutdown assertions live after the ``finally`` rather than inside
    it: if ``ensure_started`` itself raises, the original failure surfaces
    instead of being masked by an unbound-name error during cleanup.
    """
    seen: dict[str, str] = {}
    mgr = SandboxManager(
        hass,
        command_factory=_runtime_factory(seen),
        config=FAST_CONFIG,
        transport=TRANSPORT_UNIX,
    )
    try:
        sandbox = await mgr.ensure_started("built-in")
        assert sandbox.state == "running"
        # The manager selected the unix transport and handed the runtime a
        # unix:// socket path that exists while the sandbox is running.
        assert seen["url"].startswith("unix://")
        socket_path = seen["url"].removeprefix("unix://")
        assert os.path.exists(socket_path)
        channel = sandbox.channel
        assert channel is not None
        result = await asyncio.wait_for(
            channel.call("sandbox_v2/ping", None), timeout=5.0
        )
        assert result.pong == "sandbox_v2"
    finally:
        # Cleanup only — no assertions here, so a failure above is reported
        # as-is.
        await mgr.async_stop_all()

    assert sandbox.state == "stopped"
    # No leaked socket file or tempdir after shutdown.
    assert not os.path.exists(socket_path)
    assert not os.path.exists(os.path.dirname(socket_path))
async def test_unknown_transport_rejected(hass: HomeAssistant) -> None:
    """Constructing a manager with an unsupported transport name raises."""
    bogus_transport = "carrier-pigeon"
    with pytest.raises(ValueError, match="unknown sandbox transport"):
        SandboxManager(hass, transport=bogus_transport)