集成rocketmq

This commit is contained in:
zhaozhenyang 2024-01-16 17:41:48 +08:00
parent 969ba8046b
commit 62ed76f1a1
10 changed files with 51 additions and 44 deletions

View File

@ -1,6 +1,5 @@
package org.ssssssss.magicapi.rocketmq.consumer;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@ -9,13 +8,14 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author 12863
* @author zzy
*/
public class RocketMQConsumer {
@ -38,28 +38,31 @@ public class RocketMQConsumer {
defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
// 订阅主题和Tag过滤模式使用*表示全量订阅
defaultMQPushConsumer.subscribe(topic, tag);
defaultMQPushConsumer.setInstanceName(topic + ":" + tag);
defaultMQPushConsumer.setInstanceName(topic + "-" + tag);
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
rocketMqService.consume(scriptName, script, msg);
logger.info("Received message: " + new String(msg.getBody()));
if (!rocketMqService.consume(scriptName, script, msg)) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
consumerMap.put(topic + ":" + tag, defaultMQPushConsumer);
consumerMap.put(topic + "-" + tag, defaultMQPushConsumer);
}
public void unsubscribe(String topic, String tag) {
DefaultMQPushConsumer defaultMQPushConsumer = consumerMap.get(topic + ":" + tag);
DefaultMQPushConsumer defaultMQPushConsumer = consumerMap.get(topic + "-" + tag);
if (defaultMQPushConsumer != null) {
defaultMQPushConsumer.unsubscribe(topic);
defaultMQPushConsumer.shutdown();
consumerMap.remove(topic + ":" + tag);
consumerMap.remove(topic + "-" + tag);
}
}
}

View File

