initial working env

This commit is contained in:
Arian Nasr
2025-12-11 03:30:35 -05:00
commit 25469a54b3
11 changed files with 799 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.idea/
.env
.venv/
usage_data.db
__pycache__/

20
db_connector.py Normal file
View File

@@ -0,0 +1,20 @@
import sqlite3
from schemas import DatabaseRecord
def connect_db(db_name):
return sqlite3.connect(db_name)
def initialize_database(conn):
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS usage_data (timestamp TEXT PRIMARY KEY, value_kwh REAL, cost REAL, tou INTEGER)")
conn.commit()
def insert_usage_data(conn, record: DatabaseRecord):
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO usage_data (timestamp, value_kwh, cost, tou) VALUES (?, ?, ?, ?)",
(record.timestamp.isoformat(), record.value_kwh, record.cost, record.tou)
)
conn.commit()

66
download_xml.py Normal file
View File

@@ -0,0 +1,66 @@
from pathlib import Path
from playwright.async_api import async_playwright
from schemas import DownloadParameters
async def download_xml_files(params: DownloadParameters) -> Path:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
await page.goto("https://alectrautilitiesgbportal.savagedata.com")
await page.wait_for_load_state("networkidle")
account_name = page.locator("""xpath=//*[@id="account-name"]""")
await account_name.wait_for()
await account_name.click()
await account_name.fill(params.account_name)
account_number = page.locator("""xpath=//*[@id="idAccountNumber"]""")
await account_number.wait_for()
await account_number.click()
await account_number.fill(params.account_number)
await page.keyboard.press("Tab")
await page.keyboard.type(params.account_phone)
try:
# check if it was filled correctly
value = await account_name.input_value()
if value != params.account_name:
raise ValueError("Account name input failed")
except Exception as e:
print(f"Error locating or filling account name: {e}")
print("Retrying to fill account name...")
await account_name.fill(params.account_name)
submit_button = page.locator("""xpath=//*[@class="btn btn-primary btn-block"]""")
await submit_button.wait_for()
await submit_button.click()
download_page_button = page.locator("""xpath=//*[@href="DownloadMyData"]""")
await download_page_button.wait_for()
await download_page_button.click()
start_date_picker = page.locator("""xpath=//*[@id="start"]""")
await start_date_picker.wait_for()
await start_date_picker.fill(params.start_date.strftime("%m-%d-%Y"))
end_date_picker = page.locator("""xpath=//*[@id="end"]""")
await end_date_picker.wait_for()
await end_date_picker.fill(params.end_date.strftime("%m-%d-%Y"))
await page.keyboard.press("Tab")
await page.keyboard.press("Space")
download_button = page.locator("""xpath=//*[@class="btn"]""")
await download_button.wait_for()
# Start waiting for the download
async with page.expect_download() as download_info:
# Perform the action that initiates download
await download_button.click()
download = await download_info.value
download_path = params.output_dir / download.suggested_filename
await download.save_as(download_path)
await browser.close()
return download_path

0
greenbutton/__init__.py Normal file
View File

248
greenbutton/enums.py Normal file
View File

