Started with the login function and it works more or less :-)
This commit is contained in:
parent
8e77306c93
commit
8d4d166924
8 changed files with 300 additions and 58 deletions
|
@ -1,13 +1,51 @@
|
|||
import time
|
||||
from flask import Flask
|
||||
import os
|
||||
import time
|
||||
import flask
|
||||
import flask_sqlalchemy
|
||||
import flask_praetorian
|
||||
import flask_cors
|
||||
|
||||
import os
|
||||
db = flask_sqlalchemy.SQLAlchemy()
|
||||
guard = flask_praetorian.Praetorian()
|
||||
cors = flask_cors.CORS()
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
# A generic user model that might be used by an app powered by flask-praetorian
|
||||
class User(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
username = db.Column(db.Text, unique=True)
|
||||
password = db.Column(db.Text)
|
||||
roles = db.Column(db.Text)
|
||||
is_active = db.Column(db.Boolean, default=True, server_default='true')
|
||||
|
||||
@property
|
||||
def rolenames(self):
|
||||
try:
|
||||
return self.roles.split(',')
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
@classmethod
|
||||
def lookup(cls, username):
|
||||
return cls.query.filter_by(username=username).one_or_none()
|
||||
|
||||
@classmethod
|
||||
def identify(cls, id):
|
||||
return cls.query.get(id)
|
||||
|
||||
@property
|
||||
def identity(self):
|
||||
return self.id
|
||||
|
||||
def is_valid(self):
|
||||
return self.is_active
|
||||
|
||||
|
||||
# Initialize flask app for the example
|
||||
app = flask.Flask(__name__)
|
||||
app.config['SECRET_KEY'] = 'top secret'
|
||||
app.config['JWT_ACCESS_LIFESPAN'] = {'hours': 24}
|
||||
app.config['JWT_REFRESH_LIFESPAN'] = {'days': 30}
|
||||
# Read environment variables
|
||||
if "DEBUG" in os.environ and os.environ["DEBUG"] == 'yes':
|
||||
debug = True
|
||||
|
@ -22,26 +60,93 @@ if "PORT" in os.environ:
|
|||
else:
|
||||
port = 5000
|
||||
|
||||
# Initialize the flask-praetorian instance for the app
|
||||
guard.init_app(app, User)
|
||||
|
||||
db = flask_sqlalchemy.SQLAlchemy()
|
||||
guard = flask_praetorian.Praetorian()
|
||||
cors = flask_cors.CORS()
|
||||
# Initialize a local database for the example
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{os.path.join(os.getcwd(), 'database.db')}"
|
||||
db.init_app(app)
|
||||
|
||||
class User(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
username = db.Column(db.Text, unique=True)
|
||||
password = db.Column(db.Text)
|
||||
# Initializes CORS so that the api_tool can talk to the example app
|
||||
cors.init_app(app)
|
||||
|
||||
# Add users for the example
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
if db.session.query(User).filter_by(username='Yasoob').count() < 1:
|
||||
db.session.add(User(
|
||||
username='Yasoob',
|
||||
password=guard.hash_password('strongpassword'),
|
||||
roles='admin'
|
||||
))
|
||||
db.session.commit()
|
||||
|
||||
|
||||
|
||||
@app.route('/')
|
||||
# Set up some routes for the example
|
||||
@app.route('/api/')
|
||||
def home():
|
||||
return "Hello World"
|
||||
return {"Hello": "World"}, 200
|
||||
|
||||
|
||||
@app.route('/api/login', methods=['POST'])
|
||||
def login():
|
||||
"""
|
||||
Logs a user in by parsing a POST request containing user credentials and
|
||||
issuing a JWT token.
|
||||
.. example::
|
||||
$ curl http://localhost:5000/api/login -X POST \
|
||||
-d '{"username":"Yasoob","password":"strongpassword"}'
|
||||
"""
|
||||
req = flask.request.get_json(force=True)
|
||||
username = req.get('username', None)
|
||||
password = req.get('password', None)
|
||||
user = guard.authenticate(username, password)
|
||||
ret = {'access_token': guard.encode_jwt_token(user)}
|
||||
return ret, 200
|
||||
|
||||
|
||||
@app.route('/api/refresh', methods=['POST'])
|
||||
def refresh():
|
||||
"""
|
||||
Refreshes an existing JWT by creating a new one that is a copy of the old
|
||||
except that it has a refrehsed access expiration.
|
||||
.. example::
|
||||
$ curl http://localhost:5000/api/refresh -X GET \
|
||||
-H "Authorization: Bearer <your_token>"
|
||||
"""
|
||||
print("refresh request")
|
||||
old_token = request.get_data()
|
||||
new_token = guard.refresh_jwt_token(old_token)
|
||||
ret = {'access_token': new_token}
|
||||
return ret, 200
|
||||
|
||||
|
||||
@app.route('/api/protected')
|
||||
@flask_praetorian.auth_required
|
||||
def protected():
|
||||
"""
|
||||
A protected endpoint. The auth_required decorator will require a header
|
||||
containing a valid JWT
|
||||
.. example::
|
||||
$ curl http://localhost:5000/api/protected -X GET \
|
||||
-H "Authorization: Bearer <your_token>"
|
||||
"""
|
||||
return {'message': f'protected endpoint (allowed user {flask_praetorian.current_user().username})'}
|
||||
|
||||
@app.route('/time')
|
||||
def get_current_time():
|
||||
return {'time': time.time()}
|
||||
|
||||
|
||||
# Run the example
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=5000)
|
||||
import time
|
||||
from flask import Flask
|
||||
import flask_sqlalchemy
|
||||
import flask_praetorian
|
||||
import flask_cors
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=debug, host=host, port=port)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue