Создание приложений на FastAPI. Часть вторая: Загрузка изображений

Создание приложений на FastAPI. Часть вторая: Загрузка изображений

Картинка к публикации: Создание приложений на FastAPI. Часть вторая: Загрузка изображений

Введение в работу с S3

Основы работы с хранилищем

Когда мы говорим о современных веб-приложениях, особенно о тех, что активно работают с мультимедиа и большими объемами данных, необходимость в надежном и масштабируемом хранилище становится критической. В этом контексте Amazon Web Services или аналогом в РФ VK Cloud Storage, S3 (Simple Storage Service) становится одним из наиболее популярных решений. Давайте разберемся, почему это так, и какие ключевые особенности делают S3 идеальным выбором для многих разработчиков и компаний.

S3 — это объектное хранилище, предназначенное для хранения и извлечения любых объемов данных из любого места в интернете. В отличие от традиционных файловых систем или реляционных баз данных, S3 оперирует понятием "объекта", который представляет собой файл, дополненный метаданными и уникальным ключом (имя объекта).

Объекты в S3 хранятся в контейнерах, называемых "бакетами". Бакет — это своего рода директория на уровне S3, которая может содержать неограниченное количество объектов. Однако, в отличие от традиционных файловых систем, S3 не поддерживает вложенные структуры директорий. Тем не менее, мы можем эмулировать такие структуры, используя иерархические ключи объектов, например images/2024/08/photo.jpg.

Одним из основных преимуществ S3 является его масштабируемость. Вы можете хранить неограниченное количество данных без необходимости заботиться о расширении или управлении хранилищем. Оно автоматически масштабирует инфраструктуру под ваши потребности, что делает его идеальным для приложений с переменной нагрузкой.

Кроме того, S3 предлагает высокую доступность и надежность. Данные, хранимые в S3, автоматически реплицируются в нескольких дата-центрах, что значительно снижает вероятность их потери. По умолчанию S3 предоставляет 99,999999999% (11 девяток) надежности хранения данных. Это означает, что риск потерять данные практически отсутствует.

Другим важным аспектом является гибкость управления доступом к данным. S3 предоставляет механизмы контроля доступа, начиная от простых политик на уровне бакетов и заканчивая сложными конфигурациями на основе ролей и политик IAM (Identity and Access Management). Это позволяет точно контролировать, кто и какие действия может выполнять с данными, что особенно важно в контексте безопасности веб-приложений.

Как упоминалось ранее, основными элементами в S3 являются бакеты и объекты. Вот ключевые концепции, которые важно понимать:

  1. Бакеты: Это контейнеры верхнего уровня, в которых хранятся все объекты. У каждого бакета есть уникальное имя в глобальном пространстве имен S3. Это означает, что два различных аккаунта не могут создать бакеты с одинаковыми именами.
  2. Объекты: Это единицы хранения данных в S3. Каждый объект состоит из данных (файла), метаданных (информация о файле), и уникального идентификатора (ключа). Ключи объектов могут включать символы /, что позволяет эмулировать структуру папок.
  3. Права доступа: Управление доступом к данным осуществляется с помощью политик, которые можно применять как на уровне бакетов, так и на уровне отдельных объектов. Эти политики определяют, кто и что может делать с данными, будь то загрузка, скачивание, или удаление объектов.

Обзор библиотеки aioboto3 для асинхронного доступа к S3

В мире современных веб-приложений, которые требуют высокой производительности и быстрой обработки запросов, асинхронное программирование становится неотъемлемой частью разработки. В этом контексте работа с удаленными сервисами, такими как S3, представляет собой серьезный вызов, поскольку традиционные библиотеки синхронного доступа могут блокировать поток выполнения, замедляя работу всего приложения. Именно здесь на сцену выходит aioboto3 — библиотека, которая позволяет интегрировать S3 в асинхронные приложения, такие как FastAPI, обеспечивая высокую эффективность и отзывчивость.

aioboto3 — это асинхронная версия широко известной библиотеки boto3, которая является стандартным SDK для работы с AWS. Основная цель aioboto3 — предоставить разработчикам возможность взаимодействовать с сервисами S3, в асинхронном режиме, что особенно важно для приложений, построенных на таких фреймворках, как FastAPI или Aiohttp.

Когда мы говорим о "асинхронном доступе", мы имеем в виду возможность выполнения операций ввода-вывода (например, загрузка файла в S3) без блокировки основного потока выполнения программы. Это позволяет серверу обрабатывать другие запросы, пока текущая операция еще выполняется, что существенно повышает общую производительность системы.

FastAPI — это асинхронный фреймворк, который был создан для максимально эффективной обработки запросов благодаря использованию Python'а асинхронных возможностей (asyncio). Однако, если использовать синхронные библиотеки для выполнения задач, это может привести к блокировке основного потока и снизить производительность приложения. Именно поэтому использование асинхронных библиотек, таких как aioboto3, является важным.

aioboto3 позволяет вам выполнять операции, такие как загрузка, скачивание и удаление файлов в S3, в асинхронном режиме, что в сочетании с FastAPI дает возможность максимально эффективно использовать ресурсы сервера. Это особенно важно в приложениях с высокой нагрузкой или при работе с большими файлами, где время выполнения операций может быть значительным.

Одним из ключевых аспектов использования aioboto3 является его простая интеграция в существующий код. Если вы уже работали с boto3, переход на aioboto3 не потребует значительных усилий, так как обе библиотеки имеют очень схожий интерфейс. Основное различие заключается в том, что все вызовы методов, которые были синхронными в boto3, становятся асинхронными в aioboto3. Например, методы upload_file, download_file и put_object должны вызываться с ключевым словом await, что указывает на асинхронное выполнение.

Кроме того, aioboto3 предоставляет удобный контекстный менеджер для работы с ресурсами S3. Это позволяет автоматически управлять временем жизни клиента, что особенно полезно в приложениях, где создание клиента может быть дорогим по времени. С помощью aioboto3 вы можете эффективно управлять подключениями к S3, минимизируя накладные расходы на создание новых подключений.

Безопасность данных: IAM и политика доступа

Безопасность данных является одной из ключевых задач в разработке веб-приложений, особенно когда речь идет о работе с облачными хранилищами. В данной главе мы рассмотрим, как обеспечить безопасность данных в S3, используя политики управления доступом (IAM) и практические рекомендации по защите данных. Также мы подробно рассмотрим, как настроить локальное окружение с использованием MinIO для разработки и тестирования, что позволит вам безопасно разрабатывать и тестировать интеграцию с S3 без необходимости использования реального аккаунта.

Основы безопасности данных в S3:

S3 предоставляет разработчикам средства управления доступом к данным. Одним из ключевых компонентов является IAM (Identity and Access Management), который позволяет настраивать тонкоуровневый доступ к ресурсам.

Основная концепция безопасности в S3 основывается на следующих компонентах:

  1. Политики доступа на уровне бакетов: Эти политики определяют, кто и какие действия может выполнять с объектами внутри бакета. Политики могут быть настроены как для всех объектов в бакете, так и для отдельных объектов.
  2. IAM роли и политики: IAM предоставляет возможность создавать роли и назначать им политики, которые определяют права доступа. Например, вы можете создать роль, которая будет иметь доступ только на чтение данных из определенного бакета, и назначить эту роль вашему приложению.
  3. ACL (Access Control Lists): Это механизм более низкого уровня, который позволяет управлять доступом к каждому отдельному объекту внутри бакета. ACL позволяет точно настроить, кто может читать или записывать данные для конкретного объекта.
  4. Шифрование данных: S3 поддерживает как шифрование на стороне сервера (SSE), так и шифрование на стороне клиента (CSE). Это позволяет шифровать данные как при хранении, так и при передаче, обеспечивая их защиту от несанкционированного доступа.

