-
Notifications
You must be signed in to change notification settings - Fork 233
/
AkkaHttpMicroservice.scala
134 lines (116 loc) · 5.78 KB
/
AkkaHttpMicroservice.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport
import io.circe.Decoder.Result
import io.circe.{Decoder, Encoder, HCursor, Json}
import java.io.IOException
import scala.concurrent.{ExecutionContext, Future}
import scala.math._
enum IpApiResponseStatus {
case Success, Fail
}
case class IpApiResponse(status: IpApiResponseStatus, message: Option[String], query: String, country: Option[String], city: Option[String], lat: Option[Double], lon: Option[Double])
case class IpInfo(query: String, country: Option[String], city: Option[String], lat: Option[Double], lon: Option[Double])
case class IpPairSummaryRequest(ip1: String, ip2: String)
case class IpPairSummary(distance: Option[Double], ip1Info: IpInfo, ip2Info: IpInfo)
object IpPairSummary {
def apply(ip1Info: IpInfo, ip2Info: IpInfo): IpPairSummary = IpPairSummary(calculateDistance(ip1Info, ip2Info), ip1Info, ip2Info)
private def calculateDistance(ip1Info: IpInfo, ip2Info: IpInfo): Option[Double] = {
(ip1Info.lat, ip1Info.lon, ip2Info.lat, ip2Info.lon) match {
case (Some(lat1), Some(lon1), Some(lat2), Some(lon2)) =>
// see http://www.movable-type.co.uk/scripts/latlong.html
val φ1 = toRadians(lat1)
val φ2 = toRadians(lat2)
val Δφ = toRadians(lat2 - lat1)
val Δλ = toRadians(lon2 - lon1)
val a = pow(sin(Δφ / 2), 2) + cos(φ1) * cos(φ2) * pow(sin(Δλ / 2), 2)
val c = 2 * atan2(sqrt(a), sqrt(1 - a))
Option(EarthRadius * c)
case _ => None
}
}
private val EarthRadius = 6371.0
}
trait Protocols extends ErrorAccumulatingCirceSupport {
import io.circe.generic.semiauto._
implicit val ipApiResponseStatusDecoder: Decoder[IpApiResponseStatus] = Decoder.decodeString.map(s => IpApiResponseStatus.valueOf(s.capitalize))
implicit val ipApiResponseDecoder: Decoder[IpApiResponse] = deriveDecoder
implicit val ipInfoDecoder: Decoder[IpInfo] = deriveDecoder
implicit val ipInfoEncoder: Encoder[IpInfo] = deriveEncoder
implicit val ipPairSummaryRequestDecoder: Decoder[IpPairSummaryRequest] = deriveDecoder
implicit val ipPairSummaryRequestEncoder: Encoder[IpPairSummaryRequest] = deriveEncoder
implicit val ipPairSummaryEncoder: Encoder[IpPairSummary] = deriveEncoder
implicit val ipPairSummaryDecoder: Decoder[IpPairSummary] = deriveDecoder
}
trait Service extends Protocols {
implicit val system: ActorSystem
implicit def executor: ExecutionContext
def config: Config
val logger: LoggingAdapter
lazy val ipApiConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
Http().outgoingConnection(config.getString("services.ip-api.host"), config.getInt("services.ip-api.port"))
// Please note that using `Source.single(request).via(pool).runWith(Sink.head)` is considered anti-pattern. It's here only for the simplicity.
// See why and how to improve it here: https://github.com/theiterators/akka-http-microservice/issues/32
def ipApiRequest(request: HttpRequest): Future[HttpResponse] = Source.single(request).via(ipApiConnectionFlow).runWith(Sink.head)
def fetchIpInfo(ip: String): Future[String | IpInfo] = {
ipApiRequest(RequestBuilding.Get(s"/json/$ip")).flatMap { response =>
response.status match {
case OK =>
Unmarshal(response.entity).to[IpApiResponse].map { ipApiResponse =>
ipApiResponse.status match {
case IpApiResponseStatus.Success => IpInfo(ipApiResponse.query,ipApiResponse.country, ipApiResponse.city, ipApiResponse.lat, ipApiResponse.lon)
case IpApiResponseStatus.Fail => s"""ip-api request failed with message: ${ipApiResponse.message.getOrElse("")}"""
}
}
case _ => Unmarshal(response.entity).to[String].flatMap { entity =>
val error = s"ip-api request failed with status code ${response.status} and entity $entity"
logger.error(error)
Future.failed(new IOException(error))
}
}
}
}
val routes: Route = {
logRequestResult("akka-http-microservice") {
pathPrefix("ip") {
(get & path(Segment)) { ip =>
complete {
fetchIpInfo(ip).map[ToResponseMarshallable] {
case ipInfo: IpInfo => ipInfo
case errorMessage: String => BadRequest -> errorMessage
}
}
} ~
(post & entity(as[IpPairSummaryRequest])) { ipPairSummaryRequest =>
complete {
val ip1InfoFuture = fetchIpInfo(ipPairSummaryRequest.ip1)
val ip2InfoFuture = fetchIpInfo(ipPairSummaryRequest.ip2)
ip1InfoFuture.zip(ip2InfoFuture).map[ToResponseMarshallable] {
case (info1: IpInfo, info2: IpInfo) => IpPairSummary(info1, info2)
case (errorMessage: String, _) => BadRequest -> errorMessage
case (_, errorMessage: String) => BadRequest -> errorMessage
}
}
}
}
}
}
}
object AkkaHttpMicroservice extends App with Service {
override implicit val system: ActorSystem = ActorSystem()
override implicit val executor: ExecutionContext = system.dispatcher
override val config = ConfigFactory.load()
override val logger = Logging(system, "AkkaHttpMicroservice")
Http().newServerAt(config.getString("http.interface"), config.getInt("http.port")).bindFlow(routes)
}