java多线程批量上传文件_Java大文件上传解决方案

(74) 2024-07-05 20:01:01

问题

项目解决的问题主要是java实现分片上传功能,问题描述:
楼主在公司最近项目中使用multipart文件上传视频文件到服务器上,然后用fastdfs保存到数据库中。发现当上传的视频文件太大的时候会使服务器内存的buf/cache占用很高(好几个G),虽然可以手动清除,但是依旧无法从根源上解决视频上传内存占用太大的问题。

java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第1张
清理的方式如下,可以看到清理前后buff/cahce有明显变小

sync;echo 1 > /proc/sys/vm/drop_caches # 表示清除pagecache。 sync;echo 2 > /proc/sys/vm/drop_caches # 表示清除回收slab分配器中的对象(包括目录项缓存和inode缓存)。slab分配器是内核中管理内存的一种机制,其中很多缓存数据实现都是用的pagecache。 sync;echo 3 > /proc/sys/vm/drop_caches # 表示清除pagecache和slab分配器中的缓存对象。 

java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第2张


解决问题的思路

lz花了100积分在csdn上面提的问题:急急急,求java上传大文件占用jvm过高的问题解决方案/思路

1、mmf,通过memory mapped file 内存映射文件将数据分段存储到mysql或者其他数据库中,不适合,略
2、服务器上ftp,然后通过代码来借助ftp实现
3、前端通过vue-upload来实现文件的分片上传功能,后端使用fastdfs自带的分片功能实现数据存储(lz的解决方式)
4、使用ffmpeg将视频分成m3u8视频然后本地服务器再实现合成


解决的问题/实现的功能

前端实现的功能:simple-uploader.js(也称 Uploader) 是一个上传库,支持多并发上传,文件夹、拖拽、可暂停继续、秒传、分块上传、出错自动重传、手工重传、进度、剩余时间、上传速度等特性;该上传库依赖 HTML5 File API。

后端实现的功能:springboot 整合 fastDfs,redis实现,.本地路径文件分片上传 2.fastDfs 文件上传,下载,分片上传


项目实现技术 - 先说一下使用到大概的东西,防止大家数据库或者其他原因无法使用,导致浪费时间。 **使用到的技术:simple-uploader(前端) + fastdfs(数据库)+ springboot(项目框架)+Redis**

项目源地址:

  • 前端: simple-uploader
  • vue前端:vue -simple-uploader
  • 后端:fastDfs-demo
    注:十分感谢提供开源项目的作者,楼主的代码是根据两个项目改变来的,这两个项目是源码。
  • 楼主自己的代码:
    链接:地址
    提取码:ey36

实现的效果 -

java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第3张
java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第4张


实现原理:

  • 前端使用分片插件后,一个请求会被分成多个请求。多个upload请求均为分片的请求,把大文件分成多个小份一次一次向服务器传递分片完成后,即upload完成后,需要向服务器传递一个merge请求,让服务器将多个分片文件合成一个文件,当我们上传一个大文件时,会被插件分片,ajax请求如下:
    java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第5张

  • 可以看到发起了多次upload的请求,我们来看看upload发送的具体参数
    java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第6张

  • 第一个配置(content-disposition)中的guid和第二个配置中的access_token,是我们通过webuploader配置里的formData,即传递给服务器的参数后面几个配置是文件内容,chunkNumber、chunkSize、currentChunkSize等其中totalChunks为总分片数,chunkSize为当前第几个分片。图片中为13。当你看到chunk是130的upload请求时,代表这是最后一个upload请求了

  • 分片后,文件还未整合,数据大概是下面这个样子:
    java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第7张

  • 后台的验证
    1 在“加入文件”的回调中,通过FileReader读取文件,生成MD5,发给后台
    2.1 如果后台直接返回了“跳过上传”字段和文件的url,则跳过上传,这是秒传;
    2.2 如果后台返回了分片信息,这是断点续传。后台会在每个分片中标识这个分片是否上传过,你需要在分片上传校验的回调中判断,如果true则跳过该分片。
    3 每个分片上传成功,后台都会返回一个字段判断是否需要合并;在“上传完成”的回调中,如果这个字段为true,则需要给后台发一个请求合并的ajax请求

代码分享

楼主前面分享的项目改后的代码

后端核心代码:

