Автоматизация бэкапа и восстановления PostgreSQL в FastAPI

Автоматизация бэкапа и восстановления PostgreSQL в FastAPI

Картинка к публикации: Автоматизация бэкапа и восстановления PostgreSQL в FastAPI

Введение

Зачем нужно резервное копирование баз данных?

Резервное копирование баз данных является фундаментальной практикой в управлении данными, обеспечивающей защиту и сохранность информации в случае возникновения непредвиденных обстоятельств. Разберёмся, почему это настолько важно и какие проблемы решает.

1. Предотвращение потери данных

Современные информационные системы полагаются на базы данных для хранения критически важных данных, включая информацию о клиентах, транзакциях, продуктах и многом другом. Потеря таких данных может привести к значительным финансовым и репутационным потерям для организации. Причины потери данных могут быть различными:

  • Технические сбои: Жёсткие диски могут выходить из строя, серверы могут перестать функционировать из-за аппаратных проблем или сбоев в электропитании.
  • Человеческий фактор: Ошибки персонала, такие как случайное удаление данных или выполнение некорректных команд, могут привести к утрате важной информации.
  • Кибератаки и вредоносные программы: Вредоносное ПО, вирусы и хакерские атаки могут привести к утрате или порче данных.
  • Природные катастрофы: Пожары, наводнения и другие природные катастрофы могут физически уничтожить серверы и хранилища данных.

Резервное копирование помогает минимизировать риск потери данных, создавая копии информации, которые можно восстановить в случае необходимости. Это позволяет организациям быстро восстановить свои операции после инцидентов, минимизируя простои и убытки.

2. Снижение времени простоя

Время простоя информационных систем может стоить компании больших денег, особенно если речь идёт о бизнесах, которые зависят от онлайн-операций. Наличие актуальных резервных копий позволяет значительно сократить время на восстановление систем и возврат их к нормальной работе. Это особенно важно для:

  • Электронной коммерции: Любые задержки в доступе к данным могут привести к потерям в продажах.
  • Финансовых учреждений: Быстрый доступ к данным критичен для выполнения транзакций и обслуживания клиентов.
  • Медицинских учреждений: Задержки в доступе к пациентской информации могут иметь серьёзные последствия для здоровья пациентов.

3. Соответствие нормативным требованиям

Во многих отраслях наличие резервных копий является обязательным требованием для соответствия нормативным актам и стандартам. Например:

  • Финансовый сектор: Банки и другие финансовые учреждения обязаны обеспечивать безопасность и доступность данных клиентов в соответствии с требованиями регулирующих органов.
  • Медицина: Законы о защите персональных данных, такие как HIPAA в США, требуют надлежащего хранения и резервного копирования медицинских данных.
  • Правительство: Государственные учреждения должны соблюдать строгие стандарты по защите и резервному копированию данных.

Несоблюдение этих требований может привести к серьёзным штрафам и санкциям.

4. Обеспечение целостности данных

Регулярное резервное копирование позволяет проверить целостность данных и убедиться в их правильности и полноте. Это особенно важно при внесении крупных изменений в базу данных или перед выполнением обновлений систем.

5. Историческая сохранность данных

Резервные копии позволяют хранить исторические данные, что может быть полезно для анализа трендов, создания отчётов и выполнения аудитов. Исторические данные помогают понять, как изменялись данные с течением времени, и могут быть ценным ресурсом для бизнес-анализа.

Важность резервного копирования баз данных трудно переоценить. Это не просто техническая необходимость, а ключевой компонент стратегии управления рисками и обеспечения непрерывности бизнеса. Надежные и регулярные резервные копии обеспечивают спокойствие и уверенность в том, что данные будут доступны и защищены даже в самых неблагоприятных ситуациях. 

Методы резервного копирования и восстановления PostgreSQL

Резервное копирование и восстановление данных в PostgreSQL – это ключевые операции, обеспечивающие сохранность и доступность информации в случае непредвиденных обстоятельств. В PostgreSQL существует несколько методов резервного копирования и восстановления данных, каждый из которых имеет свои особенности, преимущества и недостатки. Рассмотрим их более детально.

1. Логическое резервное копирование (pg_dump)

Логическое резервное копирование – это процесс создания копии структуры и данных базы данных в формате, пригодном для последующего восстановления. PostgreSQL предоставляет утилиту pg_dump, которая выполняет эту задачу.

  • Команда pg_dump: Эта утилита экспортирует данные и структуру базы данных в виде SQL-скриптов или других форматов. Команда может быть выполнена как для всей базы данных, так и для отдельных таблиц.
pg_dump -h <host> -U <user> -d <dbname> -f backup.sql
  • Преимущества:
    • Простота использования и понимания.
    • Позволяет создавать резервные копии отдельных таблиц или всей базы данных.
    • Формат файла легко читается и редактируется.
  • Недостатки:
    • Медленное восстановление для больших баз данных.
    • Высокая нагрузка на систему при создании копий больших объемов данных.

2. Физическое резервное копирование (pg_basebackup)

Физическое резервное копирование представляет собой создание поблочной копии файлов, из которых состоит база данных. PostgreSQL предоставляет утилиту pg_basebackup для этой цели.

  • Команда pg_basebackup: Эта утилита создаёт физическую копию всей базы данных, включая все файлы и данные, что позволяет быстро восстановить состояние системы.
