# File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/subsys/whitelist_rbl.py
"""
This module provides functions for exporting whitelist for
Real-time Blackhole List (RBL).
"""
import itertools
import ipaddress
import logging
import os
from pathlib import Path
from typing import Optional
from defence360agent.subsys.panels.base import PanelException
from defence360agent.subsys import web_server
from defence360agent.utils import (
COPY_TO_MODSEC_MAXTRIES,
atomic_rewrite,
check_run,
CheckRunError,
log_failed_to_copy_to_modsec,
recurring_check,
retry_on,
)
from defence360agent.utils.common import CoalesceCalls
from im360.subsys.int_config import is_force_use_coraza
from im360.subsys.panels.base import use_modsec_lock
from im360.subsys.panels.hosting_panel import HostingPanel
from im360.model.global_whitelist import GlobalWhitelist
from im360.model.custom_lists import CustomWhitelist
from im360.internals.core.ipset.ip import IPSetWhiteFullAccess, IPSetWhite
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
async def reload_wafd():
    """Ask the 'imunify360-wafd' service to reload.

    Uses ``imunify360-wsctl reload`` when /usr/bin/imunify360-wsctl exists
    as a regular file, otherwise ``systemctl reload imunify360-wafd``.
    A CheckRunError from the command is logged as a warning and swallowed,
    so callers never see a failed reload as an exception.
    """
    wsctl_available = Path("/usr/bin/imunify360-wsctl").is_file()
    command = (
        ["imunify360-wsctl", "reload"]
        if wsctl_available
        else ["systemctl", "reload", "imunify360-wafd"]
    )
    try:
        await check_run(command)
    except CheckRunError:
        # Best effort only: a failed reload must not break the caller.
        logger.warning("Failed to reload 'imunify360-wafd'")
#: How often, in seconds, the rbl_whitelist file is checked; this is the
#: period handed to @recurring_check on ensure_rbl_whitelist.
POLLING_PERIOD = 60 # seconds
#: Dedicated coalescer for Apache reloads triggered by create_rbl_whitelist.
#: Independent from the shared 300 s webserver_gracefull_restart — we want a
#: longer default window for the RBL path, and we don't want our throttle to
#: interfere with other reload callers.
#: It is a single module-level instance: _schedule_apache_reload_for_rbl
#: wraps the restart call with it on every invocation.
_rbl_reload_coalescer = CoalesceCalls()
async def _schedule_apache_reload_for_rbl():
    """Request a coalesced Apache graceful reload after an RBL update.

    Called by create_rbl_whitelist once the whitelist file has been
    rewritten. Two environment variables tune the behaviour; both are read
    on every call so support can flip them via
    ``systemctl set-environment`` + ``systemctl restart imunify360`` without
    a re-deploy:

    * ``IM360_RBL_RELOAD_DISABLED`` — when set to ``1``, ``true`` or ``yes``
      (case-insensitive) the reload is suppressed entirely. The file on
      disk still advances; mod_security picks up changes on the next reload
      from any other code path (vendor update, panel integration, etc.).
    * ``IM360_RBL_RELOAD_MIN_PERIOD=<seconds>`` — minimum interval between
      reloads from this code path. Default 600 (10 min). Negative values
      are clamped to 0; a non-integer value logs a warning and falls back
      to the default.
    """
    disabled_flag = os.environ.get("IM360_RBL_RELOAD_DISABLED", "").lower()
    if disabled_flag in {"1", "true", "yes"}:
        logger.info(
            "Apache reload from create_rbl_whitelist suppressed "
            "(IM360_RBL_RELOAD_DISABLED is set)"
        )
        return

    default_period = 10 * 60
    # Looked up at call time (not import time) so operators can retune
    # without a redeploy.
    raw_period = os.environ.get("IM360_RBL_RELOAD_MIN_PERIOD")
    if raw_period is None:
        period = default_period
    else:
        try:
            period = max(0, int(raw_period))
        except ValueError:
            # Do not let a bad value propagate: the file on disk has
            # already been rewritten at this point, and raising here would
            # also skip the Coraza/wafd reload that create_rbl_whitelist
            # performs after this call, leaving mod_security with stale
            # data until something else triggered a reload.
            logger.warning(
                "Invalid IM360_RBL_RELOAD_MIN_PERIOD=%r; "
                "falling back to default %d s",
                raw_period,
                default_period,
            )
            period = default_period

    # Building the wrapper on each call is cheap; state lives on
    # _rbl_reload_coalescer, not on the wrapper — coalescing across calls
    # behaves identically.
    throttle = _rbl_reload_coalescer.coalesce_calls(period)
    coalesced_restart = throttle(web_server._graceful_restart)
    await coalesced_restart()
async def _get_whitelists_data():
    """Gather every whitelist source into one lazily chained iterable.

    The sources, in the order they are chained, are: the global whitelist,
    the "ip" field of each full-access ipset entry, the manually
    whitelisted IPs (captcha-passed ones excluded) and the custom
    whitelist. The result is an ``itertools.chain`` and may contain
    duplicates across sources.
    """
    sources = [await GlobalWhitelist.load()]
    full_access_entries = IPSetWhiteFullAccess().query_all()
    sources.append(entry["ip"] for entry in full_access_entries)
    # ignore "whitelisted by passing captcha" ips (DEF-13665)
    sources.append(IPSetWhite().get_non_captcha_passed_ips())
    sources.append(await CustomWhitelist.load())
    return itertools.chain(*sources)