api层:

@PostMapping(value = "/fastDfsChunkUpload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public Map chunkUpload1(MultipartFileParam multipartFileParam, HttpServletResponse response) { 
    Map<String, String> map = new HashMap<>(); long chunk = multipartFileParam.getChunkNumber(); long totalChunk = multipartFileParam.getTotalChunks(); long chunkSize = multipartFileParam.getChunkSize(); long historyUpload = (chunk - 1) * chunkSize; String md5 = multipartFileParam.getIdentifier(); MultipartFile file = multipartFileParam.getFile(); String fileName = FileUtil.extName(file.getOriginalFilename()); StorePath path = null; String groundPath; try { 
    if (chunk == 1) { 
    path = appendFileStorageClient.uploadAppenderFile(UpLoadConstant.DEFAULT_GROUP, file.getInputStream(), file.getSize(), fileName); if (path == null) { 
    map.put("result", "上传第一个就错了"); response.setStatus(500); return map; } else { 
    redisUtil.setObject(UpLoadConstant.uploadChunkNum + md5, 1, cacheTime); map.put("result", "上传成功"); } groundPath = path.getPath(); redisUtil.setObject(UpLoadConstant.fastDfsPath + md5, groundPath, cacheTime); } else { 
    groundPath = (String) redisUtil.getObject(UpLoadConstant.fastDfsPath + md5); appendFileStorageClient.modifyFile(UpLoadConstant.DEFAULT_GROUP, groundPath, file.getInputStream(), file.getSize(), historyUpload); Integer chunkNum = (Integer) redisUtil.getObject(UpLoadConstant.uploadChunkNum + md5); chunkNum = chunkNum + 1; redisUtil.setObject(UpLoadConstant.uploadChunkNum + md5, chunkNum, cacheTime); } Integer num = (Integer) redisUtil.getObject(UpLoadConstant.uploadChunkNum + md5); if (totalChunk == num) { 
    response.setStatus(200); map.put("result", "上传成功"); map.put("path", groundPath); redisUtil.del(UpLoadConstant.uploadChunkNum + md5); redisUtil.del(UpLoadConstant.fastDfsPath + md5); } } catch (FdfsIOException | SocketTimeoutException e) { 
    response.setStatus(407); map.put("result", "重新发送"); return map; } catch (Exception e) { 
    e.printStackTrace(); redisUtil.del(UpLoadConstant.uploadChunkNum + md5); redisUtil.del(UpLoadConstant.fastDfsPath + md5); response.setStatus(500); map.put("result", "upload error"); return map; } System.out.println("result=" + map.get("result")); System.out.println("path=" + map.get("path")); return map; } 

实体类:MultipartFileParam

package com.dgut.fastdfs.entity; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import org.springframework.web.multipart.MultipartFile; import java.io.Serializable; /** * @author :CZW * @date :Created in 2019/12/15 12:27 * @description: */ @Data @AllArgsConstructor @NoArgsConstructor @ToString public class MultipartFileParam implements Serializable { 
    private String taskId;//文件传输任务ID private long chunkNumber;//当前为第几分片 private long chunkSize;//每个分块的大小 private long totalChunks;//分片总数 private String identifier;//文件唯一标识 private MultipartFile file;//分块文件传输对象 } 

工具类:RedisUtil

package com.dgut.fastdfs.utils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.TimeUnit; @Component public class RedisUtil { 
    @Autowired private RedisTemplate<String, Object> redisTemplate; //写入对象 public boolean setObject(final String key, Object value, Integer expireTime) { 
    try { 
    redisTemplate.opsForValue().set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); return true; } catch (Exception e) { 
    e.printStackTrace(); return false; } } //获取对象 public Object getObject(final String key) { 
    return key == null ? null : redisTemplate.opsForValue().get(key); } //写入集合 public boolean setList(final String key, Object value, Integer expireTime) { 
    try { 
    redisTemplate.opsForList().rightPush(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); return true; } catch (Exception e) { 
    e.printStackTrace(); return false; } } //获取集合 public List<Object> getList(final String key) { 
    try { 
    return redisTemplate.opsForList().range(key, 0, -1); } catch (Exception e) { 
    e.printStackTrace(); return null; } } //判断时候存在key public boolean hasKey(final String key) { 
    try { 
    return redisTemplate.hasKey(key); } catch (Exception e) { 
    e.printStackTrace(); } return false; } //删除key public void del(final String key) { 
    if (hasKey(key)) { 
    redisTemplate.delete(key); } } } 