pg_basebackup -h <host> -D /path/to/backup -U <user> -Fp -Xs -P
  • Преимущества:
    • Быстрое восстановление, особенно для больших баз данных.
    • Возможность использования для создания реплик баз данных.
  • Недостатки:
    • Требует больше дискового пространства для хранения резервных копий.
    • Может быть сложнее в настройке и управлении по сравнению с логическим резервным копированием.

3. Резервное копирование с использованием WAL (Write-Ahead Logging)

Write-Ahead Logging (WAL) – это метод ведения журналов изменений, который используется PostgreSQL для обеспечения целостности данных. Резервное копирование с использованием WAL включает архивирование файлов журналов транзакций, что позволяет восстановить базу данных до определенного момента времени.

  • Команда архивирования WAL: Настройка архивации WAL позволяет автоматически сохранять файлы журналов изменений, которые затем можно использовать для восстановления.
archive_mode = on
archive_command = 'cp %p /path/to/archive/%f'
  • Преимущества:
    • Возможность восстановления базы данных до любого момента времени.
    • Высокая надёжность и целостность данных.
  • Недостатки:
    • Требует тщательной настройки и мониторинга.
    • Зависит от постоянного хранения и управления архивами WAL.

4. Использование сторонних инструментов

Существуют также сторонние инструменты, которые могут помочь в автоматизации и управлении процессом резервного копирования и восстановления PostgreSQL. Некоторые из них включают:

  • Barman: Инструмент для управления резервным копированием и восстановлением PostgreSQL, который предоставляет мощные возможности для автоматизации этих процессов.
  • PgBouncer: Легкий прокси-сервер для PostgreSQL, который может помочь с управлением соединениями, что важно при выполнении резервного копирования и восстановления.
  • OmniDB: Веб-интерфейс для управления базами данных PostgreSQL, который также включает возможности резервного копирования и восстановления.

Выбор метода резервного копирования и восстановления зависит от конкретных требований вашего проекта, включая объем данных, частоту изменений, доступное дисковое пространство и требуемое время восстановления. Логическое резервное копирование подходит для небольших и средних баз данных, тогда как физическое резервное копирование и использование WAL более эффективны для крупных и критически важных систем. Сторонние инструменты могут дополнить эти методы, предоставляя дополнительные возможности и упрощая управление процессами.

Введение в инструменты и технологии

Мы рассмотрим метод самого простого логического резервного копирования и восстановления базы данных. Использовать будем PostgreSQL, Docker, FastAPI, Typer, MinIO и S3. Понимание их ролей и возможностей позволит вам создать надёжную и масштабируемую систему для управления базой данных.

1. PostgreSQL – это открытая система управления реляционными базами данных (СУБД), известная своей надёжностью, гибкостью и расширяемостью. PostgreSQL поддерживает множество функций, включая транзакции, целостность данных, расширенные запросы и индексы, а также возможность работы с большими объемами данных.

  • Основные возможности:
    • Поддержка сложных запросов и индексов.
    • Расширяемость с помощью пользовательских функций и типов данных.
    • Механизмы резервного копирования и восстановления (pg_dump, pg_basebackup, WAL).

PostgreSQL широко используется в различных приложениях, от небольших стартапов до крупных корпоративных систем, благодаря своей производительности и богатому набору функций.

2. Docker – это платформа для разработки, доставки и запуска приложений в контейнерах. Контейнеризация позволяет изолировать приложения и их зависимости, что упрощает их развертывание и управление.

  • Преимущества Docker:
    • Изоляция приложений и их зависимостей.
    • Упрощённое управление и развертывание приложений.
    • Поддержка масштабируемости и портативности.

Использование Docker для развертывания PostgreSQL и других компонентов позволяет создать легко управляемую и воспроизводимую среду для разработки и эксплуатации.

3. FastAPI – это современный, высокопроизводительный веб-фреймворк для создания API на Python. Он предназначен для обеспечения высокой производительности и удобства разработки.

  • Основные особенности FastAPI:
    • Высокая производительность.
    • Поддержка асинхронного программирования.
    • Автоматическая генерация документации API (OpenAPI и Swagger).

FastAPI идеально подходит для создания микросервисов и веб-приложений, требующих высокой производительности и масштабируемости.

4. Typer – это библиотека для создания командных интерфейсов (CLI) на Python, основанная на тех же принципах, что и FastAPI. Она позволяет легко создавать интуитивно понятные CLI-приложения.

  • Основные возможности Typer:
    • Простота использования и интеграция с FastAPI.
    • Поддержка автодокументирования команд.
    • Гибкость и расширяемость.

Typer упрощает создание CLI-интерфейсов для управления задачами резервного копирования и восстановления базы данных, делая процесс интуитивно понятным и удобным.

5. MinIO – это высокопроизводительная система хранения объектов, совместимая с Amazon S3. MinIO предоставляет простой и масштабируемый способ хранения больших объемов данных.

  • Преимущества MinIO:
    • Полная совместимость с S3 API.
    • Высокая производительность и масштабируемость.
    • Простота развертывания и управления.

