集群同步

This commit is contained in:
mxd 2021-12-30 22:58:16 +08:00
parent 052e43ffb1
commit 3438d122bd
16 changed files with 215 additions and 128 deletions

View File

@ -411,7 +411,7 @@ public class MagicAPIAutoConfiguration implements WebMvcConfigurer, WebSocketCon
@Bean
@ConditionalOnMissingBean
public MagicAPIService magicAPIService(ResultProvider resultProvider, MagicResourceService magicResourceService) {
return new DefaultMagicAPIService(resultProvider, properties.getClusterConfig().getInstanceId(), magicResourceService, properties.isThrowException());
return new DefaultMagicAPIService(resultProvider, properties.getClusterConfig().getInstanceId(), magicResourceService, properties.isThrowException(), applicationContext);
}
/**

View File

@ -19,6 +19,7 @@ import org.ssssssss.magicapi.model.MagicNotify;
import org.ssssssss.magicapi.modules.RedisModule;
import org.ssssssss.magicapi.provider.MagicAPIService;
import org.ssssssss.magicapi.provider.MagicNotifyService;
import org.ssssssss.magicapi.service.MagicSynchronizationService;
import org.ssssssss.magicapi.utils.JsonUtils;
import java.util.Objects;
@ -73,6 +74,16 @@ public class MagicRedisAutoConfiguration {
return magicNotify -> stringRedisTemplate.convertAndSend(properties.getClusterConfig().getChannel(), Objects.requireNonNull(JsonUtils.toJsonString(magicNotify)));
}
/**
* 消息处理服务
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "magic-api", name = "cluster-config.enable", havingValue = "true")
public MagicSynchronizationService magicSynchronizationService(MagicNotifyService magicNotifyService) {
return new MagicSynchronizationService(magicNotifyService, properties.getClusterConfig().getInstanceId());
}
/**
* 集群通知监听
*/

View File

@ -3,7 +3,7 @@ package org.ssssssss.magicapi.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.TextMessage;
import org.ssssssss.magicapi.model.Constants;
import org.ssssssss.magicapi.event.EventAction;
import org.ssssssss.magicapi.model.MagicConsoleSession;
import org.ssssssss.magicapi.model.MagicNotify;
import org.ssssssss.magicapi.provider.MagicNotifyService;
@ -65,7 +65,7 @@ public class WebSocketSessionManager {
private static void sendToOther(String sessionId, String content) {
if (magicNotifyService != null) {
// 通知其他机器去发送消息
magicNotifyService.sendNotify(new MagicNotify(instanceId, Constants.NOTIFY_WS_S_C, sessionId, content));
magicNotifyService.sendNotify(new MagicNotify(instanceId, EventAction.WS_S_C, sessionId, content));
}
}

View File

@ -8,7 +8,7 @@ import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.ssssssss.magicapi.config.Message;
import org.ssssssss.magicapi.config.WebSocketSessionManager;
import org.ssssssss.magicapi.model.Constants;
import org.ssssssss.magicapi.event.EventAction;
import org.ssssssss.magicapi.model.MagicConsoleSession;
import org.ssssssss.magicapi.model.MagicNotify;
import org.ssssssss.magicapi.provider.MagicNotifyService;
@ -106,7 +106,7 @@ public class MagicWebSocketDispatcher extends TextWebSocketHandler {
Object returnValue = findHandleAndInvoke(consoleSession, message.getPayload());
// 如果未成功处理消息则通知其他机器去处理消息
if (Boolean.FALSE.equals(returnValue)) {
magicNotifyService.sendNotify(new MagicNotify(instanceId, Constants.NOTIFY_WS_C_S, consoleSession.getId(), message.getPayload()));
magicNotifyService.sendNotify(new MagicNotify(instanceId, EventAction.WS_C_S, consoleSession.getId(), message.getPayload()));
}
}
}

View File

