Как Akka Streams могут облегчить жизнь

Допустим, у вас есть процессор, процессор изображений, и этот процессор изображений имеет несколько этапов обработки. И каждый этап изменял или обрабатывал изображение по-своему. И эта обработка изображений была потрясающей, и вы хотите поделиться ею со всем миром. Таким образом, вы создаете код сервера, который загружает файл и выполняет эту обработку изображения. Возможно, каждый этап обработки был Flow[BufferedImage]. Предположим, для аргументов, которые занимали около секунды на этап и выглядели примерно так:

Затем вы соединили бы все этапы процесса вместе, что, вероятно, будет выглядеть примерно как Flow[BufferedImage, ImageProcessed]:

Затем вы создали метод, который запускает этот поток как Stream:

В реальном мире это не будет Sink.ignore, поскольку мы, вероятно, сохраним обработанное изображение в корзине S3 или другом хранилище.

Используя Akka Http, вы легко можете определить Route для обработки загрузки файла:

Теперь наши друзья могут пользоваться нашим сервисом изображений! Но что, если бы мы хотели уведомить пользователя о ходе нашей обработки?

Из документации Akka Http мы знаем, что обрабатываем WebSockets с помощью Flow[Message], как показывает их пример:

Но как мы можем общаться с этим потоком WebSocket извне? 🤔

Ответ - preMaterialize Source, а затем построить Flow из Sink и этого предварительно материализованного Source ... Итак, давайте сделаем это:

Теперь все, что мы отправляем на wsActor, будет отправлено обратно нашему веб-клиенту JavaScript! Наш новый Route теперь содержит обработчик WebSocket:

И мы обновляем этапы обработки, чтобы уведомить нашего wsActor:

Теперь, когда мы загружаем изображение, мы получаем хороший список, который сообщает нам, через какие этапы обработки проходит наше изображение 😎

Весь код (включая интерфейс!) Здесь с открытым исходным кодом. 🐾