mirror of https://github.com/EasyCTF/librectf
348 lines
12 KiB
Python
348 lines
12 KiB
Python
import json
|
|
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
from string import Template
|
|
|
|
import pyqrcode
|
|
from flask import Blueprint, abort, flash, redirect, render_template, request, url_for
|
|
from flask_login import current_user, login_required, login_user, logout_user
|
|
from sqlalchemy import func
|
|
|
|
from easyctf.constants import (
|
|
FORGOT_EMAIL_TEMPLATE,
|
|
REGISTRATION_EMAIL_TEMPLATE,
|
|
USER_LEVELS,
|
|
)
|
|
from easyctf.forms.users import (
|
|
ChangeLoginForm,
|
|
LoginForm,
|
|
PasswordForgotForm,
|
|
PasswordResetForm,
|
|
ProfileEditForm,
|
|
RegisterForm,
|
|
TwoFactorAuthSetupForm,
|
|
)
|
|
from easyctf.models import Config, PasswordResetToken, Team, User
|
|
from easyctf.objects import db, sentry
|
|
from easyctf.utils import (
|
|
generate_string,
|
|
get_redirect_target,
|
|
redirect_back,
|
|
sanitize_avatar,
|
|
save_file,
|
|
send_mail,
|
|
)
|
|
|
|
# Flask blueprint named "users"; every route in this module is attached to it,
# and its templates are looked up in the "templates" folder.
blueprint = Blueprint("users", __name__, template_folder="templates")
|
|
|
|
|
|
@blueprint.route("/accept/<int:id>")
@login_required
def accept(id):
    """Accept a pending invitation to join the team identified by ``id``.

    The request is rejected, with a flashed "danger" message and a redirect
    to ``teams.create``, when any of the following holds:

    * the current user already has a team (``current_user.tid`` is truthy);
    * the team lookup yields ``None`` or the current user is not among the
      team's outgoing invitations;
    * the team is not an admin team and its size has reached the configured
      maximum team size.

    On success the invitation is consumed, the user is appended to the
    team's members, the change is committed, and the user is redirected to
    ``teams.profile``.

    Validation uses explicit conditionals rather than ``assert`` so that it
    still runs when Python is started with ``-O``.
    """
    target_team = Team.get_by_id(id)
    max_size = Config.get_team_size()

    error = None
    if current_user.tid:
        error = "You're already in a team!"
    elif target_team is None or current_user not in target_team.outgoing_invitations:
        # A team that does not exist cannot have invited anyone.
        error = "There is no invitation for you!"
    elif not target_team.admin and target_team.size >= max_size:
        # Admin teams are exempt from the member limit.
        error = "This team has already reached the maximum member limit!"

    if error is not None:
        flash(error, "danger")
        return redirect(url_for("teams.create"))

    target_team.outgoing_invitations.remove(current_user)
    target_team.members.append(current_user)
    db.session.add(current_user)
    db.session.add(target_team)
    db.session.commit()
    flash("Successfully joined team %s!" % target_team.teamname, "success")
    return redirect(url_for("teams.profile"))
|
|
|
|
|
|
@blueprint.route("/password/forgot", methods=["GET", "POST"])
def forgot():
    """Show the "forgot password" form and handle its submission.

    When the submitted form validates and resolved a user, a
    ``PasswordResetToken`` valid for one day is stored and a reset link is
    mailed to the submitted address. The same confirmation message is
    flashed after any valid submission, then the client is redirected back
    to this page.
    """
    forgot_form = PasswordForgotForm()
    if not forgot_form.validate_on_submit():
        return render_template("users/forgot.html", forgot_form=forgot_form)

    if forgot_form.user is not None:
        reset_token = PasswordResetToken(
            active=True,
            uid=forgot_form.user.uid,
            email=forgot_form.email.data,
            expire=datetime.utcnow() + timedelta(days=1),
        )
        db.session.add(reset_token)
        db.session.commit()

        reset_url = url_for("users.reset", code=reset_token.token, _external=True)
        # TODO: stick this into the template
        recipient = forgot_form.email.data
        subject = "%s Password Reset" % Config.get("ctf_name")
        message = "Click here to reset your password: %s" % reset_url
        send_mail(recipient, subject, message)

    flash("Sent! Check your email.", "success")
    return redirect(url_for("users.forgot"))
