init. working relay
This commit is contained in:
@@ -0,0 +1,2 @@
|
||||
venv/
|
||||
__pycache__/
|
||||
@@ -0,0 +1,105 @@
|
||||
import amqp
|
||||
import ssl
|
||||
import sys
|
||||
import logging
|
||||
import requests
|
||||
from weather_utility import parse_weather_xml, publish_weather
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Server Configuration
|
||||
HOST = "dd.weather.gc.ca"
|
||||
PORT = 5671
|
||||
USER = "anonymous"
|
||||
PASSWORD = "anonymous"
|
||||
EXCHANGE = "xpublic"
|
||||
|
||||
QUEUE_NAME = "q_anonymous.subscribe.citypage.companyis2ari.ca"
|
||||
SUBTOPIC = "#.WXO-DD.citypage_weather.ON.#"
|
||||
|
||||
def on_message(message):
|
||||
routing_key = message.delivery_info.get('routing_key')
|
||||
body = message.body.decode('utf-8') if isinstance(message.body, bytes) else message.body
|
||||
|
||||
# Split the body by whitespace
|
||||
# parts[0] = Timestamp
|
||||
# parts[1] = Base URL (https://dd1.weather.gc.ca/)
|
||||
# parts[2] = Relative Path
|
||||
parts = body.split()
|
||||
|
||||
clean_url = f"{parts[1].strip()}{parts[2].strip()}"
|
||||
|
||||
if clean_url.endswith("s0000458_en.xml"):
|
||||
logger.debug(f"New Toronto weather update available at: {clean_url}")
|
||||
try:
|
||||
response = requests.get(clean_url)
|
||||
response.raise_for_status()
|
||||
weather_data = parse_weather_xml(response.content)
|
||||
publish_weather(weather_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch or process weather data: {e}")
|
||||
elif clean_url.endswith("s0000305_en.xml"):
|
||||
logger.debug(f"New Kaladar weather update available at: {clean_url}")
|
||||
try:
|
||||
response = requests.get(clean_url)
|
||||
response.raise_for_status()
|
||||
weather_data = parse_weather_xml(response.content)
|
||||
publish_weather(weather_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch or process weather data: {e}")
|
||||
else:
|
||||
logger.debug(f"Ignored URL (not Toronto weather): {clean_url}")
|
||||
|
||||
|
||||
message.channel.basic_ack(message.delivery_info['delivery_tag'])
|
||||
|
||||
def main():
|
||||
ssl_context = ssl.create_default_context()
|
||||
|
||||
try:
|
||||
logger.debug(f"Connecting to {HOST}:{PORT} as {USER}")
|
||||
conn = amqp.Connection(
|
||||
host=f"{HOST}:{PORT}",
|
||||
userid=USER,
|
||||
password=PASSWORD,
|
||||
ssl=ssl_context,
|
||||
virtual_host="/"
|
||||
)
|
||||
conn.connect()
|
||||
channel = conn.channel()
|
||||
|
||||
logger.debug(f"Declaring exclusive, auto-deleting queue: {QUEUE_NAME}")
|
||||
channel.queue_declare(
|
||||
queue=QUEUE_NAME,
|
||||
exclusive=True,
|
||||
auto_delete=True
|
||||
)
|
||||
|
||||
logger.debug(f"Binding queue to exchange '{EXCHANGE}' with routing key '{SUBTOPIC}'")
|
||||
channel.queue_bind(
|
||||
queue=QUEUE_NAME,
|
||||
exchange=EXCHANGE,
|
||||
routing_key=SUBTOPIC
|
||||
)
|
||||
|
||||
|
||||
channel.basic_consume(queue=QUEUE_NAME, callback=on_message)
|
||||
|
||||
logger.info("Waiting for Ontario weather updates...")
|
||||
while True:
|
||||
conn.drain_events()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info(f"Stopping script...")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -0,0 +1,170 @@
|
||||
import xml.etree.ElementTree as ET
|
||||
import json
|
||||
import logging
|
||||
import paho.mqtt.client as mqtt
|
||||
import re
|
||||
import io
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# --- Configuration ---
|
||||
BROKER = os.getenv("MQTT_BROKER", "192.168.2.17")
|
||||
PORT = int(os.getenv("MQTT_PORT", 1883))
|
||||
DISCOVERY_PREFIX = "homeassistant"
|
||||
BASE_TOPIC = "weather/station"
|
||||
|
||||
|
||||
def slugify(text):
|
||||
return re.sub(r'\W+', '_', text).lower() if text else "unknown_station"
|
||||
|
||||
|
||||
def parse_weather_xml(source) -> dict | None:
|
||||
try:
|
||||
tree = ET.parse(io.BytesIO(source))
|
||||
root = tree.getroot()
|
||||
current = root.find('currentConditions')
|
||||
if current is None: return None
|
||||
|
||||
wind_speed_str = current.findtext('wind/speed')
|
||||
if wind_speed_str == "calm":
|
||||
wind_speed_str = "0"
|
||||
|
||||
# Extract and cast data
|
||||
return {
|
||||
"station_name": current.findtext('station'),
|
||||
"condition": current.findtext('condition'),
|
||||
"temperature": float(current.findtext('temperature') or 0),
|
||||
"relative_humidity": float(current.findtext('relativeHumidity') or 0),
|
||||
"pressure": float(current.findtext('pressure') or 0),
|
||||
"wind_speed": float(wind_speed_str or 0),
|
||||
"wind_bearing": float(current.findtext('wind/bearing') or 0),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"XML Parse Error: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def setup_ha_discovery(client, data):
|
||||
station_id = slugify(data['station_name'])
|
||||
state_topic = f"{BASE_TOPIC}/{station_id}/state"
|
||||
|
||||
# --- SENSOR CONFIGURATION ---
|
||||
# device_class: Describes what the sensor measures (temperature, humidity, etc.)
|
||||
# state_class: 'measurement' allows HA to create long-term statistics/graphs
|
||||
sensors = [
|
||||
{
|
||||
"id": "cond",
|
||||
"name": "Condition",
|
||||
"class": None, # Generic text sensor
|
||||
"state_class": None, # Text cannot have a state class
|
||||
"unit": None,
|
||||
"key": "condition",
|
||||
"icon": "mdi:weather-partly-cloudy" # Default icon
|
||||
},
|
||||
{
|
||||
"id": "temp",
|
||||
"name": "Temperature",
|
||||
"class": "temperature",
|
||||
"state_class": "measurement",
|
||||
"unit": "°C",
|
||||
"key": "temperature"
|
||||
},
|
||||
{
|
||||
"id": "hum",
|
||||
"name": "Humidity",
|
||||
"class": "humidity",
|
||||
"state_class": "measurement",
|
||||
"unit": "%",
|
||||
"key": "relative_humidity"
|
||||
},
|
||||
{
|
||||
"id": "pres",
|
||||
"name": "Pressure",
|
||||
"class": "atmospheric_pressure",
|
||||
"state_class": "measurement",
|
||||
"unit": "kPa",
|
||||
"key": "pressure"
|
||||
},
|
||||
{
|
||||
"id": "wind_s",
|
||||
"name": "Wind Speed",
|
||||
"class": "wind_speed",
|
||||
"state_class": "measurement",
|
||||
"unit": "km/h",
|
||||
"key": "wind_speed"
|
||||
},
|
||||
{
|
||||
"id": "wind_b",
|
||||
"name": "Wind Bearing",
|
||||
"class": "wind_direction", # Correct HA class for bearing
|
||||
"state_class": "measurement",
|
||||
"unit": "°",
|
||||
"key": "wind_bearing"
|
||||
},
|
||||
]
|
||||
|
||||
device_info = {
|
||||
"identifiers": [f"weather_station_{station_id}"],
|
||||
"name": data['station_name'],
|
||||
"model": "XML Weather Station",
|
||||
"manufacturer": "Python Script"
|
||||
}
|
||||
|
||||
for s in sensors:
|
||||
discovery_topic = f"{DISCOVERY_PREFIX}/sensor/{station_id}/{s['id']}/config"
|
||||
|
||||
payload = {
|
||||
"name": f"{data['station_name']} {s['name']}",
|
||||
"unique_id": f"{station_id}_{s['id']}",
|
||||
"state_topic": state_topic,
|
||||
"value_template": f"{{{{ value_json.{s['key']} }}}}",
|
||||
"device": device_info
|
||||
}
|
||||
|
||||
# Apply Optional Configurations
|
||||
if s.get('class'):
|
||||
payload["device_class"] = s['class']
|
||||
|
||||
if s.get('state_class'):
|
||||
payload["state_class"] = s['state_class']
|
||||
|
||||
if s.get('unit'):
|
||||
payload["unit_of_measurement"] = s['unit']
|
||||
|
||||
if s.get('icon'):
|
||||
payload["icon"] = s['icon']
|
||||
|
||||
client.publish(discovery_topic, json.dumps(payload), retain=True)
|
||||
|
||||
return state_topic
|
||||
|
||||
|
||||
def publish_weather(data):
|
||||
if not data: return
|
||||
|
||||
client = mqtt.Client()
|
||||
try:
|
||||
client.connect(BROKER, PORT, 60)
|
||||
|
||||
# Start the network loop to handle message handshakes
|
||||
client.loop_start()
|
||||
|
||||
# This creates the entities in HA (Retained)
|
||||
state_topic = setup_ha_discovery(client, data)
|
||||
|
||||
# FIX 1: Add retain=True so HA sees the value even if it subscribes late
|
||||
# FIX 2: Capture the message info object to verify delivery
|
||||
msg_info = client.publish(state_topic, json.dumps(data), retain=True)
|
||||
|
||||
# FIX 3: Block execution until the message is actually sent to the broker
|
||||
msg_info.wait_for_publish()
|
||||
|
||||
logger.info(f"Published all fields for {data['station_name']} to {state_topic}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"MQTT Error: {e}")
|
||||
finally:
|
||||
# Stop the loop and disconnect cleanly
|
||||
client.loop_stop()
|
||||
client.disconnect()
|
||||
Reference in New Issue
Block a user