🔖 Bump version: 0.1.0 → 1.0.0

This commit is contained in:
Marc Koch 2025-03-20 15:32:30 +01:00
parent cbf68294ca
commit b7311d088d
Signed by untrusted user who does not match committer: marc.koch
GPG key ID: 12406554CFB028B9
21 changed files with 1993 additions and 223 deletions

548
src/adgroupsync/models.py Normal file
View file

@ -0,0 +1,548 @@
import json
import logging
from collections import deque
from datetime import datetime as dt, timezone
from pathlib import Path
import pytz
from civifang import api
from httpx import post
from ms_active_directory import ADUser, ADGroup
from .enums import Priority
logger = logging.getLogger(__package__)
class RecentRun:
"""
Class to manage the last run of the script
"""
def __init__(self, file_path: Path, tz: pytz = pytz.utc):
"""
Initialize the class
:param file_path: File path to store the recent run timestamp
:param tz: Timezone to use for the timestamp
"""
self._datetime = None
self._timezone = tz
self._file_path = file_path
self._is_running = False
self._started_at = None
# Create the file if it does not exist
self._file_path.touch(exist_ok=True)
self._read_data_from_file()
def _sync_file(
self,
recent_run: dt | None = None,
is_running: bool = False
):
"""
Write the recent run timestamp and running status to the file
:param recent_run:
:param is_running:
:return:
"""
# Convert the is_running boolean to a string
is_running = 'true' if is_running else 'false' \
if is_running is not None else None
# Read the file and update the values if they are different
with open(self._file_path, 'r+') as f:
# Read the data from the file
data = f.readlines()
old_recent_run, old_is_running = self._read_data(data)
# Update the values if they were provided
timestamp = recent_run.timestamp() if recent_run else old_recent_run
is_running = is_running or old_is_running
new_data = [
f"recent-run:{timestamp}",
'\n',
f"is-running:{is_running}",
]
# Write the new data to the file
f.seek(0)
f.truncate()
f.writelines(new_data)
@staticmethod
def _read_data(data: list):
"""
Read data
:param data:
:return: Tuple of recent_run and is_running ('true'/'false')
"""
time = None
is_running = None
for line in data:
line = line.strip()
if line.startswith('recent-run:'):
time = line.split(':', 1)[1].strip()
elif line.startswith('is-running:'):
is_running = line.split(':', 1)[1].strip()
return float(time), is_running
def _read_data_from_file(self):
"""
Read the recent run time from the file
:return:
"""
with open(self._file_path, 'r') as f:
data = f.readlines()
recent_run, is_running = self._read_data(data)
# Read running status
self._is_running = is_running == 'true'
# Set the datetime to the recent run time
if not recent_run:
return
try:
self._datetime = dt.fromtimestamp(float(recent_run)) \
.astimezone(self._timezone)
except ValueError as e:
raise ValueError(
f"Invalid timestamp '{recent_run}' in {self._file_path}: {e}")
@property
def datetime(self) -> dt | None:
"""
Get the recent run timestamp
:return:
"""
return self._datetime
@property
def started_at(self) -> dt | None:
"""
Get the time the script was started
:return:
"""
return self._started_at
@property
def timestamp(self) -> float:
"""
Get the recent run timestamp as a timestamp
:return:
"""
return self._datetime.timestamp()
@property
def is_running(self):
"""
Get the running status
:return:
"""
return self._is_running
@datetime.setter
def datetime(self, value: datetime):
"""
Set the recent run timestamp
:param value:
:return:
"""
if value.tzinfo is None:
value = value.astimezone(self._timezone)
self._datetime = value
@staticmethod
def _to_datetime(value: dt | str | float) -> datetime:
"""
Convert the value to a datetime object
:param value:
:return:
"""
try:
if isinstance(value, str):
value = float(value)
if isinstance(value, float):
value = dt.fromtimestamp(value).astimezone(timezone.utc)
except ValueError:
raise ValueError(f"Invalid timestamp '{value}'")
return value
def __enter__(self):
self._started_at = dt.now(self._timezone)
self._is_running = True
self._sync_file(is_running=self._is_running)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
datetime = None
self._is_running = False
# If an exception occurred, do not update the recent run timestamp
if exc_type is None:
self.datetime = datetime = self._started_at
self._sync_file(datetime, is_running=self._is_running)
def __gt__(self, other: dt | str | float):
return self.datetime > self._to_datetime(other)
def __lt__(self, other: dt | str | float):
return self.datetime < self._to_datetime(other)
def __eq__(self, other: dt | str | float):
return self.datetime == self._to_datetime(other)
def __ge__(self, other: dt | str | float):
return self.datetime >= self._to_datetime(other)
def __le__(self, other: dt | str | float):
return self.datetime <= self._to_datetime(other)
class CiviCrm:
"""
Class to interact with CiviCRM via the API
"""
def __init__(
self,
base_url: str,
api_key: str,
batch_size: int,
ignore_ssl: bool = False,
):
"""
Initialize the class
:param base_url: Base URL of the CiviCRM installation
:param api_key: API key for CiviCRM
:param batch_size: Number of users to send in one request
:param ignore_ssl: Accept unencrypted connections
"""
self._base_url = base_url
self._api_key = api_key
self._auth_flow = api.AUTH_FLOWS.XHEADER
self._batch_size = batch_size
self._ignore_ssl = ignore_ssl
self._requests = {'groups': deque(), 'users': deque()}
self._failed_requests = {'groups': [], 'users': []}
self._error_bag = []
def __enter__(self):
self._setup()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if isinstance(exc_val, Exception):
logger.exception(
"The connection to CiviCRM was closed due to an exception",
extra={'exc_type': exc_type, 'exc_val': exc_val,
'exc_tb': exc_tb})
exit(1)
def _setup(self):
api_params = {
"url": self._base_url,
"api_key": self._api_key,
"auth_flow": self._auth_flow,
"ignore_ssl": self._ignore_ssl,
}
# Check for missing parameters
if any([v for v in api_params.values() if not v]):
missing_params = [k for k, v in api_params.items() if not v]
raise ValueError(
f"Missing API parameters: {', '.join(missing_params)}")
# Connect to CiviCRM
try:
logger.debug("Connecting to CiviCRM", extra=api_params)
api.setup(**api_params)
except Exception as e:
logger.exception(f"Error connecting to CiviCRM: {e}",
extra=api_params)
raise e
def update_groups(self, groups: dict[ADGroup, set]):
"""
Update the groups in CiviCRM via Mailingslistsync.Adgroupsync API
:param groups:
:return:
"""
groups_data = []
for group, users in groups.items():
# Filter users for missing values and wrong types
users = self._filter_users(users)
group_data = {
'sid': group.get("objectSid"),
'email': group.get("mail"),
'name': group.name,
'description': group.get("description"),
'recipients': json.dumps(users),
}
# Check group for missing values
name = group.name or 'Unknown'
sid = group.get('objectSid') or 'Unknown'
message = f"Missing values for group '{name}' ({sid}): %s"
if self.check_values(group_data, message, ['description']):
groups_data.append(self._filter_data(group_data))
# Add the groups to the request list
for group in groups_data:
self._requests['groups'].append({
'entity': 'Mailinglistsync',
'action': 'Adgroupsync',
'query': group,
'method': api.HTTP_METHODS.POST,
})
def update_users(self, users: set[ADUser]):
"""
Update the users in CiviCRM via Mailingslistsync.Adgroupsync API
:param users:
:return:
"""
# Filter users for missing values and wrong types
users = self._filter_users(users)
# Split the users into batches
data_batches = self._chunks(users, self._batch_size)
# Add the users to the request list
for batch in data_batches:
self._requests['users'].append({
'entity': 'Mailinglistsync',
'action': 'Adgroupsync',
'query': {'recipients': json.dumps(batch)},
'method': api.HTTP_METHODS.POST,
})
def send_requests(self) -> int:
"""
Run the tasks in the task queue
:return: Number of failed requests
"""
error_count = 0
failed_requests = {'groups': deque(), 'users': deque()}
for name, requests in self._requests.items():
logger.info(f"Sending {len(requests)} {name}")
while requests:
request = requests.popleft()
try:
result = api.api3(**request)
logger.info(f"Result: {result}", extra={'result': result})
if result.get('is_error', False):
raise Exception(result.get('error_message'))
except Exception as e:
self._error_bag.append({
'name': name,
'request': {
'entity': request['entity'],
'action': request['action'],
'query': {
k: (json.loads(v) if k == 'recipients' else v)
for k, v in request['query'].items()},
'method': str(request['method']),
},
'error': str(e),
})
logger.exception(f"Error sending request: {e}",
extra=request)
failed_requests[name].append(request)
error_count += 1
# Append failed requests to requests again
for name, requests in failed_requests.items():
while requests:
self._requests[name].append(requests.popleft())
return error_count
@staticmethod
def _chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
@classmethod
def _filter_users(cls, users: set) -> list | None:
"""
Filter users for missing values and wrong types
:param users: Set of users
:return: List of filtered users
"""
result = []
for user in users:
if isinstance(user, ADUser):
data = {
'sid': user.get("objectSid"),
'email': user.get("mail"),
'first_name': user.get("givenName"),
'last_name': user.get("sn"),
}
# Check for missing values and log them
name = user.get('sn') or 'Unknown'
sid = user.get('objectSid') or 'Unknown'
message = f"Missing values for user '{name}' ({sid}): %s"
if cls.check_values(data, message,
['first_name', 'last_name']):
result.append(cls._filter_data(data))
else:
raise ValueError(f"Invalid user type: {type(user)}")
return result
@staticmethod
def _filter_data(data):
"""
Filter the data for missing values
:return:
"""
return {k: v for k, v in data.items() if v is not None}
@staticmethod
def check_values(data: dict, message: str, ignore_keys: list[str] = None):
"""
Check for missing values in the data and log them.
:param data:
:param message: Should contain an %s placeholder for the missing values
:param ignore_keys: List of keys to ignore
:return: True if all values are present, False otherwise
"""
if ignore_keys is None:
ignore_keys = []
missing_values = {
key: value for key, value in data.items() if
not value and key not in ignore_keys
}
if missing_values:
message = message % ', '.join(missing_values.keys())
log_data = {}
for key, value in data.items(): # Sanitize the data
if key in ['name']:
log_data['name_'] = value
else:
log_data[key] = value
logger.debug(
message,
extra={'data': log_data}
)
return not bool(missing_values)
@staticmethod
def close():
"""
Close the connection to CiviCRM
:return:
"""
api.disconnect()
@property
def requests(self) -> dict:
"""
Get the requests
:return:
"""
return self._requests
@property
def error_bag(self) -> list:
"""
Get the error bag
:return:
"""
return self._error_bag
class Ntfy:
"""
Class to send notifications via ntfy
"""
PRIORITY = Priority
def __init__(self, url: str, access_token: str = None):
"""
Initialize the class
:param url: nfyt URL
:param access_token: Access token if required
"""
self.url = url if url.endswith('/') else f"{url}/"
self.access_token = access_token
def send(
self,
topic,
message: str = None,
title: str = None,
tags: str | list = None,
priority: int | PRIORITY = None,
link: str = None,
markdown: bool = False,
**kwargs
):
"""
Send a notification via ntfy
:param topic: Topic to send the notification to
:param message: Message to send
:param title: Message title
:param tags: Tags to add to the message (see ntfy documentation)
:param priority: See Priority enum
:param link: A link to add to the message
:param markdown: Whether to use markdown
:param kwargs:
:return:
"""
if self.access_token:
headers = {
'Authorization': f'Bearer {self.access_token}',
} | kwargs.get('headers', {})
else:
headers = kwargs.get('headers', {})
match priority:
case self.PRIORITY.MIN:
headers['Priority'] = 'min'
case self.PRIORITY.LOW:
headers['Priority'] = 'low'
case self.PRIORITY.DEFAULT:
headers['Priority'] = 'default'
case self.PRIORITY.HIGH:
headers['Priority'] = 'high'
case self.PRIORITY.MAX:
headers['Priority'] = 'max'
case _:
headers['Priority'] = 'default'
if title:
headers['Title'] = title
if tags:
headers['Tags'] = tags if isinstance(tags, str) else ','.join(tags)
if link:
headers['Click'] = link
if markdown:
headers['Markdown'] = 'yes'
try:
post(f"{self.url}{topic}", headers=headers, data=message)
except Exception as e:
logger.exception(f"Error sending notification: {e}", extra={
'url': self.url,
'topic': topic,
'headers': headers,
'message': message,
})