Compare commits

..

No commits in common. "56cb94e1a098158aacc28e77a398b52e1e34f236" and "afd0a0d0a3e3e3292433e7922a7bdfeda5f5733d" have entirely different histories.

11 changed files with 151 additions and 147 deletions

View File

@ -6,7 +6,7 @@ from tlapbot.db import get_db
from tlapbot.owncast_requests import is_stream_live, give_points_to_chat from tlapbot.owncast_requests import is_stream_live, give_points_to_chat
def create_app(test_config: None = None) -> Flask: def create_app(test_config=None):
app = Flask(__name__, instance_relative_config=True) app = Flask(__name__, instance_relative_config=True)
# ensure the instance folder exists # ensure the instance folder exists
@ -59,7 +59,7 @@ def create_app(test_config: None = None) -> Flask:
app.cli.add_command(db.hard_reset_milestone_command) app.cli.add_command(db.hard_reset_milestone_command)
# scheduler job for giving points to users # scheduler job for giving points to users
def proxy_job() -> None: def proxy_job():
with app.app_context(): with app.app_context():
if is_stream_live(): if is_stream_live():
app.logger.info("Stream is LIVE. Giving points to chat.") app.logger.info("Stream is LIVE. Giving points to chat.")

View File

@ -1,13 +1,13 @@
import sqlite3 import sqlite3
import click import click
from flask import current_app, g, Flask from flask import current_app, g
from flask.cli import with_appcontext from flask.cli import with_appcontext
from tlapbot.redeems import milestone_complete from tlapbot.redeems import milestone_complete
def get_db() -> sqlite3.Connection: def get_db():
if 'db' not in g: if 'db' not in g:
g.db = sqlite3.connect( g.db = sqlite3.connect(
current_app.config['DATABASE'], current_app.config['DATABASE'],
@ -18,14 +18,14 @@ def get_db() -> sqlite3.Connection:
return g.db return g.db
def close_db() -> None: def close_db(e=None):
db: sqlite3.Connection = g.pop('db', None) db = g.pop('db', None)
if db is not None: if db is not None:
db.close() db.close()
def insert_counters(db: sqlite3.Connection) -> bool: def insert_counters(db):
for redeem, redeem_info in current_app.config['REDEEMS'].items(): for redeem, redeem_info in current_app.config['REDEEMS'].items():
if redeem_info["type"] == "counter": if redeem_info["type"] == "counter":
try: try:
@ -40,16 +40,17 @@ def insert_counters(db: sqlite3.Connection) -> bool:
return True return True
def init_db() -> bool: def init_db():
db = get_db() db = get_db()
with current_app.open_resource('schema.sql') as f: with current_app.open_resource('schema.sql') as f:
db.executescript(f.read().decode('utf8')) db.executescript(f.read().decode('utf8'))
return insert_counters(db) if insert_counters(db):
return True
def clear_redeem_queue() -> bool: def clear_redeem_queue():
db = get_db() db = get_db()
try: try:
@ -61,24 +62,25 @@ def clear_redeem_queue() -> bool:
) )
db.commit() db.commit()
except sqlite3.Error as e: except sqlite3.Error as e:
print("Error occurred deleting redeem queue:", e.args[0]) print("Error occured deleting redeem queue:", e.args[0])
return False return False
return True return True
def refresh_counters() -> bool: def refresh_counters():
db = get_db() db = get_db()
try: try:
db.execute("DELETE FROM counters") db.execute("DELETE FROM counters")
db.commit() db.commit()
except sqlite3.Error as e: except sqlite3.Error as e:
print("Error occurred deleting old counters:", e.args[0]) print("Error occured deleting old counters:", e.args[0])
return False return False
return insert_counters(db) if insert_counters(db):
return True
def refresh_milestones() -> bool: def refresh_milestones():
db = get_db() db = get_db()
# delete old milestones # delete old milestones
try: try:
@ -109,7 +111,7 @@ def refresh_milestones() -> bool:
result = cursor.fetchone() result = cursor.fetchone()
if result is None: if result is None:
cursor.execute( cursor.execute(
"INSERT INTO milestones(name, progress, goal) VALUES(?, 0, ?)", "INSERT INTO milestones(name, progress, goal, complete) VALUES(?, 0, ?, FALSE)",
(redeem, redeem_info['goal']) (redeem, redeem_info['goal'])
) )
# update existing milestone to new goal # update existing milestone to new goal
@ -125,7 +127,7 @@ def refresh_milestones() -> bool:
return True return True
def reset_milestone(milestone: str) -> bool: def reset_milestone(milestone):
if milestone not in current_app.config['REDEEMS']: if milestone not in current_app.config['REDEEMS']:
print(f"Failed resetting milestone, {milestone} not in redeems file.") print(f"Failed resetting milestone, {milestone} not in redeems file.")
return False return False
@ -136,19 +138,19 @@ def reset_milestone(milestone: str) -> bool:
(milestone,) (milestone,)
) )
db.execute( db.execute(
"INSERT INTO milestones(name, progress, goal) VALUES(?, ?, ?)", "INSERT INTO milestones(name, progress, goal, complete) VALUES(?, ?, ?, FALSE)",
(milestone, 0, current_app.config['REDEEMS'][milestone]['goal']) (milestone, 0, current_app.config['REDEEMS'][milestone]['goal'])
) )
db.commit() db.commit()
return True return True
except sqlite3.Error as e: except sqlite3.Error as e:
current_app.logger.error(f"Error occurred adding a milestone: {e.args[0]}") current_app.logger.error(f"Error occured adding a milestone: {e.args[0]}")
return False return False
@click.command('init-db') @click.command('init-db')
@with_appcontext @with_appcontext
def init_db_command() -> None: def init_db_command():
"""Clear the existing data and create new tables.""" """Clear the existing data and create new tables."""
if init_db(): if init_db():
click.echo('Initialized the database.') click.echo('Initialized the database.')
@ -156,7 +158,7 @@ def init_db_command() -> None:
@click.command('clear-queue') @click.command('clear-queue')
@with_appcontext @with_appcontext
def clear_queue_command() -> None: def clear_queue_command():
"""Remove all redeems from the redeem queue.""" """Remove all redeems from the redeem queue."""
if clear_redeem_queue(): if clear_redeem_queue():
click.echo('Cleared redeem queue.') click.echo('Cleared redeem queue.')
@ -164,7 +166,7 @@ def clear_queue_command() -> None:
@click.command('refresh-counters') @click.command('refresh-counters')
@with_appcontext @with_appcontext
def refresh_counters_command() -> None: def refresh_counters_command():
"""Refresh counters from current config file. """Refresh counters from current config file.
(Remove old ones, add new ones.)""" (Remove old ones, add new ones.)"""
if refresh_counters(): if refresh_counters():
@ -173,7 +175,7 @@ def refresh_counters_command() -> None:
@click.command('clear-refresh') @click.command('clear-refresh')
@with_appcontext @with_appcontext
def refresh_and_clear_command() -> None: def refresh_and_clear_command():
"""Refresh counters and clear queue.""" """Refresh counters and clear queue."""
if refresh_counters() and clear_redeem_queue(): if refresh_counters() and clear_redeem_queue():
click.echo('Counters refreshed and queue cleared.') click.echo('Counters refreshed and queue cleared.')
@ -181,7 +183,7 @@ def refresh_and_clear_command() -> None:
@click.command('refresh-milestones') @click.command('refresh-milestones')
@with_appcontext @with_appcontext
def refresh_milestones_command() -> None: def refresh_milestones_command():
"""Initialize all milestones from the redeems file, """Initialize all milestones from the redeems file,
delete milestones not in redeem file.""" delete milestones not in redeem file."""
if refresh_milestones(): if refresh_milestones():
@ -190,7 +192,7 @@ def refresh_milestones_command() -> None:
@click.command('reset-milestone') @click.command('reset-milestone')
@click.argument('milestone') @click.argument('milestone')
def reset_milestone_command(milestone: str) -> None: def reset_milestone_command(milestone):
"""Resets a completed milestone back to zero.""" """Resets a completed milestone back to zero."""
if milestone_complete(get_db(), milestone): if milestone_complete(get_db(), milestone):
if reset_milestone(milestone): if reset_milestone(milestone):
@ -202,12 +204,12 @@ def reset_milestone_command(milestone: str) -> None:
@click.command('hard-reset-milestone') @click.command('hard-reset-milestone')
@click.argument('milestone') @click.argument('milestone')
def hard_reset_milestone_command(milestone: str) -> None: def hard_reset_milestone_command(milestone):
"""Resets any milestone back to zero.""" """Resets any milestone back to zero."""
if reset_milestone(milestone): if reset_milestone(milestone):
click.echo(f"Hard reset milestone {milestone}.") click.echo(f"Hard reset milestone {milestone}.")
def init_app(app: Flask) -> None: def init_app(app):
app.teardown_appcontext(close_db) app.teardown_appcontext(close_db)
app.cli.add_command(init_db_command) app.cli.add_command(init_db_command)