@@ -0,0 +1,248 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from enum import Enum
class AccumulationBehaviourType(Enum):
notApplicable = 0
bulkQuantity = 1
cumulative = 3
deltaData = 4
indicating = 6
summation = 9
instantaneous = 12
class CommodityType(Enum):
notApplicable = 0
electricity = 1
air = 4
naturalGas = 7
propane = 8
potableWater = 9
steam = 10
wastewater = 11
heatingFluid = 12
coolingFluid = 13
class ConsumptionTierType(Enum):
notApplicable = 0
blockTier1 = 1
blockTier2 = 2
blockTier3 = 3
blockTier4 = 4
blockTier5 = 5
blockTier6 = 6
blockTier7 = 7
blockTier8 = 8
blockTier9 = 9
blockTier10 = 10
blockTier11 = 11
blockTier12 = 12
blockTier13 = 13
blockTier14 = 14
blockTier15 = 15
blockTier16 = 16
class CurrencyCode(Enum):
na = 0
aus = 36
cad = 124
usd = 840
eur = 978
@property
def symbol(self):
if self in [CurrencyCode.aus, CurrencyCode.cad, CurrencyCode.usd]:
return '$'
elif self is CurrencyCode.eur:
return ''
else:
return '¤'
@property
def uom_id(self):
if self in CURRENCY_UOM_IDS:
return CURRENCY_UOM_IDS[self]
else:
return None
class DataQualifierType(Enum):
notApplicable = 0
average = 2
maximum = 8
minimum = 9
normal = 12
class FlowDirectionType(Enum):
notApplicable = 0
forward = 1
reverse = 19
class KindType(Enum):
notApplicable = 0
currency = 3
current = 4
currentAngle = 5
date = 7
demand = 8
energy = 12
frequency = 15
power = 37
powerFactor = 38
quantityPower = 40
voltage = 54
voltageAngle = 55
distortionPowerFactor = 64
volumetricFlow = 155
class PhaseCode(Enum):
notApplicable = 0
c = 32
ca = 40
b = 64
bc = 66
a = 128
an = 129
ab = 132
abc = 224
s2 = 256
s2n = 257
s1 = 512
s1n = 513
s1s2 = 768
s1s2n = 769
class QualityOfReading(Enum):
valid = 0
manuallyEdited = 7
estimatedUsingReferenceDay = 8
estimatedUsingLinearInterpolation = 9
questionable = 10
derived = 11
projected = 12
mixed = 13
raw = 14
normalizedForWeather = 15
other = 16
validated = 17
verified = 18
class ServiceKind(Enum):
electricity = 0
naturalGas = 1
water = 2
pressure = 4
heat = 5
cold = 6
communication = 7
time = 8
class TimeAttributeType(Enum):
notApplicable = 0
tenMinutes = 1
fifteenMinutes = 2
twentyFourHours = 4
thirtyMinutes = 5
sixtyMinutes = 7
daily = 11
monthly = 13
present = 15
previous = 16
weekly = 24
forTheSpecifiedPeriod = 32
daily30MinuteFixedBlock = 79
class UomType(Enum):
notApplicable = 0
amps = 5
volts = 29
joules = 31
hertz = 33
watts = 38
cubicMeters = 42
voltAmps = 61
voltAmpsReactive = 63
cosine = 65
voltsSquared = 67
ampsSquared = 69
voltAmpHours = 71
wattHours = 72
voltAmpReactiveHours = 73
ampHours = 106
cubicFeet = 119
cubicFeetPerHour = 122
cubicMetersPerHour = 125
usGallons = 128
usGallonsPerHour = 129
imperialGallons = 130
imperialGallonsPerHour = 131
britishThermalUnits = 132
britishThermalUnitsPerHour = 133
liters = 134
litersPerHour = 137
gaugePascals = 140
absolutePascals = 155
therms = 169
UOM_SYMBOLS = {
UomType.notApplicable: '',
UomType.amps: 'A',
UomType.volts: 'V',
UomType.joules: 'J',
UomType.hertz: 'Hz',
UomType.watts: 'W',
UomType.cubicMeters: '',
UomType.voltAmps: 'VA',
UomType.voltAmpsReactive: 'VAr',
UomType.cosine: 'cos',
UomType.voltsSquared: '',
UomType.ampsSquared: '',
UomType.voltAmpHours: 'VAh',
UomType.wattHours: 'Wh',
UomType.voltAmpReactiveHours: 'VArh',
UomType.ampHours: 'Ah',
UomType.cubicFeet: 'ft³',
UomType.cubicFeetPerHour: 'ft³/h',
UomType.cubicMetersPerHour: 'm³/h',
UomType.usGallons: 'US gal',
UomType.usGallonsPerHour: 'US gal/h',
UomType.imperialGallons: 'IMP gal',
UomType.imperialGallonsPerHour: 'IMP gal/h',
UomType.britishThermalUnits: 'BTU',
UomType.britishThermalUnitsPerHour: 'BTU/h',
UomType.liters: 'L',
UomType.litersPerHour: 'L/h',
UomType.gaugePascals: 'Pag',
UomType.absolutePascals: 'Pa',
UomType.therms: 'thm',
}
UOM_IDS = {
UomType.amps: 'electric current_A',
UomType.volts: 'electric potential_V',
UomType.joules: 'energy_J',
UomType.hertz: 'frequency_Hz',
UomType.watts: 'power_W',
UomType.cubicMeters: 'energy_m**3_gas',
UomType.voltAmps: 'apparent power_VA',
UomType.voltAmpsReactive: 'reactive power_VAR',
UomType.voltAmpHours: 'apparent energy_VAh',
UomType.wattHours: 'energy_Wh',
UomType.voltAmpReactiveHours: 'reactive energy_VARh',
UomType.ampHours: 'electric charge_Ah',
UomType.cubicFeet: 'energy_ft**3_gas',
UomType.usGallons: 'volume_gal',
UomType.britishThermalUnits: 'energy_BTU',
UomType.britishThermalUnitsPerHour: 'power_BTU-h',
UomType.liters: 'volume_L',
UomType.litersPerHour: 'volumetric flow_L-h',
UomType.absolutePascals: 'pressure_Pa',
UomType.therms: 'energy_therm',
}
CURRENCY_UOM_IDS = {
CurrencyCode.aus: 'currency_AUD',
CurrencyCode.cad: 'currency_CAD',
CurrencyCode.usd: 'currency_USD',
CurrencyCode.eur: 'currency_EUR'
}

