mirror of
https://github.com/apidoorman/doorman.git
synced 2026-02-07 17:48:53 -06:00
304 lines
7.7 KiB
Python
304 lines
7.7 KiB
Python
"""
|
|
GraphQL Utility Functions
|
|
|
|
Provides:
|
|
- Query depth calculation and limiting
|
|
- Schema introspection fetching and caching
|
|
- GraphQL query validation helpers
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
# Module-level logger; everything logged from this module goes through the
# 'doorman.gateway' logger name.
logger = logging.getLogger('doorman.gateway')
|
|
|
|
# Default maximum query depth.
# validate_query_depth() falls back to this value when it is called with
# max_depth=None; a limit <= 0 disables the depth check there.
DEFAULT_MAX_DEPTH = 10
|
|
|
|
|
|
# Tokenizer used for depth counting. The alternatives are tried in order at the
# current position; only the named "brace" group changes the depth, everything
# else is consumed and ignored.
_DEPTH_TOKEN_RE = re.compile(
    r'(?P<brace>[{}])'
    r'|#[^\n\r]*'                  # comment: runs to the end of the line
    r'|"""(?:\\"""|[\s\S])*?"""'   # block string; \""" is an escaped delimiter
    r'|"(?:\\[\s\S]|[^"\\])*"'     # double-quoted string with backslash escapes
    r"|'(?:\\[\s\S]|[^'\\])*'"     # single-quoted text, tolerated for lenient input
    r'|[^{}#"\']+'                 # any other run of characters
)

# Tokenizer used after an unterminated quote has been met: quotes are treated
# as ordinary characters, so every remaining brace outside a comment counts.
_DEPTH_FALLBACK_RE = re.compile(r'(?P<brace>[{}])|#[^\n\r]*|[^{}#]+')


def calculate_query_depth(query: str) -> int:
    """
    Calculate the maximum nesting depth of a GraphQL query.

    The query is tokenized in one left-to-right pass (no AST is built).
    Comments and string literals are recognised in that same pass, so a '#'
    inside a string does not start a comment and a quote inside a comment does
    not start a string. Backslash escapes and block strings are honoured.
    Braces found inside comments or strings are ignored; every other '{'
    opens one level, which means input-object literals in arguments are
    counted as well.

    If a quote is never closed, quotes stop being special from that point on
    and all remaining braces outside comments are counted. Over-counting is
    the safe direction for a depth limit.

    Args:
        query: GraphQL query string

    Returns:
        Maximum nesting depth (1 = flat query like "{ users }"); 0 for empty
        or non-string input
    """
    if not query or not isinstance(query, str):
        return 0

    max_depth = 0
    current_depth = 0
    pattern = _DEPTH_TOKEN_RE
    pos = 0
    end = len(query)

    while pos < end:
        token = pattern.match(query, pos)
        if token is None:
            # Only an unterminated quote can fail to match. Switching to the
            # fallback pattern (which matches any character) keeps the scan
            # linear instead of retrying string matching at every later quote.
            pattern = _DEPTH_FALLBACK_RE
            continue

        pos = token.end()
        if token.lastgroup != 'brace':
            continue

        if token.group() == '{':
            current_depth += 1
            max_depth = max(max_depth, current_depth)
        else:
            # Clamp at zero so stray closing braces cannot drive the counter
            # negative and mask later nesting.
            current_depth = max(0, current_depth - 1)

    return max_depth
|
|
|
|
|
|
def validate_query_depth(query: str, max_depth: int | None = None) -> tuple[bool, str | None]:
|
|
"""
|
|
Validate that a query does not exceed maximum depth.
|
|
|
|
Args:
|
|
query: GraphQL query string
|
|
max_depth: Maximum allowed depth (uses DEFAULT_MAX_DEPTH if None)
|
|
|
|
Returns:
|
|
Tuple of (is_valid, error_message)
|
|
"""
|
|
if max_depth is None:
|
|
max_depth = DEFAULT_MAX_DEPTH
|
|
|
|
if max_depth <= 0:
|
|
# Depth limiting disabled
|
|
return True, None
|
|
|
|
depth = calculate_query_depth(query)
|
|
|
|
if depth > max_depth:
|
|
return False, f'Query depth {depth} exceeds maximum allowed depth of {max_depth}'
|
|
|
|
return True, None
|
|
|
|
|
|
# Splits a query into text to keep and text to drop (comments and string
# literals). Both kinds are recognised in one pass so that a '#' inside a
# string is not mistaken for a comment.
_COMPLEXITY_SCAN_RE = re.compile(
    r'(?P<drop>'
    r'#[^\n\r]*'                   # comment: runs to the end of the line
    r'|"""(?:\\"""|[\s\S])*?"""'   # block string; \""" is an escaped delimiter
    r'|"(?:\\[\s\S]|[^"\\])*"'     # double-quoted string with backslash escapes
    r')'
    r'|(?P<keep>[^#"]+)'
)

# Used after an unterminated quote: quotes become ordinary characters, only
# comments are still dropped.
_COMPLEXITY_FALLBACK_RE = re.compile(r'(?P<drop>#[^\n\r]*)|(?P<keep>[^#]+)')

_COMPLEXITY_NAME_RE = re.compile(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b')

# Words that are GraphQL syntax rather than fields (compared case-insensitively).
_COMPLEXITY_KEYWORDS = frozenset(
    {'query', 'mutation', 'subscription', 'fragment', 'on', 'true', 'false', 'null'}
)


def estimate_query_complexity(query: str) -> int:
    """
    Estimate query complexity based on field count.

    This is a simple heuristic: after comments and string literals are
    removed, every identifier-like word that is not a GraphQL keyword counts
    as one. Argument names, variable names and type names are therefore
    counted along with fields.

    If a string literal is never closed, the text from that quote onwards is
    kept (minus comments), so its words are counted rather than silently
    dropped.

    Args:
        query: GraphQL query string

    Returns:
        Estimated complexity score; 0 for empty or non-string input
    """
    if not query or not isinstance(query, str):
        return 0

    kept = []
    pattern = _COMPLEXITY_SCAN_RE
    pos = 0
    end = len(query)

    while pos < end:
        token = pattern.match(query, pos)
        if token is None:
            # Only an unterminated quote can fail to match; the fallback
            # pattern matches any character, which keeps the scan linear.
            pattern = _COMPLEXITY_FALLBACK_RE
            continue
        pos = token.end()
        if token.lastgroup == 'keep':
            kept.append(token.group())

    # Joining with a space stands in for each dropped comment/string so the
    # words on either side of it cannot fuse into a single identifier.
    code = ' '.join(kept)

    return sum(
        1
        for name in _COMPLEXITY_NAME_RE.findall(code)
        if name.lower() not in _COMPLEXITY_KEYWORDS
    )
|
|
|
|
|
|
# Standard introspection query.
# fetch_introspection_schema() sends this text as the `query` field of its JSON
# request body. It asks for the names of the three root operation types and,
# for every type, its kind, name, description and fields (deprecated fields
# included). Field types are requested with one level of `ofType`; argument
# types with none.
INTROSPECTION_QUERY = '''
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
subscriptionType { name }
types {
kind
name
description
fields(includeDeprecated: true) {
name
description
args { name type { name kind } }
type { name kind ofType { name kind } }
}
}
}
}
'''
|
|
|
|
|
|
async def fetch_introspection_schema(
    url: str,
    headers: dict | None = None,
    timeout: float = 30.0,
) -> dict | None:
    """
    POST the introspection query to a GraphQL endpoint and return its schema.

    Every failure is reported through the module logger and turned into a
    None result: a non-200 status, a non-empty `errors` entry in the response,
    or any exception raised while sending the request or reading the reply.

    Args:
        url: GraphQL endpoint URL
        headers: Extra request headers (e.g. authentication); they are applied
            on top of the JSON Content-Type/Accept defaults
        timeout: Timeout passed to the HTTP client

    Returns:
        The `__schema` object from the response data, or None on failure
    """
    try:
        request_headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        # Caller-supplied headers win over the defaults on identical keys.
        request_headers.update(headers or {})

        payload = {'query': INTROSPECTION_QUERY}

        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=payload, headers=request_headers)

        if response.status_code != 200:
            logger.warning(f'Introspection failed: HTTP {response.status_code}')
            return None

        data = response.json()

        if 'errors' in data and data['errors']:
            logger.warning(f"Introspection errors: {data['errors']}")
            return None

        schema = data.get('data', {}).get('__schema')
        return schema

    except Exception as e:
        logger.error(f'Error fetching GraphQL schema: {e}')
        return None
|
|
|
|
|
|
def extract_types_from_schema(schema: dict) -> list[dict]:
    """
    Extract type definitions from introspection schema.

    Types whose name starts with '__' (the introspection types) are skipped.
    Keys that are missing and keys that are present with a null value are
    treated the same way, so a partially filled schema does not raise.

    Args:
        schema: Introspection schema dictionary

    Returns:
        List of dicts with 'name', 'kind', 'description' and 'fields'. Each
        field is a dict with 'name' and 'type', where 'type' is the field
        type's name or, when that is empty, its kind (e.g. 'NON_NULL').
    """
    if not schema:
        return []

    types = []
    # `or []` rather than a .get() default: the key may be present with null.
    for type_def in schema.get('types') or []:
        name = type_def.get('name')

        # Skip introspection types
        if (name or '').startswith('__'):
            continue

        fields = []
        for field in type_def.get('fields') or []:
            type_ref = field.get('type') or {}
            fields.append({
                'name': field.get('name'),
                'type': type_ref.get('name') or type_ref.get('kind'),
            })

        types.append({
            'name': name,
            'kind': type_def.get('kind'),
            'description': type_def.get('description'),
            'fields': fields,
        })

    return types
