From 5bfadf8f9ada07926fdcfe7f10929ef66a092544 Mon Sep 17 00:00:00 2001 From: ddupont <3820588+ddupont808@users.noreply.github.com> Date: Fri, 1 Aug 2025 11:46:32 -0400 Subject: [PATCH] Update pii_anonymization.py --- .../agent/callbacks/pii_anonymization.py | 179 +----------------- 1 file changed, 8 insertions(+), 171 deletions(-) diff --git a/libs/python/agent/agent/callbacks/pii_anonymization.py b/libs/python/agent/agent/callbacks/pii_anonymization.py index f5c31a61..68f4b2fc 100644 --- a/libs/python/agent/agent/callbacks/pii_anonymization.py +++ b/libs/python/agent/agent/callbacks/pii_anonymization.py @@ -9,10 +9,7 @@ import io import logging try: - from presidio_analyzer import AnalyzerEngine - from presidio_anonymizer import AnonymizerEngine, DeanonymizeEngine - from presidio_anonymizer.entities import RecognizerResult, OperatorConfig - from presidio_image_redactor import ImageRedactorEngine + # TODO: Add Presidio dependencies from PIL import Image PRESIDIO_AVAILABLE = True except ImportError: @@ -32,11 +29,7 @@ class PIIAnonymizationCallback(AsyncCallbackHandler): def __init__( self, - anonymize_text: bool = True, - anonymize_images: bool = True, - entities_to_anonymize: Optional[List[str]] = None, - anonymization_operator: str = "replace", - image_redaction_color: Tuple[int, int, int] = (255, 192, 203) # Pink + # TODO: Any extra kwargs if needed ): """ Initialize the PII anonymization callback. @@ -51,23 +44,10 @@ class PIIAnonymizationCallback(AsyncCallbackHandler): if not PRESIDIO_AVAILABLE: raise ImportError( "Presidio is not available. Install with: " - "pip install presidio-analyzer presidio-anonymizer presidio-image-redactor" + "pip install cua-agent[pii-anonymization]" ) - self.anonymize_text = anonymize_text - self.anonymize_images = anonymize_images - self.entities_to_anonymize = entities_to_anonymize - self.anonymization_operator = anonymization_operator - self.image_redaction_color = image_redaction_color - - # Initialize Presidio engines - self.analyzer = AnalyzerEngine() - self.anonymizer = AnonymizerEngine() - self.deanonymizer = DeanonymizeEngine() - self.image_redactor = ImageRedactorEngine() - - # Store anonymization mappings for deanonymization - self.anonymization_mappings: Dict[str, Any] = {} + # TODO: Implement __init__ async def on_llm_start(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ @@ -79,9 +59,6 @@ class PIIAnonymizationCallback(AsyncCallbackHandler): Returns: List of messages with PII anonymized """ - if not self.anonymize_text and not self.anonymize_images: - return messages - anonymized_messages = [] for msg in messages: anonymized_msg = await self._anonymize_message(msg) @@ -99,9 +76,6 @@ class PIIAnonymizationCallback(AsyncCallbackHandler): Returns: List of output with PII deanonymized for tool calls """ - if not self.anonymize_text: - return output - deanonymized_output = [] for item in output: # Only deanonymize tool calls and computer_call messages @@ -114,146 +88,9 @@ class PIIAnonymizationCallback(AsyncCallbackHandler): return deanonymized_output async def _anonymize_message(self, message: Dict[str, Any]) -> Dict[str, Any]: - """Anonymize PII in a single message.""" - msg_copy = message.copy() - - # Anonymize text content - if self.anonymize_text: - msg_copy = await self._anonymize_text_content(msg_copy) - - # Redact images in computer_call_output - if self.anonymize_images and msg_copy.get("type") == "computer_call_output": - msg_copy = await self._redact_image_content(msg_copy) - - return msg_copy - - async def _anonymize_text_content(self, message: Dict[str, Any]) -> Dict[str, Any]: - """Anonymize text content in a message.""" - msg_copy = message.copy() - - # Handle content array - content = msg_copy.get("content", []) - if isinstance(content, str): - anonymized_text, _ = await self._anonymize_text(content) - msg_copy["content"] = anonymized_text - elif isinstance(content, list): - anonymized_content = [] - for item in content: - if isinstance(item, dict) and item.get("type") == "text": - text = item.get("text", "") - anonymized_text, _ = await self._anonymize_text(text) - item_copy = item.copy() - item_copy["text"] = anonymized_text - anonymized_content.append(item_copy) - else: - anonymized_content.append(item) - msg_copy["content"] = anonymized_content - - return msg_copy - - async def _redact_image_content(self, message: Dict[str, Any]) -> Dict[str, Any]: - """Redact PII from images in computer_call_output messages.""" - msg_copy = message.copy() - output = msg_copy.get("output", {}) - - if isinstance(output, dict) and "image_url" in output: - try: - # Extract base64 image data - image_url = output["image_url"] - if image_url.startswith("data:image/"): - # Parse data URL - header, data = image_url.split(",", 1) - image_data = base64.b64decode(data) - - # Load image with PIL - image = Image.open(io.BytesIO(image_data)) - - # Redact PII from image - redacted_image = self.image_redactor.redact(image, self.image_redaction_color) - - # Convert back to base64 - buffer = io.BytesIO() - redacted_image.save(buffer, format="PNG") - redacted_data = base64.b64encode(buffer.getvalue()).decode() - - # Update image URL - output_copy = output.copy() - output_copy["image_url"] = f"data:image/png;base64,{redacted_data}" - msg_copy["output"] = output_copy - - except Exception as e: - logger.warning(f"Failed to redact image: {e}") - - return msg_copy + # TODO: Implement _anonymize_message + return message async def _deanonymize_item(self, item: Dict[str, Any]) -> Dict[str, Any]: - """Deanonymize PII in tool calls and computer outputs.""" - item_copy = item.copy() - - # Handle computer_call arguments - if item.get("type") == "computer_call": - args = item_copy.get("args", {}) - if isinstance(args, dict): - deanonymized_args = {} - for key, value in args.items(): - if isinstance(value, str): - deanonymized_value, _ = await self._deanonymize_text(value) - deanonymized_args[key] = deanonymized_value - else: - deanonymized_args[key] = value - item_copy["args"] = deanonymized_args - - return item_copy - - async def _anonymize_text(self, text: str) -> Tuple[str, List[RecognizerResult]]: - """Anonymize PII in text and return the anonymized text and results.""" - if not text.strip(): - return text, [] - - try: - # Analyze text for PII - analyzer_results = self.analyzer.analyze( - text=text, - entities=self.entities_to_anonymize, - language="en" - ) - - if not analyzer_results: - return text, [] - - # Anonymize the text - anonymized_result = self.anonymizer.anonymize( - text=text, - analyzer_results=analyzer_results, - operators={entity_type: OperatorConfig(self.anonymization_operator) - for entity_type in set(result.entity_type for result in analyzer_results)} - ) - - # Store mapping for deanonymization - mapping_key = str(hash(text)) - self.anonymization_mappings[mapping_key] = { - "original": text, - "anonymized": anonymized_result.text, - "results": analyzer_results - } - - return anonymized_result.text, analyzer_results - - except Exception as e: - logger.warning(f"Failed to anonymize text: {e}") - return text, [] - - async def _deanonymize_text(self, text: str) -> Tuple[str, bool]: - """Attempt to deanonymize text using stored mappings.""" - try: - # Look for matching anonymized text in mappings - for mapping_key, mapping in self.anonymization_mappings.items(): - if mapping["anonymized"] == text: - return mapping["original"], True - - # If no mapping found, return original text - return text, False - - except Exception as e: - logger.warning(f"Failed to deanonymize text: {e}") - return text, False + # TODO: Implement _deanonymize_item + return item