Files
doorman/backend-services/utils/graphql_util.py
seniorswe eeb18f427c fixes
2026-01-24 23:41:01 -05:00

304 lines
7.7 KiB
Python

"""
GraphQL Utility Functions
Provides:
- Query depth calculation and limiting
- Schema introspection fetching and caching
- GraphQL query validation helpers
"""
import json
import logging
import re
from typing import Any
import httpx
logger = logging.getLogger('doorman.gateway')
# Default maximum query depth
DEFAULT_MAX_DEPTH = 10
def calculate_query_depth(query: str) -> int:
"""
Calculate the maximum nesting depth of a GraphQL query.
Uses regex-based parsing for performance (no full AST parsing required).
Args:
query: GraphQL query string
Returns:
Maximum nesting depth (1 = flat query like "{ users }")
"""
if not query or not isinstance(query, str):
return 0
# Remove comments (line-by-line to avoid ReDoS on pathological inputs)
try:
query = '\n'.join(line.split('#', 1)[0] for line in query.splitlines())
except Exception:
pass
# Remove string literals to avoid counting braces in strings
query = re.sub(r'"[^"]*"', '""', query)
query = re.sub(r"'[^']*'", "''", query)
max_depth = 0
current_depth = 0
for char in query:
if char == '{':
current_depth += 1
max_depth = max(max_depth, current_depth)
elif char == '}':
current_depth = max(0, current_depth - 1)
return max_depth
def validate_query_depth(query: str, max_depth: int | None = None) -> tuple[bool, str | None]:
"""
Validate that a query does not exceed maximum depth.
Args:
query: GraphQL query string
max_depth: Maximum allowed depth (uses DEFAULT_MAX_DEPTH if None)
Returns:
Tuple of (is_valid, error_message)
"""
if max_depth is None:
max_depth = DEFAULT_MAX_DEPTH
if max_depth <= 0:
# Depth limiting disabled
return True, None
depth = calculate_query_depth(query)
if depth > max_depth:
return False, f'Query depth {depth} exceeds maximum allowed depth of {max_depth}'
return True, None
def estimate_query_complexity(query: str) -> int:
"""
Estimate query complexity based on field count.
This is a simple heuristic - counts field-like patterns.
Args:
query: GraphQL query string
Returns:
Estimated complexity score
"""
if not query:
return 0
# Remove comments (line-by-line) and strings
try:
query = '\n'.join(line.split('#', 1)[0] for line in query.splitlines())
except Exception:
pass
query = re.sub(r'"[^"]*"', '', query)
# Count field names (words before { or after { or before })
fields = re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', query)
# Remove common keywords
keywords = {'query', 'mutation', 'subscription', 'fragment', 'on', 'true', 'false', 'null'}
fields = [f for f in fields if f.lower() not in keywords]
return len(fields)
# Standard introspection query
INTROSPECTION_QUERY = '''
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
subscriptionType { name }
types {
kind
name
description
fields(includeDeprecated: true) {
name
description
args { name type { name kind } }
type { name kind ofType { name kind } }
}
}
}
}
'''
async def fetch_introspection_schema(
url: str,
headers: dict | None = None,
timeout: float = 30.0,
) -> dict | None:
"""
Fetch GraphQL schema via introspection query.
Args:
url: GraphQL endpoint URL
headers: Optional headers for authentication
timeout: Request timeout
Returns:
Schema dictionary or None if failed
"""
try:
request_headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
}
if headers:
request_headers.update(headers)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
url,
json={'query': INTROSPECTION_QUERY},
headers=request_headers,
)
if response.status_code != 200:
logger.warning(f'Introspection failed: HTTP {response.status_code}')
return None
data = response.json()
if 'errors' in data and data['errors']:
logger.warning(f"Introspection errors: {data['errors']}")
return None
return data.get('data', {}).get('__schema')
except Exception as e:
logger.error(f'Error fetching GraphQL schema: {e}')
return None
def extract_types_from_schema(schema: dict) -> list[dict]:
"""
Extract type definitions from introspection schema.
Args:
schema: Introspection schema dictionary
Returns:
List of type definitions with fields
"""
if not schema:
return []
types = []
for type_def in schema.get('types', []):
# Skip introspection types
if type_def.get('name', '').startswith('__'):
continue
types.append({
'name': type_def.get('name'),
'kind': type_def.get('kind'),
'description': type_def.get('description'),
'fields': [
{
'name': f.get('name'),
'type': f.get('type', {}).get('name') or f.get('type', {}).get('kind'),
}
for f in (type_def.get('fields') or [])
],
})
return types
def get_operation_type(schema: dict) -> dict:
"""
Extract operation types (query, mutation, subscription) from schema.
Args:
schema: Introspection schema dictionary
Returns:
Dict with operation type names
"""
if not schema:
return {}
return {
'query': schema.get('queryType', {}).get('name') if schema.get('queryType') else None,
'mutation': schema.get('mutationType', {}).get('name') if schema.get('mutationType') else None,
'subscription': schema.get('subscriptionType', {}).get('name') if schema.get('subscriptionType') else None,
}
def has_subscription_support(schema: dict) -> bool:
"""
Check if schema supports subscriptions.
Args:
schema: Introspection schema dictionary
Returns:
True if subscriptions are supported
"""
if not schema:
return False
sub_type = schema.get('subscriptionType')
return sub_type is not None and sub_type.get('name') is not None
def detect_operation_type(query: str) -> str:
"""
Detect the operation type from a GraphQL query.
Args:
query: GraphQL query string
Returns:
'query', 'mutation', 'subscription', or 'unknown'
"""
if not query:
return 'unknown'
# Remove comments
query = re.sub(r'#.*$', '', query, flags=re.MULTILINE)
query = query.strip()
# Check for explicit operation type
if query.startswith('mutation'):
return 'mutation'
elif query.startswith('subscription'):
return 'subscription'
elif query.startswith('query'):
return 'query'
elif query.startswith('{'):
# Anonymous query
return 'query'
return 'unknown'
def is_subscription_operation(query: str) -> bool:
"""
Check if a query is a subscription operation.
Args:
query: GraphQL query string
Returns:
True if this is a subscription
"""
return detect_operation_type(query) == 'subscription'