Spaces:
Running
Running
from enum import Enum
from typing import Optional

import httpx
import requests
from fastapi import FastAPI
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel, Field
from transformers import pipeline

from phishing_datasets import submit_entry
from url_tools import extract_urls, resolve_short_url, extract_domain_from_url
from urlscan_client import UrlscanClient
# Module-level singletons created once at import time: the FastAPI application
# object and the urlscan client that predict() uses for searches and scans.
app = FastAPI()
urlscan = UrlscanClient()
class MessageModel(BaseModel):
    # Wrapper around the raw message body; `text` is the string that predict()
    # extracts URLs from and feeds to the classifier.
    text: str
class QueryModel(BaseModel):
    # One filtering query: who sent the message and the message itself.
    # `sender` is passed verbatim to submit_entry() when a message is junked.
    sender: str
    message: MessageModel
class AppModel(BaseModel):
    # Information about the calling app; only its version string is carried.
    version: str
class InputModel(BaseModel):
    # Payload consumed by predict(): a payload version, the query (sender and
    # message) and information about the calling app.
    #
    # pydantic does not treat an annotation whose name starts with an
    # underscore as a regular field, so the incoming "_version" key was never
    # parsed. It is now exposed as `version` and populated through an alias.
    # The field is optional so payloads that were accepted before (with or
    # without "_version") keep validating.
    version: Optional[int] = Field(default=None, alias="_version")
    query: QueryModel
    app: AppModel
class ActionModel(Enum):
    # Filtering decision returned to the caller for a message.

    # Not enough information to decide; in a query response the message is
    # shown normally.
    NONE = 0

    # Show the message normally.
    ALLOW = 1

    # Hide the message from the normal view and file it as Junk.
    JUNK = 2

    # Hide the message from the normal view and file it as Promotional.
    PROMOTION = 3

    # Hide the message from the normal view and file it as Transactional.
    TRANSACTION = 4
class SubActionModel(Enum):
    # Secondary classification attached to an action; NONE is the only member
    # defined here.
    NONE = 0
class OutputModel(BaseModel):
    # Result produced by predict(): the primary action plus its sub-action.
    action: ActionModel
    sub_action: SubActionModel
# Text-classification pipeline built once at import time and reused by every
# predict() call; predict() treats the 'LABEL_1' label as junk.
pipe = pipeline(
    task="text-classification",
    model="mrm8488/bert-tiny-finetuned-sms-spam-detection",
)
def get_well_known_aasa():
    # Build the JSON document that lists the app identifiers under
    # "messagefilter" -> "apps".
    # NOTE(review): the name suggests this is served as the
    # apple-app-site-association file; the route registration is not visible
    # here, so confirm the path it is mounted on.
    app_ids = [
        "X9NN3FSS3T.com.lela.Serenity.SerenityMessageFilterExtension",
        "X9NN3FSS3T.com.lela.Serenity",
    ]
    payload = {"messagefilter": {"apps": app_ids}}
    return JSONResponse(content=payload, media_type="application/json")
def get_robot_txt():
    # Return the file "robot.txt" (a relative path) as a file response.
    robot_file = "robot.txt"
    return FileResponse(robot_file)
def predict(model: InputModel) -> OutputModel:
    # Decide whether an incoming message should be filtered as junk.
    #
    # Flow:
    #   1. Extract URLs from the message text.
    #   2. If there are any, look up past urlscan results for each URL's
    #      domain; when none are found, submit the URLs for a fresh scan.
    #   3. If any scan result carries an overall verdict with a positive
    #      score, record the message via submit_entry() and return JUNK.
    #   4. Otherwise run the text classifier: 'LABEL_1' -> record the message
    #      and return JUNK; any other label -> return NONE.
    #
    # Parameters:
    #   model: the parsed request; only query.sender and query.message.text
    #          are read.
    # Returns:
    #   OutputModel with action JUNK or NONE; sub_action is always NONE.
    text = model.query.message.text
    print(f"Predict: {text}")

    urls = extract_urls(text)
    if urls:
        print("Searching for past scans")
        search_results = [
            urlscan.search(f"domain:{extract_domain_from_url(url)}")
            for url in urls
        ]

        scan_results = []
        for search_result in search_results:
            for hit in search_result.get('results', []):
                result_uuid = hit.get('_id')
                if not result_uuid:
                    # The previous default here was the `str` type object,
                    # which would have been passed to get_result() as an id.
                    # A hit without an id cannot be fetched, so skip it.
                    continue
                scan_results.append(urlscan.get_result(result_uuid))

        if not scan_results:
            # No usable past scan for any of the domains: scan the URLs now.
            print("Scanning...")
            scan_results = [urlscan.scan(url) for url in urls]

        for result in scan_results:
            overall = result.get('verdicts', {}).get('overall', {})
            print(f"Checking overall verdict: {overall}")
            if not overall.get('hasVerdicts'):
                continue

            score = overall.get('score')
            print(f"Has verdicts score {score}")

            # A missing/None score used to raise TypeError in the comparison;
            # treat it as "no positive verdict" instead.
            if score is not None and score > 0:
                print("Submitting entry and returning JUNK.")
                submit_entry(model.query.sender, model.query.message.text)
                return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE)
            # NOTE: a negative score is deliberately not mapped to ALLOW (that
            # branch was disabled); such messages fall through to the
            # classifier below.

    label = pipe(text)
    if label[0]['label'] == 'LABEL_1':
        print("Classify LABEL_1. Submitting entry and returning JUNK.")
        submit_entry(model.query.sender, model.query.message.text)
        return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE)

    # NOTE(review): the message below says an entry is submitted, but this
    # branch never calls submit_entry(). Either the log text or the missing
    # call is wrong - confirm the intent before changing either.
    print("Classify LABEL_0. Submitting entry and returning NONE.")
    return OutputModel(action=ActionModel.NONE, sub_action=SubActionModel.NONE)
class ReportModel(BaseModel):
    # Payload consumed by report(): the sender and the message text, both
    # plain strings.
    sender: str
    message: str
def report(model: ReportModel):
    # Hand the reported sender/message pair to submit_entry(); nothing is
    # returned.
    sender, message = model.sender, model.message
    submit_entry(sender, message)