Настройка MinIO для локальной разработки и тестирования:

Для разработки и тестирования взаимодействия с S3, часто бывает полезно развернуть локальную среду, которая эмулирует работу с облачным хранилищем. MinIO — это высокопроизводительное объектное хранилище, полностью совместимое с API S3. Оно идеально подходит для создания локальной тестовой среды.

Для начала, давайте рассмотрим, как запустить MinIO и NGINX на локальном хосте, а также как настроить локальные доменные имена для эмуляции AWS S3.

Чтобы ваше приложение могло обращаться к MinIO как к локальному аналогу S3, необходимо настроить локальные доменные имена. Это можно сделать с помощью скрипта add_hosts.sh, который добавляет необходимые записи в файл /etc/hosts:

#!/bin/bash

HOSTS_FILE="/etc/hosts"
ENTRIES=(
    "127.0.0.1 www.localhost"
    "127.0.0.1 aws.localhost"
    "127.0.0.1 console.localhost"
)

# Проверка и добавление каждой записи
for ENTRY in "${ENTRIES[@]}"; do
    if grep -qF "$ENTRY" "$HOSTS_FILE"; then
        echo "Запись '$ENTRY' уже существует в файле hosts."
    else
        echo "$ENTRY" | sudo tee -a "$HOSTS_FILE" > /dev/null
        echo "Запись '$ENTRY' успешно добавлена в файл hosts."
    fi
done

Этот скрипт добавляет локальные доменные имена, которые будут использоваться для обращения к MinIO через NGINX. Это полезно для тестирования функционала, где нужно работать с доменными именами, подобными реальным условиям.

запуск скрипта в проекте

sudo infra/add_hosts.sh

Запуск NGINX и MinIO с помощью Docker Compose:

Теперь давайте рассмотрим, как настроить и запустить MinIO и NGINX с помощью Docker Compose. Это позволяет быстро развернуть необходимые сервисы на локальном хосте:

services:

  nginx:
    image: nginx:latest
    container_name: main_nginx
    volumes:
      - ./:/etc/nginx/templates/
      - ../../backend/src/static:/var/html/static/:ro
    ports:
      - '80:80'
    network_mode: host

  minio:
    image: minio/minio:latest
    container_name: main_minio
    restart: always
    command: server --console-address ":9001" /data
    entrypoint: |
      sh -c "
      minio server --console-address \":9001\" /data & 
      MINIO_PID=\$! && 
      sleep 5 && 
      /bin/sh /usr/bin/create_buckets.sh && 
      wait \$MINIO_PID
      "
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio_volume:/data
      - ../create_buckets.sh:/usr/bin/create_buckets.sh
    env_file:
      - ../../.env

volumes:
  minio_volume:

Этот docker-compose.yml файл разворачивает два контейнера:

  • NGINX: используется для управления запросами к MinIO и эмуляции поведения реального S3.
  • MinIO: это сам объектный стор, который будет обрабатывать запросы, аналогично S3.

Контейнер MinIO настроен таким образом, чтобы при запуске автоматически создавать бакеты с помощью скрипта create_buckets.sh.

#!/bin/sh

# Список бакетов
BUCKETS=()
while IFS='=' read -r name value; do
    if [ "${name#*_BUCKET}" != "$name" ]; then
        BUCKETS+=("$value")
    fi
done < <(printenv)

# Ожидание запуска MinIO
while ! curl -s http://localhost:9000/minio/health/live; do
    sleep 1
done

# Установка алиаса для MinIO с указанием региона
mc alias set --region ${MINIO_REGION_NAME} myminio http://localhost:9000 ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}

# Установка алиаса для MinIO
mc alias set myminio http://localhost:9000 ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}

# Создание бакетов и установка анонимных политик доступа
for BUCKET in "${BUCKETS[@]}"; do
    if ! mc ls myminio/${BUCKET} > /dev/null 2>&1; then
        mc mb myminio/${BUCKET}
        mc anonymous set download myminio/${BUCKET}
        echo "Bucket ${BUCKET} created."
    else
        echo "Bucket ${BUCKET} already exists."
    fi
done

Этот скрипт выполняет следующие задачи:

  1. Чтение переменных окружения для определения списка бакетов, которые необходимо создать. Переменные должны оканчиваться на _BUCKET
  2. Ожидание готовности MinIO к приему запросов.
  3. Создание бакетов и настройка анонимного доступа. Скрипт создает бакеты и автоматически настраивает доступ, что позволяет мгновенно начать использовать их в приложении.

Настройка NGINX для работы с MinIO:

NGINX — это веб-сервер, который может быть использован в качестве реверс-прокси для маршрутизации запросов как к вашему FastAPI-приложению, так и к MinIO. Эта конфигурация позволяет вам создавать безопасную и масштабируемую среду для разработки, где MinIO эмулирует VK Cloud Storage S3, а NGINX управляет всеми запросами, обеспечивая балансировку нагрузки и улучшенное управление соединениями.

Приведенный ниже конфигурационный файл NGINX позволяет гибко настроить три различных сервера:

  1. Основной сервер, который маршрутизирует запросы к вашему FastAPI-приложению.
  2. MinIO сервер, который отвечает за обработку запросов, связанных с хранилищем данных, аналогично S3.
  3. MinIO Console сервер, который предоставляет доступ к консоли управления MinIO через веб-интерфейс.
access_log /var/log/nginx/access.log;
error_log /var/log/nginx/error.log;

upstream backend {
    server 127.0.0.1:8100;
}
upstream minio {
    server 127.0.0.1:9000;
}
upstream minio_console {
    server 127.0.0.1:9001;
}

