cami
2aae4e2b1b
In this commit the survey data will be written into the database for the corresponding user. The usercheck works with the authtoken which is sent in the header
191 lines
5.1 KiB
Python
191 lines
5.1 KiB
Python
import os
|
|
import time
|
|
import flask
|
|
import flask_sqlalchemy
|
|
import flask_praetorian
|
|
import flask_cors
|
|
|
|
db = flask_sqlalchemy.SQLAlchemy()
|
|
guard = flask_praetorian.Praetorian()
|
|
cors = flask_cors.CORS()
|
|
|
|
|
|
# A generic user model that might be used by an app powered by flask-praetorian
|
|
class User(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.Text, unique=True)
|
|
password = db.Column(db.Text)
|
|
roles = db.Column(db.Text)
|
|
is_active = db.Column(db.Boolean, default=True, server_default='true')
|
|
age = db.Column(db.Integer)
|
|
gender = db.Column(db.Text)
|
|
education = db.Column(db.Text)
|
|
skills = db.Column(db.Text)
|
|
|
|
@property
|
|
def rolenames(self):
|
|
try:
|
|
return self.roles.split(',')
|
|
except Exception:
|
|
return []
|
|
|
|
@classmethod
|
|
def lookup(cls, username):
|
|
return cls.query.filter_by(username=username).one_or_none()
|
|
|
|
@classmethod
|
|
def identify(cls, id):
|
|
return cls.query.get(id)
|
|
|
|
@property
|
|
def identity(self):
|
|
return self.id
|
|
|
|
def is_valid(self):
|
|
return self.is_active
|
|
|
|
|
|
# Initialize flask app for the example
|
|
app = flask.Flask(__name__)
|
|
app.config['SECRET_KEY'] = 'top secret'
|
|
app.config['JWT_ACCESS_LIFESPAN'] = {'hours': 24}
|
|
app.config['JWT_REFRESH_LIFESPAN'] = {'days': 30}
|
|
# Read environment variables
|
|
if "DEBUG" in os.environ and os.environ["DEBUG"] == 'yes':
|
|
debug = True
|
|
else:
|
|
debug = False
|
|
if "HOST" in os.environ:
|
|
host = os.environ["HOST"]
|
|
else:
|
|
host = '0.0.0.0'
|
|
if "PORT" in os.environ:
|
|
port = int(os.environ["PORT"])
|
|
else:
|
|
port = 5000
|
|
|
|
# Initialize the flask-praetorian instance for the app
|
|
guard.init_app(app, User)
|
|
|
|
# Initialize a local database for the example
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{os.path.join(os.getcwd(), 'database.db')}"
|
|
db.init_app(app)
|
|
|
|
# Initializes CORS so that the api_tool can talk to the example app
|
|
cors.init_app(app)
|
|
|
|
# Add users for the example
|
|
with app.app_context():
|
|
db.create_all()
|
|
if db.session.query(User).filter_by(username='Yasoob').count() < 1:
|
|
db.session.add(User(
|
|
username='Yasoob',
|
|
password=guard.hash_password('strongpassword'),
|
|
roles='admin'
|
|
))
|
|
db.session.commit()
|
|
|
|
|
|
# Set up some routes for the example
|
|
@app.route('/api/')
|
|
def home():
|
|
return {"Hello": "World"}, 200
|
|
|
|
|
|
@app.route('/api/login', methods=['POST'])
|
|
def login():
|
|
"""
|
|
Logs a user in by parsing a POST request containing user credentials and
|
|
issuing a JWT token.
|
|
.. example::
|
|
$ curl http://localhost:5000/api/login -X POST \
|
|
-d '{"username":"Yasoob","password":"strongpassword"}'
|
|
"""
|
|
req = flask.request.get_json(force=True)
|
|
username = req.get('username', None)
|
|
password = req.get('password', None)
|
|
user = guard.authenticate(username, password)
|
|
ret = {'access_token': guard.encode_jwt_token(user)}
|
|
return ret, 200
|
|
|
|
|
|
@app.route('/api/refresh', methods=['POST'])
|
|
def refresh():
|
|
"""
|
|
Refreshes an existing JWT by creating a new one that is a copy of the old
|
|
except that it has a refrehsed access expiration.
|
|
.. example::
|
|
$ curl http://localhost:5000/api/refresh -X GET \
|
|
-H "Authorization: Bearer <your_token>"
|
|
"""
|
|
print("refresh request")
|
|
old_token = request.get_data()
|
|
new_token = guard.refresh_jwt_token(old_token)
|
|
ret = {'access_token': new_token}
|
|
return ret, 200
|
|
|
|
|
|
@app.route('/api/protected')
|
|
@flask_praetorian.auth_required
|
|
def protected():
|
|
"""
|
|
A protected endpoint. The auth_required decorator will require a header
|
|
containing a valid JWT
|
|
.. example::
|
|
$ curl http://
|
|
localhost:5000/api/protected -X GET \
|
|
-H "Authorization: Bearer <your_token>"
|
|
"""
|
|
return {'message': f'protected endpoint (allowed user {flask_praetorian.current_user().username})'}
|
|
|
|
|
|
@app.route('/api/register', methods=['POST'])
|
|
def register():
|
|
req = flask.request.get_json(force=True)
|
|
username = req.get('username', None)
|
|
password = req.get('password', None)
|
|
new_user = User(
|
|
username=username,
|
|
password=guard.hash_password(password)
|
|
)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
ret = {'message': 'Account erstellt für den Account {}'.format(
|
|
new_user.username
|
|
)}
|
|
return ret, 200
|
|
|
|
|
|
@app.route('/umfrage', methods=['POST'])
|
|
@flask_praetorian.auth_required
|
|
def survey():
|
|
req = flask.request.get_json(force=True)
|
|
token = guard.read_token_from_header()
|
|
extracted_token = guard.extract_jwt_token(token)
|
|
id_req = extracted_token['id']
|
|
|
|
# get data from the survey and write it to the database
|
|
|
|
age = req.get('age', None)
|
|
gender = req.get('gender', None)
|
|
education = req.get('education', None)
|
|
skills = req.get('skills', None)
|
|
user_db = User.query.filter_by(id=id_req).first()
|
|
user_db.age = age
|
|
user_db.gender = gender
|
|
user_db.education = education
|
|
user_db.skills = skills
|
|
db.session.commit()
|
|
ret = {'message': 'Umfrage freigestellt'}
|
|
return ret, 200
|
|
|
|
|
|
@app.route('/time')
|
|
def get_current_time():
|
|
return {'time': time.time()}
|
|
|
|
|
|
# Run the example
|
|
if __name__ == '__main__':
|
|
app.run(debug=debug, host=host, port=port)
|