|
|
|
|
|
|
def get_operation_type(schema: dict) -> dict:
    """
    Report the root type name for each GraphQL operation kind.

    Args:
        schema: Introspection schema dictionary

    Returns:
        Empty dict for an empty schema; otherwise a dict with the keys
        'query', 'mutation' and 'subscription', each holding the root type's
        name or None when the schema has no such root type
    """
    if not schema:
        return {}

    root_keys = (
        ('query', 'queryType'),
        ('mutation', 'mutationType'),
        ('subscription', 'subscriptionType'),
    )

    operations = {}
    for operation, schema_key in root_keys:
        root_type = schema.get(schema_key)
        # A missing, null or empty root type entry all map to None.
        operations[operation] = root_type.get('name') if root_type else None
    return operations
|
|
|
|
|
|
def has_subscription_support(schema: dict) -> bool:
    """
    Tell whether the schema declares a named subscription root type.

    Args:
        schema: Introspection schema dictionary

    Returns:
        True only when 'subscriptionType' is present, not null, and carries a
        non-null 'name'
    """
    if not schema:
        return False

    subscription_root = schema.get('subscriptionType')
    if subscription_root is None:
        return False

    return subscription_root.get('name') is not None
|
|
|
|
|
|
_OPERATION_KEYWORDS = ('query', 'mutation', 'subscription')

# Tokenizer for operation detection. Alternatives are tried in order at the
# current position; the name of the matching group tells the scanner what it
# is looking at.
_OPERATION_TOKEN_RE = re.compile(
    r'(?P<ignored>[\s,\ufeff]+|#[^\n\r]*)'   # whitespace, commas, BOM, comments
    r'|(?P<string>"""(?:\\"""|[\s\S])*?"""|"(?:\\[\s\S]|[^"\\])*")'
    r'|(?P<name>[A-Za-z_][A-Za-z0-9_]*)'
    r'|(?P<bracket>[{}()])'
    r'|(?P<other>[^\s,\ufeff#"A-Za-z_{}()]+)'  # remaining punctuation, numbers
)


def detect_operation_type(query: str) -> str:
    """
    Detect the operation type from a GraphQL query.

    The result describes the first operation definition in the document.
    Leading whitespace, commas, a byte-order mark and comments are skipped,
    and so are fragment definitions that appear before the first operation
    (e.g. "fragment F on T { id } subscription S { ... }" is reported as a
    subscription). A document that starts with '{' is an anonymous query.

    Args:
        query: GraphQL query string

    Returns:
        'query', 'mutation', 'subscription', or 'unknown'. 'unknown' is
        returned for empty or non-string input, for a document that contains
        no operation definition, and when the text before the first operation
        cannot be tokenized (for example an unterminated string).
    """
    if not query or not isinstance(query, str):
        return 'unknown'

    in_fragment = False   # True while skipping a leading fragment definition
    open_braces = 0       # selection-set nesting inside that fragment
    open_parens = 0       # argument-list nesting inside that fragment
    pos = 0
    end = len(query)

    while pos < end:
        token = _OPERATION_TOKEN_RE.match(query, pos)
        if token is None:
            # Only an unterminated string reaches this point; give up rather
            # than guess where it was meant to end.
            return 'unknown'

        pos = token.end()
        kind = token.lastgroup
        text = token.group()

        if kind == 'ignored':
            continue

        if not in_fragment:
            # `text` is the first significant token of a definition.
            if text == '{':
                return 'query'
            if kind == 'name' and text in _OPERATION_KEYWORDS:
                return text
            if kind == 'name' and text == 'fragment':
                in_fragment = True
                open_braces = 0
                open_parens = 0
                continue
            return 'unknown'

        # Inside a fragment definition only brackets matter: the definition
        # ends when its selection set closes. Braces within parentheses are
        # input-object literals in arguments, not selection sets.
        if kind != 'bracket':
            continue
        if text == '(':
            open_parens += 1
        elif text == ')':
            open_parens = max(0, open_parens - 1)
        elif open_parens == 0:
            if text == '{':
                open_braces += 1
            else:
                open_braces -= 1
                if open_braces <= 0:
                    in_fragment = False

    return 'unknown'
|
|
|
|
|
|
def is_subscription_operation(query: str) -> bool:
    """
    Tell whether a GraphQL query is a subscription.

    Thin wrapper over detect_operation_type().

    Args:
        query: GraphQL query string

    Returns:
        True when the detected operation type is 'subscription'
    """
    operation = detect_operation_type(query)
    return operation == 'subscription'
|