|
Professor Seleznov
|
В этот раз мы рассмотрим, как пересвёртка рекурсивных структур данных помогает в решении задач динамического программирования. Подразумевается, что читатель уже знаком с предыдущими частями обзора, но на всякий случай в первых двух разделах представлены некоторые выдержки оттуда, наиболее важные для данной публикации. Далее мы разберём нюансы реализации пересвёртки — решим проблему переполнения стека и рассмотрим частный случай пересвёртки последовательностей. В конце приводятся примеры решения некоторых задач динамического программирования с использованием полученных ранее функций пересвёртки.
Содержание
- Рекурсия
- Неподвижные точки конструкторов типов
- Пересвёртка
- Пересвёртка неподвижной точки
- Защита от переполнения стека
- Пересвёртка последовательностей
- Решение задач
- Динамическое программирование
- Скалярное произведение сжатых векторов
- Количество заправок
- Задача о ферзях
- Выводы
Выдержки из предыдущих частей Рекурсия Рекурсивным называется алгоритм, в процессе выполнения которого происходит вызов его самого. Прежде всего, рекурсия — это техника описания алгоритмов с повторениями. В программировании чаще всего она реализуется через возможность использования идентификаторов, которые будут определены позднее. Например, метод считается полностью определенным, лишь когда завершена его реализация, значит вызовы этого же метода изнутри уже считаются рекурсией. Ссылки на другие неизвестные пока методы также могут приводить к косвенной рекурсии. Одной из существенных проблем рекурсии является её слабая предсказуемость. Тогда как в обычных циклах условие завершения повторений зафиксировано в одном месте, в случае рекурсии найти и оценить такие условия может быть крайне сложно. Более того, существуют запутанные ситуации, когда даже теоретически невозможно предсказать, завершится ли рекурсивный алгоритм, или зациклится намертво. Поэтому в некоторых языках программирования (F#) по умолчанию запрещены ссылки на ещё не определённые идентификаторы. Это позволяет защититься от нежелательных рекурсивных (циклических) зависимостей, но так или иначе программистам оставляется возможность реализовать рекурсию с помощью дополнительного синтаксиса. Подобную непредсказуемость повторений нельзя устранять полностью, так как она является неотъемлемой характеристикой тьюринг-полноных языков. Поведение алгоритмов изучает такой раздел математики, как лямбда-исчисление, со всеми своими расширениями. В математике в целом, запрещено ссылаться на то, чего ещё нет. Концепция повторений реализуется в лямбда-исчислении посредством дополнительных кванторов неподвижных точек функций. Любая функция одного аргумента, принимая на вход свою неподвижную точку, возвращает в точности её же. Такая техника позволяет переписать любую рекурсию через комбинатор неподвижной точки (Y-комбинатор), сохраняя при этом тьюринг-полноту языка. В программировании мы можем вынести любые повторения в Y-комбинатор:
// Y-комбинатор для функций F => F, где F = A => B def fix[A, B]: ((A => B) => (A => B)) => (A => B) = f => f(a => fix(f)(a)) // λ-выражение необходимо для "ленивости" type Int3 = (Int, Int, Int) def fib: (Int3 => Int) => (Int3 => Int) = // функция высшего порядка F => F self => (n, a, b) => n match // аргумент self - "ссылка на себя" case 0 => a case 1 => b case _ => self(n-1, b, a + b) // неподвижная точка функции высшего порядка - это функция val fibonacci: Int => Int = fix(fib)(_, 0, 1)
Другая беда рекурсии связана с переполнениями стека. Дело в том, что каждый вызов функции потребляет ценный ресурс — сравнительно небольшую область памяти, выделяемую под стек. Рекурсия обычно связана с глубокой вложенностью вызовов, что часто приводит к исчерпанию стека и знаменитому исключению StackOverflow. Поэтому многие программисты чаще отдают предпочтение циклам взамен рекурсии. Но это проблема не самой концепции рекурсии, а лишь её реализации в современных компьютерах. Есть и такие реализации, которые позволяют сохранить описательные преимущества рекурсии, избегая ловушки переполнения стека. Речь идёт об оптимизации хвостовой рекурсии, когда «вызов себя» является последним действием в методе. Некоторые компиляторы (Scala) и среды исполнения, обнаружив хвостовой вызов, просто передают управление на начало метода, не потребляя стек. По сути, «под капотом» рекурсия превращается в цикл! Существуют специальные теоремы, доказывающие возможность переписать любую рекурсию в виде цикла и наоборот. Показанный выше Y-комбинатор fix реализует так называемую неявную рекурсию. Рекурсивные вызовы инициируются не в теле исходной функции, а уже внутри f, передаваемой в fix как аргумент. Заранее неизвестно, сколько раз будет вызвана рекурсия, и сработает ли она вообще. Поэтому комбинатор с такой сигнатурой невозможно привести к стекобезопасному циклу. Для решения этой задачи необходимо переработать сигнатуру зацикливаемой функции так, чтобы в результате её вычисления сохранялась полная информация обо всех рекурсивных вызовах, в том числе и многократно вложенных. То есть, тип возвращаемого значения должен быть рекурсивной структурой данных, что соответствует неподвижной точке некого конструктора типов. Только в этом случае для Y-комбинатора появится возможность стекобезопасной оптимизации. Одной из подходящих рекурсивных структур является встроенный контейнер TailRec[_] с конструкторами done[A](result: A) и tailcall[A](rest: => TailRec[A]). Он как раз предназначен для обеспечения стекобезопасной рекурсии, и с его помощью наш Y-комбинатор можно переписать так:
import scala.util.control.TailCalls.* def fixTail[A, B](f: (A => TailRec[B]) => (A => TailRec[B])): A => TailRec[B] = a => tailcall(f(fixTail(f))(a))
Эта функция нам ещё пригодится позднее, тогда же рассмотрим и некоторые свойства TailRec. Важно понимать, что на хранение такой структуры данных всё равно будет потребляться память, размер которой пропорционален количеству рекурсивных вызовов. Однако данные будут размещаться уже не в ограниченном стеке, а в динамической памяти (куче). Неподвижные точки конструкторов типов Программистам часто приходится работать с рекурсивными структурами данных. Например, такими: «список — это либо пустой список, либо элемент (голова) и другой список (хвост)». Здесь видна ссылка на себя, значит, это рекурсивное определение. Основные идеи лямбда-исчисления действуют и на уровне типов, когда роли функций играют конструкторы типов вроде Option[_], преобразующие одни типы в другие. В определении классов обычно допускаются «ссылки на себя», а также косвенная рекурсия. Но больше возможностей и предсказуемости предоставляет техника неподвижных точек конструкторов типов, то есть типов такого вида:
case class Fix[F[_]](unfix: F[Fix[F]])
Существуют разные виды неподвижных точек, но прежде всего интересны те из них, которые описывают полярные возможности — конструирование и схлопывание. Так наименьшая неподвижная точка

определяется функцией свёртки (fold), позволяющей вычислить некоторое значение на основе всех данных, хранящихся в структуре. В свою очередь, наибольшая неподвижная точка

отвечает за процесс развёртывания (unfold) структуры данных из заданного значения. Наименьшая и наибольшая неподвижные точки функтора F[_] называются также начальной F-алгеброй и терминальной F-коалгеброй соответственно, и для них предоставляются такие функции:
type Algebra[F[_]] = [X] =>> F[X] => X type Coalgebra[F[_]] = [X] =>> X => F[X] def inμ[F[+_]: Functor]: Algebra[F][μ[F]] // F[μ[F]] => μ[F] def out𝛎[F[+_]: Functor]: Coalgebra[F][𝛎[F]] // 𝛎[F] => F[𝛎[F]]
Для свёртки необходимо предоставить алгебру функтора, тогда как развёртка требует коалгебру:
def foldμ[F[_]]: [X] => Algebra[F][X] => μ[F] => X def unfold𝛎[F[_]]: [X] => Coalgebra[F][X] => X => 𝛎[F]
Наибольшая неподвижная точка представляет собой экзистенциальный тип, который здесь мы запишем в кодировке Чёрча как инструкцию по применению к ней функции-продолжения типа Unfolder:
infix type ~>[F[_], G[_]] = [X] => F[X] => G[X] // просто похоже на естественное преобразование type Unfolder[F[_]] = [R] =>> Coalgebra[F] ~> (* => R) // функция-продолжение type μ[F[_]] = Algebra[F] ~> Id // я могу свернуться в R, если дашь F-алгебру для R type 𝛎[F[_]] = Unfolder[F] ~> Id // я могу свернуться в R, если подскажешь, как получить R, имея F-коалгебру и экземпляр X
На самом деле все реализации неподвижных точек оказываются изоморфными друг другу (со скидкой на бесконечные конструкции). Из пары функций (fold, in) всегда можно получить (unfold, out) и обратно. Значит, все неподвижные точки одинаково характеризуются всеми четырьмя функциями. Это можно выразить с помощью классов типов:
type Fold = [Fix[_[_]]] =>> [F[_]] =>> [X] =>> Fix[F] => X type Unfold = [Fix[_[_]]] =>> [F[_]] =>> [X] =>> X => Fix[F] type FoldFix = [Fix[_[_]]] =>> [F[_]] => Functor[F] ?=> [X] => Algebra[F][X] => Fold[Fix][F][X] type UnfoldFix = [Fix[_[_]]] =>> [F[_]] => Functor[F] ?=> [X] => Coalgebra[F][X] => Unfold[Fix][F][X] type InFix[Fix[_[_]]] = [F[_]] => Functor[F] ?=> Algebra[F][Fix[F]] type OutFix[Fix[_[_]]] = [F[_]] => Functor[F] ?=> Coalgebra[F][Fix[F]]
Очевидно, что функции начальной алгебры InFix[Fix] и терминальной коалгебры OutFix[Fix] образуют изоморфизм — их композиции являются тождественными морфизмами (возвращают в точности то же, что им передали). Чтобы описать неподвижную точку, достаточно реализовать только одну из этих пар. Другая пара выводится автоматически:
type Functor[F[_]] = [A, B] => (A => B) => (F[A] => F[B]) def fmap[F[_] : Functor as F] = F given unfoldFromInFix: [Fix[_[_]]: InFix as in] => UnfoldFix[Fix] = [F[_]] => (_: Functor[F]) ?=> [X] => (coalg: Coalgebra[F][X]) => (x: X) => (in[F] `compose` fmap(unfoldFromInFix(coalg)) `compose` coalg)(x) given foldFromOutFix: [Fix[_[_]]: OutFix as out] => FoldFix[Fix] = [F[_]] => (_: Functor[F]) ?=> [X] => (alg: Algebra[F][X]) => (fixF: Fix[F]) => (alg `compose` fmap(foldFromOutFix(alg)) `compose` out[F])(fixF) given outFromIn: [Fix[_[_]]: {InFix as in, FoldFix as fold}] => OutFix[Fix] = [F[_]] => (_: Functor[F]) ?=> fold(fmap(in[F])) given inFromOut[Fix[_[_]]: {OutFix as out, UnfoldFix as unfold}]: InFix[Fix] = [F[_]] => (_: Functor[F]) ?=> unfold(fmap(out[F]))
Реализации для наименьшей и наибольшей неподвижных точек
given foldμ: FoldFix[μ] = [F[_]] => (_: Functor[F]) ?=> [X] => (alg: Algebra[F][X]) => (fixf: μ[F]) => fixf(alg) given inFixμ: InFix[μ] = [F[_]] => (_: Functor[F]) ?=> (fμF: F[μ[F]]) => [X] => (alg: Algebra[F][X]) => alg(fmap(summon[Fold[μ]](alg))(fμF)) given unfold𝛎: UnfoldFix[𝛎] = [F[_]] => (_: Functor[F]) ?=> [A] => (coalg: Coalgebra[F][A]) => (x: A) => [R] => (cont: CoalgebraRunner[F][R]) => cont(coalg)(x) given outFix𝛎: OutFix[𝛎] = [F[_]] => (_: Functor[F]) ?=> (𝛎F: 𝛎[F]) => 𝛎F([X] => (coalg: Coalgebra[F][X]) => coalg `andThen` fmap(unfold𝛎(coalg)))
Для использования возможностей неподвижных точек больше подойдёт следующий конструктор типов:
type FixedPoint[Fix[_[_]]] = ( fold: FoldFix[Fix], unfold: UnfoldFix[Fix], in: InFix[Fix], out: OutFix[Fix] ) given fixFromParts: [Fix[_[_]]: {FoldFix as fold, InFix as in, UnfoldFix as unfold, OutFix as out}] => FixedPoint[Fix] = ( fold = fold, unfold = unfold, in = in, out = out ) inline def foldFix[Fix[_[_]] : FixedPoint as fix] = fix.fold inline def unfoldFix[Fix[_[_]] : FixedPoint as fix] = fix.unfold inline def inFix[Fix[_[_]] : FixedPoint as fix] = fix.in inline def outFix[Fix[_[_]] : FixedPoint as fix] = fix.out
Например, тип списка и все основные методы работы с ним можно представить в виде неподвижной точки функтора

:
type OptCell = [A] =>> [L] =>> Option[(A, L)] given optCellFunctor: [X] => Functor[OptCell[X]] = [A, B] => (f: A => B) => _.map((x, a) => (x, f(a))) type μList[A] = μ[OptCell[A]] // вместо μ можно подставить любую неподвижную точку! def nil: μList[Nothing] = inFix[μ][OptCell[Nothing]](None) def cons[A](head: A, tail: μList[A]): μList[A] = inFix[μ][OptCell[A]](Some(head -> tail)) def foldList[A] = foldFix[μ][OptCell[A]]
Аналогичным образом описываются деревья произвольной формы. Все возможные неподвижные точки представляются деревьями различной формы. И чаще всего мы имеем дело с неподвижными точками полиномиальных функторов, также известных как алгебраические типы данных (ADT). У получаемых таким образом деревьев всего несколько ветвей в узле (список — вырожденный случай дерева). Для других же функторов неподвижные точки либо вообще не существуют, либо сложно найти им какое-то применение. Пересвёртка Пересвёртка неподвижной точки Среди всех возможностей неподвижных точек рекурсивными являются лишь функции unfoldFromInFix и foldFromOutFix. Независимо от того, какую пару функций (fold, in) или (unfold, out) мы выбираем для задания свойств неподвижной точки, в любом случае одна из функций дополняющей пары окажется рекурсивной. Именно она и будет отвечать за «пробегание по структуре данных». У любой неподвижной точки рекурсивной будет либо развёртка, либо свёртка. Но для широкого спектра задач оказывается важной только их комбинация, называемая «пересвёрткой» (refold) или гиломорфизмом (hylomorphism):
type Refold[Fix[_[_]]] = [F[_]] => Functor[F] ?=> [A, B] => (Coalgebra[F][A], Algebra[F][B]) => (A => B) def refoldFix[Fix[_[_]]: FixedPoint]: Refold[Fix] = [F[_]] => (_: Functor[F]) ?=> [A, B] => (coalg: Coalgebra[F][A], alg: Algebra[F][B]) => foldFix(alg) `compose` unfoldFix(coalg) // свёртка после развёртки
Эта функция будет рекурсивной для любого Fix. Если подставить (формально) вместо unfoldFix и foldFix определения unfoldFromInFix и foldFromOutFix, то пересвёртку можно сильно упростить:

Упоминание неподвижной точки вообще выпадает из определения пересвёртки!

Неподвижная точка не участвует в пересвёртке, но руководит ею с помощью функтора. А всю рекурсию можно спрятать в Y-комбинатор fix, определённый ранее:
def hylo[F[_]: Functor, A, B]( // «облегчённая» пересвёртка coalg: Coalgebra[F][A], alg: Algebra[F][B] ): A => B = fix(alg `compose` fmap(_) `compose` coalg)
На использовании этой мощной функции будут строиться представленные далее решения типовых задач динамического программирования. Защита от переполнения стека В приведённой выше реализации гиломорфизма рекурсивные вызовы «спрятаны» внутри fmap. Они полностью подчинены неизвестному функтору F, что не позволяет в общем случае развернуть рекурсию в линейный цикл. Впрочем, как было отмечено выше, структуру вызовов можно сохранять в контейнере TailRec. Нужно изменить шаг рекурсии, чтобы он возвращал значение в этом контейнере, что позволит воспользоваться «безопасным» Y-комбинатором fixTail. Но всплывает один нюанс — у нас теперь есть два контейнера F[_] и TailRec[_], и чтобы свести концы с концами, нам потребуется возможность «вытащить» F изнутри TailRec[_]. Нам нужен дистрибутивный закон для этих контейнеров:
type ∘[F[_], G[_]] = [X] =>> F[G[X]] // композиция контейнеров infix type ⇄[F[_], G[_]] = (F ∘ G) ~> (G ∘ F) // дистрибутивный закон type SwapOut[F[_]] = [G[_]] =>> F ⇄ G // классы типов type SwapIn [F[_]] = [G[_]] =>> G ⇄ F // мы используем этот inline def swap[F[_], G[_]](using sw: F ⇄ G) = sw
Экземпляр SwapIn[TailRec][F] подсказывает, как именно нужно «пробежать» по структуре F[_], чтобы пересобрать её уже внутри TailRec[_]. В результате «безопасный» гиломорфизм можно записать так:
// для удобства inline def runTailRec: TailRec ~> Id = [A] => _.result inline def fmapTailRec: Functor[TailRec] = [A, B] => (f: A => B) => _.map(f) def safeHylo[F[_]: {Functor, SwapIn[TailRec] as swap}, A, B]( coalg: Coalgebra[F][A], alg: Algebra[F][B] ): A => B = fixTail((self: A => TailRec[B]) => fmapTailRec(alg) `compose` // алгебру «поднимаем в мир TailRec» swap[B] `compose` // добавился этот шаг fmap(self) `compose` // также как и в hylo coalg // также как и в hylo ) `andThen` runTailRec[B] // в самом конце «распаковываем» TailRec[B]
Такая реализация действительно не боится переполнения стека! В библиотеках функционального программирования обычно используется не такой дистрибутивный закон ⇄, а специальные классы типов для пробрасывания целевого контейнера внутрь (Traverse[F[_]]) или наружу (Distributive[F[_]]) любого другого контейнера. Такое разделение базируется на фундаментальных закономерностях из теории категорий. Для наших целей вполне подошли бы экземпляры Traverse[F] или Distributive[TailRec] вместо узкоспециализированного SwapIn[TailRec][F]. Оказывается, что для контейнера TailRec невозможно реализовать честный экземпляр Distributive. Зато реализация Traverse для полиномиальных конструкторов типов доступна всегда. Scala-библиотека Kittens из экосистемы Cats даже предоставляет механизмы автоматического вывода экземпляров Traverse для таких F[_]. Таким образом, на практике безопасный гиломорфизм не накладывает на программистов обязательств по «ручной» реализации дистрибутивного закона F ∘ TailRec ~> TailRec ∘ F. Контейнер TailRec уже рассматривался в третьей части данного обзора, напомню лишь основные моменты. Этот контейнер является минималистичной реализацией паттерна «батут» (trampoline). По сути, он изоморфен свободной монаде, которая, в свою очередь, является неподвижной точкой конструктора типов:
type FreeBase[F[_]] = [A] =>> A `Either` F type FreeF[Fix[_[_]]] = [F[_]] =>> [A] =>> FixedPoint[Fix] ?=> Fix[FreeBase[F][A]] // FreeF[Fix][F][A] ≅ (Id + F + F∘F + F∘F∘F + …)[A] type Thunk[A] = () => A type Trampoline[Fix[_[_]]] = FreeF[Fix][Thunk] // Fix[[X] =>> Either[A, () => X]] // TailRec[A] ≅ Trampoline[Fix][A] для любой неподвижной точки Fix
Благодаря «свободной структуре» TailRec накапливает дерево всех своих преобразований, а Thunk[_] позволяет делать это «лениво». Но самая хитрая магия скрыта в методе .result (наш runTailRec). Дело в том, что в нём реализован механизм коплотности, характерный для большинства современных свободных монад. Он позволяет линеаризовать накопленную иерархию, чтобы вместо ресурсоемкого обхода дерева выполнялся один быстрый проход. В итоге мы получаем константное время на каждый шаг вычисления и полную защиту от переполнения стека. Наш safeHylo полностью закрывает потребность в стекобезопасной пересвёртке, однако он нечасто находит применение. Дело в том, что на практике мы обычно работаем с ограниченными объёмами данных. Во многих задачах, разворачивая даже бинарное дерево на тридцать уровней, на последнем мы можем получить больше миллиарда объектов, тогда как его рекурсивный обход задействует всего тридцать фреймов стека, что достаточно безопасно. В частном же случае линейной последовательности (вырожденное дерево) также не понадобится сохранять промежуточные данные в памяти, а рекурсию всегда несложно свести к прямому циклу. Давайте разберём этот случай подробнее. Пересвёртка последовательностей Когда конструктор типов F[_] фиксирован, мы всегда знаем, как именно будут осуществляться рекурсивные вызовы, и можем сделать их хвостовыми. Последовательности элементов A изоморфны неподвижной точке от OptCell[A][_]. Значит, для её рекурсивной пересвёртки подойдёт такая реализация:
def refoldRec[A, B, C]( coalgebra: Coalgebra[OptCell[A]][B], defaultC: C, // эти два параметра вместе accumulate: (C, A) => C // эквивалентны Algebra[OptCell[A]][C] ): B => C = (seedB: B) => @tailrec def loop(currentC: C, currentSeed: B): C = coalgebra(currentSeed) match case None => currentC case Some((a, nextSeed)) => loop(accumulate(currentC, a), nextSeed) loop(defaultC, seedB) end refoldRec
Впрочем, для линейных итераций необязательно тянуть в свою кодовую базу подобный «велосипед». В стандартной библиотеке Scala уже есть подходящее решение — Iterator. С его помощью пересвёртку последовательности можно реализовать следующим образом:
def refoldIter[A, B, C]( coalgebra: Coalgebra[OptCell[A]][B], defaultC: C, accum: (C, A) => C ): B => C = (seedB: B) => Iterator .unfold(seedB)(coalgebra) .foldLeft(defaultC)(accum)
Метод unfold создаёт экземпляр типа Iterator[A]. Он генерирует новое состояние посредством coalgebra лишь в тот момент, когда его запрашивает свёртка foldLeft, которая сразу же потребляет это состояние, не сохраняя его в памяти. В итоге мы практически «из коробки» имеем оптимизированную стекобезопасную пересвёртку последовательности. Метод foldLeft работает «до конца», что не всегда бывает удобно. Зачастую бывает полезно найти лишь нужный элемент или профильтровать. Для этих целей у Iterator предусмотрены методы filter, find и takeWhile. Но «ручная» работа с Iterator может быть опасна, так как это не честный функциональный контейнер, а изменяемый тип данных. Он как река — нельзя дважды «войти» в один и тот же итератор. Поэтому в целях защиты от возможных ошибок, полезно прятать его от разработчиков в безопасные методы наподобие refold. Например, механизм быстрого выхода (short circuit) можно унифицировать с приведённой выше функцией. При этом необходимо изменить сигнатуру аккумулятора: accum: (C, A) => Either[C, D]. Семантика такова: либо продолжаем вычисления со следующим C, либо успешно завершаем их с D. И эту алгебру достаточно «встроить» в коалгебру:
type Result[D, C] = D `Either` C // завершила либо алгебра с D, либо коалгебра с C def upgradeCoalgebra[A, B, C, D]( coalg: Coalgebra[OptCell[A]][B], alg: (C, A) => Either[C, D] // результат ↓ либо продолжение ): Coalgebra[OptCell[Result[D, C]]][Result[D, C] `Either` (B, C)] = def resultWithLeft[R](r: R) = r -> Left(r) // для удобства _.toOption.map { (b, c) => coalg(b) .fold(resultWithLeft(Right(c))) { (a, nextB) => alg(c, a) .fold( // из-за семантики алгебры левая ветка даёт правую и наоборот nextC => Right(nextC) -> Right((nextB, nextC)), // продолжение вычислений finalD => resultWithLeft(Left(finalD)) // остановка ) } } end upgradeCoalgebra
Тогда «короткая» пересвёртка будет такой:
def lastAccum[A, C](a: A, c: C) = c def shortRefold[A, B, C, D]( coalg: Coalgebra[OptCell[A]][B], defaultC: C, accum: (C, A) => Either[C, D] ): B => Either[D, C] = seedB => val superCoalg = upgradeCoalgebra(coalg, accum) val rightDefault: Either[D, C] = Right(defaultC) refoldIter(superCoalg, rightDefault, lastAccum)(Right((seedB, defaultC)))
Пример использования короткой пересврйтки
// даёт пердыдущее чилсло, большее 0 val natCoalg: Coalgebra[OptCell[Int]][Int] = n => Option.when(n > 0)(n, n - 1) // если нашли ответ, возвращаем строку, иначе - следующее число val shortAaccum(goal: Int): (Int, Int) => Either[Int, String] = (counter, x) => Either.cond(x == goal, s"нашли на $counter", counter + 1) shortRefold(natCoalg, 0, shortAaccum(goal = 999999958))(1000000000) // Left(нашли на 42) shortRefold(natCoalg, 0, shortAaccum(goal = -1))(1000000000) // Right(1000000000)
В первом случае будет всего сорок две итерации, тогда как во втором — миллиард.
Если нужно просто развернуть последовательность, которая будет использоваться более одного раза, то вместо итератора удобнее взять коллекцию LazyList. Её метод unfold работает аналогично Iterator.unfold, но после первой пробежки все элементы будут кэшированы, и при повторном использовании они не будут вычисляться заново. Аналогичным образом можно реализовать стратегию раннего выхода и для пересвёрток более сложных структур данных. Необходимым условием для этого является наличие терминального слагаемого в F[_]. Например, неподвижная точка F[X] = Option[(A, X, X)] порождает бинарное дерево элементов A и для его пересвёртки также доступен ранний выход. Решение задач Динамическое программирование

Если бы мы знали, что такое это ваше Динамическое Программирование... Страшно! Общепринятого определения динамического программирования не существует. Это просто очередная маркетинговая этикетка, под которой удобно продавать различные медиа-материалы (вроде данной публикации)). Например, в Википедии приводится такая банальная формулировка: «это способ решения сложных задач путём разбиения их на более простые подзадачи». Но такое определение описывает общий метод решения абсолютно любых задач (декомпозиция, «разделяй и властвуй» и т.п.). Здесь же сошлюсь на идею, почерпнутую в хабр-статье О динамическом программировании на пальцах. Там говорится о «рассмотрении всех возможных путей решения, и выбора лучших среди них». В терминах теории типов получаем такое обобщение:
Развёртка рекурсивной структуры промежуточных данных с последующей её свёрткой с поиском/накоплением нужного результата.
Практически каждый алгоритм под этикеткой «Динамического программирования» сводится к пересвёртке неподвижной точки полиномиального функтора. Продемонстрируем это на примере трёх типовых задач. Скалярное произведение сжатых векторов Первую задачу решим показательно «многословно», последовательно переходя от «наивной» реализации к пересвёртке. Условие задачи
Даны 2 вектора целых чисел одинаковой длины, заданные в сжатой форме списками пар вида (value, count). Например, вектор [4, 4, 5] задается как [(4, 2), (5, 1)]. Необходимо посчитать скалярное произведение заданных векторов. Сжатие сохраняет исходный порядок элементов.
Сигнатура функции:
def dotProduct(vec1: List[(Int, Int)], vec2: List[(Int, Int)]): Long
Сценарии для тестирования:
- (List.empty[(Int, Int)], List.empty[(Int, Int)]) -> 0L,
- (List((4, 2), (5, 1)), List((2, 1), (1, 2))) -> 17L,
- (List((1, 999999)), List((1, 999999))) -> 999999L.
Наивное решение Давайте просто сперва развернём сжатые вектора в исходные и вычислим обычное скалярное произведение:
val getOrigView: List[(Int, Int)] => View[Int] = // Представление исходного вектора _.view.flatMap((i, n) => View.fill(n)(i)) def simpleDotProduct(v1: Iterable[Int], v2: Iterable[Int]) = (v1 `lazyZip` v2).map(_.toLong * _).sum def dotProduct(vec1: List[(Int, Int)], vec2: List[(Int, Int)]): Long = simpleDotProduct(getOrigView(vec1), getOrigView(vec2))
Решение очень простое, прямо-таки тривиальное. Память алгоритма константна за счёт использования View — это псевдоколлекция, которая не только лениво накапливает применяемые к ней преобразования, но и применяет их все поэлементно, не сохраняя в памяти промежуточные значения и результат. Оценка производительности стабильно пропорциональна длине несжатых векторов. Однако используемое в задаче сжатие вектора позволяет существенно оптимизировать вычисление скалярного произведения в случае, когда у векторов повторяются значения подряд идущих координат. Прогон такого алгоритма на втором тестовом сценарии со сжатыми векторами миллионной размерности демонстрирует на современных компьютерах задержку, заметную глазу, хотя в этом случае результат вычисляется в уме, причём гораздо быстрее. Рекурсия Нужно переписать алгоритм так, чтобы при расчёте использовались преимущества сжатых векторов. Воспользуемся функциональной интерпретацией метода двух указателей (two pointers): будем одновременно рассматривать головы обоих списков и сдвигать только тот указатель, чей текущий сжатый блок закончился. Сделаем это с помощью рекурсии:
def dotProduct(vec1: List[(Int, Int)], vec2: List[(Int, Int)]): Long = (vec1.headOption `zip` vec2.headOption) .fold(0L) { case ((i1, n1), (i2, n2)) => val ii = i1.toLong * i2 // важно не получить переполнение уже здесь val nn = n2 - n1 if nn > 0 then ii * n1 + dotProduct(vec1.tail, (i2, nn) :: vec2.tail) else ii * n2 + dotProduct((i1, -nn) :: vec1.tail, vec2.tail) }
Если какой-то из списков пустой? то это не страшно — значит, либо другой тоже пустой, либо там будет единственная пара с нулевым количеством повторений, и в этом случае возвращаем 0. Иначе разбираем первые пары из обоих векторов: оптимизированно вычисляем промежуточную свёртку для первых n1 min n2 измерений, и остаётся лишь добавить свёртку оставшихся хвостов с учётом недосчитанного остатка от нашей оптимизации. Тестовые сценарии второй алгоритм обработает успешно, причём заметно быстрее первого варианта. Однако, ввиду передачи параметров через стек, данный алгоритм потребляет память, пропорциональную длинам сжатых векторов, и если в исходном векторе миллионной размерности вообще не будет повторяющихся последовательных значений, то произойдёт переполнение стека… Хвостовая рекурсия Проблема обозначена, давайте решим её, переписав рекурсию в хвостовой форме, благо компилятор Scala умеет её оптимизировать. Для этого, в частности, вручную разберём Option с «головами» векторов, применим ранний выход, а также нам придётся добавить в список аргументов аккумулятор результата:
@tailrec def dotProductRec(accum: Long, vec1: List[(Int, Int)], vec2: List[(Int, Int)]): Long = val heads = vec1.headOption `zip` vec2.headOption if heads.isEmpty then return accum // да, ранний выход val ((i1, n1), (i2, n2)) = heads.get val nextVec1 = if n1 > n2 then (i1, n1 - n2) :: vec1.tail else vec1.tail val nextVec2 = if n2 > n1 then (i2, n2 - n1) :: vec2.tail else vec2.tail dotProductRec(accum + (n1 min n2).toLong * i1 * i2, nextVec1, nextVec2) end dotProductRec def dotProduct(vec1: List[(Int, Int)], vec2: List[(Int, Int)]): Long = dotProductRec(0L, vec1, vec2)
Вот такой реализации уже не страшны векторы больших размерностей, и потребляемая память не будет зависеть от длины векторов! Пересвёртка Из всех возможных реализаций рекурсии наиболее безопасной оказывается именно хвостовая. Чтобы вовсе с ней не заморачиваться, лучше сразу реализовывать алгоритм в терминах пересвёртки неподвижной точки конструктора типов. Посмотрим на одну отдельную итерацию в dotProductRec:
- в одном из вариантов останавливаем вычисления,
- в другом варианте вычисления продолжаются с изменившимся состоянием,
- результатом каждого шага вычислений будет accum: Long. Отсюда следует, что искомый конструктор типов имеет знакомый вид OptCell.
На каждом шаге используется состояние такого вида:
type State = (accum: Long, vec1: List[(Int, Int)], vec2: List[(Int, Int)])
Эти состояния вычисляются посредством коалгебры OptCell:
val coalgebra: Coalgebra[OptCell[Long]][State] = case (accum, vec1, vec2) => (vec1.headOption `zip` vec2.headOption) .map { case ((i1, n1), (i2, n2)) => val nextAccum = accum + (n1 min n2).toLong * i1 * i2 val nextVec1 = if n1 > n2 then (i1, n1 - n2) :: vec1.tail else vec1.tail val nextVec2 = if n2 > n1 then (i2, n2 - n1) :: vec2.tail else vec2.tail nextAccum -> (nextAccum, nextVec1, nextVec2) }
Нас интересует только значение аккумулятора у самого последнего значения, либо ноль, если векторы пустые. Значит, в качестве сворачивающей алгебры выступит пара (0L, lastAccum). Разобравшись с (ко)алгебрами, можно записать нерекурсивное стекобезопасное решение:
def dotProduct(vec1: List[(Int, Int)], vec2: List[(Int, Int)]): Long = refoldIter(coalgebra, 0L, lastAccum)(0L, vec1, vec2)
В итоге получаем те же метрики, что и у предыдущего алгоритма, но здесь вся рекурсия спрятана в пересвёртку. Количество заправок
Есть список городов, заданный целочисленными координатами. Все города соединены дорогами только по вертикали и горизонтали (никаких диагоналей!). Автозаправочные станции есть только в городах. Одна единица топлива тратится на преодоление расстояния между соседними координатами (например, на путь (5, 3) -> (6, 2) понадобится уже две единицы). Нужно определить минимальное количество дозаправок (включая стартовую), необходимых для поездки между двумя заданными городами, если известна также ёмкость бензобака автомобиля. Если на таком авто невозможно достичь пункта назначения, то вернуть -1.
Термины предметной области
type Town = Int // номер города (начиная с 0) type Dist = Int // дистанция type Fuel = Dist // количество топлива = дистанция type Coord = (x: Dist, y: Dist) // координаты города type Route = (from: Town, to: Town) // маршрут def dist(t1: Coord, t2: Coord): Dist = // расстояние между любыми городами (t2.x - t1.x).abs + (t2.y - t1.y).abs
Сигнатура искомой функции:
def refuelingsCount( towns: Vector[Coord], // список городов tank: Fuel, // объём бензобака goalRoute: Route // пункты отправления и назначения ): Int // кол-во дозаправок или -1
Решать задачу будем поиском в ширину (Breadth-First Search, BFS). Сперва вычислим города, достижимые из стартового, а затем на каждом шаге будем искать непосещённые города, достижимые из найденных на предыдущем шаге. Как только встречаем целевой город, подсчитываем количество шагов — это и будет искомое количество дозаправок. Мы как бы отправляем «волну поиска» из стартового города и по принципу Гюйгенса каждый город фронта волны порождает собственные волны, образующие следующий фронт. Волна будет распространяться не обязательно радиально — в зависимости от конфигурации городов итоговый маршрут может иметь любую форму, даже спиральную. В итоге «двумерная» задача сводится к линейной итерации фронтов волны поиска. Но в этих итерациях нам понадобятся дополнительные стандартные структуры данных — индексированная коллекция координат городов Vector[Coord] из начального условия, а также множества непосещённых городов и фронта волны Set[Town]. По сути, они являются оптимизированными функциями Town => Coord и Town => Boolean соответственно (хотя нужно ещё уметь «пробегать по фронту»). Состояние, меняющееся от итерации к итерации — это множества непосещённых городов и фронта волны:
type State = (unvisited: Set[Town], front: Set[Town])
Коалгебра, порождающая последовательность фронтов, будет иметь сигнатуру Coalgebra[OptCell[Set[Town]]][State]. Если фронт пустой — прекращаем вычисления. Иначе для нового фронта вычисляем города, доступные из текущего. Свёртку последовательности полезно останавливать, как только во фронте будет обнаружен целевой город. Значит, нам нужна пересвёртка shortRefold, допускающая «ранний» выход без обязательного анализа всех городов. В итоге получается такая реализация:
def calcRefuelings(towns: Vector[Coord], tank: Fuel, goalRoute: Route): Int = // вспомогательная функция для удобства def canReachWithFullTank(t1: Town, t2: Town) = dist(towns(t1), towns(t2)) <= tank val coalgebra: Coalgebra[OptCell[Set[Town]]][State] = state => state.front.headOption.map { _ => // продолжаем, только если фронт не пустой val newFront = state.unvisited // вычисляем следующий фронт .filter(nextTown => state.front.exists( // добавляем город, если он canReachWithFullTank(_, nextTown) // достижим с текущего фронта )) newFront -> ((state.unvisited -- newFront) -> newFront) } val shortAccum: (Int, Set[Town]) => Either[Int, Int] = (counter, front) => Either.cond(front.contains(goalRoute.to), // если во фронте есть цель counter, // то вернуть текущий счётчик counter + 1 // иначе продолжить дальше ) val unvisitedTowns = Set.range(0, towns.length) val startFront = Set(goalRoute.from) // «излучаем» из начала val countedOrNot = shortRefold(coalgebra, 1, shortAccum)(unvisitedTowns, startFront) countedOrNot.swap.getOrElse(-1) end calcRefuelings
И не видно никаких рекурсий! Задача о ферзях
Нужно расставить n ферзей (разных цветов)) на шахматной доске n×n так, чтобы каждый чувствовал себя в безопасности — не оказался атакованным остальными ферзями.
Термины предметной области:
type File = Int // номер вертикали type Rank = Int // номер горизонтали (не понадобится) type Setup = Vector[File] // расстановка - номера вертикалей ферзей для каждой горизонтали
В векторе Setup индекс — это номер горизонтали, а значение — номер вертикали. Сигнатура искомой функции:
def queensSetup(n: Int): List[Setup]
Идея решения задачи простая — это классический поиск с возвратом (backtracking). В нашей терминологии он превращается в развёртку дерева всех корректных расстановок ферзей. Для этого нужно развернуть дерево всех корректных расстановок ферзей и собрать его «листья», со всеми n ферзями. Разворачивать будем так:
- для уже расставленных на предыдущих горизонталях ферзей проверяем следующую горизонталь,
- проверяем установку нового ферзя на каждой вертикали;
- только если получилась корректная расстановка, продолжаем развёртывание уже для неё.
Нам потребуется функция проверки корректности добавления ферзя на указанную вертикаль следующей горизонтали:
def isNewQueenUnderAttack(filledRanks: Setup, newRankFile: File): Boolean = filledRanks.zipWithIndex.exists((rankFile, rank) => val d = filledRanks.length - rank Array( // не очень эффективно, зато идиоматично newRankFile, newRankFile + d, newRankFile - d ).contains(rankFile) )
Также нам понадобится дерево, а точнее, описание только одного его узла:
type TreeBase[A] = [T] =>> Option[(current: A, branches: List[T])] // Rose Tree //type Tree[A] = 𝛎[TreeBase[A]] given treeBaseFunctor: [X] => Functor[TreeBase[X]] = [A, B] => (f: A => B) => _.map((a, lst) => a -> lst.map(f))
При развёртывании дерева мы оставляем в каждом узле только корректные дочерние расстановки:
val coalg: Coalgebra[TreeBase[Setup]][Setup] = (ranks: Setup) => Option.when(ranks.length <= n)( ranks -> Iterator.range(0, n) .filterNot(isNewQueenUnderAttack(ranks, _)) .map(ranks.appended) .toList )
Затем нужно свернуть дерево, выбирая лишь «листья», у которых расстановка содержит ровно n строк. В данной алгебре применяется маленький трюк — конкатенация ++ позволяет одной строкой обработать две ситуации: искомую расстановку n×n и продолжение поиска.
val alg: Algebra[TreeBase[Setup]][List[Setup]] = _.toList.flatMap(node => List(node.current).filter(_.length == n) ++ node.branches.flatten )
Подразумевается, что уровень вложенности рекурсии, определяемый n, будет небольшим, поэтому для пересвёртки достаточно использовать функцию hylo, использующую стек. В итоге получается следующее решение задачи:
def queensSetup(n: Int): List[Setup] = val coalg: Coalgebra[TreeBase[Setup]][Setup] = ... val alg: Algebra[TreeBase[Setup]][List[Setup]] = ... hylo(coalg, alg)(Vector.empty) // начинаем с пустой расстановки end queensSetup
В редких задачах могут встречаться несбалансированные деревья огромной высоты. Тогда либо придётся воспользоваться функцией safeHylo, либо попробовать другой путь. Обход рекурсивной структуры так или иначе сводится к последовательным итерациям, между которыми зачастую приходится передавать сложные структуры данных. В safeHylo это достигается посредством SwapIn («пробегание» по функтору F), а деревья итерируемых состояний строятся с помощью TailRec. Но того же результата всегда можно добиться и вручную. Чтобы линейно пробежать рекурсивную структуру данных, нужно уметь передавать фокус с текущего узла на следующий непосещённый. В пятой части обзора говорилось, что тип фокуса вычисляется как производная от типа рекурсивных структур. Одним из элементов такой производной является «список ветвлений», пройденных от корня до выбранного узла. Этот список позволяет «сделать шаг назад» и выбрать новый путь.
«Линейное» решение задачи о ферзях
В задаче о ферзях не обязательно хранить информацию о ветвлениях по той причине, что вся информация и так содержится в значении Setup каждого узла. Тогда «линейное» решение задачи можно записать так:
def queensSetupIter(n: Int): List[Setup] = type State = (List[Setup], Setup) val checkedSetup: Coalgebra[Option][Setup] = rankFiles => Some(rankFiles).filterNot(_ .init.zipWithInddef queensSetupIter(n: Int): List[Setup] = type State = (List[Setup], Setup) val checkedSetup: Coalgebra[Option][Setup] = rankFiles => Some(rankFiles).filterNot(_ .init.zipWithIndex.exists((rankFile, rank) => val d = rankFiles.length - rank - 1 Array( // не очень эффективно, зато идиоматично rankFiles.last, rankFiles.last + d, rankFiles.last - d ).contains(rankFile) ) ) def nextPrevFile(setup: Setup) = setup.lastOption.map(file => setup.init :+ (file + 1)) def coalg: Coalgebra[OptCell[List[Setup]]][State] = (res, setup) => // промежуточный результат и текущая расстановка if setup.last == n then // вышли за границы поля nextPrevFile(setup.init).map(stp => res -> (res -> stp)) else { val correctSetupOpt = checkedSetup(setup) // валидация расстановки val newRes = // новый результат if setup.length == n then correctSetupOpt.toList ++ res else res val nextSetup = // следующая расстановка if setup.length < n && correctSetupOpt.nonEmpty then Some(setup :+ 0) else nextPrevFile(setup) nextSetup.map(stp => newRes -> (newRes -> stp)) } refoldIter(coalg, Nil, lastAccum)(Nil, Vector(0)) end queensSetupIter ex.exists((rankFile, rank) => val d = rankFiles.length - rank - 1 Array( // не очень эффективно, зато идиоматично rankFiles.last, rankFiles.last + d, rankFiles.last - d ).contains(rankFile) ) ) def nextPrevFile(setup: Setup) = setup.lastOption.map(file => setup.init :+ (file + 1)) def coalg: Coalgebra[OptCell[List[Setup]]][State] = (res, setup) => // промежуточный результат и текущая расстановка if setup.last == n then // вышли за границы поля nextPrevFile(setup.init).map(stp => res -> (res -> stp)) else { val correctSetupOpt = checkedSetup(setup) // валидация расстановки val newRes = // новый результат if setup.length == n then correctSetupOpt.toList ++ res else res val nextSetup = // следующая расстановка if setup.length < n && correctSetupOpt.nonEmpty then Some(setup :+ 0) else nextPrevFile(setup) nextSetup.map(stp => newRes -> (newRes -> stp)) } refoldIter(coalg, Nil, lastAccum)(Nil, Vector(0)) end queensSetupIter
Выводы Пересвёртка неподвижных точек конструкторов типов является методом решения задач динамического программирования (что бы ни имелось в виду)). Его преимущества заключаются в том, что все итерации оказываются спрятанными внутрь стандартных функций пересвёртки. Это избавляет программистов от ручной реализации рекурсии и циклов, защищая код от самой возможности появления многих ошибок. Для решения задачи остаётся только:
- сформулировать понятия предметной области,
- определиться со структурой ветвления (функтор F[_]),
- описать шаг развёртки Coalgebra[F],
- описать шаг свёртки Algebra[F],
- указать граничные значения (seedB для начала развёртки и defaultC как часть алгебры),
- и выбрать наиболее подходящую функцию пересвёртки с учётом конкретного F и ожидаемой глубины рекурсии.
Из наших пересвёрток выпало явное упоминание неподвижной точки. Хотя само это понятие остаётся ключевым, здесь мы уже не зависим от его конкретной реализации. Впрочем, самостоятельные рекурсивные типы (тот же List) по-прежнему необходимы как структуры данных — их можно хранить в памяти для повторного использования или сериализовать для передачи в другой процесс. С более высокого уровня абстракций проблемы выглядят мельче))-Источник
|