MinIO может использоваться для хранения резервных копий баз данных, обеспечивая надёжное и доступное хранилище.

Создание приложения

Установка и настройка Docker и Docker Compose

Для успешного развертывания приложений с использованием контейнеров необходимо установить и настроить Docker и Docker Compose. Эти инструменты обеспечивают контейнеризацию и оркестрацию многокомпонентных приложений, что значительно упрощает их развертывание и управление. 

1. Установка Docker

Docker можно установить на различных операционных системах. Рассмотрим основные шаги для каждой из них.

  1. Перейдите на официальный сайт Docker и выполните инструкции согласно вашей операционной системы.
  2. Запустите установочный файл и следуйте инструкциям мастера установки.
  3. После завершения установки перезагрузите компьютер, чтобы изменения вступили в силу.
  4. Откройте Docker Desktop и убедитесь, что Docker работает корректно.

2. Установка Docker Compose

Docker Compose – это инструмент для определения и запуска многоконтейнерных Docker приложений. Установка Docker Compose также зависит от операционной системы.

Docker Compose включен в состав Docker Desktop для Windows и macOS, поэтому дополнительных шагов не требуется. Для установки Docker Compose на Linux выполните следующие команды:

  1. Скачайте текущую стабильную версию Docker Compose:

    sudo curl -L "https://github.com/docker/compose/releases/download/$(curl -s https://api.github.com/repos/docker/compose/releases/latest | grep 'tag_name' | cut -d\" -f4)/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
  2. Сделайте бинарный файл Docker Compose исполняемым:

    sudo chmod +x /usr/local/bin/docker-compose
  3. Проверьте установку Docker Compose:

    docker-compose --version

Создание Docker Compose файла для FastAPI и PostgreSQL

Использование Docker Compose позволяет легко развернуть и управлять многокомпонентными приложениями. Ниже мы подробно рассмотрим процесс создания и настройки Docker Compose файла для развертывания приложения на базе FastAPI и PostgreSQL. Этот процесс включает определение сервисов, настройку переменных среды и управление зависимостями между контейнерами.

1. Определение структуры проекта

Прежде чем начать создание Docker Compose файла, важно организовать структуру проекта. Создайте рабочую директорию и необходимые поддиректории:

mkdir my_project
cd my_project
mkdir app
touch app/main.py
touch Dockerfile
touch docker-compose.yml

2. Создание Dockerfile для FastAPI

Dockerfile содержит инструкции по сборке образа для вашего приложения. В нашем случае это будет приложение на базе FastAPI.

# Используем официальный образ Python
FROM python:3.11

# Устанавливаем рабочую директорию внутри контейнера
WORKDIR /app
ENV PYTHONPATH="${PYTHONPATH}:/app"

# Копируем файл зависимостей и устанавливаем их
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Устанавливаем PostgreSQL клиентские инструменты
RUN apt-get update && apt-get install -y postgresql-client

# Копируем код приложения в рабочую директорию
COPY . .

Создайте файл requirements.txt и добавьте стандартные зависимости для проекта на fastapi:

fastapi
sqlalchemy[asyncio]
asyncpg
pydantic-settings
alembic
uvicorn
typer[all]
boto3
python-dotenv

3. Настройка файла docker-compose.yml

Файл docker-compose.yml содержит описание всех сервисов, которые будут развернуты. В нашем случае это FastAPI и PostgreSQL. Minio необходим лишь для теста того как БД будет бэкапится.

services:
  db:
    image: postgres:14-alpine
    restart: always
    env_file:
      - ./.env
    volumes:
      - db_data:/var/lib/postgresql/data

  web:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    restart: always
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    env_file:
      - ./.env
    depends_on:
      - db

  minio:
    image: minio/minio:latest
    restart: always
    command: server --console-address ":9001" /data
    ports:
      - '9000:9000'
      - '9090:9001'
    volumes:
      - upload_minio_volume:/data
    env_file:
      - ./.env

volumes:
  db_data:
  upload_minio_volume:

4. Создание файла .env

Файл .env используется для хранения конфиденциальной информации и переменных среды, которые будут использоваться в docker-compose.yml.

POSTGRES_DB=mydatabase
POSTGRES_USER=myuser
POSTGRES_PASSWORD=mypassword
POSTGRES_HOST=db
POSTGRES_PORT=5432

MINIO_ACCESS_KEY=myaccesskey
MINIO_SECRET_KEY=mysecretkey
MINIO_ENDPOINT=http://minio:9000
MINIO_BUCKET=mybucket
MINIO_ROOT_USER=myrootaccesskey
MINIO_ROOT_PASSWORD=myrootpassword

5. Настройка FastAPI приложения

Создайте простое FastAPI приложение в файле app/main.py:

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}

6. Запуск Docker Compose

Теперь, когда все файлы созданы и настроены, вы можете запустить Docker Compose для развертывания вашего приложения и базы данных PostgreSQL.

Запустите команду:

docker-compose up --build 

Docker Compose начнёт процесс сборки образов и развертывания контейнеров для ваших сервисов. После завершения процесса вы увидите вывод логов из контейнеров.

Не забудьте создать бакет в minio через консоль, получить актуальные  ACCESS_KEY и SECRET_KEY, а после обновить .env. Консоль доступен по адресу http://localhost:9090.

