import boto3
import json
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
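# NOTE: This script assumes boto3 and the Presidio packages are installed,
# along with a spaCy language model for the analyzer, typically:
#   pip install boto3 presidio-analyzer presidio-anonymizer
#   python -m spacy download en_core_web_lg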
# Map each Presidio entity type to the placeholder that replaces it.
operators = {
    "CREDIT_CARD": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_CREDIT_CARD>"}),
    "CRYPTO": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_CRYPTO>"}),
    "EMAIL_ADDRESS": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_EMAIL_ADDRESS>"}),
    "IBAN_CODE": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_IBAN_CODE>"}),
    "IP_ADDRESS": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_IP_ADDRESS>"}),
    "NRP": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_NRP>"}),
    "LOCATION": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_LOCATION>"}),
    "PERSON": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_PERSON>"}),
    "PHONE_NUMBER": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_PHONE_NUMBER>"}),
    "MEDICAL_LICENSE": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_MEDICAL_LICENSE>"}),
    "URL": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_URL>"}),
    "US_BANK_NUMBER": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_US_BANK_NUMBER>"}),
    "US_DRIVER_LICENSE": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_US_DRIVER_LICENSE>"}),
    "US_ITIN": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_US_ITIN>"}),
    "US_PASSPORT": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_US_PASSPORT>"}),
    "US_SSN": OperatorConfig(operator_name="replace", params={"new_value": "<REDACTED_US_SSN>"})
}
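# With the operators above, for example, the string "Email jane@example.com"
# is redacted to "Email <REDACTED_EMAIL_ADDRESS>". Entity types not listed
# here fall back to Presidio's default operator, which replaces the match
# with the entity-type name in angle brackets (for example, "<DATE_TIME>").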
# Recursively walk the string values in the provided JSON object (in this
# case, the "metadata" field of an element), redact them as appropriate,
# and return the object with its redactions applied. (A usage sketch
# follows the function.)
def check_string_values(obj, analyzer, anonymizer):
    if isinstance(obj, dict):
        for key, value in obj.items():
            # Skip analyzing Base64-encoded fields.
            if key in ('image_base64', 'orig_elements'):
                continue
            if isinstance(value, str):
                anonymized_results = anonymizer.anonymize(
                    text=value,
                    analyzer_results=analyzer.analyze(text=value, language="en"),
                    operators=operators
                )
                # Write the redacted text back into the object. (Assigning to
                # the local loop variable alone would discard the redaction.)
                obj[key] = anonymized_results.text
            # Recurse through nested "metadata" fields.
            elif isinstance(value, dict):
                check_string_values(value, analyzer, anonymizer)
            # Non-string, non-dict fields are left as-is.
    return obj
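# A minimal usage sketch (the field names below are hypothetical):
#   metadata = {'sent_from': 'jane@example.com', 'page_number': 1}
#   check_string_values(metadata, AnalyzerEngine(), AnonymizerEngine())
#   # -> {'sent_from': '<REDACTED_EMAIL_ADDRESS>', 'page_number': 1}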
def main():
    s3_input_bucket_name = '<input-bucket-name>'
    s3_input_folder_prefix = '<input-folder-prefix>'
    s3_output_bucket_name = '<output-bucket-name>'
    s3_output_folder_prefix = '<output-folder-prefix>'
    s3_bucket_region = '<bucket-region-short-id>'
    s3_client = boto3.client('s3', region_name=s3_bucket_region)
    # Normalize the folder prefixes to ensure they end with '/'.
    if not s3_input_folder_prefix.endswith('/'):
        s3_input_folder_prefix += '/'
    if not s3_output_folder_prefix.endswith('/'):
        s3_output_folder_prefix += '/'
    # List the JSON files in the input folder.
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(
        Bucket=s3_input_bucket_name,
        Prefix=s3_input_folder_prefix
    )
    files = []
    # Get the list of file keys from the input folder to analyze.
    # A file's key is the full path to the file within the bucket.
    # For example, if the input folder's name is "original" and the
    # input file's name is "file1.json", the file's key is
    # "original/file1.json".
    # The listing can span multiple "pages", so loop through each
    # page so that no files are missed.
    for page in page_iterator:
        # "Contents" is missing if the folder is empty or the
        # intended prefix is not found.
        if 'Contents' in page:
            for obj in page['Contents']:
                key = obj['Key']
                if not key.endswith('/'):  # Skip folder placeholders.
                    files.append(key)
                    print(f"Found file: {s3_input_bucket_name}/{key}")
    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()
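    # The analyzer and anonymizer above are created once and reused for
    # every file: AnalyzerEngine loads its underlying NLP model at
    # construction time, which is comparatively slow.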
    s3_resource = boto3.resource('s3', region_name=s3_bucket_region)
    # For each JSON file to analyze, load the JSON data.
    for key in files:
        print(f"Analyzing file: {s3_input_bucket_name}/{key}")
        content_object = s3_resource.Object(
            bucket_name=s3_input_bucket_name,
            key=key
        )
        file_content = content_object.get()['Body'].read().decode('utf-8')  # Bytes to text.
        json_data = json.loads(file_content)  # Text to JSON.
        # For each element in the JSON data...
        for element in json_data:
            print(f"  Analyzing element with ID: {element['element_id']} in file {s3_input_bucket_name}/{key}")
            # ...if there is a "text" field, analyze and redact its
            # content as appropriate...
            if 'text' in element:
                text_element = element['text']
                anonymized_results = anonymizer.anonymize(
                    text=text_element,
                    analyzer_results=analyzer.analyze(text=text_element, language="en"),
                    operators=operators
                )
                element['text'] = anonymized_results.text
            # ...and if there is a "metadata" field, analyze and redact its
            # string values as appropriate.
            if 'metadata' in element:
                element['metadata'] = check_string_values(element['metadata'], analyzer, anonymizer)
        # Get the filename by stripping the input folder prefix from the key.
        filename = key[len(s3_input_folder_prefix):]
        # Then save the JSON data with its redactions to the output folder.
        print(f"Saving file: {s3_output_bucket_name}/{s3_output_folder_prefix}{filename}")
        s3_client.put_object(
            Bucket=s3_output_bucket_name,
            Key=f"{s3_output_folder_prefix}{filename}",
            Body=json.dumps(obj=json_data, indent=4).encode('utf-8')
        )
if __name__ == "__main__":
    main()
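# To run: replace the <...> placeholders in main() with your bucket names,
# folder prefixes, and AWS Region (for example, 'us-east-1'), make sure
# boto3 can find your AWS credentials (for example, via "aws configure"),
# and then run this file with Python.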