集成rocketmq

This commit is contained in:
zhaozhenyang 2024-01-16 14:47:54 +08:00
parent b316c7f725
commit 969ba8046b
25 changed files with 3207 additions and 0 deletions

View File

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.ssssssss</groupId>
<artifactId>magic-api-plugins</artifactId>
<version>2.1.1</version>
</parent>
<artifactId>magic-api-plugin-rocketmq</artifactId>
<packaging>jar</packaging>
<name>magic-api-plugin-rocketmq</name>
<description>magic-api-plugin-rocketmq</description>
<dependencies>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- npm install && npm run build -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<id>exec-npm-install</id>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>npm</executable>
<arguments>
<argument>install</argument>
</arguments>
<workingDirectory>${basedir}/src/console</workingDirectory>
</configuration>
</execution>
<execution>
<id>exec-npm-run-build</id>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>npm</executable>
<arguments>
<argument>run</argument>
<argument>build</argument>
</arguments>
<workingDirectory>${basedir}/src/console</workingDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>copy-resource</id>
<phase>generate-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/classes/magic-editor/plugins</outputDirectory>
<resources>
<resource>
<directory>${basedir}/src/console/dist</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,190 @@
var __vite_style__ = document.createElement("style");
__vite_style__.innerHTML = "\n.magic-rocketmq-info[data-v-1628ca62]{\r\n display: flex;\r\n flex-direction: column;\r\n flex: 1;\r\n padding: 5px;\n}\n.magic-rocketmq-info form[data-v-1628ca62]{\r\n display: flex;\n}\n.magic-rocketmq-info form label[data-v-1628ca62]{\r\n display: inline-block;\r\n width: 100px;\r\n height: var(--magic-input-height);\r\n line-height: var(--magic-input-height);\r\n font-weight: 400;\r\n text-align: right;\r\n padding: 0 5px;\n}\n.magic-rocketmq-info form[data-v-1628ca62] .magic-checkbox{\r\n width: 22px;\r\n height: 22px;\n}\n.magic-rocketmq-info form[data-v-1628ca62] .magic-textarea{\r\n margin: 5px;\n}\r\n";
document.head.appendChild(__vite_style__);
var MagicRocketMq = function(vue) {
"use strict";
function MagicRocketMq2(bus, constants, $i, Message, request) {
return {
getIcon: (item) => ["ROCKETMQ", "#9012FE"],
name: $i("rocketmq.name"),
language: "magicscript",
defaultScript: `return 'Hello magic-api-rocketmq'`,
doTest: (opened) => {
opened.running = true;
const info = opened.item;
const requestConfig = {
baseURL: constants.SERVER_URL,
url: "/rocketmq/execute",
method: "POST",
responseType: "json",
headers: {},
withCredentials: true
};
bus.$emit(Message.SWITCH_TOOLBAR, "log");
requestConfig.headers[constants.HEADER_REQUEST_CLIENT_ID] = constants.CLIENT_ID;
requestConfig.headers[constants.HEADER_REQUEST_SCRIPT_ID] = opened.item.id;
requestConfig.headers[constants.HEADER_MAGIC_TOKEN] = constants.HEADER_MAGIC_TOKEN_VALUE;
requestConfig.headers[constants.HEADER_REQUEST_BREAKPOINTS] = (opened.decorations || []).filter((it) => it.options.linesDecorationsClassName === "breakpoints").map((it) => it.range.startLineNumber).join(",");
const fullName = opened.path();
bus.status(`\u5F00\u59CB\u6D4B\u8BD5\u5B9A\u65F6\u4EFB\u52A1\u300C${fullName}\u300D`);
request.sendPost("/rocketmq/execute", { id: info.id }, requestConfig).success((res) => {
opened.running = false;
}).end(() => {
bus.status(`\u5B9A\u65F6\u4EFB\u52A1\u300C${fullName}\u300D\u6D4B\u8BD5\u5B8C\u6BD5`);
opened.running = false;
});
},
runnable: true,
requirePath: true,
merge: (item) => item
};
}
var localZhCN = {
rocketmq: {
title: "RocketMq\u4FE1\u606F",
name: "RocketMq",
form: {
topic: " \u4E3B\u9898",
tag: "\u6807\u7B7E",
name: "\u540D\u79F0",
path: "\u8DEF\u5F84",
placeholder: {
topic: "\u8BF7\u8F93\u5165\u6D88\u606F\u961F\u5217\u4E3B\u9898",
tag: "\u8BF7\u8F93\u5165\u6D88\u606F\u961F\u5217\u6807\u7B7E",
name: "\u8BF7\u8F93\u5165\u6D88\u606F\u961F\u5217\u540D\u79F0",
path: "\u8BF7\u8F93\u5165\u6D88\u606F\u961F\u5217\u8DEF\u5F84",
description: "\u8BF7\u8F93\u5165\u6D88\u606F\u961F\u5217\u63CF\u8FF0"
}
}
}
};
var localEn = {
rocketmq: {
title: "RocketMq Info",
name: "RocketMq",
form: {
topic: " topic",
tag: "tag",
name: "name",
path: "path",
placeholder: {
topic: "Please Enter Topic Expression",
tag: "Please Enter tag Expression",
name: "Please Enter RocketMq Name",
path: "Please Enter RocketMq Path",
description: "Please Enter RocketMq Description"
}
}
}
};
var magicRocketmqInfo_vue_vue_type_style_index_0_scoped_true_lang = "";
var _export_sfc = (sfc, props) => {
const target = sfc.__vccOpts || sfc;
for (const [key, val] of props) {
target[key] = val;
}
return target;
};
const _hoisted_1 = { class: "magic-rocketmq-info" };
const _hoisted_2 = { style: { "flex": "1", "padding-top": "5px" } };
const _sfc_main = {
__name: "magic-rocketmq-info",
setup(__props) {
const $i = vue.inject("i18n.format");
const info = vue.inject("info");
return (_ctx, _cache) => {
const _component_magic_checkbox = vue.resolveComponent("magic-checkbox");
const _component_magic_input = vue.resolveComponent("magic-input");
const _component_magic_textarea = vue.resolveComponent("magic-textarea");
return vue.openBlock(), vue.createElementBlock("div", _hoisted_1, [
vue.createElementVNode("form", null, [
vue.createElementVNode("label", null, vue.toDisplayString(vue.unref($i)("message.enable")), 1),
vue.createVNode(_component_magic_checkbox, {
value: vue.unref(info).enabled,
"onUpdate:value": _cache[0] || (_cache[0] = ($event) => vue.unref(info).enabled = $event)
}, null, 8, ["value"]),
vue.createElementVNode("label", null, vue.toDisplayString(vue.unref($i)("rocketmq.form.topic")), 1),
vue.createVNode(_component_magic_input, {
value: vue.unref(info).topic,
"onUpdate:value": _cache[1] || (_cache[1] = ($event) => vue.unref(info).topic = $event),
placeholder: vue.unref($i)("rocketmq.form.placeholder.topic"),
width: "250px"
}, null, 8, ["value", "placeholder"]),
vue.createElementVNode("label", null, vue.toDisplayString(vue.unref($i)("rocketmq.form.tag")), 1),
vue.createVNode(_component_magic_input, {
value: vue.unref(info).tag,
"onUpdate:value": _cache[2] || (_cache[2] = ($event) => vue.unref(info).tag = $event),
placeholder: vue.unref($i)("rocketmq.form.placeholder.tag"),
width: "250px"
}, null, 8, ["value", "placeholder"]),
vue.createElementVNode("label", null, vue.toDisplayString(vue.unref($i)("rocketmq.form.name")), 1),
vue.createVNode(_component_magic_input, {
value: vue.unref(info).name,
"onUpdate:value": _cache[3] || (_cache[3] = ($event) => vue.unref(info).name = $event),
placeholder: vue.unref($i)("rocketmq.form.placeholder.name"),
width: "250px"
}, null, 8, ["value", "placeholder"]),
vue.createElementVNode("label", null, vue.toDisplayString(vue.unref($i)("rocketmq.form.path")), 1),
vue.createVNode(_component_magic_input, {
value: vue.unref(info).path,
"onUpdate:value": _cache[4] || (_cache[4] = ($event) => vue.unref(info).path = $event),
placeholder: vue.unref($i)("rocketmq.form.placeholder.path"),
width: "auto",
style: { "flex": "1" }
}, null, 8, ["value", "placeholder"])
]),
vue.createElementVNode("div", _hoisted_2, [
vue.createVNode(_component_magic_textarea, {
value: vue.unref(info).description,
"onUpdate:value": _cache[5] || (_cache[5] = ($event) => vue.unref(info).description = $event),
placeholder: vue.unref($i)("rocketmq.form.placeholder.description")
}, null, 8, ["value", "placeholder"])
])
]);
};
}
};
var MagicRocketMqInfo = /* @__PURE__ */ _export_sfc(_sfc_main, [["__scopeId", "data-v-1628ca62"]]);
if (typeof window !== "undefined") {
let loadSvg = function() {
var body = document.body;
var svgDom = document.getElementById("__svg__icons__dom__1705386927094__");
if (!svgDom) {
svgDom = document.createElementNS("http://www.w3.org/2000/svg", "svg");
svgDom.style.position = "absolute";
svgDom.style.width = "0";
svgDom.style.height = "0";
svgDom.id = "__svg__icons__dom__1705386927094__";
svgDom.setAttribute("xmlns", "http://www.w3.org/2000/svg");
svgDom.setAttribute("xmlns:link", "http://www.w3.org/1999/xlink");
}
svgDom.innerHTML = '<symbol class="icon" viewBox="0 0 1024 1024" id="magic-rocketmq-rocketmq"><path d="M512.787 189.403a372.25 372.25 0 1 1-.056 744.444 372.25 372.25 0 0 1 0-744.5zm20.025 179.431h-39.936a6.694 6.694 0 0 0-6.694 6.694v228.705c0 2.194 1.013 4.162 2.756 5.4l137.414 100.234a6.637 6.637 0 0 0 9.281-1.463l23.793-32.399a6.525 6.525 0 0 0-1.518-9.224l-118.459-85.61V375.528a6.694 6.694 0 0 0-6.637-6.694zM711.287 90.125a24.805 24.805 0 0 1 0 49.61H314.232a24.805 24.805 0 0 1 0-49.61h397.111z" /></symbol>';
body.insertBefore(svgDom, body.firstChild);
};
if (document.readyState === "loading") {
document.addEventListener("DOMContentLoaded", loadSvg);
} else {
loadSvg();
}
}
var index = (opt) => {
const i18n = opt.i18n;
i18n.add("zh-cn", localZhCN);
i18n.add("en", localEn);
return {
resource: [{
type: "rocketmq",
icon: "#magic-rocketmq-rocketmq",
title: "rocketmq.name",
service: MagicRocketMq2(opt.bus, opt.constants, i18n.format, opt.Message, opt.request)
}],
toolbars: [{
type: "rocketmq",
title: "rocketmq.title",
icon: "parameter",
component: MagicRocketMqInfo
}]
};
};
return index;
}(Vue);

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,18 @@
{
"name": "magic-rocketmq",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"build": "vite build"
},
"author": "",
"license": "ISC",
"devDependencies": {
"vue": "^3.2.31",
"@vitejs/plugin-vue": "^2.2.4",
"vite-plugin-svg-icons": "^1.1.0",
"vite": "^2.8.6"
}
}

