sandbox_v2: stateless sandboxes — push integration source on entry_setup

Make sandboxes hold no integration code: main attaches a typed
IntegrationSource to EntrySetup (builtin no-op, or a git source pinned to
an exact commit sha), and the sandbox fetches custom (HACS) code into
<config>/custom_components/<domain> before async_setup.

- proto: IntegrationSource sub-message + EntrySetup.integration_source (10);
  both _pb2 mirrors regenerated.
- core sources.py: registered-resolver hook (async_register_sandbox_source_
  resolver) keeping core HACS-agnostic; builtin short-circuit; a custom
  domain with no resolver raises. Resolver pins ref to a sha (no core I/O).
- router: _entry_setup_payload resolves + sets integration_source.
- client sources.py: codeload-tarball fetch (injectable primitive),
  process-lifetime (url, ref) cache, manifest.json verification; wired into
  entry_runner before setup.
- tests: resolver registry + payload (HA side), tarball fetch/cache/verify
  with local fixtures (client side). No network in any test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Paulus Schoutsen
2026-06-03 09:23:33 -04:00
parent c92348b931
commit d4b7aef732
12 changed files with 986 additions and 155 deletions
File diff suppressed because one or more lines are too long
@@ -95,8 +95,24 @@ class DeviceInfo(_message.Message):
translation_key: str
def __init__(self, identifiers: _Optional[_Iterable[_Union[DevicePair, _Mapping]]] = ..., connections: _Optional[_Iterable[_Union[DevicePair, _Mapping]]] = ..., via_device: _Optional[_Union[DevicePair, _Mapping]] = ..., entry_type: _Optional[str] = ..., name: _Optional[str] = ..., manufacturer: _Optional[str] = ..., model: _Optional[str] = ..., model_id: _Optional[str] = ..., sw_version: _Optional[str] = ..., hw_version: _Optional[str] = ..., serial_number: _Optional[str] = ..., suggested_area: _Optional[str] = ..., configuration_url: _Optional[str] = ..., default_name: _Optional[str] = ..., default_manufacturer: _Optional[str] = ..., default_model: _Optional[str] = ..., translation_key: _Optional[str] = ...) -> None: ...
class IntegrationSource(_message.Message):
__slots__ = ("kind", "url", "ref", "tag", "domain", "subdir")
KIND_FIELD_NUMBER: _ClassVar[int]
URL_FIELD_NUMBER: _ClassVar[int]
REF_FIELD_NUMBER: _ClassVar[int]
TAG_FIELD_NUMBER: _ClassVar[int]
DOMAIN_FIELD_NUMBER: _ClassVar[int]
SUBDIR_FIELD_NUMBER: _ClassVar[int]
kind: str
url: str
ref: str
tag: str
domain: str
subdir: str
def __init__(self, kind: _Optional[str] = ..., url: _Optional[str] = ..., ref: _Optional[str] = ..., tag: _Optional[str] = ..., domain: _Optional[str] = ..., subdir: _Optional[str] = ...) -> None: ...
class EntrySetup(_message.Message):
__slots__ = ("entry_id", "domain", "title", "data", "options", "source", "unique_id", "version", "minor_version")
__slots__ = ("entry_id", "domain", "title", "data", "options", "source", "unique_id", "version", "minor_version", "integration_source")
ENTRY_ID_FIELD_NUMBER: _ClassVar[int]
DOMAIN_FIELD_NUMBER: _ClassVar[int]
TITLE_FIELD_NUMBER: _ClassVar[int]
@@ -106,6 +122,7 @@ class EntrySetup(_message.Message):
UNIQUE_ID_FIELD_NUMBER: _ClassVar[int]
VERSION_FIELD_NUMBER: _ClassVar[int]
MINOR_VERSION_FIELD_NUMBER: _ClassVar[int]
INTEGRATION_SOURCE_FIELD_NUMBER: _ClassVar[int]
entry_id: str
domain: str
title: str
@@ -115,7 +132,8 @@ class EntrySetup(_message.Message):
unique_id: str
version: int
minor_version: int
def __init__(self, entry_id: _Optional[str] = ..., domain: _Optional[str] = ..., title: _Optional[str] = ..., data: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., options: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., source: _Optional[str] = ..., unique_id: _Optional[str] = ..., version: _Optional[int] = ..., minor_version: _Optional[int] = ...) -> None: ...
integration_source: IntegrationSource
def __init__(self, entry_id: _Optional[str] = ..., domain: _Optional[str] = ..., title: _Optional[str] = ..., data: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., options: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., source: _Optional[str] = ..., unique_id: _Optional[str] = ..., version: _Optional[int] = ..., minor_version: _Optional[int] = ..., integration_source: _Optional[_Union[IntegrationSource, _Mapping]] = ...) -> None: ...
class EntrySetupResult(_message.Message):
__slots__ = ("ok", "reason")
+24 -3
View File
@@ -35,6 +35,7 @@ from .manager import SandboxManager
from .messages import dict_to_struct
from .protocol import MSG_ENTRY_SETUP, MSG_ENTRY_UNLOAD
from .proxy_flow import SandboxFlowProxy
from .sources import SandboxSourceError, async_resolve_integration_source
if TYPE_CHECKING:
from . import SandboxV2Data
@@ -113,7 +114,19 @@ class SandboxFlowRouter:
)
return False
payload = _entry_setup_payload(entry)
try:
payload = await _entry_setup_payload(self._hass, entry)
except SandboxSourceError as err:
_LOGGER.error(
"Cannot resolve integration source for entry %s (%s): %s",
entry.title,
entry.domain,
err,
)
entry._async_set_state( # noqa: SLF001
self._hass, ConfigEntryState.SETUP_ERROR, str(err)
)
return False
try:
result = await channel.call(MSG_ENTRY_SETUP, payload)
except ChannelClosedError:
@@ -187,11 +200,16 @@ class SandboxFlowRouter:
return classify(integration)
def _entry_setup_payload(entry: ConfigEntry) -> pb.EntrySetup:
async def _entry_setup_payload(
hass: HomeAssistant, entry: ConfigEntry
) -> pb.EntrySetup:
"""Build the typed ``EntrySetup`` message for ``sandbox_v2/entry_setup``.
Surfaces the small subset of entry fields the integration's
``async_setup_entry`` reads.
``async_setup_entry`` reads, plus the ``integration_source`` descriptor
telling a stateless sandbox where to fetch the code (built-in → no-op;
custom → a git source pinned to an exact sha). May raise
:class:`SandboxSourceError` if a custom integration has no source resolver.
"""
msg = pb.EntrySetup(
entry_id=entry.entry_id,
@@ -205,6 +223,9 @@ def _entry_setup_payload(entry: ConfigEntry) -> pb.EntrySetup:
)
if entry.unique_id is not None:
msg.unique_id = entry.unique_id
msg.integration_source.CopyFrom(
await async_resolve_integration_source(hass, entry.domain)
)
return msg
@@ -0,0 +1,152 @@
"""Main-side integration-source resolution for stateless sandboxes.
A sandbox holds no persistent state. The last stateful bit was the
integration *code*: built-ins ride the bundled ``homeassistant`` package, but
custom (HACS) integrations live under ``<config>/custom_components`` on the
main install and are absent from a fresh sandbox. This module lets main tell
the sandbox *where to fetch the code* on ``entry_setup``; the sandbox fetches
it before setup (see ``hass_client.sources``).
Core stays HACS-agnostic via a registered-resolver hook (decision (c),
2026-06-03): HACS — or any other distribution mechanism — registers a
resolver mapping a custom domain to a git source. Core ships only the
builtin-vs-git decision; with no resolver registered the default is
builtin-only, and a custom domain raises rather than silently falling back.
Security / tag→sha contract: the ``ref`` that crosses the wire must be an
exact commit sha, never a moving tag. Core performs **no network I/O** here,
so the resolver is responsible for pinning the installed version to a sha and
returning it in ``ref`` (HACS already knows the sha of what the user
installed). ``tag`` is informational only (logs). If a resolver returns a git
source without a ``ref``, that is an error — main refuses to ship a sandbox a
moving reference.
"""
from collections.abc import Callable
import logging
from typing import TypedDict
from homeassistant.core import HomeAssistant, callback
from homeassistant.loader import async_get_integration
from homeassistant.util.hass_dict import HassKey
from ._proto import sandbox_v2_pb2 as pb
_LOGGER = logging.getLogger(__name__)
class IntegrationSourceDict(TypedDict, total=False):
"""The dict shape a resolver returns for a custom (git) integration.
``kind`` is always ``"git"`` (built-ins never reach a resolver). ``url``
and ``ref`` (an exact commit sha) are required; ``domain`` and ``subdir``
default from the domain being resolved when omitted.
"""
kind: str
url: str
ref: str
tag: str
domain: str
subdir: str
# A resolver maps a custom integration domain to its git source, or ``None``
# if it does not know that domain. Called only for non-built-in integrations.
SandboxSourceResolver = Callable[[str], IntegrationSourceDict | None]
DATA_SOURCE_RESOLVERS: HassKey[list[SandboxSourceResolver]] = HassKey(
"sandbox_v2_source_resolvers"
)
class SandboxSourceError(Exception):
"""Raised when an integration's source cannot be resolved."""
@callback
def async_register_sandbox_source_resolver(
hass: HomeAssistant, resolver: SandboxSourceResolver
) -> Callable[[], None]:
"""Register a resolver mapping a custom domain to its git source.
HACS (or any custom-integration distribution mechanism) calls this to
teach the sandbox where to fetch code from. Resolvers are consulted in
registration order; the first to return a non-``None`` source wins. The
resolver MUST pin ``ref`` to an exact commit sha (see module docstring).
Returns a callback that unregisters the resolver.
"""
resolvers = hass.data.setdefault(DATA_SOURCE_RESOLVERS, [])
resolvers.append(resolver)
@callback
def _unregister() -> None:
resolvers.remove(resolver)
return _unregister
async def async_resolve_integration_source(
hass: HomeAssistant, domain: str
) -> pb.IntegrationSource:
"""Resolve the source descriptor for ``domain``'s code.
Built-in integrations short-circuit to ``{kind: "builtin"}`` (the bundled
``homeassistant`` package provides them). For a custom integration the
registered resolvers are consulted in order; the first git source returned
is used. If no resolver knows the domain, raises :class:`SandboxSourceError`
— a custom integration with no source cannot run in a stateless sandbox, so
the failure is surfaced rather than masked.
"""
integration = await async_get_integration(hass, domain)
if integration.is_built_in:
return pb.IntegrationSource(kind="builtin")
for resolver in hass.data.get(DATA_SOURCE_RESOLVERS, []):
source = resolver(domain)
if source is not None:
return _git_source_from_dict(domain, source)
raise SandboxSourceError(
f"no sandbox source resolver knows custom integration {domain!r}; "
"a custom integration cannot run in a stateless sandbox without one"
)
def _git_source_from_dict(
domain: str, source: IntegrationSourceDict
) -> pb.IntegrationSource:
"""Build a typed git ``IntegrationSource`` from a resolver's dict.
Validates the tag→sha pinning contract: ``url`` and an exact-sha ``ref``
are required. ``domain`` and ``subdir`` default from ``domain``.
"""
url = source.get("url")
if not url:
raise SandboxSourceError(
f"resolver returned a git source for {domain!r} without a url"
)
ref = source.get("ref")
if not ref:
raise SandboxSourceError(
f"resolver returned a git source for {domain!r} without a ref; "
"the resolver must pin the version to an exact commit sha"
)
return pb.IntegrationSource(
kind="git",
url=url,
ref=ref,
tag=source.get("tag", ""),
domain=source.get("domain", domain),
subdir=source.get("subdir", f"custom_components/{domain}"),
)
__all__ = [
"IntegrationSourceDict",
"SandboxSourceError",
"SandboxSourceResolver",
"async_register_sandbox_source_resolver",
"async_resolve_integration_source",
]
File diff suppressed because one or more lines are too long
@@ -95,8 +95,24 @@ class DeviceInfo(_message.Message):
translation_key: str
def __init__(self, identifiers: _Optional[_Iterable[_Union[DevicePair, _Mapping]]] = ..., connections: _Optional[_Iterable[_Union[DevicePair, _Mapping]]] = ..., via_device: _Optional[_Union[DevicePair, _Mapping]] = ..., entry_type: _Optional[str] = ..., name: _Optional[str] = ..., manufacturer: _Optional[str] = ..., model: _Optional[str] = ..., model_id: _Optional[str] = ..., sw_version: _Optional[str] = ..., hw_version: _Optional[str] = ..., serial_number: _Optional[str] = ..., suggested_area: _Optional[str] = ..., configuration_url: _Optional[str] = ..., default_name: _Optional[str] = ..., default_manufacturer: _Optional[str] = ..., default_model: _Optional[str] = ..., translation_key: _Optional[str] = ...) -> None: ...
class IntegrationSource(_message.Message):
__slots__ = ("kind", "url", "ref", "tag", "domain", "subdir")
KIND_FIELD_NUMBER: _ClassVar[int]
URL_FIELD_NUMBER: _ClassVar[int]
REF_FIELD_NUMBER: _ClassVar[int]
TAG_FIELD_NUMBER: _ClassVar[int]
DOMAIN_FIELD_NUMBER: _ClassVar[int]
SUBDIR_FIELD_NUMBER: _ClassVar[int]
kind: str
url: str
ref: str
tag: str
domain: str
subdir: str
def __init__(self, kind: _Optional[str] = ..., url: _Optional[str] = ..., ref: _Optional[str] = ..., tag: _Optional[str] = ..., domain: _Optional[str] = ..., subdir: _Optional[str] = ...) -> None: ...
class EntrySetup(_message.Message):
__slots__ = ("entry_id", "domain", "title", "data", "options", "source", "unique_id", "version", "minor_version")
__slots__ = ("entry_id", "domain", "title", "data", "options", "source", "unique_id", "version", "minor_version", "integration_source")
ENTRY_ID_FIELD_NUMBER: _ClassVar[int]
DOMAIN_FIELD_NUMBER: _ClassVar[int]
TITLE_FIELD_NUMBER: _ClassVar[int]
@@ -106,6 +122,7 @@ class EntrySetup(_message.Message):
UNIQUE_ID_FIELD_NUMBER: _ClassVar[int]
VERSION_FIELD_NUMBER: _ClassVar[int]
MINOR_VERSION_FIELD_NUMBER: _ClassVar[int]
INTEGRATION_SOURCE_FIELD_NUMBER: _ClassVar[int]
entry_id: str
domain: str
title: str
@@ -115,7 +132,8 @@ class EntrySetup(_message.Message):
unique_id: str
version: int
minor_version: int
def __init__(self, entry_id: _Optional[str] = ..., domain: _Optional[str] = ..., title: _Optional[str] = ..., data: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., options: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., source: _Optional[str] = ..., unique_id: _Optional[str] = ..., version: _Optional[int] = ..., minor_version: _Optional[int] = ...) -> None: ...
integration_source: IntegrationSource
def __init__(self, entry_id: _Optional[str] = ..., domain: _Optional[str] = ..., title: _Optional[str] = ..., data: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., options: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., source: _Optional[str] = ..., unique_id: _Optional[str] = ..., version: _Optional[int] = ..., minor_version: _Optional[int] = ..., integration_source: _Optional[_Union[IntegrationSource, _Mapping]] = ...) -> None: ...
class EntrySetupResult(_message.Message):
__slots__ = ("ok", "reason")
@@ -19,6 +19,7 @@ from .approved_domains import ApprovedDomains
from .channel import Channel
from .messages import dict_to_struct, struct_to_dict
from .protocol import MSG_CALL_SERVICE, MSG_ENTRY_SETUP, MSG_ENTRY_UNLOAD
from .sources import FetchPrimitive, SandboxSourceError, async_ensure_integration_source
_LOGGER = logging.getLogger(__name__)
@@ -27,15 +28,22 @@ class EntryRunner:
"""Load integrations on demand and run config entries inside the sandbox."""
def __init__(
self, hass: HomeAssistant, approved: ApprovedDomains | None = None
self,
hass: HomeAssistant,
approved: ApprovedDomains | None = None,
*,
fetch: FetchPrimitive | None = None,
) -> None:
"""Initialise with the sandbox-private HA instance.
``approved`` is shared with the service + event mirrors so an
entry's domain becomes approved as soon as setup completes.
``fetch`` overrides the integration-source download primitive (tests
inject a local stub); ``None`` uses the real codeload tarball fetch.
"""
self.hass = hass
self.approved = approved if approved is not None else ApprovedDomains()
self._fetch = fetch
def register(self, channel: Channel) -> None:
"""Wire the ``sandbox_v2/entry_*`` + ``call_service`` handlers."""
@@ -50,6 +58,24 @@ class EntryRunner:
except (KeyError, TypeError) as err:
return pb.EntrySetupResult(ok=False, reason=f"bad payload: {err}")
# Fetch the integration code before setup so a stateless sandbox can
# load custom (HACS) integrations whose code isn't bundled. Built-in
# sources are a no-op.
try:
await async_ensure_integration_source(
self.hass.config.config_dir,
msg.integration_source,
fetch=self._fetch,
)
except SandboxSourceError as err:
_LOGGER.error(
"sandbox entry_setup: source fetch failed for %s (%s): %s",
entry.title,
entry.domain,
err,
)
return pb.EntrySetupResult(ok=False, reason=f"source fetch failed: {err}")
config_entries = self.hass.config_entries
if config_entries.async_get_entry(entry.entry_id) is not None:
return pb.EntrySetupResult(ok=False, reason="entry already loaded")
@@ -0,0 +1,193 @@
"""Sandbox-side integration-source fetching — fetch custom code at startup.
A stateless sandbox starts with only the bundled ``homeassistant`` package on
disk. Built-in integrations are already present (no-op); custom (HACS)
integrations carry a git :class:`~._proto.sandbox_v2_pb2.IntegrationSource`
descriptor on ``entry_setup`` that the sandbox fetches into
``<config>/custom_components/<domain>`` *before* ``async_setup`` runs (see
:meth:`hass_client.entry_runner.EntryRunner._handle_entry_setup`).
The fetch uses GitHub's codeload tarball for the exact commit sha (no ``git``
binary dependency, matching HACS). A process-lifetime cache keyed by
``(url, ref)`` means multiple entries sourced from the same repo download once.
The download primitive is injectable so tests substitute a local fixture for
the real network fetch — no test ever hits GitHub.
"""
import asyncio
from collections.abc import Awaitable, Callable
import io
import logging
from pathlib import Path
import tarfile
from ._proto import sandbox_v2_pb2 as pb
_LOGGER = logging.getLogger(__name__)
# url, ref -> downloaded tarball bytes. Process-lifetime only (honours
# "stateless": nothing survives a process restart). Guarded by _CACHE_LOCK so
# concurrent entries from the same repo download exactly once.
_TARBALL_CACHE: dict[tuple[str, str], bytes] = {}
_CACHE_LOCK = asyncio.Lock()
# (repo url, exact sha) -> tarball bytes.
FetchPrimitive = Callable[[str, str], Awaitable[bytes]]
class SandboxSourceError(Exception):
"""Raised when a custom integration's source cannot be fetched."""
async def async_ensure_integration_source(
config_dir: str,
source: pb.IntegrationSource,
*,
fetch: FetchPrimitive | None = None,
) -> None:
"""Ensure ``source``'s integration code is present under ``config_dir``.
* ``builtin`` (or an unset source) → no-op; the bundled ``homeassistant``
package provides built-ins.
* ``git`` → if ``<config_dir>/custom_components/<domain>`` is not already
present, download the tarball for the exact ``ref`` and extract the
repo's ``subdir`` into it.
``fetch`` is the download primitive ``(url, ref) -> tarball bytes``;
defaults to the real codeload download. Tests pass a local stub.
"""
kind = source.kind or "builtin"
if kind == "builtin":
return
if kind != "git":
raise SandboxSourceError(f"unknown integration source kind {kind!r}")
domain = source.domain
if not domain:
raise SandboxSourceError("git integration source is missing a domain")
if not source.url or not source.ref:
raise SandboxSourceError(
f"git integration source for {domain!r} is missing url/ref"
)
dest = Path(config_dir) / "custom_components" / domain
manifest = dest / "manifest.json"
if manifest.exists():
# Already fetched into this config dir (another entry of the same
# domain, or a prior call) — nothing to do.
return
subdir = source.subdir or f"custom_components/{domain}"
fetcher = fetch if fetch is not None else _default_fetch
key = (source.url, source.ref)
async with _CACHE_LOCK:
tarball = _TARBALL_CACHE.get(key)
if tarball is None:
_LOGGER.info(
"sandbox: fetching %s from %s@%s (%s)",
domain,
source.url,
source.ref,
source.tag or "no tag",
)
tarball = await fetcher(source.url, source.ref)
_TARBALL_CACHE[key] = tarball
await asyncio.get_running_loop().run_in_executor(
None, _extract_subdir, tarball, subdir, dest
)
if not manifest.exists():
raise SandboxSourceError(
f"fetched source for {domain!r} has no manifest.json at "
f"{subdir!r} (ref {source.ref})"
)
def _extract_subdir(tarball: bytes, subdir: str, dest: Path) -> None:
"""Extract ``<top>/<subdir>/`` from a gzipped tarball into ``dest``.
GitHub's codeload tarball wraps everything in a single top-level
``<repo>-<ref>/`` directory; the integration lives at ``<top>/<subdir>``.
Members outside the subdir are ignored; any member resolving outside
``dest`` (path traversal) is rejected.
"""
subdir = subdir.strip("/")
dest_root = dest.resolve()
with tarfile.open(fileobj=io.BytesIO(tarball), mode="r:gz") as tar:
members = tar.getmembers()
if not members:
raise SandboxSourceError("fetched tarball is empty")
top = members[0].name.split("/", 1)[0]
prefix = f"{top}/{subdir}/"
extracted = 0
for member in members:
if not member.name.startswith(prefix):
continue
rel = member.name[len(prefix) :]
if not rel:
continue
target = (dest / rel).resolve()
if not target.is_relative_to(dest_root):
raise SandboxSourceError(
f"refusing path-traversal member {member.name!r}"
)
if member.isdir():
target.mkdir(parents=True, exist_ok=True)
continue
if not member.isfile():
# Skip symlinks / devices / etc. — integration trees are
# plain files; anything else is suspect.
continue
target.parent.mkdir(parents=True, exist_ok=True)
source = tar.extractfile(member)
if source is None:
continue
with source, target.open("wb") as handle:
handle.write(source.read())
extracted += 1
if extracted == 0:
raise SandboxSourceError(
f"fetched tarball had no files under {prefix!r}"
)
def _codeload_url(url: str, ref: str) -> str:
"""Build the GitHub codeload tarball URL for ``url`` at ``ref``.
``https://github.com/owner/repo`` → ``https://codeload.github.com/owner/
repo/tar.gz/<ref>``.
"""
trimmed = url.rstrip("/").removesuffix(".git")
parts = trimmed.split("/")
if len(parts) < 2:
raise SandboxSourceError(f"cannot derive owner/repo from url {url!r}")
owner, repo = parts[-2], parts[-1]
return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}"
async def _default_fetch(url: str, ref: str) -> bytes:
"""Download the codeload tarball for ``url`` at the exact ``ref`` (sha).
The real network path. A one-shot transient session is fine here: this
runs once per ``(url, ref)`` at sandbox startup. Imported lazily so the
module (and tests using a stub) never require ``aiohttp``.
"""
import aiohttp # noqa: PLC0415 — keep aiohttp optional for the stubbed path
tar_url = _codeload_url(url, ref)
async with (
aiohttp.ClientSession() as session,
session.get(tar_url) as response,
):
response.raise_for_status()
return await response.read()
__all__ = [
"FetchPrimitive",
"SandboxSourceError",
"async_ensure_integration_source",
]
@@ -0,0 +1,210 @@
"""Tests for the sandbox-side integration-source fetch (``hass_client.sources``).
All fetches use a local in-memory tarball fixture — no test hits the network.
"""
from collections.abc import Iterator
import io
from pathlib import Path
import tarfile
from hass_client import sources as sources_module
from hass_client._proto import sandbox_v2_pb2 as pb
from hass_client.sources import SandboxSourceError, async_ensure_integration_source
import pytest
@pytest.fixture(autouse=True)
def _clear_tarball_cache() -> Iterator[None]:
"""Reset the process-lifetime tarball cache between tests for isolation."""
sources_module._TARBALL_CACHE.clear() # noqa: SLF001
yield
sources_module._TARBALL_CACHE.clear() # noqa: SLF001
def _make_tarball(
*, top: str, files: dict[str, str]
) -> bytes:
"""Build a gzipped tarball mimicking GitHub's codeload layout.
``files`` maps a path relative to ``top`` to its contents.
"""
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
for rel, content in files.items():
data = content.encode()
info = tarfile.TarInfo(name=f"{top}/{rel}")
info.size = len(data)
tar.addfile(info, io.BytesIO(data))
return buffer.getvalue()
def _git_source(domain: str = "my_custom") -> pb.IntegrationSource:
return pb.IntegrationSource(
kind="git",
url="https://github.com/owner/my_custom",
ref="a" * 40,
tag="v1.0.0",
domain=domain,
subdir=f"custom_components/{domain}",
)
async def test_builtin_is_a_noop(tmp_path: Path) -> None:
"""A builtin source fetches nothing and creates no files."""
calls = 0
async def _fetch(url: str, ref: str) -> bytes:
nonlocal calls
calls += 1
return b""
await async_ensure_integration_source(
str(tmp_path), pb.IntegrationSource(kind="builtin"), fetch=_fetch
)
assert calls == 0
assert not (tmp_path / "custom_components").exists()
async def test_unset_source_is_a_noop(tmp_path: Path) -> None:
"""An unset (default) source is treated as builtin — no fetch."""
calls = 0
async def _fetch(url: str, ref: str) -> bytes:
nonlocal calls
calls += 1
return b""
await async_ensure_integration_source(
str(tmp_path), pb.IntegrationSource(), fetch=_fetch
)
assert calls == 0
async def test_git_source_extracts_into_config_dir(tmp_path: Path) -> None:
"""A git source extracts the repo subdir into custom_components/<domain>."""
tarball = _make_tarball(
top="my_custom-aaaa",
files={
"custom_components/my_custom/manifest.json": '{"domain": "my_custom"}',
"custom_components/my_custom/__init__.py": "DOMAIN = 'my_custom'",
"README.md": "ignored — outside the subdir",
},
)
async def _fetch(url: str, ref: str) -> bytes:
return tarball
await async_ensure_integration_source(
str(tmp_path), _git_source(), fetch=_fetch
)
dest = tmp_path / "custom_components" / "my_custom"
assert (dest / "manifest.json").read_text() == '{"domain": "my_custom"}'
assert (dest / "__init__.py").read_text() == "DOMAIN = 'my_custom'"
# Files outside the subdir are not extracted.
assert not (tmp_path / "README.md").exists()
async def test_second_call_same_ref_hits_cache(tmp_path: Path) -> None:
"""Two entries from the same (url, ref) download once."""
tarball = _make_tarball(
top="my_custom-aaaa",
files={
"custom_components/foo/manifest.json": "{}",
"custom_components/bar/manifest.json": "{}",
},
)
calls = 0
async def _fetch(url: str, ref: str) -> bytes:
nonlocal calls
calls += 1
return tarball
source_foo = pb.IntegrationSource(
kind="git",
url="https://github.com/owner/repo",
ref="z" * 40,
domain="foo",
subdir="custom_components/foo",
)
source_bar = pb.IntegrationSource(
kind="git",
url="https://github.com/owner/repo",
ref="z" * 40,
domain="bar",
subdir="custom_components/bar",
)
await async_ensure_integration_source(str(tmp_path), source_foo, fetch=_fetch)
await async_ensure_integration_source(str(tmp_path), source_bar, fetch=_fetch)
assert calls == 1
assert (tmp_path / "custom_components" / "foo" / "manifest.json").exists()
assert (tmp_path / "custom_components" / "bar" / "manifest.json").exists()
async def test_already_present_skips_fetch(tmp_path: Path) -> None:
"""An existing custom_components/<domain>/manifest.json skips the fetch."""
dest = tmp_path / "custom_components" / "my_custom"
dest.mkdir(parents=True)
(dest / "manifest.json").write_text("{}")
calls = 0
async def _fetch(url: str, ref: str) -> bytes:
nonlocal calls
calls += 1
return b""
await async_ensure_integration_source(
str(tmp_path), _git_source(), fetch=_fetch
)
assert calls == 0
async def test_missing_manifest_raises(tmp_path: Path) -> None:
"""A fetched tree without manifest.json is rejected."""
tarball = _make_tarball(
top="my_custom-aaaa",
files={"custom_components/my_custom/__init__.py": "x = 1"},
)
async def _fetch(url: str, ref: str) -> bytes:
return tarball
with pytest.raises(SandboxSourceError, match="manifest.json"):
await async_ensure_integration_source(
str(tmp_path), _git_source(), fetch=_fetch
)
async def test_unknown_kind_raises(tmp_path: Path) -> None:
"""An unrecognised source kind is an error."""
async def _fetch(url: str, ref: str) -> bytes:
return b""
with pytest.raises(SandboxSourceError, match="unknown integration source kind"):
await async_ensure_integration_source(
str(tmp_path), pb.IntegrationSource(kind="svn"), fetch=_fetch
)
async def test_empty_subdir_match_raises(tmp_path: Path) -> None:
"""A tarball with no files under the subdir is rejected."""
tarball = _make_tarball(
top="my_custom-aaaa",
files={"some/other/path.py": "x = 1"},
)
async def _fetch(url: str, ref: str) -> bytes:
return tarball
with pytest.raises(SandboxSourceError):
await async_ensure_integration_source(
str(tmp_path), _git_source(), fetch=_fetch
)
+16
View File
@@ -91,6 +91,21 @@ message DeviceInfo {
// --- entry_setup / entry_unload (main -> sandbox) -------------------------
// Where the sandbox should source the integration's code from. Makes the
// sandbox stateless: built-ins ride the bundled `homeassistant` package
// (no-op), custom (HACS) integrations carry a git source the sandbox fetches
// at startup. Main pins the source to an exact commit `ref` (sha) before it
// crosses the wire — never a moving tag — see `sources.py`.
message IntegrationSource {
string kind = 1; // "builtin" | "git"
// git-only fields (unset for builtin):
string url = 2; // repository url, e.g. https://github.com/owner/repo
string ref = 3; // exact commit sha (main pins tag -> sha)
string tag = 4; // human-readable version, for logs only
string domain = 5; // the custom integration's domain
string subdir = 6; // path within the repo, e.g. custom_components/<domain>
}
message EntrySetup {
string entry_id = 1;
string domain = 2;
@@ -101,6 +116,7 @@ message EntrySetup {
optional string unique_id = 7;
int32 version = 8;
int32 minor_version = 9;
IntegrationSource integration_source = 10;
}
message EntrySetupResult {
+40 -1
View File
@@ -8,7 +8,13 @@ from homeassistant.components.sandbox_v2._proto import sandbox_v2_pb2 as pb
from homeassistant.components.sandbox_v2.manager import SandboxManager
from homeassistant.components.sandbox_v2.messages import struct_to_dict
from homeassistant.components.sandbox_v2.proxy_flow import SandboxFlowProxy
from homeassistant.components.sandbox_v2.router import SandboxFlowRouter
from homeassistant.components.sandbox_v2.router import (
SandboxFlowRouter,
_entry_setup_payload,
)
from homeassistant.components.sandbox_v2.sources import (
async_register_sandbox_source_resolver,
)
from homeassistant.config_entries import SOURCE_USER, ConfigEntry, ConfigFlowContext
from homeassistant.core import HomeAssistant
@@ -118,6 +124,7 @@ async def test_async_setup_entry_routes_to_sandbox(
channel_b.start()
manager.install("built-in", channel_a)
mock_integration(hass, MockModule("test_entry"))
entry = MockConfigEntry(
domain="test_entry",
title="Test",
@@ -155,6 +162,7 @@ async def test_async_setup_entry_marks_setup_error_on_failure(
channel_b.start()
manager.install("built-in", channel_a)
mock_integration(hass, MockModule("test_fail"))
entry = MockConfigEntry(
domain="test_fail",
title="Test",
@@ -173,6 +181,37 @@ async def test_async_setup_entry_marks_setup_error_on_failure(
assert entry.reason == "boom"
async def test_entry_setup_payload_sets_builtin_source(hass: HomeAssistant) -> None:
"""A built-in entry's payload carries a ``{kind: builtin}`` source."""
mock_integration(hass, MockModule("payload_builtin"))
entry = MockConfigEntry(domain="payload_builtin", title="Payload")
payload = await _entry_setup_payload(hass, cast(ConfigEntry, entry))
assert payload.integration_source.kind == "builtin"
async def test_entry_setup_payload_sets_git_source(hass: HomeAssistant) -> None:
"""A custom entry's payload carries the resolver's pinned git source."""
mock_integration(hass, MockModule("payload_custom"), built_in=False)
async_register_sandbox_source_resolver(
hass,
lambda domain: {
"kind": "git",
"url": "https://github.com/owner/payload_custom",
"ref": "d" * 40,
"tag": "v2.0.0",
},
)
entry = MockConfigEntry(domain="payload_custom", title="Payload")
payload = await _entry_setup_payload(hass, cast(ConfigEntry, entry))
assert payload.integration_source.kind == "git"
assert payload.integration_source.ref == "d" * 40
assert payload.integration_source.subdir == "custom_components/payload_custom"
async def test_async_setup_entry_returns_none_when_not_sandboxed(
hass: HomeAssistant, manager: FakeSandboxManager
) -> None:
+134
View File
@@ -0,0 +1,134 @@
"""Tests for the main-side integration-source resolver registry."""
import pytest
from homeassistant.components.sandbox_v2.sources import (
IntegrationSourceDict,
SandboxSourceError,
async_register_sandbox_source_resolver,
async_resolve_integration_source,
)
from homeassistant.core import HomeAssistant
from tests.common import MockModule, mock_integration
async def test_resolve_builtin_returns_builtin_kind(hass: HomeAssistant) -> None:
"""A built-in integration short-circuits to ``{kind: builtin}``."""
mock_integration(hass, MockModule("demo_builtin"))
source = await async_resolve_integration_source(hass, "demo_builtin")
assert source.kind == "builtin"
assert source.url == ""
async def test_resolve_builtin_ignores_resolvers(hass: HomeAssistant) -> None:
"""Built-ins never consult a resolver, even when one is registered."""
def _resolver(domain: str) -> IntegrationSourceDict | None:
raise AssertionError("resolver must not be consulted for built-ins")
async_register_sandbox_source_resolver(hass, _resolver)
mock_integration(hass, MockModule("demo_builtin2"))
source = await async_resolve_integration_source(hass, "demo_builtin2")
assert source.kind == "builtin"
async def test_resolve_custom_uses_registered_resolver(hass: HomeAssistant) -> None:
"""A custom integration resolves to the registered git source."""
mock_integration(hass, MockModule("my_custom"), built_in=False)
def _resolver(domain: str) -> IntegrationSourceDict | None:
assert domain == "my_custom"
return {
"kind": "git",
"url": "https://github.com/owner/my_custom",
"ref": "a" * 40,
"tag": "v1.2.3",
}
async_register_sandbox_source_resolver(hass, _resolver)
source = await async_resolve_integration_source(hass, "my_custom")
assert source.kind == "git"
assert source.url == "https://github.com/owner/my_custom"
assert source.ref == "a" * 40
assert source.tag == "v1.2.3"
assert source.domain == "my_custom"
# subdir defaults from the domain when the resolver omits it.
assert source.subdir == "custom_components/my_custom"
async def test_resolve_custom_without_resolver_raises(hass: HomeAssistant) -> None:
"""A custom integration with no resolver cannot run — surface it."""
mock_integration(hass, MockModule("orphan_custom"), built_in=False)
with pytest.raises(SandboxSourceError, match="no sandbox source resolver"):
await async_resolve_integration_source(hass, "orphan_custom")
async def test_resolve_custom_resolver_returning_none_raises(
hass: HomeAssistant,
) -> None:
"""A resolver that doesn't know the domain falls through to the error."""
mock_integration(hass, MockModule("unknown_custom"), built_in=False)
async_register_sandbox_source_resolver(hass, lambda domain: None)
with pytest.raises(SandboxSourceError, match="no sandbox source resolver"):
await async_resolve_integration_source(hass, "unknown_custom")
async def test_resolve_git_source_without_ref_raises(hass: HomeAssistant) -> None:
"""A git source missing its pinned sha is rejected (no moving tags)."""
mock_integration(hass, MockModule("unpinned_custom"), built_in=False)
async_register_sandbox_source_resolver(
hass,
lambda domain: {
"kind": "git",
"url": "https://github.com/owner/unpinned_custom",
"tag": "v1.0.0",
},
)
with pytest.raises(SandboxSourceError, match="must pin"):
await async_resolve_integration_source(hass, "unpinned_custom")
async def test_resolvers_consulted_in_order(hass: HomeAssistant) -> None:
"""The first resolver returning a source wins."""
mock_integration(hass, MockModule("ordered_custom"), built_in=False)
async_register_sandbox_source_resolver(hass, lambda domain: None)
async_register_sandbox_source_resolver(
hass,
lambda domain: {
"kind": "git",
"url": "https://github.com/owner/ordered_custom",
"ref": "b" * 40,
},
)
source = await async_resolve_integration_source(hass, "ordered_custom")
assert source.ref == "b" * 40
async def test_unregister_resolver(hass: HomeAssistant) -> None:
"""Unregistering a resolver removes it from the consulted set."""
mock_integration(hass, MockModule("gone_custom"), built_in=False)
unregister = async_register_sandbox_source_resolver(
hass,
lambda domain: {
"kind": "git",
"url": "https://github.com/owner/gone_custom",
"ref": "c" * 40,
},
)
unregister()
with pytest.raises(SandboxSourceError):
await async_resolve_integration_source(hass, "gone_custom")