fix in ui

This commit is contained in:
Rolf
2026-02-13 21:17:04 +01:00
parent 154c3a7af3
commit 08bb09c418
9 changed files with 170 additions and 191 deletions

View File

@@ -3,19 +3,37 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.const import Platform
from .const import DOMAIN
PLATFORMS = [Platform.SENSOR, Platform.BUTTON]
from .const import DOMAIN, MANUAL_REFRESH_SERVICE
from .sensor import UsterWasteDataUpdateCoordinator
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Uster Waste from a config entry."""
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = entry.data
# Set up coordinator *once per entry*
coordinator = UsterWasteDataUpdateCoordinator(hass, entry)
hass.data[DOMAIN][entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Register manual refresh service
async def handle_refresh(call):
entry_id = call.data.get("entry_id", entry.entry_id)
coord = hass.data[DOMAIN].get(entry_id)
if coord:
await coord.async_request_refresh()
else:
hass.components.persistent_notification.create(
"Uster Waste: No active entry found. Reload the integration.",
title="Uster Waste Error"
)
hass.services.async_register(DOMAIN, MANUAL_REFRESH_SERVICE, handle_refresh)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
"""Unload config entry."""
hass.services.async_remove(DOMAIN, MANUAL_REFRESH_SERVICE)
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,48 +1,37 @@
"""Config flow for Uster Waste integration."""
"""Config flow for Uster Waste."""
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.data_entry_flow import FlowResult
from homeassistant.const import CONF_NAME
from .const import DOMAIN
# Suggested defaults (update these as needed)
DEFAULT_NAME = "Uster Waste Schedule"
DATA_SCHEMA = vol.Schema({
vol.Required(CONF_NAME, default=DEFAULT_NAME): str,
vol.Required("token", default=""): str,
vol.Required("id", default=""): str,
vol.Required(CONF_NAME, default="Uster Waste"): str,
vol.Required("token", default="").strip(),
vol.Required("id", default="").strip(),
})
class UsterWasteConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Uster Waste."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, str] | None = None
) -> FlowResult:
"""Handle the initial step."""
async def async_step_user(self, user_input=None):
if user_input is None:
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA)
token = user_input["token"].strip()
waste_id = user_input["id"].strip()
if not token or not waste_id:
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
errors={"base": "missing_params"},
step_id="user", data_schema=DATA_SCHEMA,
errors={"base": "missing_params"}
)
# Build the URL (store config, not raw URL)
config = {
CONF_NAME: user_input[CONF_NAME],
"token": token,
"id": waste_id,
}
return self.async_create_entry(title=user_input[CONF_NAME], data=config)
return self.async_create_entry(
title=user_input[CONF_NAME],
data={
CONF_NAME: user_input[CONF_NAME],
"token": token,
"id": waste_id,
}
)

View File

@@ -11,3 +11,5 @@ ATTR_ERROR = "error"
# UI Labels
MANUAL_REFRESH = "manual_refresh"
MANUAL_REFRESH_SERVICE = "refresh_waste_schedule"

View File

@@ -1,209 +1,179 @@
"""Sensor platform for Uster Waste."""
import asyncio
"""Uster Waste Sensor."""
import logging
import re
import asyncio
from datetime import datetime, timedelta
from typing import Optional
import aiohttp
from bs4 import BeautifulSoup
from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
DataUpdateCoordinator,
UpdateFailed,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import (
DOMAIN,
ATTR_NEXT_COLLECTION,
ATTR_DATE,
ATTR_TYPE,
ATTR_DAYS_UNTIL,
ATTR_ENTRIES,
ATTR_ERROR,
MANUAL_REFRESH,
)
from .const import DOMAIN, MANUAL_REFRESH_SERVICE
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(days=1)
# ✅ CORRECTED URL FORMAT (no %5B, no extra encoded brackets)
BASE_URL = "https://www.uster.ch/abfallstrassenabschnitt"
# Swiss date helpers
MONTH_MAP = {
# For Swiss date parsing
MONTHS = {
"Jan": "01", "Feb": "02", "Mrz": "03", "Mär": "03",
"Apr": "04", "Mai": "05", "Jun": "06", "Jul": "07",
"Aug": "08", "Sep": "09", "Okt": "10", "Nov": "11", "Dez": "12"
}
def _parse_date(date_str: str) -> Optional[datetime]:
"""Convert Swiss date string (e.g., '24.10.2023' or '24. Okt. 2023') to datetime."""
def parse_swiss_date(date_str: str) -> Optional[datetime]:
"""Parse Swiss date like '24. Okt. 2023' or '24.10.2023'."""
date_str = date_str.strip()
# Normalize Swiss month abbreviations
for key, value in MONTH_MAP.items():
date_str = date_str.replace(key, value)
# Try formats: dd.mm.yyyy, d.m.yy
if not date_str:
return None
# Replace Swiss months (e.g., "Okt" → "10")
for swiss, num in MONTHS.items():
date_str = date_str.replace(swiss, num)
# Clean extra spaces/dots
date_str = re.sub(r'\.\s*\.', '.', date_str)
# Try multiple formats
for fmt in ["%d.%m.%Y", "%d.%m.%y"]:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
_LOGGER.warning(f"Could not parse date: '{date_str}'")
_LOGGER.warning(f"Failed to parse date: {date_str}")
return None
class UsterWasteDataUpdateCoordinator(DataUpdateCoordinator[dict]):
"""Fetch and cache Uster waste data."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry):
self.hass = hass
self.entry = entry
self.token = entry.data["token"]
self.id = entry.data["id"]
self.name = entry.data.get("name", "Uster Waste")
# ✅ CORRECT URL:
self.url = f"{BASE_URL}?token={self.token}&strassenabschnittId={self.id}"
super().__init__(hass, _LOGGER, name="uster_waste", update_interval=timedelta(days=1))
async def _async_update_data(self) -> dict:
"""Fetch the latest data from Uster."""
session = async_get_clientsession(self.hass)
try:
async with session.get(self.url, timeout=10) as resp:
if resp.status != 200:
raise UpdateFailed(
f"HTTP {resp.status} — check token & ID (token may be expired)."
"Get a fresh URL from https://www.uster.ch/abfallstrassenabschnitt"
)
html = await resp.text()
except aiohttp.ClientError as e:
raise UpdateFailed(f"Network error: {e}") from e
# Parse HTML
soup = BeautifulSoup(html, "html.parser")
# Find the table (robust: class="table table-striped" or any <table>)
table = soup.find("table", class_="table table-striped") or soup.find("table")
if not table:
raise UpdateFailed("No waste schedule table found on Uster.ch page.")
rows = table.find_all("tr")
if len(rows) < 2:
raise UpdateFailed("Table has no data rows — check page layout changed.")
now = datetime.now()
entries = []
for row in rows[1:4]: # First 3 data rows
cols = row.find_all("td")
if len(cols) < 2:
continue
collection = cols[0].get_text(strip=True)
date_str = cols[1].get_text(strip=True)
dt = parse_swiss_date(date_str)
if not dt:
_LOGGER.warning(f"Skipping row: invalid date '{date_str}'")
continue
entries.append({
"type": collection,
"date": date_str,
"days_until": (dt - now).days
})
if not entries:
raise UpdateFailed("No valid dates found in table.")
# Sort ascending by date
entries.sort(key=lambda x: x["date"])
return {
"next_collection": entries[0]["type"],
"date": entries[0]["date"],
"days_until": entries[0]["days_until"],
"entries": entries,
}
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the sensor."""
config = entry.data
token = config["token"]
waste_id = config["id"]
name = config.get("name", "Uster Waste")
entity = UsterWasteSensor(
entry_id=entry.entry_id,
token=token,
waste_id=waste_id,
name=name
)
async_add_entities([entity])
coordinator: UsterWasteDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities([UsterWasteSensor(entry, coordinator)], update_before_add=True)
class UsterWasteSensor(SensorEntity):
"""Uster Waste Sensor Entity."""
"""Uster Waste Collection Sensor."""
_attr_has_entity_name = True
_attr_name = None # Use coordinator name
_attr_icon = "mdi:recycle"
_attr_device_class = None
_attr_native_unit_of_measurement = "days" if "days_until" else None
def __init__(
self,
entry_id: str,
token: str,
waste_id: str,
name: str
entry: ConfigEntry,
coordinator: UsterWasteDataUpdateCoordinator,
):
self._entry_id = entry_id
self.token = token
self.waste_id = waste_id
self._attr_name = f"{name} Schedule"
self._attr_unique_id = f"uster_waste_{entry_id}"
self._attr_extra_state_attributes = {
ATTR_ENTRIES: [],
self.coordinator = coordinator
self._attr_unique_id = f"uster_waste_{entry.entry_id}"
self._attr_device_info = {
"name": coordinator.name,
"identifiers": {(DOMAIN, entry.entry_id)},
}
self.data = None
async def async_update(self):
"""Update sensor state."""
# Fetch fresh data
data = await self._fetch_data()
self.data = data
self._attr_native_value = len(data.get("entries", []))
self._attr_extra_state_attributes.update(
{
ATTR_NEXT_COLLECTION: data.get("next_collection"),
ATTR_DATE: data.get("date"),
ATTR_TYPE: data.get("type"),
ATTR_DAYS_UNTIL: data.get("days_until"),
ATTR_ENTRIES: data.get("entries", []),
ATTR_ERROR: data.get("error")
}
)
async def _fetch_data(self) -> dict:
"""Fetch data from Uster website."""
try:
session = async_get_clientsession(self.hass)
url = (
"https://www.uster.ch/abfallstrassenabschnitt"
f"?strassenabschnitt%5B_token%5D={self.token}"
f"&strassenabschnitt%5BstrassenabschnittId%5D={self.waste_id}"
)
async with session.get(url, timeout=10) as response:
if response.status == 403 or response.status == 404:
raise Exception(
"Token expired or invalid. "
"Please get a fresh URL from https://www.uster.ch/abfallstrassenabschnitt"
)
response.raise_for_status()
html = await response.text()
# Parse HTML
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table", class_="table table-striped")
if not table:
table = soup.find("table")
if not table:
raise ValueError("No table found on page.")
rows = table.find_all("tr")
if len(rows) < 2:
raise ValueError("Table has no data rows.")
entries = []
now = datetime.now()
for row in rows[1:4]: # Next 3 entries
cols = row.find_all("td")
if len(cols) < 2:
continue
collection_type = cols[0].get_text(strip=True)
date_str = cols[1].get_text(strip=True).replace("  ", " ") # Clean no-break space
dt = _parse_date(date_str)
if not dt:
_LOGGER.warning(f"Skipping row with invalid date: {date_str}")
continue
entries.append({
"Sammlung": collection_type,
"Wann?": date_str,
"date_obj": dt,
"days_until": (dt - now).days
})
# Sort by date (ascending)
entries.sort(key=lambda x: x["date_obj"])
return {
"next_collection": entries[0]["Sammlung"] if entries else None,
"date": entries[0]["Wann?"] if entries else None,
"type": entries[0]["Sammlung"] if entries else None,
"days_until": entries[0]["days_until"] if entries else None,
"entries": [
{
"type": e["Sammlung"],
"date": e["Wann?"],
"days_until": e["days_until"]
}
for e in entries[:3]
]
}
except Exception as e:
_LOGGER.error("Error fetching Uster data: %s", e)
return {
ATTR_ERROR: str(e),
"next_collection": None,
"date": None,
"entries": []
}
async def async_added_to_hass(self):
"""When entity is added to hass."""
await super().async_added_to_hass()
# Force first update
await self.async_update()
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.data is not None
def native_value(self) -> Optional[str]:
"""Return next collection type."""
return self.coordinator.data.get("next_collection")
async def async_press(self):
"""Handle the button press (manual refresh)."""
await self.async_update()
self.async_write_ha_state()
@property
def extra_state_attributes(self) -> dict:
"""Return extended attributes."""
data = self.coordinator.data or {}
return {
"date": data.get("date"),
"days_until": data.get("days_until"),
"entries": data.get("entries", []),
"url": self.coordinator.url, # For debugging
}
async def async_update(self) -> None:
"""Manually trigger update (used by refresh service)."""
await self.coordinator.async_request_refresh()