Compare commits

...

4 Commits

17 changed files with 263 additions and 87 deletions

View File

@ -17,9 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableRyFeignClients
@SpringBootApplication
@EnableScheduling
public class
RuoYiManagementPlatformApplication {
public class RuoYiManagementPlatformApplication {
public static void main(String[] args) {
SpringApplication.run(RuoYiManagementPlatformApplication.class, args);
System.out.println("(♥◠‿◠)ノ゙ 复杂智能软件管理平台启动成功 ლ(´ڡ`ლ)゙ \n" +

View File

@ -4,7 +4,7 @@ import com.ruoyi.common.core.web.controller.BaseController;
import com.ruoyi.common.core.web.domain.GenericsAjaxResult;
import com.ruoyi.platform.domain.DatasetVersion;
import com.ruoyi.platform.service.DatasetVersionService;
import com.ruoyi.platform.vo.LabelDatasetVersion;
import com.ruoyi.platform.vo.LabelDatasetVersionVo;
import io.swagger.annotations.ApiOperation;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
@ -138,8 +138,10 @@ public class DatasetVersionController extends BaseController {
@PostMapping("/addDatasetVersionsFromLabel")
@ApiOperation("从数据标注添加数据集版本")
public GenericsAjaxResult<Boolean> addDatasetVersionsFromLabel(@RequestBody LabelDatasetVersion labelDatasetVersion) throws Exception {
return genericsSuccess(true);
public GenericsAjaxResult<?> addDatasetVersionsFromLabel(@RequestBody LabelDatasetVersionVo labelDatasetVersionVo) throws Exception {
datasetVersionService.addDatasetVersionsFromLabel(labelDatasetVersionVo);
return GenericsAjaxResult.success();
}
}

View File

@ -92,5 +92,7 @@ public interface ExperimentInsDao {
List<ExperimentIns> queryByExperiment(@Param("experimentIns") ExperimentIns experimentIns);
List<ExperimentIns> queryByExperimentId(Integer id);
List<ExperimentIns> queryByExperimentIsNotTerminated();
}

View File

@ -0,0 +1,88 @@
package com.ruoyi.platform.scheduling;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.platform.domain.Experiment;
import com.ruoyi.platform.domain.ExperimentIns;
import com.ruoyi.platform.mapper.ExperimentDao;
import com.ruoyi.platform.mapper.ExperimentInsDao;
import com.ruoyi.platform.service.ExperimentInsService;
import com.ruoyi.platform.service.ExperimentService;
import com.ruoyi.platform.utils.JsonUtils;
import com.ruoyi.system.api.model.LoginUser;
import io.swagger.models.auth.In;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
@Component()
public class ExperimentInstanceStatusTask {
@Autowired
private ExperimentInsService experimentInsService;
@Resource
private ExperimentDao experimentDao;
@Resource
private ExperimentInsDao experimentInsDao;
private List<Integer> experimentIds = new ArrayList<>();
@Scheduled(cron = "0/30 * * * * ?") // 每30S执行一次
public void executeExperimentInsStatus() throws IOException {
// 查到所有非终止态的实例
List<ExperimentIns> experimentInsList = experimentInsService.queryByExperimentIsNotTerminated();
// 去argo查询状态
List<ExperimentIns> updateList = new ArrayList<>();
if (experimentInsList != null && experimentInsList.size() > 0) {
for (ExperimentIns experimentIns : experimentInsList) {
//当原本状态为null或非终止态时才调用argo接口
String oldStatus = experimentIns.getStatus();
try {
experimentIns = experimentInsService.queryStatusFromArgo(experimentIns);
}catch (Exception e){
experimentIns.setStatus("Failed");
}
if (!StringUtils.equals(oldStatus,experimentIns.getStatus())){
experimentIns.setUpdateTime(new Date());
if (!experimentIds.contains(experimentIns.getExperimentId())){
experimentIds.add(experimentIns.getExperimentId());
}
updateList.add(experimentIns);
}
experimentInsDao.update(experimentIns);
}
}
if (updateList.size() > 0){
experimentInsDao.insertOrUpdateBatch(updateList);
}
}
@Scheduled(cron = "0/30 * * * * ?") // / 每30S执行一次
public void executeExperimentStatus() throws IOException {
if (experimentIds.size()==0){
return;
}
List<Experiment> updateexperiments = new ArrayList<>();
for (Integer experimentId : experimentIds){
List<ExperimentIns> insList = experimentInsService.getByExperimentId(experimentId);
List<String> statusList = new ArrayList<String>();
// 更新实验状态列表
for (int i=0;i<insList.size();i++){
statusList.add(insList.get(i).getStatus());
}
String subStatus = statusList.toString().substring(1, statusList.toString().length() - 1);
Experiment experiment = experimentDao.queryById(experimentId);
if (!StringUtils.equals(subStatus,experiment.getStatusList())){
updateexperiments.add(experiment);
}
}
if (updateexperiments.size() > 0){
experimentDao.insertOrUpdateBatch(updateexperiments);
}
experimentIds.clear();
System.out.println(experimentIds);
}
}

View File

@ -1,11 +1,8 @@
package com.ruoyi.platform.service;
import com.ruoyi.platform.domain.Dataset;
import com.ruoyi.platform.domain.DatasetVersion;
import com.ruoyi.platform.vo.LabelDatasetVersion;
import com.ruoyi.platform.vo.LabelDatasetVersionVo;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
@ -75,5 +72,6 @@ public interface DatasetVersionService {
String addDatasetVersions(List<DatasetVersion> datasetVersions) throws Exception;
void addDatasetVersionsFromLabel(LabelDatasetVersion labelDatasetVersion) throws Exception;
void
addDatasetVersionsFromLabel(LabelDatasetVersionVo labelDatasetVersionVo) throws Exception;
}

View File

@ -98,4 +98,9 @@ public interface ExperimentInsService {
String getRealtimePodLogFromPod(PodLogVo podLogVo);
/**
* 查询非终止态的实例
* @return
*/
List<ExperimentIns> queryByExperimentIsNotTerminated();
}

View File

@ -4,6 +4,7 @@ import org.springframework.core.io.InputStreamResource;
import org.springframework.http.ResponseEntity;
import org.springframework.web.multipart.MultipartFile;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
@ -11,5 +12,5 @@ public interface MinioService {
ResponseEntity<InputStreamResource> downloadZipFile(String bucketName , String path);
Map<String, String> uploadFile(String bucketName, String objectName, MultipartFile file ) throws Exception;
void uploaInputStream(String bucketName, String objectName, InputStream inputStream ) throws Exception;
}

View File

@ -20,6 +20,7 @@ import com.ruoyi.platform.vo.DatasetVo;
import com.ruoyi.system.api.model.LoginUser;
import io.minio.messages.Item;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
@ -67,7 +68,8 @@ public class DatasetServiceImpl implements DatasetService {
private MinioService minioService;
// 固定存储桶名
private final String bucketName = "platform-data";
@Value("${minio.dataReleaseBucketName}")
private String bucketName;
@Resource
private MinioUtil minioUtil;

View File

@ -1,17 +1,20 @@
package com.ruoyi.platform.service.impl;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.platform.annotations.CheckDuplicate;
import com.ruoyi.platform.domain.Dataset;
import com.ruoyi.platform.domain.DatasetVersion;
import com.ruoyi.platform.domain.ModelsVersion;
import com.ruoyi.platform.domain.Workflow;
import com.ruoyi.platform.mapper.DatasetDao;
import com.ruoyi.platform.mapper.DatasetVersionDao;
import com.ruoyi.platform.service.DatasetVersionService;
import com.ruoyi.platform.service.MinioService;
import com.ruoyi.platform.utils.FileUtil;
import com.ruoyi.platform.utils.HttpUtils;
import com.ruoyi.platform.vo.LabelDatasetVersion;
import com.ruoyi.platform.vo.LabelDatasetVersionVo;
import com.ruoyi.system.api.model.LoginUser;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
@ -35,9 +38,13 @@ import java.util.Map;
public class DatasetVersionServiceImpl implements DatasetVersionService {
@Resource
private DatasetVersionDao datasetVersionDao;
@Resource
private DatasetDao datasetDao;
// 固定存储桶名
private final String bucketName = "platform-data";
@Value("${minio.dataReleaseBucketName}")
private String bucketName;
@Resource
private MinioService minioService;
/**
* 通过ID查询单条数据
*
@ -198,12 +205,31 @@ public class DatasetVersionServiceImpl implements DatasetVersionService {
}
@Override
public void addDatasetVersionsFromLabel(LabelDatasetVersion labelDatasetVersion) throws Exception{
public void addDatasetVersionsFromLabel(LabelDatasetVersionVo labelDatasetVersionVo) throws Exception{
Dataset dataset = datasetDao.queryById(labelDatasetVersionVo.getDataset_id());
if (dataset == null){
throw new Exception("数据集不存在");
}
// 获取label-studio数据流
InputStream inputStream = HttpUtils.getInputStream("http://127.0.0.1:8080/api/projects/"+labelDatasetVersion.getProject_id()+"/export?exportType="+labelDatasetVersion.getExportType(), labelDatasetVersion.getToken());
// 上传镜像至minio
Map<String, String> headers = new HashMap<String, String>();
headers.put("Authorization","Token "+labelDatasetVersionVo.getToken());
InputStream inputStream = HttpUtils.getIntputStream("http://127.0.0.1:8080/api/projects/"+labelDatasetVersionVo.getProject_id()+"/export?exportType="+labelDatasetVersionVo.getExportType(), headers);
// 构建objectName
String username = SecurityUtils.getLoginUser().getUsername();
String url = username + "/" + DateUtils.dateTimeNow() + "/" + dataset.getName()+"_"+labelDatasetVersionVo.getVersion()+"."+labelDatasetVersionVo.getExportType();
String objectName = "datasets/" + url;
String formattedSize = FileUtil.formatFileSize(inputStream.available());
minioService.uploaInputStream(bucketName,objectName,inputStream);
//保存DatasetVersion
DatasetVersion datasetVersion = new DatasetVersion();
datasetVersion.setVersion(labelDatasetVersionVo.getVersion());
datasetVersion.setDatasetId(labelDatasetVersionVo.getDataset_id());
datasetVersion.setFileName(dataset.getName()+"_"+labelDatasetVersionVo.getVersion()+"."+labelDatasetVersionVo.getExportType());
datasetVersion.setFileSize(formattedSize);
datasetVersion.setUrl(url);
datasetVersion.setDescription(labelDatasetVersionVo.getDesc());
this.insert(datasetVersion);
}
private void insertPrepare(DatasetVersion datasetVersion) throws Exception {

View File

@ -71,14 +71,16 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
@Override
public ExperimentIns queryById(Integer id) throws IOException {
ExperimentIns experimentIns = this.experimentInsDao.queryById(id);
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
experimentIns = this.queryStatusFromArgo(experimentIns);
//只有当新状态是终止态时才更新数据库
if (isTerminatedState(experimentIns)) {
//同时更新各个节点
this.update(experimentIns);
}
}
//已经迁移至定时任务进行更新操作
// if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
// experimentIns = this.queryStatusFromArgo(experimentIns);
// //只有当新状态是终止态时才更新数据库
// if (isTerminatedState(experimentIns)) {
// //同时更新各个节点
// this.update(experimentIns);
// }
// }
return experimentIns;
}
@ -93,40 +95,42 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
@Override
public List<ExperimentIns> getByExperimentId(Integer experimentId) throws IOException {
List<ExperimentIns> experimentInsList = experimentInsDao.getByExperimentId(experimentId);
//代码全部迁移至定时任务
//搞个标记当状态改变才去改表
boolean flag = false;
List<ExperimentIns> result = new ArrayList<ExperimentIns>();
if (experimentInsList!=null && experimentInsList.size()>0) {
for (ExperimentIns experimentIns : experimentInsList) {
//当原本状态为null或非终止态时才调用argo接口
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
experimentIns = this.queryStatusFromArgo(experimentIns);
if (!flag){
flag = true;
}
//只有当新状态是终止态时才更新数据库
if (isTerminatedState(experimentIns)) {
//同时更新各个节点
this.update(experimentIns);
}
}
// boolean flag = false;
// List<ExperimentIns> result = new ArrayList<ExperimentIns>();
// if (experimentInsList!=null && experimentInsList.size()>0) {
// for (ExperimentIns experimentIns : experimentInsList) {
// //当原本状态为null或非终止态时才调用argo接口
// if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
// experimentIns = this.queryStatusFromArgo(experimentIns);
// if (!flag){
// flag = true;
// }
// //只有当新状态是终止态时才更新数据库
// if (isTerminatedState(experimentIns)) {
// //同时更新各个节点
// this.update(experimentIns);
// }
// }
//
// //新增查询tensorBoard容器状态
// result.add(experimentIns);
// }
// }
// if (flag) {
// List<String> statusList = new ArrayList<String>();
// // 更新实验状态列表
// for (int i=0;i<result.size();i++){
// statusList.add(result.get(i).getStatus());
// }
// Experiment experiment = experimentDao.queryById(experimentId);
// experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1));
// experimentDao.update(experiment);
// }
//新增查询tensorBoard容器状态
result.add(experimentIns);
}
}
if (flag) {
List<String> statusList = new ArrayList<String>();
// 更新实验状态列表
for (int i=0;i<result.size();i++){
statusList.add(result.get(i).getStatus());
}
Experiment experiment = experimentDao.queryById(experimentId);
experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1));
experimentDao.update(experiment);
}
return result;
return experimentInsList;
}
@ -141,15 +145,15 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) throws IOException {
long total = this.experimentInsDao.count(experimentIns);
List<ExperimentIns> experimentInsList = this.experimentInsDao.queryAllByLimit(experimentIns, pageRequest);
if (experimentInsList!=null && experimentInsList.size()>0) {
for (ExperimentIns ins : experimentInsList) {
//如果实验实例不为空或者
if (ins != null && StringUtils.isEmpty(ins.getStatus())) {
ins = this.queryStatusFromArgo(ins);
this.update(ins);
}
}
}
// if (experimentInsList!=null && experimentInsList.size()>0) {
// for (ExperimentIns ins : experimentInsList) {
// //如果实验实例不为空或者
// if (ins != null && StringUtils.isEmpty(ins.getStatus())) {
// ins = this.queryStatusFromArgo(ins);
// this.update(ins);
// }
// }
// }
return new PageImpl<>(experimentInsList, pageRequest, total);
}
@ -524,6 +528,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
return k8sClientUtil.getPodLogs(podLogVo.getPodName(), podLogVo.getNamespace(),podLogVo.getContainerName(), logsLines);
}
@Override
public List<ExperimentIns> queryByExperimentIsNotTerminated() {
return experimentInsDao.queryByExperimentIsNotTerminated();
}
private boolean isTerminatedState(ExperimentIns ins) throws IOException {
// 定义终止态的列表例如 "Succeeded", "Failed"
String status = ins.getStatus();

View File

@ -256,8 +256,9 @@ public class ExperimentServiceImpl implements ExperimentService {
experimentIns.setExperimentId(experiment.getId());
experimentIns.setArgoInsNs((String) metadata.get("namespace"));
experimentIns.setArgoInsName((String) metadata.get("name"));
//传入实验全局参数
experimentIns.setStatus("Pending");
//传入实验全局参数
experimentIns.setGlobalParam(experiment.getGlobalParam());

View File

@ -69,4 +69,9 @@ public class MinioServiceImpl implements MinioService {
}
return result;
}
@Override
public void uploaInputStream(String bucketName, String objectName, InputStream inputStream ) throws Exception {
minioUtil.uploadObject(bucketName, objectName, inputStream);
}
}

View File

@ -21,6 +21,7 @@ import com.ruoyi.system.api.model.LoginUser;
import io.minio.messages.Item;
import io.netty.util.Version;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
@ -67,7 +68,8 @@ public class ModelsServiceImpl implements ModelsService {
// 固定存储桶名
private final String bucketName = "platform-data";
@Value("${minio.dataReleaseBucketName}")
private String bucketName;
@Resource
private MinioUtil minioUtil;

View File

@ -8,6 +8,7 @@ import com.ruoyi.platform.mapper.ModelsVersionDao;
import com.ruoyi.platform.service.ModelsVersionService;
import com.ruoyi.system.api.model.LoginUser;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
@ -35,7 +36,8 @@ public class ModelsVersionServiceImpl implements ModelsVersionService {
private ModelsDao modelsDao;
// 固定存储桶名
private final String bucketName = "platform-data";
@Value("${minio.dataReleaseBucketName}")
private String bucketName;
/**
* 通过ID查询单条数据

View File

@ -375,25 +375,54 @@ public class HttpUtils {
}
/**
* 发送 HTTP 请求并返回二进制数据流InputStream
* 发送 HTTP 请求并返回二进制数据流OutputStream
*
* @param url 请求的 URL 地址
* @param token 要携带的 Token
* @return 服务器响应的二进制数据流InputStream
* @param headers 头节点
* @return 服务器响应的二进制数据流OutputStream
* @throws IOException 如果请求失败或发生其他 I/O 错误
*/
public static InputStream getInputStream(String url, String token) throws IOException {
public static InputStream getIntputStream(String url, Map<String, String> headers) throws IOException {
URL requestUrl = new URL(url);
HttpURLConnection connection = (HttpURLConnection) requestUrl.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Authorization", "Bearer " + token); // 添加 Authorization 头部携带 Token
// Set request headers
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
// Set other connection properties (e.g., timeouts, request method, etc.)
connection.setRequestMethod("GET");
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);
// Connect and get the response code
connection.connect();
int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
return connection.getInputStream(); // 获取响应的输入流
// Get the response InputStream
InputStream inputStream = connection.getInputStream();
// // Create a ByteArrayOutputStream to store the response data
// ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
//
// // Read data from the InputStream and write it to the ByteArrayOutputStream
// byte[] buffer = new byte[1024];
// int bytesRead;
// while ((bytesRead = inputStream.read(buffer)) != -1) {
// outputStream.write(buffer, 0, bytesRead);
// }
//
// // Close the InputStream
// inputStream.close();
// Return the ByteArrayOutputStream as an OutputStream
return inputStream;
} else {
throw new IOException("HTTP 请求失败,状态码:" + responseCode);
throw new IOException("HTTP request failed with response code: " + responseCode);
}
}

View File

@ -6,10 +6,10 @@ import com.fasterxml.jackson.databind.annotation.JsonNaming;
import java.io.Serializable;
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class LabelDatasetVersion implements Serializable {
public class LabelDatasetVersionVo implements Serializable {
private String token;
private String project_id;
private String dataset_id;
private Integer dataset_id;
private String version;
private String desc;
private String exportType;
@ -29,11 +29,11 @@ public class LabelDatasetVersion implements Serializable {
this.project_id = project_id;
}
public String getDataset_id() {
public Integer getDataset_id() {
return dataset_id;
}
public void setDataset_id(String dataset_id) {
public void setDataset_id(Integer dataset_id) {
this.dataset_id = dataset_id;
}

View File

@ -21,7 +21,13 @@
<result property="state" column="state" jdbcType="INTEGER"/>
</resultMap>
<!--查询非终止态的实例-->
<select id="queryByExperimentIsNotTerminated" resultMap="ExperimentInsMap">
select id, experiment_id, argo_ins_name, argo_ins_ns, status, nodes_status,nodes_result, nodes_logs,global_param, start_time, finish_time, create_by, create_time, update_by, update_time, state
from experiment_ins
where (status NOT IN ('Terminated', 'Succeeded', 'Failed')
OR status IS NULL) and state = 1
</select>
<!--查询单个-->
<select id="queryById" resultMap="ExperimentInsMap">