@ -7,7 +7,7 @@ import org.ssssssss.magicapi.core.model.PathMagicEntity;
import java.util.Objects;
/**
* @author 12863
* @author zzy
*/
public class RocketMqInfo extends PathMagicEntity {
@ -29,7 +29,7 @@ public class RocketMqInfo extends PathMagicEntity {
/**
* 定时任务描述
* 消息队列描述
*/
private String description;

View File

@ -1,11 +1,11 @@
package org.ssssssss.magicapi.rocketmq.product;
import org.ssssssss.magicapi.core.annotation.MagicModule;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.ssssssss.magicapi.core.annotation.MagicModule;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.ssssssss.script.annotation.Comment;
/**
@ -33,13 +33,13 @@ public class RocketMqModule {
*/
@Comment("同步发送实体对象消息")
public boolean send(String topic, String tag, Object msg) {
rocketMqService.beforeProduce(topic,tag,msg);
rocketMqService.beforeProduce(topic, tag, msg);
SendResult sendResult = rocketMQTemplate.syncSend(topic + ":" + tag, msg);
if(sendResult.getSendStatus() != SendStatus.SEND_OK){
rocketMqService.failProduce(topic,tag,msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
rocketMqService.failProduce(topic, tag, msg);
return false;
}
rocketMqService.afterProduce(topic,tag,msg);
rocketMqService.afterProduce(topic, tag, msg);
return true;
}
@ -54,18 +54,18 @@ public class RocketMqModule {
*/
@Comment("异步发送消息")
public boolean asyncSend(String topic, String tag, Object msg) {
rocketMqService.beforeProduce(topic,tag,msg);
rocketMqService.beforeProduce(topic, tag, msg);
rocketMQTemplate.asyncSend(topic + ":" + tag, msg, new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
// 成功回调
rocketMqService.afterProduce(topic,tag,msg);
rocketMqService.afterProduce(topic, tag, msg);
}
@Override
public void onException(Throwable var1) {
// 失败回调
rocketMqService.failProduce(topic,tag,msg);
rocketMqService.failProduce(topic, tag, msg);
}
});
return true;
@ -81,9 +81,9 @@ public class RocketMqModule {
*/
@Comment("单向发送")
public boolean sendOneway(String topic, String tag, Object msg) {
rocketMqService.beforeProduce(topic,tag,msg);
rocketMqService.beforeProduce(topic, tag, msg);
rocketMQTemplate.sendOneWay(topic + ":" + tag, msg);
rocketMqService.afterProduce(topic,tag,msg);
rocketMqService.afterProduce(topic, tag, msg);
return true;
}

View File

@ -2,12 +2,13 @@ package org.ssssssss.magicapi.rocketmq.service;
import org.apache.rocketmq.common.message.MessageExt;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.ssssssss.script.MagicScriptContext;
/**
* @author 12863
* @author zzy
*/
public class DefaultRocketMqService implements RocketMqService{
public class DefaultRocketMqService implements RocketMqService {
@Override

View File

@ -1,12 +1,16 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.apache.commons.lang3.StringUtils;
import org.ssssssss.magicapi.core.model.MagicEntity;
import org.ssssssss.magicapi.core.service.AbstractPathMagicResourceStorage;
import org.ssssssss.magicapi.rocketmq.model.RocketMqInfo;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* @author zzy
*/
public class RocketMqInfoMagicResourceStorage extends AbstractPathMagicResourceStorage<RocketMqInfo> {
@Override

View File

@ -1,5 +1,9 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.ssssssss.magicapi.core.config.MagicConfiguration;
import org.ssssssss.magicapi.core.event.FileEvent;
import org.ssssssss.magicapi.core.event.GroupEvent;
@ -7,12 +11,9 @@ import org.ssssssss.magicapi.core.service.AbstractMagicDynamicRegistry;
import org.ssssssss.magicapi.core.service.MagicResourceStorage;
import org.ssssssss.magicapi.rocketmq.consumer.RocketMQConsumer;
import org.ssssssss.magicapi.rocketmq.model.RocketMqInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
/**
* @author 12863
* @author zzy
*/
public class RocketMqMagicDynamicRegistry extends AbstractMagicDynamicRegistry<RocketMqInfo> {
private final RocketMQConsumer rocketMQConsumer;
@ -24,7 +25,7 @@ public class RocketMqMagicDynamicRegistry extends AbstractMagicDynamicRegistry<R
public RocketMqMagicDynamicRegistry(MagicResourceStorage<RocketMqInfo> magicResourceStorage, boolean showLog, RocketMqService rocketMqService,
String namesrvAddr) {
super(magicResourceStorage);
this.rocketMQConsumer = new RocketMQConsumer(namesrvAddr,rocketMqService);
this.rocketMQConsumer = new RocketMQConsumer(namesrvAddr, rocketMqService);
this.showLog = showLog;
}

View File

@ -1,16 +1,12 @@
package org.ssssssss.magicapi.rocketmq.service;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.transaction.annotation.Transactional;
import org.ssssssss.magicapi.utils.ScriptManager;
import org.ssssssss.script.MagicScriptContext;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* @author 12863
* @author zzy
*/
public interface RocketMqService {

View File

@ -1,5 +1,12 @@
package org.ssssssss.magicapi.rocketmq.starter;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.ssssssss.magicapi.core.config.MagicPluginConfiguration;
import org.ssssssss.magicapi.core.model.Plugin;
import org.ssssssss.magicapi.core.web.MagicControllerRegister;
@ -9,15 +16,9 @@ import org.ssssssss.magicapi.rocketmq.service.RocketMqInfoMagicResourceStorage;
import org.ssssssss.magicapi.rocketmq.service.RocketMqMagicDynamicRegistry;
import org.ssssssss.magicapi.rocketmq.service.RocketMqService;
import org.ssssssss.magicapi.rocketmq.web.MagicRocketMqController;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 12863
* @author zzy
*/
@Configuration
@EnableConfigurationProperties(MagicRocketMqConfig.class)
@ -47,7 +48,7 @@ public class MagicAPIRocketMqConfiguration implements MagicPluginConfiguration {
@Bean
@ConditionalOnMissingBean
public RocketMqMagicDynamicRegistry rocketMqMagicDynamicRegistry(RocketMqInfoMagicResourceStorage rocketMqInfoMagicResourceStorage, RocketMqService rocketMqService) {
return new RocketMqMagicDynamicRegistry(rocketMqInfoMagicResourceStorage, config.isLog(),rocketMqService, nameServer);
return new RocketMqMagicDynamicRegistry(rocketMqInfoMagicResourceStorage, config.isLog(), rocketMqService, nameServer);
}
@Override
@ -65,6 +66,6 @@ public class MagicAPIRocketMqConfiguration implements MagicPluginConfiguration {
*/
@Bean
public RocketMqModule rocketMqFunctions(RocketMQTemplate rocketMQTemplate, RocketMqService rocketMqService) {
return new RocketMqModule(rocketMQTemplate,rocketMqService);
return new RocketMqModule(rocketMQTemplate, rocketMqService);
}
}

View File

@ -4,12 +4,13 @@ package org.ssssssss.magicapi.rocketmq.starter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author 12863
* @author zzy
*/
@ConfigurationProperties("magic-api.rocketmq")
public class MagicRocketMqConfig {
/**
* 是否打印日志
*
* @since 2.1.0
*/
private boolean log = false;

View File

@ -16,7 +16,7 @@ import org.ssssssss.script.MagicScriptDebugContext;
/**
* @author 12863
* @author zzy
*/
public class MagicRocketMqController extends MagicController implements MagicExceptionHandler {