|
|
|
|
|
|
@blueprint.route("/password/reset/<string:code>", methods=["GET", "POST"])
def reset(code):
    """Let the holder of an active reset token choose a new password.

    The client is sent to ``base.index`` when no active token matches
    ``code``, when the token reports itself expired, or when the email
    recorded on the token differs from the token user's current email.
    After a valid form submission the password is updated, the token is
    deactivated, and the client is redirected to the login page.
    """
    token = PasswordResetToken.query.filter_by(token=code, active=True).first()
    if not token:
        return redirect(url_for("base.index"))
    if token.expired or token.email != token.user.email:
        return redirect(url_for("base.index"))

    reset_form = PasswordResetForm()
    if not reset_form.validate_on_submit():
        return render_template("users/reset.html", reset_form=reset_form)

    account = User.get_by_id(token.uid)
    account.password = reset_form.password.data
    token.active = False  # the token is single-use
    for record in (account, token):
        db.session.add(record)
    db.session.commit()

    flash("Password has been reset! Try logging in now.", "success")
    after_login = url_for("users.profile")
    return redirect(url_for("users.login", next=after_login))
|
|
|
|
|
|
@blueprint.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form and sign the user in on a valid submission.

    Already-authenticated users are redirected to their own profile. If the
    target user has confirmed two-factor authentication, the submitted code
    must pass ``verify_totp``; otherwise the form is shown again with an
    "Invalid code." message. A successful login records a breadcrumb when a
    sentry client is configured, then redirects back (default
    ``users.profile``).
    """
    if current_user.is_authenticated:
        return redirect(url_for("users.profile", uid=current_user.uid))

    login_form = LoginForm(prefix="login")
    next_target = get_redirect_target()

    def render_login_page():
        # The template receives the redirect target under the name "next".
        return render_template(
            "users/login.html", login_form=login_form, next=next_target
        )

    if not login_form.validate_on_submit():
        return render_login_page()

    target_user = login_form.get_user()
    needs_code = target_user.otp_confirmed
    if needs_code and not target_user.verify_totp(login_form.code.data):
        flash("Invalid code.", "danger")
        return render_login_page()

    login_user(target_user, remember=login_form.remember.data)
    flash("Successfully logged in as %s!" % target_user.username, "success")

    if sentry.client:
        breadcrumb_data = {
            "uid": target_user.uid,
            "username": target_user.username,
        }
        sentry.client.capture_breadcrumb(
            message="login",
            category="user:login",
            level="info",
            data=breadcrumb_data,
            timestamp=datetime.now(),
        )
    return redirect_back("users.profile")
|
|
|
|
|
|
@blueprint.route("/logout")
def logout():
    """Log the current user out and redirect to the site index."""
    logout_user()
    index_url = url_for("base.index")
    return redirect(index_url)
|
|
|
|
|
|
@blueprint.route("/profile")
@blueprint.route("/profile/<int:uid>")
def profile(uid=None):
    """Show the profile page for the user with the given ``uid``.

    Profiles are only visible to logged-in users, so the authentication
    check runs first: an anonymous visitor is always sent to the login page
    with a warning, including on the bare ``/profile`` URL where there is no
    uid to look up. A logged-in user visiting ``/profile`` without a uid is
    redirected to their own profile URL. An unknown uid aborts with 404.
    """
    if not current_user.is_authenticated:
        flash("Please login to view user profiles!", "warning")
        return redirect(url_for("users.login"))

    if uid is None:
        return redirect(url_for("users.profile", uid=current_user.uid))

    user = User.get_by_id(uid)
    if user is None:
        abort(404)

    # Attach the level label from USER_LEVELS for the template to read.
    user.type = USER_LEVELS[user.level]
    return render_template("users/profile.html", user=user)
|
|
|
|
|
|
@blueprint.route("/register", methods=["GET", "POST"])
def register():
    """Render the registration form and create an account when it validates.

    Authenticated users are redirected to their own profile. After a valid
    submission the new (non-admin) user is created through
    ``register_user``, logged in, and redirected to ``users.profile``.
    """
    if current_user.is_authenticated:
        return redirect(url_for("users.profile", uid=current_user.uid))

    register_form = RegisterForm(prefix="register")
    if not register_form.validate_on_submit():
        return render_template("users/register.html", register_form=register_form)

    form = register_form
    new_user = register_user(
        form.name.data,
        form.email.data,
        form.username.data,
        form.password.data,
        int(form.level.data),
        admin=False,
    )
    login_user(new_user)
    return redirect(url_for("users.profile"))
|
|
|
|
|
|
@blueprint.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
    """Account settings page: change login details or edit the profile.

    Two forms share this endpoint and are told apart by their prefixes.

    * Login form: when the ``old_password`` field is non-empty, a changed
      email is stored (and marked unverified, with a fresh email token) and
      a non-empty new password is stored. A validated submission always
      ends with a commit and a redirect back to this page.
    * Profile form: each form field whose short name is also an attribute of
      the current user is copied onto the user. The ``avatar`` field is
      handled separately: a non-empty upload is sanitized, saved via
      ``save_file``, and recorded only if that save reports status 200.
      ``remove_avatar`` clears the stored avatar.

    When neither form was validly submitted, both forms are pre-filled from
    the current user before rendering.
    """
    change_login_form = ChangeLoginForm(prefix="change-password")
    profile_edit_form = ProfileEditForm(prefix="profile-edit")

    if change_login_form.validate_on_submit() and change_login_form.submit.data:
        if change_login_form.old_password.data:
            new_email = change_login_form.email.data
            if new_email != current_user.email:
                current_user.email = new_email
                # A new address has to be verified again.
                current_user.email_verified = False
                current_user.email_token = generate_string()
            if change_login_form.password.data:
                current_user.password = change_login_form.password.data
        db.session.add(current_user)
        db.session.commit()
        flash("Login info updated.", "success")
        return redirect(url_for("users.settings"))

    elif profile_edit_form.validate_on_submit() and profile_edit_form.submit.data:
        for field in profile_edit_form:
            if field.short_name == "avatar":
                upload = field.data
                # Read the upload a single time; the bytes are reused for
                # both the emptiness check and the sanitizer input.
                raw = upload.read() if hasattr(upload, "read") else b""
                if raw:
                    new_avatar = sanitize_avatar(BytesIO(raw))
                    if new_avatar:
                        response = save_file(
                            new_avatar, prefix="user_avatar", suffix=".png"
                        )
                        if response.status_code == 200:
                            current_user._avatar = response.text
                # The avatar field is never copied onto the user directly.
                continue
            if hasattr(current_user, field.short_name):
                setattr(current_user, field.short_name, field.data)
        if profile_edit_form.remove_avatar.data:
            current_user._avatar = None
        db.session.add(current_user)
        db.session.commit()
        flash("Profile updated.", "success")
        return redirect(url_for("users.settings"))

    else:
        # Nothing validly submitted: show the current values in both forms.
        change_login_form.email.data = current_user.email
        for field in profile_edit_form:
            if hasattr(current_user, field.short_name):
                field.data = getattr(current_user, field.short_name, "")

    return render_template(
        "users/settings.html",
        change_login_form=change_login_form,
        profile_edit_form=profile_edit_form,
    )
|
|
|
|
|
|
@blueprint.route("/two_factor/required")
def two_factor_required():
    """Report, as a JSON value, whether a username has confirmed two-factor auth.

    The ``username`` query argument is matched case-insensitively. The
    response body is JSON ``false`` when no user matches; otherwise it is
    the JSON encoding of that user's ``otp_confirmed`` value.
    """
    wanted = request.args.get("username", "").lower()
    matches = User.query.filter(func.lower(User.username) == wanted)
    user = matches.first()
    return json.dumps(user.otp_confirmed if user else False)
|
|
|
|
|
|
@blueprint.route("/two_factor/setup", methods=["GET", "POST"])
@login_required
def two_factor_setup():
    """Show the two-factor setup form; on a valid submission mark it confirmed.

    A valid submission sets ``otp_confirmed`` on the current user, commits,
    and redirects to the settings page.
    """
    two_factor_form = TwoFactorAuthSetupForm()
    if not two_factor_form.validate_on_submit():
        return render_template(
            "users/two_factor/setup.html", two_factor_form=two_factor_form
        )

    current_user.otp_confirmed = True
    db.session.add(current_user)
    db.session.commit()
    flash("Two-factor authentication setup is complete.", "success")
    return redirect(url_for("users.settings"))
|
|
|
|
|
|
@blueprint.route("/two_factor/qr")
@login_required
def two_factor_qr():
    """Return an SVG QR code encoding the current user's TOTP URI.

    The response carries headers that disable caching and a ``Secret``
    header holding the user's ``otp_secret``.
    """
    qr_code = pyqrcode.create(current_user.get_totp_uri())
    svg_buffer = BytesIO()
    qr_code.svg(svg_buffer, scale=6)

    headers = {
        "Content-Type": "image/svg+xml",
        # The three headers below tell clients not to cache the image.
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Pragma": "no-cache",
        "Expires": 0,
        "Secret": current_user.otp_secret,
    }
    return svg_buffer.getvalue(), 200, headers
|
|
|
|
|
|
@blueprint.route("/two_factor/disable")
@login_required
def two_factor_disable():
    """Clear the current user's ``otp_confirmed`` flag and return to settings."""
    account = current_user
    account.otp_confirmed = False
    db.session.add(account)
    db.session.commit()

    flash("Two-factor authentication disabled.", "success")
    settings_url = url_for("users.settings")
    return redirect(settings_url)