7. Проверка работы приложения

После успешного развертывания откройте браузер и перейдите по адресу http://localhost:8000. Вы должны увидеть JSON ответ {"Hello": "World"}, что подтверждает корректную работу вашего FastAPI приложения.

8. Управление контейнерами

Docker Compose предоставляет удобные команды для управления контейнерами:

  • Остановить все контейнеры:

    docker-compose down
  • Перезапустить контейнеры:

    docker-compose restart
  • Просмотреть логи контейнеров:

    docker-compose logs
  • Выполнить команду внутри контейнера:

    docker-compose exec <service_name> <command>

9. Безопасность и управление файлами .env

Файл .env содержит конфиденциальные данные, поэтому важно соблюдать меры предосторожности:

  • Добавьте .env в .gitignore

    Пример .gitignore:

    # ... тут множество других исключений для push`а в git 
    .env
  • Добавьте .env в .dockerignore

    Пример .dockerignore:

    **/.git
    **/.gitignore
    **/.vscode
    **/coverage
    **/.aws
    **/.ssh
    **/.DS_Store
    **/venv
    **/__pycache__
    **/.env
  • Используйте секреты и защищённые хранилища: Для продакшн-сред лучше использовать специализированные решения для управления секретами, такие как AWS Secrets Manager, HashiCorp Vault или Kubernetes Secrets.

Настройка приложения FastAPI

Создание приложения на базе FastAPI, которое поддерживает резервное копирование базы данных, включает в себя не только создание веб-сервиса, но и настройку удобного интерфейса командной строки (CLI) для выполнения различных задач. Для этого мы будем использовать Typer – библиотеку для создания CLI-приложений на Python, которая имеет схожий синтаксис с FastAPI, что делает её идеальным выбором для нашего проекта.

1. Структура проекта: 

my_project/
├── alembic/
│   ├── env.py
│   ├── script.py.mako
│   ├── versions/
│   │   └── <migration_files>
├── app/
│   ├── __init__.py
│   ├── config.py
│   └── main.py
│   └── db/
│      ├── __init__.py
│      ├── deps.py
│      └── session.py
│   └── models/
│      ├── __init__.py
│      ├── base_model.py
│      └── time_model.py
├── alembic.ini
├── cli.py
├── .dockerignore
├── .gitignore
├── .env
├── Dockerfile
├── docker-compose.yml
└── requirements.txt

2. Создадим файл config.py с данными из .env, файл session.pyс асинхронным коннектором и файл deps.py с функцией управления сессией БД.

# config.py
import os
from typing import Any

from dotenv import load_dotenv
from pydantic import PostgresDsn, field_validator
from pydantic_core.core_schema import FieldValidationInfo
from pydantic_settings import BaseSettings, SettingsConfigDict
import boto3

load_dotenv()


class Settings(BaseSettings):

    MINIO_ACCESS_KEY: str = os.getenv("MINIO_ACCESS_KEY")
    MINIO_SECRET_KEY: str = os.getenv("MINIO_SECRET_KEY")
    MINIO_ENDPOINT: str = os.getenv("MINIO_ENDPOINT")
    MINIO_BUCKET: str = os.getenv("MINIO_BUCKET")

    DATABASE_HOST: str = os.getenv("POSTGRES_HOST")
    DATABASE_PORT: int = os.getenv("POSTGRES_PORT")
    DATABASE_NAME: str = os.getenv("POSTGRES_DB")
    DATABASE_USER: str = os.getenv("POSTGRES_USER")
    DATABASE_PASSWORD: str = os.getenv("POSTGRES_PASSWORD")

    ASYNC_DATABASE_URI: PostgresDsn | str = ""
    S3_CLIENT: Any = None

    @field_validator("ASYNC_DATABASE_URI", mode="after")
    def assemble_async_db_connection(cls, v: str | None, info: FieldValidationInfo) -> Any:
        if isinstance(v, str):
            if v == "":
                return PostgresDsn.build(
                    scheme="postgresql+asyncpg",
                    username=info.data["DATABASE_USER"],
                    password=info.data["DATABASE_PASSWORD"],
                    host=info.data["DATABASE_HOST"],
                    port=info.data["DATABASE_PORT"],
                    path=f"/{info.data['DATABASE_NAME']}",
                )
        return v

    model_config = SettingsConfigDict(
        case_sensitive=True, env_file=os.path.expanduser("~/.env")
    )

    @field_validator("S3_CLIENT", mode="after")
    def create_s3_client(cls, v: Any, info: FieldValidationInfo) -> Any:
        return boto3.client(
            's3',
            use_ssl=True,
            aws_access_key_id=info.data["MINIO_ACCESS_KEY"],
            aws_secret_access_key=info.data["MINIO_SECRET_KEY"],
            endpoint_url=info.data["MINIO_ENDPOINT"],
        )


settings = Settings()
# db/session.py
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import AsyncAdaptedQueuePool
from app.config import settings

engine = create_async_engine(
    str(settings.ASYNC_DATABASE_URI),
    echo=False,
    poolclass=AsyncAdaptedQueuePool,
)
AsyncSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# db/deps.py
from typing import AsyncGenerator

from app.db.session import AsyncSessionLocal
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session

