"""Configuration handling for the Active Directory -> CiviCRM group sync.

This module

* can write a commented template configuration file
  (``create_config_file``),
* can install a cron job for the current user (``create_cron_job``), and
* resolves all settings **at import time** from environment variables and,
  optionally, a TOML configuration file passed via ``--conf``.  Environment
  variables take precedence over values from the file.

The resolved settings are exposed as module-level constants (``AD_DOMAIN``,
``CIVICRM_BASE_URL``, ``FORCE_FLAG`` ...).
"""
import argparse
import logging
import os
import shlex
import shutil
import sys
from pathlib import Path

import pytz
import tomlkit
from crontab import CronSlices, CronTab
from tomlkit.toml_file import TOMLFile

logger = logging.getLogger(__package__)

# Accepted spellings for boolean environment variables (case-insensitive).
_TRUE_STRINGS = frozenset({"1", "true", "yes", "y", "on"})
_FALSE_STRINGS = frozenset({"0", "false", "no", "n", "off"})


def _env_int(name):
    """Return environment variable *name* as ``int``, or ``None`` if unset/blank.

    Raises:
        ValueError: if the variable is set but is not a valid integer.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return None
    return int(raw)


def _env_bool(name):
    """Return environment variable *name* as ``bool``, or ``None`` if unset/blank.

    ``bool("false")`` is ``True`` in Python, so the text is compared against
    explicit true/false spellings instead of being passed to ``bool``.

    Raises:
        ValueError: if the variable holds an unrecognised value.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return None
    text = raw.strip().lower()
    if text in _TRUE_STRINGS:
        return True
    if text in _FALSE_STRINGS:
        return False
    raise ValueError(
        f"Environment variable {name} must be a boolean value, got '{raw}'."
    )


def _env_list(name):
    """Return a comma-separated environment variable as a list of strings.

    Entries are stripped and empty entries are dropped.  Returns ``None``
    when the variable is unset or contains no non-empty entry.
    """
    raw = os.getenv(name)
    if raw is None:
        return None
    entries = [part.strip() for part in raw.split(",") if part.strip()]
    return entries or None


def _section(document, name):
    """Return table *name* of the TOML *document*, or ``{}`` if it is absent."""
    return document[name] if name in document else {}


def create_config_file(dest: Path):
    """Write a commented template configuration file.

    Args:
        dest: Target file.  If it is an existing directory, the file
            ``<package>_config.toml`` is created inside it.

    An existing file at the target path is overwritten.
    """
    if dest.is_dir():
        dest = dest / f"{__package__}_config.toml"

    conf = tomlkit.document()
    conf.add(tomlkit.comment(f"Configuration file for {__package__}"))
    conf.add(tomlkit.nl())

    # Add AD section
    ad = tomlkit.table()
    ad.add('DOMAIN', 'ad.example.com')
    ad['DOMAIN'].comment('The domain of the Active Directory server')
    ad.add('LDAP_SERVER', ['ldaps://server1.ad.example.com:636'])
    ad['LDAP_SERVER'].comment('List of LDAP servers to connect to')
    ad.add('PARENT_GROUP', 'Mailinglists')
    ad['PARENT_GROUP'].comment('The parent group in Active Directory that '
                               'contains all groups that should be '
                               'synchronized')
    ad.add('TIMEZONE', 'UTC')
    # Literal (single-quoted) TOML string so the backslash is not an escape.
    ad.add('USER', tomlkit.string('example\\username', literal=True))
    ad['USER'].comment('The username of the user to connect to the Active '
                       'Directory server. (use single quotes)')
    ad.add('PASSWORD', 'xxxxxxxx')
    ad['PASSWORD'].comment('The password of the user to connect to the Active '
                           'Directory')
    conf.add('AD', ad)

    # Add CiviCRM section
    civicrm = tomlkit.table()
    civicrm.add('BASE_URL', 'https://civicrm.example.com')
    civicrm['BASE_URL'].comment('The URL of the CiviCRM server')
    civicrm.add('API_KEY', 'xxxxxxxx')
    civicrm['API_KEY'].comment('The API key of the CiviCRM user')
    civicrm.add('BATCH_SIZE', 50)
    civicrm['BATCH_SIZE'].comment('The batch size for the API requests to the '
                                  'CiviCRM server - only applied to contact '
                                  'sync (default: 50)')
    civicrm.add('RETRIES', 3)
    civicrm['RETRIES'].comment('The number of retries for the API requests to '
                               'the CiviCRM (default: 3)')
    civicrm.add('IGNORE_SSL', False)
    # NOTE(review): the item is fetched through ``.value.item()`` here rather
    # than ``civicrm[...]`` -- presumably because tomlkit returns a plain
    # ``bool`` (without ``.comment``) for boolean items; confirm against the
    # tomlkit documentation before changing.
    civicrm.value.item('IGNORE_SSL').comment('Allow insecure connections to '
                                             'the CiviCRM server (default: '
                                             'false)')
    conf.add('CIVICRM', civicrm)

    # Add Logging section
    logging_ = tomlkit.table()
    logging_.add('STDOUT_LOG_LEVEL', 'info')
    logging_['STDOUT_LOG_LEVEL'].comment('The log level for the stdout logger '
                                         '(default: info)')
    logging_.add('FILE_LOG_LEVEL', 'info')
    # The loader below falls back to WARNING when this key is absent.
    logging_['FILE_LOG_LEVEL'].comment('The log level for the file logger '
                                       '(default: warning)')
    logging_.add('LOG_DIR', '/var/log/adGroupSync/')
    logging_['LOG_DIR'].comment('The directory to store the log file')
    conf.add('LOGGING', logging_)

    # Add Ntfy section
    ntfy = tomlkit.table()
    ntfy.add(tomlkit.comment('Optional section for ntfy configuration'))
    ntfy.add('URL', 'https://ntfy.example.com')
    ntfy['URL'].comment('The URL of the ntfy server')
    ntfy.add('TOPIC', 'adGroupSync')
    ntfy['TOPIC'].comment('The topic to post the message to')
    ntfy.add('ACCESS_TOKEN', 'tk_xxxxxxxxxxxxxxxxxxx')
    ntfy['ACCESS_TOKEN'].comment('The access token for the ntfy server')
    conf.add('NTFY', ntfy)

    # Write configuration file
    TOMLFile(dest).write(conf)


