MOON
Server: Apache
System: Linux nserver.cafsindia.com 4.18.0-553.104.1.lve.el8.x86_64 #1 SMP Tue Feb 10 20:07:30 UTC 2026 x86_64
User: cafsindia (1002)
PHP: 8.2.30
Disabled: NONE
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/subsys/whitelist_rbl.py
"""

This module provides functions for exporting whitelist for
Real-time Blackhole List (RBL).

"""
import itertools
import ipaddress
import logging
import os

from pathlib import Path
from typing import Optional

from defence360agent.subsys.panels.base import PanelException
from defence360agent.subsys import web_server
from defence360agent.utils import (
    COPY_TO_MODSEC_MAXTRIES,
    atomic_rewrite,
    check_run,
    CheckRunError,
    log_failed_to_copy_to_modsec,
    recurring_check,
    retry_on,
)
from defence360agent.utils.common import CoalesceCalls
from im360.subsys.int_config import is_force_use_coraza
from im360.subsys.panels.base import use_modsec_lock
from im360.subsys.panels.hosting_panel import HostingPanel
from im360.model.global_whitelist import GlobalWhitelist
from im360.model.custom_lists import CustomWhitelist
from im360.internals.core.ipset.ip import IPSetWhiteFullAccess, IPSetWhite

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


async def reload_wafd():
    """Ask the 'imunify360-wafd' service to reload.

    Uses ``imunify360-wsctl reload`` when ``/usr/bin/imunify360-wsctl`` is a
    regular file, otherwise ``systemctl reload imunify360-wafd``. A failed
    command is logged as a warning and not re-raised (best effort).
    """
    has_wsctl = Path("/usr/bin/imunify360-wsctl").is_file()
    command = (
        ["imunify360-wsctl", "reload"]
        if has_wsctl
        else ["systemctl", "reload", "imunify360-wafd"]
    )
    try:
        await check_run(command)
    except CheckRunError:
        logger.warning("Failed to reload 'imunify360-wafd'")


#: How often to check the rbl_whitelist file; this is the period passed to
#: ``recurring_check`` for ``ensure_rbl_whitelist``.
POLLING_PERIOD = 60  # seconds

#: Dedicated coalescer for Apache reloads triggered by create_rbl_whitelist.
#: Independent from the shared 300 s webserver_gracefull_restart — we want a
#: longer default window for the RBL path, and we don't want our throttle to
#: interfere with other reload callers.
#: The coalescing period itself is supplied on each call by
#: ``_schedule_apache_reload_for_rbl``.
_rbl_reload_coalescer = CoalesceCalls()


async def _schedule_apache_reload_for_rbl():
    """Request a coalesced Apache graceful reload for create_rbl_whitelist.

    Both tunables are read from the environment on every call, so support
    can change them via ``systemctl set-environment`` +
    ``systemctl restart imunify360`` without a re-deploy:

    * ``IM360_RBL_RELOAD_DISABLED`` — when set to ``1``, ``true`` or ``yes``
      (case-insensitive) no reload is scheduled at all. The file on disk
      still advances; mod_security picks up changes on the next reload
      from any other code path (vendor update, panel integration, etc.).
    * ``IM360_RBL_RELOAD_MIN_PERIOD=<seconds>`` — minimum interval between
      reloads from this code path. Negative values are clamped to 0.
      Default 600 (10 min).
    """
    default_period = 10 * 60

    disabled_flag = os.environ.get("IM360_RBL_RELOAD_DISABLED", "").lower()
    if disabled_flag in ("1", "true", "yes"):
        logger.info(
            "Apache reload from create_rbl_whitelist suppressed "
            "(IM360_RBL_RELOAD_DISABLED is set)"
        )
        return

    # A non-numeric value must not raise: by now the caller has already
    # rewritten the file on disk, and an exception escaping from here would
    # also make the caller skip its Coraza/wafd reload step, leaving
    # mod_security with stale data until something else triggers a reload.
    raw_period = os.environ.get("IM360_RBL_RELOAD_MIN_PERIOD")
    if raw_period is None:
        period = default_period
    else:
        try:
            period = max(0, int(raw_period))
        except ValueError:
            logger.warning(
                "Invalid IM360_RBL_RELOAD_MIN_PERIOD=%r; "
                "falling back to default %d s",
                raw_period,
                default_period,
            )
            period = default_period

    # The wrapper is rebuilt on every call, which is cheap: the coalescing
    # state is kept on _rbl_reload_coalescer rather than on the wrapper, so
    # calls are still coalesced across invocations.
    throttled_restart = _rbl_reload_coalescer.coalesce_calls(period)(
        web_server._graceful_restart
    )
    await throttled_restart()