3. Развернем Alembic для работы с миграциями

Инициализируем Alembic в нашем проекте:

alembic init alembic

Это создаст каталог alembic и файл alembic.ini в папке app проекта.

исправим файл env.py

from __future__ import with_statement

import asyncio
from logging.config import fileConfig

from sqlalchemy.ext.asyncio import create_async_engine

from alembic import context
from app.config import settings
from app.models import *
from app.models.base_model import Base

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata
db_url = str(settings.ASYNC_DATABASE_URI)

def run_migrations_offline():
    context.configure(
        url=db_url, target_metadata=target_metadata, literal_binds=True, compare_type=True, dialect_opts={"paramstyle": "named"}
    )

    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()

async def run_migrations_online():
    connectable = create_async_engine(db_url, echo=True, future=True)

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

4. Создадим базовую модель в папке models

# models/base_model.py
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

модель которая будет хранить время обращения к url

# models/time_model.py
from datetime import datetime

from app.models.base_model import Base
from sqlalchemy import Column, DateTime, Integer

class AccessTime(Base):
    __tablename__ = 'access_times'
    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime, default=datetime.utcnow)

в файл __init__.py импортируем созданную модель, чтобы Alembic мог их обнаружить и использовать для создания и миграции базы данных.

# models/__init__.py
from .time_model import AccessTime

Выполним миграцию и коммит в контейнере, находясь на уровне где лежит файл docker-compose.yml. Ведь БД у нас работает в контейнере, без открытого порта наружу.

docker-compose exec web alembic revision --autogenerate -m 'initial migration'

и далее

docker-compose exec web alembic upgrade head

5. Дополнение основного приложения FastAPI

Сделаем так, чтоб он отдавал время создания записи в БД. Заодно и убедимся что она создана.

# app/main.py
from app.db.deps import get_db
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.time_model import AccessTime

app = FastAPI()


@app.get("/")
async def read_root(db: AsyncSession = Depends(get_db)):
    new_access = AccessTime()
    db.add(new_access)
    await db.commit()
    await db.refresh(new_access)
    return {"message": "Welcome to FastAPI with Typer!", "created_at": new_access.created_at}

Реализация функций backup

Создание функции резервного копирования

Функция резервного копирования базы данных является одной из ключевых задач в управлении данными. Ее цель – создать резервную копию (дамп) текущей базы данных, чтобы в случае возникновения непредвиденных обстоятельств (например, сбоя оборудования, кибератаки или ошибки пользователя) можно было восстановить данные до момента создания дампа. Резервные копии обеспечивают дополнительный уровень защиты и позволяют минимизировать потери данных.

