✨ use toml to parse the recent_run file
This commit is contained in:
parent
0448192244
commit
61756b4054
5 changed files with 58 additions and 56 deletions
|
@ -1,15 +1,19 @@
|
|||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import tomllib
|
||||
from collections import deque
|
||||
from datetime import datetime as dt, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import pytz
|
||||
import tomli_w
|
||||
from civifang import api
|
||||
from httpx import post
|
||||
from ms_active_directory import ADUser, ADGroup
|
||||
|
||||
from .enums import Priority
|
||||
from .exceptions import ScriptAlreadyRunningError
|
||||
|
||||
logger = logging.getLogger(__package__)
|
||||
|
||||
|
@ -28,7 +32,8 @@ class RecentRun:
|
|||
self._datetime = None
|
||||
self._timezone = tz
|
||||
self._file_path = file_path
|
||||
self._is_running = False
|
||||
self._is_running = None
|
||||
self._already_running = None
|
||||
self._started_at = None
|
||||
|
||||
# Create the file if it does not exist
|
||||
|
@ -36,6 +41,10 @@ class RecentRun:
|
|||
|
||||
self._read_data_from_file()
|
||||
|
||||
# If the script was already running, throw an exception
|
||||
if self._already_running:
|
||||
raise ScriptAlreadyRunningError('The script is already running.')
|
||||
|
||||
def _sync_file(
|
||||
self,
|
||||
recent_run: dt | None = None,
|
||||
|
@ -47,69 +56,38 @@ class RecentRun:
|
|||
:param is_running:
|
||||
:return:
|
||||
"""
|
||||
# Convert the is_running boolean to a string
|
||||
is_running = 'true' if is_running else 'false' \
|
||||
if is_running is not None else None
|
||||
rr = recent_run if recent_run else self._datetime
|
||||
is_running = is_running if is_running is not None else self._already_running
|
||||
new_data = {
|
||||
'recent-run': rr,
|
||||
'is-running': is_running,
|
||||
}
|
||||
|
||||
# Read the file and update the values if they are different
|
||||
with open(self._file_path, 'r+') as f:
|
||||
# Read the data from the file
|
||||
data = f.readlines()
|
||||
old_recent_run, old_is_running = self._read_data(data)
|
||||
|
||||
# Update the values if they were provided
|
||||
timestamp = recent_run.timestamp() if recent_run else old_recent_run
|
||||
is_running = is_running or old_is_running
|
||||
new_data = [
|
||||
f"recent-run:{timestamp}",
|
||||
'\n',
|
||||
f"is-running:{is_running}",
|
||||
]
|
||||
|
||||
# Write the new data to the file
|
||||
f.seek(0)
|
||||
f.truncate()
|
||||
f.writelines(new_data)
|
||||
|
||||
@staticmethod
|
||||
def _read_data(data: list):
|
||||
"""
|
||||
Read data
|
||||
:param data:
|
||||
:return: Tuple of recent_run and is_running ('true'/'false')
|
||||
"""
|
||||
time = None
|
||||
is_running = None
|
||||
for line in data:
|
||||
line = line.strip()
|
||||
if line.startswith('recent-run:'):
|
||||
time = line.split(':', 1)[1].strip()
|
||||
elif line.startswith('is-running:'):
|
||||
is_running = line.split(':', 1)[1].strip()
|
||||
|
||||
return float(time), is_running
|
||||
# Write the data to the file
|
||||
with open(self._file_path, 'wb') as f:
|
||||
tomli_w.dump(new_data, f)
|
||||
|
||||
def _read_data_from_file(self):
|
||||
"""
|
||||
Read the recent run time from the file
|
||||
:return:
|
||||
"""
|
||||
with open(self._file_path, 'r') as f:
|
||||
data = f.readlines()
|
||||
recent_run, is_running = self._read_data(data)
|
||||
|
||||
# Read running status
|
||||
self._is_running = is_running == 'true'
|
||||
with open(self._file_path, 'rb') as f:
|
||||
data = tomllib.load(f)
|
||||
self._already_running = data.get('is-running', False)
|
||||
recent_run = data.get('recent-run')
|
||||
|
||||
# Set the datetime to the recent run time
|
||||
if not recent_run:
|
||||
return
|
||||
try:
|
||||
self._datetime = dt.fromtimestamp(float(recent_run)) \
|
||||
if isinstance(recent_run, datetime.datetime):
|
||||
self._datetime = recent_run
|
||||
elif isinstance(recent_run, float):
|
||||
self._datetime = dt.fromtimestamp(recent_run) \
|
||||
.astimezone(self._timezone)
|
||||
except ValueError as e:
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid timestamp '{recent_run}' in {self._file_path}: {e}")
|
||||
f"Invalid recent_run '{recent_run}' in {self._file_path}.")
|
||||
|
||||
@property
|
||||
def datetime(self) -> dt | None:
|
||||
|
@ -177,14 +155,14 @@ class RecentRun:
|
|||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
datetime = None
|
||||
recent_run = None
|
||||
self._is_running = False
|
||||
|
||||
# If an exception occurred, do not update the recent run timestamp
|
||||
# If no exception occurred, set the recent run time to the current time
|
||||
if exc_type is None:
|
||||
self.datetime = datetime = self._started_at
|
||||
self.datetime = recent_run = self._started_at
|
||||
|
||||
self._sync_file(datetime, is_running=self._is_running)
|
||||
self._sync_file(recent_run=recent_run, is_running=self._is_running)
|
||||
|
||||
def __gt__(self, other: dt | str | float):
|
||||
return self.datetime > self._to_datetime(other)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue