mirror of
https://github.com/PretendoNetwork/Yamamura.git
synced 2025-04-02 11:01:56 -04:00
264 lines
No EOL
8.5 KiB
Python
264 lines
No EOL
8.5 KiB
Python
#
|
|
# core/db.py - database utilities
|
|
#
|
|
# author: superwhiskers
|
|
# license: gplv3
|
|
#
|
|
# import things
|
|
import core.utils as utils # import the utility module
|
|
import dataset # modmail has never looked this good
|
|
|
|
|
|
#
|
|
# TODO: finish tags class
|
|
#
|
|
# the modmail class, should make programming easier for
|
|
# you devs :)
|
|
class Modmail:
|
|
"""
|
|
a class that makes per-server modmail possible, and easier
|
|
"""
|
|
|
|
# initiation function
|
|
def __init__(self, server_object):
|
|
"""
|
|
initiates the modmail class
|
|
:argument server_object: a discord.Guild
|
|
"""
|
|
# set some variables to save these
|
|
self.db = dataset.connect(
|
|
f"servers/{ server_object.id }/server.db" # open a connection to the server database
|
|
)
|
|
self.settings = utils.KeyValueTable(self.db["settings"]) # get the settings table as a key-value store
|
|
self.modmail = self.db["modmail"] # get the modmail table
|
|
self.log = utils.Logger(
|
|
f"db::mail ({ server_object.id }/{ server_object.name })",
|
|
f"servers/{ server_object.id }/db_modmail.log",
|
|
)
|
|
# notify the sysadmin of the connection
|
|
self.log(f"connected to { server_object.id }/{ server_object.name }")
|
|
# display the current number of stored message
|
|
self.log(f"current number of messages in database: { len(self.db['mail']) }")
|
|
# finish it...
|
|
self.log(f"finished loading...")
|
|
|
|
# parse modmail to text
|
|
# this function doesn't change much
|
|
@staticmethod
|
|
def parse(mail):
|
|
"""
|
|
parses a mail dict. intended to be used by the modmail class but may have other uses.
|
|
:argument mail: a mail dict
|
|
"""
|
|
# parse a list of mail
|
|
if isinstance(mail, list):
|
|
# the string to send to Discord
|
|
return_string = f"""mail from { self.server.id }/{ self.server.name }:
|
|
"""
|
|
# loop through the mail list
|
|
for x in range(0, len(mail)):
|
|
# append the parsed message to the string
|
|
return_string += f"""
|
|
{ mail[x]["id"] } - sent by { mail["sender_id"] }/{ mail["sender_name"] }```
|
|
{ mail[x]["message"] }```
|
|
"""
|
|
# return the string
|
|
return return_string
|
|
|
|
# parse only a single message
|
|
else:
|
|
# the string to send to Discord
|
|
return_string = f"""
|
|
{ mail["id"] } - sent by { mail["sender_id"] }/{ mail["sender_name"] }```
|
|
{ mail["message"] }```
|
|
|
|
"""
|
|
# return the parsed message
|
|
return return_string
|
|
|
|
# get the current modmail
|
|
def read(self, mod):
|
|
"""
|
|
returns a string of mail ready for sending to discord
|
|
:argument mod: a discord.Member of a mod
|
|
:returns: a string of mail ready to send to discord
|
|
"""
|
|
# tell the sysadmin that we're reading mail for a mod
|
|
self.log(f"reading mail for { mod.id }/{ mod.name }")
|
|
# the unread mail list
|
|
unread = []
|
|
# add all of the unread mail
|
|
for mail in self.modmail:
|
|
try:
|
|
mail["readBy"].index(mod.id)
|
|
except ValueError:
|
|
unread.append(mail)
|
|
mail["readBy"].append(mod.id)
|
|
# if no mail is unread
|
|
if not unread:
|
|
return None
|
|
|
|
# return the unread mail
|
|
return self.parse(unread)
|
|
|
|
# send modmail
|
|
def send(self, msg, sender):
|
|
"""
|
|
send mail to mods
|
|
:argument msg: a string to send as a message
|
|
:argument sender: a discord.Member for the sender
|
|
"""
|
|
# tell the sysadmin that someone is sending mail
|
|
self.log(f"{ sender.id }/{ sender.name } is sending modmail")
|
|
# construct the message
|
|
mail_to_send = dict(
|
|
id=utils.whiskerflake(), sender_id=sender.id, sender_name=sender.name, message=msg, readBy=[]
|
|
)
|
|
# send the message
|
|
self.modmail.insert(mail_to_send)
|
|
# tell the sysadmin the current number of messages in the db
|
|
self.log(f"{ sender.id }/{ sender.name }'s mail has been sent")
|
|
self.log(f"current number of messages: { len(self.db['mail']) }")
|
|
# exit the function
|
|
return
|
|
|
|
# clean mail
|
|
def clean(self):
|
|
"""
|
|
wipe all mail that has been read by all of the mods. takes no arguments
|
|
"""
|
|
# tell the sysadmin that the mail is being cleaned
|
|
self.log(f"cleaning mail...")
|
|
# list of moderator's ids
|
|
mods = self.settings["mod_ids"]
|
|
# indexes of mail to delete
|
|
indexes_to_delete = []
|
|
# check if all the mods have read the mail
|
|
for mail in self.modmail:
|
|
mail_read = True
|
|
for y in range(0, len(mods)):
|
|
try:
|
|
mail["readBy"].index(mods[y])
|
|
except ValueError:
|
|
mail_read = False
|
|
if mail_read:
|
|
indexes_to_delete.append(self.modmail.find(id=mail["id"]))
|
|
# then clean from the mail list the indexes to delete
|
|
for x in range(0, len(indexes_to_delete)):
|
|
self.modmail.delete(id=x)
|
|
# return with nothing
|
|
return
|
|
|
|
# delete a specific message
|
|
def delete(self, sid):
|
|
"""
|
|
deletes a specific message by it's identifier. takes one argument, an identifier. returns none if not found,
|
|
true if it was
|
|
"""
|
|
# tell the sysadmin that mail is being deleted
|
|
self.log(f"searching for mail id { sid }...")
|
|
# search the mail for a specific id
|
|
if self.modmail.find_one(id=sid):
|
|
self.modmail.delete(id=mail["id"])
|
|
return True
|
|
# if we could not find this, then tell the sysadmin this too
|
|
self.log(f"could not find mail id { sid }...")
|
|
# then return none because nothing was found
|
|
return None
|
|
|
|
# shows all mail
|
|
def list(self):
|
|
"""
|
|
lists all mail in the database. returns a parsed version of all of the mail
|
|
"""
|
|
# tell the sysadmin **everything**
|
|
self.log(f"listing all mail to someone...")
|
|
# this might happen
|
|
if not self.modmail.all():
|
|
return None
|
|
# the mail
|
|
return self.parse(self.modmail.all())
|
|
|
|
# read a specific message
|
|
def read_single(self, sid):
|
|
"""
|
|
read a specific message. takes one argument, an identifier for a message. returns none if not found, otherwise,
|
|
it returns the message
|
|
"""
|
|
# might be useful for statistics or something
|
|
self.log(f"searching for mail id { sid }...")
|
|
# find the message
|
|
message = self.modmail.find_one(id=sid)
|
|
# check if we found it
|
|
if message:
|
|
# return the message
|
|
return self.parse(message)
|
|
# debugging???
|
|
self.log(f" could not find mail id { sid }...")
|
|
# return None if not found
|
|
return None
|
|
|
|
|
|
"""
|
|
# tag class for Netux's tags
|
|
class Tags:
|
|
""
|
|
class for handling the tags system
|
|
""
|
|
|
|
def __init__(self, serverInfo):
|
|
""
|
|
initiates the tags class. takes one argument, a serverinfo dict (doc.py #1)
|
|
""
|
|
# set some variables to save these
|
|
self.server = serverInfo # server info, from a YAML file
|
|
self.db = dataset.connect(
|
|
self.server["dbPath"] # the sqlite3 database object for the server
|
|
)
|
|
self.log = utils.Logger(
|
|
f"db::tags ({ self.server['dbName'] })",
|
|
f"{ self.server['rootDir'] }/db_tags.log",
|
|
)
|
|
# notify the sysadmin of the connection
|
|
self.log(f"connected to { self.server['dbName'] }")
|
|
# finish it...
|
|
self.log(f"finished loading...")
|
|
|
|
# get the tags, all of them
|
|
@staticmethod
|
|
def get_tags():
|
|
with open("tags.json", "r") as tags_file:
|
|
return json.load(tags_file)
|
|
|
|
# get a specific tag by name
|
|
def get_tag(tag_name):
|
|
tags = get_tags()
|
|
for name, tag in tags.items():
|
|
if name == tag_name:
|
|
return tag
|
|
|
|
return None
|
|
|
|
# create a tag
|
|
def create_tag(name, content):
|
|
tags = get_tags()
|
|
tags[
|
|
name
|
|
] = {
|
|
# support for future additions
|
|
'content': content
|
|
}
|
|
save_tags(tags)
|
|
|
|
# delete a tag
|
|
def delete_tag(name):
|
|
tags = get_tags()
|
|
del tags[name]
|
|
save_tags(tags)
|
|
|
|
def save_tags(tags):
|
|
with open("tags.json", "w") as tags_file:
|
|
tags_file.seek(0)
|
|
tags_file.write(json.dumps(tags))
|
|
tags_file.truncate()
|
|
""" |