import argparse
import json
import os

from pyspark.sql import SparkSession

from anonyspark.masking import (
    mask_email_udf, mask_name_udf, mask_date_udf,
    mask_ssn_udf, mask_itin_udf, mask_phone_udf
)
def apply_masking(df, schema):
    """
    Apply masking UDFs based on the schema definitions.

    For each (column, dtype) entry, adds a masked_<column> column next to
    the original; columns with an unrecognized dtype are left untouched.
    """
    # Dispatch table: schema type name -> masking UDF.
    maskers = {
        "email": mask_email_udf,
        "name": mask_name_udf,
        "dob": mask_date_udf,
        "ssn": mask_ssn_udf,
        "itin": mask_itin_udf,
        "phone": mask_phone_udf,
    }
    for column, dtype in schema.items():
        if dtype in maskers:
            df = df.withColumn(f"masked_{column}", maskers[dtype](df[column]))
    return df
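
# A masking schema is a JSON object mapping column names to one of the
# supported type names: "email", "name", "dob", "ssn", "itin", "phone".
# A minimal example schema file (the column names here are hypothetical,
# not taken from the project):
#
#   {
#       "customer_email": "email",
#       "full_name": "name",
#       "date_of_birth": "dob",
#       "phone_number": "phone"
#   }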
def main():
    parser = argparse.ArgumentParser(description="AnonySpark CLI for masking sensitive data.")
    parser.add_argument('--input', type=str, required=True, help='Path to input CSV file')
    parser.add_argument('--output', type=str, required=True, help='Directory to save masked output')
    parser.add_argument('--schema', type=str, required=True, help='Path to masking schema JSON file')
    args = parser.parse_args()

    # Create the output directory if it doesn't exist.
    os.makedirs(args.output, exist_ok=True)

    # Start a local Spark session.
    spark = SparkSession.builder.master("local[*]").appName("AnonysparkCLI").getOrCreate()

    # Load the input data and the masking schema.
    df = spark.read.csv(args.input, header=True)
    with open(args.schema, 'r') as f:
        schema = json.load(f)

    # Apply masking and write the result as CSV.
    masked_df = apply_masking(df, schema)
    masked_df.write.mode("overwrite").csv(args.output, header=True)
    print(f"Masked output written to: {args.output}")


if __name__ == "__main__":
    main()
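
# Example invocation (the script name and paths below are illustrative,
# not taken from the project):
#
#   python anonyspark_cli.py \
#       --input data/customers.csv \
#       --output out/masked \
#       --schema masking_schema.json
#
# Note that Spark writes the CSV output as a directory of part-*.csv
# files under --output rather than as a single file.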