View File

@ -0,0 +1,51 @@
<template>
<div class="magic-rocketmq-info">
<form>
<label>{{ $i('message.enable') }}</label>
<magic-checkbox v-model:value="info.enabled" />
<label>{{ $i('rocketmq.form.topic') }}</label>
<magic-input v-model:value="info.topic" :placeholder="$i('rocketmq.form.placeholder.topic')" width="250px"/>
<label>{{ $i('rocketmq.form.tag') }}</label>
<magic-input v-model:value="info.tag" :placeholder="$i('rocketmq.form.placeholder.tag')" width="250px"/>
<label>{{ $i('rocketmq.form.name') }}</label>
<magic-input v-model:value="info.name" :placeholder="$i('rocketmq.form.placeholder.name')" width="250px"/>
<label>{{ $i('rocketmq.form.path') }}</label>
<magic-input v-model:value="info.path" :placeholder="$i('rocketmq.form.placeholder.path')" width="auto" style="flex:1"/>
</form>
<div style="flex:1;padding-top:5px;">
<magic-textarea v-model:value="info.description" :placeholder="$i('rocketmq.form.placeholder.description')"/>
</div>
</div>
</template>
<script setup>
import { inject } from 'vue'
const $i = inject('i18n.format')
const info = inject('info')
</script>
<style scoped>
.magic-rocketmq-info{
display: flex;
flex-direction: column;
flex: 1;
padding: 5px;
}
.magic-rocketmq-info form{
display: flex;
}
.magic-rocketmq-info form label{
display: inline-block;
width: 100px;
height: var(--magic-input-height);
line-height: var(--magic-input-height);
font-weight: 400;
text-align: right;
padding: 0 5px;
}
.magic-rocketmq-info form :deep(.magic-checkbox){
width: 22px;
height: 22px;
}
.magic-rocketmq-info form :deep(.magic-textarea){
margin: 5px;
}
</style>

