"""Resolve the runtime configuration from environment variables and TOML.

Environment variables are read first; when ``--conf`` points to a TOML file,
its values are used for every setting the environment did not provide.
``--create-conf`` writes a template configuration file and exits.
The resolved settings are exposed as module-level constants.
"""
import argparse
import logging
import os
from pathlib import Path

import pytz
import tomlkit
from tomlkit.toml_file import TOMLFile

logger = logging.getLogger(__package__)

# Strings (compared case-insensitively) that switch a boolean env var on.
_TRUE_STRINGS = frozenset({'1', 'true', 'yes', 'on'})


def create_config_file(dest: Path):
    """Write a commented template configuration file.

    Args:
        dest: Target file path. If it is an existing directory, the file
            ``<package>_config.toml`` is created inside it.
    """
    if dest.is_dir():
        dest = dest / f"{__package__}_config.toml"

    conf = tomlkit.document()
    conf.add(tomlkit.comment(f"Configuration file for {__package__}"))
    conf.add(tomlkit.nl())

    # Add AD section
    ad = tomlkit.table()
    ad.add('DOMAIN', 'ad.example.com')
    ad['DOMAIN'].comment('The domain of the Active Directory server')
    ad.add('LDAP_SERVER', ['ldaps://server1.ad.example.com:636'])
    ad['LDAP_SERVER'].comment('List of LDAP servers to connect to')
    ad.add('PARENT_GROUP', 'Mailinglists')
    ad['PARENT_GROUP'].comment('The parent group in Active Directory that '
                               'contains all groups that should be '
                               'synchronized')
    ad.add('TIMEZONE', 'UTC')
    # Literal (single-quoted) TOML string so the backslash is not an escape.
    ad.add('USER', tomlkit.string('example\\username', literal=True))
    ad['USER'].comment('The username of the user to connect to the Active '
                       'Directory server. (use single quotes)')
    ad.add('PASSWORD', 'xxxxxxxx')
    ad['PASSWORD'].comment('The password of the user to connect to the Active '
                           'Directory')
    conf.add('AD', ad)

    # Add CiviCRM section
    civicrm = tomlkit.table()
    civicrm.add('BASE_URL', 'https://civicrm.example.com')
    civicrm.add('API_KEY', 'xxxxxxxx')
    civicrm['API_KEY'].comment('The API key of the CiviCRM user')
    civicrm['BASE_URL'].comment('The URL of the CiviCRM server')
    civicrm.add('BATCH_SIZE', 50)
    civicrm['BATCH_SIZE'].comment('The batch size for the API requests to the '
                                  'CiviCRM server - only applied to contact '
                                  'sync (default: 50)')
    civicrm.add('RETRIES', 3)
    civicrm['RETRIES'].comment('The number of retries for the API requests to '
                               'the CiviCRM (default: 3)')
    civicrm.add('IGNORE_SSL', False)
    # NOTE(review): the boolean item is fetched through the container's
    # item() accessor, presumably because subscripting yields a plain bool
    # that has no .comment() - confirm against the tomlkit documentation.
    civicrm.value.item('IGNORE_SSL').comment('Allow insecure connections to '
                                             'the CiviCRM server (default: '
                                             'false)')
    conf.add('CIVICRM', civicrm)

    # Add Logging section
    # NOTE(review): the template advertises "info" as the file log level
    # default, while the loader below falls back to 'WARNING' - confirm which
    # one is intended.
    logging_ = tomlkit.table()
    logging_.add('STDOUT_LOG_LEVEL', 'info')
    logging_['STDOUT_LOG_LEVEL'].comment('The log level for the stdout logger '
                                         '(default: info)')
    logging_.add('FILE_LOG_LEVEL', 'info')
    logging_['FILE_LOG_LEVEL'].comment('The log level for the file logger '
                                       '(default: info)')
    logging_.add('LOG_DIR', '/var/log/adGroupSync/')
    logging_['LOG_DIR'].comment('The directory to store the log file')
    conf.add('LOGGING', logging_)

    # Add Ntfy section
    ntfy = tomlkit.table()
    ntfy.add(tomlkit.comment('Optional section for ntfy configuration'))
    ntfy.add('URL', 'https://ntfy.example.com')
    ntfy['URL'].comment('The URL of the ntfy server')
    ntfy.add('TOPIC', 'adGroupSync')
    ntfy['TOPIC'].comment('The topic to post the message to')
    ntfy.add('ACCESS_TOKEN', 'tk_xxxxxxxxxxxxxxxxxxx')
    ntfy['ACCESS_TOKEN'].comment('The access token for the ntfy server')
    conf.add('NTFY', ntfy)

    # Write configuration file
    TOMLFile(dest).write(conf)


def _env_list(name: str):
    """Return a comma-separated env var as a list of stripped entries.

    Blank entries are dropped; ``None`` is returned when the variable is
    unset or contains no non-blank entry.
    """
    raw = os.getenv(name)
    if raw is None:
        return None
    entries = [part.strip() for part in raw.split(',') if part.strip()]
    return entries or None


def _env_int(name: str):
    """Return an env var as ``int``, or ``None`` when unset or blank.

    Raises:
        ValueError: If the variable is set to a non-integer value.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return None
    return int(raw)


def _env_bool(name: str):
    """Return an env var as ``bool``, or ``None`` when unset or blank.

    ``bool()`` on a string is True for any non-empty text (including
    "false"), so the text is compared against the accepted true-strings.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return None
    return raw.strip().lower() in _TRUE_STRINGS


