import datetime import json import logging import tomllib from collections import deque from datetime import datetime as dt, timezone from pathlib import Path import pytz import tomli_w from civifang import api from httpx import post from ms_active_directory import ADUser, ADGroup from .enums import Priority from .exceptions import ScriptAlreadyRunningError logger = logging.getLogger(__package__) class RecentRun: """ Class to manage the last run of the script """ def __init__(self, file_path: Path, tz: pytz = pytz.utc): """ Initialize the class :param file_path: File path to store the recent run timestamp :param tz: Timezone to use for the timestamp """ self._datetime = None self._timezone = tz self._file_path = file_path self._is_running = None self._already_running = None self._started_at = None # Create the file if it does not exist self._file_path.touch(exist_ok=True) self._read_data_from_file() # If the script was already running, throw an exception if self._already_running: raise ScriptAlreadyRunningError('The script is already running.') def _sync_file( self, recent_run: dt | None = None, is_running: bool = False ): """ Write the recent run timestamp and running status to the file :param recent_run: :param is_running: :return: """ rr = recent_run if recent_run else self._datetime is_running = is_running if is_running is not None else self._already_running new_data = { 'recent-run': rr, 'is-running': is_running, } # Write the data to the file with open(self._file_path, 'wb') as f: tomli_w.dump(new_data, f) def _read_data_from_file(self): """ Read the recent run time from the file :return: """ with open(self._file_path, 'rb') as f: data = tomllib.load(f) self._already_running = data.get('is-running', False) recent_run = data.get('recent-run') # Set the datetime to the recent run time if not recent_run: return if isinstance(recent_run, datetime.datetime): self._datetime = recent_run elif isinstance(recent_run, float): self._datetime = dt.fromtimestamp(recent_run) \ .astimezone(self._timezone) else: raise ValueError( f"Invalid recent_run '{recent_run}' in {self._file_path}.") @property def datetime(self) -> dt | None: """ Get the recent run timestamp :return: """ return self._datetime @property def started_at(self) -> dt | None: """ Get the time the script was started :return: """ return self._started_at @property def timestamp(self) -> float: """ Get the recent run timestamp as a timestamp :return: """ return self._datetime.timestamp() @property def is_running(self): """ Get the running status :return: """ return self._is_running @datetime.setter def datetime(self, value: datetime): """ Set the recent run timestamp :param value: :return: """ if value.tzinfo is None: value = value.astimezone(self._timezone) self._datetime = value @staticmethod def _to_datetime(value: dt | str | float) -> datetime: """ Convert the value to a datetime object :param value: :return: """ try: if isinstance(value, str): value = float(value) if isinstance(value, float): value = dt.fromtimestamp(value).astimezone(timezone.utc) except ValueError: raise ValueError(f"Invalid timestamp '{value}'") return value def __enter__(self): self._started_at = dt.now(self._timezone) self._is_running = True self._sync_file(is_running=self._is_running) return self def __exit__(self, exc_type, exc_val, exc_tb): recent_run = None self._is_running = False # If no exception occurred, set the recent run time to the current time if exc_type is None: self.datetime = recent_run = self._started_at self._sync_file(recent_run=recent_run, is_running=self._is_running) def __gt__(self, other: dt | str | float): return self.datetime > self._to_datetime(other) def __lt__(self, other: dt | str | float): return self.datetime < self._to_datetime(other) def __eq__(self, other: dt | str | float): return self.datetime == self._to_datetime(other) def __ge__(self, other: dt | str | float): return self.datetime >= self._to_datetime(other) def __le__(self, other: dt | str | float): return self.datetime <= self._to_datetime(other) class CiviCrm: """ Class to interact with CiviCRM via the API """ def __init__( self, base_url: str, api_key: str, batch_size: int, ignore_ssl: bool = False, ): """ Initialize the class :param base_url: Base URL of the CiviCRM installation :param api_key: API key for CiviCRM :param batch_size: Number of users to send in one request :param ignore_ssl: Accept unencrypted connections """ self._base_url = base_url self._api_key = api_key self._auth_flow = api.AUTH_FLOWS.XHEADER self._batch_size = batch_size self._ignore_ssl = ignore_ssl self._requests = {'groups': deque(), 'users': deque()} self._failed_requests = {'groups': [], 'users': []} self._error_bag = [] def __enter__(self): self._setup() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() if isinstance(exc_val, Exception): logger.exception( "The connection to CiviCRM was closed due to an exception", extra={'exc_type': exc_type, 'exc_val': exc_val, 'exc_tb': exc_tb}) exit(1) def _setup(self): api_params = { "url": self._base_url, "api_key": self._api_key, "auth_flow": self._auth_flow, "ignore_ssl": self._ignore_ssl, } # Check for missing parameters if any([v for v in api_params.values() if not v]): missing_params = [k for k, v in api_params.items() if not v] raise ValueError( f"Missing API parameters: {', '.join(missing_params)}") # Connect to CiviCRM try: logger.debug("Connecting to CiviCRM", extra=api_params) api.setup(**api_params) except Exception as e: logger.exception(f"Error connecting to CiviCRM: {e}", extra=api_params) raise e def update_groups(self, groups: dict[ADGroup, set]): """ Update the groups in CiviCRM via Mailingslistsync.Adgroupsync API :param groups: :return: """ groups_data = [] for group, users in groups.items(): # Filter users for missing values and wrong types users = self._filter_users(users) group_data = { 'sid': group.get("objectSid"), 'email': group.get("mail"), 'name': group.name, 'description': group.get("description"), 'recipients': json.dumps(users), } # Check group for missing values name = group.name or 'Unknown' sid = group.get('objectSid') or 'Unknown' message = f"Missing values for group '{name}' ({sid}): %s" if self.check_values(group_data, message, ['description']): groups_data.append(self._filter_data(group_data)) # Add the groups to the request list for group in groups_data: self._requests['groups'].append({ 'entity': 'Mailinglistsync', 'action': 'Adgroupsync', 'query': group, 'method': api.HTTP_METHODS.POST, }) def update_users(self, users: set[ADUser]): """ Update the users in CiviCRM via Mailingslistsync.Adgroupsync API :param users: :return: """ # Filter users for missing values and wrong types users = self._filter_users(users) # Split the users into batches data_batches = self._chunks(users, self._batch_size) # Add the users to the request list for batch in data_batches: self._requests['users'].append({ 'entity': 'Mailinglistsync', 'action': 'Adgroupsync', 'query': {'recipients': json.dumps(batch)}, 'method': api.HTTP_METHODS.POST, }) def send_requests(self) -> int: """ Run the tasks in the task queue :return: Number of failed requests """ error_count = 0 failed_requests = {'groups': deque(), 'users': deque()} for name, requests in self._requests.items(): logger.info(f"Sending {len(requests)} {name}") while requests: request = requests.popleft() try: result = api.api3(**request) logger.info(f"Result: {result}", extra={'result': result}) if result.get('is_error', False): raise Exception(result.get('error_message')) except Exception as e: self._error_bag.append({ 'name': name, 'request': { 'entity': request['entity'], 'action': request['action'], 'query': { k: (json.loads(v) if k == 'recipients' else v) for k, v in request['query'].items()}, 'method': str(request['method']), }, 'error': str(e), }) logger.exception(f"Error sending request: {e}", extra=request) failed_requests[name].append(request) error_count += 1 # Append failed requests to requests again for name, requests in failed_requests.items(): while requests: self._requests[name].append(requests.popleft()) return error_count @staticmethod def _chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n] @classmethod def _filter_users(cls, users: set) -> list | None: """ Filter users for missing values and wrong types :param users: Set of users :return: List of filtered users """ result = [] for user in users: if isinstance(user, ADUser): data = { 'sid': user.get("objectSid"), 'email': user.get("mail"), 'first_name': user.get("givenName"), 'last_name': user.get("sn"), } # Check for missing values and log them name = user.get('sn') or 'Unknown' sid = user.get('objectSid') or 'Unknown' message = f"Missing values for user '{name}' ({sid}): %s" if cls.check_values(data, message, ['first_name', 'last_name']): result.append(cls._filter_data(data)) else: raise ValueError(f"Invalid user type: {type(user)}") return result @staticmethod def _filter_data(data): """ Filter the data for missing values :return: """ return {k: v for k, v in data.items() if v is not None} @staticmethod def check_values(data: dict, message: str, ignore_keys: list[str] = None): """ Check for missing values in the data and log them. :param data: :param message: Should contain an %s placeholder for the missing values :param ignore_keys: List of keys to ignore :return: True if all values are present, False otherwise """ if ignore_keys is None: ignore_keys = [] missing_values = { key: value for key, value in data.items() if not value and key not in ignore_keys } if missing_values: message = message % ', '.join(missing_values.keys()) log_data = {} for key, value in data.items(): # Sanitize the data if key in ['name']: log_data['name_'] = value else: log_data[key] = value logger.debug( message, extra={'data': log_data} ) return not bool(missing_values) @staticmethod def close(): """ Close the connection to CiviCRM :return: """ api.disconnect() @property def requests(self) -> dict: """ Get the requests :return: """ return self._requests @property def error_bag(self) -> list: """ Get the error bag :return: """ return self._error_bag class Ntfy: """ Class to send notifications via ntfy """ PRIORITY = Priority def __init__(self, url: str, access_token: str = None): """ Initialize the class :param url: nfyt URL :param access_token: Access token if required """ self.url = url if url.endswith('/') else f"{url}/" self.access_token = access_token def send( self, topic, message: str = None, title: str = None, tags: str | list = None, priority: int | PRIORITY = None, link: str = None, markdown: bool = False, **kwargs ): """ Send a notification via ntfy :param topic: Topic to send the notification to :param message: Message to send :param title: Message title :param tags: Tags to add to the message (see ntfy documentation) :param priority: See Priority enum :param link: A link to add to the message :param markdown: Whether to use markdown :param kwargs: :return: """ if self.access_token: headers = { 'Authorization': f'Bearer {self.access_token}', } | kwargs.get('headers', {}) else: headers = kwargs.get('headers', {}) match priority: case self.PRIORITY.MIN: headers['Priority'] = 'min' case self.PRIORITY.LOW: headers['Priority'] = 'low' case self.PRIORITY.DEFAULT: headers['Priority'] = 'default' case self.PRIORITY.HIGH: headers['Priority'] = 'high' case self.PRIORITY.MAX: headers['Priority'] = 'max' case _: headers['Priority'] = 'default' if title: headers['Title'] = title if tags: headers['Tags'] = tags if isinstance(tags, str) else ','.join(tags) if link: headers['Click'] = link if markdown: headers['Markdown'] = 'yes' try: post(f"{self.url}{topic}", headers=headers, data=message) except Exception as e: logger.exception(f"Error sending notification: {e}", extra={ 'url': self.url, 'topic': topic, 'headers': headers, 'message': message, })