View File

@ -0,0 +1,19 @@
export default {
rocketmq: {
title: 'RocketMq Info',
name: 'RocketMq',
form: {
topic: " topic",
tag: "tag",
name: 'name',
path: 'path',
placeholder: {
topic: 'Please Enter Topic Expression',
tag: 'Please Enter tag Expression',
name: 'Please Enter RocketMq Name',
path: 'Please Enter RocketMq Path',
description: 'Please Enter RocketMq Description'
}
}
},
}

View File

@ -0,0 +1,19 @@
export default {
rocketmq: {
title: 'RocketMq信息',
name: 'RocketMq',
form: {
topic: " 主题",
tag: "标签",
name: '名称',
path: '路径',
placeholder: {
topic: '请输入消息队列主题',
tag: '请输入消息队列标签',
name: '请输入消息队列名称',
path: '请输入消息队列路径',
description: '请输入消息队列描述'
}
}
}
}

View File

@ -0,0 +1 @@
<svg class="icon" style="width: 1em;height: 1em;vertical-align: middle;fill: currentColor;overflow: hidden;" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg"><path d="M512.78747336 189.40294037A372.25009177 372.25009177 0 1 1 512.73122556 933.8468761a372.25009177 372.25009177 0 0 1 0-744.50018353z m20.02433313 179.43151904h-39.93616901a6.69352725 6.69352725 0 0 0-6.69352725 6.69352725V604.2328627c0 2.19367667 1.01246621 4.1623613 2.75615881 5.39982038l137.41416897 100.23415877a6.63727863 6.63727863 0 0 0 9.28094102-1.4624511l23.79295651-32.39891977a6.5247822 6.5247822 0 0 0-1.51869891-9.22469321L539.44908511 581.17113175V375.52798666a6.69352725 6.69352725 0 0 0-6.63727862-6.69352726zM711.28710712 90.125a24.80542356 24.80542356 0 0 1-1e-8 49.61084629H314.23159262a24.80542356 24.80542356 0 0 1 0-49.61084629h397.1117623z" /></svg>

