+38(044) 277-40-42
+38(063) 233-01-83
+38(095) 628-11-32
Войти
+38(044) 277-42-05
+38(063) 233-01-83
+38(095) 628-11-32

Многопоточность в Java 8. Часть 2 (Пишем асинхронный код с CompletableFuture)

Вы, наверное, уже слышали о Future
         Future представляет собой отложенный результат асинхронных вычислений. Он предоставляет метод – get, который возвращает результат вычисления, когда они завершены.
         Проблема заключается в том, что вызов этого метода блокируется до тех пор, пока вычисление не будут сделаны до конца. Это весьма неудобно и может быстро сделать асинхронные вычисления бессмысленными
         Конечно – вы можете создавать новые асинхронные задачи и передавать их executor-у, чтобы он выполнял ее в новом потоке. Но почему вы должны беспокоиться обо всем этом, когда должны думать только о логике своей программы.
 
Это где CompletableFuture вступает в игру
         Помимо реализации интерфейса Future, CompletableFuture также реализует интерфейс CompletionStage.
         CompletionStage это аналог Promise в JavaScript. Он обещает, что вычисление в конечном итоге будет сделано.
         Самое замечательное то, что CompletionStage предлагает большой набор методов, которые позволяют прикреплять колбеки, которые будут выполняться по завершении асинхронной задачи.
         Таким образом, мы можем строить системы в неблокирующем стиле, например как сервер на Node.js.
 
 
Простейшие асинхронное вычисление
         Давайте начнем с самого элементарного  – создадим простую асинхронную задачу – отправление сообщения.

CompletableFuture.supplyAsync(this::sendMsg); 

 
supplyAsync принимает функцию Supplier, содержащий код, который мы хотим выполнить асинхронно - в нашем случае метод sendMsg.
         Если вы работали немного с Future-ми в прошлом, вы можете задаться вопросом, где же Executor? Если вы хотите, вы можете передать его в качестве второго аргумента метода supplyAsync. Тем не менее, если вы его не укажите, то по умолчанию будет использован  ForkJoinPool.commonPool ().
 
Добавление колбека
         Наша первая асинхронная задача выполнена. Давайте добавим колбек к ней!
         Красота колбека в том, что мы можем сказать, что должно произойти, когда асинхронное вычисление выполняется без ожидания результата.
         В первом примере, мы просто послал сообщение в асинхронном режиме, выполнив sendMsg в отдельном потоке.
         Теперь давайте добавим функцию колбек, где мы сообщаем о том, как прошла отправка сообщения.

CompletableFuture.supplyAsync(this::sendMsg) 
                 .thenAccept(this::notify);

 
thenAccept является одним из многих способов добавить колбек функцию. Он принимает функцию Consumer – в нашем случае метод notify, который обрабатывает результат предыдущего вычисления, после его завершения.
 
 
Последовательный вызов нескольких колбеков
         Если вы хотите продолжить передачу значений от одного колбека к другому, thenAccept не позволит этого сделать, поскольку функция Consumer не возвращает ничего.
         Чтобы сохранить передачу значений, вы можете просто использовать thenApply вместо thenAccept .
         thenApply принимает функцию, которая принимает значение, но и возвращать что-то в ответ.
         Чтобы увидеть, как это работает, давайте расширим наш предыдущий пример, сначала найдя получателя сообщения.
 

    CompletableFuture.supplyAsync(this::findReceiver)
                     .thenApply(this::sendMsg)
                     .thenAccept(this::notify);

        
         Теперь асинхронная задача будет сначала заключаться в нахождении получателя, затем в отправлении ему сообщение, после чего будет вызван колбек notify с результатом, полученным от отправки сообщения .
 
 
Построение асинхронной программы
         При построении больших асинхронных программ, все работает немного по-другому.
         Обычно мы хотим, чтобы создание новых частей кода было основано на более мелких кусочках. Каждый из этих кусочков, как правило, должен быть асинхронным – в нашем случае возвращать CompletionStage.
         До сих пор, sendMsg было нормальной блокирующей функцией. Давайте теперь предположим, что мы получили метод sendMsgAsync, который возвращает CompletionStage.
         Если мы продолжим  использовать thenApply, мы бы столкнулись с вложенными CompletionStage.

 
