Как перенести логику на основе актеров в Akka Streams?

avatar
Alex Fruzenshtein
8 апреля 2018 в 11:12
134
1
0

Я уже некоторое время работаю с приложением Akka. 95% кода написано чистыми акторами. Теперь я собираюсь перенести некоторые части приложения в Akka Streams. Дайте мне представление о том, как следующая логика может выглядеть с точки зрения Akka Streams:

+------------+                                             
| CreateUser |                                             
+------------+                                             
      |                                                    
      |                                                    
+------------+     +-------------------+                   
| CheckEmail |-----|EmailIsAlreadyInUse|                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+     +-------------------+                   
|3rdPartyCall|-----|NoUserInInternalDB |                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+     +-------------------+                   
|  SaveUser  |-----|    UserDBError    |                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+                                             
| UserSaved  |                                             
+------------+   

В текущей реализации все блоки представляют собой сообщения, которые я отправляю соответствующему действующему лицу. Если поток сообщений проходит успешно, я отправляю обратно отправителю сообщение UserSaved. В противном случае я отправляю обратно отправителю одно из сообщений проверки: EmailIsAlreadyInUse или NoUserInInternalDB или UserDBError.

Вот набор сообщений:

case class CreateUser(email: String)
case class CheckEmailUniqueness(email: String)
case class ExternalServiceValidation(email: String)
case class SaveUser(email: String)

sealed trait CreateUserResult
sealed trait CreateUserError
case class UserCreated(email: String) extends CreateUserResult
case class EmailIsAlreadyInUse(email: String) extends CreateUserResult with CreateUserError
case class NoUserInExternalDB(email: String) extends CreateUserResult with CreateUserError
case class UserDBError(email: String) extends CreateUserResult with CreateUserError

Как мне перенести эту логику в Akka Streams?

Источник

Ответы (1)

avatar
Ramón J Romero y Vigil
9 апреля 2018 в 15:14
3

Структура сообщения

Поскольку данные akka-stream передаются в одном направлении, от источника к приемнику, функция "отправить обратно отправителю" отсутствует. Ваш единственный выбор — постоянно пересылать сообщение на следующий шаг.

Поэтому, я думаю, вам просто нужно добавить некоторую дополнительную структуру вокруг вашего сообщения. Для этого кажется полезной конструкция Either. Предположим, что ваш актер CreateUser имеет автономную функцию:

def createUserFunction(createUser : CreateUser) : UserCreated = ???

За этим может следовать функция CheckEmail:

val Set[String] existingEmails = ???

def checkEmailUniqueness(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
  if(existingEmails contains userCreated.email)
    Left(EmailIsAlreadyInUse(userCreated.email))
  else
    Right(createUser)

Аналогично, 3rdPartyCall также вернет Либо:

 def thirdPartyLibraryFunction(userCreated : UserCreated) : Boolean = ???

 def thirdPartyCall(userCreated : UserCreated) : Either[CreateUserError, UserCreated] = 
   if(!thirdPartyLibraryFunction(userCreated))
     Left(NoUserInExternalDB(userCreated.email))
   else
     Right(userCreated)

Акка Стрим Констракшн

С помощью этого структурированного обмена сообщениями вы теперь можете создать поток, который движется только в одном направлении. Сначала мы создаем Flow, который создает пользователя:

 val createUserFlow : Flow[CreateUser, UserCreated, _] = 
   Flow[CreateUser] map (createUserFunction)

Затем проверка электронной почты. Процесс:

 val emailFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] = 
   Flow[UserCreated] map (checkEmailUniqueness)

А теперь создайте поток, который вызывает сторонние вызовы:

 val thirdPartyFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] = 
   Flow[UserCreated] map (_ flatMap thirdPartyCall)

Эти потоки теперь могут формировать основу потока вместе с Source и Sink:

 val userSource : Source[CreateUser, _] = ???

 val userSink : Sink[Either[CreateUserError, UserCreated], _] = 
   Sink[Either[CreateUserError, UserCreated]] foreach {
     case Left(error) =>
       System.err.println("Error with user creation : " error.email)
     case Right(userCreated) =>
       System.out.println("User Created: " userCreated.email)
   }

 //create the full stream
 userSource
   .via(createUserFlow)
   .via(emailFlow)
   .via(thirdPartyFlow)
   .to(userSink)
   .run()
Alex Fruzenshtein
9 апреля 2018 в 16:13
0

Рамон спасибо за такое подробное объяснение! В целом идея ясна. У меня есть только одно несущественное замечание: сначала нужно CheckEmail затем сделать 3rdPartyCall и только после этого, если нет ошибок, сделать SaveUser :)