Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import zipfile | |
import json | |
import pandas as pd | |
import datetime | |
import shutil | |
import hashlib | |
from io import StringIO | |
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter | |
from pdfminer.converter import TextConverter | |
from pdfminer.layout import LAParams | |
from pdfminer.pdfpage import PDFPage | |
from PyPDF2 import PdfReader | |
from openai import OpenAI, RateLimitError | |
import backoff | |
import re | |
class PDFUtils: | |
def extract_text_with_pdfminer(path): | |
rsrcmgr = PDFResourceManager() | |
retstr = StringIO() | |
codec = 'utf-8' | |
laparams = LAParams() | |
device = TextConverter(rsrcmgr, retstr, codec=codec, laparams=laparams) | |
with open(path, 'rb') as fp: | |
interpreter = PDFPageInterpreter(rsrcmgr, device) | |
for page in PDFPage.get_pages(fp, check_extractable=True): | |
interpreter.process_page(page) | |
text = retstr.getvalue() | |
return text | |
def extract_text_with_pypdf2(path): | |
reader = PdfReader(path) | |
text = ''.join(page.extract_text() for page in reader.pages) | |
return text | |
def convert_pdf_to_text(path): | |
text = PDFUtils.extract_text_with_pdfminer(path) | |
if text is None: | |
print("Processing using PyPDF2") | |
text = PDFUtils.extract_text_with_pypdf2(path) | |
return text | |
class OpenAIClassifier: | |
def __init__(self, api_key): | |
self.client = OpenAI(api_key=api_key) | |
self.processed_resumes = set() | |
def completions_with_backoff(self, **kwargs): | |
return self.client.chat.completions.create(**kwargs) | |
def hash_resume_text(self, resume_text): | |
return hashlib.md5(resume_text.encode()).hexdigest() | |
def classify_resume(self, resume_text, questions_string): | |
resume_hash = self.hash_resume_text(resume_text) | |
if resume_hash in self.processed_resumes: | |
return "Already Processed" | |
response = self.client.chat.completions.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": f"As a very strict hiring manager who is evaluating a resume. Answer each question, let questions be keys and answers be values {questions_string} As a hiring manager parse through the resume thoroughly and answer the following questions with the same index in dictionary format. In the experience section, if there is no direct reference to the number of years of experience, count the number of years from the least present year in the experience section."}, | |
{"role": "user", "content": resume_text} | |
], | |
temperature=0.7, | |
max_tokens=516 | |
) | |
summary = str(response.choices[0].message) | |
self.processed_resumes.add(resume_hash) | |
# Extract JSON substring using regular expression | |
json_match = re.search(r"\{.*\}", summary) | |
if json_match: | |
return json_match.group(0) # Return only the JSON part | |
else: | |
return "{}" | |
def read_json_file(file_path): | |
with open(file_path, 'r') as json_file: | |
json_data = json.load(json_file) | |
return json.dumps(json_data, indent=2) | |
def process_text(text): | |
return text.replace('\\n', '<br>') if text else "Error: Text is None" | |
def process_pdf(file, model_choice, openai_classifier, questions_string): | |
data = PDFUtils.convert_pdf_to_text(file) | |
if model_choice == "openai": | |
classification = openai_classifier.classify_resume(data, questions_string) | |
print(f"Debug: classification output:\n{classification}\n") # Debugging line to confirm JSON structure | |
return {"file_path": file, "result": process_text(classification)} | |
else: | |
print("Only openai can be utilized") | |
def unzip_and_process(zip_file, model_choice, openai_classifier, questions_string): | |
extract_folder, selected_folder, rejected_folder, error_folder = 'extracted_files/', 'extracted_files/Selected/', 'extracted_files/Rejected/', 'extracted_files/Error/' | |
os.makedirs(extract_folder, exist_ok=True) | |
os.makedirs(selected_folder, exist_ok=True) | |
os.makedirs(rejected_folder, exist_ok=True) | |
os.makedirs(error_folder, exist_ok=True) | |
for root, dirs, files in os.walk(extract_folder): | |
for file_name in files: | |
os.remove(os.path.join(root, file_name)) | |
with zipfile.ZipFile(zip_file, 'r') as zip_ref: | |
zip_ref.extractall(extract_folder) | |
df_results = pd.DataFrame() | |
questions_dict = json.loads(questions_string) | |
questions_list = list(questions_dict.values()) | |
df_results = pd.DataFrame(columns=questions_list) | |
for root, dirs, files in os.walk(extract_folder): | |
for file_name in files: | |
file_path = os.path.join(root, file_name) | |
if file_path.lower().endswith('.pdf'): | |
try: | |
result = process_pdf(file_path, model_choice, openai_classifier, questions_string) | |
# If already processed, skip this file | |
if result['result'] == "Already Processed": | |
print(f"{file_name} is already processed. Skipping.") | |
continue | |
cleaned_data_str = result['result'].replace("<br>", "").replace("\\", "") | |
# Attempt to parse JSON data | |
data_dict = json.loads(cleaned_data_str) | |
df_resume = pd.DataFrame(data_dict.items(), columns=["Key", os.path.basename(file_path)]) | |
df_resume["Key"] = df_resume["Key"].replace(questions_dict) | |
df_results = pd.concat([df_results, df_resume.set_index(["Key"]).T.reset_index(drop=True)], axis=0, ignore_index=True) | |
# Decide on the destination based on the final selection key | |
selection_rejection = data_dict.get(list(data_dict.keys())[-1]) | |
destination = selected_folder if selection_rejection == "Selected" else rejected_folder if selection_rejection == "Rejected" else error_folder | |
shutil.move(file_path, os.path.join(destination, os.path.basename(file_path))) | |
except json.JSONDecodeError as e: | |
print(f"JSON decoding error for {file_path}: {e}") | |
shutil.move(file_path, os.path.join(error_folder, file_name)) | |
except Exception as ex: | |
print(f"Error processing file {file_path}: {ex}") | |
shutil.move(file_path, os.path.join(error_folder, file_name)) | |
else: | |
shutil.move(file_path, os.path.join(error_folder, file_name)) | |
print(f"File '{file_name}' is not a PDF. Moved to error directory.") | |
# Save processed results to an Excel file | |
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
excel_file = f'extracted_files/output_results_{timestamp}.xlsx' | |
df_results.to_excel(excel_file, index=False) | |
print(f"Processed resumes saved to {excel_file}") | |
return excel_file # Return the path to the Excel file | |
def run_interface(questions_json, resumes_zip): | |
try: | |
# Handle the questions.json file content | |
if isinstance(questions_json, bytes): | |
# Parse bytes directly as JSON string | |
questions_string = questions_json.decode('utf-8') | |
elif isinstance(questions_json, str): | |
# If it's a file path | |
with open(questions_json, 'r') as f: | |
questions_string = f.read() | |
else: | |
raise ValueError("Unexpected questions_json type") | |
# Handle the ZIP file | |
if isinstance(resumes_zip, bytes): | |
# Write bytes to a temporary zip file | |
temp_zip_path = "temp_uploaded_resumes.zip" | |
with open(temp_zip_path, "wb") as f: | |
f.write(resumes_zip) | |
zip_path = temp_zip_path | |
elif isinstance(resumes_zip, str): | |
zip_path = resumes_zip | |
else: | |
raise ValueError("Unexpected resumes_zip type") | |
openai_classifier = OpenAIClassifier(api_key="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") | |
output_file = unzip_and_process(zip_path, "openai", openai_classifier, questions_string) | |
# Clean up temporary zip file if it was created | |
if isinstance(resumes_zip, bytes) and os.path.exists(temp_zip_path): | |
os.remove(temp_zip_path) | |
return output_file | |
except Exception as e: | |
print(f"Error in run_interface: {str(e)}") | |
# Clean up temporary files in case of error | |
if 'temp_zip_path' in locals() and os.path.exists(temp_zip_path): | |
os.remove(temp_zip_path) | |
raise gr.Error(f"Processing failed: {str(e)}") | |
# Set up the Gradio interface | |
interface = gr.Interface( | |
fn=run_interface, | |
inputs=[ | |
gr.File( | |
label="Upload questions.json", | |
type="binary", | |
file_types=[".json"] | |
), | |
gr.File( | |
label="Upload ZIP of Resumes", | |
type="binary", | |
file_types=[".zip"] | |
) | |
], | |
outputs=gr.File(label="Processed Results Excel"), | |
title="Resume Analysis System", | |
description="Upload a questions.json file and a ZIP file containing resumes to process.", | |
allow_flagging="never", | |
examples=[ | |
["questions.json", "to_be_screened_2.zip"] | |
], | |
cache_examples=False | |
) | |
# Launch with debugging and sharing enabled | |
interface.launch(debug=True, share=True) |