Skip to content

Commit

Permalink
Add WebSocket support in plugins (#5662)
Browse files Browse the repository at this point in the history
#### What type of PR is this?

/kind feature
/area core
/area plugin

#### What this PR does / why we need it:

This PR allows plugin developers defining WebSocket endpoints in plugins.

#### Which issue(s) this PR fixes:

Fixes #5285 

#### Does this PR introduce a user-facing change?

```release-note
支持在插件中实现 WebSocket
```
  • Loading branch information
JohnNiang authored Apr 25, 2024
1 parent 924aad1 commit a635881
Show file tree
Hide file tree
Showing 11 changed files with 458 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package run.halo.app.core.endpoint;

import org.springframework.web.reactive.socket.WebSocketHandler;
import run.halo.app.extension.GroupVersion;

/**
* Endpoint for WebSocket.
*
* @author johnniang
*/
public interface WebSocketEndpoint {

/**
* Path of the URL after group version.
*
* @return path of the URL.
*/
String urlPath();

/**
* Group and version parts of the endpoint.
*
* @return GroupVersion.
*/
GroupVersion groupVersion();

/**
* Real WebSocket handler for the endpoint.
*
* @return WebSocket handler.
*/
WebSocketHandler handler();

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import reactor.core.publisher.Mono;
import run.halo.app.console.ProxyFilter;
import run.halo.app.console.WebSocketRequestPredicate;
import run.halo.app.core.endpoint.WebSocketHandlerMapping;
import run.halo.app.core.extension.endpoint.CustomEndpoint;
import run.halo.app.core.extension.endpoint.CustomEndpointsBuilder;
import run.halo.app.infra.properties.HaloProperties;
Expand Down Expand Up @@ -100,6 +101,13 @@ RouterFunction<ServerResponse> customEndpoints(ApplicationContext context) {
return builder.build();
}

@Bean
public WebSocketHandlerMapping webSocketHandlerMapping() {
WebSocketHandlerMapping handlerMapping = new WebSocketHandlerMapping();
handlerMapping.setOrder(-2);
return handlerMapping;
}

@Bean
RouterFunction<ServerResponse> consoleIndexRedirection() {
var consolePredicate = method(HttpMethod.GET)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package run.halo.app.console;

import java.util.Objects;
import org.springframework.http.HttpHeaders;

public enum WebSocketUtils {
Expand All @@ -8,8 +9,11 @@ public enum WebSocketUtils {
public static boolean isWebSocketUpgrade(HttpHeaders headers) {
// See io.netty.handler.codec.http.websocketx.extensions.WebSocketExtensionUtil
// .isWebsocketUpgrade for more.
var upgradeConnection = headers.getConnection().stream().map(String::toLowerCase)
.anyMatch(conn -> Objects.equals(conn, "upgrade"));

return headers.containsKey(HttpHeaders.UPGRADE)
&& headers.getConnection().contains(HttpHeaders.UPGRADE)
&& upgradeConnection
&& "websocket".equalsIgnoreCase(headers.getUpgrade());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package run.halo.app.core.endpoint;

import java.util.Collection;

/**
* Interface for managing WebSocket endpoints, including registering and unregistering.
*
* @author johnniang
*/
public interface WebSocketEndpointManager {

void register(Collection<WebSocketEndpoint> endpoints);

void unregister(Collection<WebSocketEndpoint> endpoints);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package run.halo.app.core.endpoint;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.handler.AbstractHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.pattern.PathPattern;
import reactor.core.publisher.Mono;
import run.halo.app.console.WebSocketUtils;

public class WebSocketHandlerMapping extends AbstractHandlerMapping
implements WebSocketEndpointManager, InitializingBean {

private final BiMap<PathPattern, WebSocketEndpoint> endpointMap;

private final ReadWriteLock rwLock;

public WebSocketHandlerMapping() {
this.endpointMap = HashBiMap.create();
this.rwLock = new ReentrantReadWriteLock();
}

@Override
@NonNull
public Mono<WebSocketHandler> getHandlerInternal(ServerWebExchange exchange) {
var request = exchange.getRequest();
if (!HttpMethod.GET.equals(request.getMethod())
|| !WebSocketUtils.isWebSocketUpgrade(request.getHeaders())) {
// skip getting handler if the request is not a WebSocket.
return Mono.empty();
}

var lock = rwLock.readLock();
lock.lock();
try {
// Refer to org.springframework.web.reactive.handler.AbstractUrlHandlerMapping
// .lookupHandler
var pathContainer = request.getPath().pathWithinApplication();
List<PathPattern> matches = null;
for (var pattern : this.endpointMap.keySet()) {
if (pattern.matches(pathContainer)) {
if (matches == null) {
matches = new ArrayList<>();
}
matches.add(pattern);
}
}
if (matches == null) {
return Mono.empty();
}

if (matches.size() > 1) {
matches.sort(PathPattern.SPECIFICITY_COMPARATOR);
}

var pattern = matches.get(0);
exchange.getAttributes().put(BEST_MATCHING_PATTERN_ATTRIBUTE, pattern);

var handler = endpointMap.get(pattern).handler();
exchange.getAttributes().put(BEST_MATCHING_HANDLER_ATTRIBUTE, handler);

ServerRequestObservationContext.findCurrent(exchange.getAttributes())
.ifPresent(context -> context.setPathPattern(pattern.toString()));

var pathWithinMapping = pattern.extractPathWithinPattern(pathContainer);
exchange.getAttributes().put(PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE, pathWithinMapping);

var matchInfo = pattern.matchAndExtract(pathContainer);
Assert.notNull(matchInfo, "Expect a match");
exchange.getAttributes()
.put(URI_TEMPLATE_VARIABLES_ATTRIBUTE, matchInfo.getUriVariables());
return Mono.just(handler);
} catch (Exception e) {
return Mono.error(e);
} finally {
lock.unlock();
}
}

@Override
public void register(Collection<WebSocketEndpoint> endpoints) {
if (CollectionUtils.isEmpty(endpoints)) {
return;
}
var lock = rwLock.writeLock();
lock.lock();
try {
endpoints.forEach(endpoint -> {
var urlPath = endpoint.urlPath();
urlPath = StringUtils.prependIfMissing(urlPath, "/");
var groupVersion = endpoint.groupVersion();
var parser = getPathPatternParser();
var pattern = parser.parse("/apis/" + groupVersion + urlPath);
endpointMap.put(pattern, endpoint);
});
} finally {
lock.unlock();
}
}

@Override
public void unregister(Collection<WebSocketEndpoint> endpoints) {
if (CollectionUtils.isEmpty(endpoints)) {
return;
}
var lock = rwLock.writeLock();
lock.lock();
try {
BiMap<WebSocketEndpoint, PathPattern> inverseMap = endpointMap.inverse();
endpoints.forEach(inverseMap::remove);
} finally {
lock.unlock();
}
}

@Override
public void afterPropertiesSet() {
var endpoints = obtainApplicationContext().getBeanProvider(WebSocketEndpoint.class)
.orderedStream()
.toList();
register(endpoints);
}

BiMap<PathPattern, WebSocketEndpoint> getEndpointMap() {
return endpointMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.Exceptions;
import run.halo.app.core.endpoint.WebSocketEndpoint;
import run.halo.app.core.endpoint.WebSocketEndpointManager;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.infra.properties.HaloProperties;
import run.halo.app.plugin.event.HaloPluginBeforeStopEvent;
Expand Down Expand Up @@ -125,6 +127,10 @@ public ApplicationContext create(String pluginId) {
beanFactory.registerSingleton("finderManager", finderManager);
});

rootContext.getBeanProvider(WebSocketEndpointManager.class)
.ifUnique(manager -> beanFactory.registerSingleton("pluginWebSocketEndpointManager",
new PluginWebSocketEndpointManager(manager)));

rootContext.getBeanProvider(PluginRouterFunctionRegistry.class)
.ifUnique(registry -> {
var pluginRouterFunctionManager = new PluginRouterFunctionManager(registry);
Expand Down Expand Up @@ -219,6 +225,31 @@ public void onApplicationEvent(ContextRefreshedEvent event) {

}

private static class PluginWebSocketEndpointManager {

private final WebSocketEndpointManager manager;

private List<WebSocketEndpoint> endpoints;

private PluginWebSocketEndpointManager(WebSocketEndpointManager manager) {
this.manager = manager;
}

@EventListener
public void onApplicationEvent(ContextRefreshedEvent event) {
var context = event.getApplicationContext();
this.endpoints = context.getBeanProvider(WebSocketEndpoint.class)
.orderedStream()
.toList();
manager.register(this.endpoints);
}

@EventListener
public void onApplicationEvent(ContextClosedEvent ignored) {
manager.unregister(this.endpoints);
}
}

private static class PluginRouterFunctionManager {

private final PluginRouterFunctionRegistry routerFunctionRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.server.PathContainer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import run.halo.app.console.WebSocketUtils;

/**
* Creates {@link RequestInfo} from {@link ServerHttpRequest}.
Expand Down Expand Up @@ -215,6 +216,10 @@ public RequestInfo newRequestInfo(ServerHttpRequest request) {
requestInfo.verb = "deletecollection";
}
}
if ("list".equals(requestInfo.verb)
&& WebSocketUtils.isWebSocketUpgrade(request.getHeaders())) {
requestInfo.verb = "watch";
}
return requestInfo;
}

Expand Down
Loading

0 comments on commit a635881

Please sign in to comment.