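Background:
In a clustered deployment, the first problem we ran into was that every node executed each scheduled task, so a task ran once per node. The previous article, https://www.cnblogs.com/Iris1998/p/11413099.html, addressed that single problem by integrating ShedLock with Spring Boot. Some time later a new requirement came up: the server had to push messages to clients. Functionally, the WebSocket support that ships with Spring Boot is enough for this, but during development the application failed to start with a puzzling error. The log output was: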
{"time":"2019-09-27 11:07:22 626", "level":"WARN", "classname":"org.apache.juli.logging.DirectJDKLog", "method":"log", "line":"173", "msg":"The web application [ROOT] appears to have started a thread named [RxIoScheduler-1 (Evictor)] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)"}
{"time":"2019-09-27 11:07:22 666", "level":"ERROR", "classname":"org.springframework.boot.SpringApplication", "method":"reportFailure", "line":"858", "msg":"Application run failed"}
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'stompWebSocketHandlerMapping' defined in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.HandlerMapping]: Factory method 'stompWebSocketHandlerMapping' threw exception; nested exception is java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1288)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1127)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:538)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.hundunyun.wechat.App.main(App.java:19)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.HandlerMapping]: Factory method 'stompWebSocketHandlerMapping' threw exception; nested exception is java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.resolveBeanReference(ConfigurationClassEnhancer.java:418)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:366)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.messageBrokerTaskScheduler(<generated>)
at org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurationSupport.stompWebSocketHandlerMapping(WebSocketMessageBrokerConfigurationSupport.java:76)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.CGLIB$stompWebSocketHandlerMapping$14(<generated>)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6$$FastClassBySpringCGLIB$$19a6d0cf.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.stompWebSocketHandlerMapping(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 20 common frames omitted
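Experiments showed that ShedLock works on its own and WebSocket works on its own, but the application fails to start when both are enabled. The exception above reports that the messageBrokerTaskScheduler bean, expected to be a ThreadPoolTaskScheduler, was overridden by an incompatible instance of type com.sun.proxy.$Proxy220, so my guess is that the conflict lies in how ShedLock and Spring's WebSocket support handle the task scheduler internally. After failing to resolve it, I switched to a different approach: Spring Boot WebSocket plus a Redis-based distributed lock, which lets both features coexist in a distributed deployment.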
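The type named in the exception, com.sun.proxy.$Proxy220, is a JDK dynamic proxy. Such a proxy implements interfaces only, so it can never be used where a concrete class is required. The following is a minimal, Spring-free sketch of that mismatch. Scheduler and PoolScheduler are hypothetical stand-ins, not Spring or ShedLock types, and whether ShedLock's wrapping is what produced the proxy in this project remains an assumption.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxyTypeMismatchDemo {
    /** Hypothetical interface, standing in for a scheduler abstraction. */
    public interface Scheduler {
        String schedule(String task);
    }

    /** Hypothetical concrete class, standing in for a concrete scheduler bean type. */
    public static class PoolScheduler implements Scheduler {
        @Override
        public String schedule(String task) {
            return "scheduled " + task;
        }
    }

    /** Wraps the target in a JDK dynamic proxy that implements only the Scheduler interface. */
    static Scheduler wrap(Scheduler target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return (Scheduler) Proxy.newProxyInstance(
                Scheduler.class.getClassLoader(),
                new Class<?>[] {Scheduler.class},
                handler);
    }

    /** True if the object can be used where the concrete class is required. */
    static boolean usableAsConcrete(Object candidate) {
        return candidate instanceof PoolScheduler;
    }

    public static void main(String[] args) {
        Scheduler proxied = wrap(new PoolScheduler());
        System.out.println(proxied.schedule("job"));   // prints "scheduled job"
        System.out.println(usableAsConcrete(proxied)); // prints "false"
    }
}
```

The proxy still forwards calls through the interface, but any code that asks for the concrete type, as the WebSocket configuration does for ThreadPoolTaskScheduler, rejects it.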
Step 1: Add the dependencies
<!-- Spring Boot 2.2.x artifacts are published with the .RELEASE suffix -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
<!-- Test dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- Run a scheduled task only once behind a load balancer -->
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-spring</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-provider-redis-jedis</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.41</version>
</dependency>
Step 2: Add the files for the Redis distributed lock
1. Add the custom lock annotation
import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    // Prefix of the Redis lock key (the aspect currently uses it as the whole key)
    String lockedPrefix() default "";
    // Lifetime of the key in Redis, in seconds
    int expireTime() default 10;
}
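Because the annotation is retained at runtime, its values can be read back through reflection, which is what the aspect in the next step relies on. The sketch below shows this with plain JDK reflection. It redeclares the annotation locally so that it is self-contained; SampleTasks and describe are hypothetical names used only for illustration.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class CacheLockReflectionDemo {
    // Local copy of the annotation so the example compiles on its own.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @interface CacheLock {
        String lockedPrefix() default "";
        int expireTime() default 10;
    }

    /** Hypothetical task class with annotated and plain methods. */
    static class SampleTasks {
        @CacheLock(lockedPrefix = "report", expireTime = 9)
        public void buildReport() { }

        @CacheLock
        public void withDefaults() { }

        public void unlocked() { }
    }

    /** Returns "prefix:ttl" for an annotated public method, or null if it has no @CacheLock. */
    static String describe(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            CacheLock lock = method.getAnnotation(CacheLock.class);
            return lock == null ? null : lock.lockedPrefix() + ":" + lock.expireTime();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public method " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(SampleTasks.class, "buildReport"));  // prints "report:9"
        System.out.println(describe(SampleTasks.class, "withDefaults")); // prints ":10"
        System.out.println(describe(SampleTasks.class, "unlocked"));     // prints "null"
    }
}
```

Note that an annotation attribute is never null: when lockedPrefix is not set, the value read back is the empty string.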
2. Add the AOP aspect class
import com.iris.websocket.annotation.CacheLock;
import com.iris.websocket.springwebsocket.redis.RedisClient;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/*
 * @Description Around advice that guards @CacheLock methods with a Redis lock
 * @Author muruan.lt
 * @Date 2019/9/26 11:36
 */
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
    private static final String LOCK_VALUE = "locked";

    @Autowired
    RedisClient redisClient;

    /*
     * 1. With spring-boot websocket present, leaving out @Synchronized may stop the
     *    distributed lock from working, possibly because of multithreading.
     * 2. Without spring-boot websocket, @Synchronized can be omitted.
     */
    @Around(value = "@annotation(cacheLock)", argNames = "pjp,cacheLock")
    @Synchronized
    public Object cacheLockPoint(ProceedingJoinPoint pjp, CacheLock cacheLock) {
        // The annotation is bound as an advice parameter, so no reflection lookup is needed.
        String lockKey = cacheLock.lockedPrefix();
        int timeOut = cacheLock.expireTime();
        // Annotation values are never null; an unset prefix arrives as the empty string.
        if (lockKey.isEmpty()) {
            log.warn("method:{} has no lock key, skipped", pjp.getSignature());
            return null;
        }
        try {
            // SETNX and EXPIRE are two separate commands here, so the pair is not atomic.
            if (redisClient.setnx(lockKey, LOCK_VALUE)) {
                redisClient.expire(lockKey, timeOut);
                // The lock is not released afterwards; it simply expires after timeOut seconds.
                return pjp.proceed();
            }
            // Lock not acquired: another node (or method) holds the key, so skip this run.
        } catch (Throwable e) {
            log.error("method:{} failed", pjp.getSignature(), e);
        }
        return null;
    }
}
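The aspect acquires the lock with SETNX and then sets the lifetime with a separate EXPIRE. If the process dies between the two commands, the key never expires and the task is blocked on every node. Redis's SET command accepts the NX and EX options, which perform both steps as one command. The sketch below models that combined behaviour in memory so the semantics can be checked without a Redis server. InMemoryLockStore is a hypothetical stand-in, not a Redis client, and the clock is passed in explicitly to keep the example deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class InMemoryLockStore {
    // Lock key -> expiry timestamp in milliseconds.
    private final ConcurrentMap<String, Long> expiries = new ConcurrentHashMap<>();

    /**
     * Models "SET key value NX EX ttlSeconds": the key is written together with its
     * expiry only if it is absent or already expired, all in one atomic step.
     *
     * @return true if the caller acquired the lock
     */
    boolean tryLock(String key, int ttlSeconds, long nowMillis) {
        boolean[] acquired = {false};
        // compute() runs atomically for a given key in ConcurrentHashMap.
        expiries.compute(key, (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                acquired[0] = true;
                return nowMillis + ttlSeconds * 1000L;
            }
            return expiry; // still held by someone else, leave it untouched
        });
        return acquired[0];
    }

    public static void main(String[] args) {
        InMemoryLockStore store = new InMemoryLockStore();
        System.out.println(store.tryLock("test", 9, 0L));      // prints "true": first caller wins
        System.out.println(store.tryLock("test", 9, 5_000L));  // prints "false": still locked
        System.out.println(store.tryLock("test", 9, 10_000L)); // prints "true": the 9 s lifetime has passed
    }
}
```

With a 10-second cron and a 9-second lifetime, as used later in this article, the key has expired by the time the next trigger fires. How the combined SET is issued through Jedis depends on the client version, so check the Jedis documentation for the version in use.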
3. Add the application.properties configuration file
server.port=8800
spring.application.name=sun-websocket
eureka.instance.hostname=localhost
eureka.client.registerWithEureka=false
eureka.client.fetchRegistry=false
eureka.client.serviceUrl.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/
####################################### Redis configuration: start ###########################################
redis.host=127.0.0.1
redis.password=123456
redis.port=6379
# Maximum number of objects the pool keeps idle
redis.maxIdle=300
# Maximum number of objects the pool may allocate
redis.maxActive=600
# Maximum time to wait when the pool has no object to hand out
redis.maxWaitMillis=1000
redis.maxTotal=1000
# Whether to validate an object when it is borrowed from the pool
redis.testOnBorrow=true
# Whether to validate an object when it is returned to the pool
redis.testOnReturn=true
redis.timeout=3600
####################################### Redis configuration: end ###########################################
4. Add the Redis configuration files
4.1 Add RedisPool.java
import org.springframework.beans.factory.annotation.Value;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import javax.annotation.PostConstruct;

/*
 * @Description: JedisPool configuration class; the settings come from the default "application.properties" file
 * @Author: muruan.lt
 * @CreateDate: 2019/9/27 9:50
 * @Version: 1.0
 */
public abstract class RedisPool {
    public JedisPool jedisPool;

    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;
    @Value("${redis.password}")
    private String password;
    @Value("${redis.maxIdle}")
    private Integer maxIdle;
    @Value("${redis.maxTotal}")
    private Integer maxTotal;
    // Not applied below: JedisPoolConfig is configured through maxTotal
    @Value("${redis.maxActive}")
    private Integer maxActive;
    @Value("${redis.maxWaitMillis}")
    private Integer maxWaitMillis;
    @Value("${redis.timeout}")
    private Integer timeout;
    @Value("${redis.testOnBorrow}")
    private Boolean testOnBorrow;
    @Value("${redis.testOnReturn}")
    private Boolean testOnReturn;

    // @Value fields are injected only after the constructor has run, so building the pool
    // in the constructor would see null values. @PostConstruct runs after injection.
    @PostConstruct
    protected void init() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxTotal(maxTotal);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        jedisPoolConfig.setTestOnBorrow(testOnBorrow);
        jedisPoolConfig.setTestOnReturn(testOnReturn);
        jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password);
    }
}
4.2 Add RedisClient.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
public class RedisClient {
    @Autowired
    private JedisPool jedisPool;

    // Borrows a connection from the pool; callers must close it to return it.
    private Jedis getJedis() {
        Jedis jedis = jedisPool.getResource();
        // jedis.select(database);
        return jedis;
    }

    // try-with-resources returns the connection to the pool and, unlike calling
    // close() in a finally block, does not fail with a NullPointerException
    // when getJedis() itself throws.
    public void set(String key, String value) {
        try (Jedis jedis = getJedis()) {
            jedis.set(key, value);
        }
    }

    // Sets the value together with a lifetime in seconds.
    public void set(String key, String value, int expTime) {
        try (Jedis jedis = getJedis()) {
            jedis.setex(key, expTime, value);
        }
    }

    public String get(String key) {
        try (Jedis jedis = getJedis()) {
            return jedis.get(key);
        }
    }

    // Sets the lifetime of an existing key, in seconds.
    public void expire(String key, int expTime) {
        try (Jedis jedis = getJedis()) {
            jedis.expire(key, expTime);
        }
    }

    // Returns true only if the key did not exist and was set by this call.
    public boolean setnx(String lockKey, String expires) {
        try (Jedis jedis = getJedis()) {
            return jedis.setnx(lockKey, expires) == 1;
        }
    }
}
4.3 Add RedisConfig.java
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import redis.clients.jedis.JedisPool;

import java.lang.reflect.Method;

/*
 * @Description Supports shedlock-redis
 * @Author muruan.lt
 * @Date 2019/9/26 10:19
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;

    // Cache key: class name + method name + string form of each non-null argument.
    @Bean
    @Override
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object o : params) {
                    if (o != null) {
                        sb.append(o.toString());
                    }
                }
                return sb.toString();
            }
        };
    }

    @Bean
    public RedisTemplate<String, String> myRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate srt = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer<Object> j2j = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        j2j.setObjectMapper(om);
        srt.setValueSerializer(j2j);
        srt.afterPropertiesSet();
        return srt;
    }

    // This is the JedisPool injected into RedisClient.
    // Note: only host and port are used here; redis.password and the pool settings are not applied.
    @Bean
    public JedisPool jedisPool() {
        return new JedisPool(this.host, this.port);
    }
}
That completes the files for the Redis distributed lock. Next come the files needed for WebSocket.
Step 3: Add the files for WebSocket
1. Add the WebSocket configuration class
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/*
 * @Description Spring WebSocket configuration
 * @Author muruan.lt
 * @Date 2019/9/24 09:23
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker that delivers messages to subscribers of /topic/...
        config.enableSimpleBroker("/topic");
        // Prefix for messages that clients send to the application
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // STOMP endpoint that clients connect to, with SockJS fallback
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
    }
}
All files for both features are now in place. What remains is to use and verify them.
Step 4: Use and verify the features
import com.alibaba.fastjson.JSONObject;
import com.iris.websocket.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

/*
 * @Description Test class for the scheduled tasks
 * @Author muruan.lt
 * @Date 2019/9/2 14:27
 */
@Component
@Slf4j
public class TaskTest {
    @Autowired
    private SimpMessageSendingOperations messageTemplate;

    // Runs every 10 seconds; the lock key lives for 9 seconds.
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "test", expireTime = 9)
    public void test1() {
        System.out.println(new Date() + "hello1!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 200);
        jsonObject.put("content", "hello1!");
        String resStr = jsonObject.toJSONString();
        log.info("send message to browser,message content [{}]", resStr);
        // Push the message to every client subscribed to /topic/greetings
        messageTemplate.convertAndSend("/topic/greetings", resStr);
    }

    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "test", expireTime = 9)
    public void test2() {
        System.out.println(new Date() + "hello2!");
    }

    /**
     * Runs a scheduled task
     **/
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "TimeTaskService", expireTime = 9)
    public void executeTask() {
        System.out.println(new Date() + "hello3!");
    }

    /**
     * Runs a scheduled task
     **/
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "TimeTaskService", expireTime = 9)
    public void executeTask1() {
        System.out.println(new Date() + "hello4!");
    }
}
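Using WebSocket: wherever a message needs to be pushed to the front end, inject
@Autowired
private SimpMessageSendingOperations messageTemplate;
and send the message to a specific topic. Clients subscribed to that topic then receive the new message.
@CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)
This annotation is the key to the Redis distributed lock. Put it on any method that must not be executed by more than one node per trigger, and the duplicate execution seen in a distributed deployment goes away.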
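One point to watch when choosing lockedPrefix: the aspect uses the value as the complete Redis key, so methods that share a prefix compete for the same lock. In TaskTest above, test1 and test2 both use "test", which means only one of them runs per 10-second trigger; that may be intentional for the demo. The sketch below shows the effect and one possible way to derive a distinct key per method. keyFor and acquiredInOneTick are hypothetical helpers for illustration, not part of the project.

```java
import java.util.HashSet;
import java.util.Set;

class LockKeyDemo {
    /** Builds a key of the form "<class>#<method>:<prefix>" so each method gets its own lock. */
    static String keyFor(Class<?> owner, String methodName, String prefix) {
        return owner.getName() + "#" + methodName + ":" + prefix;
    }

    /**
     * Counts how many of the given keys can be acquired within one trigger,
     * modelling set-if-absent with no release before the next trigger.
     */
    static int acquiredInOneTick(String... keys) {
        Set<String> held = new HashSet<>();
        int acquired = 0;
        for (String key : keys) {
            if (held.add(key)) { // add() returns false when the key is already held
                acquired++;
            }
        }
        return acquired;
    }

    public static void main(String[] args) {
        // Shared prefix: the second method is locked out.
        System.out.println(acquiredInOneTick("test", "test")); // prints "1"
        // Distinct keys per method: both methods run.
        System.out.println(acquiredInOneTick(
                keyFor(Object.class, "test1", "test"),
                keyFor(Object.class, "test2", "test")));       // prints "2"
    }
}
```

Across nodes the same method still produces the same key, so the cluster-wide guarantee of one execution per trigger is kept.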