def create_cron_job(cron_job: str, config_file: Path):
    """Install a cron job for the current user that runs the sync.

    Args:
        cron_job: Cron schedule expression (e.g. ``"*/10 * * * *"``).
        config_file: Configuration file handed to the job via ``--conf``.

    Raises:
        ValueError: if *cron_job* is not a valid cron expression.

    If no executable named like this package is found on ``PATH``, nothing is
    installed and the command to add manually is printed instead.
    """
    # cron does not run in the caller's working directory, so store an
    # absolute path to the configuration file.
    config_path = Path(config_file).resolve()

    # Check if the script exists and is executable
    executable = shutil.which(__package__)
    if not executable:
        print(
            "No executable found, please add this to your crontab manually: "
            f"'/path/to/adgroupsync --conf {config_path} >/dev/null 2>&1'"
        )
        return

    # Check that the schedule expression is valid
    if not CronSlices.is_valid(cron_job):
        raise ValueError(f"Cron job '{cron_job}' is not valid.")

    # Create the cron job.  The resolved executable path is used because cron
    # typically runs with a minimal PATH; both paths are shell-quoted so that
    # spaces or metacharacters cannot break the command line.
    command = (
        f"{shlex.quote(executable)} --conf {shlex.quote(str(config_path))} "
        ">/dev/null 2>&1"
    )
    cron = CronTab(user=True)
    job = cron.new(command=command)
    job.setall(cron_job)
    cron.write()