After

Width:  |  Height:  |  Size: 863 B

View File

@ -0,0 +1,35 @@
import MagicRocketMq from './service/magic-rocketmq.js'
import localZhCN from './i18n/zh-cn.js'
import localEn from './i18n/en.js'
import MagicRocketMqInfo from './components/magic-rocketmq-info.vue'
import 'vite-plugin-svg-icons/register'
export default (opt) => {
const i18n = opt.i18n
// 添加i18n 国际化信息
i18n.add('zh-cn', localZhCN)
i18n.add('en', localEn)
return {
// 左侧资源
resource: [{
// 资源类型和后端存储结构一致
type: 'rocketmq',
// 展示图标
icon: '#magic-rocketmq-rocketmq', // #开头表示图标在插件中
// 展示名称
title: 'rocketmq.name',
// 运行服务
service: MagicRocketMq(opt.bus, opt.constants, i18n.format, opt.Message, opt.request),
}],
// 底部工具条
toolbars: [{
// 当打开的资源类型为 task 时显示
type: 'rocketmq',
// 工具条展示的标题
title: 'rocketmq.title',
// 展示图标
icon: 'parameter',
// 对应的组件
component: MagicRocketMqInfo,
}]
}
}

View File

@ -0,0 +1,45 @@
export default function (bus, constants, $i, Message, request) {
return {
// svg text
getIcon: item => ['ROCKETMQ', '#9012FE'],
// 任务名称
name: $i('rocketmq.name'),
// 脚本语言
language: 'magicscript',
// 默认脚本
defaultScript: `return 'Hello magic-api-rocketmq'`,
// 执行测试的逻辑
doTest: (opened) => {
opened.running = true
const info = opened.item
const requestConfig = {
baseURL: constants.SERVER_URL,
url: '/rocketmq/execute',
method: 'POST',
responseType: 'json',
headers: {},
withCredentials: true
}
bus.$emit(Message.SWITCH_TOOLBAR, 'log')
requestConfig.headers[constants.HEADER_REQUEST_CLIENT_ID] = constants.CLIENT_ID
requestConfig.headers[constants.HEADER_REQUEST_SCRIPT_ID] = opened.item.id
requestConfig.headers[constants.HEADER_MAGIC_TOKEN] = constants.HEADER_MAGIC_TOKEN_VALUE
// 设置断点
requestConfig.headers[constants.HEADER_REQUEST_BREAKPOINTS] = (opened.decorations || []).filter(it => it.options.linesDecorationsClassName === 'breakpoints').map(it => it.range.startLineNumber).join(',')
const fullName = opened.path()
bus.status(`开始测试定时任务${fullName}`)
request.sendPost('/rocketmq/execute', { id: info.id }, requestConfig).success(res => {
opened.running = false
}).end(() => {
bus.status(`定时任务${fullName}测试完毕`)
opened.running = false
})
},
// 是否允许执行测试
runnable: true,
// 是否需要填写路径
requirePath: true,
// 合并
merge: item => item
}
}

View File

@ -0,0 +1,37 @@
import vue from '@vitejs/plugin-vue'
import viteSvgIcons from 'vite-plugin-svg-icons'
import path from 'path'
import pkg from './package.json'
export default {
base: './',
build: {
minify: false,
cssCodeSplit: true, // 将组件的 style 打包到 js 文件中
outDir: 'dist',
lib: {
target: 'esnext',
formats: ['iife'],
entry: path.resolve(__dirname, 'src/index.js'),
name: 'MagicRocketMq',
fileName: (format) => `magic-rocketmq.${pkg.version}.${format}.js`
},
rollupOptions: {
// 确保外部化处理那些你不想打包进库的依赖
external: ['vue'],
output: {
// UMD 构建模式下为这些外部化的依赖提供一个全局变量
globals: {
vue: 'Vue'
}
}
}
},
plugins: [
vue(),
viteSvgIcons({
iconDirs: [path.resolve(process.cwd(), 'src/icons')],
symbolId: 'magic-rocketmq-[name]'
}),
]
}