Пошаговое описание реализации функции dump_db

  1. Перед созданием дампа необходимо настроить переменные окружения, чтобы обеспечить безопасный доступ к базе данных.
    • Установим переменную PGPASSWORD, которая будет использоваться для аутентификации при подключении к базе данных PostgreSQL:

      os.environ['PGPASSWORD'] = settings.DATABASE_PASSWORD
    • Команда pg_dump используется для создания дампа базы данных в формате, который можно будет использовать для восстановления. Мы указываем параметры подключения, формат вывода и другие опции:

      dump_command = [
          'pg_dump',
          '-h', settings.DATABASE_HOST,
          '-U', settings.DATABASE_USER,
          '--section', 'pre-data',
          '--section', 'data',
          '--section', 'post-data',
          '--format', 'custom',
          '--blobs',
          '-f', local_dump_file,
          settings.DATABASE_NAME
      ]
  2. Выполнение команды pg_dump и обработка результата
    • Используем subprocess.Popen для выполнения команды pg_dump и захвата вывода и ошибок:

      process = subprocess.Popen(dump_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    • Ожидаем завершения процесса и проверяем результат. Если команда завершилась с ошибкой, выбрасываем исключение:

      _, stderr = process.communicate()
      if process.returncode != 0:
          raise Exception(f"pg_dump failed: {stderr.decode()}")
      logger.info(f"Database dump completed. File saved as {local_dump_file}")
  3. Для уменьшения размера файла дампа и удобства хранения, архивируем его с помощью gzip.
    • Архивируем дамп в файл с расширением .gz:

      FileManager.archive_file(local_dump_file, local_dump_file + '.gz')
  4. Загружаем архивированный дамп в облачное хранилище MinIO (S3).
    • Генерируем уникальное имя файла, включающее текущую дату и время:

      destination_filename = 'pgsql_' + datetime.strftime(datetime.now(), "%Y.%m.%d.%H:%M") + 'UTC' + '.backup.gz'
    • Загрузка архива в S3 с помощью S3Manager.upload_to_s3

      with open(local_dump_file + '.gz', 'rb') as data:
          self.s3_manager.upload_to_s3(data, destination_filename)
  5. Удаляем локальные файлы дампа, чтобы освободить дисковое пространство:

    os.remove(local_dump_file)
    os.remove(local_dump_file + '.gz')
    logger.info(f"Local dump files {local_dump_file} and {local_dump_file}.gz removed")

Восстановление базы данных из дампа

Восстановление базы данных из резервной копии — это критически важная операция, обеспечивающая возможность восстановления данных в случае их утраты или повреждения. Основная цель функции восстановления базы данных — гарантировать целостность и актуальность данных, что необходимо для обеспечения непрерывности работы системы. Корректное восстановление данных позволяет минимизировать время простоя и избежать потерь информации.

Пошаговое описание реализации функции восстановления:

Первым шагом в процессе восстановления базы данных является получение последнего доступного дампа из S3. Это обеспечивает актуальность данных, которые будут восстановлены.

  • Метод get_latest_dump_file класса S3Manager возвращает ключ последнего дампа, хранящегося в S3:

    latest_dump_s3_key = self.s3_manager.get_latest_dump_file()
  • Загрузка последнего дампа на локальную машину для последующего восстановления:

    local_dump_file = "latest_dump.backup.gz"
    self.s3_manager.download_file_from_s3(latest_dump_s3_key, local_dump_file)

Процесс восстановления базы данных включает несколько этапов: остановка активных соединений, удаление существующей базы данных, создание новой базы данных, разархивация дампа и восстановление данных.

  • Для предотвращения конфликтов необходимо завершить все активные соединения к базе данных.
    • Используем команду pg_terminate_backend для завершения всех активных соединений к базе данных:

      terminate_command = [
          'psql', '-h', settings.DATABASE_HOST, '-p', str(settings.DATABASE_PORT), '-U', settings.DATABASE_USER, '-d', 'postgres', '-c',
          f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{settings.DATABASE_NAME}' AND pid <> pg_backend_pid();"
      ]
      terminate_process = subprocess.run(terminate_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
      if terminate_process.returncode != 0:
          logger.error(f"Terminate connections command failed with return code {terminate_process.returncode}")
          logger.error(f"stderr: {terminate_process.stderr}")
          logger.error(f"stdout: {terminate_process.stdout}")
          raise Exception(f"Terminate connections failed with return code {terminate_process.returncode}")
      logger.info("Terminated all active connections to the database")
  • После завершения всех активных соединений удаляем существующую базу данных и создаем новую пустую базу данных.
    • Удаляем существующую базу данных:

      drop_command = [
          'dropdb', '--if-exists', '-h', settings.DATABASE_HOST, '-p', str(settings.DATABASE_PORT), '--username', settings.DATABASE_USER, settings.DATABASE_NAME
      ]
      drop_process = subprocess.run(drop_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
      if drop_process.returncode != 0:
          logger.error(f"dropdb failed with return code {drop_process.returncode}")
          logger.error(f"stderr: {drop_process.stderr}")
          logger.error(f"stdout: {drop_process.stdout}")
          raise Exception(f"dropdb failed with return code {drop_process.returncode}")
      logger.info("Existing database dropped successfully")
    • Создаем новую пустую базу данных:

      create_command = [
          'createdb', '-h', settings.DATABASE_HOST, '-p', str(settings.DATABASE_PORT), '--username', settings.DATABASE_USER, settings.DATABASE_NAME
      ]
      create_process = subprocess.run(create_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
      if create_process.returncode != 0:
          logger.error(f"createdb failed with return code {create_process.returncode}")
          logger.error(f"stderr: {create_process.stderr}")
          logger.error(f"stdout: {create_process.stdout}")
          raise Exception(f"createdb failed with return code {create_process.returncode}")
      logger.info("New database created successfully")
  • После создания новой базы данных разархивируем дамп и восстанавливаем данные.
    • Использование FileManager.unarchive_file для разархивации дампа

      FileManager.unarchive_file(local_dump_file, local_dump_file.replace('.gz', ''))
    • Выполнение команды pg_restore для восстановления данных из дампа

      with open(local_dump_file.replace('.gz', ''), 'rb') as f:
          restore_command = [
              'pg_restore', '--no-owner', '--dbname',
              f'postgresql://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_NAME}'
          ]
          result = subprocess.run(restore_command, stdin=f, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
          if result.returncode != 0:
              logger.error(f"pg_restore failed with return code {result.returncode}")
              logger.error(f"stderr: {result.stderr}")
              logger.error(f"stdout: {result.stdout}")
              raise Exception(f"pg_restore failed with return code {result.returncode}")
      logger.info("Database restored successfully")

Полный код менеджера backup

В этом разделе мы завершим наш менеджер бэкапа базы данных, предоставив инструкции по активации команд для резервного копирования и восстановления базы данных внутри контейнеров Docker, а также обсудим использование планировщиков задач для автоматизации этих процессов.

В рамках нашего проекта мы используем Docker для контейнеризации нашего приложения. Команды для резервного копирования и восстановления базы данных можно запускать внутри контейнера с приложением FastAPI.

Для выполнения команды резервного копирования базы данных:

docker-compose exec web python cli.py dump_db

Для выполнения команды восстановления базы данных:

docker-compose exec web python cli.py restore-database

Эти команды запускают наши функции dump_db и restore_database внутри контейнера web, где развернуто приложение FastAPI.

Автоматизация процессов резервного копирования и восстановления базы данных помогает поддерживать регулярные бэкапы и быстро восстанавливать данные при необходимости. Для этих целей мы можем использовать различные планировщики задач, такие как Celery, APScheduler или встроенные функции UNIX, такие как cron.

  1. Установка APScheduler:

    pip install apscheduler
  2. Настройка APScheduler: Добавьте код для планирования задач в ваше приложение:

    from apscheduler.schedulers.background import BackgroundScheduler
    from fastapi import FastAPI
    
    def scheduled_backup():
        db_manager = DatabaseManager()
        db_manager.dump_db()
    
    scheduler = BackgroundScheduler()
    scheduler.add_job(scheduled_backup, 'cron', hour=0, minute=0)
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        scheduler.start()
        yield
        scheduler.shutdown()
    
    app = FastAPI(lifespan=lifespan)

Мы рассмотрели процесс создания менеджера резервного копирования базы данных для FastAPI, включающий резервное копирование, восстановление данных, обработку ошибок и уведомления. Также мы описали, как запускать эти команды внутри Docker-контейнеров и автоматизировать процессы с помощью Celery и APScheduler. В результате у вас будет надёжная и масштабируемая система управления резервными копиями баз данных, обеспечивающая защиту и восстановление данных при любых обстоятельствах.

import gzip
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime

import typer
from dotenv import load_dotenv

from app.config import settings

load_dotenv()

app = typer.Typer()
logging.basicConfig(level="INFO")
logger = logging.getLogger()


class S3Manager:
    def __init__(self):
        self.client = settings.S3_CLIENT
        self.bucket = settings.MINIO_BUCKET
        self.prefix = "backup/"

    def upload_to_s3(self, source_stream, destination_filename):
        try:
            destination_filename = self.prefix + destination_filename
            self.client.upload_fileobj(source_stream, self.bucket, destination_filename)
            logger.info(f"Uploaded {destination_filename} to bucket {self.bucket}")
        except Exception as e:
            logger.error(f"Failed to upload {destination_filename} to bucket {self.bucket}: {e}")
            raise

    def get_latest_dump_file(self):
        try:
            response = self.client.list_objects_v2(Bucket=self.bucket, Prefix=self.prefix)
            if 'Contents' not in response:
                raise FileNotFoundError("No backup files found in the bucket.")
            files = sorted(response['Contents'], key=lambda x: x['LastModified'], reverse=True)
            latest_file = files[0]['Key']
            logger.info(f"Latest dump file found: {latest_file}")
            return latest_file
        except Exception as e:
            logger.error(f"Failed to get latest dump file: {e}")
            raise

    def download_file_from_s3(self, s3_key, local_path):
        try:
            self.client.download_file(self.bucket, s3_key, local_path)
            logger.info(f"Downloaded {s3_key} to {local_path}")
        except Exception as e:
            logger.error(f"Failed to download {s3_key} to {local_path}: {e}")
            raise

    def delete_old_dumps(self, days=5):
        try:
            response = self.client.list_objects_v2(Bucket=self.bucket, Prefix=self.prefix)
            if 'Contents' not in response:
                logger.info("No backup files found in the bucket.")
                return

            now = datetime.now(timezone.utc)
            cutoff_date = now - timedelta(days=days)

            for obj in response['Contents']:
                last_modified = obj['LastModified']
                if last_modified < cutoff_date:
                    self.client.delete_object(Bucket=self.bucket, Key=obj['Key'])
                    logger.info(f"Deleted {obj['Key']} from bucket {self.bucket}")
        except Exception as e:
            logger.error(f"Failed to delete old dumps: {e}")
            raise


class FileManager:
    @staticmethod
    def archive_file(source_file, dest_file):
        try:
            with open(source_file, 'rb') as f_in:
                with gzip.open(dest_file, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            logger.info(f"Archived {source_file} to {dest_file}")
        except Exception as e:
            logger.error(f"Failed to archive {source_file} to {dest_file}: {e}")
            raise

    @staticmethod
    def unarchive_file(source_file, dest_file):
        try:
            with gzip.open(source_file, 'rb') as f_in:
                with open(dest_file, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            logger.info(f"Unarchived {source_file} to {dest_file}")
        except Exception as e:
            logger.error(f"Failed to unarchive {source_file} to {dest_file}: {e}")
            raise


class DatabaseManager:
    def __init__(self):
        self.s3_manager = S3Manager()

    def dump_db(self):
        os.environ['PGPASSWORD'] = settings.DATABASE_PASSWORD
        local_dump_file = 'latest_dump.backup'
        try:
            dump_command = [
                'pg_dump',
                '-h', settings.DATABASE_HOST,
                '-U', settings.DATABASE_USER,
                '--section', 'pre-data',
                '--section', 'data',
                '--section', 'post-data',
                '--format', 'custom',
                '--blobs',
                '-f', local_dump_file,
                settings.DATABASE_NAME
            ]
            process = subprocess.Popen(dump_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            _, stderr = process.communicate()
            if process.returncode != 0:
                raise Exception(f"pg_dump failed: {stderr.decode()}")

            logger.info(f"Database dump completed. File saved as {local_dump_file}")

            FileManager.archive_file(local_dump_file, local_dump_file + '.gz')

            destination_filename = 'pgsql_' + datetime.strftime(datetime.now(), "%Y.%m.%d.%H:%M") + 'UTC' + '.backup.gz'
            with open(local_dump_file + '.gz', 'rb') as data:
                self.s3_manager.upload_to_s3(data, destination_filename)

            logger.info(f"Local dump files {local_dump_file} and {local_dump_file}.gz removed")

            self.s3_manager.delete_old_dumps()
        except Exception as e:
            logger.error(f"An error occurred during dump_db: {e}")
            raise
        finally:
            if os.path.exists(local_dump_file):
                os.remove(local_dump_file)
            if os.path.exists(local_dump_file + '.gz'):
                os.remove(local_dump_file + '.gz')

    def terminate_db_connections(self):
        try:
            terminate_command = [
                'psql', '-h', settings.DATABASE_HOST, '-p', str(settings.DATABASE_PORT), '-U', settings.DATABASE_USER, '-d', 'postgres', '-c',
                f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{settings.DATABASE_NAME}' AND pid <> pg_backend_pid();"
            ]
            terminate_process = subprocess.run(terminate_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if terminate_process.returncode != 0:
                logger.error(f"Terminate connections command failed with return code {terminate_process.returncode}")
                logger.error(f"stderr: {terminate_process.stderr}")
                logger.error(f"stdout: {terminate_process.stdout}")
                raise Exception(f"Terminate connections failed with return code {terminate_process.returncode}")
            logger.info("Terminated all active connections to the database")
        except Exception as e:
            logger.error(f"An error occurred during terminate_db_connections: {e}")
            raise

    def drop_and_create_db(self):
        try:
            drop_command = [
                'dropdb', '--if-exists', '-h', settings.DATABASE_HOST, '-p', str(
                    settings.DATABASE_PORT), '--username', settings.DATABASE_USER, settings.DATABASE_NAME
            ]
            drop_process = subprocess.run(
                drop_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if drop_process.returncode != 0:
                logger.error(f"dropdb failed with return code {drop_process.returncode}")
                logger.error(f"stderr: {drop_process.stderr}")
                logger.error(f"stdout: {drop_process.stdout}")
                raise Exception(f"dropdb failed with return code {drop_process.returncode}")
            logger.info("Existing database dropped successfully")

            create_command = [
                'createdb', '-h', settings.DATABASE_HOST, '-p', str(settings.DATABASE_PORT), '--username', settings.DATABASE_USER, settings.DATABASE_NAME
            ]
            create_process = subprocess.run(create_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if create_process.returncode != 0:
                logger.error(f"createdb failed with return code {create_process.returncode}")
                logger.error(f"stderr: {create_process.stderr}")
                logger.error(f"stdout: {create_process.stdout}")
                raise Exception(f"createdb failed with return code {create_process.returncode}")
            logger.info("New database created successfully")
        except Exception as e:
            logger.error(f"An error occurred during drop_and_create_db: {e}")
            raise

    def restore_db(self, dump_file):
        os.environ['PGPASSWORD'] = settings.DATABASE_PASSWORD
        try:
            if not os.path.exists(dump_file):
                raise FileNotFoundError(f"The file {dump_file} does not exist")

            self.terminate_db_connections()
            self.drop_and_create_db()

            FileManager.unarchive_file(dump_file, dump_file.replace('.gz', ''))

            with open(dump_file.replace('.gz', ''), 'rb') as f:
                restore_command = [
                    'pg_restore', '--no-owner', '--dbname',
                    f'postgresql://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_NAME}'
                ]
                result = subprocess.run(restore_command, stdin=f, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
                if result.returncode != 0:
                    logger.error(f"pg_restore failed with return code {result.returncode}")
                    logger.error(f"stderr: {result.stderr}")
                    logger.error(f"stdout: {result.stdout}")
                    raise Exception(f"pg_restore failed with return code {result.returncode}")
        except Exception as e:
            logger.error(f"An error occurred during restore_db: {e}")
            raise
        finally:
            if os.path.exists(dump_file.replace('.gz', '')):
                os.remove(dump_file.replace('.gz', ''))

    def restore_database(self):
        local_dump_file = "latest_dump.backup.gz"
        try:
            latest_dump_s3_key = self.s3_manager.get_latest_dump_file()
            self.s3_manager.download_file_from_s3(latest_dump_s3_key, local_dump_file)

            if not os.path.exists(local_dump_file):
                raise FileNotFoundError(f"The downloaded file {local_dump_file} does not exist")

            self.restore_db(local_dump_file)
            logger.info(f"Database restored successfully from {latest_dump_s3_key}")
        except Exception as e:
            logger.error(f"An error occurred during restore_database: {e}")
            raise
        finally:
            if os.path.exists(local_dump_file):
                os.remove(local_dump_file)


if __name__ == "__main__":
    db_manager = DatabaseManager()
    if len(sys.argv) > 1 and sys.argv[1] == 'restore_database':
        try:
            db_manager.restore_database()
        except Exception as e:
            logger.error(f"Failed to restore database: {e}")
            sys.exit(1)
    elif len(sys.argv) > 1 and sys.argv[1] == 'dump_db':
        try:
            db_manager.dump_db()
        except Exception as e:
            logger.error(f"Failed to dump database: {e}")
            sys.exit(1)
    else:
        app()

Этот код включает классы S3Manager, FileManager, DatabaseManager, а также основные функции для резервного копирования и восстановления базы данных, подробно описанные в предыдущих разделах.


Читайте также:

ChatGPT
Eva
💫 Eva assistant