112
greenbutton/objects.py Normal file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/python
import datetime
import functools
import pytz
from .enums import *
from .utils import *
@functools.total_ordering
class DateTimeInterval:
def __init__(self, entity):
self.duration = getEntity(entity, 'espi:duration',
lambda e: datetime.timedelta(seconds=int(e.text)))
self.start = getEntity(entity, 'espi:start',
lambda e: datetime.datetime.fromtimestamp(int(e.text), pytz.timezone("UTC")))
def __repr__(self):
return '<DateTimeInterval (%s, %s)>' % (self.start, self.duration)
def __eq__(self, other):
if not isinstance(other, DateTimeInterval):
return False
return (self.start, self.duration) == (other.start, other.duration)
def __lt__(self, other):
if not isinstance(other, DateTimeInterval):
return False
return (self.start, self.duration) < (other.start, other.duration)
@functools.total_ordering
class IntervalReading:
def __init__(self, entity, parent):
self.intervalBlock = parent
self.cost = getEntity(entity, 'espi:cost', lambda e: int(e.text) / 100000.0)
self.timePeriod = getEntity(entity, 'espi:timePeriod',
lambda e: DateTimeInterval(e))
self._value = getEntity(entity, 'espi:value', lambda e: int(e.text))
self.tou = getEntity(entity, 'espi:tou', lambda e: int(e.text))
self.readingQualities = set([ReadingQuality(rq, self) for rq in entity.findall('espi:ReadingQuality', ns)])
def __repr__(self):
return '<IntervalReading (%s, %s: %s %s)>' % (self.timePeriod.start, self.timePeriod.duration, self.value, self.value_symbol)
def __eq__(self, other):
if not isinstance(other, IntervalReading):
return False
return (self.timePeriod, self.value) == (other.timePeriod, other.value)
def __lt__(self, other):
if not isinstance(other, IntervalReading):
return False
return (self.timePeriod, self.value) < (other.timePeriod, other.value)
@property
def value(self):
if self.intervalBlock is not None and \
self.intervalBlock.meterReading is not None and \
self.intervalBlock.meterReading.readingType is not None and \
self.intervalBlock.meterReading.readingType.powerOfTenMultiplier is not None:
multiplier = 10 ** self.intervalBlock.meterReading.readingType.powerOfTenMultiplier
else:
multiplier = 1
return self._value * multiplier
@property
def cost_units(self):
if self.intervalBlock is not None and \
self.intervalBlock.meterReading is not None and \
self.intervalBlock.meterReading.readingType is not None and \
self.intervalBlock.meterReading.readingType.currency is not None:
return self.intervalBlock.meterReading.readingType.currency
else:
return CurrencyCode.na
@property
def cost_symbol(self):
return self.cost_units.symbol
@property
def cost_uom_id(self):
return self.cost_units.uom_id
@property
def value_units(self):
if self.intervalBlock is not None and \
self.intervalBlock.meterReading is not None and \
self.intervalBlock.meterReading.readingType is not None and \
self.intervalBlock.meterReading.readingType.uom is not None:
return self.intervalBlock.meterReading.readingType.uom
else:
return UomType.notApplicable
@property
def value_symbol(self):
return UOM_SYMBOLS[self.value_units]
@property
def value_uom_id(self):
if self.value_units in UOM_IDS:
return UOM_IDS[self.value_units]
else:
return None
class ReadingQuality:
def __init__(self, entity, parent):
self.intervalReading = parent
self.quality = getEntity(entity, 'espi:quality', lambda e: QualityOfReading(int(e.text)))

