adGroupSync/src/adgroupsync/conf.py
Marc Koch e8503bd073
replace tomllib with tomlkit
tomlkit can read and write toml files
2025-03-25 10:05:33 +01:00

224 lines
8.3 KiB
Python

import argparse
import logging
import os
from pathlib import Path
from crontab import CronTab, CronSlices
import shutil
import pytz
import tomlkit
from tomlkit.toml_file import TOMLFile
# Module-level logger, named after the package this module belongs to.
logger = logging.getLogger(__package__)
def create_config_file(dest: Path):
    """
    Write a commented example configuration file in TOML format.

    The document contains the sections ``AD``, ``CIVICRM``, ``LOGGING`` and
    ``NTFY``, each filled with placeholder values and an explanatory comment
    per key, so that the user only has to replace the placeholders.

    :param dest: Path of the file to write. If it is an existing directory,
        the file is created inside it as ``<package>_config.toml``.
    """
    if dest.is_dir():
        dest = dest / f"{__package__}_config.toml"

    conf = tomlkit.document()
    conf.add(tomlkit.comment(f"Configuration file for {__package__}"))
    conf.add(tomlkit.nl())

    # Add AD section
    ad = tomlkit.table()
    ad.add('DOMAIN', 'ad.example.com')
    ad['DOMAIN'].comment('The domain of the Active Directory server')
    ad.add('LDAP_SERVER', ['ldaps://server1.ad.example.com:636'])
    ad['LDAP_SERVER'].comment('List of LDAP servers to connect to')
    ad.add('PARENT_GROUP', 'Mailinglists')
    ad['PARENT_GROUP'].comment('The parent group in Active Directory that '
                               'contains all groups that should be '
                               'synchronized')
    ad.add('TIMEZONE', 'UTC')
    # Literal (single-quoted) TOML string so the backslash in the user name
    # is not interpreted as an escape sequence.
    ad.add('USER', tomlkit.string('example\\username', literal=True))
    ad['USER'].comment('The username of the user to connect to the Active '
                       'Directory server. (use single quotes)')
    ad.add('PASSWORD', 'xxxxxxxx')
    ad['PASSWORD'].comment('The password of the user to connect to the Active '
                           'Directory')
    conf.add('AD', ad)

    # Add CiviCRM section
    civicrm = tomlkit.table()
    civicrm.add('BASE_URL', 'https://civicrm.example.com')
    civicrm.add('API_KEY', 'xxxxxxxx')
    civicrm['API_KEY'].comment('The API key of the CiviCRM user')
    civicrm['BASE_URL'].comment('The URL of the CiviCRM server')
    civicrm.add('BATCH_SIZE', 50)
    civicrm['BATCH_SIZE'].comment('The batch size for the API requests to the '
                                  'CiviCRM server - only applied to contact '
                                  'sync (default: 50)')
    civicrm.add('RETRIES', 3)
    civicrm['RETRIES'].comment('The number of retries for the API requests to '
                               'the CiviCRM (default: 3)')
    civicrm.add('IGNORE_SSL', False)
    # Subscript access hands back a plain bool for boolean entries, which has
    # no comment() method; fetch the underlying item from the container.
    civicrm.value.item('IGNORE_SSL').comment('Allow insecure connections to '
                                             'the CiviCRM server (default: '
                                             'false)')
    conf.add('CIVICRM', civicrm)

    # Add Logging section
    logging_ = tomlkit.table()
    logging_.add('STDOUT_LOG_LEVEL', 'info')
    logging_['STDOUT_LOG_LEVEL'].comment('The log level for the stdout logger '
                                         '(default: info)')
    logging_.add('FILE_LOG_LEVEL', 'info')
    # The loader in this module falls back to WARNING when FILE_LOG_LEVEL is
    # omitted, so that is the default the generated comment has to state.
    logging_['FILE_LOG_LEVEL'].comment('The log level for the file logger '
                                       '(default: warning)')
    logging_.add('LOG_DIR', '/var/log/adGroupSync/')
    logging_['LOG_DIR'].comment('The directory to store the log file')
    conf.add('LOGGING', logging_)

    # Add Ntfy section
    ntfy = tomlkit.table()
    ntfy.add(tomlkit.comment('Optional section for ntfy configuration'))
    ntfy.add('URL', 'https://ntfy.example.com')
    ntfy['URL'].comment('The URL of the ntfy server')
    ntfy.add('TOPIC', 'adGroupSync')
    ntfy['TOPIC'].comment('The topic to post the message to')
    ntfy.add('ACCESS_TOKEN', 'tk_xxxxxxxxxxxxxxxxxxx')
    ntfy['ACCESS_TOKEN'].comment('The access token for the ntfy server')
    conf.add('NTFY', ntfy)

    # Write configuration file
    TOMLFile(dest).write(conf)
