ADD: Cron job arg creation

This commit is contained in:
Álvaro Leiva 2025-03-24 13:57:45 +01:00
parent b7311d088d
commit 91d93832c6
2 changed files with 110 additions and 56 deletions

View file

@ -17,6 +17,7 @@ dependencies = [
"civifang>=0.2.6", "civifang>=0.2.6",
"httpx>=0.28.1", "httpx>=0.28.1",
"ms-active-directory>=1.14.1", "ms-active-directory>=1.14.1",
"python-crontab>=3.2.0",
"pyyaml>=6.0.2", "pyyaml>=6.0.2",
"validators>=0.34.0", "validators>=0.34.0",
] ]

View file

@ -2,6 +2,8 @@ import argparse
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from crontab import CronTab, CronSlices
import shutil
import pytz import pytz
import yaml import yaml
@ -12,40 +14,72 @@ logger = logging.getLogger(__package__)
def create_config_file(dest: Path): def create_config_file(dest: Path):
if dest.is_dir(): if dest.is_dir():
dest = dest / f"{__package__}_config.yml" dest = dest / f"{__package__}_config.yml"
example_conf = Path(__file__).parent / 'resources' / 'example_config.yml' example_conf = Path(__file__).parent / "resources" / "example_config.yml"
with open(example_conf, "r") as source: with open(example_conf, "r") as source:
with open(dest, "w") as d: with open(dest, "w") as d:
d.writelines(source) d.writelines(source)
def create_cron_job(cron_job: str, config_file: Path):
# Check if the script exists and its executable
if not shutil.which("test1"):
print(
f"No executable found, please add this to your crontab manually: '/path/to/adgroupsync --conf {config_file} >/dev/null 2>&1'"
)
# Checking if the string is valid
if not CronSlices.is_valid(cron_job):
raise Exception(f"Cron job '{cron_job}' is not valid.")
# Creating the cron job
cron = CronTab(user=True)
job = cron.new(command=f"adgroupsync --conf {config_file} >/dev/null 2>&1")
job.setall(cron_job)
cron.write()
# Assign environment variables or configuration file values # Assign environment variables or configuration file values
AD_DOMAIN = os.getenv('AD_DOMAIN') AD_DOMAIN = os.getenv("AD_DOMAIN")
AD_USER_NAME = os.getenv('AD_USER') AD_USER_NAME = os.getenv("AD_USER")
AD_PASSWORD = os.getenv('AD_PASSWORD') AD_PASSWORD = os.getenv("AD_PASSWORD")
AD_LDAP_SERVER = [s.strip() for s in os.getenv('AD_LDAP_SERVER').split(',')] \ AD_LDAP_SERVER = (
if os.getenv('AD_LDAP_SERVER') is not None else None [s.strip() for s in os.getenv("AD_LDAP_SERVER").split(",")]
AD_TIMEZONE = pytz.timezone(os.getenv('AD_TIMEZONE')) \ if os.getenv("AD_LDAP_SERVER") is not None
if os.getenv('AD_TIMEZONE') else None else None
AD_PARENT_GROUP = os.getenv('AD_PARENT_GROUP') )
STDOUT_LOG_LEVEL = os.getenv('STDOUT_LOG_LEVEL') AD_TIMEZONE = (
FILE_LOG_LEVEL = os.getenv('FILE_LOG_LEVEL') pytz.timezone(os.getenv("AD_TIMEZONE")) if os.getenv("AD_TIMEZONE") else None
LOG_DIR = os.getenv('LOG_DIR') )
CIVICRM_BASE_URL = os.getenv('CIVICRM_BASE_URL') AD_PARENT_GROUP = os.getenv("AD_PARENT_GROUP")
CIVICRM_API_KEY = os.getenv('CIVICRM_API_KEY') STDOUT_LOG_LEVEL = os.getenv("STDOUT_LOG_LEVEL")
CIVICRM_BATCH_SIZE = int(os.getenv('CIVICRM_BATCH_SIZE')) \ FILE_LOG_LEVEL = os.getenv("FILE_LOG_LEVEL")
if os.getenv('CIVICRM_BATCH_SIZE') is not None else None LOG_DIR = os.getenv("LOG_DIR")
CIVICRM_RETRIES = int(os.getenv('CIVICRM_RETRIES')) \ CIVICRM_BASE_URL = os.getenv("CIVICRM_BASE_URL")
if os.getenv('CIVICRM_RETRIES') is not None else None CIVICRM_API_KEY = os.getenv("CIVICRM_API_KEY")
CIVICRM_IGNORE_SSL = bool(os.getenv('CIVICRM_IGNORE_SSL')) \ CIVICRM_BATCH_SIZE = (
if os.getenv('CIVICRM_IGNORE_SSL') is not None else None int(os.getenv("CIVICRM_BATCH_SIZE"))
NTFY_URL = os.getenv('NTFY_URL') if os.getenv("CIVICRM_BATCH_SIZE") is not None
NTFY_TOPIC = os.getenv('NTFY_TOPIC') else None
NTFY_ACCESS_TOKEN = os.getenv('NTFY_ACCESS_TOKEN') )
CIVICRM_RETRIES = (
int(os.getenv("CIVICRM_RETRIES"))
if os.getenv("CIVICRM_RETRIES") is not None
else None
)
CIVICRM_IGNORE_SSL = (
bool(os.getenv("CIVICRM_IGNORE_SSL"))
if os.getenv("CIVICRM_IGNORE_SSL") is not None
else None
)
NTFY_URL = os.getenv("NTFY_URL")
NTFY_TOPIC = os.getenv("NTFY_TOPIC")
NTFY_ACCESS_TOKEN = os.getenv("NTFY_ACCESS_TOKEN")
try: try:
argparser = argparse.ArgumentParser( argparser = argparse.ArgumentParser(
description='This program synchronizes Active Directory groups with ' description="This program synchronizes Active Directory groups with "
'CiviCRM groups.') "CiviCRM groups."
)
argparser.add_argument( argparser.add_argument(
"--conf", "--conf",
@ -60,6 +94,12 @@ try:
help="Create a configuration file", help="Create a configuration file",
) )
argparser.add_argument(
"--create-cron",
action="store",
help="Create a cron job",
)
args = argparser.parse_args() args = argparser.parse_args()
# If a path to a config file was provided # If a path to a config file was provided
@ -69,42 +109,54 @@ try:
config_file = Path(args.conf) config_file = Path(args.conf)
if not config_file.is_file() and not args.create_conf: if not config_file.is_file() and not args.create_conf:
raise FileNotFoundError( raise FileNotFoundError(
f"Configuration file '{config_file}' does not exist.") f"Configuration file '{config_file}' does not exist."
)
# Create configuration file if requested and exit # Create configuration file if requested and exit
if args.create_conf: if args.create_conf:
create_config_file(config_file) create_config_file(config_file)
exit(0) exit(0)
# Create the crob job if requested and exit
if args.create_cron:
cron_job = args.create_cron
create_cron_job(cron_job, config_file)
exit(0)
# Load configuration file # Load configuration file
with open(config_file, 'r') as file: with open(config_file, "r") as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
# Get values from configuration file # Get values from configuration file
AD_DOMAIN = AD_DOMAIN or config['AD']['DOMAIN'] AD_DOMAIN = AD_DOMAIN or config["AD"]["DOMAIN"]
AD_USER_NAME = AD_USER_NAME or config['AD']['USER'] AD_USER_NAME = AD_USER_NAME or config["AD"]["USER"]
AD_PASSWORD = AD_PASSWORD or config['AD']['PASSWORD'] AD_PASSWORD = AD_PASSWORD or config["AD"]["PASSWORD"]
AD_LDAP_SERVER = AD_LDAP_SERVER or config['AD'].get('LDAP_SERVER') AD_LDAP_SERVER = AD_LDAP_SERVER or config["AD"].get("LDAP_SERVER")
AD_TIMEZONE = AD_TIMEZONE \ AD_TIMEZONE = AD_TIMEZONE or pytz.timezone(config["AD"].get("TIMEZONE", "UTC"))
or pytz.timezone(config['AD'].get('TIMEZONE', 'UTC')) AD_PARENT_GROUP = AD_PARENT_GROUP or config["AD"]["PARENT_GROUP"]
AD_PARENT_GROUP = AD_PARENT_GROUP or config['AD']['PARENT_GROUP'] STDOUT_LOG_LEVEL = STDOUT_LOG_LEVEL or config["LOGGING"].get(
STDOUT_LOG_LEVEL = STDOUT_LOG_LEVEL \ "STDOUT_LOG_LEVEL", "INFO"
or config['LOGGING'].get('STDOUT_LOG_LEVEL', 'INFO') )
FILE_LOG_LEVEL = FILE_LOG_LEVEL \ FILE_LOG_LEVEL = FILE_LOG_LEVEL or config["LOGGING"].get(
or config['LOGGING'].get('FILE_LOG_LEVEL', 'WARNING') "FILE_LOG_LEVEL", "WARNING"
LOG_DIR = LOG_DIR or config['LOGGING'].get('LOG_DIR') )
CIVICRM_BASE_URL = CIVICRM_BASE_URL or config['CIVICRM']['BASE_URL'] LOG_DIR = LOG_DIR or config["LOGGING"].get("LOG_DIR")
CIVICRM_API_KEY = CIVICRM_API_KEY or config['CIVICRM']['API_KEY'] CIVICRM_BASE_URL = CIVICRM_BASE_URL or config["CIVICRM"]["BASE_URL"]
CIVICRM_BATCH_SIZE = CIVICRM_BATCH_SIZE \ CIVICRM_API_KEY = CIVICRM_API_KEY or config["CIVICRM"]["API_KEY"]
or config['CIVICRM']['BATCH_SIZE'] CIVICRM_BATCH_SIZE = CIVICRM_BATCH_SIZE or config["CIVICRM"]["BATCH_SIZE"]
CIVICRM_RETRIES = CIVICRM_RETRIES \ CIVICRM_RETRIES = CIVICRM_RETRIES or config["CIVICRM"].get("RETRIES", 3)
or config['CIVICRM'].get('RETRIES', 3) CIVICRM_IGNORE_SSL = CIVICRM_IGNORE_SSL or bool(
CIVICRM_IGNORE_SSL = CIVICRM_IGNORE_SSL \ config["CIVICRM"].get("IGNORE_SSL", False)
or bool(config['CIVICRM'].get('IGNORE_SSL', False)) )
NTFY_URL = NTFY_URL or config['NTFY'].get('URL') if 'NTFY' in config else None NTFY_URL = NTFY_URL or config["NTFY"].get("URL") if "NTFY" in config else None
NTFY_TOPIC = NTFY_TOPIC or config['NTFY'].get('TOPIC') if 'NTFY' in config else None NTFY_TOPIC = (
NTFY_ACCESS_TOKEN = NTFY_ACCESS_TOKEN \ NTFY_TOPIC or config["NTFY"].get("TOPIC") if "NTFY" in config else None
or config['NTFY'].get('ACCESS_TOKEN') if 'NTFY' in config else None )
NTFY_ACCESS_TOKEN = (
NTFY_ACCESS_TOKEN or config["NTFY"].get("ACCESS_TOKEN")
if "NTFY" in config
else None
)
# Check if some required values are missing # Check if some required values are missing
required = { required = {
@ -117,11 +169,12 @@ try:
"CIVICRM_API_KEY": CIVICRM_API_KEY, "CIVICRM_API_KEY": CIVICRM_API_KEY,
} }
if len(missing := [k for k, v in required.items() if v is None]) > 0: if len(missing := [k for k, v in required.items() if v is None]) > 0:
raise ValueError('Some required values are missing. ' raise ValueError(
'Please use a configuration file ' "Some required values are missing. "
'or provide all required environment variables. ' "Please use a configuration file "
'Missing: %s' "or provide all required environment variables. "
% ','.join(missing)) "Missing: %s" % ",".join(missing)
)
except Exception as e: except Exception as e:
logger.error(e, exc_info=True) logger.error(e, exc_info=True)