48
greenbutton/parse.py Executable file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/python
import sys
import xml.etree.ElementTree as ET
from .resources import *
def parse_feed(filename):
tree = ET.parse(filename)
usagePoints = []
for entry in tree.getroot().findall('atom:entry/atom:content/espi:UsagePoint/../..', ns):
up = UsagePoint(entry)
usagePoints.append(up)
meterReadings = []
for entry in tree.getroot().findall('atom:entry/atom:content/espi:MeterReading/../..', ns):
mr = MeterReading(entry, usagePoints=usagePoints)
meterReadings.append(mr)
for entry in tree.getroot().findall('atom:entry/atom:content/espi:LocalTimeParameters/../..', ns):
ltp = LocalTimeParameters(entry, usagePoints=usagePoints)
readingTypes = []
for entry in tree.getroot().findall('atom:entry/atom:content/espi:ReadingType/../..', ns):
rt = ReadingType(entry, meterReadings=meterReadings)
readingTypes.append(rt)
intervalBlocks = []
for entry in tree.getroot().findall('atom:entry/atom:content/espi:IntervalBlock/../..', ns):
ib = IntervalBlock(entry, meterReadings=meterReadings)
intervalBlocks.append(ib)
return usagePoints
if __name__ == '__main__':
ups = parse_feed(sys.argv[1])
for up in ups:
print('UsagePoint (%s) %s %s:' % (up.title, up.serviceCategory.name, up.status))
for mr in up.meterReadings:
print(' Meter Reading (%s) %s:' % (mr.title, mr.readingType.uom.name))
for ir in mr.intervalReadings:
print(' %s, %s: %s %s' % (ir.timePeriod.start, ir.timePeriod.duration, ir.value, ir.value_symbol), end = '')
if ir.cost is not None:
print(' (%s%s)' % (ir.cost_symbol, ir.cost))
if len(ir.readingQualities) > 0:
print('[%s]' % ', '.join([rq.quality.name for rq in ir.readingQualities]))
print