前端主要需要修改的地方

  • 访问的api
  • 同时上传的分片数量
    java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第8张
    项目代码已经上传到百度网盘上面去了,可以自己去下载

尾声

lz花了两个小时的时间整理资源,忘大家看到后别忘了点个赞,谢谢


来生还长,切勿惆怅


*******************网友提问补充******************

问题:fastdfs如何配置以及相关依赖

依赖(好像不是很好下载,可能需要去github上下载)

<!--FastDFS--> <dependency> <groupId>org.csource</groupId> <artifactId>fastdfs-client-java</artifactId> <version>1.27-SNAPSHOT</version> </dependency> <dependency> <groupId>com.github.tobato</groupId> <artifactId>fastdfs-client</artifactId> <version>1.26.2</version> </dependency> 

配置文件项目位置及参数:fdfs_client.properties
java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第9张

connect_timeout = 10 network_timeout = 30 charset = UTF-8 http.tracker_http_port = 80 http.anti_steal_token = no http.secret_key = FastDFS tracker_server = FastDFS_IP:22122 

加载FastDFS配置文件的工具类

import org.csource.common.MyException; import org.csource.common.NameValuePair; import org.csource.fastdfs.*; import java.io.BufferedOutputStream; import java.io.IOException; import java.net.URLDecoder; /** * @author Marsj * @description: FastDFS工具类【实现文件上传、下载、删除、查询】 * @date 2019/3/26 */ public class FastDFSClientUtil { 
    private TrackerClient trackerClient; private TrackerServer trackerServer; private StorageServer storageServer; private StorageClient1 storageClient; public FastDFSClientUtil() throws IOException, MyException { 
    // FastDFSConfig config = ApplicationContext.getBean(FastDFSConfig.class); //使用类加载器方法加载配置文件在linux上打成jar包后会造成无法读取参数,可以使用上面方法或者直接写服务器配置文件路径 String osName = System.getProperty("os.name"); if (osName.startsWith("Windows")) { 
    // windows String conf = this.getClass().getClassLoader().getResource("fdfs_client.properties").getPath(); String path = URLDecoder.decode(getClass().getProtectionDomain().getCodeSource().getLocation().toString(), "UTF-8"); path = path.substring(6); conf = conf.replace("classpath:", URLDecoder.decode(path, "UTF-8")); ClientGlobal.init(conf); } else { 
    ClientGlobal.init("/data/fdfs_client.properties"); } trackerClient = new TrackerClient(); trackerServer = trackerClient.getConnection(); storageServer = null; storageClient = new StorageClient1(trackerServer, storageServer); } /** * @param filePath 需要查询的文件路径 * @return 为null则代表文件不存在,正常返回FileInfo文件信息(包含文件大小,创建时间,上传者的id等等) * @author Czw * @Description 查询FastDFS中路径文件是否存在 * @date 2019/5/8 下午 6:58 */ public FileInfo query_file_info(String filePath) { 
    FileInfo fileInfo = null; try { 
    fileInfo = storageClient.query_file_info1(filePath); } catch (IOException | MyException e) { 
    e.printStackTrace(); } return fileInfo; } /** * 上传文件方法 * <p>Title: uploadFile</p> * <p>Description: </p> * * @param fileName 文件全路径 * @param extName 文件扩展名,不包含(.) * @param metas 文件扩展信息 */ public String uploadFile(String fileName, String extName, NameValuePair[] metas) { 
    String result = null; try { 
    result = storageClient.upload_file1(fileName, extName, metas); } catch (Exception e) { 
    e.printStackTrace(); } return result; } /** * 上传文件,传fileName * * @param fileName 文件的磁盘路径名称 如:D:/image/aaa.jpg * @return null为失败 */ public String uploadFile(String fileName) { 
    return uploadFile(fileName, null, null); } /** * @param fileName 文件的磁盘路径名称 如:D:/image/aaa.jpg * @param extName 文件的扩展名 如 txt jpg等 * @return null为失败 */ public String uploadFile(String fileName, String extName) { 
    return uploadFile(fileName, extName, null); } /** * 上传文件方法 * <p>Title: uploadFile</p> * <p>Description: </p> * * @param fileContent 文件的内容,字节数组 * @param extName 文件扩展名 * @param metas 文件扩展信息 */ public String uploadFile(byte[] fileContent, String extName, NameValuePair[] metas) { 
    String result = null; try { 
    result = storageClient.upload_file1(fileContent, extName, metas); } catch (IOException | MyException e) { 
    e.printStackTrace(); } return result; } /** * 上传文件 * * @param fileContent 文件的字节数组 * @return null为失败 */ public String uploadFile(byte[] fileContent) { 
    return uploadFile(fileContent, null, null); } /** * 上传文件 * * @param fileContent 文件的字节数组 * @param extName 文件的扩展名 如 txt jpg png 等 * @return null为失败 */ public String uploadFile(byte[] fileContent, String extName) { 
    return uploadFile(fileContent, extName, null); } /** * 文件下载到磁盘 * * @param path 图片路径 * @param output 输出流 中包含要输出到磁盘的路径 * @return -1失败,0成功 */ public int download_file(String path, BufferedOutputStream output) { 
    int result = -1; try { 
    byte[] b = storageClient.download_file1(path); try { 
    if (b != null) { 
    output.write(b); result = 0; } } catch (Exception e) { 
    } //用户可能取消了下载 finally { 
    if (output != null) { 
    try { 
    output.close(); } catch (IOException e) { 
    e.printStackTrace(); } } } } catch (Exception e) { 
    e.printStackTrace(); } return result; } /** * 获取文件数组 * * @param path 文件的路径 如group1/M00/00/00/wKgRsVjtwpSAXGwkAAAweEAzRjw471.jpg * @return */ public byte[] download_bytes(String path) { 
    byte[] b = null; try { 
    b = storageClient.download_file1(path); } catch (Exception e) { 
    e.printStackTrace(); } return b; } /** * 删除文件 * * @param group 组名 如:group1 * @param storagePath 不带组名的路径名称 如:M00/00/00/wKgRsVjtwpSAXGwkAAAweEAzRjw471.jpg * @return -1失败,0成功 */ public Integer delete_file(String group, String storagePath) { 
    int result = -1; try { 
    result = storageClient.delete_file(group, storagePath); } catch (Exception e) { 
    e.printStackTrace(); } return result; } /** * @param storagePath 文件的全部路径 如:group1/M00/00/00/wKgRsVjtwpSAXGwkAAAweEAzRjw471.jpg * @return -1失败,0成功 */ public Integer delete_file(String storagePath) { 
    int result = -1; try { 
    result = storageClient.delete_file1(storagePath); } catch (Exception e) { 
    e.printStackTrace(); } return result; } /** * 获取远程服务器文件资源信息 * * @param groupName 文件组名 如:group1 * @param remoteFileName M00/00/00/wKgRsVjtwpSAXGwkAAAweEAzRjw471.jpg */ public FileInfo getFile(String groupName, String remoteFileName) { 
    try { 
    return storageClient.get_file_info(groupName, remoteFileName); } catch (Exception e) { 
    e.printStackTrace(); } return null; } /** * 获取远程服务器文件资源信息 * * @param remoteFileName /group01/M00/00/00/wKgRsVjtwpSAXGwkAAAweEAzRjw471.jpg */ public FileInfo getFile(String remoteFileName) { 
    try { 
    return storageClient.get_file_info1(remoteFileName); } catch (Exception e) { 
    e.printStackTrace(); } return null; } } 

网友提问补充**
上传小文件时正常,上传大文件时报错找不到节点,报错点是appendFileStorageClient.modifyFile 处的代码,个人认为是fastdfs的tracker_server 或者tracker_list参数配置错误导致的,目前还在找问题中,及时更新
java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第10张


************补充******************* 上面提到的 节点找不到的错误经过网友@阿姆斯特狼 差不多一个月的努力总算找到了问题的原因,跟我之前的猜想差不多,确实是fastdfs的参数有问题,需要把store_group 和store lookup的参数配置正确就可以,特别是store lookup参数默认参数是1,指定. 0就是轮训

java多线程批量上传文件_Java大文件上传解决方案 (https://mushiming.com/)  第11张

THE END

发表回复