![]() |
![]() |
![]() |
![]() |
![]() |
![]() ![]() ![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
![]() |
|
[14 декабря 2008 г.] |
|
![]() |
Иногда встречается ситуация, которую можно обобщить фразой "задача параллельной обработки очереди". В данной статье я кратко расскажу, почему для эффективного решения этой задачи удобнее всего использовать PostgreSQL, а не какую-либо другую СУБД (типа MySQL или FireBird). |
Предположим, у нас есть очередь из большого количества элементов (несколько десятков миллионов). Требуется обработать каждый элемент очереди, затратив на это минимальное время, при следующих допущениях:
Представим очередь таблицей PostgreSQL. Пусть это будет, например, таблица подписчиков на различные рассылки проекта, содержащая около 10 млн элементов:
Листинг 1 |
CREATE TABLE subscription( id INTEGER, первичный ключ user_id INTEGER, ссылка на пользователя subscription_type INTEGER, тип подписки: новости, события друзей и т. д. last_mail_at |
Обработка каждого
Пусть мы инициируем новую рассылку в момент времени MAILING_STARTED_AT.
Давайте предположим, что мы запускаем 50 параллельных процессов, каждый из
которых берет очередной необработанный элемент из очереди (last_mail_at < MAILING_STARTED_AT;
считаем, что каждый из процессов имеет возможность получить значение
MAILING_STARTED_AT, т.е. оно константно после инициации рассылки). Далее он
обрабатывает его, а
![]() |
Дело пойдет быстрее, если эти 50 процессов будут запущены сразу на нескольких физических машинах. Но с точки зрения архитектуры и отладки все равно, работают ли они в рамках одной машины или нескольких. |
SQL-запрос на выборку очередного элемента может выглядеть так:
Листинг 2 |
SELECT id FROM subscription WHERE last_mail_at < MAILING_STARTED_AT AND subscription_type = <тип подписки> LIMIT 1 |
Самое сложное в этой
Давайте сразу отбросим плохо работающие варианты.
И вот тут на сцену выходит PostgreSQL. В нем есть одна возможность, которой нет,
например, в MySQL и
Для начала договоримся, что будем извлекать элементы не по одному, а целыми пачками по 100 шт. Это снизит число SQL-запросов на выборку в 100 раз и примерно во столько же раз улучшит производительность:
Листинг 3 |
SELECT id FROM subscription WHERE last_mail_at < MAILING_STARTED_AT AND subscription_type = <тип подписки> LIMIT 100 |
Естественно, чтобы этот запрос отрабатывал мгновенно, а не сканировал всю базу, нужно создать правильный индекс:
Листинг 4 |
CREATE INDEX subscription_idx ON subscription USING btree (subscription_type, last_mail_at, id) |
После создания индекса проверим, что он действительно используется в нашем запросе, запустив EXPLAIN SELECT:
Листинг 5 |
QUERY PLAN Limit (cost=0.00..679.00 rows=100 width=4) -> Index Scan using subscription_idx on subscription (cost=0.00..490539.67 rows=722449 width=4) Index Cond: ((subscription_type = <тип подписки>) AND (last_mail_at < MAILING_STARTED_AT)) |
Итак, PostgreSQL выбирает первые 100 элементов индекса, а не сканирует всю таблицу целиком. Чего мы и добивались.
Теперь модифицируем наш запрос так, чтобы при его запуске на нескольких машинах результаты никогда не пересекались. Это делается так (псевдо-операция @IDS := ... означает, что данные нужно сохранить в некоторый массив вызывающего скрипта):
Листинг 6 |
/* Исходный запрос: получаем элементы-кандидаты на обработку. */ @IDS := ARRAY( SELECT id FROM subscription WHERE last_mail_at < MAILING_STARTED_AT AND subscription_type = <тип подписки> AND pg_try_advisory_lock(tableoid::INTEGER, id) /* вот она, блокировка! */ LIMIT 100 ); /* Пост-проверка: отсеиваем тех, кто успел пометиться обработанным за */ /* время работы предыдущего запроса (см. ниже подробности про это). */ @IDS := ARRAY( SELECT id FROM subscription WHERE last_mail_at < MAILING_STARTED_AT AND id IN (@IDS) ); |
![]() |
Функция pg_try_advisory_lock() пытается "повесить" исключительную блокировку на некоторый "виртуальный" идентификтор, который описывается парой ее параметров. Если этот идентификатор уже заблокирован, она ничего не делает и возвращает false. Если же блокировку удалось установить, то функция возвращает true. Блокировка снимается, когда вызывается pg_advisory_unlock(), либо когда клиент отсоединяется от базы данных. |
Посмотрим, насколько эффективно PostgreSQL выполняет этот запрос, запустив EXPLAIN ANALYZE:
Листинг 7 |
QUERY PLAN Limit (cost=0.00..2047.39 rows=100 width=4) (actual time=0.163..4.948 rows=100 loops=1) -> Index Scan using subscription_idx on subscription (cost=0.00..493513.69 rows=241046 width=4) (actual time=0.161..4.393 rows=100 loops=1) Index Cond: ((subscription_type = <тип подписки>) AND (last_mail_at < MAILING_STARTED_AT)) Filter: pg_try_advisory_lock((tableoid)::integer, id) Total runtime: 5.270 ms |
Мы видим, что запрос все еще "хороший": PostgreSQL идет по индексу, отбрасывая элементы, которые не удалось заблокировать.
![]() |
В качестве первого параметра pg_try_advisory_lock() мы используем tableoid::integer, что является внутренним идентификатором таблицы subscription. Это значение отлично подходит в качестве изолированного "пространства имен" для блокировки. |
Если теперь, не закрывая соединения, запустить этот же запрос в другом процессе, он вернет элементы, которые не были заблокированы в первом процессе, что нам и нужно.
Итак, у нас есть 100 значений id из таблицы subscription. В цикле для каждого из них выполняем следующие действия:
Если вы реализуете алгоритм в точности таком виде, как описано выше, то с удивлением обнаружите, что иногда один и тот же элемент будет обработан дважды. Удивительно, но это действительно так! Давайте посмотрим на картинке, почему это происходит.
Итак:
![]() |
В действительности не обязательно, чтобы SQL-запрос разорвался прямо в середине, потому что PostgreSQL поддерживает версионирование записей в таблицах. Процесс (B) может перехватить управление после старта охватывающей транзакции, но до запуска SELECT. Т.е. вероятность сбоя в алгоритме возрастает. |
К счастью, решить проблему очень легко: нужно после извлечения пачки элементов из очереди сделать повторную проверку, удовлетворяют ли они условию last_mail_at < MAILING_STARTED_AT:
Давайте посмотрим, почему это работает:
А значит, пост-проверка пройдет успешно, и в конце запроса A2 мы гарантировано получим необработанные элементы.
В примере выше мы обрабатываем элементы в произвольном порядке. (Этот порядок
на практике идет по возрастанию last_mail_at, однако вряд ли такое поведение
можно назвать полезным.) Как же быть, если элементы нужно брать в соответствии
с определенным
Вообще говоря, как только мы запускаем параллельную обработку, само понятие "порядка" становится весьма условным. Если же нужен порядок "приблизительный" (к примеру, по возрастанию id), напрашивается идея слегка изменить SQL-запрос выборки, добавив в него ORDER BY:
Листинг 8 |
SELECT id FROM subscription WHERE last_mail_at < MAILING_STARTED_AT AND subscription_type = <тип подписки> AND pg_try_advisory_lock(tableoid::INTEGER, id) ORDER BY id /* не делайте так! */ LIMIT 100 |
К сожалению, этот способ не работает: невозможно создать такой индекс, который одновременно учитывал бы и условие "last_mail_at < MAILING_STARTED_AT" (либо даже "last_mail_at <> MAILING_STARTED_AT"), и критерий сортировки "ORDER BY id". (Подробности см. в документации PostgreSQL.)
Можно предложить следующее решение:
Листинг 9 |
UPDATE subscription SET mailing_num=MAILING_NUM subscription_type = <тип подписки>; |
Листинг 10 |
SELECT id FROM subscription WHERE mailing_num = MAILING_NUM AND pg_try_advisory_lock(tableoid::INTEGER, id) ORDER BY id /* теперь так можно! */ LIMIT 100 |
Листинг 11 |
CREATE INDEX subscription_idx ON subscription USING btree (mailing_num, id) |
Все, что мы сделали, это избавились от условия "меньше" или "не равно" в WHERE-части запроса, за счет чего получили возможность создания эффективного индекса для ORDER BY.
Обратите внимание, что ни одна из операций, используемых выше, не является блокирующей всю работу. Т.е. 50 наших процессов могут работать параллельно и никогда не ждут друг друга. Даже операция UPDATE не блокирует другие процессы, т.к. в PostgreSQL запись никогда не блокирует чтение. А значит, архитектура допускает практически неограниченное масштабирование: достаточно увеличить количество машин в 2 раза, чтобы примерно во столько же ускорить рассылку.
Нужно также заметить, что реализация данного алгоритма в MySQL заведомо обречена на провал. В этой СУБД хотя и есть операция GET_LOCK, но она может блокировать только одну запись в соединении (если вы попытаетесь заблокировать вторую, то первая будет освобождена).
Ну и последнее. Операция pg_try_advisory_lock() принципиально отличается от SELECT ... FOR UPDATE тем, что она не блокирует выполнение SELECT-операций в других процессах и, согласно документации, ничего не пишет на диск. А значит, работает в нашей задаче очень эффективно.
![]() |
|
Дмитрий Котеров |
14 декабря 2008 г.
©1999-2018
|
|
Вернуться к оглавлению |