async def _get_whitelists_data():
    """Return one iterable chaining every whitelist source.

    Sources, in order: the global whitelist, the "full access" ipset
    entries, the manually whitelisted IPs and the custom whitelist.
    """
    sources = [await GlobalWhitelist.load()]
    # Only the "ip" field of each full-access record is exported.
    sources.append(
        (record["ip"] for record in IPSetWhiteFullAccess().query_all())
    )
    # ignore "whitelisted by passing captcha" ips (DEF-13665)
    sources.append(IPSetWhite().get_non_captcha_passed_ips())
    sources.append(await CustomWhitelist.load())
    return itertools.chain(*sources)


@use_modsec_lock
@retry_on(
    FileNotFoundError,
    max_tries=COPY_TO_MODSEC_MAXTRIES,
    on_error=log_failed_to_copy_to_modsec,
    silent=True,
)
async def create_rbl_whitelist():
    """Regenerate the RBL whitelist file and trigger reloads if it changed.

    Does nothing when no whitelist path is available. Otherwise the set of
    freshly collected entries is compared with the set of lines currently in
    the file; only when they differ is the file rewritten, an Apache reload
    scheduled and, when the Coraza ruleset is in use, 'imunify360-wafd'
    reloaded.
    """
    target_path = await _get_rbl_whitelist_path()
    if not target_path:
        return

    fresh_entries = list(_convert_ip_addresses(await _get_whitelists_data()))
    entries_on_disk = set(_read_whitelist_from_file(target_path))

    if set(fresh_entries) == entries_on_disk:
        logger.info("No changes in RBL whitelist, no restart required")
        return

    logger.info("Create RBL whitelist: %s", target_path)
    content = "\n".join(sorted(fresh_entries))
    # rbl_whitelist is a data file read by an @ipMatchFromFile SecRule, not
    # an Apache config file, so safe_update_config's configtest/revert
    # machinery doesn't apply. Writing it directly lets us attach our own
    # longer-window coalescer to the reload step (DEF-41807).
    if not atomic_rewrite(str(target_path), content, backup=False):
        return

    logger.info("RBL whitelist was successfully updated")
    await _schedule_apache_reload_for_rbl()

    # Comparing the panel's class name instead of importing the class is a
    # workaround to avoid redundant imports which can cause circular
    # dependencies.
    coraza_in_use = (
        is_force_use_coraza()
        or HostingPanel().__class__.__name__ == "cPanelCoraza"
    )
    if coraza_in_use:
        logger.info(
            "Reloading 'imunify360-wafd' as coraza ruleset is in"
            " action"
        )
        await reload_wafd()


@recurring_check(POLLING_PERIOD)
async def ensure_rbl_whitelist():
    """Recreate the rbl_whitelist file whenever it is found empty.

    Nothing is done when no whitelist path is available or when the file
    does not exist.
    """
    whitelist_path = await _get_rbl_whitelist_path()
    if not whitelist_path:
        return  # do nothing at this time

    try:
        size = os.path.getsize(str(whitelist_path))
    except FileNotFoundError:
        return

    if size == 0:
        await create_rbl_whitelist()  # recreate


def _convert_ip_addresses(iterable):
    for ip in iterable:
        ip = ipaddress.ip_network(ip)

        # RBL whitelist can't handle /32 nets.
        # we need to convert /32 nets to ips
        if ip.num_addresses == 1:
            ip = ipaddress.ip_address(ip.network_address)
        yield str(ip)


async def _get_rbl_whitelist_path() -> Optional[Path]:
    """Return the location of the RBL whitelist file, if available.

    The RBL whitelist is stored in the ModSec ruleset directory; the actual
    path is obtained from ``HostingPanel().get_rbl_whitelist_path()``.

    Returns:
        The value reported by the hosting panel, or ``None`` when the panel
        raises ``PanelException``. A falsy panel result (e.g. when the
        modsec ruleset dir doesn't exist) is logged and returned as-is.
    """
    try:
        rbl_whitelist_path = await HostingPanel().get_rbl_whitelist_path()
    except PanelException as e:
        # Log through the module logger, like the rest of this file. The
        # module-level logging.warning() goes to the root logger instead,
        # losing this module's logger name and configuration.
        logger.warning("Can't create rbl whitelist: %s", e)
        return None

    if not rbl_whitelist_path:
        logger.info("RBL whitelist path is undefined. Creation skipped")
    return rbl_whitelist_path


def _read_whitelist_from_file(rbl_whitelist_path):
    """Lazily yield the whitespace-stripped lines of the whitelist file.

    A missing file is logged and yields nothing.
    """
    logger.info("Read RBL whitelist: %s", rbl_whitelist_path)
    try:
        # Use the builtin open() on the stringified path rather than
        # `rbl_whitelist_path.open()`: the latter does not have to raise
        # FileNotFoundError, it might be, for example, OSError in the case
        # of `py.path.local`.
        with open(str(rbl_whitelist_path), "r") as whitelist_file:
            for line in whitelist_file:
                yield line.strip()
    except FileNotFoundError:
        logger.info("RBL whitelist doest not exist: %s", rbl_whitelist_path)