定时订阅程序启动后注册的服务

This commit is contained in:
徐晓伟 2024-02-21 13:12:28 +08:00
parent 107c3d9ac0
commit 3bfa4ad41b
2 changed files with 120 additions and 76 deletions

View File

@ -2,7 +2,9 @@ package cn.com.xuxiaowei.nacos.sentinel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication
public class NacosSentinelApplication {

View File

@ -17,12 +17,11 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Nacos 注册中心 监听程序
@ -34,6 +33,8 @@ import java.util.UUID;
@Component
public class NacosDiscoveryListener {
private final Set<String> subscribes = ConcurrentHashMap.newKeySet();
private NacosSentinelDiscoveryProperties nacosSentinelDiscoveryProperties;
private NacosDiscoveryRepository nacosDiscoveryRepository;
@ -66,7 +67,16 @@ public class NacosDiscoveryListener {
namingService = NamingFactory.createNamingService(properties);
}
catch (NacosException e) {
throw new RuntimeException(e);
log.error(String.format("【%s】连接 Nacos 异常:", logId), e);
}
finally {
if (namingService == null) {
log.error("【{}】NamingService 为 null", logId);
}
else {
String serverStatus = namingService.getServerStatus();
log.info("【{}】NamingService 状态:{}Nacos 连接状态:{}", logId, serverStatus, serverStatus);
}
}
}
}
@ -112,93 +122,125 @@ public class NacosDiscoveryListener {
}
for (String serviceName : serviceNames) {
namingService.subscribe(serviceName, event -> {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
subscribe(serviceName);
}
}
log.info("");
String subscribeLogId = RandomStringUtils.randomAlphanumeric(6);
log.info("【{}】Nacos 服务订阅: {}", subscribeLogId, serviceName);
log.info("【{}】Nacos 服务名称: {},服务数量: {},群组名称:{}", subscribeLogId, serviceName, instances.size(),
namingEvent.getGroupName());
@Scheduled(fixedRate = 5000)
public void myScheduledMethod() throws NacosException {
List<Discovery> discoveries = nacosDiscoveryRepository.listByServiceName(serviceName);
createNamingService();
List<Instance> adds = new ArrayList<>();
int pageNo = nacosSentinelDiscoveryProperties.getPageNo();
int pageSize = nacosSentinelDiscoveryProperties.getPageSize();
String groupName = nacosSentinelDiscoveryProperties.getGroupName();
ListView<String> servicesOfServer = namingService.getServicesOfServer(pageNo, pageSize, groupName);
List<String> serviceNames = servicesOfServer.getData();
int maxLength = 0;
for (String serviceName : serviceNames) {
maxLength = Math.max(maxLength, serviceName.length());
}
for (String serviceName : serviceNames) {
if (!subscribes.contains(serviceName)) {
subscribe(serviceName);
}
}
}
private void subscribe(String serviceName) throws NacosException {
namingService.subscribe(serviceName, event -> {
subscribes.add(serviceName);
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
log.info("");
String subscribeLogId = RandomStringUtils.randomAlphanumeric(6);
log.info("【{}】Nacos 服务订阅: {}", subscribeLogId, serviceName);
log.info("【{}】Nacos 服务名称: {},服务数量: {},群组名称:{}", subscribeLogId, serviceName, instances.size(),
namingEvent.getGroupName());
List<Discovery> discoveries = nacosDiscoveryRepository.listByServiceName(serviceName);
List<Instance> adds = new ArrayList<>();
for (Instance instance : instances) {
String ip = instance.getIp();
int port = instance.getPort();
String clusterName = instance.getClusterName();
log.info("【{}】Nacos 服务名称: {}IP: {},端口: {},群组名称: {},集群名称: {}", subscribeLogId, serviceName,
StringUtils.formatLength(ip, 15), StringUtils.formatLength(port, 5), namingEvent.getGroupName(),
clusterName);
boolean contains = false;
for (Discovery discovery : discoveries) {
if (discovery.getIp().equals(instance.getIp()) && discovery.getPort() == instance.getPort()) {
contains = true;
}
}
if (!contains) {
adds.add(instance);
}
}
List<Discovery> deletes = new ArrayList<>();
for (Discovery discovery : discoveries) {
boolean contains = false;
for (Instance instance : instances) {
if (discovery.getIp().equals(instance.getIp()) && discovery.getPort() == instance.getPort()) {
contains = true;
}
}
if (!contains) {
deletes.add(discovery);
}
}
if (!adds.isEmpty()) {
log.info("");
log.info("【{}】Nacos 上线服务: {}", subscribeLogId, serviceName);
for (Instance instance : adds) {
String ip = instance.getIp();
int port = instance.getPort();
String clusterName = instance.getClusterName();
log.info("【{}】Nacos 服务名称: {}IP: {},端口: {},群组名称: {},集群名称: {}", subscribeLogId, serviceName,
log.info("【{}】Nacos 上线服务名称: {}IP: {},端口: {},群组名称: {},集群名称: {}", subscribeLogId, serviceName,
StringUtils.formatLength(ip, 15), StringUtils.formatLength(port, 5),
namingEvent.getGroupName(), clusterName);
StringUtils.extractAtLeft(instance.getServiceName()), clusterName);
boolean contains = false;
for (Discovery discovery : discoveries) {
if (discovery.getIp().equals(instance.getIp()) && discovery.getPort() == instance.getPort()) {
contains = true;
}
}
if (!contains) {
adds.add(instance);
}
Discovery discovery = new Discovery().setId(UUID.randomUUID().toString())
.setServiceName(serviceName)
.setIp(ip)
.setPort(port);
nacosDiscoveryRepository.save(discovery);
}
}
List<Discovery> deletes = new ArrayList<>();
for (Discovery discovery : discoveries) {
if (!deletes.isEmpty()) {
log.warn("");
log.warn("【{}】Nacos 下线服务: {}", subscribeLogId, serviceName);
boolean contains = false;
for (Instance instance : instances) {
if (discovery.getIp().equals(instance.getIp()) && discovery.getPort() == instance.getPort()) {
contains = true;
}
}
for (Discovery discovery : deletes) {
String id = discovery.getId();
String ip = discovery.getIp();
int port = discovery.getPort();
log.warn("【{}】Nacos 下线服务名称: {}IP: {},端口: {}", subscribeLogId, serviceName, ip, port);
if (!contains) {
deletes.add(discovery);
}
nacosDiscoveryRepository.deleteById(id);
}
}
if (!adds.isEmpty()) {
log.info("");
log.info("【{}】Nacos 上线服务: {}", subscribeLogId, serviceName);
for (Instance instance : adds) {
String ip = instance.getIp();
int port = instance.getPort();
String clusterName = instance.getClusterName();
log.info("【{}】Nacos 上线服务名称: {}IP: {},端口: {},群组名称: {},集群名称: {}", subscribeLogId, serviceName,
StringUtils.formatLength(ip, 15), StringUtils.formatLength(port, 5),
StringUtils.extractAtLeft(instance.getServiceName()), clusterName);
Discovery discovery = new Discovery().setId(UUID.randomUUID().toString())
.setServiceName(serviceName)
.setIp(ip)
.setPort(port);
nacosDiscoveryRepository.save(discovery);
}
}
if (!deletes.isEmpty()) {
log.warn("");
log.warn("【{}】Nacos 下线服务: {}", subscribeLogId, serviceName);
for (Discovery discovery : deletes) {
String id = discovery.getId();
String ip = discovery.getIp();
int port = discovery.getPort();
log.warn("【{}】Nacos 下线服务名称: {}IP: {},端口: {}", subscribeLogId, serviceName, ip, port);
nacosDiscoveryRepository.deleteById(id);
}
}
});
}
});
}
private void healthy(String logId, int maxLength, String serviceName, NamingService namingService, boolean healthy)