CompletableFuture.supplyAsync(this::findReceiver) 
                 .thenApply(this::sendMsgAsync);
 
// Возвращаемый тип CompletionStage<CompletionStage<String>>

 
         Мы не хотим этого, так что вместо этого мы можем использовать thenCompose, который позволяет сделать функцию, которая возвращает CompletionStage. Работа этого метода похожа на flatMap.

 
CompletableFuture.supplyAsync(this::findReceiver) 
                 .thenCompose(this::sendMsgAsync);
 
// Возвращаемый тип CompletionStage<String>

 
         Таким образом, мы можем продолжать создавать новые функции добавлением новых колбеков без потери одно слоистого CompletionStage.
Что делать, если код падает с ошибкой
         Как вы знаете, плохие вещи могут случиться. И если вы работали с Future раньше, вы знаете, как сложно бывает обрабатывать исключительные ситуации.
         К счастью, в CompletableFuture есть удобный способ обработки ошибок, используя метод exceptionally.

CompletableFuture.supplyAsync(this:: sendMsgWithFail) 
                 .exceptionally(ex -> new MsgFail())
                 .thenAccept(this::notify);

 
exceptionally дает нам возможность восстановиться, принимая альтернативную функцию, которая будет выполняться, если предшествующий расчет терпит неудачу с исключением.
         Таким образом, последующие колбеки смогут продолжить выполнение с этим альтернативным результатом в качестве входных данных.
 
Колбек зависит от нескольких асинхронных задач
         Иногда это было бы очень полезно, чтобы иметь возможность создать функцию колбек, которая зависит от результата двух вычислений. Для этого нужно использовать thenCombine.
thenCombine позволяет зарегистрировать колбек типа BiFunction в соответствии от результата двух CompletionStage.
         Чтобы увидеть, как это делается, давайте в дополнение к поиску получателя также выполнить тяжелую работу по созданию некоторого содержимого сообщения перед его отправкой.

 
CompletableFuture<String> receiver = 
    CompletableFuture.supplyAsync(this::findReceiver);
 
CompletableFuture<String> content = 
    CompletableFuture.supplyAsync(this::createContent);
 
receiver.thenCombine(content, this::sendMsg); 

 
         Сначала мы запустили две асинхронных задачи – нахождение получателя и создания некоторого содержимого. Затем мы используем thenCombine, чтобы сказать, что мы хотим сделать с результатом этих двух вычислений путем определения нашей би-функции.
         Стоит отметить, что существует еще один вариант thenCombine, который называется runAfterBoth. Эта версия принимает Runnable не заботясь о фактических значениях предыдущих вычислений, методу важно только то, что эти вычисления закончены и можно выполнить колбек функцию.
 
Колбек зависит от одной или другой асинхронной задачи
         Итак, сейчас мы покрыли сценарий, в котором ми зависели от двух вычислений. Теперь, давайте предположим, что нам нужен просто результат одного из них?
         Скажем, у вас есть два источника нахождения получателя. Вы будете опрашивать двоих, но будете рады, если получите ответ хотя бы от одного.
 
 
 

CompletableFuture<String> source1 = 
    CompletableFuture.supplyAsync(this::findFirstSource);
 
CompletableFuture<String> source2 = 
    CompletableFuture.supplyAsync(this::findSecondSource);
 
source1.acceptEither(source2, this::sendMsg);

 
         Как вы можете видеть, это легко решается с помощью метода acceptEither, который принимает два асинхронных вычисления и функцию, которая будет выполняться с результатом первого вернувшегося.
 
Заключение
         Эта статья охватывает лишь основы того, что CompletableFuture  может предложить. Так что не забудьте посмотреть документацию для получения более подробной информации.