try:
    # Read environment variables first; they take precedence over values from
    # the configuration file.  Parsing happens inside the ``try`` so that an
    # invalid value is reported through the logger like every other
    # configuration error.
    AD_DOMAIN = os.getenv("AD_DOMAIN")
    AD_USER_NAME = os.getenv("AD_USER")
    AD_PASSWORD = os.getenv("AD_PASSWORD")
    AD_LDAP_SERVER = _env_list("AD_LDAP_SERVER")
    AD_TIMEZONE = (
        pytz.timezone(os.getenv("AD_TIMEZONE"))
        if os.getenv("AD_TIMEZONE")
        else None
    )
    AD_PARENT_GROUP = os.getenv("AD_PARENT_GROUP")
    STDOUT_LOG_LEVEL = os.getenv("STDOUT_LOG_LEVEL")
    FILE_LOG_LEVEL = os.getenv("FILE_LOG_LEVEL")
    LOG_DIR = os.getenv("LOG_DIR")
    CIVICRM_BASE_URL = os.getenv("CIVICRM_BASE_URL")
    CIVICRM_API_KEY = os.getenv("CIVICRM_API_KEY")
    CIVICRM_BATCH_SIZE = _env_int("CIVICRM_BATCH_SIZE")
    CIVICRM_RETRIES = _env_int("CIVICRM_RETRIES")
    CIVICRM_IGNORE_SSL = _env_bool("CIVICRM_IGNORE_SSL")
    NTFY_URL = os.getenv("NTFY_URL")
    NTFY_TOPIC = os.getenv("NTFY_TOPIC")
    NTFY_ACCESS_TOKEN = os.getenv("NTFY_ACCESS_TOKEN")

    argparser = argparse.ArgumentParser(
        description="This program synchronizes Active Directory groups with "
        "CiviCRM groups."
    )
    argparser.add_argument(
        "--conf",
        action="store",
        type=Path,
        help="Path the configuration file",
    )
    argparser.add_argument(
        "--create-conf",
        action="store_true",
        help="Create a configuration file",
    )
    argparser.add_argument(
        "--create-cron",
        action="store",
        help="Create a cron job",
    )
    argparser.add_argument(
        "--force",
        action="store_true",
        help="Forces the script execution skipping recent run checking",
    )
    args = argparser.parse_args()

    # Save the force flag; bound regardless of whether --conf was given so the
    # name always exists for importers.
    FORCE_FLAG = args.force

    # Both helper modes need a target path; refuse instead of silently
    # ignoring the flag.
    if (args.create_conf or args.create_cron) and not args.conf:
        argparser.error("--create-conf and --create-cron require --conf")

    # If a path to a config file was provided
    if args.conf:
        config_file = Path(args.conf)

        # Create configuration file if requested and exit
        if args.create_conf:
            create_config_file(config_file)
            sys.exit(0)

        # From here on the configuration file has to exist
        if not config_file.is_file():
            raise FileNotFoundError(
                f"Configuration file '{config_file}' does not exist."
            )

        # Create the cron job if requested and exit
        if args.create_cron:
            create_cron_job(args.create_cron, config_file)
            sys.exit(0)

        # Load configuration file
        config = TOMLFile(config_file).read()
        _ad = _section(config, "AD")
        _logging = _section(config, "LOGGING")
        _civicrm = _section(config, "CIVICRM")
        _ntfy = _section(config, "NTFY")

        # Fill every value the environment did not provide.  Required keys
        # are read with ``.get`` so a missing one is reported by the
        # required-values check below instead of a bare ``KeyError``.
        AD_DOMAIN = AD_DOMAIN or _ad.get("DOMAIN")
        AD_USER_NAME = AD_USER_NAME or _ad.get("USER")
        AD_PASSWORD = AD_PASSWORD or _ad.get("PASSWORD")
        AD_LDAP_SERVER = AD_LDAP_SERVER or _ad.get("LDAP_SERVER")
        AD_TIMEZONE = AD_TIMEZONE or pytz.timezone(_ad.get("TIMEZONE", "UTC"))
        AD_PARENT_GROUP = AD_PARENT_GROUP or _ad.get("PARENT_GROUP")
        STDOUT_LOG_LEVEL = STDOUT_LOG_LEVEL or _logging.get(
            "STDOUT_LOG_LEVEL", "INFO"
        )
        FILE_LOG_LEVEL = FILE_LOG_LEVEL or _logging.get(
            "FILE_LOG_LEVEL", "WARNING"
        )
        LOG_DIR = LOG_DIR or _logging.get("LOG_DIR")
        CIVICRM_BASE_URL = CIVICRM_BASE_URL or _civicrm.get("BASE_URL")
        CIVICRM_API_KEY = CIVICRM_API_KEY or _civicrm.get("API_KEY")
        # ``or`` on purpose: a batch size of 0 is treated as "not set".
        CIVICRM_BATCH_SIZE = CIVICRM_BATCH_SIZE or _civicrm.get(
            "BATCH_SIZE", 50
        )
        # ``is None`` on purpose: 0 retries and an explicit ``False`` from the
        # environment are valid values and must win over the file.
        if CIVICRM_RETRIES is None:
            CIVICRM_RETRIES = _civicrm.get("RETRIES", 3)
        if CIVICRM_IGNORE_SSL is None:
            CIVICRM_IGNORE_SSL = bool(_civicrm.get("IGNORE_SSL", False))
        # The NTFY section is optional; environment values are kept even when
        # the file has no such section.
        NTFY_URL = NTFY_URL or _ntfy.get("URL")
        NTFY_TOPIC = NTFY_TOPIC or _ntfy.get("TOPIC")
        NTFY_ACCESS_TOKEN = NTFY_ACCESS_TOKEN or _ntfy.get("ACCESS_TOKEN")

    # Check if some required values are missing (unset or empty)
    required = {
        "AD_DOMAIN": AD_DOMAIN,
        "AD_USER_NAME": AD_USER_NAME,
        "AD_PASSWORD": AD_PASSWORD,
        "AD_LDAP_SERVER": AD_LDAP_SERVER,
        "AD_PARENT_GROUP": AD_PARENT_GROUP,
        "CIVICRM_BASE_URL": CIVICRM_BASE_URL,
        "CIVICRM_API_KEY": CIVICRM_API_KEY,
    }
    if missing := [key for key, value in required.items() if not value]:
        raise ValueError(
            "Some required values are missing. "
            "Please use a configuration file "
            "or provide all required environment variables. "
            "Missing: %s" % ",".join(missing)
        )

except Exception as e:
    # Top-level boundary: report any configuration problem and abort.
    # ``SystemExit`` (from ``sys.exit`` / argparse) is not an ``Exception``
    # and therefore passes through untouched.
    logger.error(e, exc_info=True)
    sys.exit(1)