154
greenbutton/resources.py Normal file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/python
import bisect
import functools
from .utils import *
from .enums import *
from .objects import *
class Resource(object):
def __init__(self, entry):
self.link_self = getLink(entry, 'self')
self.link_up = getLink(entry, 'up')
self.link_related = getLink(entry, 'related', True)
self.title = getEntity(entry, 'atom:title', lambda e: e.text)
def __repr__(self):
return '<%s (%s)>' % (self.__class__.__name__, self.title or self.link_self)
def isParentOf(self, other):
return other.link_self in self.link_related or other.link_up in self.link_related
class UsagePoint(Resource):
def __init__(self, entry, meterReadings=[]):
super(UsagePoint, self).__init__(entry)
obj = entry.find('./atom:content/espi:UsagePoint', ns)
self.roleFlags = getEntity(obj, 'espi:roleFlags', lambda e: int(e.text, 16))
self.status = getEntity(obj, 'espi:status', lambda e: int(e.text))
self.serviceCategory = getEntity(obj, './espi:ServiceCategory/espi:kind',
lambda e: ServiceKind(int(e.text)))
self.meterReadings = set()
for mr in meterReadings:
if self.isParentOf(mr):
self.addMeterReading(mr)
self.localTimeParameters = None
def addMeterReading(self, meterReading):
assert self.isParentOf(meterReading)
self.meterReadings.add(meterReading)
meterReading.usagePoint = self
def addLocalTimeParameters(self, localTimeParameters):
assert self.isParentOf(localTimeParameters)
self.localTimeParameters = localTimeParameters
class MeterReading(Resource):
def __init__(self, entry, usagePoints=[], readingTypes=[], intervalBlocks=[]):
super(MeterReading, self).__init__(entry)
self.usagePoint = None
self.readingType = None
self.intervalBlocks = []
for up in usagePoints:
if up.isParentOf(self):
up.addMeterReading(self)
for rt in readingTypes:
if self.isParentOf(rt):
self.setReadingType(rt)
for ib in intervalBlocks:
if self.isParentOf(ib):
self.addIntervalBlock(r)
@property
def intervalReadings(self):
for ib in self.intervalBlocks:
for ir in ib.intervalReadings:
yield ir
def setReadingType(self, readingType):
assert self.isParentOf(readingType)
assert self.readingType is None or self.readingType.link_self == readingType.link_self
self.readingType = readingType
readingType.meterReading = self
def addIntervalBlock(self, intervalBlock):
assert self.isParentOf(intervalBlock)
bisect.insort(self.intervalBlocks, intervalBlock)
intervalBlock.meterReading = self
class ReadingType(Resource):
def __init__(self, entry, meterReadings=[]):
super(ReadingType, self).__init__(entry)
self.meterReading = None
obj = entry.find('./atom:content/espi:ReadingType', ns)
self.accumulationBehaviour = getEntity(obj, 'espi:accumulationBehaviour',
lambda e: AccumulationBehaviourType(int(e.text)))
self.commodity = getEntity(obj, 'espi:commodity',
lambda e: CommodityType(int(e.text)))
self.consumptionTier = getEntity(obj, 'espi:consumptionTier',
lambda e: ConsumptionTierType(int(e.text)))
self.currency = getEntity(obj, 'espi:currency',
lambda e: CurrencyCode(int(e.text)))
self.dataQualifier = getEntity(obj, 'espi:dataQualifier',
lambda e: DataQualifierType(int(e.text)))
self.defaultQuality = getEntity(obj, 'espi:defaultQuality',
lambda e: QualityOfReading(int(e.text)))
self.flowDirection = getEntity(obj, 'espi:flowDirection',
lambda e: FlowDirectionType(int(e.text)))
self.intervalLength = getEntity(obj, 'espi:intervalLength', lambda e: int(e.text))
self.kind = getEntity(obj, 'espi:kind', lambda e: KindType(int(e.text)))
self.phase = getEntity(obj, 'espi:phase', lambda e: PhaseCode(int(e.text)))
self.powerOfTenMultiplier = getEntity(obj, 'espi:powerOfTenMultiplier',
lambda e: int(e.text))
self.timeAttribute = getEntity(obj, 'espi:timeAttribute',
lambda e: TimeAttributeType(int(e.text)))
self.tou = getEntity(obj, 'espi:tou', lambda e: TOUType(int(e.text)))
self.uom = getEntity(obj, 'espi:uom', lambda e: UomType(int(e.text)))
for mr in meterReadings:
if mr.isParentOf(self):
mr.setReadingType(self)
class LocalTimeParameters(Resource):
def __init__(self, entry, usagePoints=[]):
super(LocalTimeParameters, self).__init__(entry)
self.localTimeParameters = None
obj = entry.find('./atom:content/espi:LocalTimeParameters', ns)
self.tzOffset = getEntity(obj, 'espi:tzOffset', lambda e: int(e.text))
for up in usagePoints:
if up.isParentOf(self):
up.addLocalTimeParameters(self)
@functools.total_ordering
class IntervalBlock(Resource):
def __init__(self, entry, meterReadings=[]):
super(IntervalBlock, self).__init__(entry)
self.meterReading = None
obj = entry.find('./atom:content/espi:IntervalBlock', ns)
self.interval = getEntity(obj, 'espi:interval', lambda e: DateTimeInterval(e))
self.intervalReadings = sorted([IntervalReading(ir, self) for ir in obj.findall('espi:IntervalReading', ns)])
for mr in meterReadings:
if mr.isParentOf(self):
mr.addIntervalBlock(self)
def __eq__(self, other):
if not isinstance(other, IntervalBlock):
return False
return self.link_self == other.link_self
def __lt__(self, other):
return self.interval < other.interval

28
greenbutton/utils.py Normal file
View File

@@ -0,0 +1,28 @@
#!/usr/bin/python
ns = {'atom': 'http://www.w3.org/2005/Atom',
'espi': 'http://naesb.org/espi'}
def getEntity(source, target, accessor=None, multiple=False):
"""Extracts the named entity from the source XML tree. `accessor` is a
function of one argyment; if provided and the target entity is found, the
target will be passed into `accessor` and its result will be returned. If
`multiple` is true, the result will be all entities that match (i.e. the
function will use `finall` instead of `find`)."""
if multiple:
es = source.findall(target, ns)
if accessor:
return [ accessor(e) for e in es ]
else:
return es
else:
e = source.find(target, ns)
if e is not None and accessor is not None:
return accessor(e)
else:
return e
def getLink(source, relation, multiple=False):
"""Shorthand for pulling a link with the given "rel" attribute from the source."""
return getEntity(source, './atom:link[@rel="%s"]' % relation, lambda e: e.attrib['href'], multiple)

