Flask + SQLAlchemy permissions decorator: How do I navigate the relational
database?
I'm writing a permission system for my Flask app, and I'm having a number
of issues with my permissions decorator. (The full code is in my GitHub
repo, if you want to see everything.) The decorator is intended to limit
access to the views it decorates:
def user_has(attribute):
    """
    Build a view decorator that only runs the view when the current user
    holds ``attribute``.

    ``attribute`` is the string name of either a role or an ability. The
    name is looked up as a Role first and, if no role matches, as an
    Ability. When the check fails, the plain string
    "You do not have access" is returned instead of calling the view.
    """
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # Resolve the name to a database object; a matching role takes
            # precedence over a matching ability.
            attribute_object = (
                Role.query.filter_by(name=attribute).first()
                or Ability.query.filter_by(name=attribute).first()
            )
            # NOTE(review): the second operand is the expression the
            # surrounding question reports as not working -- it asks for
            # `.abilities` on the user's whole `roles` collection and then
            # compares the raw string against the result. It is kept
            # unchanged here because it is the subject of the question.
            allowed = (
                attribute_object in current_user.roles
                or attribute in current_user.roles.abilities.all()
            )
            if not allowed:
                # TODO: make this do something way better than a bare string.
                return "You do not have access"
            return func(*args, **kwargs)
        return inner
    return wrapper
I'm using SQLAlchemy and storing Users, Roles, and Abilities in the
database. Users may have one or more roles. Roles may have one or more
abilities. I want to take the string passed to the decorator and check if
the user has that role or if one of the user's roles has that ability. The
decorator doesn't know or care whether it has been called with a role or
ability argument until after it looks to the database.
Apparently, this expression (current_user.roles.abilities.all()) does not
work for traversing my relational database to find abilities, as I'm
attempting to do here. How can I compare the string argument with my
current user's abilities, which are derived from his/her roles?
For reference, my models:
# Association table pairing a user (user.uid) with a role (role.id); it is
# used as the `secondary` table of the User.roles relationship.
user_role_table = db.Table(
    'user_role',
    db.Column('user_id', db.Integer, db.ForeignKey('user.uid')),
    db.Column('role_id', db.Integer, db.ForeignKey('role.id'))
)
# Association table pairing a role (role.id) with an ability (ability.id);
# it is used as the `secondary` table of the Role.abilities relationship.
role_ability_table = db.Table(
    'role_ability',
    db.Column('role_id', db.Integer, db.ForeignKey('role.id')),
    db.Column('ability_id', db.Integer, db.ForeignKey('ability.id'))
)
class Role(db.Model):
    """A named role, linked to abilities through ``role_ability_table``."""

    __tablename__ = 'role'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(120), unique=True)
    # Abilities attached to this role; the backref exposes the reverse side
    # as `Ability.roles`.
    abilities = db.relationship(
        'Ability',
        secondary=role_ability_table,
        backref='roles')

    def __init__(self, name):
        # Names are stored lowercased.
        self.name = name.lower()

    def __repr__(self):
        return '<Role {}>'.format(self.name)

    def __str__(self):
        return self.name
class Ability(db.Model):
    """A named ability stored in the ``ability`` table."""

    __tablename__ = 'ability'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(120), unique=True)

    def __init__(self, name):
        # Names are stored lowercased.
        self.name = name.lower()

    def __repr__(self):
        return '<Ability {}>'.format(self.name)

    def __str__(self):
        return self.name
class User(db.Model):
    """
    An application user identified by e-mail, with a hashed password and
    a set of roles.

    Roles are linked through ``user_role_table``; the backref exposes the
    reverse side as ``Role.users``.
    """

    __tablename__ = 'user'

    uid = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True)
    pwdhash = db.Column(db.String(100))
    roles = db.relationship('Role', secondary=user_role_table,
                            backref='users')

    def __init__(self, email, password, roles=None):
        """
        Create a user.

        :param email: e-mail address; stored lowercased.
        :param password: plain-text password; only its hash is stored.
        :param roles: a single role name, a sequence of role names, or
            None/empty to assign the default 'user' role.
        """
        self.email = email.lower()

        # If only a string is passed for roles, convert it to a list
        # containing that string.
        if roles and isinstance(roles, basestring):
            roles = [roles]

        # If a sequence is passed for roles (or if roles has been converted
        # to a sequence), fetch the corresponding database objects and make
        # a list of those.
        # NOTE(review): `is_sequence` is a helper defined outside this
        # snippet. A name with no matching Role yields None in the list,
        # exactly as the original loop would have appended it.
        if roles and is_sequence(roles):
            # The original called `role_list.appen(...)`, which raises
            # AttributeError; build the list directly instead.
            self.roles = [
                Role.query.filter_by(name=role).first() for role in roles
            ]
        # Otherwise, assign the default 'user' role. Create that role if it
        # doesn't exist.
        else:
            r = Role.query.filter_by(name='user').first()
            if not r:
                r = Role('user')
                db.session.add(r)
                db.session.commit()
            self.roles = [r]

        self.set_password(password)

    def set_password(self, password):
        """Store the hash of ``password`` in ``pwdhash``."""
        self.pwdhash = generate_password_hash(password)

    def check_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        return check_password_hash(self.pwdhash, password)

    # NOTE(review): the four methods below look like the user interface
    # expected by the login extension that provides `current_user` --
    # confirm against the extension version in use.
    def is_authenticated(self):
        return True

    def is_active(self):
        return True

    def is_anonymous(self):
        return False

    def get_id(self):
        return unicode(self.uid)

    def __repr__(self):
        return '<User {}>'.format(self.email)

    def __str__(self):
        return self.email
and the decorated view:
@app.route('/admin', methods=['GET', 'POST'])
@user_has('admin')
def admin():
    """
    Render the admin page and, on POST, update one user's roles.

    A RoleForm is built for every user, keyed by the user's uid. On POST
    the form matching the submitted 'uid' field is validated; if it is
    valid, that user's roles are replaced with the roles selected in the
    form and the change is committed.
    """
    users = models.User.query.all()

    # One form per user, constructed with the user's uid and current role
    # ids.
    forms = {}
    for user in users:
        role_ids = [role.id for role in user.roles]
        forms[user.uid] = RoleForm(uid=user.uid, roles=role_ids)

    if request.method == "POST":
        submitted_uid = int(request.form['uid'])
        current_form = forms[submitted_uid]
        if current_form.validate():
            u = models.User.query.get(current_form.uid.data)
            selected_roles = []
            for role in current_form.roles.data:
                selected_roles.append(models.Role.query.get(role))
            u.roles = selected_roles
            db.session.commit()
            flash('Roles updated for {}'.format(u))

    return render_template('admin.html', users=users, forms=forms)
No comments:
Post a Comment