|
import asyncio |
|
from collections import deque |
|
import json |
|
import os |
|
from typing import Callable |
|
|
|
import aiohttp |
|
from aiohttp.resolver import AsyncResolver |
|
from cryptography.fernet import Fernet |
|
import discord |
|
from discord.ext import commands, tasks |
|
|
|
from utils.AiohttpManager import AiohttpManager, APIRequestError |
|
import utils.config as config |
|
import utils.datasets as datasets |
|
import utils.helpers as helpers |
|
import utils.keepalive as keepalive |
|
|
|
|
|
"""<-- VARIABLES -->""" |
|
|
|
|
|
bot_token = os.environ["DISCORD_BOT_TOKEN"] |
|
dataset_id = os.environ["DATASET_ID"] |
|
hf_api_key = os.environ["HF_API_KEY"] |
|
fernet = Fernet(os.environ["FERNET_KEY"].encode()) |
|
nameservers = os.getenv("NAMESERVERS") |
|
|
|
|
|
aiohttp_manager = AiohttpManager() |
|
user_data_changed = False |
|
user_data = datasets.load_dataset(dataset_id, config.datafile_name, hf_api_key) |
|
|
|
|
|
"""<-- MISC FUNCTIONS -->""" |
|
|
|
def change_interval_calc() -> int: |
|
"""Length of user_data times x, unless len(user_data) is 0 then default to y.""" |
|
return ( |
|
len(user_data) * config.delay_amounts["per_user"] |
|
if len(user_data) > 0 else config.delay_amounts["per_loop_default"] |
|
) |
|
|
|
def filter_user_data(exclude: set[str]) -> list[dict]: |
|
"""Don't write any of the excluded properties.""" |
|
return [ |
|
{ |
|
key: value for key, value in user.items() |
|
if (key not in exclude) |
|
} |
|
for user in user_data |
|
] |
|
|
|
async def convert_identifier(identifier: str, convert_to: str, api_key: str) -> str: |
|
"""Convert id to username and vice versa.""" |
|
try: |
|
data = await aiohttp_manager.read_api(config.user_api_url.format(identifier=identifier), api_key) |
|
await asyncio.sleep(config.delay_amounts["per_convert"]) |
|
if not data: |
|
helpers.log(f"User not found with identifier: {identifier}") |
|
raise ValueError(f"User not found with identifier: {identifier}") |
|
helpers.log(f"{identifier} converted to {convert_to}: {data[convert_to]}") |
|
return data[convert_to] |
|
except KeyError as e: |
|
helpers.log(f"Error during identifier convert getting {convert_to} from {data}:", e) |
|
raise |
|
except APIRequestError as e: |
|
helpers.log("Edge case http code handler during identifier convert:", e) |
|
raise |
|
|
|
def get_user(ctx: commands.Context | discord.Message) -> dict | None: |
|
"""Get the user from user_data.""" |
|
for user in user_data: |
|
if ctx.author.id == int(user["id"]): |
|
return user |
|
|
|
def is_registered() -> Callable[[commands.Context], bool]: |
|
"""Registration check decorator.""" |
|
async def func(ctx: commands.Context) -> bool: |
|
user = get_user(ctx) |
|
if not user: |
|
await ctx.send(config.welcome_msg) |
|
return bool(user) |
|
return commands.check(func) |
|
|
|
async def register_user(api_key: str, message: discord.Message) -> None: |
|
"""Register the user.""" |
|
global user_data_changed |
|
try: |
|
elements = await aiohttp_manager.read_api(config.notif_api_url, api_key) |
|
except APIRequestError as e: |
|
helpers.log(f"Edge case http code handler during registration of user id {message.author.id} ({message.author}):", e) |
|
await message.channel.send( |
|
"Uh oh, there was an error during registration. Please try again later " |
|
"(if it doesn't resolve on its own soon, please join the bot's " |
|
f"[Discord server](<{config.discord_url}>) and report the bug!)" |
|
) |
|
return |
|
|
|
if elements: |
|
user = { |
|
"id": message.author.id, |
|
"api_key": fernet.encrypt(api_key.encode()).decode(), |
|
"important": { |
|
"actor.username": {}, |
|
"type": [], |
|
"attachments.score.id": [] |
|
}, |
|
"override": False, |
|
"paused": False, |
|
"sendhere": { |
|
"bool": False |
|
}, |
|
"object": message.author, |
|
"processed_ids": deque(reversed([element['id'] for element in elements]), maxlen=config.notif_cache_length) |
|
} |
|
user_data.append(user) |
|
await message.channel.send( |
|
"Successfully registered! (If you didn't mean to do this, use the command `!flatnotifs unregister`. " |
|
"To learn how to start setting rules, use the command `!flatnotifs help` )" |
|
) |
|
user_data_changed = True |
|
else: |
|
await message.channel.send( |
|
"Please try again and provide a valid personal token " |
|
"(double check that the token is still valid and has the notifications.readonly scope!)" |
|
) |
|
|
|
|
|
"""<-- MAIN -->""" |
|
|
|
async def main(): |
|
"""<-- INSTANTIATE DISCORD BOT -->""" |
|
|
|
intents = discord.Intents.default() |
|
intents.message_content = True |
|
resolver = AsyncResolver(nameservers=nameservers.split(",") if nameservers else nameservers) |
|
connector = aiohttp.TCPConnector(resolver=resolver) |
|
bot = commands.Bot( |
|
command_prefix="!flatnotifs ", |
|
case_insensitive=True, |
|
strip_after_prefix=True, |
|
help_command=None, |
|
intents=intents, |
|
connector=connector, |
|
) |
|
|
|
|
|
"""<-- LOOPS -->""" |
|
|
|
@tasks.loop(hours=24) |
|
async def aiohttp_refresh_loop() -> None: |
|
"""Refresh the aiohttp session every 24 hours.""" |
|
await aiohttp_manager.refresh_session() |
|
|
|
@tasks.loop(seconds=60) |
|
async def check_notifs_loop() -> None: |
|
"""Check notifications every change_interval_calc seconds.""" |
|
check_notifs_loop.change_interval(seconds=change_interval_calc()) |
|
global user_data_changed |
|
if user_data_changed: |
|
filtered_user_data = filter_user_data(exclude={"object", "processed_ids", "channel"}) |
|
datasets.update_dataset(filtered_user_data, dataset_id, config.datafile_name, hf_api_key) |
|
user_data_changed = False |
|
|
|
for user in user_data: |
|
if not user["paused"]: |
|
|
|
api_key = fernet.decrypt(user["api_key"].encode()).decode() |
|
excluded = False |
|
try: |
|
elements = await aiohttp_manager.read_api(config.notif_api_url, api_key) |
|
except APIRequestError as e: |
|
helpers.log("Edge case http code handler:", e) |
|
continue |
|
|
|
if not user["object"]: |
|
try: |
|
user["object"] = await bot.fetch_user(user["id"]) |
|
except Exception as e: |
|
user["paused"] = True |
|
helpers.log(f"Error, user id {user['id']} ({user['object']}) not found:", e) |
|
continue |
|
|
|
if not user["processed_ids"] or not elements: |
|
user["paused"] = True |
|
user_data_changed = True |
|
await user["object"].send(config.check_err_msg) |
|
continue |
|
|
|
for element in elements: |
|
|
|
if element['id'] in user["processed_ids"]: |
|
break |
|
|
|
|
|
is_important = False |
|
triggered_rules = [] |
|
for category, values in user["important"].items(): |
|
nested_category = ("actor.id" if category == "actor.username" else category).split('.') |
|
|
|
|
|
value = element |
|
for k in nested_category: |
|
value = value.get(k, None) |
|
if value is None: |
|
break |
|
if value is None: |
|
continue |
|
|
|
|
|
if ("-"+value) in values: |
|
excluded = True |
|
if category == "actor.username": |
|
if values["-"+value] != element['actor']['username']: |
|
helpers.log("Updated", values["-"+value], "to", element['actor']['username']) |
|
values["-"+value] = element['actor']['username'] |
|
user_data_changed = True |
|
val = values["-"+value] |
|
else: |
|
val = value |
|
triggered_rules.append(category + ": -" + val) |
|
if not user["override"]: |
|
break |
|
|
|
|
|
if ("+"+value) in values: |
|
if not excluded: |
|
is_important = True |
|
if category == "actor.username": |
|
if values["+"+value] != element['actor']['username']: |
|
helpers.log("Updated", values["+"+value], "to", element['actor']['username']) |
|
values["+"+value] = element['actor']['username'] |
|
user_data_changed = True |
|
val = values["+"+value] |
|
else: |
|
val = value |
|
triggered_rules.append(category + ": +" + val) |
|
|
|
helpers.log( |
|
f"{element['actor']['printableName']}: {element['type']}, ID-{element['id']} " |
|
f"{'is' if is_important else 'is not'} categorized as important" |
|
f"{' by rule(s): ' + str(triggered_rules) if is_important else '.'}" |
|
) |
|
|
|
|
|
if is_important or user["override"]: |
|
|
|
if element['type'] == "scoreComment": |
|
url = element['attachments']['score']['htmlUrl'] + "#c-" + element['attachments']['scoreComment'] + "\n" |
|
elif element['type'] in {"scorePublication", "scoreStar", "scoreInvitation"}: |
|
url = element['attachments']['score']['htmlUrl'] + "\n" |
|
elif element['type'] == "userFollow": |
|
url = element['actor']['htmlUrl'] + "\n" |
|
else: |
|
url = "" |
|
|
|
|
|
m = ( |
|
f"{helpers.esc_md(element['actor']['printableName'])}: {helpers.esc_md(element['type'])} [(Open on Flat)]({url})\n" |
|
f"-# Rule(s): {helpers.esc_md(str(triggered_rules))}" |
|
) |
|
|
|
|
|
if user["sendhere"]["bool"]: |
|
try: |
|
await user["channel"].send(f"{user['object'].mention + ' ' if user['sendhere']['mention'] else ''}{m}") |
|
except Exception as e: |
|
helpers.log(f"Unable to find specified channel for user id {user['id']} ({user['object']}):", e) |
|
user["sendhere"]["bool"] = False |
|
await user["object"].send(config.channel_err_msg) |
|
await user["object"].send(m) |
|
else: |
|
await user["object"].send(m) |
|
|
|
|
|
user["processed_ids"].append(element['id']) |
|
|
|
await asyncio.sleep(config.delay_amounts["per_user"]) |
|
|
|
|
|
"""<-- EVENT HANDLERS -->""" |
|
|
|
@bot.event |
|
async def on_ready() -> None: |
|
"""Prepare on Discord bot startup.""" |
|
helpers.log("Bot has connected to Discord! :3") |
|
|
|
|
|
helpers.log("Starting aiohttp_refresh_loop...") |
|
try: |
|
aiohttp_refresh_loop.start() |
|
except RuntimeError as e: |
|
helpers.log("Error starting aiohttp_refresh_loop:", e) |
|
|
|
helpers.log("Processing users...") |
|
for user in user_data: |
|
try: |
|
user["object"] = await bot.fetch_user(user["id"]) |
|
try: |
|
if user["sendhere"]["bool"]: |
|
user["channel"] = await bot.fetch_channel(user["sendhere"]["channel_id"]) |
|
except Exception as e: |
|
helpers.log(f"Unable to find specified channel for user id {user['id']} ({user['object']}):", e) |
|
user["sendhere"]["bool"] = False |
|
await user["object"].send(config.channel_err_msg) |
|
except Exception as e: |
|
raise Exception(f"Error getting user id {user['id']} ({user['object']}) or user not found:", e) |
|
|
|
try: |
|
api_key = fernet.decrypt(user["api_key"].encode()).decode() |
|
elements = await aiohttp_manager.read_api(config.notif_api_url, api_key) |
|
user["processed_ids"] = deque(reversed([element['id'] for element in elements]), maxlen=config.notif_cache_length) |
|
except Exception as e: |
|
try: |
|
helpers.log(f"Unable to check notifications for user id {user['id']} ({user['object']}):", e) |
|
user["paused"] = True |
|
await user["object"].send(config.check_err_msg) |
|
except Exception as e2: |
|
raise Exception(f"Error sending 'unable to check notifications' message to user id {user['id']} ({user['object']}):", e2) |
|
|
|
await asyncio.sleep(config.delay_amounts["per_user_startup"]) |
|
|
|
|
|
helpers.log(f"{len(user_data)} user(s) on startup") |
|
await bot.change_presence( |
|
status=discord.Status.online, |
|
activity=discord.Game(name="with the Flat.io API") |
|
) |
|
|
|
|
|
helpers.log("Starting check_notifs_loop...") |
|
try: |
|
check_notifs_loop.start() |
|
except RuntimeError as e: |
|
helpers.log("Error starting check_notifs_loop:", e) |
|
|
|
@bot.event |
|
async def on_message(message: discord.Message) -> None: |
|
"""Handle registration, then pass to command handlers.""" |
|
message_content = message.content.split() |
|
|
|
|
|
if not message_content or message_content[0] != "!flatnotifs" or message.author.id == bot.user.id: |
|
return |
|
|
|
|
|
user = get_user(message) |
|
if not user: |
|
if len(message_content) < 3 or not (message_content[1] == "getstarted"): |
|
|
|
await message.channel.send(config.welcome_msg) |
|
return |
|
if not isinstance(message.channel, discord.DMChannel): |
|
await message.channel.send("`!flatnotifs getstarted` must be used in DMs!") |
|
return |
|
api_key = message_content[2] |
|
await register_user(api_key, message) |
|
return |
|
|
|
|
|
if len(message_content) < 2: |
|
await message.channel.send( |
|
"Hello to you too! <3\n" |
|
"(!flatnotifs is not a valid command by itself; use `!flatnotifs help` for a list of valid commands.)" |
|
) |
|
else: |
|
await bot.process_commands(message) |
|
|
|
@bot.event |
|
async def on_command_error(ctx: commands.Context, error: discord.ext.commands.errors.CommandError): |
|
"""Handle command errors.""" |
|
if isinstance(error, discord.ext.commands.errors.CommandNotFound): |
|
await ctx.send( |
|
"Whoops! That command was invalid.\n" |
|
"(Use `!flatnotifs help` for a list of valid commands. Make sure the command is spelled correctly!)" |
|
) |
|
else: |
|
helpers.log(f"There was an unknown command error for user id {ctx.author.id} ({ctx.author}):", error) |
|
|
|
|
|
"""<-- COMMAND HANDLERS -->""" |
|
|
|
@bot.command(description="Add a rule.") |
|
@is_registered() |
|
async def addrule(ctx: commands.Context, include_exclude: str | None = None, category: str | None = None, *input_values: str) -> None: |
|
if not include_exclude: |
|
await ctx.send( |
|
"Please try again and provide include/exclude and a category and value in this format: " |
|
"`!flatnotifs addrule include/exclude category value` (include/exclude was missing)" |
|
) |
|
return |
|
if not category or not input_values: |
|
await ctx.send( |
|
"Please try again and provide include/exclude and a category and value in this format: " |
|
"`!flatnotifs addrule include/exclude category value` (category or value was missing)" |
|
) |
|
return |
|
|
|
global user_data_changed |
|
user = get_user(ctx) |
|
api_key = fernet.decrypt(user["api_key"].encode()).decode() |
|
|
|
for input_value in input_values: |
|
if category == "actor.username": |
|
try: |
|
input_value_id = await convert_identifier(input_value, "id", api_key) |
|
except ValueError as e: |
|
await ctx.send( |
|
f"User not found with username {helpers.esc_md(input_value)}. " |
|
"Please try again and provide a valid username." |
|
) |
|
return |
|
except Exception as e: |
|
await ctx.send( |
|
"Uh oh, there was an error during addrule. Please try again later " |
|
"(if it doesn't resolve on its own soon, please join the bot's " |
|
f"[Discord server](<{config.discord_url}>) and report the bug!)" |
|
) |
|
return |
|
else: |
|
input_value_id = input_value |
|
|
|
if include_exclude == "include": |
|
temp = "+"+input_value_id |
|
elif include_exclude == "exclude": |
|
temp = "-"+input_value_id |
|
else: |
|
await ctx.send( |
|
"Please try again and provide include/exclude and a category and value in this format: " |
|
"`!flatnotifs addrule include/exclude category value` (first argument was not include or exclude)" |
|
) |
|
return |
|
|
|
if category in user["important"]: |
|
if category == "actor.username": |
|
user["important"][category][temp] = input_value |
|
else: |
|
user["important"][category].append(temp) |
|
|
|
await ctx.send(f"Rule {helpers.esc_md(category)}: {helpers.esc_md(input_value)} added") |
|
user_data_changed = True |
|
else: |
|
await ctx.send(f"Category {helpers.esc_md(category)} not found") |
|
|
|
@bot.command(description="Remove a rule.") |
|
@is_registered() |
|
async def removerule(ctx: commands.Context, *input_values: str) -> None: |
|
if not input_values: |
|
await ctx.send("Please try again and provide a value in this format: `!flatnotifs removerule value`") |
|
return |
|
|
|
global user_data_changed |
|
user = get_user(ctx) |
|
api_key = fernet.decrypt(user["api_key"].encode()).decode() |
|
|
|
for input_value in input_values: |
|
found = False |
|
for category, values in user["important"].items(): |
|
if category == "actor.username": |
|
try: |
|
input_value_id = dict(zip(values.values(), values.keys()))[input_value][1:] |
|
except KeyError as e: |
|
try: |
|
input_value_id = await convert_identifier(input_value, "id", api_key) |
|
except ValueError as e: |
|
input_value_id = input_value |
|
except Exception as e: |
|
await ctx.send( |
|
"Uh oh, there was an error during removerule. Please try again later " |
|
"(if it doesn't resolve on its own soon, please join the bot's " |
|
f"[Discord server](<{config.discord_url}>) and report the bug!)" |
|
) |
|
return |
|
else: |
|
input_value_id = input_value |
|
|
|
for v in [("+"+input_value_id), ("-"+input_value_id)]: |
|
if v in values: |
|
if category == "actor.username": |
|
values.pop(v) |
|
else: |
|
values.remove(v) |
|
|
|
found = True |
|
await ctx.send(f"Rule {helpers.esc_md(input_value)} removed from {helpers.esc_md(category)}") |
|
user_data_changed = True |
|
break |
|
|
|
if not found: |
|
await ctx.send(f"Rule {helpers.esc_md(input_value)} not found") |
|
|
|
@bot.command(description="Activate/deactivate overriding of all rules.") |
|
@is_registered() |
|
async def override(ctx: commands.Context) -> None: |
|
global user_data_changed |
|
user = get_user(ctx) |
|
if user["override"]: |
|
await ctx.send( |
|
"Override disabled (You will now only be notified of notifications that " |
|
"match your specified filters. Re-enable by using `!flatnotifs override`)" |
|
) |
|
else: |
|
await ctx.send( |
|
"Override enabled (You will now be notified of all notifications. " |
|
"Disable by using `!flatnotifs override`)" |
|
) |
|
user["override"] = not user["override"] |
|
user_data_changed = True |
|
|
|
@bot.command(description="Pause/unpause notifications.") |
|
@is_registered() |
|
async def pause(ctx: commands.Context) -> None: |
|
global user_data_changed |
|
user = get_user(ctx) |
|
if not user["paused"]: |
|
await ctx.send("Notifications paused (You will not be notified of any notifications. Unpause by using `!flatnotifs pause`)") |
|
user["paused"] = True |
|
user_data_changed = True |
|
else: |
|
try: |
|
api_key = fernet.decrypt(user["api_key"].encode()).decode() |
|
elements = await aiohttp_manager.read_api(config.notif_api_url, api_key) |
|
user["processed_ids"] = deque(reversed([element['id'] for element in elements]), maxlen=config.notif_cache_length) |
|
user["paused"] = False |
|
user_data_changed = True |
|
await ctx.send("Notifications unpaused (You will now resume being notified of notifications. Pause by using `!flatnotifs pause`)") |
|
except Exception as e: |
|
try: |
|
helpers.log(f"Unable to check notifications for user id {user['id']} ({user['object']}):", e) |
|
user["paused"] = True |
|
user_data_changed = True |
|
await ctx.send(config.check_err_msg) |
|
except Exception as e2: |
|
raise Exception( |
|
"Error sending 'unable to check notifications on unpause' " |
|
f"message to user id {user['id']} ({user['object']}):", e2 |
|
) |
|
|
|
@bot.command(description="Change your notification send channel to current channel.") |
|
@is_registered() |
|
async def sendhere(ctx: commands.Context, mention_flag: str | None = None) -> None: |
|
global user_data_changed |
|
user = get_user(ctx) |
|
|
|
if user["sendhere"]["bool"]: |
|
user["sendhere"]["bool"] = False |
|
await ctx.send("Successfully changed your notification channel back to default (your DMs)") |
|
user_data_changed = True |
|
else: |
|
if isinstance(ctx.channel, discord.DMChannel): |
|
await ctx.send("sendhere can only be set in non-DM channels.") |
|
return |
|
if not mention_flag or (mention_flag.lower() not in {"mention", "nomention"}): |
|
await ctx.send("Please try again and provide mention or nomention in this format: `!flatnotifs sendhere mention/nomention`") |
|
return |
|
await ctx.send( |
|
"Are you sure you want to switch your notification send channel to here? (Y/N)\n" |
|
"If this is a public channel, that means anyone can see the notifications that the bot sends you." |
|
) |
|
|
|
def check(m: discord.Message) -> bool: |
|
"""Function to check that it's still the same user in the same channel""" |
|
return m.author.id == ctx.author.id and m.channel.id == ctx.channel.id |
|
|
|
try: |
|
msg = await bot.wait_for("message", check=check, timeout=30) |
|
except Exception as e: |
|
helpers.log("Sendhere timeout:", e) |
|
await ctx.send("Cancelling sendhere (no response for 30 sec)") |
|
return |
|
else: |
|
if msg.content.upper() == "Y": |
|
try: |
|
user["sendhere"]["channel_id"] = ctx.channel.id |
|
user["channel"] = ctx.channel |
|
user["sendhere"]["mention"] = (mention_flag.lower() == "mention") |
|
await user["channel"].send( |
|
"Successfully changed your notification channel to this channel. " |
|
"You can disable this at any time using !flatnotifs sendhere" |
|
) |
|
user["sendhere"]["bool"] = True |
|
user_data_changed = True |
|
except Exception as e: |
|
helpers.log(f"Error setting sendhere for user id {user['id']} ({user['object']}):", e) |
|
await ctx.send( |
|
"Uh oh, there was an error during sendhere. Please try again later " |
|
"(if it doesn't resolve on its own soon, please join the bot's " |
|
f"[Discord server](<{config.discord_url}>) and report the bug!)" |
|
) |
|
else: |
|
await ctx.send("Cancelling sendhere (received a response other than 'Y')") |
|
|
|
@bot.command(description="Unregister, permanently deleting your rules and API key from the bot.") |
|
@is_registered() |
|
async def unregister(ctx: commands.Context) -> None: |
|
global user_data_changed |
|
user = get_user(ctx) |
|
await ctx.send( |
|
"Are you sure you want to unregister? (Y/N)\n" |
|
"Unregistering means that you will lose any rules that you have set and will no longer receive notifications from this bot. " |
|
"Only unregister if you don't want to receive notifications from this bot anymore or if you need to change your personal token." |
|
) |
|
|
|
def check(m: discord.Message) -> bool: |
|
"""Function to check that it's still the same user in the same channel""" |
|
return m.author.id == ctx.author.id and m.channel.id == ctx.channel.id |
|
|
|
try: |
|
msg = await bot.wait_for("message", check=check, timeout=30) |
|
except Exception as e: |
|
helpers.log("Unregister timeout:", e) |
|
await ctx.send("Cancelling unregister (no response for 30 sec)") |
|
return |
|
else: |
|
if msg.content.upper() == "Y": |
|
try: |
|
user_data.remove(user) |
|
await ctx.send("Successfully unregistered. You can re-register by using the command !flatnotifs getstarted") |
|
user_data_changed = True |
|
except Exception as e: |
|
helpers.log(f"Error unregistering for user id {user['id']} ({user['object']}):", e) |
|
await ctx.send( |
|
"Uh oh, there was an error during unregister. Please try again later " |
|
"(if it doesn't resolve on its own soon, please join the bot's " |
|
f"[Discord server](<{config.discord_url}>) and report the bug!)" |
|
) |
|
else: |
|
await ctx.send("Cancelling unregister (received a response other than 'Y')") |
|
|
|
@bot.command(description="Update your personal token.") |
|
@is_registered() |
|
async def updatetoken(ctx: commands.Context, api_key: str | None = None) -> None: |
|
if not isinstance(ctx.channel, discord.DMChannel): |
|
await ctx.send("`!flatnotifs updatetoken` must be used in DMs!") |
|
return |
|
if api_key is None: |
|
await ctx.send("Please provide your new personal token.") |
|
return |
|
|
|
global user_data_changed |
|
user = get_user(ctx) |
|
await ctx.send( |
|
"Are you sure you want to update your personal token? (Y/N)\n" |
|
"Updating your personal token will invalidate your old token. " |
|
"Make sure your new personal token is valid!" |
|
) |
|
|
|
def check(m: discord.Message) -> bool: |
|
"""Function to check that it's still the same user in the same channel""" |
|
return m.author.id == ctx.author.id and m.channel.id == ctx.channel.id |
|
|
|
try: |
|
msg = await bot.wait_for("message", check=check, timeout=30) |
|
except Exception as e: |
|
helpers.log("Updatetoken timeout:", e) |
|
await ctx.send("Cancelling updatetoken (no response for 30 sec)") |
|
return |
|
else: |
|
if msg.content.upper() == "Y": |
|
try: |
|
elements = await aiohttp_manager.read_api(config.notif_api_url, api_key) |
|
except APIRequestError as e: |
|
helpers.log(f"Edge case http code handler during updatetoken of user id {ctx.author.id} ({ctx.author}):", e) |
|
await ctx.send( |
|
"Uh oh, there was an error during updatetoken. Please try again later " |
|
"(if it doesn't resolve on its own soon, please join the bot's " |
|
f"[Discord server](<{config.discord_url}>) and report the bug!)" |
|
) |
|
return |
|
|
|
if elements: |
|
user["api_key"] = fernet.encrypt(api_key.encode()).decode() |
|
user["processed_ids"] = deque(reversed([element['id'] for element in elements]), maxlen=config.notif_cache_length) |
|
await ctx.send("Successfully updated your personal token!") |
|
user_data_changed = True |
|
helpers.log(f"User id {user['id']} ({user['object']}) updated token, newest element on startup is ID-{elements[0]['id']}") |
|
else: |
|
await ctx.send( |
|
"Please try again and provide a valid personal token " |
|
"(double check that the token is still valid and has the notifications.readonly scope!)" |
|
) |
|
else: |
|
await ctx.send("Cancelling updatetoken (received a response other than 'Y')") |
|
|
|
@bot.command(description="Show all rules that you have set.") |
|
@is_registered() |
|
async def rules(ctx: commands.Context) -> None: |
|
global user_data_changed |
|
user = get_user(ctx) |
|
important = { |
|
key: [user_id[0]+user_name for user_id, user_name in values.items()] if key == "actor.username" else values |
|
for key, values in user["important"].items() |
|
} |
|
|
|
await ctx.author.send( |
|
f"Rules: {helpers.esc_md(json.dumps(important, indent=4))}" |
|
f"{chr(10)+'Override is currently enabled (disable by using !flatnotifs override)' if user['override'] else ''}" |
|
f"{chr(10)+'Notifications are currently paused (unpause by using !flatnotifs pause)' if user['paused'] else ''}" |
|
) |
|
|
|
@bot.command(description="Show the version of the bot.") |
|
@is_registered() |
|
async def version(ctx: commands.Context) -> None: |
|
await ctx.send(config.version_msg) |
|
|
|
@bot.command(description="Show all valid commands.") |
|
@is_registered() |
|
async def help(ctx: commands.Context) -> None: |
|
|
|
await ctx.send(config.help_msg[0]) |
|
await ctx.send(config.help_msg[1]) |
|
|
|
@bot.command(description="Sync the command tree.") |
|
@commands.is_owner() |
|
async def sync(ctx: commands.Context) -> None: |
|
synced = await bot.tree.sync() |
|
await ctx.send(f"Synced {len(synced)} command(s)") |
|
|
|
@bot.hybrid_command(description="Hello world!") |
|
async def hello(ctx: commands.Context) -> None: |
|
await ctx.send("Hello world!") |
|
|
|
|
|
await bot.start(bot_token) |
|
|
|
if __name__ == "__main__": |
|
|
|
keepalive.run() |
|
asyncio.run(main()) |
|
|
|
|
|
asyncio.run(aiohttp_manager.close_session()) |
|
|