View File

@ -0,0 +1,65 @@
package org.ssssssss.magicapi.rocketmq.consumer;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author 12863
*/
public class RocketMQConsumer {
Logger logger = LoggerFactory.getLogger(RocketMQConsumer.class);
private static Map<String, DefaultMQPushConsumer> consumerMap;
private final String namesrvAddr;
private final RocketMqService rocketMqService;
public RocketMQConsumer(String namesrvAddr, RocketMqService rocketMqService) {
consumerMap = new HashMap<>();
this.namesrvAddr = namesrvAddr;
this.rocketMqService = rocketMqService;
}
public void subscribe(String topic, String tag, String scriptName, String script) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(topic + "-" + tag);
defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
// 订阅主题和Tag过滤模式使用*表示全量订阅
defaultMQPushConsumer.subscribe(topic, tag);
defaultMQPushConsumer.setInstanceName(topic + ":" + tag);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
rocketMqService.consume(scriptName, script, msg);
logger.info("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
consumerMap.put(topic + ":" + tag, defaultMQPushConsumer);
}
public void unsubscribe(String topic, String tag) {
DefaultMQPushConsumer defaultMQPushConsumer = consumerMap.get(topic + ":" + tag);
if (defaultMQPushConsumer != null) {
defaultMQPushConsumer.unsubscribe(topic);
defaultMQPushConsumer.shutdown();
consumerMap.remove(topic + ":" + tag);
}
}
}

View File

@ -0,0 +1,106 @@
package org.ssssssss.magicapi.rocketmq.model;
import org.ssssssss.magicapi.core.model.MagicEntity;
import org.ssssssss.magicapi.core.model.PathMagicEntity;
import java.util.Objects;
/**
* @author 12863
*/
public class RocketMqInfo extends PathMagicEntity {
/**
* 主题
*/
private String topic;
/**
* 标签
*/
private String tag;
/**
* 是否启用
*/
private boolean enabled;
/**
* 定时任务描述
*/
private String description;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public RocketMqInfo copy() {
RocketMqInfo info = new RocketMqInfo();
super.copyTo(info);
info.setTopic(this.topic);
info.setTag(this.tag);
info.setEnabled(this.enabled);
info.setDescription(this.description);
return info;
}
@Override
public MagicEntity simple() {
RocketMqInfo info = new RocketMqInfo();
super.simple(info);
return info;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RocketMqInfo rocketMqInfo = (RocketMqInfo) o;
return Objects.equals(id, rocketMqInfo.id) &&
Objects.equals(path, rocketMqInfo.path) &&
Objects.equals(script, rocketMqInfo.script) &&
Objects.equals(name, rocketMqInfo.name) &&
Objects.equals(topic, rocketMqInfo.topic) &&
Objects.equals(tag, rocketMqInfo.tag) &&
Objects.equals(description, rocketMqInfo.description) &&
Objects.equals(enabled, rocketMqInfo.enabled);
}
@Override
public int hashCode() {
return Objects.hash(id, path, script, name, groupId, topic, tag, enabled, description);
}
}

View File

@ -0,0 +1,91 @@
package org.ssssssss.magicapi.rocketmq.product;
import org.ssssssss.magicapi.core.annotation.MagicModule;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.ssssssss.script.annotation.Comment;
/**
* rocketMq模块
*
* @author zzy
*/
@MagicModule("rocketMq")
public class RocketMqModule {
private final RocketMQTemplate rocketMQTemplate;
private final RocketMqService rocketMqService;
public RocketMqModule(RocketMQTemplate rocketMQTemplate, RocketMqService rocketMqService) {
this.rocketMQTemplate = rocketMQTemplate;
this.rocketMqService = rocketMqService;
}
/**
* 同步发送实体对象消息
* 可靠同步发送同步发送是指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包的通讯方式
* 特点速度快有结果反馈数据可靠
* 应用场景应用场景非常广泛例如重要通知邮件报名短信通知营销短信系统等
*/
@Comment("同步发送实体对象消息")
public boolean send(String topic, String tag, Object msg) {
rocketMqService.beforeProduce(topic,tag,msg);
SendResult sendResult = rocketMQTemplate.syncSend(topic + ":" + tag, msg);
if(sendResult.getSendStatus() != SendStatus.SEND_OK){
rocketMqService.failProduce(topic,tag,msg);
return false;
}
rocketMqService.afterProduce(topic,tag,msg);
return true;
}
/**
* 异步发送消息
* 可靠异步发送发送方发出数据后不等接收方发回响应接着发送下个数据包的通讯方式
* 特点速度快有结果反馈数据可靠
* 应用场景异步发送一般用于链路耗时较长, rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等
*
* @param msg
* @return
*/
@Comment("异步发送消息")
public boolean asyncSend(String topic, String tag, Object msg) {
rocketMqService.beforeProduce(topic,tag,msg);
rocketMQTemplate.asyncSend(topic + ":" + tag, msg, new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
// 成功回调
rocketMqService.afterProduce(topic,tag,msg);
}
@Override
public void onException(Throwable var1) {
// 失败回调
rocketMqService.failProduce(topic,tag,msg);
}
});
return true;
}
/**
* 单向发送
* 单向发送只负责发送消息不等待服务器回应且没有回调函数触发即只发送请求不等待应答此方式发送消息的过程耗时非常短一般在微秒级别
* 特点速度最快耗时非常短毫秒级别无结果反馈数据不可靠可能会丢失
* 应用场景适用于某些耗时非常短但对可靠性要求并不高的场景例如日志收集
*
* @return
*/
@Comment("单向发送")
public boolean sendOneway(String topic, String tag, Object msg) {
rocketMqService.beforeProduce(topic,tag,msg);
rocketMQTemplate.sendOneWay(topic + ":" + tag, msg);
rocketMqService.afterProduce(topic,tag,msg);
return true;
}
}