# Assign environment variables or configuration file values
AD_DOMAIN = os.getenv('AD_DOMAIN')
AD_USER_NAME = os.getenv('AD_USER')
AD_PASSWORD = os.getenv('AD_PASSWORD')
AD_LDAP_SERVER = _env_list('AD_LDAP_SERVER')
AD_TIMEZONE = pytz.timezone(os.getenv('AD_TIMEZONE')) \
    if os.getenv('AD_TIMEZONE') else None
AD_PARENT_GROUP = os.getenv('AD_PARENT_GROUP')
STDOUT_LOG_LEVEL = os.getenv('STDOUT_LOG_LEVEL')
FILE_LOG_LEVEL = os.getenv('FILE_LOG_LEVEL')
LOG_DIR = os.getenv('LOG_DIR')
CIVICRM_BASE_URL = os.getenv('CIVICRM_BASE_URL')
CIVICRM_API_KEY = os.getenv('CIVICRM_API_KEY')
CIVICRM_BATCH_SIZE = _env_int('CIVICRM_BATCH_SIZE')
CIVICRM_RETRIES = _env_int('CIVICRM_RETRIES')
CIVICRM_IGNORE_SSL = _env_bool('CIVICRM_IGNORE_SSL')
NTFY_URL = os.getenv('NTFY_URL')
NTFY_TOPIC = os.getenv('NTFY_TOPIC')
NTFY_ACCESS_TOKEN = os.getenv('NTFY_ACCESS_TOKEN')

try:
    argparser = argparse.ArgumentParser(
        description='This program synchronizes Active Directory groups with '
                    'CiviCRM groups.')
    argparser.add_argument(
        "--conf",
        action="store",
        type=Path,
        help="Path the configuration file",
    )
    argparser.add_argument(
        "--create-conf",
        action="store_true",
        help="Create a configuration file",
    )
    args = argparser.parse_args()

    # If a path to a config file was provided
    if args.conf:
        # Check if configuration file exists
        config_file = Path(args.conf)
        if not config_file.is_file() and not args.create_conf:
            raise FileNotFoundError(
                f"Configuration file '{config_file}' does not exist.")

        # Create configuration file if requested and exit
        if args.create_conf:
            create_config_file(config_file)
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            raise SystemExit(0)

        # Load configuration file
        config = TOMLFile(config_file).read()

        # Get values from configuration file; environment values win.
        AD_DOMAIN = AD_DOMAIN or config['AD']['DOMAIN']
        AD_USER_NAME = AD_USER_NAME or config['AD']['USER']
        AD_PASSWORD = AD_PASSWORD or config['AD']['PASSWORD']
        AD_LDAP_SERVER = AD_LDAP_SERVER or config['AD'].get('LDAP_SERVER')
        AD_TIMEZONE = AD_TIMEZONE \
            or pytz.timezone(config['AD'].get('TIMEZONE', 'UTC'))
        AD_PARENT_GROUP = AD_PARENT_GROUP or config['AD']['PARENT_GROUP']
        STDOUT_LOG_LEVEL = STDOUT_LOG_LEVEL \
            or config['LOGGING'].get('STDOUT_LOG_LEVEL', 'INFO')
        FILE_LOG_LEVEL = FILE_LOG_LEVEL \
            or config['LOGGING'].get('FILE_LOG_LEVEL', 'WARNING')
        LOG_DIR = LOG_DIR or config['LOGGING'].get('LOG_DIR')
        CIVICRM_BASE_URL = CIVICRM_BASE_URL or config['CIVICRM']['BASE_URL']
        CIVICRM_API_KEY = CIVICRM_API_KEY or config['CIVICRM']['API_KEY']
        # Fall back to the default documented in the generated template
        # instead of failing with KeyError when the key is absent.
        CIVICRM_BATCH_SIZE = CIVICRM_BATCH_SIZE \
            or config['CIVICRM'].get('BATCH_SIZE', 50)
        # "is None" rather than "or": an explicit 0 / false from the
        # environment must not be overridden by the configuration file.
        if CIVICRM_RETRIES is None:
            CIVICRM_RETRIES = config['CIVICRM'].get('RETRIES', 3)
        if CIVICRM_IGNORE_SSL is None:
            CIVICRM_IGNORE_SSL = bool(
                config['CIVICRM'].get('IGNORE_SSL', False))

        # The NTFY section is optional; environment values are kept even
        # when the section is missing from the file.
        ntfy_section = config['NTFY'] if 'NTFY' in config else {}
        NTFY_URL = NTFY_URL or ntfy_section.get('URL')
        NTFY_TOPIC = NTFY_TOPIC or ntfy_section.get('TOPIC')
        NTFY_ACCESS_TOKEN = NTFY_ACCESS_TOKEN \
            or ntfy_section.get('ACCESS_TOKEN')

    # Check if some required values are missing
    required = {
        "AD_DOMAIN": AD_DOMAIN,
        "AD_USER_NAME": AD_USER_NAME,
        "AD_PASSWORD": AD_PASSWORD,
        "AD_LDAP_SERVER": AD_LDAP_SERVER,
        "AD_PARENT_GROUP": AD_PARENT_GROUP,
        "CIVICRM_BASE_URL": CIVICRM_BASE_URL,
        "CIVICRM_API_KEY": CIVICRM_API_KEY,
    }
    # Empty strings / empty lists are as unusable as None, so test falsiness.
    missing = [key for key, value in required.items() if not value]
    if missing:
        raise ValueError('Some required values are missing. '
                         'Please use a configuration file '
                         'or provide all required environment variables. '
                         'Missing: %s' % ','.join(missing))

except Exception as e:
    # Top-level boundary: log the failure with traceback and stop with a
    # non-zero exit status.
    logger.error(e, exc_info=True)
    raise SystemExit(1)