# Assign environment variables or configuration file values.
# Every setting is read from the environment first. Settings that are not set
# there stay None and are filled in from the configuration file when one is
# loaded further down in this module.
AD_DOMAIN = os.getenv("AD_DOMAIN")
AD_USER_NAME = os.getenv("AD_USER")
AD_PASSWORD = os.getenv("AD_PASSWORD")
# Comma-separated list of servers. Blank entries are dropped and an empty
# result counts as "not set", so a blank variable neither hides the value from
# the configuration file nor passes the required-value check as [''].
AD_LDAP_SERVER = [
    s.strip()
    for s in (os.getenv("AD_LDAP_SERVER") or "").split(",")
    if s.strip()
] or None
AD_TIMEZONE = (
    pytz.timezone(os.getenv("AD_TIMEZONE")) if os.getenv("AD_TIMEZONE") else None
)
AD_PARENT_GROUP = os.getenv("AD_PARENT_GROUP")
STDOUT_LOG_LEVEL = os.getenv("STDOUT_LOG_LEVEL")
FILE_LOG_LEVEL = os.getenv("FILE_LOG_LEVEL")
LOG_DIR = os.getenv("LOG_DIR")
CIVICRM_BASE_URL = os.getenv("CIVICRM_BASE_URL")
CIVICRM_API_KEY = os.getenv("CIVICRM_API_KEY")
CIVICRM_BATCH_SIZE = (
    int(os.getenv("CIVICRM_BATCH_SIZE"))
    if os.getenv("CIVICRM_BATCH_SIZE") is not None
    else None
)
CIVICRM_RETRIES = (
    int(os.getenv("CIVICRM_RETRIES"))
    if os.getenv("CIVICRM_RETRIES") is not None
    else None
)
# bool() of any non-empty string is True, so "false" or "0" would switch SSL
# verification off. Only the usual truthy spellings enable the flag; an unset
# or blank variable leaves the decision to the configuration file.
CIVICRM_IGNORE_SSL = (
    os.getenv("CIVICRM_IGNORE_SSL").strip().lower() in ("1", "true", "yes", "on")
    if os.getenv("CIVICRM_IGNORE_SSL")
    else None
)
NTFY_URL = os.getenv("NTFY_URL")
NTFY_TOPIC = os.getenv("NTFY_TOPIC")
NTFY_ACCESS_TOKEN = os.getenv("NTFY_ACCESS_TOKEN")
# Parse the command line and complete the settings from the configuration
# file. Values already taken from the environment win over the file. Any error
# is logged with its traceback and ends the program with exit status 1.
try:
    argparser = argparse.ArgumentParser(
        description="This program synchronizes Active Directory groups with "
                    "CiviCRM groups."
    )
    argparser.add_argument(
        "--conf",
        action="store",
        type=Path,
        help="Path the configuration file",
    )
    argparser.add_argument(
        "--create-conf",
        action="store_true",
        help="Create a configuration file",
    )
    argparser.add_argument(
        "--create-cron",
        action="store",
        help="Create a cron job",
    )
    args = argparser.parse_args()

    # Both actions operate on the file given via --conf. Without it they
    # would be skipped silently and the program would carry on as if they
    # had not been requested.
    if (args.create_conf or args.create_cron) and not args.conf:
        raise ValueError("--create-conf and --create-cron require --conf.")

    # If a path to a config file was provided
    if args.conf:
        # Check if configuration file exists
        config_file = Path(args.conf)
        if not config_file.is_file() and not args.create_conf:
            raise FileNotFoundError(
                f"Configuration file '{config_file}' does not exist."
            )

        # Create configuration file if requested and exit.
        # SystemExit is not an Exception subclass, so the handler below does
        # not intercept it; raising it directly avoids relying on the exit()
        # helper that is only injected by the site module.
        if args.create_conf:
            create_config_file(config_file)
            raise SystemExit(0)

        # Create the cron job if requested and exit
        if args.create_cron:
            cron_job = args.create_cron
            create_cron_job(cron_job, config_file)
            raise SystemExit(0)

        # Load configuration file
        config = TOMLFile(config_file).read()

        # Get values from configuration file
        AD_DOMAIN = AD_DOMAIN or config['AD']['DOMAIN']
        AD_USER_NAME = AD_USER_NAME or config['AD']['USER']
        AD_PASSWORD = AD_PASSWORD or config['AD']['PASSWORD']
        AD_LDAP_SERVER = AD_LDAP_SERVER or config['AD'].get('LDAP_SERVER')
        AD_TIMEZONE = AD_TIMEZONE \
            or pytz.timezone(config['AD'].get('TIMEZONE', 'UTC'))
        AD_PARENT_GROUP = AD_PARENT_GROUP or config['AD']['PARENT_GROUP']

        # Every LOGGING key is optional, so the section itself may be absent.
        STDOUT_LOG_LEVEL = STDOUT_LOG_LEVEL \
            or config.get('LOGGING', {}).get('STDOUT_LOG_LEVEL', 'INFO')
        FILE_LOG_LEVEL = FILE_LOG_LEVEL \
            or config.get('LOGGING', {}).get('FILE_LOG_LEVEL', 'WARNING')
        LOG_DIR = LOG_DIR or config.get('LOGGING', {}).get('LOG_DIR')

        CIVICRM_BASE_URL = CIVICRM_BASE_URL or config['CIVICRM']['BASE_URL']
        CIVICRM_API_KEY = CIVICRM_API_KEY or config['CIVICRM']['API_KEY']
        # BATCH_SIZE falls back to 50, the default stated in the generated
        # configuration file, in the same way RETRIES falls back to 3.
        CIVICRM_BATCH_SIZE = CIVICRM_BATCH_SIZE \
            or config['CIVICRM'].get('BATCH_SIZE', 50)
        CIVICRM_RETRIES = CIVICRM_RETRIES \
            or config['CIVICRM'].get('RETRIES', 3)
        # Compare against None instead of using `or`: an explicit False from
        # the environment must be able to override a True in the file.
        if CIVICRM_IGNORE_SSL is None:
            CIVICRM_IGNORE_SSL = bool(
                config['CIVICRM'].get('IGNORE_SSL', False)
            )

        # The NTFY section is optional. Looking it up with a default keeps
        # values from the environment even when the file has no such section
        # (`a or b if c else None` would discard them in that case).
        NTFY_URL = NTFY_URL or config.get('NTFY', {}).get('URL')
        NTFY_TOPIC = NTFY_TOPIC or config.get('NTFY', {}).get('TOPIC')
        NTFY_ACCESS_TOKEN = NTFY_ACCESS_TOKEN \
            or config.get('NTFY', {}).get('ACCESS_TOKEN')

    # Check if some required values are missing
    required = {
        "AD_DOMAIN": AD_DOMAIN,
        "AD_USER_NAME": AD_USER_NAME,
        "AD_PASSWORD": AD_PASSWORD,
        "AD_LDAP_SERVER": AD_LDAP_SERVER,
        "AD_PARENT_GROUP": AD_PARENT_GROUP,
        "CIVICRM_BASE_URL": CIVICRM_BASE_URL,
        "CIVICRM_API_KEY": CIVICRM_API_KEY,
    }
    missing = [k for k, v in required.items() if v is None]
    if missing:
        raise ValueError(
            "Some required values are missing. "
            "Please use a configuration file "
            "or provide all required environment variables. "
            "Missing: %s" % ",".join(missing)
        )
except Exception as e:
    logger.error(e, exc_info=True)
    raise SystemExit(1)