@ -2,5 +2,5 @@ package org.ssssssss.magicapi.event;
public enum EventAction {
CREATE, SAVE, DELETE, MOVE, LOAD
CREATE, SAVE, DELETE, MOVE, LOAD, WS_C_S, WS_S_C, RELOAD
}

View File

@ -11,6 +11,11 @@ public class FileEvent extends MagicEvent {
this.entity = entity;
}
public FileEvent(String type, EventAction action, MagicEntity entity, String source) {
super(type, action, source);
this.entity = entity;
}
public MagicEntity getEntity() {
return entity;
}

View File

@ -8,20 +8,27 @@ import java.util.List;
public class GroupEvent extends MagicEvent {
/**
* 分组信息
*/
private final Group group;
private List<MagicEntity> entities = Collections.emptyList();
/**
* 子分组
*/
private List<MagicEntity> entities;
public GroupEvent(String type, EventAction action, Group group) {
super(type, action);
this.group = group;
this(type, action, group, Collections.emptyList());
}
public GroupEvent(String type, EventAction action, Group group, List<MagicEntity> entities) {
this(type, action, group);
super(type, action);
this.group = group;
this.entities = entities;
}
public Group getGroup() {
return group;
}

View File

@ -2,13 +2,29 @@ package org.ssssssss.magicapi.event;
public class MagicEvent {
private String type;
/**
* 消息类型
*/
private final String type;
private EventAction action;
/**
* 消息动作
*/
private final EventAction action;
public MagicEvent(String type, EventAction action) {
/**
* 消息来源
*/
private String source;
public MagicEvent(String type, EventAction action, String source) {
this.type = type;
this.action = action;
this.source = source;
}
public MagicEvent(String type, EventAction action) {
this(type, action, null);
}
public String getType() {
@ -18,4 +34,12 @@ public class MagicEvent {
public EventAction getAction() {
return action;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
}

View File

@ -0,0 +1,18 @@
package org.ssssssss.magicapi.event;
import org.ssssssss.magicapi.model.Constants;
import org.ssssssss.magicapi.model.MagicNotify;
public class NotifyEvent extends MagicEvent {
private String id;
public NotifyEvent(MagicNotify notify) {
super(notify.getType(), notify.getAction(), Constants.EVENT_SOURCE_NOTIFY);
this.id = notify.getId();
}
public String getId() {
return id;
}
}

View File

@ -11,21 +11,6 @@ public class Constants {
public static final String CONST_STRING_TRUE = "true";
/**
* 分组类型: 接口
*/
public static final String GROUP_TYPE_API = "1";
/**
* 分组类型: 函数
*/
public static final String GROUP_TYPE_FUNCTION = "2";
/**
* 分组类型: WebSocket
*/
public static final String GROUP_TYPE_WEBSOCKET = "3";
/**
* 接口文件夹名
*/
@ -81,22 +66,11 @@ public class Constants {
*/
public static final String VAR_NAME_HEADER = "header";
/**
* 脚本中query的变量名
*/
public static final String VAR_NAME_QUERY = "query";
/**
* 脚本中RequestBody的变量名
*/
public static final String VAR_NAME_REQUEST_BODY = "body";
/**
* 脚本中RequestBody的变量值字段类型
*/
public static final String VAR_NAME_REQUEST_BODY_VALUE_TYPE_OBJECT = "object";
/**
* 脚本中RequestBody的变量名字段类型
*/
public static final String VAR_NAME_REQUEST_BODY_VALUE_TYPE_ARRAY = "array";
public static final String HEADER_REQUEST_SCRIPT_ID = "Magic-Request-Script-Id";
@ -118,6 +92,10 @@ public class Constants {
public static final String ROOT_ID = "0";
public static final String EVENT_TYPE_FILE = "file";
public static final String EVENT_SOURCE_NOTIFY = "notify";
/**
* 执行成功的code值
*/

View File

@ -1,5 +1,7 @@
package org.ssssssss.magicapi.model;
import org.ssssssss.magicapi.event.EventAction;
/**
* 消息通知对象
*
@ -13,19 +15,19 @@ public class MagicNotify {
private String from;
/**
* 对应的id如接口id函数id分组id数据源id
* 文件或文件夹id
*/
private String id;
/**
* 动作
*/
private int action = -1;
private EventAction action = null;
/**
* 操作对象如接口函数分组数据源
*/
private int type = -1;
private String type = null;
/**
* WebSocket sessionId
@ -41,17 +43,17 @@ public class MagicNotify {
}
public MagicNotify(String from) {
this(from, null, Constants.NOTIFY_ACTION_ALL, Constants.NOTIFY_ACTION_ALL);
this.from = from;
}
public MagicNotify(String from, int action, String sessionId, String content) {
public MagicNotify(String from, EventAction action, String sessionId, String content) {
this.from = from;
this.sessionId = sessionId;
this.action = action;
this.content = content;
}
public MagicNotify(String from, String id, int action, int type) {
public MagicNotify(String from, String id, EventAction action, String type) {
this.from = from;
this.id = id;
this.action = action;
@ -74,19 +76,19 @@ public class MagicNotify {
this.id = id;
}
public int getAction() {
public EventAction getAction() {
return action;
}
public void setAction(int action) {
public void setAction(EventAction action) {
this.action = action;
}
public int getType() {
public String getType() {
return type;
}
public void setType(int type) {
public void setType(String type) {
this.type = type;
}
@ -108,58 +110,13 @@ public class MagicNotify {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("MagicNotify(from=");
builder.append(from);
builder.append(", action=");
switch (action) {
case Constants.NOTIFY_ACTION_ADD:
builder.append("新增");
break;
case Constants.NOTIFY_ACTION_UPDATE:
builder.append("修改");
break;
case Constants.NOTIFY_ACTION_DELETE:
builder.append("删除");
break;
case Constants.NOTIFY_ACTION_ALL:
builder.append("刷新全部");
break;
case Constants.NOTIFY_WS_C_S:
builder.append("通知客户端发来的消息");
builder.append(", sessionId=").append(sessionId);
builder.append(", content=").append(content);
break;
case Constants.NOTIFY_WS_S_C:
builder.append("通知服务端发送给客户端的消息");
builder.append(", sessionId=").append(sessionId);
builder.append(", content=").append(content);
break;
default:
builder.append("未知");
}
if (action != Constants.NOTIFY_ACTION_ALL && action < Constants.NOTIFY_WS_C_S) {
builder.append(", type=");
switch (type) {
case Constants.NOTIFY_ACTION_API:
builder.append("接口");
break;
case Constants.NOTIFY_ACTION_FUNCTION:
builder.append("函数");
break;
case Constants.NOTIFY_ACTION_DATASOURCE:
builder.append("数据源");
break;
case Constants.NOTIFY_ACTION_GROUP:
builder.append("分组");
break;
default:
builder.append("未知");
}
builder.append(", id=");
builder.append(id);
}
builder.append(")");
return builder.toString();
return "MagicNotify{" +
"from='" + from + '\'' +
", id='" + id + '\'' +
", action=" + action +
", type='" + type + '\'' +
", sessionId='" + sessionId + '\'' +
", content='" + content + '\'' +
'}';
}
}

View File

@ -2,6 +2,7 @@ package org.ssssssss.magicapi.provider.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@ -22,9 +23,6 @@ import java.io.*;
import java.util.List;
import java.util.Map;
import static org.ssssssss.magicapi.model.Constants.NOTIFY_WS_C_S;
import static org.ssssssss.magicapi.model.Constants.NOTIFY_WS_S_C;
public class DefaultMagicAPIService implements MagicAPIService, JsonCodeConstants {
private final static Logger logger = LoggerFactory.getLogger(DefaultMagicAPIService.class);
@ -33,14 +31,18 @@ public class DefaultMagicAPIService implements MagicAPIService, JsonCodeConstant
private final String instanceId;
private final MagicResourceService resourceService;
private final ApplicationEventPublisher publisher;
public DefaultMagicAPIService(ResultProvider resultProvider,
String instanceId,
MagicResourceService resourceService,
boolean throwException) {
boolean throwException,
ApplicationEventPublisher publisher) {
this.resultProvider = resultProvider;
this.throwException = throwException;
this.resourceService = resourceService;
this.instanceId = instanceId;
this.publisher = publisher;
}
private <T> T execute(ApiInfo info, Map<String, Object> context) {
@ -252,23 +254,14 @@ public class DefaultMagicAPIService implements MagicAPIService, JsonCodeConstant
return false;
}
logger.info("收到通知消息:{}", magicNotify);
String id = magicNotify.getId();
int action = magicNotify.getAction();
// switch (magicNotify.getType()) {
// case NOTIFY_ACTION_API:
// return mappingRegistry.processNotify(id, action);
// case NOTIFY_ACTION_FUNCTION:
// return functionRegistry.processNotify(id, action);
// case NOTIFY_ACTION_WEBSOCKET:
// return webSocketRegistry.processNotify(id, action);
// }
switch (action) {
case NOTIFY_WS_C_S:
switch (magicNotify.getAction()) {
case WS_C_S:
return processWebSocketMessageReceived(magicNotify.getSessionId(), magicNotify.getContent());
case NOTIFY_WS_S_C:
case WS_S_C:
return processWebSocketSendMessage(magicNotify.getSessionId(), magicNotify.getContent());
}
return false;
resourceService.processNotify(magicNotify);
return true;
}
@Override

View File

@ -1,10 +1,7 @@
package org.ssssssss.magicapi.service;
import org.ssssssss.magicapi.adapter.Resource;
import org.ssssssss.magicapi.model.Group;
import org.ssssssss.magicapi.model.MagicEntity;
import org.ssssssss.magicapi.model.SelectedResource;
import org.ssssssss.magicapi.model.TreeNode;
import org.ssssssss.magicapi.model.*;
import java.io.IOException;
import java.io.OutputStream;
@ -23,6 +20,8 @@ public interface MagicResourceService {
Resource getResource();
boolean processNotify(MagicNotify magicNotify);
/**
* 保存分组
*/

View File

@ -0,0 +1,50 @@
package org.ssssssss.magicapi.service;
import org.springframework.context.event.EventListener;
import org.ssssssss.magicapi.event.FileEvent;
import org.ssssssss.magicapi.event.GroupEvent;
import org.ssssssss.magicapi.model.Constants;
import org.ssssssss.magicapi.model.MagicNotify;
import org.ssssssss.magicapi.provider.MagicNotifyService;
public class MagicSynchronizationService {
private final MagicNotifyService magicNotifyService;
/**
* 当前实例ID
*/
private final String instanceId;
public MagicSynchronizationService(MagicNotifyService magicNotifyService, String instanceId) {
this.magicNotifyService = magicNotifyService;
this.instanceId = instanceId;
}
@EventListener(condition = "#event.source != T(org.ssssssss.magicapi.model.Constants).EVENT_SOURCE_NOTIFY")
public void onFolderEvent(GroupEvent event) {
switch (event.getAction()) {
case CREATE:
case SAVE:
case MOVE:
case DELETE:
magicNotifyService.sendNotify(new MagicNotify(instanceId, event.getGroup().getId(), event.getAction(), event.getType()));
break;
}
}
@EventListener(condition = "#event.source != T(org.ssssssss.magicapi.model.Constants).EVENT_SOURCE_NOTIFY")
public void onFileEvent(FileEvent event) {
if (Constants.EVENT_SOURCE_NOTIFY.equals(event.getSource())) {
return;
}
switch (event.getAction()) {
case CREATE:
case SAVE:
case MOVE:
case DELETE:
magicNotifyService.sendNotify(new MagicNotify(instanceId, event.getEntity().getId(), event.getAction(), Constants.EVENT_TYPE_FILE));
break;
}
}
}

View File

@ -54,11 +54,56 @@ public class DefaultMagicResourceService implements MagicResourceService, JsonCo
this.publisher = publisher;
}
public boolean processNotify(MagicNotify notify) {
if (Constants.EVENT_TYPE_FILE.equals(notify.getType())) {
return processFileNotify(notify.getId(), notify.getAction());
}
return processGroupNotify(notify.getId(), notify.getAction());
}
private boolean processGroupNotify(String id, EventAction action) {
Group group = groupCache.get(id);
TreeNode<Group> treeNode = tree(group.getType()).findTreeNode(it -> it.getId().equals(group.getId()));
if (treeNode != null) {
GroupEvent event = new GroupEvent(group.getType(), action, group);
event.setSource(Constants.EVENT_SOURCE_NOTIFY);
if (action != EventAction.CREATE) {
event.setEntities(treeNode
.flat()
.stream()
.flatMap(g -> listFiles(g.getId()).stream())
.collect(Collectors.toList()));
}
refresh();
publisher.publishEvent(event);
return true;
}
return false;
}
private boolean processFileNotify(String id, EventAction action) {
MagicEntity entity = fileCache.get(id);
if (entity != null) {
Group group = groupCache.get(entity.getGroupId());
if (group != null) {
refresh();
if (action != EventAction.DELETE) {
entity = fileCache.get(id);
}
publisher.publishEvent(new FileEvent(group.getType(), action, entity, Constants.EVENT_SOURCE_NOTIFY));
}
}
return false;
}
@Override
public void refresh() {
readLock(() -> {
writeLock(() -> {
groupMappings.clear();
groupCache.clear();
fileMappings.clear();
fileCache.clear();
pathCache.clear();
this.root.readAll();
storages.forEach((key, registry) -> {
if (registry.requirePath()) {
@ -83,6 +128,7 @@ public class DefaultMagicResourceService implements MagicResourceService, JsonCo
Group group = groupCache.get(entity.getGroupId());
publisher.publishEvent(new FileEvent(group.getType(), EventAction.LOAD, entity));
});
return null;
});
}
@ -127,7 +173,6 @@ public class DefaultMagicResourceService implements MagicResourceService, JsonCo
}
Resource groupResource;
GroupEvent event = new GroupEvent(group.getType(), group.getId() == null ? EventAction.CREATE : EventAction.SAVE, group);
;
if (group.getId() == null) {
// 添加分组
group.setId(UUID.randomUUID().toString().replace("-", ""));
@ -319,6 +364,7 @@ public class DefaultMagicResourceService implements MagicResourceService, JsonCo
notBlank(entity.getName(), NAME_REQUIRED);
isTrue(IoUtils.validateFileName(entity.getName()), NAME_INVALID);
return writeLock(() -> {
EventAction action = entity.getId() == null ? EventAction.CREATE : EventAction.SAVE;
// 获取所在分组
Resource groupResource = getGroupResource(entity.getGroupId());
// 分组需要存在
@ -367,7 +413,7 @@ public class DefaultMagicResourceService implements MagicResourceService, JsonCo
}
boolean flag = fileResource.write(storage.write(entity));
if (flag) {
publisher.publishEvent(new FileEvent(storage.folder(), EventAction.SAVE, entity));
publisher.publishEvent(new FileEvent(storage.folder(), action, entity));
putFile(storage, entity, fileResource);
}
return flag;

View File

@ -50,7 +50,6 @@ public class RequestMagicDynamicRegistry extends AbstractMagicDynamicRegistry<Ap
public ApiInfo getApiInfoFromRequest(HttpServletRequest request) {
String mappingKey = Objects.toString(request.getMethod(), "GET").toUpperCase() + ":" + request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
;
return getMapping(mappingKey);
}