# Основной сервер
server {
    server_name www.localhost;
    listen 80;
    listen [::]:80;

    client_max_body_size    50M;
    ignore_invalid_headers  off;
    proxy_buffering         off;
    proxy_request_buffering off;

    location / {
        proxy_pass http://backend;

        proxy_set_header Host              $host;
        proxy_set_header X-NginX-Proxy     true;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location /static/ {
        autoindex on;
        alias     /var/html/static/;
    }
}

# MinIO сервер
server {
    server_name aws.localhost;
    listen 80;
    listen [::]:80;

    client_max_body_size    50M;
    ignore_invalid_headers  off;
    proxy_buffering         off;
    proxy_request_buffering off;

    location / {
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Authorization     $http_authorization;

        proxy_connect_timeout              300;
        proxy_http_version                 1.1;
        proxy_set_header Connection        "keep-alive";
        chunked_transfer_encoding          off;

        proxy_pass http://minio;
    }
}

# MinIO Console сервер
server {
    server_name console.localhost;
    listen 80;
    listen [::]:80;

    client_max_body_size    50M;
    ignore_invalid_headers  off;
    proxy_buffering         off;
    proxy_request_buffering off;

    location / {
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-NginX-Proxy     true;
        real_ip_header X-Real-IP;

        proxy_connect_timeout              300;
        proxy_http_version                 1.1;
        proxy_set_header Upgrade           $http_upgrade;
        proxy_set_header Connection        "upgrade";
        chunked_transfer_encoding          off;

        proxy_pass http://minio_console;
    }
}
  1. Upstream блоки (backend, minio, minio_console): Эти блоки определяют серверы, к которым NGINX будет перенаправлять запросы. backend отвечает за маршрутизацию запросов к вашему FastAPI приложению, а minio и minio_console — за запросы к MinIO.
  2. Основной сервер (www.localhost): Этот сервер обслуживает веб-приложение, принимая все запросы, которые не связаны с MinIO. Он настроен на прослушивание порта 80 и поддерживает до 50МБ данных для загрузки. Это может быть полезно для загрузки больших файлов через API вашего приложения.
  3. MinIO сервер (aws.localhost): Этот сервер эмулирует работу Amazon S3, перенаправляя запросы на MinIO. Здесь установлены специальные заголовки, такие как Authorization, которые необходимы для авторизации запросов к S3.
  4. MinIO Console сервер (console.localhost): Этот сервер предоставляет доступ к консоли управления MinIO, через которую можно управлять бакетами, загружать и скачивать файлы, а также просматривать статус MinIO. Дополнительные настройки, такие как поддержка Upgrade и Connection: upgrade, необходимы для обеспечения корректной работы вебсокетов в консоли MinIO.

Реализация менеджера S3

Создание клиента: конфигурация и инициализация

В первой части статьи мы уже обсуждали, как настроить и организовать хранение  переменных окружения, не будем на этом заострять внимание.

Класс S3AsyncClient, представленный ниже, демонстрирует, как можно создать клиент для взаимодействия с S3, используя aioboto3. Этот класс включает конфигурацию сессии с использованием переменных окружения и предоставляет асинхронный интерфейс для работы с S3.

import io
import mimetypes
import os
from typing import List

import aioboto3
import aiohttp
import filetype
from botocore.exceptions import ClientError
from fastapi import HTTPException, UploadFile
from PIL import Image, UnidentifiedImageError
from src.conf import settings

Основные зависимости включают aioboto3 для работы с S3, aiohttp для обработки HTTP-запросов, filetype для определения типов файлов, и PIL (Pillow) для работы с изображениями. Эти библиотеки помогают реализовать функционал работы с файлами и изображениями в асинхронном режиме.

class S3AsyncClient:
    _cached_session = None

    def __init__(self):
        self.endpoint_domain = settings.MINIO_DOMAIN
        self.use_ssl = settings.MINIO_USE_SSL
        if S3AsyncClient._cached_session is None:
            S3AsyncClient._cached_session = aioboto3.Session(
                aws_access_key_id=settings.MINIO_ACCESS_KEY,
                aws_secret_access_key=settings.MINIO_SECRET_KEY,
                region_name=settings.MINIO_REGION_NAME
            )
        self.endpoint_url = self._get_endpoint_url()

В конструкторе класса инициализируется сессия для работы с S3, используя aioboto3.Session. Эта сессия кэшируется, что предотвращает создание нового объекта сессии при каждом запросе, что оптимизирует использование ресурсов.

  • self.endpoint_domain и self.use_ssl: Эти параметры конфигурации загружаются из переменных окружения и определяют домен MinIO/S3 и необходимость использования SSL.
  • _get_endpoint_url: Этот метод формирует корректный URL для подключения к S3/MinIO, основываясь на домене и использовании SSL.
    @property
    def client(self):
        return S3AsyncClient._cached_session.client(
            "s3",
            endpoint_url=self.endpoint_url,
            use_ssl=self.use_ssl
        )

Метод client предоставляет доступ к объекту клиента S3, который используется для выполнения операций с хранилищем. Здесь также происходит подключение к S3 с использованием кэшированной сессии.

    def _get_endpoint_url(self) -> str:
        if self.endpoint_domain.startswith("http"):
            raise ValueError("settings.MINIO_DOMAIN should not start with 'http' or 'https'. Please use just the domain name.")

        protocol = "https" if self.use_ssl else "http"
        return f"{protocol}://{self.endpoint_domain}"

Метод _get_endpoint_url формирует корректный URL для взаимодействия с хранилищем. Важно отметить, что домен не должен начинаться с http или https, чтобы избежать конфликта в формировании URL.

    async def __aenter__(self):
        self.s3_client = await self.client.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.s3_client.__aexit__(exc_type, exc_val, exc_tb)

Методы __aenter__ и __aexit__ позволяют использовать S3AsyncClient в асинхронном контексте с использованием async with, что упрощает управление временем жизни клиента, гарантируя, что все ресурсы будут освобождены корректно после завершения работы.

Разработка менеджера для работы с S3-хранилищем

Работа с S3-хранилищем подразумевает выполнение множества операций, таких как загрузка, скачивание, удаление и обновление файлов. Для упрощения этих задач и создания удобного интерфейса для взаимодействия с хранилищем, используется менеджер, который инкапсулирует все эти операции в одном классе.

Менеджер S3StorageManager наследуется от класса S3AsyncClient, описанного ранее, и добавляет функционал, необходимый для управления файлами в S3. Этот класс предоставляет методы для работы с файлами, включая загрузку, скачивание, удаление и обновление, а также генерацию URL для доступа к объектам.

class S3StorageManager(S3AsyncClient):
    bucket_name = None
    default_acl = None
    custom_domain = None
    use_ssl = False

    def __new__(cls, *args, **kwargs):
        instance = super().__new__(cls)
        cls._validate_class_attributes()
        return instance

    @classmethod
    def _validate_class_attributes(cls):
        required_attributes = ("bucket_name", "default_acl")

        for attr in required_attributes:
            value = getattr(cls, attr)
            if value is None:
                raise ValueError(f"The '{attr}' class attribute is required and cannot be None.")

Атрибуты класса:

  • bucket_name: Имя бакета, в который будут загружаться файлы.
  • default_acl: Контроль доступа по умолчанию для загружаемых объектов (например, public-read).
  • custom_domain: Пользовательский домен для генерации URL (если используется).
  • use_ssl: Флаг использования SSL для взаимодействия с S3.

Не стал описывать все методы здесь, их можно посмотреть в GItHub репозитории по ссылке >>>. Но вкратце реализовано:

Методы класса:

  1. __new__ и _validate_class_attributes: Эти методы обеспечивают проверку обязательных атрибутов класса при создании экземпляра менеджера. Если какой-либо из обязательных атрибутов не задан, будет выброшено исключение ValueError.
  2. _prepare_path: Метод добавляет / в конец пути, если его нет, чтобы гарантировать корректную работу с путями в S3.
  3. _read_file: Метод асинхронно читает содержимое файла, загруженного через FastAPI, и возвращает его содержимое вместе с именем файла.
  4. _generate_unique_key: Этот метод генерирует уникальный ключ для файла, который будет использоваться при загрузке в S3. Если файл с таким именем уже существует, к имени файла добавляется счетчик, чтобы избежать конфликтов.
  5. _convert_to_webp: Метод конвертирует изображение в формат WebP, чтобы уменьшить размер файла и улучшить производительность при загрузке изображений.
  6. _check_and_delete_object: Метод проверяет наличие объекта в S3 и удаляет его, если он существует.
  7. generate_url: Метод генерирует URL для доступа к объекту в S3. Если объект имеет public-read доступ, то возвращается прямой URL. В противном случае генерируется presigned URL с заданным временем истечения.
  8. list_objects: Метод возвращает список объектов в указанной директории бакета. Фильтрация по типу файлов (изображения или не изображения) также поддерживается.
  9. put_object: Метод загружает файл в S3. Если тип файла — изображение, он конвертируется в формат WebP перед загрузкой.
  10. update_object: Метод обновляет файл в S3, сначала удаляя старый объект, а затем загружая новый.
  11. generate_upload_url и generate_update_url: Эти методы генерируют presigned URL для загрузки нового файла или обновления существующего.
  12. delete_object: Метод удаляет объект из S3 по его ключу.
  13. download_file_by_url: Метод скачивает файл из S3 по указанному URL.
  14. collect_and_upload_static: Этот метод собирает и загружает все статические файлы из указанной директории в S3. Полезен для автоматической загрузки ресурсов, таких как CSS, JS, и изображения.

Использование менеджера S3StorageManager предоставляет несколько ключевых преимуществ:

  1. Централизованное управление файлами: Все операции с файлами (загрузка, удаление, обновление) сосредоточены в одном классе, что упрощает поддержку и тестирование кода.
  2. Асинхронная обработка: Поддержка асинхронных операций позволяет эффективно использовать ресурсы сервера, особенно при работе с большими объемами данных.
  3. Гибкость и расширяемость: Благодаря использованию наследования и тщательно продуманной архитектуры, менеджер легко адаптируется к изменяющимся требованиям, таким как изменение политики доступа или добавление новых типов файлов.
  4. Удобство интеграции: Предоставляя методы для генерации URL и работы с объектами, менеджер легко интегрируется в другие части вашего приложения, такие как API для работы с изображениями или системами хранения данных.

Кроме того, наследуясь от менеджера S3StorageManager, можно создавать разделенные хранилища в S3 для различных частей приложения, например, для медиафайлов.

from src.conf import settings
from src.conf.s3_client import S3StorageManager

class MediaStorage(S3StorageManager):
    bucket_name = settings.MINIO_MEDIA_BUCKET
    default_acl = 'public-read'
    
media_storage = MediaStorage()

Он использует отдельный бакет, определенный в настройках приложения, и имеет доступ по умолчанию public-read, что делает файлы общедоступными после загрузки.

Логирование и обработка ошибок при работе с S3

Логирование позволяет отслеживать выполнение операций в приложении, фиксируя ключевые события, такие как успешные загрузки, удаления файлов, а также возникающие ошибки. Это значительно облегчает отладку, мониторинг и поддержание приложения.

В Python логирование обычно выполняется с использованием стандартной библиотеки logging, которая предоставляет гибкий интерфейс для записи логов в различные источники: файлы, консоли, внешние системы мониторинга.

import logging

class LoggerNameFilter(logging.Filter):
    def filter(self, record):
        parts = record.name.split('.')
        if len(parts) > 2:
            record.name = '.'.join(parts[2:])
        return True

def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logger = logging.getLogger()
    logger.addFilter(LoggerNameFilter())
    return logger


logger = setup_logging()
  • Класс LoggerNameFilter: Этот фильтр модифицирует имя логгера в записях лога, удаляя первые два сегмента имени. Это полезно, если вы хотите сократить отображаемое имя логгера, чтобы сделать логи более читабельными.
  • Функция setup_logging: Настраивает базовый логгер с уровнем логирования INFO или DEBUG.  Формат лог-сообщений включает временную метку, уровень логирования, имя логгера и само сообщение.
  • Форматирование логов: В формате сообщения используются следующие элементы:
    • %(asctime)s: Временная метка в формате ГГГГ-ММ-ДД ЧЧ:ММ:СС.
    • %(levelname)s: Уровень логирования (INFO, ERROR, и т. д.).
    • %(name)s: Имя логгера.
    • %(message)s: Текст самого сообщения.
  • Применение фильтра: Фильтр LoggerNameFilter добавляется к логгеру, чтобы автоматически сокращать имена логгеров в записях лога, делая их более понятными.

Теперь давайте интегрируем логирование в методы S3StorageManager, чтобы отслеживать важные операции с файлами.

async def put_object(self, file: UploadFile, path: str = "", file_type: str = "image") -> str:
    logger.info("Начало загрузки файла: %s", file.filename)
    
    async with self.client as s3_client:
        try:
            file_content, file_name = await self._read_file(file)
            path = self._prepare_path(path)
            
            kind = filetype.guess(file_content)
            is_image = kind is not None and kind.mime.startswith("image")
            
            if file_type == "image":
                if not is_image:
                    logger.error("Файл %s не является допустимым изображением", file.filename)
                    raise HTTPException(status_code=400, detail="Invalid image file")
                file_content, content_type = self._convert_to_webp(file_content)
                file_name = os.path.splitext(file_name)[0] + ".webp"
            else:
                content_type = kind.mime if kind else 'application/octet-stream'

            key = await self._generate_unique_key(s3_client, path + file_name)
            await s3_client.put_object(
                Bucket=self.bucket_name,
                Key=key,
                Body=file_content,
                ContentType=content_type,
                ACL=self.default_acl
            )
            
            logger.info("Файл успешно загружен в S3: %s", key)
            return key
        
        except ClientError as e:
            logger.error("Ошибка при загрузке файла %s: %s", file.filename, e)
            raise HTTPException(status_code=500, detail="Error uploading file to S3") from e

Логирование успехов и ошибок:

  • Начало операции: Логируем факт начала загрузки файла.
  • Проверка типа файла: Если файл не является допустимым изображением, логируем ошибку и выбрасываем исключение.
  • Успешная загрузка: Логируем успешное завершение загрузки файла в S3.
  • Обработка ошибок: Если возникает ошибка при взаимодействии с S3, логируем её и выбрасываем HTTP-исключение.

Корректная обработка ошибок при работе с S3 имеет первостепенное значение, поскольку сбои при взаимодействии с удалёнными сервисами могут возникать по множеству причин: проблемы с сетью, ошибки авторизации, нехватка прав доступа и т.д. Важно, чтобы такие ситуации не приводили к краху всего приложения и чтобы пользователи получали понятные сообщения об ошибках.

async def delete_object(self, key: str) -> None:
    logger.info("Удаление объекта из S3: %s", key)
    
    async with self.client as s3_client:
        try:
            await s3_client.delete_object(Bucket=self.bucket_name, Key=key)
            logger.info("Объект успешно удален: %s", key)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code != 'NoSuchKey':
                logger.error("Ошибка при удалении объекта %s: %s", key, e)
                raise HTTPException(status_code=500, detail="Error deleting file from S3") from e
            else:
                logger.warning("Объект не найден в S3 при попытке удаления: %s", key)
  • Удаление объекта: Логируется как успешное удаление объекта, так и возможные ошибки.
  • Исключения: Если объект не найден, логируем предупреждение и не выбрасываем ошибку, так как это может быть допустимой ситуацией. Для других ошибок логируем их и выбрасываем исключение.

ClientError — это основное исключение, которое возникает при проблемах взаимодействия с S3 через boto3 или aioboto3. Важно внимательно проверять код ошибки и решать, как с ней справляться:

  • 403 Forbidden: Недостаточно прав для выполнения операции.
  • 404 Not Found: Объект не найден, что может быть допустимо, если он должен был быть удалён ранее.
  • 500 Internal Server Error: Ошибка на стороне S3, которая может потребовать повторной попытки или уведомления пользователя.

CRUD для изображений

Модель данных для хранения изображений: связь с пользователем

При разработке веб-приложений, где пользователи могут загружать и управлять изображениями, важно не только правильно организовать хранение этих изображений, но и связать их с соответствующими данными в базе данных. Это необходимо для того, чтобы обеспечивать корректное отображение, управление и доступ к файлам изображений в зависимости от пользователя.

Модель данных для хранения изображений должна учитывать несколько ключевых аспектов:

  1. Сохранение пути к файлу: Путь к файлу должен храниться в базе данных, чтобы можно было легко найти изображение, связанное с конкретной записью.
  2. Возможность указания основного изображения: Например, у пользователя может быть несколько изображений, но одно из них является основным (аватаром).
  3. Связь с пользователем: Модель изображения должна быть связана с пользователем, чтобы обеспечить индивидуальный доступ к изображениям и возможность управлять ими.
from typing import TYPE_CHECKING

from sqlalchemy import Boolean
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.conf import media_storage
from src.models.base_model import Base
from src.models.interim_tables import user_image_association
from src.models.sql_decorator import FilePath

if TYPE_CHECKING:
    from src.conf import S3StorageManager


class Image(Base):
    __tablename__ = "image"
    _file_storage = media_storage

    file: Mapped[str] = mapped_column(FilePath(_file_storage), nullable=True)
    is_main: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    users = relationship("User", secondary=user_image_association, back_populates="image_files")

    def __repr__(self):
        return f"<Image(id={self.id}, file={self.file}, is_main={self.is_main})>"

    @property
    def storage(self) -> "S3StorageManager":
        """Возвращает объект storage, чтобы использовать его методы напрямую."""
        return self._file_storage
  • file: Это строковое поле, которое хранит путь к файлу изображения. Для его обработки используется кастомный декоратор FilePath, который связывает путь с менеджером хранения (например, S3).
  • is_main: Булево поле, которое указывает, является ли данное изображение основным. Это полезно для выбора основного изображения среди множества загруженных пользователем файлов.
  • users: Связь многие-ко-многим с моделью User, что позволяет пользователю иметь несколько изображений. Для этого используется промежуточная таблица user_image_association.
  • Метод storage: Возвращает объект S3StorageManager, который позволяет напрямую использовать методы для работы с хранилищем.

Модель пользователя также должна быть модифицирована, чтобы поддерживать связь с загруженными изображениями.

from fastapi_users.db import SQLAlchemyBaseUserTable
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.models.base_model import Base
from src.models.interim_tables import user_image_association


class User(SQLAlchemyBaseUserTable, Base):
    first_name: Mapped[str] = mapped_column(String, nullable=True)
    last_name: Mapped[str] = mapped_column(String, nullable=True)

    image_files = relationship("Image", secondary=user_image_association, back_populates="users")

    def __repr__(self):
        return f"<User(id={self.id}, first_name={self.first_name}, last_name={self.last_name})>"
  • first_name и last_name: Эти поля хранят имя и фамилию пользователя.
  • image_files: Связь многие-ко-многим с моделью Image. Это позволяет пользователю иметь несколько связанных изображений, что важно для управления галереей изображений или аватарами.

Для того чтобы реализовать связь многие-ко-многим между пользователем и изображениями, используется промежуточная таблица.

from sqlalchemy import Column, ForeignKey, Table
from sqlalchemy.dialects.postgresql import UUID as UUIDType
from src.models.base_model import Base

user_image_association = Table(
    'user_image_association', Base.metadata,
    Column('user_id', UUIDType(as_uuid=True), ForeignKey('user.id'), primary_key=True),
    Column('image_id', UUIDType(as_uuid=True), ForeignKey('image.id'), primary_key=True)
)

Эта таблица связывает пользователей с изображениями, создавая тем самым связь многие-ко-многим. Каждое изображение может быть связано с несколькими пользователями (например, для совместных альбомов), и каждый пользователь может иметь несколько изображений.

Оптимальные практики работы с изображениями

  1. Сжатие и оптимизация изображений: При загрузке изображений важно обеспечить их сжатие и оптимизацию для сокращения размера файлов и улучшения производительности приложения. Это может включать в себя изменение размеров изображений, конвертацию в форматы сжатия (например, WebP) и удаление метаданных.
  2. Кеширование изображений: Для улучшения времени загрузки страниц и снижения нагрузки на серверы S3, можно использовать кеширование изображений на стороне клиента и CDN (Content Delivery Network).
  3. Безопасное управление доступом: Убедитесь, что изображения, хранящиеся в S3, имеют правильные права доступа. Например, личные изображения должны быть защищены от несанкционированного доступа, в то время как публичные изображения могут быть доступны всем пользователям.
  4. Обработка исключений и ошибок: Обязательно обрабатывайте возможные ошибки, связанные с загрузкой и удалением изображений, такие как проблемы с сетью, ошибки авторизации и недоступность S3.

Реализация CRUD операций для модели изображения

При работе с изображениями в веб-приложениях на базе FastAPI важно реализовать функциональные и эффективные CRUD-операции, которые будут управлять загрузкой, просмотром, обновлением и удалением изображений.

CRUD (Create, Read, Update, Delete) — это стандартный набор операций для управления ресурсами в приложениях. В контексте работы с изображениями это означает:

  • Create (Создание): Загрузка и сохранение нового изображения в хранилище (например, S3), а также запись пути к файлу в базу данных.
  • Read (Чтение): Получение информации об изображении и его URL для отображения пользователю.
  • Update (Обновление): Замена существующего изображения новым и обновление записи в базе данных.
  • Delete (Удаление): Удаление изображения как из хранилища, так и из базы данных.

ImageDAO (Data Access Object) — это класс, который предоставляет интерфейс для взаимодействия с моделью Image через асинхронные операции, используя SQLAlchemy и FastAPI. Этот класс наследуется от GenericCRUD, что позволяет повторно использовать общий CRUD-функционал, дополняя его специфичными для работы с изображениями методами.

class ImageDAO(GenericCRUD[Image, ImageCreate, ImageUpdate]):

    async def _reset_is_main(self, model_name: str, model_instance: Base, association_table_name: str, db_session: AsyncSession):
        """Сбрасывает значение is_main для всех изображений, связанных с моделью."""
        association_table = Table(association_table_name, Base.metadata, autoload_with=db_session.bind)
        image_alias = aliased(Image)
        stmt = (
            select(image_alias.id)
            .select_from(association_table)
            .join(image_alias, association_table.c["image_id"] == image_alias.id)
            .where(association_table.c[f"{model_name}_id"] == model_instance.id)
            .where(image_alias.is_main == True)  # noqa: E712
        )
        result = await db_session.execute(stmt)
        image_ids = [row[0] for row in result.fetchall()]
        if image_ids:
            stmt = (
                update(Image)
                .where(Image.id.in_(image_ids))
                .values(is_main=False)
            )
            await db_session.execute(stmt)
            await db_session.commit()

Метод _reset_is_main: Этот метод сбрасывает значение is_main для всех изображений, связанных с конкретной моделью, например, пользователем. Это нужно для того, чтобы установить новое основное изображение, сбрасывая статус у предыдущих изображений.

    async def _get_image_url(self, db_obj: Image) -> ImageDAOResponse:
        """Получает URL к экземпляру Image."""
        url = await db_obj.storage.generate_url(db_obj.file)
        return ImageDAOResponse(image=db_obj, url=url)

Метод _get_image_url: Этот метод генерирует URL для доступа к изображению, используя методы хранилища, например, S3. Он возвращает объект ImageDAOResponse, который содержит как информацию о модели, так и URL для доступа.

При реализации связи многие-ко-многим между моделями User и Image, используется промежуточная таблица user_image_association. Эта таблица обеспечивает связь между пользователями и их изображениями.

async def create_with_file(
    self, *, file: UploadFile, is_main: bool, model_instance: Type[Base], path: str = "", db_session: AsyncSession | None = None
) -> Image | None:
    model_name, association_table_name = await self._check_association_table(
        model_instance=model_instance,
        related_model=self.model,
        db_session=db_session
    )

    if is_main:
        await self._reset_is_main(model_name, model_instance, association_table_name, db_session)

    db_obj = self.model(is_main=is_main)
    db_obj.file = await db_obj.storage.put_object(file, path)
    db_session.add(db_obj)
    await db_session.flush()

    association_table = Table(association_table_name, Base.metadata, autoload_with=db_session.bind)
    stmt = association_table.insert().values(**{f"{model_name}_id": model_instance.id, "image_id": db_obj.id})
    await db_session.execute(stmt)
    await db_session.commit()
    await db_session.refresh(db_obj)

    return db_obj

Метод create_with_file: Этот метод выполняет создание новой записи изображения, загрузку файла в хранилище (например, S3), и добавление записи в промежуточную таблицу для связи с пользователем.

  • Проверка и создание связи: Метод проверяет наличие связи через _check_association_table, чтобы убедиться, что промежуточная таблица существует.
  • Установка основного изображения: Если изображение является основным, все предыдущие изображения этого пользователя будут сброшены (метод _reset_is_main).
  • Добавление в базу данных: После успешной загрузки файла и создания записи в промежуточной таблице, данные сохраняются в базе данных.

Класс ImageDAO наследуется от GenericCRUD, который предоставляет общие методы для работы с базой данных:

  • get: Получение одной записи по ID.
  • get_by_ids: Получение нескольких записей по списку ID.
  • create: Создание новой записи.
  • update: Обновление существующей записи.
  • remove: Удаление записи по ID.

Этот подход позволяет вам избежать дублирования кода и использовать уже реализованные методы для стандартных операций, расширяя и дополняя их в ImageDAO специфичными функциями для работы с изображениями.

Для примера были рассмотрены несколько методов класса ImageDAO, остальные вы можете найти в репозитории проекта >>>

Интеграция с S3: загрузка и хранение изображений

Когда пользователь загружает изображение через ваше приложение, оно должно быть отправлено на сервер, обработано и сохранено в удаленном хранилище, таком как S3. Процесс обычно включает следующие шаги:

  1. Загрузка файла: Получение файла от пользователя через API.
  2. Сохранение в S3: Загрузка файла в S3 и сохранение ссылки на него в базе данных.
  3. Обновление файла: Замена существующего файла новым в S3 и обновление записи в базе данных.
  4. Удаление файла: Удаление файла из S3 и удаление записи из базы данных.

Для иллюстрации интеграции S3 с CRUD операциями рассмотрим пример класса UserManager, который использует image_dao для выполнения операций с изображениями.

class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
    reset_password_token_secret = settings.SECRET_KEY
    verification_token_secret = settings.SECRET_KEY
    image_path = "users"

    def __init__(self, user_db: SQLAlchemyUserDatabase, user: Optional[User] = None):
        super().__init__(user_db)
        self.user = user

    @property
    def db_session(self):
        return self.user_db.session

    async def save_user_image(self, file: UploadFile, is_main: bool) -> Image:
        image = await image_dao.create_with_file(
            file=file, is_main=is_main, model_instance=self.user, path=self.image_path, db_session=self.db_session
        )
        return image

    async def update_user_image(self, image_id: uuid.UUID, file: UploadFile, is_main: bool) -> Image:
        image = await image_dao.update_file(id=image_id, file=file, is_main=is_main, model_instance=self.user, path=self.image_path, db_session=self.db_session)
        return image

    async def delete_user_image(self, image_id: uuid.UUID) -> None:
        await image_dao.delete(
            id=image_id, model_instance=self.user, related_model=Image, db_session=self.db_session
        )

    async def save_user_image_with_upload_url(self, file_name: str, is_main: bool) -> str:
        image_dao_resp = await image_dao.create_without_file(
            is_main=is_main, file_name=file_name, model_instance=self.user, path=self.image_path, db_session=self.db_session
        )
        return image_dao_resp

    async def update_user_image_with_upload_url(self, image_id: uuid.UUID, file_name: str, is_main: bool) -> str:
        image_dao_resp = await image_dao.update_without_file(
            id=image_id, file_name=file_name, is_main=is_main, model_instance=self.user, path=self.image_path, db_session=self.db_session
        )
        return image_dao_resp
  1. save_user_image: Этот метод загружает файл изображения в S3 с помощью метода create_with_file из image_dao, который отвечает за сохранение файла в S3 и запись пути к нему в базу данных. Кроме того, метод управляет флагом is_main, указывающим на основное изображение пользователя.
  2. update_user_image: Метод обновляет существующее изображение пользователя. Он сначала удаляет старое изображение из S3, загружает новое и обновляет запись в базе данных.
  3. delete_user_image: Этот метод удаляет изображение из S3 и записи из базы данных, обеспечивая корректное удаление и освобождение ресурсов.
  4. save_user_image_with_upload_url: Метод генерирует URL для загрузки изображения в S3. Это полезно, когда вы хотите предоставить пользователю возможность загружать файлы напрямую в S3 через предподписанный URL.
  5. update_user_image_with_upload_url: Этот метод работает аналогично save_user_image_with_upload_url, но предназначен для обновления существующих изображений, используя предподписанные URL.

Преимущества использования S3 для хранения изображений

  1. Масштабируемость: S3 автоматически масштабируется, что позволяет вам хранить неограниченное количество изображений без необходимости заботиться о размере дискового пространства.
  2. Безопасность: S3 предоставляет мощные механизмы управления доступом, такие как IAM и политики бакетов, которые позволяют гибко настраивать доступ к вашим изображениям.
  3. Доступность и надежность: Благодаря встроенному механизмам резервирования и репликации, S3 обеспечивает высокий уровень доступности и надежности данных.
  4. Производительность: Использование предподписанных URL для загрузки и скачивания файлов снижает нагрузку на ваш сервер и позволяет эффективно управлять загрузкой больших файлов.

Создание роутов

Маршруты для загрузки и обработки изображений

Ключевые аспекты реализации:

  1. Загрузка изображения: Обработка загрузки изображения, его сохранение в S3 и возврат ответа клиенту с данными о загруженном изображении.
  2. Обновление изображения: Замена существующего изображения новым с обновлением данных в S3.
  3. Удаление изображения: Удаление изображения как из S3, так и из базы данных.
  4. Получение изображения: Возврат информации об изображении и его URL.

Файл users/routers.py включает в себя маршруты для управления изображениями, связанными с пользователем. Эти маршруты обрабатывают операции загрузки, обновления и удаления изображений.

from fastapi import APIRouter, Depends, File, Form, UploadFile
from pydantic import UUID4
from src.crud import UserManager, get_current_active_user_and_manager
from src.schemas.image_schema import (ImageDAOResponse, UploadImageResponse, UploadUrlImageResponse)
from starlette import status

account_router = APIRouter()

@account_router.post("/upload-image", response_model=UploadImageResponse, status_code=status.HTTP_201_CREATED)
async def upload_user_image(
    file: UploadFile = File(...),
    is_main: bool = Form(...),
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    image = await user_manager.save_user_image(file=file, is_main=is_main)
    return UploadImageResponse(id=image.id, file=image.file, is_main=image.is_main)

@account_router.put("/update-image/{image_id}", response_model=UploadImageResponse, status_code=status.HTTP_200_OK)
async def update_user_image(
    image_id: UUID4,
    file: UploadFile = File(...),
    is_main: bool = Form(...),
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    image = await user_manager.update_user_image(image_id=image_id, file=file, is_main=is_main)
    return UploadImageResponse(id=image.id, file=image.file, is_main=image.is_main)

@account_router.delete("/delete-image/{image_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user_image(
    image_id: UUID4,
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    await user_manager.delete_user_image(image_id=image_id)
    return {"detail": "Image deleted successfully"}

@account_router.post("/upload-url-image", response_model=UploadUrlImageResponse, status_code=status.HTTP_201_CREATED)
async def upload_user_image_with_url(
    file_name: str = Form(...),
    is_main: bool = Form(...),
    front_key: str = Form(...),
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    image_dao_resp = await user_manager.save_user_image_with_upload_url(file_name, is_main=is_main)
    return UploadUrlImageResponse(image=image_dao_resp, front_key=front_key)

@account_router.post("/update-url-image/{image_id}", response_model=ImageDAOResponse, status_code=status.HTTP_201_CREATED)
async def update_user_image_with_url(
    image_id: UUID4,
    file_name: str = Form(...),
    is_main: bool = Form(...),
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    image_dao_resp = await user_manager.update_user_image_with_upload_url(image_id, file_name, is_main)
    return image_dao_resp
  • upload_user_image: Этот маршрут обрабатывает загрузку изображения, сохранение его в S3 и возврат данных о загруженном изображении в формате JSON.
  • update_user_image: Маршрут для обновления изображения. Замещает существующее изображение новым и возвращает обновленные данные.
  • delete_user_image: Этот маршрут удаляет изображение, как из S3, так и из базы данных. Возвращает сообщение о успешном удалении.
  • upload_user_image_with_url: Обрабатывает загрузку изображения через предподписанный URL, что позволяет пользователю загружать файлы напрямую в S3.
  • update_user_image_with_url: Аналогично предыдущему маршруту, но для обновления существующего изображения.

Файл media/routers.py включает дополнительные маршруты для обработки изображений, включая получение информации о изображении и выполнение фоновых задач обработки изображений.

from typing import List
from fastapi import APIRouter, Depends, Form, Query
from pydantic import UUID4
from sqlalchemy.ext.asyncio import AsyncSession
from src.crud import current_active_user, image_dao
from src.db.deps import get_async_session
from src.models.user_model import User
from src.tasks.image_tasks import process_image_task
from starlette import status

media_router = APIRouter()

@media_router.get("/images/{image_id}", status_code=status.HTTP_200_OK)
async def get_image(image_id: UUID4, db_session: AsyncSession = Depends(get_async_session)):
    image = await image_dao.get(id=image_id, db_session=db_session)
    return image

@media_router.get("/images/", status_code=status.HTTP_200_OK)
async def get_images(
    image_ids: List[UUID4] = Query(...),
    db_session: AsyncSession = Depends(get_async_session)
):
    images = await image_dao.get_by_ids(list_ids=image_ids, db_session=db_session)
    return images

@media_router.post("/images/treatment/", status_code=status.HTTP_200_OK)
async def create_task_image_treatment(
    image_id: UUID4 = Form(...),
    _: User = Depends(current_active_user), db_session: AsyncSession = Depends(get_async_session)
):
    await process_image_task.kiq(image_id=image_id)
    return 'Done'
  • get_image: Возвращает информацию о конкретном изображении по его идентификатору.
  • get_images: Возвращает список изображений по их идентификаторам.
  • create_task_image_treatment: Создает задачу на фоновую обработку изображения.

Валидация и проверка файлов изображений

При работе с файлами изображений в веб-приложении важно убедиться, что загружаемые файлы соответствуют необходимым требованиям. Валидация изображений помогает предотвратить загрузку некорректных или потенциально опасных файлов, что повышает безопасность и стабильность приложения. 

При разработке API для загрузки изображений необходимо учитывать следующие аспекты:

  1. Тип файла: Убедитесь, что загружаемые файлы действительно являются изображениями (например, PNG, JPEG, GIF, WebP).
  2. Размер файла: Ограничение максимального размера загружаемых изображений, чтобы предотвратить перегрузку системы.
  3. Имя файла: Проверка корректности имени файла и его расширения.

В маршрутах, которые мы рассмотрели ранее, мы можем добавить валидацию загружаемых файлов с использованием дополнительных проверок.

from fastapi import APIRouter, Depends, File, Form, UploadFile, HTTPException
from pydantic import UUID4
from src.crud import UserManager, get_current_active_user_and_manager
from src.schemas.image_schema import UploadImageResponse, UploadUrlImageResponse
from starlette import status
import filetype

account_router = APIRouter()

MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5 MB

def validate_image_file(file: UploadFile):
    # Проверка на размер файла
    if file.size > MAX_IMAGE_SIZE:
        raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                            detail="Uploaded file exceeds size limit of 5MB.")

    # Проверка на тип файла
    kind = filetype.guess(file.file.read(1024))  # Чтение первых 1024 байт файла для определения типа
    if kind is None or kind.mime.split('/')[0] != 'image':
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
                            detail="Invalid file type. Only image files are allowed.")
    
    file.file.seek(0)  # Сброс указателя файла после чтения

@account_router.post("/upload-image", response_model=UploadImageResponse, status_code=status.HTTP_201_CREATED)
async def upload_user_image(
    file: UploadFile = File(...),
    is_main: bool = Form(...),
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    # Валидация файла перед загрузкой
    validate_image_file(file)
    
    image = await user_manager.save_user_image(file=file, is_main=is_main)
    return UploadImageResponse(id=image.id, file=image.file, is_main=image.is_main)

@account_router.put("/update-image/{image_id}", response_model=UploadImageResponse, status_code=status.HTTP_200_OK)
async def update_user_image(
    image_id: UUID4,
    file: UploadFile = File(...),
    is_main: bool = Form(...),
    user_manager: UserManager = Depends(get_current_active_user_and_manager)
):
    # Валидация файла перед загрузкой
    validate_image_file(file)
    
    image = await user_manager.update_user_image(image_id=image_id, file=file, is_main=is_main)
    return UploadImageResponse(id=image.id, file=image.file, is_main=image.is_main)
  • Размер файла: Ограничение размера файла до 5 МБ (параметр MAX_IMAGE_SIZE) с помощью проверки file.size. Если размер файла превышает лимит, вызывается исключение HTTPException с кодом 413 (Payload Too Large).
  • Тип файла: Использование библиотеки filetype для проверки типа файла. Функция filetype.guess() определяет тип файла на основе первых 1024 байт. Если файл не является изображением, выбрасывается исключение HTTPException с кодом 400 (Bad Request).
  • Сброс указателя файла: После чтения файла для проверки типа, необходимо сбросить указатель на начало файла (file.file.seek(0)), чтобы последующие операции загрузки могли работать с файлом правильно.

Интеграция с фронтендом

Настройка клиентской части для работы с Presigned URL

Интеграция клиентской части, написанной на VUE.js, с серверной частью FastAPI для загрузки изображений в S3 с использованием Presigned URL предоставляет всего лишь тестовое представление для работы с файлами. Этот подход позволяет загружать файлы напрямую в хранилище S3, минуя серверное приложение. В результате снижается нагрузка на сервер, что улучшает производительность и ускоряет обработку данных.

Мы уже частично описали работу серверной части, включая генерацию Presigned URL и управление загрузками. В следующей статье мы рассмотрим, как использовать Taskiq для асинхронной обработки задач, таких как преобразование изображений и приведение их к нужному состоянию. А сейчас вернемся к VUE.js.

Шаги настройки:

  1. Аутентификация: Перед загрузкой изображений необходимо убедиться, что пользователь аутентифицирован. В данном примере аутентификация осуществляется через JWT-токен, который сохраняется в localStorage.
  2. Выбор файлов: Пользователь выбирает файлы для загрузки через <input type="file">. После выбора файла происходит его предварительная обработка на клиенте, в том числе создание превью и подготовка к отправке.
  3. Генерация Presigned URL: Серверная часть FastAPI генерирует предподписанный URL для загрузки файла напрямую в S3. Этот URL затем используется на клиентской стороне для отправки файла.
  4. Загрузка файлов в S3: Используя сгенерированный Presigned URL, VUE.js отправляет файл напрямую в S3. В случае успешной загрузки сервер FastAPI уведомляется о завершении операции.

Мы настроили интерфейс для загрузки изображений, а также соответствующую логику на VUE.js для работы с Presigned URL.

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Image Upload with Vue.js</title>
  <link href="/static/css/app.css" rel="stylesheet">
  <script src="/static/js/vue.js"></script>
</head>
<body>
  <div id="app">
    <div id="auth-section" v-if="!isAuthenticated">
      <h2>Login</h2>
      <input type="text" v-model="username" placeholder="Username" />
      <input type="password" v-model="password" placeholder="Password" />
      <button id="upload-button" @click="login">Login</button>
    </div>

    <div id="add-image" v-if="isAuthenticated">
      <h2>Upload Your Images</h2>

      <div>
        <label for="objectApp">Select Application:</label>
        <select v-model="objectApp" id="objectApp">
          <option value="users-account">Users Account</option>
          <option value="another-app">Another App</option>
        </select>
      </div>

      <input type="file" multiple accept="image/*" @change="onFileChange" />

      <div class="image-grid">
        <div v-for="(image, index) in inputImages" :key="image.front_key" class="grid-item">
          <img :src="image.preview" class="preview">
          <label>
            <input type="checkbox" :checked="image.is_main" @change="setAsMainImage(index)" />
            Set as main image
          </label>
        </div>        
      </div>

      <button id="upload-button" @click="createProcess">Upload</button>
    </div>
  </div>

  <script src="/static/js/app.js"></script>
</body>
</html>
  • Аутентификация (login): Пользователь вводит имя пользователя и пароль. После успешного входа токен сохраняется в localStorage и используется для последующих запросов.
  • Выбор и предварительный просмотр изображений (onFileChange): Пользователь выбирает одно или несколько изображений. Каждое изображение отображается в виде превью, и для каждого из них можно выбрать флаг is_main, указывающий, что это основное изображение.
  • Загрузка файлов (createProcess): После выбора файлов и их подготовки вызывается createProcess, которая отправляет запрос на сервер FastAPI для получения Presigned URL и затем загружает изображения напрямую в S3.
  • Обработка ошибок: В случае, если токен устарел или произошла другая ошибка, пользователю предлагается войти заново.

Пошаговое руководство по загрузке изображений с клиента

Здесь представлено пошаговое руководство по загрузке изображений с клиентской стороны, интегрируя его с серверной частью на FastAPI. Это руководство поможет понять, как VUE.js взаимодействует с FastAPI для работы с изображениями и как настроить рабочий процесс для максимальной эффективности.

  1. Аутентификация пользователя

    • Перед загрузкой изображений пользователь должен пройти аутентификацию. Это обеспечит безопасность процесса загрузки и гарантирует, что доступ к функционалу имеют только авторизованные пользователи.
    async login() {
      const params = new URLSearchParams();
      params.append('username', this.username);
      params.append('password', this.password);
    
      try {
        const response = await fetch('/auth/jwt/login', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
          },
          body: params.toString(),
        });
    
        if (response.ok) {
          const data = await response.json();
          this.token = data.access_token;
          localStorage.setItem('token', this.token);
        } else {
          console.error('Login failed:', response.statusText);
        }
      } catch (error) {
        console.error('Error during login:', error);
      }
    }
  2. Выбор изображений для загрузки

    • После успешной аутентификации пользователь выбирает изображения с помощью <input type="file">. Каждый выбранный файл отображается в виде миниатюры для предварительного просмотра.
    onFileChange(event) {
      const files = event.target.files;
    
      for (const file of files) {
        if (file && file.type.startsWith('image/')) {
          const reader = new FileReader();
          reader.onload = (e) => {
            this.inputImages.push({
              front_key: this.generateUniqueId(),
              file_name: file.name,
              file: file,
              preview: e.target.result,
              uploaded: false,
              image_id: null,
              is_main: false,
            });
          };
          reader.readAsDataURL(file);
        }
      }
    }
  3. Получение Presigned URL и загрузка файла

    • VUE.js отправляет запрос на сервер FastAPI для получения Presigned URL, затем загружает файл напрямую в S3. Если загрузка прошла успешно, сервер уведомляется об успешной операции.
    async createProcess() {
      if (!this.token) {
        console.error('No token found. Please login first.');
        return;
      }
    
      for (let i = 0; i < this.inputImages.length; i++) {
        const image = this.inputImages[i];
        if (!image.uploaded) {
          const formData = new FormData();
          formData.append('file_name', image.file_name);
          formData.append('is_main', image.is_main);
          formData.append('front_key', image.front_key);
    
          const uploadUrl = `/${this.objectApp}/upload-url-image`;
    
          try {
            const response = await fetch(uploadUrl, {
              method: 'POST',
              headers: {
                'Authorization': `Bearer ${this.token}`,
              },
              body: formData,
            });
    
            if (response.ok) {
              const data = await response.json();
              image.image_id = data.image.image.id;
              const presignedUrl = data.image.url;
    
              const uploadResult = await this.uploadFile(image.file, presignedUrl);
    
              if (uploadResult) {
                this.changeFileStatus(image.front_key, image.image_id);
              }
            } else if (response.status === 401) {
              this.token = null;
              localStorage.removeItem('token');
              alert('Your session has expired. Please log in again.');
              window.location.reload();
            } else {
              console.error('Error uploading image:', response.statusText);
            }
          } catch (error) {
            console.error('Error in createProcess:', error);
          }
        }
      }
    }
  4. Обработка ошибок и управление состоянием

    • Если во время загрузки или аутентификации происходит ошибка, клиент должен корректно обработать её, уведомив пользователя и предложив повторить операцию.
    async uploadFile(file, presignedUrl) {
      try {
        const uploadResponse = await fetch(presignedUrl, {
          method: 'PUT',
          body: file,
        });
    
        return uploadResponse.ok;
      } catch (error) {
        console.error('Error in uploadFile:', error);
        return false;
      }
    }
    
    async changeFileStatus(front_key, image_id) {
      const image = this.inputImages.find(image => image.front_key === front_key);
      if (image) {
        image.uploaded = true;
    
        try {
          const response = await fetch('/assets/images/treatment/', {
            method: 'POST',
            headers: {
              'Authorization': `Bearer ${this.token}`,
              'Content-Type': 'application/x-www-form-urlencoded',
            },
            body: new URLSearchParams({
              image_id: image_id.toString(),
            }),
          });
    
          if (response.ok) {
            console.log('Image treatment initiated successfully.');
          } else if (response.status === 401) {
            this.token = null;
            localStorage.removeItem('token');
            alert('Your session has expired. Please log in again.');
            window.location.reload();
          } else {
            console.error('Image treatment failed:', response.statusText);
          }
        } catch (error) {
          console.error('Error in changeFileStatus:', error);
        }
      }
    }

Обработка ответов от сервера и управление состоянием приложения

В процессе взаимодействия с сервером, клиентская часть приложения должна корректно обрабатывать ответы от FastAPI. Это включает управление состоянием приложения, отображение результатов пользователю и обработку ошибок.

Основные задачи:

  1. Обработка успешных ответов: В случае успешной загрузки изображения необходимо обновить состояние приложения, пометив файл как загруженный и инициировав дальнейшие процессы, такие как обработка изображения на сервере.
  2. Обработка ошибок: Если загрузка не удалась, важно корректно обработать ошибки, например, обновить состояние аутентификации, предложить пользователю повторить попытку или показать сообщение об ошибке.
  3. Управление состоянием: Сохранение данных о загруженных изображениях, токенах и состоянии аутентификации в локальном хранилище (например, localStorage) позволяет сохранять информацию между сеансами пользователя.
async uploadFile(file, presignedUrl) {
  try {
    const uploadResponse = await fetch(presignedUrl, {
      method: 'PUT',
      body: file,
    });

    return uploadResponse.ok;
  } catch (error) {
    console.error('Error in uploadFile:', error);
    return false;
  }
}

async changeFileStatus(front_key, image_id) {
  const image = this.inputImages.find(image => image.front_key === front_key);
  if (image) {
    image.uploaded = true;

    try {
      const response = await fetch('/assets/images/treatment/', {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${this.token}`,
          'Content-Type': 'application/x-www-form-urlencoded',
        },
        body: new URLSearchParams({
          image_id: image_id.toString(),
        }),
      });

      if (response.ok) {
        console.log('Image treatment initiated successfully.');
      } else if (response.status === 401) {
        this.token = null;
        localStorage.removeItem('token');
        alert('Your session has expired. Please log in again.');
        window.location.reload();
      } else {
        console.error('Image treatment failed:', response.statusText);
      }
    } catch (error) {
      console.error('Error in changeFileStatus:', error);
    }
  }
}

Продолжение следует >>>

PS: Код проекта доступен на GitHub по следующей ссылке: https://github.com/exp-ext/fastapi_template. Обратите внимание, что он может не полностью соответствовать описанному здесь, так как проект находится в процессе развития. В дальнейшем шаблон будет дополняться новыми методами и проходить рефакторинг.


Читайте также:

ChatGPT
Eva
💫 Eva assistant