fix: replace IS_SELF_MANAGED with WEBHOOK_ALLOWED_IPS allowlist (#8884)

* fix: replace IS_SELF_MANAGED toggle with explicit WEBHOOK_ALLOWED_IPS allowlist

Instead of blanket-allowing all private IPs on self-managed deployments,
webhook URL validation now blocks all private/internal IPs by default and
only permits specific networks listed in the WEBHOOK_ALLOWED_IPS env
variable (comma-separated IPs/CIDRs).

* fix: address PR review comments for webhook SSRF protection

- Sanitize error messages to avoid leaking internal details to clients
- Guard against TypeError with mixed IPv4/IPv6 allowlist networks
- Re-validate webhook URL at send time to prevent DNS-rebinding
- Add unit tests for mixed-version IP network allowlists
This commit is contained in:
sriram veeraghanta
2026-04-20 15:28:33 +05:30
committed by GitHub
parent ac11c3ef79
commit a8a16c8ba0
5 changed files with 133 additions and 55 deletions
+20 -55
View File
@@ -3,90 +3,55 @@
# See the LICENSE file for details.
# Python imports
import socket
import ipaddress
import logging
from urllib.parse import urlparse
# Third party imports
from rest_framework import serializers
# Django imports
from django.conf import settings
# Module imports
from .base import DynamicBaseSerializer
from plane.db.models import Webhook, WebhookLog
from plane.db.models.webhook import validate_domain, validate_schema
from plane.utils.ip_address import validate_url
logger = logging.getLogger(__name__)
class WebhookSerializer(DynamicBaseSerializer):
url = serializers.URLField(validators=[validate_schema, validate_domain])
def create(self, validated_data):
url = validated_data.get("url", None)
# Extract the hostname from the URL
hostname = urlparse(url).hostname
if not hostname:
raise serializers.ValidationError({"url": "Invalid URL: No hostname found."})
# Resolve the hostname to IP addresses
def _validate_webhook_url(self, url):
"""Validate a webhook URL against SSRF and disallowed domain rules."""
try:
ip_addresses = socket.getaddrinfo(hostname, None)
except socket.gaierror:
raise serializers.ValidationError({"url": "Hostname could not be resolved."})
validate_url(url, allowed_ips=settings.WEBHOOK_ALLOWED_IPS)
except ValueError as e:
logger.warning("Webhook URL validation failed for %s: %s", url, e)
raise serializers.ValidationError({"url": "Invalid or disallowed webhook URL."})
if not ip_addresses:
raise serializers.ValidationError({"url": "No IP addresses found for the hostname."})
hostname = (urlparse(url).hostname or "").rstrip(".").lower()
for addr in ip_addresses:
ip = ipaddress.ip_address(addr[4][0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
raise serializers.ValidationError({"url": "URL resolves to a blocked IP address."})
# Additional validation for multiple request domains and their subdomains
request = self.context.get("request")
disallowed_domains = ["plane.so"] # Add your disallowed domains here
disallowed_domains = ["plane.so"]
if request:
request_host = request.get_host().split(":")[0] # Remove port if present
request_host = request.get_host().split(":")[0].rstrip(".").lower()
disallowed_domains.append(request_host)
# Check if hostname is a subdomain or exact match of any disallowed domain
if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains):
raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."})
def create(self, validated_data):
url = validated_data.get("url", None)
self._validate_webhook_url(url)
return Webhook.objects.create(**validated_data)
def update(self, instance, validated_data):
url = validated_data.get("url", None)
if url:
# Extract the hostname from the URL
hostname = urlparse(url).hostname
if not hostname:
raise serializers.ValidationError({"url": "Invalid URL: No hostname found."})
# Resolve the hostname to IP addresses
try:
ip_addresses = socket.getaddrinfo(hostname, None)
except socket.gaierror:
raise serializers.ValidationError({"url": "Hostname could not be resolved."})
if not ip_addresses:
raise serializers.ValidationError({"url": "No IP addresses found for the hostname."})
for addr in ip_addresses:
ip = ipaddress.ip_address(addr[4][0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
raise serializers.ValidationError({"url": "URL resolves to a blocked IP address."})
# Additional validation for multiple request domains and their subdomains
request = self.context.get("request")
disallowed_domains = ["plane.so"] # Add your disallowed domains here
if request:
request_host = request.get_host().split(":")[0] # Remove port if present
disallowed_domains.append(request_host)
# Check if hostname is a subdomain or exact match of any disallowed domain
if any(hostname == domain or hostname.endswith("." + domain) for domain in disallowed_domains):
raise serializers.ValidationError({"url": "URL domain or its subdomain is not allowed."})
self._validate_webhook_url(url)
return super().update(instance, validated_data)
class Meta:
+4
View File
@@ -52,6 +52,7 @@ from plane.db.models import (
from plane.license.utils.instance_value import get_email_configuration
from plane.utils.email import generate_plain_text_from_html
from plane.utils.exception_logger import log_exception
from plane.utils.ip_address import validate_url
from plane.settings.mongo import MongoConnection
@@ -325,6 +326,9 @@ def webhook_send_task(
return
try:
# Re-validate the webhook URL at send time to prevent DNS-rebinding attacks
validate_url(webhook.url, allowed_ips=settings.WEBHOOK_ALLOWED_IPS)
# Send the webhook event
response = requests.post(webhook.url, headers=headers, json=payload, timeout=30)
+17
View File
@@ -5,6 +5,8 @@
"""Global Settings"""
# Python imports
import ipaddress
import logging
import os
from urllib.parse import urlparse
from urllib.parse import urljoin
@@ -32,6 +34,21 @@ DEBUG = int(os.environ.get("DEBUG", "0"))
# Self-hosted mode
IS_SELF_MANAGED = True
# Webhook IP allowlist — comma-separated IPs or CIDR ranges that are allowed as
# webhook targets even if they resolve to private networks.
# Example: "10.0.0.0/8,192.168.1.0/24,172.16.0.5"
_webhook_allowed_ips_raw = os.environ.get("WEBHOOK_ALLOWED_IPS", "")
WEBHOOK_ALLOWED_IPS = []
_logger = logging.getLogger("plane")
for _cidr in _webhook_allowed_ips_raw.split(","):
_cidr = _cidr.strip()
if not _cidr:
continue
try:
WEBHOOK_ALLOWED_IPS.append(ipaddress.ip_network(_cidr, strict=False))
except ValueError:
_logger.warning("WEBHOOK_ALLOWED_IPS: skipping invalid entry %r", _cidr)
# Allowed Hosts
ALLOWED_HOSTS = os.environ.get("ALLOWED_HOSTS", "*").split(",")
@@ -2,9 +2,12 @@
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import ipaddress
import pytest
from unittest.mock import patch, MagicMock
from plane.bgtasks.work_item_link_task import safe_get, validate_url_ip
from plane.utils.ip_address import validate_url
def _make_response(status_code=200, headers=None, is_redirect=False, content=b""):
@@ -43,6 +46,49 @@ class TestValidateUrlIp:
validate_url_ip("https://example.com") # Should not raise
@pytest.mark.unit
class TestValidateUrlAllowlist:
"""Test validate_url allowlist permits specific private IPs."""
def test_allowlist_permits_private_ip(self):
allowed = [ipaddress.ip_network("192.168.1.0/24")]
with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("192.168.1.50", 0))]
validate_url("http://example.com", allowed_ips=allowed) # Should not raise
def test_allowlist_does_not_permit_other_private_ip(self):
allowed = [ipaddress.ip_network("192.168.1.0/24")]
with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("10.0.0.1", 0))]
with pytest.raises(ValueError, match="private/internal"):
validate_url("http://example.com", allowed_ips=allowed)
def test_allowlist_permits_loopback_when_explicitly_allowed(self):
allowed = [ipaddress.ip_network("127.0.0.0/8")]
with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("127.0.0.1", 0))]
validate_url("http://example.com", allowed_ips=allowed) # Should not raise
def test_allowlist_permits_matching_ipv4_with_mixed_version_networks(self):
allowed = [
ipaddress.ip_network("2001:db8::/32"),
ipaddress.ip_network("192.168.1.0/24"),
]
with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("192.168.1.50", 0))]
validate_url("http://example.com", allowed_ips=allowed) # Should not raise
def test_allowlist_blocks_non_matching_ipv4_with_mixed_version_networks(self):
allowed = [
ipaddress.ip_network("2001:db8::/32"),
ipaddress.ip_network("192.168.1.0/24"),
]
with patch("plane.utils.ip_address.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, ("10.0.0.1", 0))]
with pytest.raises(ValueError, match="private/internal"):
validate_url("http://example.com", allowed_ips=allowed)
@pytest.mark.unit
class TestSafeGet:
"""Test safe_get follows redirects safely and blocks SSRF."""
+46
View File
@@ -2,6 +2,52 @@
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
# Python imports
import ipaddress
import socket
from urllib.parse import urlparse
def validate_url(url, allowed_ips=None):
"""
Validate that a URL doesn't resolve to a private/internal IP address (SSRF protection).
Args:
url: The URL to validate.
allowed_ips: Optional list of ipaddress.ip_network objects. IPs falling within
these networks are permitted even if they are private/loopback/reserved.
Typically sourced from the WEBHOOK_ALLOWED_IPS setting.
Raises:
ValueError: If the URL is invalid or resolves to a blocked IP.
"""
parsed = urlparse(url)
hostname = parsed.hostname
if not hostname:
raise ValueError("Invalid URL: No hostname found")
if parsed.scheme not in ("http", "https"):
raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
try:
addr_info = socket.getaddrinfo(hostname, None)
except socket.gaierror:
raise ValueError("Hostname could not be resolved")
if not addr_info:
raise ValueError("No IP addresses found for the hostname")
for addr in addr_info:
ip = ipaddress.ip_address(addr[4][0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
if allowed_ips and any(
network.version == ip.version and ip in network for network in allowed_ips
):
continue
raise ValueError("Access to private/internal networks is not allowed")
def get_client_ip(request):
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for: