Django – testing the UserManager

Here is a manager for my custom user model that creates instances for regular users and admin/staff users.

My custom user model:

from django.db import models
from django.contrib.auth.models import AbstractUser
from .managers import ContractUserManager


class ContractUser(AbstractUser):

    class ProfileChoices(models.TextChoices):
        CLIENT = "client"
        CONTRACTOR = "contractor"

    profile = models.TextField(
        choices=ProfileChoices.choices, default=ProfileChoices.CLIENT
    )
    email = models.EmailField(max_length=255, unique=True)
    is_active = models.BooleanField(default=False)

    objects = ContractUserManager()

    def __str__(self):
        return self.username

Accounts for the created regular users are inactive, i.e. they can’t log in using the provided data until activated via e-mail.

Creating regular and privileged users is done using a manager that inherits from the UserManager class.

source file: backend/users/managers.py

from django.contrib.auth.models import UserManager
from django.utils.translation import gettext_lazy as _


class ContractUserManager(UserManager):

    def create_user(self, username, email, password, **extra_fields):
        if not email:
            raise ValueError(_("Email field is required"))
        email = self.normalize_email(email)
        user = self.model(username=username, email=email, **extra_fields)
        user.set_password(password)
        user.save()
        return user

    def create_superuser(self, username, email, password, **extra_fields):
        extra_fields.setdefault("is_superuser", True)
        extra_fields.setdefault("is_staff", True)
        extra_fields.setdefault("is_active", True)
        user = self.create_user(username, email, password, **extra_fields)
        return user
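The effect of setdefault() in create_superuser() can be tried out without Django. Below is a minimal sketch; superuser_fields() is a hypothetical helper that is not part of the project, and the two ValueError checks are an optional addition modelled on what Django's stock UserManager.create_superuser() does, not something the manager above performs.

```python
def superuser_fields(**extra_fields):
    """Return the flags a superuser would be created with (hypothetical helper)."""
    # setdefault() only fills in keys the caller did not pass explicitly.
    extra_fields.setdefault("is_superuser", True)
    extra_fields.setdefault("is_staff", True)
    extra_fields.setdefault("is_active", True)

    # Optional guard: reject contradictory flags instead of silently
    # creating a "superuser" without superuser rights.
    if extra_fields["is_staff"] is not True:
        raise ValueError("Superuser must have is_staff=True.")
    if extra_fields["is_superuser"] is not True:
        raise ValueError("Superuser must have is_superuser=True.")
    return extra_fields


print(superuser_fields())
print(superuser_fields(is_active=False))
```

Because setdefault() never overwrites a value passed by the caller, an explicit is_active=False survives, which is why a guard like the one above can be useful for the flags that must stay True.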

Then I test the custom manager:

from django.test import TestCase
from .models import ContractUser


class ContractUserTestCase(TestCase):

    def setUp(self):
        self.user = ContractUser.objects.create_user(
            username="user", password="123", email="user@company.com"
        )
        self.superuser = ContractUser.objects.create_superuser(
            username="admin",
            password="123",
            email="admin@company.com",
        )

    def test_create(self):
        self.assertEqual(self.user.username, "user")
        self.assertEqual(self.user.email, "user@company.com")
        self.assertEqual(self.user.profile, "client")
        self.assertEqual(str(self.user), "user")
        self.assertFalse(self.user.is_active)

        with self.assertRaisesMessage(ValueError, "Email field is required"):
            ContractUser.objects.create_user(username="user1", email="", password="123")

    def test_create_superuser(self):
        self.assertTrue(self.superuser.is_superuser)
        self.assertTrue(self.superuser.is_staff)
        self.assertTrue(self.superuser.is_active)
        self.assertEqual(self.superuser.username, "admin")

DRF – custom permissions #2

Previous post regarding permissions in DRF – see here.

For the warehouse model, I intend to define permissions so that only the owner of a given warehouse can edit the created model instance. Additionally, a warehouse can only be created by a user with the “client” profile, i.e. the one who places orders.

The warehouse model:

source file: backend/contracts/models/warehouse.py

class Warehouse(models.Model):
    warehouse_name = models.CharField(max_length=15, unique=True)
    warehouse_info = models.TextField(max_length=100)
    client = models.ForeignKey(ContractUser, on_delete=models.CASCADE)

    def __str__(self):
        return self.warehouse_name

For this model, I will create a WarehouseWritePermission class that inherits from the BasePermission class.

In this class I define two methods: has_object_permission() and has_permission().

The has_permission() method is checked on every request and here decides about the GET and POST methods on the list of warehouses, while the has_object_permission() method is checked when a specific warehouse is requested: the GET method with a parameter identifying the warehouse, and editing the warehouse (PUT method), i.e.

    def has_permission(self, request, view):
        if request.method != "POST":
            return True
        return request.user.profile == "client"

In the above method, only a user with the “client” profile can create a new instance of the warehouse model. Users with the “contractor” profile can only view the list of created warehouses.

    def has_object_permission(self, request, view, obj):
        if request.method in SAFE_METHODS:
            return True
        return obj.client == request.user

In the above method, users can view a specific instance, but only the owner of the warehouse, who is stored in the instance’s client field, can edit it.
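The interplay of the two methods can be checked without a running DRF project. The sketch below is only an illustration under assumptions: the class drops the BasePermission parent so it runs standalone, SAFE_METHODS is redefined with the same values DRF uses, and the allowed() helper and the sample users are hypothetical stand-ins for real requests.

```python
from types import SimpleNamespace

# Same values as rest_framework.permissions.SAFE_METHODS.
SAFE_METHODS = ("GET", "HEAD", "OPTIONS")


class WarehouseWritePermission:
    """Stand-in without the BasePermission parent, so it runs without DRF."""

    def has_permission(self, request, view):
        # Reads are open to everyone; only clients may create (POST).
        if request.method != "POST":
            return True
        return request.user.profile == "client"

    def has_object_permission(self, request, view, obj):
        # Anyone may read a single warehouse; only its owner may change it.
        if request.method in SAFE_METHODS:
            return True
        return obj.client == request.user


owner = SimpleNamespace(username="anna", profile="client")
contractor = SimpleNamespace(username="bob", profile="contractor")
warehouse = SimpleNamespace(client=owner)
permission = WarehouseWritePermission()


def allowed(method, user, obj=None):
    request = SimpleNamespace(method=method, user=user)
    if not permission.has_permission(request, view=None):
        return False
    # DRF runs the object-level check only when a view fetches an object.
    return obj is None or permission.has_object_permission(request, None, obj)


print(allowed("POST", owner))                 # client creates a warehouse
print(allowed("POST", contractor))            # contractor tries to create
print(allowed("PUT", contractor, warehouse))  # non-owner tries to edit
print(allowed("PUT", owner, warehouse))       # owner edits
```

In a real view both checks have to pass: has_permission() is evaluated first, and has_object_permission() only when the view fetches a specific object.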

How to add logged in user in serializer in DRF

In the example I discussed, the user appears as a foreign key in the warehouse model, i.e.

source file: backend/contracts/models/warehouse.py

class Warehouse(models.Model):
    warehouse_name = models.CharField(max_length=15, unique=True)
    warehouse_info = models.TextField(max_length=100)
    client = models.ForeignKey(ContractUser, on_delete=models.CASCADE)

    def __str__(self):
        return self.warehouse_name

When creating a new instance of the warehouse model, the user only needs to provide the name of the new warehouse and information about the warehouse (e.g. location or other useful information).

However, to create an instance, it is also necessary to provide a user who is the “owner” of the created warehouse.

I provide the user by adding it to the serializer context in the view, i.e.

source file: backend/api/views/warehouse.py

class WarehouseViewSet(viewsets.ModelViewSet):
    queryset = Warehouse.objects.all()
    serializer_class = WarehouseSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_context(self):
        context = super().get_serializer_context()
        context.update({"client": self.request.user})
        return context

The added user can then be used in the serializer, i.e.

source file: backend/api/serializers/warehouse.py

class WarehouseSerializer(serializers.ModelSerializer):

    class Meta:
        model = Warehouse
        fields = ["warehouse_name", "warehouse_info"]

    def create(self, validated_data):
        validated_data["client"] = self.context["client"]
        warehouse = Warehouse.objects.create(**validated_data)
        return warehouse

The logged in user will be added “automatically” when the model instance is created.

DRF – model and API testing

In the project, a backend providing data for the frontend that I’m going to create in React, I set up an application called contracts containing the Warehouse model.

source file: backend/contracts/models/warehouse.py

from django.db import models
from users.models import ContractUser


class Warehouse(models.Model):
    warehouse_name = models.CharField(max_length=15, unique=True)
    warehouse_info = models.TextField(max_length=100)
    client = models.ForeignKey(ContractUser, on_delete=models.CASCADE)

    def __str__(self):
        return self.warehouse_name

Model tests for the Warehouse model:

source file: backend/contracts/tests/models_tests/test_warehouse.py

from django.test import TestCase
from users.models import ContractUser
from ...models import Warehouse


class WarehouseTestCase(TestCase):

    def setUp(self):
        self.user = ContractUser.objects.create(
            username="user", password="123", email="user@company.com"
        )
        self.warehouse = Warehouse.objects.create(
            warehouse_name="TestWarehouse",
            warehouse_info="TestWarehouse info",
            client=self.user,
        )

    def test_create(self):
        self.assertEqual(self.warehouse.warehouse_name, "TestWarehouse")
        self.assertEqual(self.warehouse.warehouse_info, "TestWarehouse info")
        self.assertEqual(self.warehouse.client, self.user)
        self.assertEqual(str(self.warehouse), "TestWarehouse")

In the test above, I check whether the model instance was created correctly.

Then I create CRUD tests for the API created in DRF:

source file: backend/api/tests/test_warehouse.py

from rest_framework.test import APITestCase
from rest_framework import status
from django.urls import reverse
from users.models import ContractUser
from contracts.models import Warehouse


class WarehouseAPITestCase(APITestCase):

    def setUp(self):
        self.user = ContractUser.objects.create(
            username="TestUser", password="123", email="testuser@company.com"
        )
        self.warehouse = Warehouse.objects.create(
            warehouse_name="Warehouse",
            warehouse_info="Warehouse info",
            client=self.user,
        )

    def test_get(self):
        endpoint = reverse("warehouse-list")
        response = self.client.get(endpoint, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_create(self):
        endpoint = reverse("warehouse-list")
        data = {
            "warehouse_name": "Warehouse-new",
            "warehouse_info": "Warehouse-new info",
            "client": self.user.id,
        }
        response = self.client.post(endpoint, data, format="json")
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)

    def test_retrieve(self):
        endpoint = reverse("warehouse-detail", args=[self.warehouse.id])
        response = self.client.get(endpoint, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["warehouse_name"], "Warehouse")

    def test_update(self):
        data = {
            "warehouse_name": "Warehouse-upd",
            "warehouse_info": "Warehouse1-upd info",
            "client": self.user.id,
        }
        endpoint = reverse("warehouse-detail", args=[self.warehouse.id])
        response = self.client.put(endpoint, data, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["warehouse_name"], "Warehouse-upd")

    def test_delete(self):
        endpoint = reverse("warehouse-detail", args=[self.warehouse.id])
        response = self.client.delete(endpoint)
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

In the above tests, I check the GET, POST, PUT and DELETE request methods.

Custom Permissions in DRF

To create my own permissions for the custom user I created earlier, I create a class that inherits from the BasePermission class.

I’ll create two permission classes:

  • A class that allows access for users who are clients (the role attribute is client). Contractors are not authorized to access the resource.
  • A class that allows users who are contractors read-only access.

First of all, in the permissions.py file of the contracts application, I import the BasePermission class and the tuple of “safe methods”, i.e. methods that only read the resource, without the possibility of editing, deleting or adding a new contract.

from rest_framework.permissions import BasePermission, SAFE_METHODS


"""Custom permissions classes"""

The IsClient() class allows access only to clients. Other users will see a message informing them that they have no access to the resource:

class IsClient(BasePermission):
    message = "Only clients can access"

    def has_permission(self, request, view):
        if request.user.role == "client":
            return True
        return False

The IsClientOrReadOnly() class allows users who are clients to add, delete and edit data, while other users can only view it, i.e.

class IsClientOrReadOnly(BasePermission):
    message = "Only clients can modify, contractors can read only"

    def has_permission(self, request, view):
        if request.method in SAFE_METHODS:
            return True
        return request.user.role == "client"

I put the created permission classes in views, e.g.

class ContractViewSet(viewsets.ModelViewSet):
    permission_classes = [IsAuthenticated, IsClientOrReadOnly]
    serializer_class = ContractSerializer

Django Rest Framework – API testing

In this post, I will describe testing the API for the created custom user. (earlier entries – >>part 1<< and >>part 2<<)

Access to the ContractUserCreateRetrieveViewSet view, which creates a new user and displays a specific user, is registered in the urls.py file of the users application, i.e.

from rest_framework.routers import DefaultRouter
from .views import ContractUserCreateRetrieveViewSet

router = DefaultRouter()

router.register("", ContractUserCreateRetrieveViewSet, basename="user")

urlpatterns = router.urls

I set the basename parameter to user; I will refer to this name in the tests.

The users application urls.py file is then imported in the project urls.py file, i.e.

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api-auth/", include("rest_framework.urls")),
    path("api/user/", include("users.urls")),
]

So the complete endpoint for creating users will be /api/user/

Creating a user and displaying a specific user will be handled by specific methods from the views.py file of the users application, i.e. creating – the create() function, displaying the user – the retrieve() function.

from django.shortcuts import get_object_or_404
from rest_framework import viewsets, mixins
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from .models import ContractUser
from .serializers import ContractUserSerializer

class ContractUserCreateRetrieveViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    viewsets.GenericViewSet,
):
    serializer_class = ContractUserSerializer
    queryset = ContractUser.objects.all()

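    def get_permissions(self):
        # Assigning permission_classes inside retrieve() has no effect,
        # so authentication for the retrieve action is required here.
        if self.action == "retrieve":
            return [IsAuthenticated()]
        return super().get_permissions()

    def retrieve(self, request, pk=None):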
        queryset = ContractUser.objects.filter(id=request.user.id)
        user = get_object_or_404(queryset, pk=pk)
        serializer = ContractUserSerializer(user)
        return Response(serializer.data)

    def create(self, request):
        serializer = ContractUserSerializer(data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

The ContractUserCreateRetrieveViewSet class inherits from the basic GenericViewSet class and the mixins classes, which add the required functionalities – creating and displaying a user.

The create() function serializes the given data and, after checking whether they are correct, saves them in the database and returns the Response object and the appropriate status code.

The retrieve() function is intended for logged-in users: it filters users so that the queryset contains only the currently logged-in user, so a request for any other user’s id ends with a 404 response. Otherwise it returns a Response object with the serialized user and the status code 200.

In the signals.py file of the users application, I modify the email field provided by the AbstractUser class so that this field is required when creating the ContractUser model, i.e.

from django.db.models.signals import pre_save
from django.dispatch import receiver
from .models import ContractUser


@receiver(pre_save, sender=ContractUser)
def create_inactive_user(sender, instance, **kwargs):
    if instance._state.adding:
        # Deactivate only newly created users, not every later save
        instance.is_active = False
    instance._meta.get_field("email").blank = False
    instance._meta.get_field("email").null = False

If we want the email field to be unique, we can set the unique parameter to True, i.e.

    instance._meta.get_field("email")._unique = True

Then, if the user enters an email that is already in the database, a validation error will occur. Unfortunately, a side effect is that the error reveals that a user with this email address already exists in the database.

To test creating and displaying a user, I create a tests directory in the users app.

I move the tests.py file created in the previous article, which contained the model tests, into this directory and rename it to test_model.py.

In the created tests directory, I create a new file called test_api.py, which will contain api tests for the users application, i.e.

from rest_framework.test import APITestCase
from rest_framework import status
from django.urls import reverse
from ..models import ContractUser


class ContractUserApiTestCase(APITestCase):
    """Testing ContractUser API"""

    def test_create_user(self):
        endpoint = reverse("user-list")
        user_data = {
            "username": "user_1",
            "password": "123456789",
            "email": "user_1@company.com",
        }
        response = self.client.post(endpoint, data=user_data, format="json")
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)

    def test_retrieve_user(self):
        user = ContractUser.objects.create(
            username="user1", password="12345", email="user1@company.com"
        )
        self.client.force_authenticate(user)
        endpoint = reverse("user-detail", args=[user.id])
        response = self.client.get(endpoint, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)

In the test_create_user() method, I build the endpoint using the reverse() function, which takes the basename given in the urls.py file with the -list suffix as its parameter. Then I create data for the test user and, using the default client provided by the APITestCase class, send a POST request. The assertion checks that the status code of the response is 201, which means the user was created successfully; invalid data would result in status 400 and a failed test.

In the test_retrieve_user() method, I create a test user and then log the user in with the force_authenticate() function. The reverse() function takes the basename value given in the urls.py file with the -detail suffix as a parameter. As an additional parameter, I pass the logged-in user’s id. The assertion checks that the response status code is 200, i.e. the query was successful; a request for another user’s id would return 404 and fail the test.

Django – model testing

To test the custom ContractUser model created in the previous article, I import the TestCase class from the django.test module and the tested model in the tests.py file of the users application.

A new ContractUserTestCase class that inherits from the TestCase class is created.

In the setUp() method, I create two temporary users user1 and user2.

In the test_create_user() method, I run one test consisting of several assertions. One of them checks the text representation of the created instance, a case pointed out by the coverage tool, i.e.

from django.test import TestCase
from .models import ContractUser


class ContractUserTestCase(TestCase):
    def setUp(self):
        self.user1 = ContractUser.objects.create(
            username="user_1",
            password="12345",
            role="contractor"
        )
        self.user2 = ContractUser.objects.create(
            username="user_2",
            password="12345"
        )

    def test_create_user(self):
        self.assertEqual(self.user1.username, "user_1")
        self.assertEqual(self.user2.username, "user_2")
        self.assertEqual(self.user1.role, "contractor")
        self.assertEqual(self.user2.role, "client")
        self.assertEqual(str(self.user1), "user_1")
        self.assertEqual(str(self.user2), "user_2")

I test other models from the contracts application in a similar way.

The Contract model also includes a custom validator that checks that the delivery date is not earlier than the order date.

part of the Contract model from models.py of the contracts application:

class Contract(models.Model):
    class Meta:
        ordering = ("-date_of_order",)

    class StatusChoices(models.TextChoices):
        OPEN = "open"
        ACCEPTED = "accepted"
        CANCELED = "cancelled"

    class DeliveryDateValidator:
        @staticmethod
        def validate(value):
            # Reject delivery dates that lie before the current date
            if value < timezone.now().date():
                raise validators.ValidationError(
                    "Date of delivery can't be earlier than date of order"
                )
            else:
                return value

    date_of_order = models.DateField(default=timezone.now)

    date_of_delivery = models.DateField(
        validators=(DeliveryDateValidator.validate,)
    )

             ..........

To test the custom validator, I create a separate test that checks whether the ValidationError exception is raised for an invalid date and whether the entered date is returned when a correct date is given (not earlier than the order date).

    def test_delivery_date_validator(self):
        with self.assertRaises(ValidationError):
            self.contract.full_clean()
        self.contract.date_of_delivery = datetime.strptime(
            "2200-01-01", "%Y-%m-%d"
        ).date()
        self.assertEqual(
            self.contract.DeliveryDateValidator.validate(
                self.contract.date_of_delivery
            ),
            self.contract.date_of_delivery,
        )
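The rule enforced by the validator can also be tried outside Django. This is a minimal sketch under assumptions: ValidationError is a local stand-in for Django's exception, validate_delivery_date() is a hypothetical function name, and the today parameter is an addition that makes the check repeatable (the model's validator always compares against the current date).

```python
from datetime import date, timedelta


class ValidationError(Exception):
    """Stand-in for django.core.exceptions.ValidationError."""


def validate_delivery_date(value, today=None):
    # `today` is an assumed extra parameter that makes the check testable;
    # without it the current date is used, as in the model's validator.
    today = today or date.today()
    if value < today:
        raise ValidationError(
            "Date of delivery can't be earlier than date of order"
        )
    return value


order_day = date(2024, 5, 10)
print(validate_delivery_date(order_day, today=order_day))
print(validate_delivery_date(order_day + timedelta(days=7), today=order_day))
try:
    validate_delivery_date(order_day - timedelta(days=1), today=order_day)
except ValidationError as error:
    print("rejected:", error)
```

Django itself ignores the value returned by a validator; returning the date only makes the direct call in a test convenient to assert on.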

Django Rest Framework – creating users

First, I create a new application called users.

python manage.py startapp users

In the models.py file, I create a custom user that inherits from the AbstractUser class. I add a role choice field so that each user is either a client (the principal who places orders) or a contractor.

from django.contrib.auth.models import AbstractUser
from django.db import models


class ContractUser(AbstractUser):
    class RoleChoices(models.TextChoices):
        CLIENT = "client"
        CONTRACTOR = "contractor"

    role = models.CharField(
        max_length=10,
        choices=RoleChoices.choices,
        default=RoleChoices.CLIENT
    )

    def __str__(self):
        return self.username

The new user model should be registered in the project’s settings.py file, i.e.

AUTH_USER_MODEL = "users.ContractUser"

In the users application, I create a serializers.py file containing the serializer of the new user model, i.e.

from django.contrib.auth.hashers import make_password
from rest_framework import serializers
from .models import ContractUser


class ContractUserSerializer(serializers.ModelSerializer):
    password = serializers.CharField(
        min_length=8,
        write_only=True,
        style={"input_type": "password"}
    )

    class Meta:
        model = ContractUser
        fields = [
            "id",
            "username",
            "password",
            "email",
            "role",
        ]

    def create(self, validated_data):
        validated_data["password"] = make_password(validated_data["password"])
        return super().create(validated_data)

The serializer includes a CharField for entering the password. The field has the write_only parameter set to True, so it is accepted on input but never included in the serialized output. The fields attribute of the inner Meta class lists all serialized fields. The create() method hashes the password entered by the user with the make_password() function before the instance is saved.

In the urls.py file of the users application, I define an endpoint, i.e.

from rest_framework.routers import DefaultRouter
from .views import ContractUserCreateRetrieveViewSet

router = DefaultRouter()

router.register("", ContractUserCreateRetrieveViewSet, basename="user")

urlpatterns = router.urls

I attach the endpoint for the users application to the main urls.py file of the project:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/user/", include("users.urls")),
]

The /api/user/ endpoint will allow you to create a user and view data for a given user. To do this, I create a ContractUserCreateRetrieveViewSet() class in the views.py file. To limit the number of available methods, I do not use the ModelViewSet class, but inherit from the GenericViewSet class and the appropriate Mixin classes.

from django.shortcuts import get_object_or_404
from rest_framework import viewsets, mixins
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from .models import ContractUser
from .serializers import ContractUserSerializer

class ContractUserCreateRetrieveViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    viewsets.GenericViewSet,
):
    serializer_class = ContractUserSerializer
    queryset = ContractUser.objects.all()

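    def get_permissions(self):
        # Assigning permission_classes inside retrieve() has no effect,
        # so authentication for the retrieve action is required here.
        if self.action == "retrieve":
            return [IsAuthenticated()]
        return super().get_permissions()

    def retrieve(self, request, pk=None):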
        queryset = ContractUser.objects.filter(id=request.user.id)
        user = get_object_or_404(queryset, pk=pk)
        serializer = ContractUserSerializer(user)
        return Response(serializer.data)

    def create(self, request):
        serializer = ContractUserSerializer(data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

To make the newly created user inactive, I used the pre_save signal. For this purpose, I created a new signals.py file in the users application, i.e.

from django.db.models.signals import pre_save
from .models import ContractUser
from django.dispatch import receiver

@receiver(pre_save, sender=ContractUser)
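def create_inactive_user(sender, instance, **kwargs):
    # Only a user that is being created is deactivated; without this check
    # every later save, including the activation itself, would reset the flag.
    if instance._state.adding:
        instance.is_active = False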

The signal should be imported in the ready() function of the UsersConfig() class from the apps.py file:

from django.apps import AppConfig


class UsersConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "users"

    def ready(self):
        import users.signals

Tkinter app – data from remote API

source code is available on my GitHub

In this example, the data comes from the Orlen wholesale price archive.

Because the way of downloading data from the website has changed, I also had to change the way my program works (the previous version searched the website for the appropriate values, i.e. web scraping). Now it is necessary to use an API to obtain price data.

To determine what a correct query to the Orlen API should look like, I used the Network tab of the browser’s developer tools.

The program will download fuel price data and display the data and a chart for specific years and months.

The project will consist of the main program file: orlen-prices.py and the utils.py file containing a function to download responses from an external API and a function to convert data in json format to the appropriate Pandas data frame.

Downloaded data will be cached in a simple way to limit the number of requests to the API.
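A simple cache of this kind can be a dictionary keyed by the product id. Below is a minimal sketch of the idea; get_prices() and fake_fetch() are hypothetical names used only for illustration, and the program's actual caching code may look different.

```python
price_data = {}  # product id -> data already downloaded


def get_prices(product_id, fetch):
    """Return cached data for product_id, calling fetch() only on a miss."""
    if product_id not in price_data:
        data = fetch(product_id)
        if data is None:
            # Do not cache failures, so the next call retries the request.
            return None
        price_data[product_id] = data
    return price_data[product_id]


calls = []


def fake_fetch(product_id):
    # Hypothetical stand-in for the real API call; records each request.
    calls.append(product_id)
    return [{"effectiveDate": "2024-01-02", "value": 5000}]


get_prices(41, fake_fetch)
get_prices(41, fake_fetch)
print(len(calls))
```

The second call is served from the dictionary, so the stand-in fetch function runs only once for a given product.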

First, I will describe the fetch_data() function from the utils.py file responsible for fetching the response from an external server. If the query is successful, the function returns a response in json format. If an exception occurs, the function returns None.

import requests


def fetch_data(product_id, current_date):
    # Query the price archive of one product from 2000-01-01 to current_date
    url = (
        'https://tool.orlen.pl/api/wholesalefuelprices/'
        f'ByProduct?productId={product_id}'
        '&from=2000-01-01'
        f'&to={current_date}'
    )
    try:
        response = requests.get(url)
    except requests.RequestException:
        # Connection problems, timeouts and similar errors
        return None
    else:
        return response.json()
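The query string can also be assembled with the standard library instead of by hand. A minimal sketch, assuming a hypothetical helper build_url() that is not part of utils.py; it only builds the address and sends no request.

```python
from datetime import date
from urllib.parse import urlencode

BASE_URL = "https://tool.orlen.pl/api/wholesalefuelprices/ByProduct"


def build_url(product_id, current_date, first_date="2000-01-01"):
    # urlencode() escapes the values and joins the pairs with "&".
    query = urlencode(
        {"productId": product_id, "from": first_date, "to": current_date}
    )
    return f"{BASE_URL}?{query}"


print(build_url(41, date(2024, 5, 10)))
```

Keeping the address in a separate function makes it possible to check it in a test without touching the network.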

The next function from the utils.py file converts the data in json format to a Pandas dataframe. The resulting frame contains only two columns; the rest of the data is omitted. The effectiveDate column will contain datetime values and will be set as the index (to facilitate searching by date). The value column will be of type int. Finally, the entire frame will be sorted by index.

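import pandas as pd


def json_to_df(data):
    df = pd.json_normalize(data)
    # Keep only the two needed columns; copy() avoids modifying a view
    df = df[['effectiveDate', 'value']].copy()
    df['effectiveDate'] = pd.to_datetime(df['effectiveDate'])
    df['value'] = df['value'].astype(int)
    # The date column becomes the index to make lookups by date easier
    df.set_index(df['effectiveDate'], inplace=True)
    df.sort_index(inplace=True)
    return df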

The main file contains the MainWindow class responsible for displaying the program window and all widgets. It is instantiated when the file is run as a script:

if __name__ == "__main__":
    root = tk.Tk()
    root.protocol("WM_DELETE_WINDOW", MainWindow._exit)
    app = MainWindow(root)
    app.mainloop()

The above code creates an instance of the MainWindow class and registers the function that closes the program when the window is closed; it is a static method of the class.

The MainWindow class will inherit from the Frame class of the tkinter library:

import tkinter as tk
class MainWindow(tk.Frame):
    pass

First, I define class constants and variables, e.g.

  • background – defines the background color of the TreeView widget
  • price_data – a dictionary containing data downloaded from the API
  • current_date – variable containing the current date
  • months – list of month abbreviations
  • products – a dictionary containing product names and their codes

    background = '#EEEEEE'
    price_data = {}
    current_date = date.today()
    months = calendar.month_abbr
    products = {
        'Eurosuper95': 41,
        'SuperPlus98': 42,
        'Bio100': 47,
        'Arktyczny2': 44,
        'Ekodiesel': 43,
        'GrzewczyEkoterm': 46,
        'MiejskiSuper' : 45
    }
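The months constant relies on how calendar.month_abbr is indexed: position 0 is an empty string and positions 1 to 12 hold the abbreviations. A short sketch of turning it into labels, where the labels list is a hypothetical name used only for this illustration:

```python
import calendar

# month_abbr can be indexed from 0 to 12; index 0 is an empty placeholder.
months = list(calendar.month_abbr)

# Replace the empty placeholder with the "All" option.
labels = [month or "All" for month in months]

for column, label in enumerate(labels):
    print(column, label)
```

The abbreviations themselves depend on the active locale, so only the position of the empty entry should be relied on.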

In the __init__() method, I initialize the parent class and then run the create_UI() method responsible for displaying the widgets.

    def __init__(self, root):
        super().__init__(root)
        self.create_UI(root)

The create_UI() method maximizes the program window, sets the window title and then initializes the appropriate widgets, i.e.

  • the show_menu() method sets the menu bar
  • the show_product_combobox() method sets the product combobox
  • the show_year_combobox() method sets the year combobox
  • the show_radiobuttons() method sets the radio buttons to select the month
  • the show_chart_button() method sets the chart display button
  • the show_msg_label() method sets the status bar label
  • the show_data_table() method sets up a widget that displays price data

    def create_UI(self, root):
        root.state('zoomed')
        root.title('Orlen wholesale prices')
        self.show_menu(root)
        self.show_product_combobox(root)
        self.show_year_combobox(root)
        self.show_radiobuttons(root)
        self.show_chart_button(root)
        self.show_msg_label(root)
        self.show_data_table(root)

From the menu it will be possible to optionally close the program by selecting File/Exit, i.e.

    def show_menu(self, root):
        # Menu bar
        menu_bar = tk.Menu(root)
        root.config(menu=menu_bar)
        # File menu 
        file_menu = tk.Menu(menu_bar, tearoff=0)
        file_menu.add_command(label='Exit', command=self._exit)
        menu_bar.add_cascade(label='File', menu=file_menu)

The show_product_combobox() method initializes the Combobox widget, which will be responsible for selecting the product for which we want to track price changes. The selection will be passed using the product_sv variable – StringVar object. Before the user selects the appropriate product, a hint is displayed on the widget.

    def show_product_combobox(self, root):
        self.product_sv = tk.StringVar(value='')
        product_cb = ttk.Combobox(root, textvariable=self.product_sv)
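        product_cb['values'] = list(self.products)
        product_cb.set('Product')
        # Assigning to product_cb.state would only overwrite the method;
        # the widget option has to be set instead.
        product_cb['state'] = 'readonly'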
        product_cb.bind('<<ComboboxSelected>>', self.enable_show_chart_btn)
        product_cb.grid(row=0, column=0, padx=10, pady=10)

When the user selects a product, the bound method enable_show_chart_btn() is run, because by default the chart drawing button is inactive. The method first checks whether the button is inactive, then whether both the product and the year have been selected. If all these conditions are met, the state of the button changes to active.

    def enable_show_chart_btn(self, sender):
        if str(self.show_chart_btn['state']) == 'disabled' and self.product_sv.get() != 'Product' and self.year_sv.get() != 'Year':
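            # 'normal' is the documented value of the state option;
            # '!disabled' belongs to the state() method syntax.
            self.show_chart_btn['state'] = 'normal'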
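
The enabling rule can be separated from the widgets and checked on its own. The sketch below is only an illustration: the function name should_enable_chart_button, the two constants and the product name used in the example call are assumptions, not part of the application.

```python
# Hypothetical helper mirroring the condition in enable_show_chart_btn().
PRODUCT_HINT = 'Product'  # hint shown before a product is chosen
YEAR_HINT = 'Year'        # hint shown before a year is chosen


def should_enable_chart_button(button_state, product, year):
    """Return True when a disabled button should become active."""
    return (
        button_state == 'disabled'
        and product != PRODUCT_HINT
        and year != YEAR_HINT
    )


# 'Diesel' is a made-up product name used only for this example.
print(should_enable_chart_button('disabled', 'Diesel', '2020'))   # True
print(should_enable_chart_button('disabled', 'Product', '2020'))  # False
```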

The year combobox is built in a similar way. Selectable years range from 2004 to the current year, with the current year at the top of the list.

    def show_year_combobox(self, root):
        self.year_sv = tk.StringVar(value='')
        year_cb = ttk.Combobox(root, textvariable=self.year_sv)
        year_cb['values'] = [x for x in range(self.current_date.year, 2004-1, -1)]
        year_cb.set('Year')
        # Set the option through item access so the state() method is not overwritten.
        year_cb['state'] = 'readonly'
        year_cb.bind('<<ComboboxSelected>>', self.enable_show_chart_btn)
        year_cb.grid(row=0, column=1, padx=10, pady=10)
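
The descending list of years can be checked in isolation. This is a minimal sketch; the helper name year_choices and the FIRST_YEAR constant are assumptions introduced for the example.

```python
FIRST_YEAR = 2004  # earliest year offered in the combobox, as stated in the text


def year_choices(current_year, first_year=FIRST_YEAR):
    """Return the years from current_year down to first_year, newest first."""
    # The stop value of range() is exclusive, hence first_year - 1.
    return list(range(current_year, first_year - 1, -1))


print(year_choices(2007))  # [2007, 2006, 2005, 2004]
```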

The radio buttons for selecting a month are placed in a separate frame, and the selected value is passed through the radio_sv variable. calendar.month_abbr is a sequence in which index 1 corresponds to January and so on, while index 0 holds an empty string. I use that empty entry for the default option, All, which displays the values for every month of the selected year. The enumerate() call in the loop supplies the column index, so each radio button is placed in the next column.

    def show_radiobuttons(self, root):
        # Frame for Radiobuttons
        rb_frame = tk.Frame(root)
        rb_frame['borderwidth'] = 1
        rb_frame['relief']='groove'
        rb_frame.grid(row=0, column=2, sticky='w')
        # Radiobuttons
        self.radio_sv = tk.StringVar(value='All')
        for i, month in enumerate(self.months):
            if month=="":
                month_rb = tk.Radiobutton(
                    rb_frame, 
                    text='All',
                    value='All', 
                    variable=self.radio_sv)
            else:
                month_rb = tk.Radiobutton(
                    rb_frame, 
                    text=month,
                    value=month, 
                    variable=self.radio_sv)
            month_rb.grid(row=0, column=i)
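
The way the empty entry of calendar.month_abbr becomes the All option can be sketched without any widgets. The helper name month_labels is an assumption made for this example.

```python
import calendar


def month_labels():
    """Return radio-button labels: 'All' followed by the abbreviated month names."""
    # calendar.month_abbr[0] is an empty string; indexes 1..12 hold the months.
    return [name if name else 'All' for name in calendar.month_abbr]


labels = month_labels()
print(len(labels))  # 13
print(labels[0])    # All
```

The month abbreviations themselves depend on the active locale, so only the first label and the length are fixed.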

The button that starts drawing the chart is inactive by default. Only selecting the product and year changes the state of the button to active. Pressing the button triggers the show_chart_btn_clicked() method.

    def show_chart_button(self, root):
        self.show_chart_btn = ttk.Button(
            root,
             text='Show Chart',
             command=self.show_chart_btn_clicked)
        self.show_chart_btn['state'] = 'disabled'
        self.show_chart_btn.grid(row=0, column=3, padx=10, sticky='w')

The price data is shown in a Treeview widget. It is downloaded for the selected product and date after the button is pressed.

    def show_data_table(self, root):
        # Show Data Frame
        data_frame = tk.Frame(root)
        data_frame['borderwidth'] = 1
        data_frame['relief']='groove'
        data_frame.grid(row=1, column=0, columnspan=2, sticky=tk.N)
        # Show Data Table
        nametofont("TkHeadingFont").configure(weight='bold')
        self.table = ttk.Treeview(data_frame, 
                    show='headings')
        scrollbar = ttk.Scrollbar(data_frame, 
                                orient='vertical',
                                command=self.table.yview)
        scrollbar.grid(row=0, column=1, sticky=tk.NS)
        
        self.table.configure(yscrollcommand=scrollbar.set)

The program has a status bar showing info about correct loading of data, lack of data for a selected product in a selected period or errors related to e.g. lack of internet connection.

    def show_msg_label(self, root):
        self.msg = tk.StringVar()
        msg_label = tk.Label(root, 
                                relief='groove',
                                anchor='w',
                                textvariable=self.msg)
        msg_label.grid(row=3, column=0, columnspan=6, sticky='ews')
        root.grid_rowconfigure(3, weight=1)
        root.grid_columnconfigure(5, weight=1)

Selecting the graph button does the following:

  • clears the status bar message
  • if the product data has already been downloaded, it does not query the external API
  • if the data are not downloaded yet, an attempt is made to download them from the server. When the attempt fails, a download error message is displayed
  • the data is filtered based on the date values selected by the user
  • the filtered data is passed to the methods: _set_data_table(), which updates the numerical data, and _show_chart(), which displays the chart

    def show_chart_btn_clicked(self):
        self.msg.set('')
        product_id = self.products[self.product_sv.get()]
        if product_id in self.price_data:
            data = self.price_data[product_id]
        else:
            data = fetch_data(product_id, self.current_date)
            if data is not None:
                self.price_data[product_id] = data
            else:
                self.msg.set('Error fetching data')
                return 
        product_df = json_to_df(data)
        year = self.year_sv.get()
        month = self.radio_sv.get()
        self.table.delete(*self.table.get_children())
        try:
            if month == 'All':
                chart_df = product_df.loc[year]
            else: 
                month_int = datetime.strptime(month, '%b').month
                chart_df = product_df.loc[f'{year}-{month_int}']
        except KeyError:  # .loc raises KeyError when the period has no rows
            self.msg.set('No data for selected date')
            for widget in self.chart_canvas.winfo_children():
                widget.destroy()
        else:
            self.msg.set('OK')
            self._set_data_table(chart_df[::-1])
            self._show_chart(root, chart_df, year, month)
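
The "download once, then reuse" step described above can be sketched independently of the GUI. This is an illustration under assumptions: get_price_data and fake_fetch are hypothetical names, the product id and the sample record are invented, and fake_fetch stands in for the application's fetch_data() so that no network access is needed.

```python
def get_price_data(cache, product_id, fetch):
    """Return cached data for product_id, calling fetch() only on a cache miss.

    fetch is any callable that returns the data, or None on failure.
    """
    if product_id in cache:
        return cache[product_id]
    data = fetch(product_id)
    if data is not None:
        cache[product_id] = data  # failures are not cached, so a retry is possible
    return data


calls = []


def fake_fetch(product_id):
    # Hypothetical fetcher that records each call instead of querying the API.
    calls.append(product_id)
    return [{'effectiveDate': '2023-01-02', 'value': 5000}]


cache = {}
get_price_data(cache, 41, fake_fetch)
get_price_data(cache, 41, fake_fetch)
print(len(calls))  # 1
```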

Updating data in the Treeview object displaying numerical data on prices:

    def _set_data_table(self, df):
        for i, row in enumerate(df.iloc):
            date, value = row
            if i % 2:
                self.table.insert(
                    parent='', index=i, text='', 
                    values=(date.date(), value),
                    tag='odd')
            else:
                self.table.insert(
                    parent='', index=i, text='', 
                    values=(date.date(), value))

Displaying the graph:

    def _show_chart(self, root, df, year, month):
        # Show Chart Frame
        self.chart_canvas = tk.Frame(root)
        self.chart_canvas['borderwidth'] = 1
        # self.chart_canvas['relief']='groove'
        self.chart_canvas.grid(row=1, column=2)
        # Chart
        date = [str(x.date()) for x in df['effectiveDate']]
        price = df['value'].tolist()
        fig, ax = plt.subplots()
        if month == 'All':
            month = ''
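        # Adjacent f-strings are concatenated; a backslash continuation inside
        # the literal would put the next line's indentation into the title.
        ax.set_title(
            f'Orlen wholesale prices '
            f'{self.product_sv.get()} [PLN/m3] - {month} {year}')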
        ax.set_xlabel('Date')
        ax.set_ylabel('Price [PLN]')
        fig.autofmt_xdate()
        ax.grid(True)
        ax.xaxis.set_major_locator(plt.MaxNLocator(12))
        ax.yaxis.set_major_locator(plt.MaxNLocator(12))
        textstr='(c) S.Kwiatkowski'
        props = dict(boxstyle='round', alpha=0.5)
        ax.plot(date, price, c='#CA3F62')
        ax.text(0.8, 0.95, textstr, transform=ax.transAxes, fontsize=8,
            verticalalignment='top', bbox=props)
        canvas = FigureCanvasTkAgg(fig, master=self.chart_canvas)
        canvas.draw()
        canvas.get_tk_widget().grid(row=1, column=2, columnspan=2, rowspan=2, sticky='nswe')
        

Bulma – modal component + React + Flask-RESTful

In this post I describe how to activate the modal component from the Bulma framework so that, after the delete button in a row of the contract table is pressed, the user can confirm or cancel the deletion of the contract.

<tbody>
  {cursor && Object.keys(cursor).map(
      (keyName, keyIndex) =>
      <tr  className="has-text-centered is-size-7"
           key={keyIndex}>
          <td>{keyIndex+1}</td>
          <td>{(cursor[keyName].status)}</td>
          <td>{(cursor[keyName].contract_number)}</td>
          <td>{(cursor[keyName].contractor)}</td>
          <td>{(cursor[keyName].customer)}</td>
          <td>{(cursor[keyName].date_of_order)}</td>
          <td>{(cursor[keyName].date_of_delivery)}</td>
          <td>{(cursor[keyName].pallets_position)}</td>
          <td>{(cursor[keyName].pallets_planned)}</td>
          <td>{(cursor[keyName].pallets_actual)}</td>
          <td>{(cursor[keyName].warehouse)}</td>
          <td>
            <FaEdit />
            <FaTrashAlt onClick={() => 
               showModal(cursor[keyName].id)}
               title={`Delete contract ${keyIndex+1}`} />
          </td>
      </tr>)}   
</tbody>
Contract deletion

Instead of calling the deleteContract() function directly, the click handler calls the showModal() function, which receives the id of the selected contract as its parameter.

The showModal() function sets the variables id and modal, respectively.

const [modal, setModal] = useState(false)
const [id, setId] = useState()

const showModal = (id) => {
        setId(id)
        setModal(!modal)
    }

The id variable stores the id of the selected contract, while the modal variable stores a boolean that specifies whether the modal window is displayed or hidden (the default). Calling setModal(!modal) in showModal() flips the modal variable from its default value, false, to true.

The code of the modal window displaying the confirmation of contract deletion looks like this:

<div className={`modal ${modal ? 'is-active' : ''}`}>
  <div className="modal-background"></div>
  <div className="modal-card column is-one-quarter">
     <header className="modal-card-head">
        <p className="modal-card-title has-text-danger">
           Delete Contract?
        </p>
      </header>
      <section className="modal-card-foot">
         <button className="button is-danger" 
             onClick={() => deleteContract(id)}>Delete
         </button>
         <button className="button"
             onClick={() => setModal(!modal)}>Cancel
         </button>
       </section>
  </div>
</div>

Selecting the Cancel button makes the modal window invisible. Selecting the Delete button activates the function of deleting the contract, i.e.

const deleteContract = (id) => {
  setCursor(Object.values(cursor)
    .filter((row) => row.id !== id))
  fetch('/api/contract/delete', {
    method: 'POST',
    headers: {
      'Content-type': 'application/json',
     },
       body: JSON.stringify({'id': id, 'token': token})
  })
  .then(res => res.json())
  .then(data => console.log(data))
  .catch((err) => console.log(err));
  setModal(!modal)
}

The setCursor function receives the values of the cursor object with the selected contract filtered out. Then the Flask API is called; in addition to the contract id, the user’s token is sent. The API response to the cancellation request is logged to the console.

What the API looks like in Flask I described in the previous post.

In the last line of the deleteContract() function, I set the modal window back to inactive.

Delete a record (Flask-RESTful, SQLAlchemy, React, and Bulma CSS framework)

In the contract table, after pressing the appropriate button to delete a row in the table, the deleteContract function is activated, which takes the contract id value as a parameter, i.e.:

<td>
    <FaTrashAlt onClick={() => deleteContract(cursor[keyName].id)} />
</td>

The deleteContract function filters the data of the cursor variable in such a way that the row that we want to delete is rejected. Then a query is called to the backend, where the POST method sends the value of the contract id and the token. When the “deletion” of the record is successful, the API sends information in the message variable (in the given example, the value of this variable is displayed in the console).

const deleteContract = (id) => {
    setCursor(Object.values(cursor).filter((row) => row.id !== id))
    fetch('/api/contract/delete', {
            method: 'POST',
            headers: {
                'Content-type': 'application/json',
            },
            body: JSON.stringify({'id': id, 'token': token})
        })
        .then(res => res.json()).then(data => console.log(data)).catch((err) => console.log(err));
    }

Backend in Flask-RESTful using SQLAlchemy:

class DeleteContract(Resource):
    def post(self):
        req = request.json
        contract_id = req['id']
        token = req['token'].strip('"')
        user = User.query.filter_by(token=token).first()
        if user.role == 'Customer':
            contract = Contract.query.filter_by(customer=user.username).filter_by(id=contract_id).first()
            if contract:
                contract.status ='cancelled'
                db.session.commit()
                return {"message": "Row deleted"}

I assign the id and token values sent using the POST method to the corresponding variables contract_id and token. Then I find the user in the database based on the token sent. If the user belongs to the Customer group, the contract is searched based on the user name and the contract id number. If this contract exists, instead of removing it from the database, I change its status to cancelled.
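
The idea of "deleting" by changing the status can be shown without a database. The sketch below uses a plain dataclass as a hypothetical stand-in for the Contract model; ContractRow, cancel_contract and visible_contracts are names invented for the example and are not part of the application.

```python
from dataclasses import dataclass


@dataclass
class ContractRow:
    # Hypothetical stand-in for the Contract model; only the fields used here.
    id: int
    customer: str
    status: str = 'open'


def cancel_contract(rows, username, contract_id):
    """Mark the matching contract as cancelled; return True if one was found."""
    for row in rows:
        if row.customer == username and row.id == contract_id:
            row.status = 'cancelled'
            return True
    return False


def visible_contracts(rows):
    """Return only the contracts that have not been cancelled."""
    return [row for row in rows if row.status != 'cancelled']


rows = [ContractRow(1, 'alice'), ContractRow(2, 'bob')]
cancel_contract(rows, 'alice', 1)
print(len(rows))                              # 2
print([r.id for r in visible_contracts(rows)])  # [2]
```

The record stays in the collection, so the history is preserved, while every listing has to filter out the cancelled status.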

I also updated the get() method of the AllContracts(Resource) class so that the created cursor object does not contain records with a cancelled status, i.e.:

class AllContracts(Resource):
    def get(self, token):
        token = token.strip('"')
        user = User.query.filter_by(token=token).first()
        if user.role == 'Customer':
            # Parentheses allow the chained query to span several lines.
            contracts = (
                Contract.query
                .filter(Contract.status != 'cancelled')
                .filter_by(customer=user.username)
                .order_by(Contract.id.desc())
                .all()
            )
        elif user.role == 'Contractor':
            contracts = (
                Contract.query
                .filter(Contract.status != 'cancelled')
                .filter_by(contractor=user.username)
                .order_by(Contract.id.desc())
                .all()
            )
        cursor = {}
        for i, contract in enumerate(contracts):
            cursor[i] = contract.serialize()
        return cursor

The useEffect() hook for loading contracts after selecting the Contracts option from the menu looks like this:

    useEffect( 
        () => {
            const token = sessionStorage.getItem('token')
            fetch('/api/contract/' + token)
            .then(res => res.json()).then(data => setCursor(data)).catch((err) => console.log(err))
        }, [token, setCursor],
    );

React, Flask-RESTful and SQLAlchemy – getting data from the database

Frontend:

I download the data from the backend when an option is selected in the Navbar component of the Bulma framework. The useEffect hook runs the request after the component renders. The request goes to the backend with the token, which is set when the user logs in, passed as a URL parameter. fetch() returns a Promise; if the request succeeds, the parsed response is stored in the cursor variable (a state change made with setCursor). The cursor variable and the setCursor function are passed to the component from the parent App.js component as props, i.e.

import LoginForm from '../Forms/LoginForm/LoginForm'
import { useEffect } from "react";

function Contracts({token, putToken, cursor, setCursor}) {

    useEffect( 
        () => {
            const token = sessionStorage.getItem('token')
            fetch('/api/contract/' + token)
            .then(res => res.json()).then(data => setCursor(data)).catch((err) => console.log(err))
        }, [token, setCursor],
    );
    
    if (!token) {
        return <LoginForm putToken={putToken} />
    };

In order for the component to refresh the data after a successful login, I put the token variable, which is set in the LoginForm.js component, in the dependency array passed as the second argument of useEffect.

I also list the setCursor function in the dependency array. React keeps the identity of a state setter stable between renders, so this entry does not re-run the effect by itself; refreshing the table after a change such as adding a new contract requires updating the cursor state directly or changing another dependency.

Individual data lines are placed in the table, i.e.

<tbody>
    {cursor && Object.keys(cursor).map((keyName, keyIndex) => 
    <tr key={keyIndex}>
            <td>{keyIndex+1}</td>
            <td>{(cursor[keyName].status)}</td>
            <td>{(cursor[keyName].contract_number)}</td>
            <td>{(cursor[keyName].contractor)}</td>
            <td>{(cursor[keyName].customer)}</td>
            <td>{(cursor[keyName].date_of_order)}</td>
            <td>{(cursor[keyName].date_of_delivery)}</td>
            <td>{(cursor[keyName].pallets_position)}</td>
            <td>{(cursor[keyName].pallets_planned)}</td>
            <td>{(cursor[keyName].pallets_actual)}</td>
            <td>{(cursor[keyName].warehouse)}</td>
     </tr>)}   
</tbody>

Backend with Flask-RESTful:

In the __init__.py file, I add another resource class:

from api.resources.contracts import AllContracts

api.add_resource(AllContracts, '/api/contract/<token>', endpoint='all_contracts')

In contracts.py, I define the resource class:

from datetime import datetime
from flask import request
from flask_restful import Resource
from .. import db
from ..common.models import User, Contract

class AllContracts(Resource):
    def get(self, token):
        token = token.strip('"')
        user = User.query.filter_by(token=token).first()
        contracts = Contract.query.filter_by(customer=user.username).order_by(Contract.id.desc()).all()
        cursor = {}
        for i, contract in enumerate(contracts):
            cursor[i] = contract.serialize()
        return cursor
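
One detail of this structure is worth seeing in isolation: the cursor dictionary is built with integer keys, but JSON object keys are always strings, so the frontend receives string keys. A minimal sketch with invented sample rows:

```python
import json

# Hypothetical serialized contracts, reduced to two fields for brevity.
serialized = [
    {'id': 7, 'status': 'open'},
    {'id': 3, 'status': 'accepted'},
]

# Same construction as in AllContracts.get(): position -> serialized row.
cursor = {i: row for i, row in enumerate(serialized)}

# Round trip through JSON: the integer keys come back as strings.
payload = json.loads(json.dumps(cursor))
print(list(payload.keys()))  # ['0', '1']
```

This is why the table code iterates over Object.keys(cursor) rather than treating the response as an array.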

In the models.py file I define, among others, how the Contract class inheriting from the Model class from SQLAlchemy is to be serialized:

from .. import db
from datetime import datetime

class Contract(db.Model):
    '''Model of contract between a contractor and a customer'''

    id = db.Column(db.Integer, primary_key=True)
    status = db.Column(db.String(10), nullable=False, default='open')
    contract_number = db.Column(db.String(20), nullable=False)
    contractor = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
    customer = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
    date_of_order = db.Column(db.Date, nullable=False, default=datetime.utcnow)
    date_of_delivery = db.Column(db.Date, nullable=False)
    pallets_position = db.Column(db.Integer)
    pallets_planned = db.Column(db.Integer, nullable=False)
    pallets_actual = db.Column(db.Integer)    
    warehouse = db.Column(db.String(10), nullable=False)

    def serialize(self):
        return {'id': self.id,
                'status': self.status,
                'contract_number': self.contract_number,
                'contractor': self.contractor,
                'customer': self.customer,
                'date_of_order': datetime.strftime(self.date_of_order, '%Y-%m-%d'),
                'date_of_delivery': datetime.strftime(self.date_of_delivery, '%Y-%m-%d'),
                'pallets_position': self.pallets_position,
                'pallets_planned': self.pallets_planned,
                'pallets_actual': self.pallets_actual,
                'warehouse': self.warehouse 
                }
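
The date formatting used in serialize() can be tried on its own. The helper below is a hypothetical variation, not the author's code: it adds a guard for missing values, which would only matter for a nullable date column (both date columns above are declared nullable=False).

```python
from datetime import date


def serialize_date(value):
    """Format a date as YYYY-MM-DD, or return None when the value is missing."""
    if value is None:
        return None
    return value.strftime('%Y-%m-%d')


print(serialize_date(date(2021, 3, 5)))  # 2021-03-05
print(serialize_date(None))              # None
```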

React and Bulma navbar

Problems solved:

  1. How to collapse a burger menu when selecting options from the menu (by default, selecting options from the burger menu leaves the menu expanded).
  2. How to collapse a pull-down menu when selecting an option. (by default, the drop-down menu does not close when an option is selected)

re. 1

The menu now collapses when an option is selected.

<div id="navbarBasicExample" className={`navbar-menu ${burgerActive? "is-active": ""}`}>
                <div className="navbar-start">
                <Link to="/" className="navbar-item" onClick={handleOnClick}>
                    Home
                </Link>

wherein:

    const [burgerActive, setBurgerActive] = useState(false)

    const handleOnClick = () => {
        setBurgerActive(false)
    }

re. 2

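The pull-down menu collapses when an option is selected. The key changes on every route change, so React replaces the element with a freshly mounted one.

<div className="navbar-item has-dropdown is-hoverable" key={location.pathname}>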

wherein:

import {
    Link, useLocation
  } from 'react-router-dom' 

const Navbar = () => {
    let location = useLocation();

Order processing for Logistics Centers

The application allows the customer to place new orders and the contractor to accept them and to generate reservations, i.e. to manage delivery time slots at central warehouses and logistics centers.

The source code for the application is available here.

In this post, I will introduce the new_booking(id) function which handles the booking for a specific order (contract). It takes the contract identification number as a parameter, i.e.

@bp.route('/booking/<int:id>', methods=['GET', 'POST'])
@login_required
def new_booking(id):

Then, in the new_booking() function, I create a booking form and get the booking object for a given contract, i.e.

    form = BookingForm()
    result = Booking.query.filter_by(contract_id=id).first()

Next, I set the contract object for the booking, i.e.

    current_contract = Contract.query.get(id)     # Returns contract for current booking

In the following lines I get a list of orders that were generated for a specific warehouse on the day of delivery, i.e.

# Filtering by one column gives a list of tuple(s) so I converted it to a list of values
    contracts = [ids[0] for ids in Contract.query.with_entities(Contract.id).filter_by(
        date_of_delivery=current_contract.date_of_delivery).filter_by(
        warehouse=current_contract.warehouse).all()]
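
The conversion mentioned in the comment above can be shown with plain data. The rows below are invented sample values that imitate the shape of a single-column query result.

```python
# Hypothetical result of a single-column query: one 1-tuple per row.
rows = [(12,), (15,), (27,)]

# Take the first (and only) element of each row.
contract_ids = [row[0] for row in rows]
print(contract_ids)  # [12, 15, 27]
```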

Then I check whether the form has been submitted and is valid. If a booking for the given contract already exists, I update it in the database with the data from the form. If it does not exist, I create a new booking object and save it to the database, i.e.

    if form.validate_on_submit():
        if result:
            result.booking_time = form.booking_time.data
            result.driver_full_name = form.driver_full_name.data
            result.driver_phone_number = form.driver_phone_number.data
            result.truck_reg_number = form.truck_reg_number.data
            db.session.commit()
        else:
            booking = Booking(booking_time=form.booking_time.data, 
                            contract_id = id,
                            driver_full_name=form.driver_full_name.data,
                            driver_phone_number=form.driver_phone_number.data,
                            truck_reg_number=form.truck_reg_number.data)
            db.session.add(booking)
            db.session.commit()

Next, I change the order status to accepted and redirect to the function displaying all orders for a given contractor, i.e.

        contract = Contract.query.get(id)
        contract.status = 'accepted'
        db.session.commit()
        page = session.get('page')
        per_page = session.get('per_page')
        return redirect(url_for('contracts.contracts', page=page, per_page=per_page))

If the page is loaded using the GET method, the form is filled in depending on whether the booking already exists. If it exists and is opened, e.g. to edit the data, the form is pre-filled with the previously entered data, and the booking's own time is left out of the reserved list so that it stays selectable. If the booking is just being created, the reserved list contains every time already taken by other suppliers for the same warehouse and delivery day; these times cannot be selected in the form, i.e.

    if result is not None:
        reserved_booking_time = [times[0] for times in 
            Booking.query.with_entities(Booking.booking_time).filter(
            Booking.contract_id.in_(contracts)).all() if times[0]!=result.booking_time]   
        form.booking_time.data = result.booking_time
        form.driver_full_name.data = result.driver_full_name
        form.driver_phone_number.data = result.driver_phone_number
        form.truck_reg_number.data = result.truck_reg_number   
    else:
        reserved_booking_time = [times[0] for times in 
            Booking.query.with_entities(Booking.booking_time).filter(
            Booking.contract_id.in_(contracts)).all()] 
    return render_template('booking.html', form=form, reserved_booking_time=reserved_booking_time)
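
The rule for building the list of blocked times can be expressed as a small function. This is a sketch only: reserved_booking_times is a hypothetical name, and the times are given as strings for readability, whatever type the Booking model actually uses.

```python
def reserved_booking_times(taken_times, own_time=None):
    """Return the slots that must be blocked in the booking form.

    taken_times: times already booked for the same warehouse and delivery day.
    own_time: the time of the booking being edited, or None for a new booking.
    """
    if own_time is None:
        # New booking: every taken slot is blocked.
        return list(taken_times)
    # Existing booking: its own slot stays selectable.
    return [t for t in taken_times if t != own_time]


taken = ['08:00', '09:00', '10:00']
print(reserved_booking_times(taken))           # ['08:00', '09:00', '10:00']
print(reserved_booking_times(taken, '09:00'))  # ['08:00', '10:00']
```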

The complete code for the new_booking() function is available here.

A contract accepted in this way can only be edited by the customer. If the contract has already been accepted and the booking has been made, changing the contract switches the order status from accepted back to open and requires the contractor to accept the changes again. The previously entered booking data is kept, so it does not have to be entered again.

The contractor may also download a pdf of the booking if the contract is accepted (for open contract the option to download pdf is inactive).

Reservation made – now you can generate pdf

Reportlab – generating and downloading pdf in Flask without creating the file on the server

Problem solved: how to generate a pdf with order data and a reservation barcode in Flask without saving the file on disk, so that the website user can then download it.

We can use the Canvas object to embed document elements such as texts, tables, etc. As an argument we can give the name of the created pdf, but in this case I will use the buffer instead of creating the file on the server. I’m explicitly entering the page size as A4.

from io import BytesIO
from reportlab.pdfgen.canvas import Canvas
from reportlab.lib.pagesizes import A4

buffer = BytesIO()
canvas = Canvas(buffer, pagesize=A4)

To use non-standard characters, register the appropriate fonts that support the required characters. In the sample pdf file, I used the Vera font.

from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont

pdfmetrics.registerFont(TTFont('Vera', 'Vera.ttf'))

The registered font can now be used:

canvas.setFont("Vera", size=10)

I place the images (in this case, the reservation barcode) generated on the basis of the data from the application in the document being created using the drawImage() method, which takes the ImageReader object as the first argument, i.e.

from reportlab.lib.units import cm
from reportlab.lib.utils import ImageReader

im = ImageReader(image)
canvas.drawImage(im, x=0, y=-5*cm, width=150, height=100)

A list of text strings can be added to a document using a text object, i.e.

txt_obj = canvas.beginText(14, -6.5 * cm)
txt_lst = ["line of text 1", "line of text 2", "line of text 3"]
for line in txt_lst:
        txt_obj.textOut(line)
        txt_obj.moveCursor(0, 16)
canvas.drawText(txt_obj)

A table can be added to a document as a Table class object. I separately define a list containing the data of individual table rows (table_data variable). I also define styles that are valid for all or part of the table.

t = Table(table_data, colWidths=[60, 230, 70, 60, 50], rowHeights=30)
style = [('BACKGROUND',(0,0),(-1,-2),colors.lightblue),
            ('ALIGN',(0,-1),(-1,-1),'CENTER'),
            ('BOX', (0,0), (-1,-1), 0.25, colors.black),
            ('INNERGRID', (0,0), (-1,-1), 0.25, colors.black),
            ('FONTSIZE', (0,0), (-1,-1), 10),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE')]
t.setStyle(tblstyle=style)
t.wrapOn(canvas, 10, 10)
t.drawOn(canvas=canvas, x=20, y=-22*cm)

Then I save the created Canvas object and return the resulting buffer to the controller. In order to send a pdf file based on data from the buffer, I use the send_file() Flask function:

from flask.helpers import send_file

@bp.route('/get-pdf/<int:id>', methods=['GET'])
@login_required
def get_pdf(id):
    contract = Contract.query.get(id)
    contractor = User.query.get(contract.contractor_id)
    booking = Booking.query.filter_by(contract_id=contract.id).first()
    pdf = create_pdf(booking_no=booking.id,
                        contractor=contractor.username,
                        contractor_no=contractor.id,
                        truck_plate=booking.truck_reg_number,
                        warehouse=contract.warehouse,
                        date=contract.date_of_delivery,
                        time=booking.booking_time,
                        pallets_pos=contract.pallets_position,
                        pallets=contract.pallets_actual)
    pdf.seek(0)
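    # Note: Flask 2.0 renamed attachment_filename to download_name
    # and cache_timeout to max_age; adjust if you run a newer Flask.
    return send_file(pdf, as_attachment=True, mimetype='application/pdf',
        attachment_filename='booking.pdf', cache_timeout=0)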

Sample file generated on the server side, downloaded by the contractor.
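
The pdf.seek(0) call before send_file() matters because, after writing, the position of an in-memory buffer sits at the end of the data. The sketch below shows this with plain bytes; the content is a placeholder, not a real pdf, and it assumes that create_pdf() returns the buffer without rewinding it.

```python
from io import BytesIO

buffer = BytesIO()
buffer.write(b'%PDF-1.4 example bytes')  # placeholder content, not a real PDF

# After writing, the position is at the end, so a read returns nothing.
at_end = buffer.read()

# Rewinding makes the whole content available to the next reader.
buffer.seek(0)
from_start = buffer.read()

print(at_end)      # b''
print(from_start)  # b'%PDF-1.4 example bytes'
```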