@use_modsec_lock
@retry_on(
    FileNotFoundError,
    max_tries=COPY_TO_MODSEC_MAXTRIES,
    on_error=log_failed_to_copy_to_modsec,
    silent=True,
)
async def create_rbl_whitelist():
    """Regenerate the RBL whitelist file and reload consumers on change.

    Steps:

    1. Resolve the whitelist path; do nothing when there is none.
    2. Collect all whitelist sources, normalise them with
       _convert_ip_addresses and compare them, as a set, with the lines
       currently stored in the file.
    3. When the sets differ, rewrite the file with the sorted, unique
       entries (one per line), schedule an Apache reload and, when the
       Coraza ruleset is in use, reload 'imunify360-wafd' as well.

    Entries are de-duplicated before writing, so an address that appears
    in several source lists is stored only once. This matches the
    set-based change detection.

    The function runs under use_modsec_lock and is wrapped by retry_on for
    FileNotFoundError (see the decorator arguments).
    """
    rbl_whitelist_path = await _get_rbl_whitelist_path()
    if not rbl_whitelist_path:
        return

    whitelist_chain = await _get_whitelists_data()
    # Build the set once: it drives both the comparison and the output,
    # and it drops entries repeated across the source lists.
    new_whitelist = set(_convert_ip_addresses(whitelist_chain))
    current_whitelist = set(_read_whitelist_from_file(rbl_whitelist_path))
    if new_whitelist == current_whitelist:
        logger.info("No changes in RBL whitelist, no restart required")
        return

    logger.info("Create RBL whitelist: %s", rbl_whitelist_path)
    text = "\n".join(sorted(new_whitelist))
    # rbl_whitelist is a data file read by an @ipMatchFromFile SecRule —
    # safe_update_config's configtest/revert machinery is for Apache config
    # files and doesn't apply here. We write directly so we can attach our
    # own longer-window coalescer to the reload step (DEF-41807).
    if not atomic_rewrite(str(rbl_whitelist_path), text, backup=False):
        # Falsy result from atomic_rewrite: nothing to reload.
        return

    logger.info("RBL whitelist was successfully updated")
    await _schedule_apache_reload_for_rbl()
    # Sort of a workaround to avoid redundant imports which can
    # cause circular dependencies
    if (
        is_force_use_coraza()
        or HostingPanel().__class__.__name__ == "cPanelCoraza"
    ):
        logger.info(
            "Reloading 'imunify360-wafd' as coraza ruleset is in"
            " action"
        )
        await reload_wafd()
@recurring_check(POLLING_PERIOD)
async def ensure_rbl_whitelist():
    """Recreate rbl_whitelist whenever the file exists but is empty.

    Nothing happens when the whitelist path is unavailable or when the
    file does not exist.
    """
    whitelist_path = await _get_rbl_whitelist_path()
    if not whitelist_path:
        return  # do nothing at this time

    try:
        size = os.path.getsize(str(whitelist_path))
    except FileNotFoundError:
        # A missing file is not this check's concern.
        return

    if size == 0:
        await create_rbl_whitelist()  # recreate
def _convert_ip_addresses(iterable):
for ip in iterable:
ip = ipaddress.ip_network(ip)
# RBL whitelist can't handle /32 nets.
# we need to convert /32 nets to ips
if ip.num_addresses == 1:
ip = ipaddress.ip_address(ip.network_address)
yield str(ip)
async def _get_rbl_whitelist_path() -> Optional[Path]:
    """Return the path of the RBL whitelist file, if one is available.

    The RBL whitelist is stored in the ModSec ruleset directory; the actual
    location is obtained from ``HostingPanel().get_rbl_whitelist_path()``.

    Returns:
        The path reported by the hosting panel, or ``None`` when the panel
        raises PanelException. When the panel reports no path (a falsy
        value, e.g. the ModSec ruleset directory doesn't exist) that value
        is returned unchanged after logging that creation is skipped.
    """
    try:
        rbl_whitelist_path = await HostingPanel().get_rbl_whitelist_path()
    except PanelException as e:
        # Use the module logger (not the root logger via logging.warning)
        # so the record carries this module's name like every other
        # message emitted here.
        logger.warning("Can't create rbl whitelist: %s", e)
        return None
    if not rbl_whitelist_path:
        logger.info("RBL whitelist path is undefined. Creation skipped")
    return rbl_whitelist_path
def _read_whitelist_from_file(rbl_whitelist_path):
    """Lazily yield the whitespace-stripped lines of the whitelist file.

    This is a generator: nothing (including the log message) happens until
    it is iterated. A missing file is logged at info level and results in
    an empty sequence instead of an exception.
    """
    logger.info("Read RBL whitelist: %s", rbl_whitelist_path)
    try:
        # `rbl_whitelist_path.open()` does not have to raise
        # FileNotFoundError, it might be, for example, OSError in the case
        # of `py.path.local` — hence the builtin open() on the str() path.
        with open(str(rbl_whitelist_path), "r") as whitelist_file:
            for line in whitelist_file:
                yield line.strip()
    except FileNotFoundError:
        logger.info("RBL whitelist doest not exist: %s", rbl_whitelist_path)