View File

@ -0,0 +1,42 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.apache.rocketmq.common.message.MessageExt;
import org.ssssssss.script.MagicScriptContext;
/**
* @author 12863
*/
public class DefaultRocketMqService implements RocketMqService{
@Override
public boolean beforeProduce(String topic, String tag, Object msg) {
return true;
}
@Override
public boolean afterProduce(String topic, String tag, Object msg) {
return true;
}
@Override
public boolean failProduce(String topic, String tag, Object msg) {
return false;
}
@Override
public boolean beforeConsume(MessageExt messageExt, MagicScriptContext magicScriptContext) {
return true;
}
@Override
public boolean afterConsume(MessageExt messageExt, MagicScriptContext magicScriptContext) {
return true;
}
@Override
public boolean failConsume(MessageExt messageExt, MagicScriptContext magicScriptContext) {
return false;
}
}

View File

@ -0,0 +1,42 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.ssssssss.magicapi.core.model.MagicEntity;
import org.ssssssss.magicapi.core.service.AbstractPathMagicResourceStorage;
import org.ssssssss.magicapi.rocketmq.model.RocketMqInfo;
import org.springframework.util.StringUtils;
import java.util.List;
public class RocketMqInfoMagicResourceStorage extends AbstractPathMagicResourceStorage<RocketMqInfo> {
@Override
public String folder() {
return "rocketmq";
}
@Override
public Class<RocketMqInfo> magicClass() {
return RocketMqInfo.class;
}
@Override
public void validate(RocketMqInfo entity) {
notBlank(entity.getTopic(), TOPIC_ID_REQUIRED);
String currentTag = StringUtils.isEmpty(entity.getTag()) ? "*" : entity.getTag();
List<MagicEntity> files = magicResourceService.files(folder());
boolean exists = files.stream().filter(x -> !x.getId().equals(entity.getId())).anyMatch(x -> {
String topic = ((RocketMqInfo) x).getTopic();
String tag = ((RocketMqInfo) x).getTag();
if (StringUtils.isEmpty(tag)) {
tag = "*";
}
return topic.equals(entity.getTopic()) && tag.equals(currentTag);
});
isTrue(!exists, TOPIC_TAG_REPEAT);
}
@Override
public String buildMappingKey(RocketMqInfo info) {
return buildMappingKey(info, magicResourceService.getGroupPath(info.getGroupId()));
}
}

View File

