[Z Doc] Http Proxy example and lib
You can proxy to another backend service.
For example, let's say that we have the following backend service.
package io.advantageous.qbit.example.proxy;

import io.advantageous.qbit.http.server.HttpServer;
import io.advantageous.qbit.http.server.HttpServerBuilder;

public class HttpServerMain {

    public static void main(String... args) throws Exception {
        final HttpServerBuilder httpServerBuilder = HttpServerBuilder.httpServerBuilder().setPort(8080);
        final HttpServer httpServer = httpServerBuilder.build();
        httpServer.setHttpRequestConsumer(request -> {
            request.getReceiver().response(200, request.getContentType(), request.body());
        });
        httpServer.startServer();
    }
}
This service echoes back whatever is sent to it: the response carries the same content type and body as the request. It is our example backend service.
Notice that it listens on port 8080.
To create an HttpProxy, we listen on port 9090 and forward requests to port 8080, where our service is.
package io.advantageous.qbit.example.proxy;

import io.advantageous.qbit.http.server.HttpServerBuilder;
import io.advantageous.qbit.proxy.HttpProxy;
import io.advantageous.qbit.proxy.HttpProxyBuilder;
import io.advantageous.qbit.proxy.ProxyBuilder;

public class HttpProxyServerMain {

    public static void main(String... args) throws Exception {
        final HttpProxyBuilder httpProxyBuilder = HttpProxyBuilder.httpProxyBuilder();

        final HttpServerBuilder httpServerBuilder = httpProxyBuilder.getHttpServerBuilder();
        httpServerBuilder.setPort(9090);

        final ProxyBuilder proxyBuilder = httpProxyBuilder.getProxyBuilder();
        proxyBuilder.getHttpClientBuilder().setPort(8080);

        final HttpProxy httpProxy = httpProxyBuilder.build();
        httpProxy.start();
    }
}
Most of the magic happens in the `ProxyBuilder` and the `ProxyService`/`ProxyServiceImpl`. The `HttpProxy` is mainly just an example of how to use them.
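To make the listen-and-forward idea concrete without QBit on the classpath, here is a minimal sketch that uses only JDK classes (Java 11 or later). It is not QBit code: the class `JdkProxySketch`, its method names, and the use of ephemeral ports on `127.0.0.1` are assumptions made for illustration. It starts an echo backend, starts a proxy that forwards each request to that backend, and sends one JSON request through the proxy.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** JDK-only sketch of an echo backend plus a forwarding proxy. Not QBit code. */
final class JdkProxySketch {

    /** Starts a backend on an ephemeral port that echoes the request body and content type. */
    static HttpServer startEchoBackend() throws IOException {
        HttpServer backend = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        backend.createContext("/", exchange -> {
            byte[] body = exchange.getRequestBody().readAllBytes();
            String contentType = exchange.getRequestHeaders().getFirst("Content-Type");
            if (contentType != null) {
                exchange.getResponseHeaders().set("Content-Type", contentType);
            }
            // A length of -1 tells the JDK server that there is no response body.
            exchange.sendResponseHeaders(200, body.length > 0 ? body.length : -1);
            if (body.length > 0) {
                exchange.getResponseBody().write(body);
            }
            exchange.close();
        });
        backend.start();
        return backend;
    }

    /** Starts a proxy on an ephemeral port that forwards every request to backendPort. */
    static HttpServer startProxy(int backendPort) throws IOException {
        HttpClient client = HttpClient.newHttpClient();
        HttpServer proxy = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        proxy.createContext("/", exchange -> {
            byte[] requestBody = exchange.getRequestBody().readAllBytes();
            // Copy method, path, and body of the incoming request into the backend request.
            HttpRequest.Builder forward = HttpRequest
                    .newBuilder(URI.create("http://127.0.0.1:" + backendPort + exchange.getRequestURI()))
                    .method(exchange.getRequestMethod(), HttpRequest.BodyPublishers.ofByteArray(requestBody));
            String contentType = exchange.getRequestHeaders().getFirst("Content-Type");
            if (contentType != null) {
                forward.header("Content-Type", contentType);
            }
            try {
                HttpResponse<byte[]> response =
                        client.send(forward.build(), HttpResponse.BodyHandlers.ofByteArray());
                response.headers().firstValue("Content-Type")
                        .ifPresent(value -> exchange.getResponseHeaders().set("Content-Type", value));
                byte[] body = response.body();
                exchange.sendResponseHeaders(response.statusCode(), body.length > 0 ? body.length : -1);
                if (body.length > 0) {
                    exchange.getResponseBody().write(body);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                exchange.sendResponseHeaders(502, -1);
            } finally {
                exchange.close();
            }
        });
        proxy.start();
        return proxy;
    }

    /** Sends one JSON POST through the proxy and returns the response body. */
    static String roundTrip(String path, String json) throws Exception {
        HttpServer backend = startEchoBackend();
        HttpServer proxy = startProxy(backend.getAddress().getPort());
        try {
            HttpRequest request = HttpRequest
                    .newBuilder(URI.create("http://127.0.0.1:" + proxy.getAddress().getPort() + path))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(json))
                    .build();
            return HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString())
                    .body();
        } finally {
            proxy.stop(0);
            backend.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("/foo", "\"bar\""));
    }
}
```

Unlike this blocking sketch, the QBit proxy forwards requests asynchronously and can also filter requests, run a hook before sending, and track timeouts.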
Now we just have to create a client (or use curl) to drive this. We want to hit port 9090, which forwards to port 8080, where our echo service is listening.
package io.advantageous.qbit.example.proxy;

import io.advantageous.qbit.http.client.HttpClient;
import io.advantageous.qbit.http.client.HttpClientBuilder;
import io.advantageous.qbit.http.request.HttpTextResponse;

import static io.advantageous.boon.core.IO.puts;

public class HttpClientMain {

    public static void main(String... args) throws Exception {
        final HttpClientBuilder httpClientBuilder = HttpClientBuilder.httpClientBuilder().setPort(9090);
        final HttpClient httpClient = httpClientBuilder.buildAndStart();

        for (int index = 0; index < 100; index++) {
            final HttpTextResponse httpTextResponse = httpClient.postJson("/foo", "\"bar\"");
            puts(httpTextResponse);
        }
    }
}
#### Output
HttpTextResponse(code:200contentType:application/json; charset="UTF-8"
body:
"bar"
)
package io.advantageous.qbit.proxy;
import io.advantageous.qbit.http.client.HttpClientBuilder;
import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.http.request.HttpRequestBuilder;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.service.ServiceBuilder;
import io.advantageous.qbit.time.Duration;
import io.advantageous.qbit.util.Timer;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* Used to construct a proxy service that proxies calls to a backend.
*/
public class ProxyBuilder {
/** Reactor used to manage the periodic jobs. */
private Reactor reactor;
/** Timer used to get the current time in a cost effective manner. */
private Timer timer;
/** HttpClientBuilder used to construct httpClients to talk to backend services. */
private HttpClientBuilder httpClientBuilder;
/**
* Used to intercept calls to do things like populate additional headers.
* This happens after the incoming request is copied into the HttpRequestBuilder.
*/
private Consumer<HttpRequestBuilder> beforeSend;
/**
* Used if you want to do additional error handling.
*/
private Consumer<Exception> errorHandler;
/**
* Used to determine if this request should be forwarded to the back end.
* By default there is a predicate that always returns true.
*/
private Predicate<HttpRequest> httpClientRequestPredicate;
/**
* How often we should check to see if the backend connection is healthy.
*
*/
private Duration checkClientDuration = Duration.MINUTES.units(10);
/**
* Used to construct a ping request to the backend.
* The ping request will be sent to backend every `checkClientDuration`.
*/
private HttpRequestBuilder pingBuilder;
/**
* Used to determine if we want to track timeouts to backend services.
*/
private boolean trackTimeOuts;
/**
* Sets the backend timeout. Requests that take longer than this are aborted.
*/
private Duration timeOutInterval = Duration.SECONDS.units(180);
/**
* Used to construct a proxy service to the ProxyServiceImpl
*/
private ServiceBuilder serviceBuilder;
public ServiceBuilder getServiceBuilder() {
if (serviceBuilder==null) {
serviceBuilder = ServiceBuilder.serviceBuilder();
}
return serviceBuilder;
}
public Reactor getReactor() {
if (reactor == null) {
reactor = ReactorBuilder.reactorBuilder().build();
}
return reactor;
}
public ProxyBuilder setReactor(Reactor reactor) {
this.reactor = reactor;
return this;
}
public Timer getTimer() {
if (timer == null) {
timer = Timer.timer();
}
return timer;
}
public ProxyBuilder setTimer(Timer timer) {
this.timer = timer;
return this;
}
public HttpClientBuilder getHttpClientBuilder() {
if (httpClientBuilder==null) {
httpClientBuilder = HttpClientBuilder.httpClientBuilder();
}
return httpClientBuilder;
}
public ProxyBuilder setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
this.httpClientBuilder = httpClientBuilder;
return this;
}
public Consumer<HttpRequestBuilder> getBeforeSend() {
if (beforeSend==null) {
beforeSend = httpRequestBuilder -> {
};
}
return beforeSend;
}
public ProxyBuilder setBeforeSend(Consumer<HttpRequestBuilder> beforeSend) {
this.beforeSend = beforeSend;
return this;
}
public Consumer<Exception> getErrorHandler() {
if (errorHandler==null) {
errorHandler = e -> {
};
}
return errorHandler;
}
public ProxyBuilder setErrorHandler(Consumer<Exception> errorHandler) {
this.errorHandler = errorHandler;
return this;
}
public Predicate<HttpRequest> getHttpClientRequestPredicate() {
if (httpClientRequestPredicate==null) {
httpClientRequestPredicate = request -> true;
}
return httpClientRequestPredicate;
}
public ProxyBuilder setHttpClientRequestPredicate(Predicate<HttpRequest> httpClientRequestPredicate) {
this.httpClientRequestPredicate = httpClientRequestPredicate;
return this;
}
public Duration getCheckClientDuration() {
return checkClientDuration;
}
public ProxyBuilder setCheckClientDuration(Duration checkClientDuration) {
this.checkClientDuration = checkClientDuration;
return this;
}
public HttpRequestBuilder getPingBuilder() {
return pingBuilder;
}
public ProxyBuilder setPingBuilder(HttpRequestBuilder pingBuilder) {
this.pingBuilder = pingBuilder;
return this;
}
public boolean isTrackTimeOuts() {
return trackTimeOuts;
}
public ProxyBuilder setTrackTimeOuts(boolean trackTimeOuts) {
this.trackTimeOuts = trackTimeOuts;
return this;
}
public Duration getTimeOutInterval() {
return timeOutInterval;
}
public ProxyBuilder setTimeOutInterval(Duration timeOutInterval) {
this.timeOutInterval = timeOutInterval;
return this;
}
/**
* Build the impl.
* @return returns an instance of the impl.
*/
public ProxyService build() {
return new ProxyServiceImpl(getReactor(), getTimer(), getHttpClientBuilder(), getBeforeSend(),
getErrorHandler(), getHttpClientRequestPredicate(), getCheckClientDuration(),
pingBuilder==null? Optional.<HttpRequestBuilder>empty() : Optional.of(pingBuilder),
isTrackTimeOuts(), getTimeOutInterval());
}
/**
* Builds a proxy queue service to the impl.
* @return proxy queue service interface to impl.
*/
public ProxyService buildProxy() {
return getServiceBuilder().setServiceObject(build()).buildAndStart().createProxyWithAutoFlush(ProxyService.class,
Duration.HUNDRED_MILLIS);
}
public static ProxyBuilder proxyBuilder() {
return new ProxyBuilder();
}
}
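The getters in `ProxyBuilder` fall back to harmless defaults when nothing was set: a predicate that accepts every request and a `beforeSend` hook that does nothing. The following library-independent sketch shows how such a filter and hook could compose when a request is prepared for forwarding. The class `ForwardHooksSketch`, the `prepare` method, and the use of a path string and a header map in place of `HttpRequest` and `HttpRequestBuilder` are all assumptions made for illustration, not QBit API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Sketch of the filter-then-hook flow with lazily created defaults. Not QBit code. */
final class ForwardHooksSketch {

    /** Hypothetical stand-in for Predicate<HttpRequest>; tests the request path. */
    private Predicate<String> shouldForward;

    /** Hypothetical stand-in for Consumer<HttpRequestBuilder>; edits outgoing headers. */
    private Consumer<Map<String, String>> beforeSend;

    ForwardHooksSketch setShouldForward(Predicate<String> shouldForward) {
        this.shouldForward = shouldForward;
        return this;
    }

    ForwardHooksSketch setBeforeSend(Consumer<Map<String, String>> beforeSend) {
        this.beforeSend = beforeSend;
        return this;
    }

    /** Defaults to a predicate that accepts every request. */
    Predicate<String> getShouldForward() {
        if (shouldForward == null) {
            shouldForward = path -> true;
        }
        return shouldForward;
    }

    /** Defaults to a hook that changes nothing. */
    Consumer<Map<String, String>> getBeforeSend() {
        if (beforeSend == null) {
            beforeSend = headers -> { };
        }
        return beforeSend;
    }

    /** Returns the headers that would go to the backend, or empty if the request is filtered out. */
    Optional<Map<String, String>> prepare(String path, Map<String, String> incomingHeaders) {
        if (!getShouldForward().test(path)) {
            return Optional.empty();
        }
        // Copy the incoming request first, then give the hook a chance to modify the copy.
        Map<String, String> outgoing = new LinkedHashMap<>(incomingHeaders);
        getBeforeSend().accept(outgoing);
        return Optional.of(outgoing);
    }

    public static void main(String[] args) {
        ForwardHooksSketch custom = new ForwardHooksSketch()
                .setShouldForward(path -> path.startsWith("/api/"))
                .setBeforeSend(headers -> headers.put("X-Forwarded-By", "sketch"));
        System.out.println(custom.prepare("/foo", Map.of()).isPresent());
        System.out.println(custom.prepare("/api/foo", Map.of()));
    }
}
```

The ordering mirrors the field comments in `ProxyBuilder`: the hook runs after the incoming request has been copied, so it can add or override values without touching the original request.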
package io.advantageous.qbit.proxy;
import io.advantageous.qbit.http.request.HttpRequest;
/**
* ProxyService interface to proxy services to the backend.
*/
public interface ProxyService {
void handleRequest(HttpRequest request);
}
package io.advantageous.qbit.proxy;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.http.client.HttpClient;
import io.advantageous.qbit.http.client.HttpClientBuilder;
import io.advantageous.qbit.http.request.HttpBinaryReceiver;
import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.http.request.HttpRequestBuilder;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.time.Duration;
import io.advantageous.qbit.util.MultiMap;
import io.advantageous.qbit.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* Used to proxy HTTP calls to a backend.
*/
public class ProxyServiceImpl implements ProxyService {
/** Reactor used to manage the periodic jobs. */
private final Reactor reactor;
/** Timer used to get the current time in a cost effective manner. */
private final Timer timer;
/** HttpClientBuilder used to construct httpClients to talk to backend services. */
private final HttpClientBuilder httpClientBuilder;
/**
* Used to construct a ping request to the backend if present.
* The ping request will be sent to backend every `checkClientDuration`.
*/
private final Optional<HttpRequestBuilder> pingBuilder;
/**
* Sets the backend timeout. Requests that take longer than this are aborted.
*/
private final long timeOutIntervalMS;
/** Logging. */
private final Logger logger = LoggerFactory.getLogger(ProxyServiceImpl.class);
/**
* Used to intercept calls to do things like populate additional headers.
* This happens after the incoming request is copied into the HttpRequestBuilder.
*/
private final Consumer<HttpRequestBuilder> beforeSend;
/**
* Used if you want to do additional error handling.
*/
private final Consumer<Exception> errorHandler;
/**
* Used to determine if this request should be forwarded to the back end.
* By default there is a predicate that always returns true.
*/
private final Predicate<HttpRequest> httpClientRequestPredicate;
/** Keep track of errors. */
private final AtomicInteger errorCount = new AtomicInteger();
/** Keep track of pings that were received. */
private final AtomicInteger pingCount = new AtomicInteger();
/**
* Used to determine if we want to track timeouts to backend services.
*/
private final boolean trackTimeOuts;
/**
* Used to forward requests to a backend service.
*/
private HttpClient backendServiceHttpClient;
/**
* Keeps the current time.
*/
private long time;
/** Keeps a list of outstanding requests if timeout tracking is turned on. */
private final List<HttpRequestHolder> httpRequestHolderList;
/**
* Holds request information.
*/
private class HttpRequestHolder {
final HttpRequest request;
final long startTime;
private HttpRequestHolder(HttpRequest request, long startTime) {
this.request = request;
this.startTime = startTime;
}
}
/**
* Construct.
* @param reactor reactor
* @param timer timer
* @param httpClientBuilder client builder to build client to backend.
* @param beforeSend used if you want to populate the request builder before request is sent to the backend
* @param errorHandler used to pass a custom error handler
* @param httpClientRequestPredicate httpClientRequestPredicate is used to see if this request should be forwarded to the backend.
* @param checkClientDuration checkClientDuration periodic check health of backend.
* @param pingBuilder if present used to build a ping request to backend to check client connectivity.
* @param trackTimeOuts if true track timeouts.
* @param timeOutInterval if tracking timeouts, what is considered a timeout.
*/
public ProxyServiceImpl(final Reactor reactor,
final Timer timer,
final HttpClientBuilder httpClientBuilder,
final Consumer<HttpRequestBuilder> beforeSend,
final Consumer<Exception> errorHandler,
final Predicate<HttpRequest> httpClientRequestPredicate,
final Duration checkClientDuration,
final Optional<HttpRequestBuilder> pingBuilder,
final boolean trackTimeOuts,
final Duration timeOutInterval) {
this.reactor = reactor;
this.timer = timer;
this.httpClientBuilder = httpClientBuilder;
this.backendServiceHttpClient = this.httpClientBuilder.buildAndStart();
this.beforeSend = beforeSend;
this.errorHandler = errorHandler;
this.httpClientRequestPredicate = httpClientRequestPredicate;
this.trackTimeOuts = trackTimeOuts;
this.reactor.addRepeatingTask(checkClientDuration, this::checkClient);
this.pingBuilder = pingBuilder;
/* If we are tracking timeouts, then set up a repeating job to track them. */
if (trackTimeOuts) {
this.httpRequestHolderList = new ArrayList<>();
this.timeOutIntervalMS = timeOutInterval.toMillis();
this.reactor.addRepeatingTask(this.timeOutIntervalMS/2, TimeUnit.MILLISECONDS, this::trackTimeouts);
} else {
this.httpRequestHolderList = null;
this.timeOutIntervalMS = -1;
}
}
/** Tracks timeouts periodically if timeout tracking is enabled. */
private void trackTimeouts() {
new ArrayList<>(httpRequestHolderList).forEach(httpRequestHolder -> {
long duration = time - httpRequestHolder.startTime;
if (duration > timeOutIntervalMS) {
httpRequestHolder.request.handled();
httpRequestHolder.request.getReceiver().timeoutWithMessage(String.format("\"TIMEOUT %s %s %s\"",
httpRequestHolder.request.address(),
httpRequestHolder.request.getRemoteAddress(),
httpRequestHolder.startTime
));
httpRequestHolderList.remove(httpRequestHolder); // Not very fast if you have a lot of outstanding requests
}
});
}
/** Checks client health periodically to see if we are connected. Tries to reconnect if not connected. */
private void checkClient() {
/** If the errorCount is greater than 0, make sure we are still connected. */
if (errorCount.get() > 0) {
if (backendServiceHttpClient.isClosed()) {
backendServiceHttpClient = httpClientBuilder.buildAndStart();
}
}
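/* If the ping builder is present, use it to ping the service. */
if (pingBuilder.isPresent()) {
final HttpRequest pingRequest = pingBuilder.get().setBinaryReceiver((code, contentType, body) -> {
/* Any 2xx status code counts as a successful ping. */
if (code >= 200 && code < 300) {
pingCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
}).setErrorHandler(e -> {
logger.error("Error doing ping operation", e);
errorCount.incrementAndGet();
}).build();
/* Build and send the ping request to the backend. */
backendServiceHttpClient.sendHttpRequest(pingRequest);
}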
}
/** Request coming from the client side.
*
* @param clientRequest clientRequest
*/
@Override
public void handleRequest(final HttpRequest clientRequest) {
if (trackTimeOuts) {
httpRequestHolderList.add(new HttpRequestHolder(clientRequest, time));
}
if (httpClientRequestPredicate.test(clientRequest)) {
createBackEndRequestPopulateAndForward(clientRequest);
}
}
/**
* Creates a backend request from the client request and then forwards it.
* @param clientRequest clientRequest
*/
private void createBackEndRequestPopulateAndForward(final HttpRequest clientRequest) {
try {
/* forward request to backend client. */
final HttpRequestBuilder httpRequestBuilder = HttpRequestBuilder.httpRequestBuilder()
.copyRequest(clientRequest).setBinaryReceiver(new HttpBinaryReceiver() {
@Override
public void response(final int code,
final String contentType,
final byte[] body,
final MultiMap<String, String> headers) {
handleBackendClientResponses(clientRequest, code, contentType, body, headers);
}
@Override
public void response(int code, String contentType, byte[] body) {
response(code, contentType, body, MultiMap.empty());
}
}).setErrorHandler(e -> {
handleHttpClientErrorsForBackend(clientRequest, e);
});
/** Give user of the lib a chance to populate headers and such. */
beforeSend.accept(httpRequestBuilder);
backendServiceHttpClient.sendHttpRequest(httpRequestBuilder.build());
}catch (Exception ex) {
errorCount.incrementAndGet();
logger.error("Unable to forward request", ex);
}
}
/**
* Handle errors.
* @param clientRequest clientRequest
* @param e exception
*/
private void handleHttpClientErrorsForBackend(final HttpRequest clientRequest, final Exception e) {
/* Notify error handler that we got an error. */
errorHandler.accept(e);
/* Increment our error count. */
errorCount.incrementAndGet();
/* Create the error message. */
final String errorMessage = String.format("Unable to make request %s %s %s",
clientRequest.address(),
clientRequest.body(), e.getMessage());
/* Log the error. */
logger.error(errorMessage, e);
if (!clientRequest.isHandled()) {
clientRequest.handled();
/* Notify the client that there was an error. */
clientRequest.getReceiver().error(String.format("\"%s\"", errorMessage));
}
}
/**
* Handle a response from the backend service
* @param clientRequest clientRequest (original client request)
* @param code response code from the backend.
* @param contentType contentType from the backend.
* @param body body from the backend.
* @param headers headers from the backend.
*/
private void handleBackendClientResponses(final HttpRequest clientRequest,
final int code,
final String contentType,
final byte[] body,
final MultiMap<String, String> headers) {
if (!clientRequest.isHandled()) {
clientRequest.handled();
clientRequest.getReceiver().response(code, contentType, body, headers);
}
}
/** Manage periodic jobs. */
@QueueCallback({QueueCallbackType.EMPTY,
QueueCallbackType.IDLE,
QueueCallbackType.LIMIT})
public void process() {
reactor.process();
time = timer.time();
}
}
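The timeout tracking in `ProxyServiceImpl` records each incoming request with its start time and, at half the timeout interval, sweeps the list for requests that have been outstanding for longer than the timeout. The following library-independent sketch shows one way such a sweep could work. The class `TimeoutSweepSketch`, the `Pending` holder, the `complete` method, and the string request ids are assumptions made for illustration, not QBit API. It removes entries through the iterator in a single pass instead of copying the list and searching it again for each removal.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Sketch of a periodic timeout sweep over outstanding requests. Not the QBit implementation. */
final class TimeoutSweepSketch {

    /** Hypothetical stand-in for an outstanding request and the time it arrived. */
    static final class Pending {
        final String id;
        final long startTimeMs;

        Pending(String id, long startTimeMs) {
            this.id = id;
            this.startTimeMs = startTimeMs;
        }
    }

    private final long timeoutMs;
    private final List<Pending> pending = new ArrayList<>();

    TimeoutSweepSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Starts watching a request that arrived at nowMs. */
    void track(String id, long nowMs) {
        pending.add(new Pending(id, nowMs));
    }

    /** Stops watching a request once it has been answered; returns true if it was still pending. */
    boolean complete(String id) {
        return pending.removeIf(p -> p.id.equals(id));
    }

    /** Removes and returns the ids of requests that have been pending for longer than the timeout. */
    List<String> sweep(long nowMs) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Pending> iterator = pending.iterator();
        while (iterator.hasNext()) {
            Pending p = iterator.next();
            if (nowMs - p.startTimeMs > timeoutMs) {
                timedOut.add(p.id);
                iterator.remove();
            }
        }
        return timedOut;
    }

    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        TimeoutSweepSketch tracker = new TimeoutSweepSketch(1000);
        tracker.track("a", 0);
        tracker.track("b", 600);
        System.out.println(tracker.sweep(1100));
        System.out.println(tracker.outstanding());
    }
}
```

In a real proxy the caller would answer each returned request with a timeout response, and would call something like `complete` when the backend replies so that finished requests do not stay in the list.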
package io.advantageous.qbit.proxy;
import io.advantageous.qbit.http.server.HttpServerBuilder;
public class HttpProxyBuilder {
private HttpServerBuilder httpServerBuilder;
private ProxyBuilder proxyBuilder;
public HttpServerBuilder getHttpServerBuilder() {
if (httpServerBuilder==null) {
httpServerBuilder = HttpServerBuilder.httpServerBuilder();
}
return httpServerBuilder;
}
public HttpProxyBuilder setHttpServerBuilder(HttpServerBuilder httpServerBuilder) {
this.httpServerBuilder = httpServerBuilder;
return this;
}
public ProxyBuilder getProxyBuilder() {
if (proxyBuilder == null ) {
proxyBuilder = ProxyBuilder.proxyBuilder();
}
return proxyBuilder;
}
public HttpProxyBuilder setProxyBuilder(ProxyBuilder proxyBuilder) {
this.proxyBuilder = proxyBuilder;
return this;
}
public HttpProxy build() {
return new HttpProxy(getHttpServerBuilder().build(), getProxyBuilder().buildProxy());
}
public static HttpProxyBuilder httpProxyBuilder() {
return new HttpProxyBuilder();
}
}
package io.advantageous.qbit.proxy;
import io.advantageous.qbit.http.server.HttpServer;
import io.advantageous.qbit.service.ServiceProxyUtils;
import io.advantageous.qbit.service.Startable;
import io.advantageous.qbit.service.Stoppable;
/** Marries a ProxyService to an HTTP endpoint and is a good example of how to use a ProxyService. */
public class HttpProxy implements Startable, Stoppable{
/** HTTP server used to listen for incoming requests. */
private final HttpServer server;
/** Proxy service used to forward requests to a backend. */
private final ProxyService proxyService;
/** Constructs a new HttpProxy. */
public HttpProxy(final HttpServer server, ProxyService proxyService) {
this.server = server;
this.proxyService = proxyService;
}
/** Start this. */
public void start() {
server.setHttpRequestConsumer(httpRequest -> {
proxyService.handleRequest(httpRequest);
ServiceProxyUtils.flushServiceProxy(proxyService);
});
server.startServer();
}
/** Stop this. */
@Override
public void stop() {
server.stop();
}
}