101
main.py Normal file
View File

@@ -0,0 +1,101 @@
import shutil
from dotenv import load_dotenv
import os
from time import sleep
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from pathlib import Path
from greenbutton import parse
from db_connector import insert_usage_data
from sqlite3 import Connection
from schemas import DatabaseRecord, DownloadParameters
import asyncio
load_dotenv()
account_name = os.getenv("ALECTRA_ACCOUNT_NAME")
account_number = os.getenv("ALECTRA_ACCOUNT_NUMBER")
account_phone = os.getenv("ALECTRA_ACCOUNT_PHONE")
db_path = os.getenv("USAGE_DB_PATH", "./usage_data.db")
xml_download_dir = os.getenv("XML_DOWNLOAD_DIR", "./downloads")
def calculate_dates_for_retrieval(bill_start_date: str, bill_end_date: str) -> tuple[datetime, datetime, datetime, datetime]:
def round_date_up(dt):
if dt.hour == 0 and dt.minute == 0 and dt.second == 0:
return dt
else:
return (dt + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
def round_date_down(dt):
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
utc_bill_start_date = datetime.strptime(bill_start_date, '%m-%d-%Y').replace(tzinfo=ZoneInfo('America/Toronto')).astimezone(ZoneInfo('UTC'))
utc_bill_end_date = datetime.strptime(bill_end_date, '%m-%d-%Y').replace(tzinfo=ZoneInfo('America/Toronto')).astimezone(ZoneInfo('UTC'))
utc_retrieval_start_date = round_date_down(utc_bill_start_date)
utc_retrieval_end_date = round_date_up(utc_bill_end_date)
return utc_retrieval_start_date, utc_retrieval_end_date, utc_bill_start_date, utc_bill_end_date
def process_xml_file(conn: Connection, xml_file: Path):
def get_correct_meter_reading(meter_readings):
for meterReading in meter_readings:
if meterReading.readingType and meterReading.readingType.title == 'KWH Interval Data':
return meterReading
raise Exception("KWH Interval Data meter reading not found")
usagePoint = parse.parse_feed(xml_file)[0]
meterReadings = usagePoint.meterReadings
meterReading = get_correct_meter_reading(meterReadings)
intervalBlocks = meterReading.intervalBlocks
for intervalBlock in intervalBlocks:
for intervalReading in intervalBlock.intervalReadings:
timestamp = intervalReading.timePeriod.start
value_kwh = intervalReading.value
cost = intervalReading.cost
tou = intervalReading.tou
record = DatabaseRecord(
timestamp=timestamp,
value_kwh=value_kwh,
cost=cost,
tou=tou
)
insert_usage_data(conn, record)
def get_dates_last_2_weeks() -> tuple[datetime, datetime]:
now = datetime.now(tz=ZoneInfo('America/Toronto'))
two_weeks_ago = now - timedelta(weeks=2)
return two_weeks_ago, now
if __name__ == "__main__":
while True:
Path(xml_download_dir).mkdir(parents=True, exist_ok=True)
from download_xml import download_xml_files
from db_connector import connect_db, initialize_database
start_date, end_date = get_dates_last_2_weeks()
download_params = DownloadParameters(
start_date=start_date,
end_date=end_date,
output_dir=Path(xml_download_dir),
account_name=account_name,
account_number=account_number,
account_phone=account_phone
)
xml_file_path = asyncio.run(download_xml_files(download_params))
conn = connect_db(db_path)
initialize_database(conn)
process_xml_file(conn, xml_file_path)
conn.close()
shutil.rmtree(xml_download_dir)
print(f"Processed data from {start_date} to {end_date}. Waiting for next cycle...")
sleep(4 * 60 * 60)

17
schemas.py Normal file
View File

@@ -0,0 +1,17 @@
from pydantic import BaseModel, DirectoryPath
from datetime import datetime
class DownloadParameters(BaseModel):
start_date: datetime
end_date: datetime
output_dir: DirectoryPath
account_name: str
account_number: str
account_phone: str
class DatabaseRecord(BaseModel):
timestamp: datetime
value_kwh: float
cost: float
tou: int