@ -0,0 +1,89 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.ssssssss.magicapi.core.config.MagicConfiguration;
import org.ssssssss.magicapi.core.event.FileEvent;
import org.ssssssss.magicapi.core.event.GroupEvent;
import org.ssssssss.magicapi.core.service.AbstractMagicDynamicRegistry;
import org.ssssssss.magicapi.core.service.MagicResourceStorage;
import org.ssssssss.magicapi.rocketmq.consumer.RocketMQConsumer;
import org.ssssssss.magicapi.rocketmq.model.RocketMqInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
/**
* @author 12863
*/
public class RocketMqMagicDynamicRegistry extends AbstractMagicDynamicRegistry<RocketMqInfo> {
private final RocketMQConsumer rocketMQConsumer;
private static final Logger logger = LoggerFactory.getLogger(RocketMqMagicDynamicRegistry.class);
private final boolean showLog;
public RocketMqMagicDynamicRegistry(MagicResourceStorage<RocketMqInfo> magicResourceStorage, boolean showLog, RocketMqService rocketMqService,
String namesrvAddr) {
super(magicResourceStorage);
this.rocketMQConsumer = new RocketMQConsumer(namesrvAddr,rocketMqService);
this.showLog = showLog;
}
@EventListener(condition = "#event.type == 'rocketmq'")
public void onFileEvent(FileEvent event) {
processEvent(event);
}
@EventListener(condition = "#event.type == 'rocketmq'")
public void onGroupEvent(GroupEvent event) {
processEvent(event);
}
@Override
public boolean register(RocketMqInfo entity) {
unregister(entity);
return super.register(entity);
}
@Override
protected boolean register(MappingNode<RocketMqInfo> mappingNode) {
RocketMqInfo entity = mappingNode.getEntity();
if (rocketMQConsumer != null) {
String scriptName = MagicConfiguration.getMagicResourceService().getScriptName(entity);
try {
if (entity.isEnabled()) {
try {
if (showLog) {
logger.info("消息队列:[{}]开始执行", scriptName);
}
rocketMQConsumer.subscribe(entity.getTopic(), entity.getTag(), scriptName, entity.getScript());
} catch (Exception e) {
logger.error("消息队列执行出错", e);
} finally {
if (showLog) {
logger.info("消息队列:[{}]执行完毕", scriptName);
}
}
}
} catch (Exception e) {
logger.error("消息队列:[{}]注册失败", scriptName, e);
}
logger.debug("注册消息队列:[{},{}]", MagicConfiguration.getMagicResourceService().getScriptName(entity), entity.getTopic());
}
return true;
}
@Override
protected void unregister(MappingNode<RocketMqInfo> mappingNode) {
RocketMqInfo info = mappingNode.getEntity();
logger.debug("取消注册消息队列:[{}, {}, {}]", info.getName(), info.getPath(), info.getTopic());
if (rocketMQConsumer != null) {
try {
rocketMQConsumer.unsubscribe(info.getTopic(), info.getTag());
} catch (Exception e) {
String scriptName = MagicConfiguration.getMagicResourceService().getScriptName(info);
logger.warn("消息队列:[{}]取消失败", scriptName, e);
}
}
}
}

View File

@ -0,0 +1,96 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.transaction.annotation.Transactional;
import org.ssssssss.magicapi.utils.ScriptManager;
import org.ssssssss.script.MagicScriptContext;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* @author 12863
*/
public interface RocketMqService {
/**
* 消费默认实现
*
* @param scriptName
* @param script
* @param msg
* @return
*/
@Transactional(rollbackFor = Exception.class)
default boolean consume(String scriptName, String script, MessageExt msg) {
MagicScriptContext magicScriptContext = new MagicScriptContext();
magicScriptContext.setScriptName(scriptName);
magicScriptContext.set("msg", msg);
beforeConsume(msg, magicScriptContext);
Object result = ScriptManager.executeScript(script, magicScriptContext);
if (result != null) {
failConsume(msg, magicScriptContext);
return false;
}
afterConsume(msg, magicScriptContext);
return true;
}
/**
* 发送消息前处理
*
* @param topic 主题
* @param tag 标签
* @param msg 消息
* @return
*/
boolean beforeProduce(String topic, String tag, Object msg);
/**
* 发送消息后处理
*
* @param topic 主题
* @param tag 标签
* @param msg 消息
* @return
*/
boolean afterProduce(String topic, String tag, Object msg);
/**
* 发送消息失败处理
*
* @param topic 主题
* @param tag 标签
* @param msg 消息
* @return
*/
boolean failProduce(String topic, String tag, Object msg);
/**
* 消费消息前处理
*
* @param messageExt 消息体
* @param magicScriptContext magic 上下文
* @return
*/
boolean beforeConsume(MessageExt messageExt, MagicScriptContext magicScriptContext);
/**
* 消费消息后处理
*
* @param messageExt 消息体
* @param magicScriptContext magic 上下文
* @return
*/
boolean afterConsume(MessageExt messageExt, MagicScriptContext magicScriptContext);
/**
* 消费消息失败处理
*
* @param messageExt 消息体
* @param magicScriptContext magic 上下文
* @return
*/
boolean failConsume(MessageExt messageExt, MagicScriptContext magicScriptContext);
}

View File

