Разработка слоя репозиториев web-приложения
Вам необходимо вручную создать SQL-таблицы, отражающие структуру социальной сети GopherTalk. Ниже описаны таблицы, их поля и связи между ними.
- Таблица
users
— хранит данные пользователей. - Таблица
posts
— хранит публикации пользователей. Полеreply_to_id
указывает на другой пост, если это ответ. - Таблица
likes
— отображает лайки пользователей к постам. - Таблица
views
— отображает просмотры постов пользователями.
Требования:
- Используйте типы данных и ограничения согласно описанию.
- Настройте первичные и внешние ключи.
- Создайте уникальный индекс по
user_name
, но только для не удалённых пользователей (deleted_at IS NULL
). - Убедитесь, что
status
может быть только0
или1
.
Подсказка: после создания таблиц, проверьте схему с помощью ER-диаграммы, чтобы убедиться в корректности связей.
Архитектура приложения: контроллеры, сервисы и репозитории
Когда приложение начинает расти, добавляется всё больше бизнес-логики, валидации, работы с базой данных — и код быстро превращается в нечитаемую "кашу". Чтобы этого избежать, используется разделение ответственности — принцип, при котором каждый компонент отвечает только за свою задачу.
В небольших веб-приложениях удобно придерживаться следующей архитектуры:
1. Контроллеры (controllers)
Контроллер — это слой, который принимает HTTP-запрос, обрабатывает его и возвращает ответ. Здесь происходит:
- чтение параметров из объектов
Request
,Path
,Query
,Body
илиDepends
FastAPI;, - вызов нужного метода сервиса,
- формирование ответа (через
Response
,JSONResponse
или возврат словаря).
Контроллер не содержит бизнес-логики и не обращается напрямую к базе данных — он просто управляет потоком данных. Кроме того, на уровне контроллера решаются вопросы по разграничению доступа к ресурсам и фильтрации запросов.
2. Сервисы (services)
Сервис — это слой, где находится основная бизнес-логика приложения. Он:
- обрабатывает данные,
- проверяет условия (например, "пользователь уже существует"),
- вызывает репозиторий для доступа к базе.
Сервис ничего не знает про HTTP или FastAPI — он универсален и может использоваться как в HTTP-приложении, так и, например, в CLI-утилите или фоновом скрипте.
3. Репозитории (repositories)
Репозиторий — это слой, отвечающий за доступ к данным. Обычно здесь хранятся SQL-запросы.
Сервис говорит: "дай мне пользователя по id", а репозиторий выполняет конкретный SQL-запрос и возвращает результат.
Такой подход позволяет:
- изолировать работу с базой,
- легче писать и запускать юнит-тесты,
- менять способ хранения данных (например, заменить PostgreSQL на MongoDB) с минимальными изменениями.
Преимущества архитектуры:
- Код становится чище, понятнее и масштабируемее;
- Каждый слой можно тестировать отдельно;
- Упрощается командная разработка — каждый работает в своей зоне ответственности;
- Легче поддерживать и расширять приложение в будущем.
В соответсвии с архитектурой мы построим разработку следующим образом: сначала разработаем слой репозитория, затем слой сервисов и в конце слой контроллеров. Для каждого слоя напишем Вам будут предоставлены unit-тесты для проверки корректности разработки конкретного слоя.
Разработка репозитория пользователей
На этом этапе мы реализуем слой работы с базой данных — репозиторий пользователей.
Задача этого слоя — обеспечивать сохранение, получение, обновление и удаление данных пользователей без участия бизнес-логики или HTTP-контроллеров.
Репозиторий будет включать методы:
- добавления нового пользователя,
- получения всех пользователей с пагинацией,
- поиска пользователя по
id
и поuser_name
, - обновления данных пользователя,
- мягкого удаления пользователя.
Мы начнем с самого простого метода — create_user
, который сохраняет нового пользователя в таблице users
.
Затем реализуем остальные методы и подключим unit-тесты для проверки корректности.
В папке src
проекта создайте папку repositories
, а в ней файл user_repository.py
. Поместите в него следующий код:
from config.db import pool
from psycopg.rows import dict_row
def create_user(dto: dict) -> dict:
query = """
INSERT INTO users (user_name, first_name, last_name, password_hash)
VALUES (%s, %s, %s, %s)
RETURNING id, user_name, password_hash, status;
"""
values = (
dto["user_name"],
dto["first_name"],
dto["last_name"],
dto["password_hash"],
)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, values)
return cur.fetchone()
Этот код реализует метод create_user
, который отвечает за добавление нового пользователя в базу данных.
Пошаговый разбор
Импорт подключения к базе данных:
pythonfrom config.db import pool
Здесь мы импортируем заранее настроенный пул соединений с PostgreSQL, созданный с помощью
psycopg_pool.ConnectionPool
в модулеconfig.db
. Через этот пул мы получаем подключения к базе и повторно их используем — без лишнего overhead'а на открытие новых TCP-сессий.Импорт
dict_row
для словарной формы результата:pythonfrom psycopg.rows import dict_row
dict_row
— это специальный row factory, который говоритpsycopg
возвращать строки не в виде кортежей, а в виде словарей (dict
). Это удобно и читабельно: вместоrow[0]
можно писатьrow["user_name"]
.Определение функции
create_user
:pythondef create_user(dto: dict) -> dict:
Функция принимает словарь
dto
, содержащий данные нового пользователя:user_name
,first_name
,last_name
,password_hash
. Тип возвращаемого значения —dict
, то есть мы возвращаем строку из базы в виде словаря.SQL-запрос на вставку
pythonquery = """ INSERT INTO users (user_name, first_name, last_name, password_hash) VALUES (%s, %s, %s, %s) RETURNING id, user_name, password_hash, status; """
Это SQL-запрос, который вставляет нового пользователя в таблицу
users
, использует позиционные параметры%s
— безопасный способ подстановки значений и сразу возвращаетid
,user_name
,password_hash
,status
— это нужно, чтобы после создания пользователя отдать его данные в API.SQL-инъекции
SQL-инъекция — это один из самых распространённых видов атак на базу данных. Она возникает, когда ввод пользователя напрямую вставляется в SQL-запрос без проверки и экранирования, что позволяет злоумышленнику изменить логику запроса.
Пример уязвимого кода:
pythoncur.execute(f"SELECT * FROM users WHERE user_name = '{user_input}'")
Вместо ожидаемого безопасного значения, пользователь ввёл строку
' OR 1=1 --
.В результате итоговый SQL-запрос будет выглядеть так:
sqlSELECT * FROM users WHERE user_name = '' OR 1=1 --';
Что здесь происходит:
user_name = ''
— первое условие, оно просто проверяет, что имя пользователя пустое;OR 1=1
— логическое выражение, которое всегда истинно, то есть условие выполняется для всех пользователей;--
— начало SQL-комментария, всё, что идёт после него, игнорируется СУБД;';
— эта часть уже не исполняется, так как закомментирована.
Этот запрос вернёт всех пользователей из базы, потому что
1=1
всегда истинно. Если такой запрос используется при входе в систему, злоумышленник может войти без пароля, просто потому что запрос "обманывает" проверку логина.Используя позиционные параметры, мы избегаем этой проблемы:
pythoncur.execute("SELECT * FROM users WHERE user_name = %s", (user_input,))
В случае использования позиционных параметров, даже если пользователь введёт
' OR 1=1 --
, это не приведёт к SQL-инъекции, потому что ввод не вставляется напрямую в текст SQL-запроса. Вместо этого он передаётся отдельно в виде значения, а не как часть кода, а на уровне драйвера PostgreSQL (psycopg
) реализован механизм, который:экранирует специальные символы,
оборачивает значение в кавычки при необходимости,
и гарантирует, что ввод будет интерпретироваться именно как строка, а не как SQL-операторы.
Проще говоря, драйвер сам "разделяет" SQL-код и пользовательские данные, не давая последним повлиять на логику выполнения запроса.
Поэтому даже вредоносная строка будет просто передана как обычное значение поля
user_name
, а не как часть SQL-запроса.Подготовка значений для запроса:
pythonvalues = ( dto["user_name"], dto["first_name"], dto["last_name"], dto["password_hash"], )
Значения берутся из входного объекта
dto
и передаются в том порядке, в котором указаны в SQL-запросе.Выполнение запроса
pythonwith pool.connection() as conn: with conn.cursor(row_factory=dict_row) as cur: cur.execute(query, values) return cur.fetchone()
pool.connection()
берёт соединение из пула,conn.cursor(row_factory=dict_row)
создаёт курсор, возвращающий строки как словари,cur.execute(...)
выполняет SQL-запрос, аcur.fetchone()
возвращает первую строку результата, то есть только что созданного пользователя.После выполнения запроса возвращается первая (и единственная) строка результата — то есть данные только что созданного пользователя.
Мы реализовали создание пользователя. Далее необходимо реализовать следующие функции работы с пользователями:
get_all_users
— получение списка всех пользователей с пагинацией;get_user_by_id
— получение пользователя по егоid
;get_user_by_username
— получение пользователя по его имени пользователя (user_name
);update_user
— обновление данных пользователя;delete_user
— мягкое удаление пользователя.
Начнём с функции get_all_users
.
from config.db import pool
from psycopg.rows import dict_row
def create_user(dto: dict) -> dict:
query = """
INSERT INTO users (user_name, first_name, last_name, password_hash)
VALUES (%s, %s, %s, %s)
RETURNING id, user_name, password_hash, status;
"""
values = (
dto["user_name"],
dto["first_name"],
dto["last_name"],
dto["password_hash"],
)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, values)
return cur.fetchone()
def get_all_users(limit: int, offset: int) -> list[dict]:
query = """
SELECT id, user_name, first_name, last_name, status, created_at, updated_at
FROM users
WHERE deleted_at IS NULL
OFFSET %s LIMIT %s;
"""
params = (offset, limit)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, params)
return cur.fetchall()
Обратите внимание, что метод принимает два параметра - offset
и limit
. Они необходимы для того, чтобы сделать пагинацию, то есть отдавать не всех пользователей сразу, а частями в рамках скользящего окна.
Перейдем к функциям get_user_by_id
и get_user_by_user_name
.
from config.db import pool
from psycopg.rows import dict_row
def create_user(dto: dict) -> dict:
query = """
INSERT INTO users (user_name, first_name, last_name, password_hash)
VALUES (%s, %s, %s, %s)
RETURNING id, user_name, password_hash, status;
"""
values = (
dto["user_name"],
dto["first_name"],
dto["last_name"],
dto["password_hash"],
)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, values)
return cur.fetchone()
def get_all_users(limit: int, offset: int) -> list[dict]:
query = """
SELECT id, user_name, first_name, last_name, status, created_at, updated_at
FROM users
WHERE deleted_at IS NULL
OFFSET %s LIMIT %s;
"""
params = (offset, limit)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, params)
return cur.fetchall()
def get_user_by_id(user_id: int) -> dict:
query = """
...
"""
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, (user_id,))
result = cur.fetchone()
if result is None:
raise ValueError("User not found")
return result
def get_user_by_username(user_name: str) -> dict:
query = """
...
"""
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, (user_name,))
result = cur.fetchone()
if result is None:
raise ValueError("User not found")
return result
Задание
Напишите самостоятельно SQL запросы для функций get_user_by_id
и get_user_by_user_name
. Для метода get_user_by_id
необходимо вернуть поля id
, user_name
, first_name
, last_name
, status
, created_at
, updated_at
, а для метода get_user_by_user_name
- id
, user_name
, password_hash
, status
.
Рассмотрим функцию update_user
def update_user(user_id: int, dto: dict) -> dict:
fields = []
values = []
if "password_hash" in dto:
fields.append("password_hash = %s")
values.append(dto["password_hash"])
if "user_name" in dto:
fields.append("user_name = %s")
values.append(dto["user_name"])
if "first_name" in dto:
fields.append("first_name = %s")
values.append(dto["first_name"])
if "last_name" in dto:
fields.append("last_name = %s")
values.append(dto["last_name"])
if not fields:
raise ValueError("No fields to update")
fields.append("updated_at = NOW()")
values.append(user_id)
query = f"""
UPDATE users
SET {", ".join(fields)}
WHERE id = %s AND deleted_at IS NULL
RETURNING id, user_name, first_name, last_name, status, created_at, updated_at;
"""
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, values)
result = cur.fetchone()
if result is None:
raise ValueError("User not found")
return result
Эта функция предназначена для обновления данных пользователя в базе данных. Она принимает два аргумента:
id
: Идентификатор пользователя, которого необходимо обновить.dto
: Объект, содержащий данные для обновления.
Логика работы функции update_user
Инициализация:
- Создаются два списка:
fields
— содержит SQL-строки видаcolumn_name = %s
;values
— содержит значения, которые будут подставлены в SQL-запрос.
- Переменная
user_id
передаётся отдельно в конце, для подстановки вWHERE
.
- Создаются два списка:
Проверка и сбор полей для обновления:
- Функция последовательно проверяет наличие полей в словаре
dto
. - Если поле присутствует, оно добавляется в
fields
, а его значение — вvalues
. - Поддерживаются следующие поля:
password_hash
user_name
first_name
last_name
- Функция последовательно проверяет наличие полей в словаре
Проверка на пустое обновление:
- Если в
dto
не оказалось ни одного обновляемого поля, выбрасывается исключениеValueError("No fields to update")
.
- Если в
Добавление метки времени:
- В
fields
добавляетсяupdated_at = NOW()
— это системное поле, не требующее подстановки.
- В
Формирование SQL-запроса:
- Поля в
fields
объединяются через запятую. - Генерируется SQL-запрос вида:sql
UPDATE users SET field1 = %s, field2 = %s, ..., updated_at = NOW() WHERE id = %s AND deleted_at IS NULL RETURNING id, user_name, first_name, last_name, status, created_at, updated_at;
- Поля в
Добавление ID в параметры запроса:
- В
values
добавляетсяuser_id
как последний аргумент, который используется вWHERE id = %s
.
- В
Выполнение запроса:
- Открывается соединение с базой данных из пула.
- Выполняется SQL-запрос через
cursor.execute(...)
с подставленными значениями. - Используется
row_factory=dict_row
для возврата результата в виде словаря.
Обработка результата:
- Если пользователь не найден (
fetchone()
вернулNone
), выбрасываетсяValueError("User not found")
. - В противном случае возвращается результат — обновлённый пользователь в виде словаря.
- Если пользователь не найден (
Последняя функция, которую мы реализуем в этом репозитории - это функция удаления пользователя delete_user
.
def delete_user(user_id: int) -> None:
query = """
...
"""
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query, (user_id,))
if cur.rowcount == 0:
raise ValueError("User not found")
Эта функция предназначена для "удаления" пользователя из базы данных. Фактически, это мягкое удаление (soft delete), когда запись не удаляется физически, а лишь помечается как удалённая.
Задание
Напишите SQL-запрос, который выполняет мягкое удаление пользователя, устанавливая значение deleted_at
в текущее время для пользователя с указанным id
.
Тестирование репозитория пользователей
В корне проекта создайте файл pytest.ini
со следующим содержимым:
[pytest]
pythonpath = ./src
Также в корне проекта создайте папку __tests__
, а в ней папку repositories
. В папке repositories
создайте файл test_user_repository.py
и поместите в него код с unit-тестами:
Unit-тесты user_repository
import pytest
from unittest.mock import patch, MagicMock
from datetime import datetime, timedelta
from repositories.user_repository import (
create_user,
get_all_users,
get_user_by_id,
get_user_by_username,
update_user,
delete_user,
)
def normalize_sql(sql: str) -> str:
return " ".join(sql.lower().split())
@pytest.fixture
def mock_conn():
with patch("config.db.pool.connection") as mock_conn_context:
yield mock_conn_context
def test_create_user_success(mock_conn):
dto = {
"user_name": "john",
"first_name": "John",
"last_name": "Doe",
"password_hash": "password",
}
expected = {
"id": 1,
"user_name": "john",
"password_hash": "password",
"status": 1,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = create_user(dto)
assert result == expected
def test_create_user_error(mock_conn):
dto = {
"user_name": "john",
"first_name": "John",
"last_name": "Doe",
"password_hash": "password",
}
fake_error = Exception("insert failed")
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = fake_error
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="insert failed"):
create_user(dto)
sql_called = mock_cursor.execute.call_args[0][0].lower()
assert "insert into users" in sql_called
assert "user_name" in sql_called
assert "password_hash" in sql_called
params = mock_cursor.execute.call_args[0][1]
assert params == (
dto["user_name"],
dto["first_name"],
dto["last_name"],
dto["password_hash"],
)
def test_get_all_users_success(mock_conn):
now = datetime.utcnow()
expected = [{
"id": 1,
"user_name": "john",
"first_name": "John",
"last_name": "Doe",
"status": 1,
"created_at": now,
"updated_at": now,
}]
mock_cursor = MagicMock()
mock_cursor.fetchall.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = get_all_users(limit=10, offset=0)
assert result == expected
def test_get_all_users_error(mock_conn):
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = Exception("SQL error")
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="SQL error"):
get_all_users(limit=100, offset=0)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "from users where deleted_at is null" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (0, 100)
def test_get_user_by_id_success(mock_conn):
now = datetime.utcnow()
expected = {
"user_name": "john",
"first_name": "John",
"last_name": "Doe",
"status": 1,
"created_at": now,
"updated_at": now,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = get_user_by_id(1)
assert result == expected
def test_get_user_by_id_not_found(mock_conn):
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = None
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(ValueError, match="User not found"):
get_user_by_id(999)
def test_get_user_by_username_success(mock_conn):
expected = {
"id": 1,
"user_name": "john",
"password_hash": "password",
"status": 1,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = get_user_by_username("john")
assert result == expected
def test_get_user_by_username_not_found(mock_conn):
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = None
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(ValueError, match="User not found"):
get_user_by_username("notfound")
def test_update_user_success(mock_conn):
now = datetime.utcnow()
earlier = now - timedelta(hours=1)
dto = {
"user_name": "john_updated",
"first_name": "John",
"last_name": "Doe",
"password_hash": "password",
}
expected = {
"id": 1,
"user_name": "john_updated",
"first_name": "John",
"last_name": "Doe",
"status": 1,
"created_at": earlier,
"updated_at": now,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = update_user(1, dto)
assert result == expected
def test_update_user_no_fields(mock_conn):
with pytest.raises(ValueError, match="No fields to update"):
update_user(1, {})
def test_update_user_not_found(mock_conn):
dto = {"user_name": "ghost"}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = None
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(ValueError, match="User not found"):
update_user(999, dto)
def test_delete_user_success(mock_conn):
mock_cursor = MagicMock()
mock_cursor.rowcount = 1
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
assert delete_user(1) is None
def test_delete_user_not_found(mock_conn):
mock_cursor = MagicMock()
mock_cursor.rowcount = 0
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(ValueError, match="User not found"):
delete_user(2)
После этого выполните команду
pytest-v
Если вы все сделали правильно, все тесты пройдены.
tests/repositories/test_user_repository.py::test_create_user_success PASSED [ 7%]
tests/repositories/test_user_repository.py::test_create_user_error PASSED [ 15%]
tests/repositories/test_user_repository.py::test_get_all_users_success PASSED [ 23%]
tests/repositories/test_user_repository.py::test_get_all_users_error PASSED [ 30%]
tests/repositories/test_user_repository.py::test_get_user_by_id_success PASSED [ 38%]
tests/repositories/test_user_repository.py::test_get_user_by_id_not_found PASSED [ 46%]
tests/repositories/test_user_repository.py::test_get_user_by_username_success PASSED [ 53%]
tests/repositories/test_user_repository.py::test_get_user_by_username_not_found PASSED [ 61%]
tests/repositories/test_user_repository.py::test_update_user_success PASSED [ 69%]
tests/repositories/test_user_repository.py::test_update_user_no_fields PASSED [ 76%]
tests/repositories/test_user_repository.py::test_update_user_not_found PASSED [ 84%]
tests/repositories/test_user_repository.py::test_delete_user_success PASSED [ 92%]
tests/repositories/test_user_repository.py::test_delete_user_not_found PASSED [100%]
=================================================================== 13 passed in 0.12s ====================================================================
Разработка репозитория постов
На этом этапе мы реализуем репозиторий постов — слой, отвечающий за взаимодействие с таблицей posts
, а также связанными с ней таблицами likes
, views
и вложенными ответами (реплаями).
Репозиторий постов будет включать следующие методы:
- создание нового поста (
create_post
); - получение списка постов с фильтрацией и пагинацией (
get_all_posts
); - получение одного поста по
id
, включая автора, количество лайков, просмотров и ответов (get_post_by_id
); - удаление поста владельцем (
delete_post
); - отметка, что пользователь просмотрел пост (
view_post
); - лайк/дизлайк поста (
like_post
,dislike_post
).
Мы начнем с реализации функции create_post
, затем последовательно опишем остальные. Все методы взаимодействуют с базой через SQL-запросы, используют подстановки для защиты от SQL-инъекций и возвращают данные в формате DTO.
Создайте файл src/repositories/post_repository.py
, в него поместите следующий код:
from config.db import pool
from psycopg.rows import dict_row
def create_post(dto: dict) -> dict:
query = """
...
"""
values = (
dto["text"],
dto["user_id"],
dto.get("reply_to_id"),
)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, values)
return cur.fetchone()
Пояснение:
dto
— словарь, содержащий данные нового поста (text
,user_id
,reply_to_id
);SQL-запрос вставляет данные в таблицу posts;
После вставки сразу возвращаются поля нового поста:
id
,text
,created_at
,reply_to_id
.
Задание
В соответствии с пояснением напишите SQL запрос для добавления нового поста. Не забудьте использовать позиционные параметры для предотвращения SQL-инъекций
Функция get_all_posts
возвращает список постов с расширенной информацией: количество лайков, просмотров, ответов, а также информацию о пользователе и отметках "нравится" и "просмотрено" от текущего пользователя.
def get_all_posts(dto: dict) -> list[dict]:
params = [dto["user_id"]]
query = """
WITH likes_count AS (
SELECT post_id, COUNT(*) AS likes_count
FROM likes GROUP BY post_id
),
views_count AS (
SELECT post_id, COUNT(*) AS views_count
FROM views GROUP BY post_id
),
replies_count AS (
SELECT reply_to_id, COUNT(*) AS replies_count
FROM posts WHERE reply_to_id IS NOT NULL GROUP BY reply_to_id
)
SELECT
p.id, p.text, p.reply_to_id, p.created_at,
u.id AS user_id, u.user_name, u.first_name, u.last_name,
COALESCE(lc.likes_count, 0) AS likes_count,
COALESCE(vc.views_count, 0) AS views_count,
COALESCE(rc.replies_count, 0) AS replies_count,
CASE WHEN l.user_id IS NOT NULL THEN true ELSE false END AS user_liked,
CASE WHEN v.user_id IS NOT NULL THEN true ELSE false END AS user_viewed
FROM posts p
JOIN users u ON p.user_id = u.id
LEFT JOIN likes_count lc ON p.id = lc.post_id
LEFT JOIN views_count vc ON p.id = vc.post_id
LEFT JOIN replies_count rc ON p.id = rc.reply_to_id
LEFT JOIN likes l ON l.post_id = p.id AND l.user_id = %s
LEFT JOIN views v ON v.post_id = p.id AND v.user_id = %s
WHERE p.deleted_at IS NULL
"""
if "search" in dto and dto["search"]:
query += f" AND p.text ILIKE %s"
params.append(f"%{dto['search']}%")
if "owner_id" in dto and dto["owner_id"]:
query += f" AND p.user_id = %s"
params.append(dto["owner_id"])
if "reply_to_id" in dto and dto["reply_to_id"]:
query += f" AND p.reply_to_id = %s ORDER BY p.created_at ASC"
params.append(dto["reply_to_id"])
else:
query += " AND p.reply_to_id IS NULL ORDER BY p.created_at DESC"
query += " OFFSET %s LIMIT %s"
params.extend([dto["offset"], dto["limit"]])
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, params)
rows = cur.fetchall()
return [
{
"id": row["id"],
"text": row["text"],
"reply_to_id": row["reply_to_id"],
"created_at": row["created_at"],
"likes_count": row["likes_count"],
"views_count": row["views_count"],
"replies_count": row["replies_count"],
"user_liked": row["user_liked"],
"user_viewed": row["user_viewed"],
"user": {
"id": row["user_id"],
"user_name": row["user_name"],
"first_name": row["first_name"],
"last_name": row["last_name"],
},
}
for row in rows
]
Метод get_all_posts
предназначен для получения списка публикаций с расширенной информацией:
автор поста;
количество лайков, просмотров, ответов;
флаги, лайкнул и/или просматривал ли текущий пользователь этот пост.
Структура SQL-запроса
Запрос построен с использованием CTE (Common Table Expressions) и выглядит следующим образом:
WITH likes_count AS (...),
views_count AS (...),
replies_count AS (...)
SELECT ...
FROM posts ...
Рассмотрим все части по порядку.
1. Подсчёт количества лайков к каждому посту
likes_count AS (
SELECT post_id, COUNT(*) AS likes_count
FROM likes
GROUP BY post_id
)
Здесь из таблицы likes
собирается информация о количестве лайков для каждого поста. Используется GROUP BY post_id
, чтобы сгруппировать лайки по постам.
2. Подсчёт количества просмотров
views_count AS (
SELECT post_id, COUNT(*) AS views_count
FROM views
GROUP BY post_id
)
Аналогично первой CTE, но теперь считаются просмотры из таблицы views
.
3. Подсчёт количества ответов на каждый пост
replies_count AS (
SELECT reply_to_id, COUNT(*) AS replies_count
FROM posts
WHERE reply_to_id IS NOT NULL
GROUP BY reply_to_id
)
Здесь из самой таблицы posts
выбираются те строки, где reply_to_id IS NOT NULL
, то есть это ответы на другие посты. Считается, сколько таких ответов у каждого родительского поста.
4. Основной запрос
SELECT
p.id, p.text, p.reply_to_id, p.created_at,
u.id AS user_id, u.user_name, u.first_name, u.last_name,
COALESCE(lc.likes_count, 0) AS likes_count,
COALESCE(vc.views_count, 0) AS views_count,
COALESCE(rc.replies_count, 0) AS replies_count,
CASE WHEN l.user_id IS NOT NULL THEN true ELSE false END AS user_liked,
CASE WHEN v.user_id IS NOT NULL THEN true ELSE false END AS user_viewed
FROM posts p
JOIN users u ON p.user_id = u.id
LEFT JOIN likes_count lc ON p.id = lc.post_id
LEFT JOIN views_count vc ON p.id = vc.post_id
LEFT JOIN replies_count rc ON p.id = rc.reply_to_id
LEFT JOIN likes l ON l.post_id = p.id AND l.user_id = $1
LEFT JOIN views v ON v.post_id = p.id AND v.user_id = $1
WHERE p.deleted_at IS NULL
...
Что здесь происходит:
JOIN users
— соединение поста с его автором поuser_id
;LEFT JOIN
сlikes_count
,views_count
,replies_count
— добавляются данные из CTE о количестве лайков, просмотров и ответов;LEFT JOIN likes l
иviews v
— проверяется, поставил ли лайк или просмотр текущий пользователь ($1
— его id). Эти поля используются в логических выражениях ниже;CASE WHEN ... THEN true ELSE false
— определяетuser_liked
иuser_viewed
;COALESCE(..., 0)
— если данных о лайках/просмотрах/ответах нет (например, никто не лайкал), подставляется0
;WHERE p.deleted_at IS NULL
— фильтрация: берутся только не удалённые посты.
5. Дополнительные фильтры
По тексту:
if "search" in dto and dto["search"]:
query += f" AND p.text ILIKE %s"
params.append(f"%{dto['search']}%")
Если передана строка search
, ищутся посты, в тексте которых есть соответствие.
По пользователю (автору):
if "owner_id" in dto and dto["owner_id"]:
query += f" AND p.user_id = %s"
params.append(dto["owner_id"])
Если передан owner_id
, отбираются посты конкретного пользователя.
По ответам:
if "reply_to_id" in dto and dto["reply_to_id"]:
query += f" AND p.reply_to_id = %s ORDER BY p.created_at ASC"
params.append(dto["reply_to_id"])
else:
query += " AND p.reply_to_id IS NULL ORDER BY p.created_at DESC"
Проверяется, являются ли посты ответами на другой пост (reply_to_id
) или это корневые посты.
6. Пагинация
query += " OFFSET %s LIMIT %s"
params.extend([dto["offset"], dto["limit"]])
Реализуется механика "скользящего окна" — выбирается определённый диапазон постов.
7. Возвращаемый результат
Результат собирается в виде массива постов. Каждый пост содержит:
данные самого поста,
данные автора (
user
),количество лайков, просмотров, ответов,
флаги
user_liked
,user_viewed
.
Далее рассмотрим реализацию функции get_post_by_id
.
def get_post_by_id(post_id: int, user_id: int) -> dict:
query = """
WITH likes_count AS (
SELECT post_id, COUNT(*) AS likes_count
FROM likes
GROUP BY post_id
),
views_count AS (
SELECT post_id, COUNT(*) AS views_count
FROM views
GROUP BY post_id
),
replies_count AS (
SELECT reply_to_id, COUNT(*) AS replies_count
FROM posts
WHERE reply_to_id IS NOT NULL
GROUP BY reply_to_id
)
SELECT
p.id AS post_id,
p.text,
p.reply_to_id,
p.created_at,
u.id AS user_id,
u.user_name,
u.first_name,
u.last_name,
COALESCE(lc.likes_count, 0) AS likes_count,
COALESCE(vc.views_count, 0) AS views_count,
COALESCE(rc.replies_count, 0) AS replies_count,
CASE WHEN l.user_id IS NOT NULL THEN true ELSE false END AS user_liked,
CASE WHEN v.user_id IS NOT NULL THEN true ELSE false END AS user_viewed
FROM posts p
JOIN users u ON p.user_id = u.id
LEFT JOIN likes_count lc ON p.id = lc.post_id
LEFT JOIN views_count vc ON p.id = vc.post_id
LEFT JOIN replies_count rc ON p.id = rc.reply_to_id
LEFT JOIN likes l ON l.post_id = p.id AND l.user_id = %s
LEFT JOIN views v ON v.post_id = p.id AND v.user_id = %s
WHERE p.id = %s AND p.deleted_at IS NULL;
"""
params = (user_id, user_id, post_id)
with pool.connection() as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(query, params)
row = cur.fetchone()
if row is None:
raise ValueError("Post not found")
return {
"id": row["post_id"],
"text": row["text"],
"reply_to_id": row["reply_to_id"],
"created_at": row["created_at"],
"likes_count": row["likes_count"],
"views_count": row["views_count"],
"replies_count": row["replies_count"],
"user_liked": row["user_liked"],
"user_viewed": row["user_viewed"],
"user": {
"id": row["user_id"],
"user_name": row["user_name"],
"first_name": row["first_name"],
"last_name": row["last_name"],
},
}
Функция get_post_by_id
используется для получения одного конкретного поста по его идентификатору. Она возвращает расширенную информацию по посту, включая лайки, просмотры, количество ответов и данные об авторе. Функция похожа на get_all_posts
, за исключением некоторых отличий.
Фильтрация по ID поста
Вместо выборки множества записей, запрос ограничивается одним постом:
WHERE p.id = $2 AND p.deleted_at IS NULL
Первый параметр — это user_id
(нужен для определения, лайкнул ли/просматривал ли пользователь пост), второй — это ID самого поста, который ищется.
Отсутствует пагинация
Метод возвращает только один пост, поэтому нет OFFSET
и LIMIT
.
Возвращаемое значение
get_post_by_id
возвращает один объект поста, а get_all_posts
- массив.
Обработка крайних случаев
Если пост не найден, get_post_by_id
выбрасывает исключение "Post not found", а get_all_posts
возвращает пустой массив.
Перейдем к реализации метода delete_post
.
def delete_post(post_id: int, owner_id: int) -> None:
query = """
...
"""
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query, (post_id, owner_id))
if cur.rowcount == 0:
raise ValueError("Post not found or already deleted")
Задание
Реализуйте метод delete_post
, который помечает пост как удалённый. SQL-запрос должен обновлять поле deleted_at
текущим временем, работать только с постами, принадлежащими автору и исключать уже удалённые посты.
Теперь реализуем метод, который регистрирует факт просмотра поста пользователем. Каждый пользователь может просмотреть пост только один раз — повторные просмотры не записываются.
from psycopg.errors import UniqueViolation
def view_post(post_id: int, user_id: int) -> None:
query = """
...
"""
try:
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query, (post_id, user_id))
if cur.rowcount == 0:
raise ValueError("Post not found")
except UniqueViolation as err:
if "pk__views" in str(err):
raise ValueError("Post already viewed") from err
raise
Внимание
Обратите внимание на строку if "pk__views" in str(err)
. Здесь pk__views
- это имя первичного ключа у таблицы views
. Подставьте свое, если у вас отличается.
Задание
Реализуйте метод view_post
, который добавляет новую запись в таблицу views
.
Теперь реализуем метод, который позволяет пользователю поставить лайк посту. Один пользователь может поставить лайк одному посту только один раз — повторные попытки должны вызывать ошибку.
def like_post(post_id: int, user_id: int) -> None:
query = """
...
"""
try:
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query, (post_id, user_id, post_id))
if cur.rowcount == 0:
raise ValueError("Post not found")
except UniqueViolation as err:
if "pk__likes" in str(err):
raise ValueError("Post already liked") from err
raise
Внимание
Обратите внимание на строку if "pk__likes" in str(err)
. Здесь pk__likes
- это имя первичного ключа у таблицы likes
. Подставьте свое, если у вас отличается.
Задание
Реализуйте метод like_post
, который добавляет новую запись в таблицу likes
.
Метод dislike_post
позволяет пользователю убрать лайк с поста, если он его ранее поставил.
def dislike_post(post_id: int, user_id: int) -> None:
query = """
...
"""
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query, (post_id, user_id, post_id))
if cur.rowcount == 0:
raise ValueError("Post not found")
Задание
Реализуйте метод dislike_post
, который удаляет запись из таблицы likes
.
Тестирование репозитория постов
В папке __tests__/repositories
создайте файл test_post_repository.py
и поместите в него код с unit-тестами:
Unit-тесты postRepository
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from psycopg.errors import UniqueViolation
from repositories.post_repository import (create_post, delete_post,
dislike_post, get_all_posts,
get_post_by_id, like_post, view_post)
def normalize_sql(sql: str) -> str:
return " ".join(sql.lower().split())
@pytest.fixture
def mock_conn():
with patch("config.db.pool.connection") as mock_conn_context:
yield mock_conn_context
def test_create_post_success(mock_conn):
dto = {
"text": "Lorem ipsum dolor sit amet, consectetur adipiscing",
"user_id": 1,
"reply_to_id": None,
}
expected = {
"id": 1,
"text": dto["text"],
"created_at": datetime.utcnow(),
"reply_to_id": None,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = create_post(dto)
assert result == expected
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into posts" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (dto["text"], dto["user_id"], dto["reply_to_id"])
def test_create_post_error(mock_conn):
dto = {
"text": "Lorem ipsum dolor sit amet, consectetur adipiscing",
"user_id": 1,
"reply_to_id": None,
}
fake_error = Exception("insert failed")
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = fake_error
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="insert failed"):
create_post(dto)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into posts" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (dto["text"], dto["user_id"], dto["reply_to_id"])
def test_create_post_success(mock_conn):
dto = {
"text": "Lorem ipsum dolor sit amet, consectetur adipiscing",
"user_id": 1,
"reply_to_id": None,
}
expected = {
"id": 1,
"text": dto["text"],
"created_at": datetime.utcnow(),
"reply_to_id": None,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = expected
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = create_post(dto)
assert result == expected
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into posts" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (dto["text"], dto["user_id"], dto["reply_to_id"])
def test_create_post_error(mock_conn):
dto = {
"text": "Lorem ipsum dolor sit amet, consectetur adipiscing",
"user_id": 1,
"reply_to_id": None,
}
fake_error = Exception("insert failed")
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = fake_error
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="insert failed"):
create_post(dto)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into posts" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (dto["text"], dto["user_id"], dto["reply_to_id"])
def test_get_all_posts_success(mock_conn):
now = datetime(2025, 4, 24, 20, 55, 53, 21000)
dto = {
"user_id": 1,
"owner_id": 0,
"limit": 100,
"offset": 0,
"reply_to_id": 1,
"search": "test",
}
mock_cursor = MagicMock()
mock_cursor.fetchall.return_value = [
{
"id": 1,
"text": "Post 1",
"reply_to_id": None,
"created_at": now,
"likes_count": 10,
"views_count": 100,
"replies_count": 0,
"user_liked": True,
"user_viewed": True,
"user_id": 1,
"user_name": "username",
"first_name": "first",
"last_name": "last",
},
{
"id": 2,
"text": "Post 2",
"reply_to_id": None,
"created_at": now,
"likes_count": 5,
"views_count": 50,
"replies_count": 2,
"user_liked": False,
"user_viewed": True,
"user_id": 1,
"user_name": "username",
"first_name": "first",
"last_name": "last",
},
]
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = get_all_posts(dto)
assert result == [
{
"id": 1,
"text": "Post 1",
"reply_to_id": None,
"created_at": now,
"likes_count": 10,
"views_count": 100,
"replies_count": 0,
"user_liked": True,
"user_viewed": True,
"user": {
"id": 1,
"user_name": "username",
"first_name": "first",
"last_name": "last",
},
},
{
"id": 2,
"text": "Post 2",
"reply_to_id": None,
"created_at": now,
"likes_count": 5,
"views_count": 50,
"replies_count": 2,
"user_liked": False,
"user_viewed": True,
"user": {
"id": 1,
"user_name": "username",
"first_name": "first",
"last_name": "last",
},
},
]
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized = normalize_sql(sql_called)
assert "select" in normalized
params = mock_cursor.execute.call_args[0][1]
assert params[0] == dto["user_id"]
def test_get_all_posts_error(mock_conn):
dto = {
"user_id": 1,
"owner_id": 0,
"limit": 100,
"offset": 0,
"reply_to_id": 1,
"search": "test",
}
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = Exception("query failed")
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="query failed"):
get_all_posts(dto)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized = normalize_sql(sql_called)
assert "select" in normalized
params = mock_cursor.execute.call_args[0][1]
assert params[0] == dto["user_id"]
def test_get_post_by_id_success(mock_conn):
user_id = 1
post_id = 1
now = datetime.utcnow()
row = {
"post_id": post_id,
"text": "Lorem ipsum dolor sit amet, consectetur adipiscing",
"reply_to_id": None,
"created_at": now,
"user_id": user_id,
"user_name": "username",
"first_name": "first_name",
"last_name": "last_name",
"likes_count": 10,
"views_count": 100,
"replies_count": 0,
"user_liked": True,
"user_viewed": True,
}
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = row
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
result = get_post_by_id(post_id, user_id)
assert result == {
"id": row["post_id"],
"text": row["text"],
"reply_to_id": row["reply_to_id"],
"created_at": row["created_at"],
"likes_count": row["likes_count"],
"views_count": row["views_count"],
"replies_count": row["replies_count"],
"user_liked": row["user_liked"],
"user_viewed": row["user_viewed"],
"user": {
"id": row["user_id"],
"user_name": row["user_name"],
"first_name": row["first_name"],
"last_name": row["last_name"],
},
}
sql_called = mock_cursor.execute.call_args[0][0].lower()
assert "select" in normalize_sql(sql_called)
params = mock_cursor.execute.call_args[0][1]
assert params == (user_id, user_id, post_id)
def test_get_post_by_id_not_found(mock_conn):
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = None
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="Post not found"):
get_post_by_id(999, 1)
sql_called = mock_cursor.execute.call_args[0][0].lower()
assert "select" in normalize_sql(sql_called)
params = mock_cursor.execute.call_args[0][1]
assert params == (1, 1, 999)
def test_delete_post_success(mock_conn):
post_id = 1
owner_id = 1
mock_cursor = MagicMock()
mock_cursor.rowcount = 1
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
assert delete_post(post_id, owner_id) is None
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "update posts set deleted_at = now()" in normalized_sql
assert "where id =" in normalized_sql and "user_id =" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, owner_id)
def test_delete_post_not_found(mock_conn):
post_id = 2
owner_id = 1
mock_cursor = MagicMock()
mock_cursor.rowcount = 0
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="Post not found or already deleted"):
delete_post(post_id, owner_id)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "update posts set deleted_at = now()" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, owner_id)
def test_view_post_success(mock_conn):
post_id = 1
user_id = 1
mock_cursor = MagicMock()
mock_cursor.rowcount = 1
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
assert view_post(post_id, user_id) is None
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into views (post_id, user_id)" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id)
def test_view_post_error_sql(mock_conn):
post_id = 2
user_id = 1
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = Exception("insert failed")
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="insert failed"):
view_post(post_id, user_id)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into views (post_id, user_id)" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id)
def test_view_post_already_viewed(mock_conn):
post_id = 3
user_id = 1
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = UniqueViolation(
'duplicate key value violates unique constraint "pk__views"'
)
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(ValueError, match="Post already viewed"):
view_post(post_id, user_id)
def test_like_post_success(mock_conn):
post_id = 1
user_id = 1
mock_cursor = MagicMock()
mock_cursor.rowcount = 1
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
assert like_post(post_id, user_id) is None
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into likes (post_id, user_id)" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id, post_id)
def test_like_post_error(mock_conn):
post_id = 2
user_id = 1
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = Exception("insert failed")
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="insert failed"):
like_post(post_id, user_id)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "insert into likes (post_id, user_id)" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id, post_id)
def test_like_post_already_liked(mock_conn):
post_id = 3
user_id = 1
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = UniqueViolation(
'duplicate key value violates unique constraint "pk__likes"'
)
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(ValueError, match="Post already liked"):
like_post(post_id, user_id)
def test_dislike_post_success(mock_conn):
post_id = 1
user_id = 1
mock_cursor = MagicMock()
mock_cursor.rowcount = 1
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
assert dislike_post(post_id, user_id) is None
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "delete from likes where post_id = %s and user_id = %s" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id, post_id)
def test_dislike_post_error(mock_conn):
post_id = 2
user_id = 1
mock_cursor = MagicMock()
mock_cursor.execute.side_effect = Exception("delete failed")
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="delete failed"):
dislike_post(post_id, user_id)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "delete from likes where post_id = %s and user_id = %s" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id, post_id)
def test_dislike_post_not_found(mock_conn):
post_id = 3
user_id = 1
mock_cursor = MagicMock()
mock_cursor.rowcount = 0
mock_conn.return_value.__enter__.return_value.cursor.return_value.__enter__.return_value = mock_cursor
with pytest.raises(Exception, match="Post not found"):
dislike_post(post_id, user_id)
sql_called = mock_cursor.execute.call_args[0][0].lower()
normalized_sql = normalize_sql(sql_called)
assert "delete from likes where post_id = %s and user_id = %s" in normalized_sql
params = mock_cursor.execute.call_args[0][1]
assert params == (post_id, user_id, post_id)
Запустите тесты. Если вы все сделали правильно, все тесты пройдены.
tests/repositories/test_post_repository.py::test_create_post_success PASSED [ 3%]
tests/repositories/test_post_repository.py::test_create_post_error PASSED [ 6%]
tests/repositories/test_post_repository.py::test_get_all_posts_success PASSED [ 10%]
tests/repositories/test_post_repository.py::test_get_all_posts_error PASSED [ 13%]
tests/repositories/test_post_repository.py::test_get_post_by_id_success PASSED [ 16%]
tests/repositories/test_post_repository.py::test_get_post_by_id_not_found PASSED [ 20%]
tests/repositories/test_post_repository.py::test_delete_post_success PASSED [ 23%]
tests/repositories/test_post_repository.py::test_delete_post_not_found PASSED [ 26%]
tests/repositories/test_post_repository.py::test_view_post_success PASSED [ 30%]
tests/repositories/test_post_repository.py::test_view_post_error_sql PASSED [ 33%]
tests/repositories/test_post_repository.py::test_view_post_already_viewed PASSED [ 36%]
tests/repositories/test_post_repository.py::test_like_post_success PASSED [ 40%]
tests/repositories/test_post_repository.py::test_like_post_error PASSED [ 43%]
tests/repositories/test_post_repository.py::test_like_post_already_liked PASSED [ 46%]
tests/repositories/test_post_repository.py::test_dislike_post_success PASSED [ 50%]
tests/repositories/test_post_repository.py::test_dislike_post_error PASSED [ 53%]
tests/repositories/test_post_repository.py::test_dislike_post_not_found PASSED [ 56%]
tests/repositories/test_user_repository.py::test_create_user_success PASSED [ 60%]
tests/repositories/test_user_repository.py::test_create_user_error PASSED [ 63%]
tests/repositories/test_user_repository.py::test_get_all_users_success PASSED [ 66%]
tests/repositories/test_user_repository.py::test_get_all_users_error PASSED [ 70%]
tests/repositories/test_user_repository.py::test_get_user_by_id_success PASSED [ 73%]
tests/repositories/test_user_repository.py::test_get_user_by_id_not_found PASSED [ 76%]
tests/repositories/test_user_repository.py::test_get_user_by_username_success PASSED [ 80%]
tests/repositories/test_user_repository.py::test_get_user_by_username_not_found PASSED [ 83%]
tests/repositories/test_user_repository.py::test_update_user_success PASSED [ 86%]
tests/repositories/test_user_repository.py::test_update_user_no_fields PASSED [ 90%]
tests/repositories/test_user_repository.py::test_update_user_not_found PASSED [ 93%]
tests/repositories/test_user_repository.py::test_delete_user_success PASSED [ 96%]
tests/repositories/test_user_repository.py::test_delete_user_not_found PASSED [100%]
=================================================================== 30 passed in 0.10s ====================================================================
Итог
Мы последовательно разработали два слоя доступа к данным — репозиторий пользователей и репозиторий постов, следуя архитектурному принципу разделения ответственности. Мы:
Создали функции для основных операций с базой данных (создание, чтение, обновление, удаление).
Реализовали SQL-запросы с использованием позиционных параметров, обеспечивающих защиту от SQL-инъекций.
Поддержали гибкие фильтры, пагинацию и условия отбора данных (например, по
user_id
,reply_to_id
,text
).Обработали все возможные ошибки, включая ситуации "не найдено" и конфликты при повторных действиях (например, повторный лайк).
Написали юнит-тесты, чтобы убедиться в корректности реализации всех функций.
Такой подход делает код читаемым, легко поддерживаемым и расширяемым. Теперь мы готовы перейти к разработке следующего слоя — функционального (сервисов), где будет реализована логика обработки данных и проверок перед их отправкой в репозитории.