View File

@ -2,7 +2,7 @@ from flask import current_app
from tlapbot.owncast_requests import send_chat from tlapbot.owncast_requests import send_chat
def send_help() -> None: def send_help():
message = [] message = []
message.append("Tlapbot gives you points for being in chat, and then allows you to spend those points. <br>") message.append("Tlapbot gives you points for being in chat, and then allows you to spend those points. <br>")
message.append(f"People connected to chat receive {current_app.config['POINTS_AMOUNT_GIVEN']} points every {current_app.config['POINTS_CYCLE_TIME']} seconds. <br>") message.append(f"People connected to chat receive {current_app.config['POINTS_AMOUNT_GIVEN']} points every {current_app.config['POINTS_CYCLE_TIME']} seconds. <br>")

View File

@ -1,12 +1,11 @@
from flask import current_app from flask import current_app
from sqlite3 import Error, Connection from sqlite3 import Error
from re import sub from re import sub
from typing import Tuple
# # # db stuff # # # # # # db stuff # # #
def read_users_points(db: Connection, user_id: str) -> int | None: def read_users_points(db, user_id):
"""Returns None and logs error in case of error, or if user doesn't exist.""" """Errors out if user doesn't exist."""
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT points FROM points WHERE id = ?", "SELECT points FROM points WHERE id = ?",
@ -14,12 +13,11 @@ def read_users_points(db: Connection, user_id: str) -> int | None:
) )
return cursor.fetchone()[0] return cursor.fetchone()[0]
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred reading points: {e.args[0]}") current_app.logger.error(f"Error occured reading points: {e.args[0]}")
current_app.logger.error(f"Of user: {user_id}") current_app.logger.error(f"To user: {user_id}")
def read_all_users_with_username(db: Connection, username: str) -> list[Tuple[str, int]] | None: def read_all_users_with_username(db, username):
"""Returns None only if Error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT name, points FROM points WHERE name = ?", "SELECT name, points FROM points WHERE name = ?",
@ -28,11 +26,11 @@ def read_all_users_with_username(db: Connection, username: str) -> list[Tuple[st
users = cursor.fetchall() users = cursor.fetchall()
return users return users
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred reading points by username: {e.args[0]}") current_app.logger.error(f"Error occured reading points by username: {e.args[0]}")
current_app.logger.error(f"Of everyone with username: {username}") current_app.logger.error(f"To everyone with username: {username}")
def give_points_to_user(db: Connection, user_id: str, points: int) -> None: def give_points_to_user(db, user_id, points):
try: try:
db.execute( db.execute(
"UPDATE points SET points = points + ? WHERE id = ?", "UPDATE points SET points = points + ? WHERE id = ?",
@ -40,11 +38,11 @@ def give_points_to_user(db: Connection, user_id: str, points: int) -> None:
) )
db.commit() db.commit()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred giving points: {e.args[0]}") current_app.logger.error(f"Error occured giving points: {e.args[0]}")
current_app.logger.error(f"To user: {user_id} amount of points: {points}") current_app.logger.error(f"To user: {user_id} amount of points: {points}")
def use_points(db: Connection, user_id: str, points: int) -> bool: def use_points(db, user_id, points):
try: try:
db.execute( db.execute(
"UPDATE points SET points = points - ? WHERE id = ?", "UPDATE points SET points = points - ? WHERE id = ?",
@ -53,13 +51,12 @@ def use_points(db: Connection, user_id: str, points: int) -> bool:
db.commit() db.commit()
return True return True
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred using points: {e.args[0]}") current_app.logger.error(f"Error occured using points: {e.args[0]}")
current_app.logger.error(f"From user: {user_id} amount of points: {points}") current_app.logger.error(f"From user: {user_id} amount of points: {points}")
return False return False
def user_exists(db: Connection, user_id: str) -> bool | None: def user_exists(db, user_id):
"""Returns None only if an error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT points FROM points WHERE id = ?", "SELECT points FROM points WHERE id = ?",
@ -69,11 +66,11 @@ def user_exists(db: Connection, user_id: str) -> bool | None:
return False return False
return True return True
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred checking if user exists: {e.args[0]}") current_app.logger.error(f"Error occured checking if user exists: {e.args[0]}")
current_app.logger.error(f"To user: {user_id}") current_app.logger.error(f"To user: {user_id}")
def add_user_to_database(db: Connection, user_id: str, display_name: str) -> None: def add_user_to_database(db, user_id, display_name):
""" Adds a new user to the database. Does nothing if user is already in.""" """ Adds a new user to the database. Does nothing if user is already in."""
try: try:
cursor = db.execute( cursor = db.execute(
@ -95,11 +92,11 @@ def add_user_to_database(db: Connection, user_id: str, display_name: str) -> Non
) )
db.commit() db.commit()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred adding user to db: {e.args[0]}") current_app.logger.error(f"Error occured adding user to db: {e.args[0]}")
current_app.logger.error(f"To user id: {user_id}, with display name: {display_name}") current_app.logger.error(f"To user id: {user_id}, with display name: {display_name}")
def change_display_name(db: Connection, user_id: str, new_name: str) -> None: def change_display_name(db, user_id, new_name):
try: try:
db.execute( db.execute(
"UPDATE points SET name = ? WHERE id = ?", "UPDATE points SET name = ? WHERE id = ?",
@ -107,11 +104,11 @@ def change_display_name(db: Connection, user_id: str, new_name: str) -> None:
) )
db.commit() db.commit()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred changing display name: {e.args[0]}") current_app.logger.error(f"Error occured changing display name: {e.args[0]}")
current_app.logger.error(f"To user id: {user_id}, with display name: {new_name}") current_app.logger.error(f"To user id: {user_id}, with display name: {new_name}")
def remove_duplicate_usernames(db: Connection, user_id: str, username: str) -> None: def remove_duplicate_usernames(db, user_id, username):
try: try:
db.execute( db.execute(
"""UPDATE points """UPDATE points
@ -121,12 +118,12 @@ def remove_duplicate_usernames(db: Connection, user_id: str, username: str) -> N
) )
db.commit() db.commit()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred removing duplicate usernames: {e.args[0]}") current_app.logger.error(f"Error occured removing duplicate usernames: {e.args[0]}")
# # # misc. stuff # # # # # # misc. stuff # # #
# This is now unused since rawBody attribute of the webhook now returns cleaned-up emotes. # This is now unused since rawBody attribute of the webhook now returns cleaned-up emotes.
def remove_emoji(message: str) -> str: def remove_emoji(message):
return sub( return sub(
r'<img class="emoji" alt="(:.*?:)" title=":.*?:" src="/img/emoji/.*?">', r'<img class="emoji" alt="(:.*?:)" title=":.*?:" src="/img/emoji/.*?">',
r'\1', r'\1',

View File

@ -1,30 +1,28 @@
import requests import requests
from flask import current_app from flask import current_app
from tlapbot.owncast_helpers import give_points_to_user from tlapbot.owncast_helpers import give_points_to_user
from sqlite3 import Connection
from typing import Any
def is_stream_live() -> bool: def is_stream_live():
url = current_app.config['OWNCAST_INSTANCE_URL'] + '/api/status' url = current_app.config['OWNCAST_INSTANCE_URL'] + '/api/status'
try: try:
r = requests.get(url) r = requests.get(url)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
current_app.logger.error(f"Error occurred checking if stream is live: {e.args[0]}") current_app.logger.error(f"Error occured checking if stream is live: {e.args[0]}")
return False return False
return r.json()["online"] return r.json()["online"]
def give_points_to_chat(db: Connection) -> None: def give_points_to_chat(db):
url = current_app.config['OWNCAST_INSTANCE_URL'] + '/api/integrations/clients' url = current_app.config['OWNCAST_INSTANCE_URL'] + '/api/integrations/clients'
headers = {"Authorization": "Bearer " + current_app.config['OWNCAST_ACCESS_TOKEN']} headers = {"Authorization": "Bearer " + current_app.config['OWNCAST_ACCESS_TOKEN']}
try: try:
r = requests.get(url, headers=headers) r = requests.get(url, headers=headers)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
current_app.logger.error(f"Error occurred getting users to give points to: {e.args[0]}") current_app.logger.error(f"Error occured getting users to give points to: {e.args[0]}")
return return
if r.status_code != 200: if r.status_code != 200:
current_app.logger.error(f"Error occurred when giving points: Response code not 200.") current_app.logger.error(f"Error occured when giving points: Response code not 200.")
current_app.logger.error(f"Response code received: {r.status_code}.") current_app.logger.error(f"Response code received: {r.status_code}.")
current_app.logger.error(f"Check owncast instance url and access key.") current_app.logger.error(f"Check owncast instance url and access key.")
return return
@ -33,16 +31,16 @@ def give_points_to_chat(db: Connection) -> None:
give_points_to_user(db, user_id, current_app.config['POINTS_AMOUNT_GIVEN']) give_points_to_user(db, user_id, current_app.config['POINTS_AMOUNT_GIVEN'])
def send_chat(message: str) -> Any: def send_chat(message):
url = current_app.config['OWNCAST_INSTANCE_URL'] + '/api/integrations/chat/send' url = current_app.config['OWNCAST_INSTANCE_URL'] + '/api/integrations/chat/send'
headers = {"Authorization": "Bearer " + current_app.config['OWNCAST_ACCESS_TOKEN']} headers = {"Authorization": "Bearer " + current_app.config['OWNCAST_ACCESS_TOKEN']}
try: try:
r = requests.post(url, headers=headers, json={"body": message}) r = requests.post(url, headers=headers, json={"body": message})
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
current_app.logger.error(f"Error occurred sending chat message: {e.args[0]}") current_app.logger.error(f"Error occured sending chat message: {e.args[0]}")
return return
if r.status_code != 200: if r.status_code != 200:
current_app.logger.error(f"Error occurred when sending chat: Response code not 200.") current_app.logger.error(f"Error occured when sending chat: Response code not 200.")
current_app.logger.error(f"Response code received: {r.status_code}.") current_app.logger.error(f"Response code received: {r.status_code}.")
current_app.logger.error(f"Check owncast instance url and access key.") current_app.logger.error(f"Check owncast instance url and access key.")
return return

View File

@ -1,6 +1,4 @@
from flask import Flask, request, json, Blueprint, current_app from flask import Flask, request, json, Blueprint, current_app
from sqlite3 import Connection
from typing import Any
from tlapbot.db import get_db from tlapbot.db import get_db
from tlapbot.owncast_requests import send_chat from tlapbot.owncast_requests import send_chat
from tlapbot.owncast_helpers import (add_user_to_database, change_display_name, from tlapbot.owncast_helpers import (add_user_to_database, change_display_name,
@ -13,12 +11,9 @@ bp = Blueprint('owncast_webhooks', __name__)
@bp.route('/owncastWebhook', methods=['POST']) @bp.route('/owncastWebhook', methods=['POST'])
def owncast_webhook() -> Any | None: def owncast_webhook():
"""Reads webhook json -- adds new users, removes duplicate usernames, data = request.json
handles name changes and chat messages with commands. db = get_db()
Returns the 'data' json from the request."""
data: Any | None = request.json
db: Connection = get_db()
# Make sure user is in db before doing anything else. # Make sure user is in db before doing anything else.
if data["type"] in ["CHAT", "NAME_CHANGED", "USER_JOINED"]: if data["type"] in ["CHAT", "NAME_CHANGED", "USER_JOINED"]:

View File

@ -1,11 +1,8 @@
from flask import current_app from flask import current_app
from sqlite3 import Error, Connection from sqlite3 import Error
from typing import Tuple, Any
from tlapbot.owncast_helpers import use_points from tlapbot.owncast_helpers import use_points
from tlapbot.tlapbot_types import Redeems
def counter_exists(db: Connection, counter_name: str) -> bool | None: def counter_exists(db, counter_name):
"""Returns None only if error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT count FROM counters WHERE name = ?", "SELECT count FROM counters WHERE name = ?",
@ -19,11 +16,11 @@ def counter_exists(db: Connection, counter_name: str) -> bool | None:
return False return False
return True return True
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred checking if counter exists: {e.args[0]}") current_app.logger.error(f"Error occured checking if counter exists: {e.args[0]}")
current_app.logger.error(f"For counter: {counter_name}") current_app.logger.error(f"For counter: {counter_name}")
def add_to_counter(db: Connection, counter_name: str) -> bool: def add_to_counter(db, counter_name):
if counter_exists(db, counter_name): if counter_exists(db, counter_name):
try: try:
db.execute( db.execute(
@ -33,12 +30,12 @@ def add_to_counter(db: Connection, counter_name: str) -> bool:
db.commit() db.commit()
return True return True
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred adding to counter: {e.args[0]}") current_app.logger.error(f"Error occured adding to counter: {e.args[0]}")
current_app.logger.error(f"To counter: {counter_name}") current_app.logger.error(f"To counter: {counter_name}")
return False return False
# TODO: test if the new default works
def add_to_redeem_queue(db: Connection, user_id: str, redeem_name: str, note: str="") -> bool: def add_to_redeem_queue(db, user_id, redeem_name, note=None):
try: try:
db.execute( db.execute(
"INSERT INTO redeem_queue(redeem, redeemer_id, note) VALUES(?, ?, ?)", "INSERT INTO redeem_queue(redeem, redeemer_id, note) VALUES(?, ?, ?)",
@ -47,12 +44,12 @@ def add_to_redeem_queue(db: Connection, user_id: str, redeem_name: str, note: st
db.commit() db.commit()
return True return True
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred adding to redeem queue: {e.args[0]}") current_app.logger.error(f"Error occured adding to redeem queue: {e.args[0]}")
current_app.logger.error(f"To user: {user_id} with redeem: {redeem_name} with note: {note}") current_app.logger.error(f"To user: {user_id} with redeem: {redeem_name} with note: {note}")
return False return False
def add_to_milestone(db: Connection, user_id: str, redeem_name: str, points_donated: int) -> bool: def add_to_milestone(db, user_id, redeem_name, points_donated):
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT progress, goal FROM milestones WHERE name = ?", "SELECT progress, goal FROM milestones WHERE name = ?",
@ -77,11 +74,28 @@ def add_to_milestone(db: Connection, user_id: str, redeem_name: str, points_dona
db.commit() db.commit()
return True return True
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred updating milestone: {e.args[0]}") current_app.logger.error(f"Error occured updating milestone: {e.args[0]}")
return False return False
def milestone_complete(db: Connection, redeem_name: str) -> bool | None:
"""Returns None only if error was logged.""" def milestone_complete(db, redeem_name):
try:
cursor = db.execute(
"SELECT complete FROM milestones WHERE name = ?",
(redeem_name,)
)
row = cursor.fetchone()
if row is None:
current_app.logger.warning("Milestone not found in database.")
current_app.logger.warning("Maybe you forgot to run the refresh-milestones CLI command "
"after you added a new milestone to the config?")
else:
return row[0]
except Error as e:
current_app.logger.error(f"Error occured checking if milestone is complete: {e.args[0]}")
def check_apply_milestone_completion(db, redeem_name):
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT progress, goal FROM milestones WHERE name = ?", "SELECT progress, goal FROM milestones WHERE name = ?",
@ -95,63 +109,57 @@ def milestone_complete(db: Connection, redeem_name: str) -> bool | None:
else: else:
progress, goal = row progress, goal = row
if progress == goal: if progress == goal:
cursor = db.execute(
"UPDATE milestones SET complete = TRUE WHERE name = ?",
(redeem_name,)
)
db.commit()
return True return True
return False return False
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred checking if milestone is complete: {e.args[0]}") current_app.logger.error(f"Error occured applying milestone completion: {e.args[0]}")
return False
def all_milestones(db):
def all_milestones(db: Connection) -> list[Tuple[str, int, int]] | None:
"""Returns list of all (even inactive) milestones, their progress and their goal.
Returns None only if error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"""SELECT name, progress, goal FROM milestones""" """SELECT name, progress, goal FROM milestones"""
) )
return cursor.fetchall() return cursor.fetchall()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred selecting all milestones: {e.args[0]}") current_app.logger.error(f"Error occured selecting all milestones: {e.args[0]}")
def all_active_milestones(db: Connection) -> list[Tuple[str, int, int]] | None: def all_counters(db):
"""Returns list of all active milestones, their progress and their goal.
Returns None only if error was logged."""
milestones = all_milestones(db)
if milestones is not None:
all_active_milestones = []
for name, progress, goal in milestones:
if is_redeem_active(name):
all_active_milestones.append((name, progress, goal))
return all_active_milestones
def all_counters(db: Connection) -> list[Tuple[str, int]] | None:
"""Returns list of all (even inactive) counters and their current value.
Returns None only if error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"""SELECT name, count FROM counters""" """SELECT counters.name, counters.count FROM counters"""
) )
return cursor.fetchall() return cursor.fetchall()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred selecting all counters: {e.args[0]}") current_app.logger.error(f"Error occured selecting all counters: {e.args[0]}")
def all_active_counters(db: Connection) -> list[Tuple[str, int]] | None: def all_active_counters(db):
"""Returns list of all active counters, and their current value.
Returns None if error was logged."""
counters = all_counters(db) counters = all_counters(db)
if counters is not None: all_active_counters = []
all_active_counters = [] for name, count in counters:
for name, count in counters: if is_redeem_active(name):
if is_redeem_active(name): all_active_counters.append((name, count))
all_active_counters.append((name, count)) return all_active_counters
return all_active_counters
def all_active_redeems() -> Redeems: def all_active_milestones(db):
"""Returns list of all active redeems.""" milestones = all_milestones(db)
all_active_milestones = []
for name, progress, goal in milestones:
if is_redeem_active(name):
all_active_milestones.append((name, progress, goal))
return all_active_milestones
def all_active_redeems():
redeems = current_app.config['REDEEMS'] redeems = current_app.config['REDEEMS']
all_active_redeems = {} all_active_redeems = {}
for redeem_name, redeem_dict in redeems.items(): for redeem_name, redeem_dict in redeems.items():
@ -165,9 +173,7 @@ def all_active_redeems() -> Redeems:
return all_active_redeems return all_active_redeems
def pretty_redeem_queue(db: Connection) -> list[Tuple[str, str, str, str]] | None: def pretty_redeem_queue(db):
"""Returns a 'pretty' redeem queue, with name of the redeemer joined instead of ID.
Returns None only if error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"""SELECT redeem_queue.created, redeem_queue.redeem, redeem_queue.note, points.name """SELECT redeem_queue.created, redeem_queue.redeem, redeem_queue.note, points.name
@ -177,31 +183,45 @@ def pretty_redeem_queue(db: Connection) -> list[Tuple[str, str, str, str]] | Non
) )
return cursor.fetchall() return cursor.fetchall()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred selecting pretty redeem queue: {e.args[0]}") current_app.logger.error(f"Error occured selecting pretty redeem queue: {e.args[0]}")
def whole_redeem_queue(db: Connection) -> list[Any] | None: def whole_redeem_queue(db):
"""Returns None if error was logged."""
try: try:
cursor = db.execute( cursor = db.execute(
"SELECT * from redeem_queue" #TODO: specify columns to fetch "SELECT * from redeem_queue"
) )
return cursor.fetchall() return cursor.fetchall()
except Error as e: except Error as e:
current_app.logger.error(f"Error occurred selecting redeem queue: {e.args[0]}") current_app.logger.error(f"Error occured selecting redeem queue: {e.args[0]}")
def is_redeem_active(redeem_name: str) -> bool | None: def is_redeem_active(redeem_name):
"""Checks if redeem is active. Pulls the redeem by name from config. """Checks if redeem is active. Pulls the redeem by name from config."""
Returns None if the redeem doesn't exist."""
active_categories = current_app.config['ACTIVE_CATEGORIES'] active_categories = current_app.config['ACTIVE_CATEGORIES']
redeem_dict = current_app.config['REDEEMS'].get(redeem_name, None) redeem_dict = current_app.config['REDEEMS'].get(redeem_name, None)
if redeem_dict: if redeem_dict:
if redeem_dict.get('category', None): if "category" in redeem_dict:
for category in redeem_dict["category"]: for category in redeem_dict["category"]:
if category in active_categories: if category in active_categories:
return True return True
return False return False
else: else:
return True return True
return None # redeem does not exist, unknown active state return None # redeem does not exist, unknown active state
def is_redeem_from_config_active(redeem, active_categories):
"""Checks if redeem is active. `redeem` is a whole key:value pair from redeems config."""
if "category" in redeem[1] and redeem[1]["category"]:
for category in redeem[1]["category"]:
if category in active_categories:
return True
return False
return True
def remove_inactive_redeems(redeems, active_categories):
return dict(filter(lambda redeem: is_redeem_from_config_active(redeem, active_categories),
redeems.items()))

View File

@ -2,11 +2,11 @@ from flask import current_app
from tlapbot.db import get_db from tlapbot.db import get_db
from tlapbot.owncast_requests import send_chat from tlapbot.owncast_requests import send_chat
from tlapbot.redeems import (add_to_redeem_queue, add_to_counter, add_to_milestone, from tlapbot.redeems import (add_to_redeem_queue, add_to_counter, add_to_milestone,
milestone_complete, is_redeem_active) check_apply_milestone_completion, milestone_complete, is_redeem_active)
from tlapbot.owncast_helpers import use_points, read_users_points from tlapbot.owncast_helpers import use_points, read_users_points
def handle_redeem(message: str, user_id: str) -> None: def handle_redeem(message, user_id):
split_message = message[1:].split(maxsplit=1) split_message = message[1:].split(maxsplit=1)
redeem = split_message[0] redeem = split_message[0]
if len(split_message) == 1: if len(split_message) == 1:
@ -25,11 +25,6 @@ def handle_redeem(message: str, user_id: str) -> None:
redeem_type = current_app.config['REDEEMS'][redeem]["type"] redeem_type = current_app.config['REDEEMS'][redeem]["type"]
points = read_users_points(db, user_id) points = read_users_points(db, user_id)
if points is None:
send_chat(f"Can't redeem {redeem}, failed to read users' points.")
return
# handle milestone first because it doesn't have a price # handle milestone first because it doesn't have a price
if redeem_type == "milestone": if redeem_type == "milestone":
if milestone_complete(db, redeem): if milestone_complete(db, redeem):
@ -44,7 +39,7 @@ def handle_redeem(message: str, user_id: str) -> None:
send_chat(f"Can't donate zero points.") send_chat(f"Can't donate zero points.")
elif add_to_milestone(db, user_id, redeem, int(note)): elif add_to_milestone(db, user_id, redeem, int(note)):
send_chat(f"Succesfully donated to {redeem} milestone!") send_chat(f"Succesfully donated to {redeem} milestone!")
if milestone_complete(db, redeem): if check_apply_milestone_completion(db, redeem):
send_chat(f"Milestone goal {redeem} complete!") send_chat(f"Milestone goal {redeem} complete!")
else: else:
send_chat(f"Redeeming milestone {redeem} failed.") send_chat(f"Redeeming milestone {redeem} failed.")

View File

@ -12,7 +12,8 @@ CREATE TABLE milestones (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL, name TEXT NOT NULL,
progress INTEGER NOT NULL, progress INTEGER NOT NULL,
goal INTEGER NOT NULL goal INTEGER NOT NULL,
complete BOOLEAN NOT NULL
); );
CREATE TABLE counters ( CREATE TABLE counters (

View File

@ -8,7 +8,7 @@ bp = Blueprint('redeem_dashboard', __name__)
@bp.route('/dashboard', methods=['GET']) @bp.route('/dashboard', methods=['GET'])
def dashboard() -> str: def dashboard():
db = get_db() db = get_db()
username = request.args.get("username") username = request.args.get("username")
if username is not None: if username is not None:

View File

@ -1,4 +0,0 @@
from typing import Any, TypeAlias
Redeems: TypeAlias = dict[str, dict[str, Any]]
# at the moment the Any could be specialized to str | int | list[str]