To create my own permissions for the custom user I created earlier, I create a class that inherits from the BasePermission class.
I’ll create two permission classes:
A class that allows access only to users who are clients (the role attribute is client). Contractors have no access to the resource.
A class that allows users who are contractors read-only access.
First, in the permissions.py file of the contracts application, I import the BasePermission class and SAFE_METHODS, the tuple of "safe", read-only HTTP methods (GET, HEAD and OPTIONS). These methods allow the resource to be listed, without the possibility of editing, deleting or adding a new contract.
"""Custom permissions classes"""
from rest_framework.permissions import BasePermission, SAFE_METHODS
The IsClient class allows access only to clients. Other users will see a message informing them that they have no access to the resource:
class IsClient(BasePermission):
    message = "Only clients can access"

    def has_permission(self, request, view):
        if request.user.role == "client":
            return True
        return False
The IsClientOrReadOnly class allows users who are clients to add, delete and edit data; other users can only view it, i.e.:
class IsClientOrReadOnly(BasePermission):
    message = "Only clients can modify, contractors can read only"

    def has_permission(self, request, view):
        if request.method in SAFE_METHODS:
            return True
        return request.user.role == "client"
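To make the decision logic of IsClientOrReadOnly concrete, here is a minimal, framework-free sketch. It does not use Django REST Framework: the SAFE_METHODS tuple mirrors the DRF constant, while FakeUser, FakeRequest and is_allowed() are hypothetical stand-ins invented for this illustration.

```python
from dataclasses import dataclass

# Mirrors rest_framework.permissions.SAFE_METHODS (read-only HTTP methods)
SAFE_METHODS = ("GET", "HEAD", "OPTIONS")


@dataclass
class FakeUser:
    # Hypothetical stand-in for ContractUser; only the role matters here
    role: str


@dataclass
class FakeRequest:
    # Hypothetical stand-in for the DRF request object
    method: str
    user: FakeUser


def is_allowed(request):
    """Same rule as IsClientOrReadOnly.has_permission()."""
    if request.method in SAFE_METHODS:
        return True
    return request.user.role == "client"


client = FakeUser(role="client")
contractor = FakeUser(role="contractor")

decisions = {
    ("client", "POST"): is_allowed(FakeRequest("POST", client)),
    ("contractor", "GET"): is_allowed(FakeRequest("GET", contractor)),
    ("contractor", "DELETE"): is_allowed(FakeRequest("DELETE", contractor)),
}
print(decisions)
```

Clients may modify data, contractors may only read it, and a contractor's DELETE request is refused.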
I put the created permission classes in views, e.g.
class ContractViewSet(viewsets.ModelViewSet):
    permission_classes = [IsAuthenticated, IsClientOrReadOnly]
    serializer_class = ContractSerializer
In this post, I describe testing the API for the custom user created earlier (see the earlier entries, part 1 and part 2).
Access to the ContractUserCreateRetrieveViewSet view for creating a new user and displaying a specific user is registered in the urls.py file of the users application, i.e.
from rest_framework.routers import DefaultRouter
from .views import ContractUserCreateRetrieveViewSet
router = DefaultRouter()
router.register("", ContractUserCreateRetrieveViewSet, basename="user")
urlpatterns = router.urls
I set the basename parameter to user; I will refer to this name in the tests.
The users application urls.py file is then imported in the project urls.py file, i.e.
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
    path("admin/", admin.site.urls),
    path("api-auth/", include("rest_framework.urls")),
    path("api/user/", include("users.urls")),
]
So the complete endpoint for creating users will be /api/user/
Creating a user and displaying a specific user will be handled by specific methods from the views.py file of the users application, i.e. creating – the create() function, displaying the user – the retrieve() function.
from django.shortcuts import get_object_or_404
from rest_framework import viewsets, mixins
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from .models import ContractUser
from .serializers import ContractUserSerializer


class ContractUserCreateRetrieveViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    viewsets.GenericViewSet,
):
    serializer_class = ContractUserSerializer
    queryset = ContractUser.objects.all()

    def get_permissions(self):
        # Only retrieving a user requires login; creating an account stays open.
        # (Assigning permission_classes inside retrieve() would have no effect.)
        if self.action == "retrieve":
            return [IsAuthenticated()]
        return super().get_permissions()

    def retrieve(self, request, pk=None):
        # Restrict the queryset to the logged-in user, so other ids return 404
        queryset = ContractUser.objects.filter(id=request.user.id)
        user = get_object_or_404(queryset, pk=pk)
        serializer = ContractUserSerializer(user)
        return Response(serializer.data)

    def create(self, request):
        serializer = ContractUserSerializer(data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
The ContractUserCreateRetrieveViewSet class inherits from the basic GenericViewSet class and the mixins classes, which add the required functionalities – creating and displaying a user.
The create() function serializes the given data and, after checking whether they are correct, saves them in the database and returns the Response object and the appropriate status code.
The retrieve() function is available only to logged-in users. It filters users so that the queryset contains only the currently logged-in user, and then returns a Response object with the serialized data.
In the signals.py file of the users application, I modify the email field provided by the AbstractUser class so that this field is required when creating the ContractUser model, i.e.
If we want the email field to be unique, we can set the unique parameter to True, i.e.
instance._meta.get_field("email")._unique = True
Then, if the user enters an email that is already in the database, a validation error will occur. An unfortunate side effect is that this reveals that a user with this email address already exists in the database.
To test creating and displaying a user, I create a tests directory in the users app.
I move the tests.py file created in the previous article, which contained model tests, to it and rename it to test_model.py
In the created tests directory, I create a new file called test_api.py, which will contain api tests for the users application, i.e.
from rest_framework.test import APITestCase
from rest_framework import status
from django.urls import reverse
from ..models import ContractUser
class ContractUserApiTestCase(APITestCase):
    """Testing ContractUser API"""

    def test_create_user(self):
        endpoint = reverse("user-list")
        user_data = {
            "username": "user_1",
            "password": "123456789",
            "email": "user_1@company.com",
        }
        response = self.client.post(endpoint, data=user_data, format="json")
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)

    def test_retrieve_user(self):
        user = ContractUser.objects.create(
            username="user1", password="12345", email="user1@company.com"
        )
        self.client.force_authenticate(user)
        endpoint = reverse("user-detail", args=[user.id])
        response = self.client.get(endpoint, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
In the test_create_user() method, I build the endpoint using the reverse() function, which takes as a parameter the basename given in the urls.py file with the -list suffix. Then I create data for the test user and, using the default client provided by the APITestCase class, send a POST request. The assertion checks that the status code of the response is 201, i.e. that the user was created successfully; a 400 response (invalid data) would make the test fail.
In the test_retrieve_user() method, I create a test user and then log the user in using the force_authenticate() function. The reverse() function takes the basename value given in the urls.py file with the -detail suffix as a parameter. As an additional parameter, I pass the logged-in user's id. The assertion checks that the response status code is 200, i.e. that the query was successful; any other status would make the test fail.
To test the custom ContractUser model created in the previous article, in the tests.py file of the users application, I import the TestCase class from the django.test module and the tested model, i.e.
A new ContractUserTestCase class that inherits from the TestCase class is created.
In the setUp() method, I create two temporary users user1 and user2.
In the test_create_user() method, I run 1 test consisting of several assertions, in particular the text representation of the created instance suggested by the coverage tool, i.e.
I test other models from the contracts application in a similar way.
The Contract model also includes a custom validator that checks that the delivery date is not earlier than the order date.
Part of the Contract model from models.py of the contracts application:
class Contract(models.Model):
    class Meta:
        ordering = ("-date_of_order",)

    class StatusChoices(models.TextChoices):
        OPEN = "open"
        ACCEPTED = "accepted"
        CANCELED = "cancelled"

    class DeliveryDateValidator:
        @staticmethod
        def validate(value):
            # Compares with today's date, which is also the default order date
            if value < timezone.now().date():
                raise validators.ValidationError(
                    "Date of delivery can't be earlier than date of order"
                )
            else:
                return value

    date_of_order = models.DateField(default=timezone.now)
    date_of_delivery = models.DateField(
        validators=(DeliveryDateValidator.validate,)
    )
..........
To test the custom validator, I create a separate test that checks whether the ValidationError exception is raised for a date that is too early, and whether the entered date is returned when a correct date is given (not earlier than the order date).
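A test of this kind could look roughly like the sketch below. It is framework-free so that it runs on its own: ValidationError is a local stand-in for Django's exception, validate_delivery_date() is a hypothetical copy of the validator that takes the reference date as a parameter, and the test class name is invented.

```python
import unittest
from datetime import date, timedelta


class ValidationError(Exception):
    """Local stand-in for django.core.exceptions.ValidationError."""


def validate_delivery_date(value, today=None):
    # Hypothetical copy of DeliveryDateValidator.validate(); `today` is
    # injectable so the test does not depend on the real clock
    today = today or date.today()
    if value < today:
        raise ValidationError(
            "Date of delivery can't be earlier than date of order"
        )
    return value


class DeliveryDateValidatorTestCase(unittest.TestCase):
    today = date(2023, 5, 10)

    def test_past_date_raises(self):
        yesterday = self.today - timedelta(days=1)
        with self.assertRaises(ValidationError):
            validate_delivery_date(yesterday, today=self.today)

    def test_valid_date_is_returned(self):
        tomorrow = self.today + timedelta(days=1)
        self.assertEqual(
            validate_delivery_date(tomorrow, today=self.today), tomorrow
        )


suite = unittest.defaultTestLoader.loadTestsFromTestCase(
    DeliveryDateValidatorTestCase
)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

In a Django project the same two cases would be written against the real validator inside a django.test.TestCase.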
In the models.py file, I create a custom user that inherits from the AbstractUser class. I add a role field with a fixed set of choices, so that each user is either a client (principal) or a contractor.
from django.contrib.auth.models import AbstractUser
from django.db import models
class ContractUser(AbstractUser):
    class RoleChoices(models.TextChoices):
        CLIENT = "client"
        CONTRACTOR = "contractor"

    role = models.CharField(
        max_length=10,
        choices=RoleChoices.choices,
        default=RoleChoices.CLIENT
    )

    def __str__(self):
        return self.username
The new user model should be registered in the project’s settings.py file, i.e.
AUTH_USER_MODEL = "users.ContractUser"
In the users application, I create a serializers.py file containing the serializer of the new user model, i.e.
from django.contrib.auth.hashers import make_password
from rest_framework import serializers
from .models import ContractUser
class ContractUserSerializer(serializers.ModelSerializer):
    password = serializers.CharField(
        min_length=8,
        write_only=True,
        style={"input_type": "password"}
    )

    class Meta:
        model = ContractUser
        fields = [
            "id",
            "username",
            "password",
            "email",
            "role"
        ]

    def create(self, validated_data):
        validated_data["password"] = make_password(validated_data["password"])
        return super().create(validated_data)
The serializer includes a CharField for entering a password. This field has the write_only parameter set to True, so that it can be written but never read back. The fields attribute of the inner Meta class contains a list of all serializable fields. The create() method hashes the password entered by the user with the make_password() function before the user is saved.
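The reason for hashing is that the database should never hold the raw password. The sketch below illustrates the general idea with the standard library's PBKDF2 function. It is not Django's make_password() and does not produce Django's storage format; hash_password(), check_password() and the iteration count are illustrative assumptions.

```python
import hashlib
import hmac
import os


def hash_password(password, salt=None, iterations=100_000):
    # A random salt makes equal passwords produce different hashes
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    return salt, digest


def check_password(password, salt, digest, iterations=100_000):
    _, candidate = hash_password(password, salt, iterations)
    # Constant-time comparison avoids leaking information through timing
    return hmac.compare_digest(candidate, digest)


salt, stored = hash_password("123456789")
print(check_password("123456789", salt, stored))
print(check_password("wrong-password", salt, stored))
```

Only the salt and the digest are stored; a login attempt is checked by hashing the submitted password again and comparing the results.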
In the urls.py file of the users application, I define an endpoint, i.e.
from rest_framework.routers import DefaultRouter
from .views import ContractUserCreateRetrieveViewSet
router = DefaultRouter()
router.register("", ContractUserCreateRetrieveViewSet, basename="user")
urlpatterns = router.urls
I attach the endpoint for the users application to the main urls.py file of the project:
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/user/", include("users.urls")),
]
The /api/user/ endpoint will allow you to create a user and view data for a given user. To do this, I create a ContractUserCreateRetrieveViewSet() class in the views.py file. To limit the number of available methods, I do not use the ModelViewSet class, but inherit from the GenericViewSet class and the appropriate Mixin classes.
from django.shortcuts import get_object_or_404
from rest_framework import viewsets, mixins
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from .models import ContractUser
from .serializers import ContractUserSerializer


class ContractUserCreateRetrieveViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    viewsets.GenericViewSet,
):
    serializer_class = ContractUserSerializer
    queryset = ContractUser.objects.all()

    def get_permissions(self):
        # Only retrieving a user requires login; creating an account stays open.
        # (Assigning permission_classes inside retrieve() would have no effect.)
        if self.action == "retrieve":
            return [IsAuthenticated()]
        return super().get_permissions()

    def retrieve(self, request, pk=None):
        # Restrict the queryset to the logged-in user, so other ids return 404
        queryset = ContractUser.objects.filter(id=request.user.id)
        user = get_object_or_404(queryset, pk=pk)
        serializer = ContractUserSerializer(user)
        return Response(serializer.data)

    def create(self, request):
        serializer = ContractUserSerializer(data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
To make the newly created user inactive, I used the pre_save signal. For this purpose, I created a new signals.py file in the users application, i.e.
from django.db.models.signals import pre_save
from django.dispatch import receiver
from .models import ContractUser


@receiver(pre_save, sender=ContractUser)
def create_inactive_user(sender, instance, **kwargs):
    # Deactivate only on the first save; later saves (e.g. activation) keep their value
    if instance._state.adding:
        instance.is_active = False
The signal should be imported in the ready() function of the UsersConfig() class from the apps.py file:
from django.apps import AppConfig


class UsersConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "users"

    def ready(self):
        import users.signals  # noqa: F401 (imported to register the receiver)
In this example, the data comes from the Orlen wholesale price archive.
Because the way the website serves its data has changed, I also had to change how my program works (the previous version scraped the page to find the appropriate values). Price data now has to be obtained through an API.
To determine what a correct query to the Orlen API should look like, I used the Network tab of the browser's developer tools.
The program will download fuel price data and display the data and a chart for specific years and months.
The project will consist of the main program file: orlen-prices.py and the utils.py file containing a function to download responses from an external API and a function to convert data in json format to the appropriate Pandas data frame.
Downloaded data will be cached in a simple way to limit the number of requests to the API.
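One simple way to cache responses is a dictionary keyed by product id, as in the sketch below. The PriceCache class, the fake_fetch() function and the sample record are hypothetical and used only for illustration; in the real program the fetching is done by fetch_data().

```python
class PriceCache:
    """Keeps one downloaded response per product id (hypothetical helper)."""

    def __init__(self, fetch):
        self._fetch = fetch      # function that actually calls the API
        self._data = {}          # product_id -> cached response

    def get(self, product_id):
        if product_id in self._data:
            return self._data[product_id]
        data = self._fetch(product_id)
        if data is not None:     # failed downloads are not cached
            self._data[product_id] = data
        return data


calls = []


def fake_fetch(product_id):
    # Stand-in for the real API call; records how often it is used
    calls.append(product_id)
    return [{"effectiveDate": "2023-01-02", "value": 6100}]


cache = PriceCache(fake_fetch)
first = cache.get(41)
second = cache.get(41)   # served from the cache, no second API call
print(len(calls))
```

Failed downloads are deliberately not stored, so the next request for the same product tries the API again.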
First, I will describe the fetch_data() function from the utils.py file responsible for fetching the response from an external server. If the query is successful, the function returns a response in json format. If an exception occurs, the function returns None.
The next function from the utils.py file converts the data in json format to a Pandas dataframe. The resulting frame will contain only two columns; the rest of the data is omitted. The effectiveDate column will contain datetime values and will be set as the index (to facilitate searching by date). The value column will be of type int. Finally, the entire frame is sorted by index.
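Since the function itself is not shown here, the following is only a sketch of what such a conversion might look like, based on the description above. The column names effectiveDate and value come from the text; the sample records, the extra productName key and the exact pandas calls are my assumptions, not the original implementation.

```python
import pandas as pd


def json_to_df(data):
    """Convert a list of price records into a date-indexed data frame."""
    # Keep only the two columns of interest; everything else is dropped
    df = pd.DataFrame(data)[["effectiveDate", "value"]].copy()
    df["effectiveDate"] = pd.to_datetime(df["effectiveDate"])
    df["value"] = df["value"].astype(int)
    # A sorted DatetimeIndex allows lookups such as df.loc["2023-01"]
    return df.set_index("effectiveDate").sort_index()


# Made-up sample records, deliberately out of order
sample = [
    {"effectiveDate": "2023-02-01", "value": 6200.0, "productName": "Pb95"},
    {"effectiveDate": "2023-01-15", "value": 6100.0, "productName": "Pb95"},
]
df = json_to_df(sample)
print(df.loc["2023-01"])
```

With the dates as a sorted index, a whole year or a single month can be selected with a partial date string.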
From the menu it will be possible to optionally close the program by selecting File/Exit, i.e.
def show_menu(self, root):
    # Menu bar
    menu_bar = tk.Menu(root)
    root.config(menu=menu_bar)
    # File menu
    file_menu = tk.Menu(menu_bar, tearoff=0)
    file_menu.add_command(label='Exit', command=self._exit)
    menu_bar.add_cascade(label='File', menu=file_menu)
The show_product_combobox() method initializes the Combobox widget, which will be responsible for selecting the product for which we want to track price changes. The selection will be passed using the product_sv variable – StringVar object. Before the user selects the appropriate product, a hint is displayed on the widget.
def show_product_combobox(self, root):
    self.product_sv = tk.StringVar(value='')
    product_cb = ttk.Combobox(root, textvariable=self.product_sv)
    product_cb['values'] = list(self.products)
    product_cb.set('Product')
    # Assigning to product_cb.state would only overwrite the state() method;
    # the widget option has to be set instead
    product_cb['state'] = 'readonly'
    product_cb.bind('<<ComboboxSelected>>', self.enable_show_chart_btn)
    product_cb.grid(row=0, column=0, padx=10, pady=10)
When the user selects the appropriate product, the associated method enable_show_chart_btn() is run, because by default the chart drawing button is inactive. First, the method checks whether the button is inactive, then whether the product and year have been selected. If all these conditions are met, the state of the button changes to active.
def enable_show_chart_btn(self, sender):
    if (str(self.show_chart_btn['state']) == 'disabled'
            and self.product_sv.get() != 'Product'
            and self.year_sv.get() != 'Year'):
        self.show_chart_btn['state'] = 'normal'
The method of selecting the year for which we want to view data looks similar. Selectable year values range from 2004 to the current year, with the current year being the highest in the list.
def show_year_combobox(self, root):
    self.year_sv = tk.StringVar(value='')
    year_cb = ttk.Combobox(root, textvariable=self.year_sv)
    year_cb['values'] = list(range(self.current_date.year, 2004 - 1, -1))
    year_cb.set('Year')
    # Set the widget option; assigning to year_cb.state has no effect on the widget
    year_cb['state'] = 'readonly'
    year_cb.bind('<<ComboboxSelected>>', self.enable_show_chart_btn)
    year_cb.grid(row=0, column=1, padx=10, pady=10)
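The descending list of years can be checked on its own. In this short sketch, current_year and years are names introduced for illustration; the starting year 2004 comes from the text above.

```python
from datetime import date

current_year = date.today().year

# range() excludes its stop value, hence 2004 - 1 to include 2004 itself
years = list(range(current_year, 2004 - 1, -1))

print(years[0], years[-1])
```

The negative step puts the current year first, so it appears at the top of the drop-down list.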
The radio buttons for selecting a specific month are embedded in a separate frame, and the selected value is passed through the radio_sv variable. calendar.month_abbr behaves like a list in which index 1 corresponds to January and so on, while the value at index 0 is an empty string. I used this slot for the default option, All, which displays values for all months of a given year. The enumerate() call in the loop supplies the column index, so that each subsequent radio widget is placed in a new column.
def show_radiobuttons(self, root):
    # Frame for Radiobuttons
    rb_frame = tk.Frame(root)
    rb_frame['borderwidth'] = 1
    rb_frame['relief'] = 'groove'
    rb_frame.grid(row=0, column=2, sticky='w')
    # Radiobuttons
    self.radio_sv = tk.StringVar(value='All')
    for i, month in enumerate(self.months):
        if month == "":
            month_rb = tk.Radiobutton(
                rb_frame,
                text='All',
                value='All',
                variable=self.radio_sv)
        else:
            month_rb = tk.Radiobutton(
                rb_frame,
                text=month,
                value=month,
                variable=self.radio_sv)
        month_rb.grid(row=0, column=i)
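The month labels used above can be checked in isolation. This short sketch assumes the default (English) locale, since calendar.month_abbr is locale-dependent; the labels variable is a name introduced for illustration.

```python
import calendar

months = list(calendar.month_abbr)   # index 0 is '', index 1 is January

# Replace the empty placeholder with 'All', keep the month abbreviations
labels = [month if month else "All" for month in months]

for column, label in enumerate(labels):
    # `column` is the grid column the radio button would be placed in
    print(column, label)
```

The list has 13 entries, one for the All option and one per month.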
The button that starts drawing the chart is inactive by default. Only selecting the product and year changes the state of the button to active. Pressing the button triggers the show_chart_btn_clicked() method.
Data is placed in the Treeview object. These data are downloaded for a specific product and date after pressing the button.
def show_data_table(self, root):
    # Show Data Frame
    data_frame = tk.Frame(root)
    data_frame['borderwidth'] = 1
    data_frame['relief'] = 'groove'
    data_frame.grid(row=1, column=0, columnspan=2, sticky=tk.N)
    # Show Data Table
    nametofont("TkHeadingFont").configure(weight='bold')
    self.table = ttk.Treeview(data_frame,
                              show='headings')
    scrollbar = ttk.Scrollbar(data_frame,
                              orient='vertical',
                              command=self.table.yview)
    scrollbar.grid(row=0, column=1, sticky=tk.NS)
    self.table.configure(yscrollcommand=scrollbar.set)
The program has a status bar showing info about correct loading of data, lack of data for a selected product in a selected period or errors related to e.g. lack of internet connection.
The show_chart_btn_clicked() method works as follows:
if the product data has already been downloaded, the external API is not queried
if the data has not been downloaded yet, an attempt is made to download it from the server; when the attempt fails, a download error message is displayed
the data is filtered based on the date values selected by the user
the filtered data is passed to the methods _set_data_table(), which updates the numerical data, and _show_chart(), which displays the chart
def show_chart_btn_clicked(self):
    self.msg.set('')
    product_id = self.products[self.product_sv.get()]
    if product_id in self.price_data:
        data = self.price_data[product_id]
    else:
        data = fetch_data(product_id, self.current_date)
        if data is not None:
            self.price_data[product_id] = data
        else:
            self.msg.set('Error fetching data')
            return
    product_df = json_to_df(data)
    year = self.year_sv.get()
    month = self.radio_sv.get()
    self.table.delete(*self.table.get_children())
    try:
        if month == 'All':
            chart_df = product_df.loc[year]
        else:
            month_int = datetime.strptime(month, '%b').month
            chart_df = product_df.loc[f'{year}-{month_int}']
    except KeyError:
        # No rows in the index match the selected year or month
        self.msg.set('No data for selected date')
        for widget in self.chart_canvas.winfo_children():
            widget.destroy()
    else:
        self.msg.set('OK')
        self._set_data_table(chart_df[::-1])
        self._show_chart(root, chart_df, year, month)
Updating data in the Treeview object displaying numerical data on prices:
def _set_data_table(self, df):
    for i, row in enumerate(df.iloc):
        date, value = row
        if i % 2:
            self.table.insert(
                parent='', index=i, text='',
                values=(date.date(), value),
                tag='odd')
        else:
            self.table.insert(
                parent='', index=i, text='',
                values=(date.date(), value))
values - an object containing values entered into the form and (after validation) passed to the callback() function, which queries the external API
errors – an object containing validation error values that will be displayed under the form fields
validate – a function that validates the entered values while typing
handleSubmit – validation function when submitting the form (onSubmit)
In the form, we pass the event object as the first argument, and the list of form element names as the second argument, i.e.
import { useState } from 'react'

function useForm(callback) {
  const [values, setValues] = useState({})
  const [errors, setErrors] = useState({})
  const pattern = new RegExp(
    /^[a-zA-Z0-9]+@(?:[a-zA-Z0-9]+\.)+[A-Za-z]+$/
  )
At the beginning, the following objects are initialized: values - storing the values entered into the form fields and errors – storing error messages. Then a RegExp object is created that holds a pattern that matches a valid email address.
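To see what this pattern accepts, it can be tried outside React. The sketch below uses Python's re module with the same expression. Python is used here only for a quick check, and the sample addresses are made up.

```python
import re

# Same expression as in the hook
pattern = re.compile(r"^[a-zA-Z0-9]+@(?:[a-zA-Z0-9]+\.)+[A-Za-z]+$")

samples = [
    "user1@company.com",      # plain address
    "user@mail.company.com",  # subdomains are allowed
    "john.doe@company.com",   # dot in the local part
    "user@my-company.com",    # hyphen in the domain
    "user@company",           # no top-level domain
]
results = {address: bool(pattern.match(address)) for address in samples}
for address, ok in results.items():
    print(f"{address}: {ok}")
```

The pattern is stricter than many real addresses: dots in the local part and hyphens in the domain are rejected, so an address such as john.doe@company.com would fail validation.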
The useForm function takes as an argument the name of the callback function that will be called after submitting the form and executing the handleSubmit function:
The above function takes an event object and a list of form fields as arguments. Next, the preventDefault() function is called to prevent the default action when submitting the form, and then the validateOnSubmit() function is called for each form field, i.e.
const validateOnSubmit = (value) => {
  if (values[value] === undefined) {
    errors[value] = 'This field is required'
  }
}
The above function checks whether any value has been entered in the form field. If not, an error is generated.
If the errors object is not empty, i.e. errors are present, the callback function will not be executed.
The validate() function validates the text entered into the form field as you type, and displays appropriate validation error messages as you type. Takes an event object and an object containing validation rules as arguments.
For any form field, the setValues() function is called, which completes the values object with an object with the key of the form field into which the text is currently being entered. Then, individual validations are called for specific rules, i.e.
If the ‘required’: true rule is set in the form, the hook checks whether the length of the text entered into the form field is equal to 0. In this case, the errors object is supplemented with another element whose key is the name of the form field.
// is required validation
if (rules.required === true && event.target.value.length === 0) {
  setErrors(errors => ({...errors, [event.target.name]: 'This field is required'}))
}
If the selected rule is ‘isEmail’:true in the form, then the hook checks whether the entered text is consistent with the pattern, i.e.
// is valid email address validation
else if (rules.isEmail === true && !pattern.test(event.target.value)) {
  setErrors(errors => ({...errors, [event.target.name]: 'This email address is invalid'}))
}
If the selected rule is ‘min’: number, then the hook checks if the text entered in the form field is at least as long as the given number, e.g. ‘min’: 6 will accept text with a length of at least 6 characters. Otherwise, an appropriate error will be set. The name of the form field is inserted into the text of the error message, i.e. the text will change depending on the name of the field.
// min value length validation
else if (rules.min && event.target.value.length < rules.min) {
  setErrors(errors => ({...errors,
    [event.target.name]:
      [event.target.name.charAt(0).toUpperCase()]
      + [event.target.name.slice(1)]
      + ' is too short'}))
}
If the selected rule is match, then as the value for the match key we enter the name of the form field with which the value is to be compared. The hook does not hard-code the name of the compared field, i.e.
// match validation
else if (rules.match && event.target.value!==values[rules.match]) {
  setErrors(errors => ({...errors,
    [event.target.name]: "Passwords don't match"}))
}
If none of the above conditions are met, the else block is invoked:
In the post I will describe how to activate the modal component from the Bulma framework, so that after pressing the delete row-contract button in the contract table, it is possible to confirm or cancel the deletion of the contract.
Instead of directly calling the deleteContract() function, the showModal() function is called, which receives the id of the selected contract as a parameter.
The showModal() function sets the variables id and modal, respectively.
The id variable stores the id number of the selected contract, while the modal variable stores the boolean value that specifies the state of the modal window (displayed or invisible-default). Calling the setModal() function changes the default value – false of the modal variable to true.
The code of the modal window displaying the confirmation of contract deletion looks like this:
The setCursor function gets the values of the cursor object and filters them so that the selected contract is discarded. Then the api is called in Flask, in addition to the contract id, the user’s token is also sent. The result of a successful contract canceling is displayed in the console.
What the API looks like in Flask I described in the previous post.
In the last line of the deleteContract() function, I set the modal window to inactive.
In the contract table, after pressing the appropriate button to delete a row in the table, the deleteContract function is activated, which takes the contract id value as a parameter, i.e.:
The deleteContract function filters the data of the cursor variable in such a way that the row that we want to delete is rejected. Then a query is called to the backend, where the POST method sends the value of the contract id and the token. When the “deletion” of the record is successful, the API sends information in the message variable (in the given example, the value of this variable is displayed in the console).
The backend in Flask-RESTful using SQLAlchemy:
class DeleteContract(Resource):
    def post(self):
        req = request.json
        contract_id = req['id']
        token = req['token'].strip('"')
        user = User.query.filter_by(token=token).first()
        if user.role == 'Customer':
            contract = Contract.query.filter_by(customer=user.username).filter_by(id=contract_id).first()
            if contract:
                contract.status = 'cancelled'
                db.session.commit()
        return {"message": "Row deleted"}
I assign the id and token values sent using the POST method to the corresponding variables contract_id and token. Then I find the user in the database based on the token sent. If the user belongs to the Customer group, the contract is searched based on the user name and the contract id number. If this contract exists, instead of removing it from the database, I change its status to cancelled.
I also updated the get() method of the AllContracts(Resource) class so that the created cursor object does not contain records with a cancelled status, i.e.:
class AllContracts(Resource):
    def get(self, token):
        token = token.strip('"')
        user = User.query.filter_by(token=token).first()
        # Parentheses allow the query chain to span several lines
        if user.role == 'Customer':
            contracts = (
                Contract.query
                .filter(Contract.status != 'cancelled')
                .filter_by(customer=user.username)
                .order_by(Contract.id.desc())
                .all()
            )
        elif user.role == 'Contractor':
            contracts = (
                Contract.query
                .filter(Contract.status != 'cancelled')
                .filter_by(contractor=user.username)
                .order_by(Contract.id.desc())
                .all()
            )
        cursor = {}
        for i, contract in enumerate(contracts):
            cursor[i] = contract.serialize()
        return cursor
The useEffect() hook for loading contracts after selecting the Contracts option from the menu looks like this:
I download the data from the backend when selecting the option from the Navbar component of Bulma framework. I use the useEffect hook to get the data to be read during component rendering. Then, a query to the backend is created, and a token is passed as a parameter, set when the user logs in. The response is sent from the backend in the form of a Promise object, from which if the query is completed successfully, the cursor variable is set (change of state with setCursor). Cursor and setCursor function are passed to the component from the parent App.js component as parameters (props), i.e.
In order for the component to refresh the data after successfully logging in, in the additional parameter of the useEffect method in the array, I put the token variable, which is set in the LoginForm.js component.
In order for the component to refresh the data after changing the cursor content, e.g. by adding a new contract, I have placed the setCursor function as an additional parameter of the useEffect method.
Individual data lines are placed in the table, i.e.
In the __init__.py file, I add another resource class:
from api.resources.contracts import AllContracts
api.add_resource(AllContracts, '/api/contract/<token>', endpoint='all_contracts')
In contracts.py, I define the resource class:
from datetime import datetime
from flask import request
from flask_restful import Resource
from .. import db
from ..common.models import User, Contract
class AllContracts(Resource):
    def get(self, token):
        token = token.strip('"')
        user = User.query.filter_by(token=token).first()
        contracts = Contract.query.filter_by(customer=user.username).order_by(Contract.id.desc()).all()
        cursor = {}
        for i, contract in enumerate(contracts):
            cursor[i] = contract.serialize()
        return cursor
In the models.py file I define, among others, how the Contract class inheriting from the Model class from SQLAlchemy is to be serialized:
from .. import db
from datetime import datetime
class Contract(db.Model):
    '''Model of contract between a contractor and a customer'''
    id = db.Column(db.Integer, primary_key=True)
    status = db.Column(db.String(10), nullable=False, default='open')
    contract_number = db.Column(db.String(20), nullable=False)
    contractor = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
    customer = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
    date_of_order = db.Column(db.Date, nullable=False, default=datetime.utcnow)
    date_of_delivery = db.Column(db.Date, nullable=False)
    pallets_position = db.Column(db.Integer)
    pallets_planned = db.Column(db.Integer, nullable=False)
    pallets_actual = db.Column(db.Integer)
    warehouse = db.Column(db.String(10), nullable=False)

    def serialize(self):
        return {'id': self.id,
                'status': self.status,
                'contract_number': self.contract_number,
                'contractor': self.contractor,
                'customer': self.customer,
                'date_of_order': datetime.strftime(self.date_of_order, '%Y-%m-%d'),
                'date_of_delivery': datetime.strftime(self.date_of_delivery, '%Y-%m-%d'),
                'pallets_position': self.pallets_position,
                'pallets_planned': self.pallets_planned,
                'pallets_actual': self.pallets_actual,
                'warehouse': self.warehouse
                }
def create_app(Config):
    app = Flask(__name__, static_folder='../build', static_url_path='/')
    app.config.from_object(Config)
    api.init_app(app)
    db.init_app(app)
    app.db = db
    bcrypt = Bcrypt(app)
    app.bcrypt = bcrypt
    from .common import utils
    from .common import routes
    app.register_blueprint(api_bp)
    app.register_blueprint(utils.bp)
    app.register_blueprint(routes.bp)
    return app
from api.resources.auth import UserRegister
api.add_resource(UserRegister, '/api/user/register', endpoint='user_register')
File that contains the Resource class for handling user registration:
from flask import current_app, request
from flask_restful import Resource
from api.common.parsers import parser
from ..common.utils import send_email
from ..common.models import User
from .. import db
class UserRegister(Resource):
    def post(self):
        req = request.json
        username = req['username']
        password = req['password']
        email = req['email']
        role = req['role']
        bcrypt = current_app.bcrypt
        hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')
        status = send_email(email, category='confirm_account')
        user = User(username=username, password=hashed_password, email=email, role=role)
        db.session.add(user)
        db.session.commit()
        return {"message": status}
The application allows the customer to place new orders and accept orders by the contractor as well as generate reservations, i.e. manage delivery time slots to central warehouses and logistics centers.
The source code for the application is available here.
In this post, I will introduce the new_booking(id) function which handles the booking for a specific order (contract). It takes the contract identification number as a parameter, i.e.
Then, in the new_booking() function, I create a booking form and get the booking object for a given contract, i.e.
form = BookingForm()
result = Booking.query.filter_by(contract_id=id).first()
Next, I set the contract object for the booking, i.e.
current_contract = Contract.query.get(id) # Returns contract for current booking
In the following lines I get a list of orders that were generated for a specific warehouse on the day of delivery, i.e.
# Filtering by one column gives a list of tuple(s) so I converted it to a list of values
contracts = [ids[0] for ids in Contract.query.with_entities(Contract.id).filter_by(
    date_of_delivery=current_contract.date_of_delivery).filter_by(
    warehouse=current_contract.warehouse).all()]
Then I validate if the form has been submitted. Depending on whether the reservation for a given contract has already been created, I update the data in the database based on the data from the form. If the booking object does not exist, I create a new booking object and save it to the database, i.e.
If the page is loaded using the GET method, the form is filled in depending on whether the booking already exists or is being created. If the booking exists and we open it, e.g. to edit the data, the form is completed with the previously entered data. If the booking is just being created, the form excludes the time slots that cannot be selected because they have already been taken by other suppliers, i.e.
if result is not None:
    reserved_booking_time = [
        times[0] for times in
        Booking.query.with_entities(Booking.booking_time).filter(
            Booking.contract_id.in_(contracts)).all()
        if times[0] != result.booking_time]
    form.booking_time.data = result.booking_time
    form.driver_full_name.data = result.driver_full_name
    form.driver_phone_number.data = result.driver_phone_number
    form.truck_reg_number.data = result.truck_reg_number
else:
    reserved_booking_time = [
        times[0] for times in
        Booking.query.with_entities(Booking.booking_time).filter(
            Booking.contract_id.in_(contracts)).all()]
return render_template('booking.html', form=form, reserved_booking_time=reserved_booking_time)
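The filtering of reserved time slots can be illustrated without the database. In the sketch below the bookings list, the slot values and the reserved_times() helper are all hypothetical; only the rule itself (leave out the current booking's own slot when editing) follows the code above.

```python
def reserved_times(bookings, contract_ids, own_time=None):
    """Return slots taken by other bookings for the given contracts.

    bookings: list of (contract_id, booking_time) tuples (made-up data shape)
    own_time: slot of the booking being edited, or None for a new booking
    """
    return [
        time
        for contract_id, time in bookings
        if contract_id in contract_ids and time != own_time
    ]


bookings = [(1, "08:00"), (2, "09:00"), (3, "10:00"), (7, "08:00")]
same_day_contracts = [1, 2, 3]   # contract 7 is for another warehouse/day

new_booking = reserved_times(bookings, same_day_contracts)
editing = reserved_times(bookings, same_day_contracts, own_time="09:00")
print(new_booking, editing)
```

When an existing booking is edited, its own slot stays selectable, while all other taken slots remain blocked.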
The complete code for the new_booking() function is available here.
A contract accepted in this way can only be edited by the customer. However, if the contract has already been accepted and the booking has been made, changing the contract resets the order status from accepted back to open and requires the contractor to accept the changes again (the previously entered data is kept, so the booking details do not have to be entered again).
The contractor may also download a pdf of the booking if the contract is accepted (for an open contract the option to download the pdf is inactive).