@ -0,0 +1,70 @@
package org.ssssssss.magicapi.rocketmq.starter;
import org.ssssssss.magicapi.core.config.MagicPluginConfiguration;
import org.ssssssss.magicapi.core.model.Plugin;
import org.ssssssss.magicapi.core.web.MagicControllerRegister;
import org.ssssssss.magicapi.rocketmq.product.RocketMqModule;
import org.ssssssss.magicapi.rocketmq.service.DefaultRocketMqService;
import org.ssssssss.magicapi.rocketmq.service.RocketMqInfoMagicResourceStorage;
import org.ssssssss.magicapi.rocketmq.service.RocketMqMagicDynamicRegistry;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.ssssssss.magicapi.rocketmq.web.MagicRocketMqController;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 12863
*/
@Configuration
@EnableConfigurationProperties(MagicRocketMqConfig.class)
public class MagicAPIRocketMqConfiguration implements MagicPluginConfiguration {
private final MagicRocketMqConfig config;
@Value("${rocketmq.name-server}")
private String nameServer;
public MagicAPIRocketMqConfiguration(MagicRocketMqConfig config) {
this.config = config;
}
@Bean
@ConditionalOnMissingBean
public RocketMqInfoMagicResourceStorage rocketMqInfoMagicResourceStorage() {
return new RocketMqInfoMagicResourceStorage();
}
@Bean
@ConditionalOnMissingBean
public RocketMqService rocketMqService() {
return new DefaultRocketMqService();
}
@Bean
@ConditionalOnMissingBean
public RocketMqMagicDynamicRegistry rocketMqMagicDynamicRegistry(RocketMqInfoMagicResourceStorage rocketMqInfoMagicResourceStorage, RocketMqService rocketMqService) {
return new RocketMqMagicDynamicRegistry(rocketMqInfoMagicResourceStorage, config.isLog(),rocketMqService, nameServer);
}
@Override
public Plugin plugin() {
return new Plugin("rocket消息队列", "MagicRocketMq", "magic-rocketmq.1.0.0.iife.js");
}
@Override
public MagicControllerRegister controllerRegister() {
return (mapping, configuration) -> mapping.registerController(new MagicRocketMqController(configuration));
}
/**
* 注入rocketmq模块
*/
@Bean
public RocketMqModule rocketMqFunctions(RocketMQTemplate rocketMQTemplate, RocketMqService rocketMqService) {
return new RocketMqModule(rocketMQTemplate,rocketMqService);
}
}

View File

@ -0,0 +1,25 @@
package org.ssssssss.magicapi.rocketmq.starter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author 12863
*/
@ConfigurationProperties("magic-api.rocketmq")
public class MagicRocketMqConfig {
/**
* 是否打印日志
* @since 2.1.0
*/
private boolean log = false;
public boolean isLog() {
return log;
}
public void setLog(boolean log) {
this.log = log;
}
}

View File

@ -0,0 +1,46 @@
package org.ssssssss.magicapi.rocketmq.web;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.ssssssss.magicapi.core.config.MagicConfiguration;
import org.ssssssss.magicapi.core.config.WebSocketSessionManager;
import org.ssssssss.magicapi.core.logging.MagicLoggerContext;
import org.ssssssss.magicapi.core.model.DebugRequest;
import org.ssssssss.magicapi.core.model.JsonBean;
import org.ssssssss.magicapi.core.model.MagicEntity;
import org.ssssssss.magicapi.core.servlet.MagicHttpServletRequest;
import org.ssssssss.magicapi.core.web.MagicController;
import org.ssssssss.magicapi.core.web.MagicExceptionHandler;
import org.ssssssss.magicapi.utils.ScriptManager;
import org.ssssssss.script.MagicScriptDebugContext;
/**
* @author 12863
*/
public class MagicRocketMqController extends MagicController implements MagicExceptionHandler {
public MagicRocketMqController(MagicConfiguration configuration) {
super(configuration);
}
@PostMapping("/rocketmq/execute")
@ResponseBody
public JsonBean<Object> execute(String id, MagicHttpServletRequest request){
MagicEntity entity = MagicConfiguration.getMagicResourceService().file(id);
notNull(entity, FILE_NOT_FOUND);
String script = entity.getScript();
DebugRequest debugRequest = DebugRequest.create(request);
MagicLoggerContext.SESSION.set(debugRequest.getRequestedClientId());
String sessionAndScriptId = debugRequest.getRequestedClientId() + debugRequest.getRequestedScriptId();
try {
MagicScriptDebugContext magicScriptContext = debugRequest.createMagicScriptContext(configuration.getDebugTimeout());
WebSocketSessionManager.addMagicScriptContext(sessionAndScriptId, magicScriptContext);
magicScriptContext.setScriptName(MagicConfiguration.getMagicResourceService().getScriptName(entity));
return new JsonBean<>(ScriptManager.executeScript(script, magicScriptContext));
} finally {
WebSocketSessionManager.removeMagicScriptContext(sessionAndScriptId);
MagicLoggerContext.SESSION.remove();
}
}
}

View File

@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.ssssssss.magicapi.rocketmq.starter.MagicAPIRocketMqConfiguration

View File

@ -23,6 +23,7 @@
<module>magic-api-plugin-elasticsearch</module>
<module>magic-api-plugin-cluster</module>
<module>magic-api-plugin-git</module>
<module>magic-api-plugin-rocketmq</module>
</modules>
<dependencies>
<dependency>

View File

@ -84,6 +84,10 @@ public interface JsonCodeConstants {
JsonCode API_NOT_FOUND = new JsonCode(1035, "找不到接口");
JsonCode TOPIC_ID_REQUIRED = new JsonCode(1036, "主题不能为空");
JsonCode TOPIC_TAG_REPEAT = new JsonCode(1037, "主题与标签重复");
default void notNull(Object value, JsonCode jsonCode) {
if (value == null) {
throw new InvalidArgumentException(jsonCode);