|
|
|
|
|
|
@blueprint.route("/verify/<string:code>")
@login_required
def verify(code):
    """Confirm the current user's email address using a verification code.

    Flashes one of three messages: already verified, verified now (when
    ``code`` equals the stored ``email_token``), or incorrect code. In every
    case the client is redirected to the settings page.
    """
    if current_user.email_verified:
        notice = ("You've already verified your email.", "info")
    elif current_user.email_token == code:
        current_user.email_verified = True
        db.session.add(current_user)
        db.session.commit()
        notice = ("Email verified!", "success")
    else:
        notice = ("Incorrect code.", "danger")

    message, category = notice
    flash(message, category)
    return redirect(url_for("users.settings"))
|
|
|
|
|
|
@blueprint.route("/verify")
@login_required
def verify_email():
    """Issue a fresh email token and mail a verification link to the user.

    Returns a plain-text body: an empty string when the email is already
    verified, ``"success"`` when the mail call answers with a 2xx status,
    ``"failed"`` for any other status, or the text of an exception raised
    while building the link or sending the mail.
    """
    if current_user.email_verified:
        return ""

    # Store the new token before sending, so the mailed link matches it.
    code = generate_string()
    current_user.email_token = code
    db.session.add(current_user)
    db.session.commit()

    try:
        link = url_for("users.verify", code=code, _external=True)
        response = send_verification_email(
            current_user.username, current_user.email, link
        )
        delivered = response.status_code // 100 == 2
        return "success" if delivered else "failed"
    except Exception as e:
        return str(e)
|
|
|
|
|
|
def send_verification_email(username, email, verification_link):
    """Mail the registration/verification message to ``email``.

    The body is ``REGISTRATION_EMAIL_TEMPLATE`` with its ``link`` and
    ``username`` placeholders filled in. Returns whatever ``send_mail``
    returns.
    """
    template = Template(REGISTRATION_EMAIL_TEMPLATE)
    body = template.substitute(link=verification_link, username=username)
    return send_mail(email, "[ACTION REQUIRED] EasyCTF Email Verification", body)
|
|
|
|
|
|
def register_user(name, email, username, password, level, admin=False, **kwargs):
    """Create, persist, and return a new ``User``.

    Any extra keyword arguments are set as attributes on the new user after
    construction. A freshly generated string is stored as the user's
    ``email_token`` before the user is added to the session and committed.
    """
    new_user = User(
        name=name,
        username=username,
        password=password,
        email=email,
        level=level,
        admin=admin,
    )
    for attr_name, attr_value in kwargs.items():
        setattr(new_user, attr_name, attr_value)

    new_user.email_token = generate_string()

    # TODO: Config for this -- sending the verification email at
    # registration time is currently disabled.

    db.session.add(new_user)
    db.session.commit()
    return new_user
|