Files
doorman/user-docs/examples/tests/test_api_key_rotation.py
seniorswe ab3bae0975 cleanup
2025-10-15 01:25:18 -04:00

397 lines
15 KiB
Python

"""
Test API Key Rotation with Grace Period
Documents the implementation of zero-downtime API key rotation for credit system
"""
def test_api_key_rotation_implementation():
"""Test API key rotation patterns"""
print("API Key Rotation with Grace Period - Implementation")
print("=" * 70)
print()
print("P1 Security Enhancement:")
print(" No mechanism to rotate API keys without downtime")
print(" → Key compromise requires service interruption")
print(" → Cannot rotate leaked keys gracefully")
print(" → Upstream services must update keys instantly")
print()
print("=" * 70)
print()
print("Implementation Locations:")
print()
locations = [
{
'file': 'models/credit_model.py',
'lines': '8, 10, 29-31',
'change': 'Added Optional, datetime imports and rotation fields',
'fields': ['api_key_new', 'api_key_rotation_expires']
},
{
'file': 'utils/credit_util.py',
'lines': '4, 32-89',
'change': 'Updated get_credit_api_header() to support rotation',
'behavior': 'Returns [header, [old_key, new_key]] during rotation'
},
{
'file': 'services/credit_service.py',
'lines': '60-61, 117-118',
'change': 'Encrypt api_key_new in create and update methods',
'security': 'New keys encrypted like primary keys'
}
]
for i, loc in enumerate(locations, 1):
print(f"{i}. {loc['file']}")
print(f" Lines: {loc['lines']}")
print(f" Change: {loc['change']}")
if 'fields' in loc:
print(f" Fields: {', '.join(loc['fields'])}")
if 'behavior' in loc:
print(f" Behavior: {loc['behavior']}")
if 'security' in loc:
print(f" Security: {loc['security']}")
print()
print("=" * 70)
print()
print("Rotation Fields:")
print()
print(" api_key_new: Optional[str]")
print(" - New API key to be activated after rotation")
print(" - Encrypted in database like primary api_key")
print(" - None when no rotation active")
print()
print(" api_key_rotation_expires: Optional[datetime]")
print(" - Expiration time for old key (grace period end)")
print(" - After this time, old key becomes invalid")
print(" - None when no rotation active")
print()
print("=" * 70)
print()
print("Rotation States:")
print()
states = [
{
'state': 'No Rotation',
'fields': 'api_key_new=None, rotation_expires=None',
'returns': '[header, api_key]',
'accepted_keys': 'Only api_key',
'description': 'Normal operation, single key'
},
{
'state': 'Rotation Active',
'fields': 'api_key_new=new_key, rotation_expires=future',
'returns': '[header, [api_key, api_key_new]]',
'accepted_keys': 'Both old and new keys',
'description': 'Grace period, either key works'
},
{
'state': 'Rotation Expired',
'fields': 'api_key_new=new_key, rotation_expires=past',
'returns': '[header, api_key_new]',
'accepted_keys': 'Only api_key_new',
'description': 'Old key expired, new key is primary'
}
]
for state in states:
print(f"State: {state['state']}")
print(f" Fields: {state['fields']}")
print(f" Returns: {state['returns']}")
print(f" Accepted Keys: {state['accepted_keys']}")
print(f" Description: {state['description']}")
print()
print("=" * 70)
print()
print("Rotation Workflow:")
print()
print("Phase 1: Initiate Rotation")
print(" 1. Admin updates credit definition:")
print(" PUT /credits/{group}")
print(" {")
print(" 'api_key': 'old_key', // keep existing")
print(" 'api_key_new': 'new_key', // set new key")
print(" 'api_key_rotation_expires': '2025-01-15T10:00:00Z' // 24hr grace")
print(" }")
print()
print(" 2. System encrypts both keys and stores in database")
print()
print(" 3. get_credit_api_header() returns [header, [old_key, new_key]]")
print()
print(" 4. Upstream requests work with EITHER key")
print()
print("Phase 2: Update Upstream Services (grace period)")
print(" 1. Gradually update upstream services with new_key")
print(" 2. Old requests with old_key still work")
print(" 3. New requests with new_key work")
print(" 4. No downtime required")
print()
print("Phase 3: Rotation Expires (automatic)")
print(" 1. Clock reaches api_key_rotation_expires")
print()
print(" 2. get_credit_api_header() returns [header, new_key]")
print()
print(" 3. Only new_key is accepted")
print()
print(" 4. Old_key is now invalid")
print()
print("Phase 4: Cleanup (optional)")
print(" 1. Admin updates credit definition:")
print(" PUT /credits/{group}")
print(" {")
print(" 'api_key': 'new_key', // promote new to primary")
print(" 'api_key_new': None, // clear rotation fields")
print(" 'api_key_rotation_expires': None")
print(" }")
print()
print(" 2. Database cleaned up, rotation complete")
print()
print("=" * 70)
print()
print("Zero-Downtime Key Rotation:")
print()
print("BEFORE (Downtime Required):")
print(" 1. Old key: sk-abc123 (in use by upstream)")
print(" 2. Key compromised (leaked on GitHub)")
print(" 3. Admin updates to new key: sk-xyz789")
print(" 4. Upstream services immediately fail (old key invalid)")
print(" 5. → Service interruption while updating all upstream services")
print(" 6. → Race to update before attackers exploit leaked key")
print()
print("AFTER (Zero Downtime):")
print(" 1. Old key: sk-abc123 (in use by upstream)")
print(" 2. Key compromised (leaked on GitHub)")
print(" 3. Admin initiates rotation:")
print(" - api_key=sk-abc123 (keep old)")
print(" - api_key_new=sk-xyz789 (new)")
print(" - api_key_rotation_expires=+24 hours")
print(" 4. Both keys work for 24 hours (grace period)")
print(" 5. Gradually update upstream services (no rush)")
print(" 6. After 24 hours, old key automatically expires")
print(" 7. → Zero downtime, secure rotation complete")
print()
print("=" * 70)
print()
print("Implementation in get_credit_api_header():")
print()
print(" def get_credit_api_header(api_credit_group):")
print(" credit_def = credit_def_collection.find_one(...)")
print()
print(" api_key = decrypt_value(credit_def.get('api_key'))")
print(" api_key_new = credit_def.get('api_key_new')")
print(" rotation_expires = credit_def.get('api_key_rotation_expires')")
print()
print(" # Check if rotation active")
print(" if api_key_new and rotation_expires:")
print(" now = datetime.now(timezone.utc)")
print(" rotation_expires_dt = parse_datetime(rotation_expires)")
print()
print(" if now < rotation_expires_dt:")
print(" # Grace period active - accept both keys")
print(" api_key_new_dec = decrypt_value(api_key_new)")
print(" return [header, [api_key, api_key_new_dec]]")
print()
print(" elif now >= rotation_expires_dt:")
print(" # Grace period expired - new key only")
print(" api_key_new_dec = decrypt_value(api_key_new)")
print(" return [header, api_key_new_dec]")
print()
print(" # No rotation - primary key only")
print(" return [header, api_key]")
print()
print("=" * 70)
print()
print("Datetime Handling:")
print()
print(" Supports both string and datetime:")
print(" - String: '2025-01-15T10:00:00Z' (ISO 8601)")
print(" - datetime: Python datetime object")
print()
print(" Timezone aware:")
print(" - Uses datetime.now(timezone.utc)")
print(" - Compares in UTC to avoid timezone issues")
print()
print(" Parsing:")
print(" - datetime.fromisoformat() for strings")
print(" - Handles 'Z' suffix (converts to +00:00)")
print(" - Falls back gracefully if parsing fails")
print()
print("=" * 70)
print()
print("Security Considerations:")
print()
print(" Encryption:")
print(" - api_key encrypted with encrypt_value()")
print(" - api_key_new encrypted with encrypt_value()")
print(" - Both stored encrypted in database")
print(" - Decrypted only when needed for upstream requests")
print()
print(" Grace Period Length:")
print(" - Recommended: 24 hours (1 day)")
print(" - Minimum: 1 hour (for urgent rotations)")
print(" - Maximum: 7 days (for slow-moving deployments)")
print()
print(" Key Compromise Response:")
print(" 1. Rotate immediately (grace period = 1 hour)")
print(" 2. Monitor logs for old key usage")
print(" 3. Block suspicious IPs using old key")
print(" 4. After grace period, old key is dead")
print()
print("=" * 70)
print()
print("Example API Usage:")
print()
print(" Initiate Rotation:")
print(" PUT /credits/openai-group")
print(" {")
print(" 'api_credit_group': 'openai-group',")
print(" 'api_key': 'sk-old-key-abc123',")
print(" 'api_key_header': 'Authorization',")
print(" 'api_key_new': 'sk-new-key-xyz789',")
print(" 'api_key_rotation_expires': '2025-01-16T10:00:00Z',")
print(" 'credit_tiers': [...]")
print(" }")
print()
print(" During Grace Period:")
print(" - Upstream request with old key: ✓ Works")
print(" - Upstream request with new key: ✓ Works")
print()
print(" After Expiry:")
print(" - Upstream request with old key: ✗ Fails")
print(" - Upstream request with new key: ✓ Works")
print()
print(" Cleanup (promote new to primary):")
print(" PUT /credits/openai-group")
print(" {")
print(" 'api_credit_group': 'openai-group',")
print(" 'api_key': 'sk-new-key-xyz789',")
print(" 'api_key_header': 'Authorization',")
print(" 'api_key_new': None,")
print(" 'api_key_rotation_expires': None,")
print(" 'credit_tiers': [...]")
print(" }")
print()
print("=" * 70)
print()
print("Testing Recommendations:")
print()
print("1. Test rotation initiation:")
print(" - Create credit definition with rotation fields")
print(" - Verify both keys are encrypted in database")
print(" - Verify get_credit_api_header() returns both keys")
print()
print("2. Test grace period:")
print(" - Set rotation_expires to +1 minute")
print(" - Make upstream request with old key → should succeed")
print(" - Make upstream request with new key → should succeed")
print(" - Wait 2 minutes")
print(" - Make upstream request with old key → should fail")
print(" - Make upstream request with new key → should succeed")
print()
print("3. Test datetime parsing:")
print(" - Test with ISO 8601 string: '2025-01-15T10:00:00Z'")
print(" - Test with datetime object")
print(" - Test with invalid format → should handle gracefully")
print()
print("4. Test cleanup:")
print(" - Promote new key to primary")
print(" - Clear rotation fields (set to None)")
print(" - Verify single key returned")
print()
print("=" * 70)
print()
print("Monitoring & Logging:")
print()
print(" Key Rotation Events:")
print(" - Log when rotation initiated (info level)")
print(" - Log when grace period expires (info level)")
print(" - Log when old key used during grace period (debug)")
print(" - Log when old key rejected after expiry (warning)")
print()
print(" Recommended Alerts:")
print(" - Alert if old key used after 80% of grace period")
print(" - Alert if old key rejected (indicates late adopter)")
print(" - Alert if rotation_expires not cleaned up after +7 days")
print()
print("=" * 70)
print()
print("Operational Procedures:")
print()
print(" Planned Rotation (no compromise):")
print(" 1. Set grace period = 24 hours")
print(" 2. Initiate rotation via API")
print(" 3. Update documentation with new key")
print(" 4. Notify upstream service owners")
print(" 5. Monitor old key usage for 24 hours")
print(" 6. Cleanup rotation fields after expiry")
print()
print(" Emergency Rotation (key compromised):")
print(" 1. Set grace period = 1 hour (minimum)")
print(" 2. Initiate rotation immediately")
print(" 3. Emergency notification to upstream owners")
print(" 4. Monitor for unauthorized usage")
print(" 5. Block suspicious IPs")
print(" 6. Force cleanup after 1 hour")
print()
print(" Gradual Rollout (large deployment):")
print(" 1. Set grace period = 7 days")
print(" 2. Initiate rotation")
print(" 3. Update canary services first")
print(" 4. Monitor for issues")
print(" 5. Roll out to 25%, 50%, 100%")
print(" 6. Cleanup after full rollout")
print()
print("=" * 70)
print()
print("Future Enhancements:")
print()
print(" 1. Automatic cleanup:")
print(" - Cron job to promote api_key_new after expiry")
print(" - Clear rotation fields automatically")
print()
print(" 2. Rotation history:")
print(" - Track all key rotations")
print(" - Audit log for security review")
print()
print(" 3. Key usage metrics:")
print(" - Count requests with old key during grace period")
print(" - Alert if usage doesn't decrease over time")
print()
print(" 4. Multi-key support:")
print(" - Support more than 2 concurrent keys")
print(" - Useful for complex migration scenarios")
print()
print("=" * 70)
print()
print("P1 Risk Mitigated:")
print(" No mechanism to rotate API keys without downtime")
print()
print("Production Impact:")
print(" ✓ Zero-downtime API key rotation")
print(" ✓ Graceful key compromise recovery")
print(" ✓ No service interruption during rotation")
print(" ✓ Configurable grace period for flexibility")
print(" ✓ Automatic expiry of old keys")
print()
if __name__ == '__main__':
test_api_key_rotation_implementation()