hadoop hdfs 断点续传-下载
hadoop hdfs 断点续传--下载
然后根据length读取相应的长度即可,仅供参考:
我们做了一个类似webhdfs的服务,通过rest api存取HDFS上的文件,这两天实现了对hdfs的断点续传下载,并经应用返回给客户端:
hdfs-->java代理服务-->php应用-->浏览器等客户端
要实现断点续传,读取文件时应该支持offset和length,支持seek方法,而实际上HDFS本身就支持指定偏移量读取文件:
// Open the HDFS file and position the stream at the byte where reading should start.
long offset = 1024;
Path target = new Path(path);
FSDataInputStream in = fs.open(target);
in.seek(offset);
然后根据length读取相应的长度即可,仅供参考:
long readLength = 0; // 记录已读字节数 int buf_size = 4096; long length = 0; int n = 0; out = new BufferedOutputStream(response.getOutputStream());//HttpServletResponse while (readLength <= length - buf_size) {// 大部分字节在这里读取 n = in.read(buf, 0, buf_size); readLength += buf_size; out.write(buf, 0, n); } if (readLength <= length) { // 余下的不足 n = in.read(buf, 0, (int) (length - readLength)); out.write(buf, 0, n); }
下面附上用JAVA写的类似webhdfs的服务片断:
(参考网上的)
/** * * @param appId * @param path * @param range * @param request * @param response */ @GET @Path("/download") @Produces(MediaType.APPLICATION_OCTET_STREAM) public void download(@QueryParam("app_id") String appId, @QueryParam("path") String path, @QueryParam("range") String range, @Context HttpServletRequest request, @Context HttpServletResponse response) { Map<String, Object> map = null; if (logger.isDebugEnabled()) { logger.debug("path: {}", path); } if (!checkAppId(appId)) { return; } if (null == path || "".equals(path)) { return; } // 查看文件信息 map = kdisk.info(appId, path); if ((Integer) map.get("code") != 0) { return; } // 获取下载的范围 if (null == range || "".equals(range)) { range = request.getHeader("Range"); } OutputStream out = null; FSDataInputStream in = null; try { Object objMap = map.get("data"); FileDesc file = (FileDesc) objMap; long fileLength = file.getSize(); // 记录文件大小 long pastLength = 0; // 记录已下载文件大小 int rangeSwitch = 0; long toLength = fileLength - 1; long contentLength = 0; // 客户端请求的字节总量 String rangeBytes = ""; // 记录客户端传来的形如“27000-”或者“27000-39000”的内容 int buf_size = 4096; byte buf[] = new byte[buf_size]; // 暂存容器 if (range != null) { // 客户端请求的下载的文件块的开始字节 rangeBytes = range.replaceAll("bytes=", ""); if (rangeBytes.indexOf('-') == rangeBytes.length() - 1) {// bytes=969998336- rangeSwitch = 1; rangeBytes = rangeBytes.substring(0, rangeBytes.indexOf('-')); pastLength = Long.parseLong(rangeBytes.trim()); contentLength = fileLength - pastLength; // 客户端请求的是969998336之后的字节 } else { // bytes=1275856879-1275877358 rangeSwitch = 2; int pos = rangeBytes.indexOf('-'); String temp0 = rangeBytes.substring(0, pos); String temp2 = rangeBytes.substring(pos + 1, rangeBytes.length()); pastLength = Long.parseLong(temp0.trim()); toLength = Long.parseLong(temp2.trim()); if (toLength > fileLength - 1) { toLength = fileLength - 1; } contentLength = toLength + 1 - pastLength ; } if (pastLength > fileLength) { return; } String contentRange = new StringBuffer("bytes ") 
.append(pastLength).append("-").append(toLength) .append("/").append(fileLength).toString(); response.setHeader("Content-Range", contentRange); response.setStatus(javax.servlet.http.HttpServletResponse.SC_PARTIAL_CONTENT); } else { contentLength = fileLength; } response.reset(); // 重置 response.setContentType("application/octet-stream"); response.addHeader("Content-Disposition", "attachment;filename=\"" + URLEncoder.encode(file.getName(), "utf-8") + "\""); response.addHeader("Content-Length", String.valueOf(contentLength)); // 从HDFS上获得文件流,直接写到输出文件流给用户下载。 out = new BufferedOutputStream(response.getOutputStream()); in = (FSDataInputStream) kdisk.getFileInputStream(appId, path, pastLength); if (in == null) { if (logger.isDebugEnabled()) { logger.debug("get donwload file fail."); } return; } int n = 0; switch (rangeSwitch) { case 0: case 1: IOUtils.copyBytes(in, out, buf_size); break; case 2: long readLength = 0; while (readLength <= contentLength - buf_size) { n = in.read(buf, 0, buf_size); readLength += buf_size; out.write(buf, 0, n); } if (readLength <= contentLength) { n = in.read(buf, 0, (int) (contentLength - readLength)); out.write(buf, 0, n); } break; default: break; } out.flush(); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage()); } return; } finally { if (out != null) { try { out.close(); } catch (IOException e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage()); } } } if (in != null) { try { in.close(); } catch (IOException e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage()); } } } } }
下面附上用PHP调用我们自己开发的类似webhdfs的服务片断,并返回给客户端如浏览器:
/**
 * Streams a remote file to the HTTP client (for example a browser) as a download.
 *
 * Resumable downloads are supported through the request's Range header
 * ($_SERVER['HTTP_RANGE']). Positions are zero-based: "bytes=0-1023" asks for the
 * first 1024 bytes and "bytes=1024-" for everything from byte 1024 onwards.
 * A range that starts at or beyond the end of the file is answered with 416;
 * any other range that cannot be parsed is ignored and the whole file is sent.
 *
 * @param string $remote_path path handed to self::info() and KDiskServer::open()
 * @param string $file_name   file name offered to the client in Content-Disposition
 * @return bool true when the file was streamed without the remote stream timing out,
 *              false otherwise
 */
static function download_client($remote_path, $file_name) {
    $file_info = self::info ( $remote_path );
    if (! $file_info) {
        return false;
    }

    $chunk = 16384; // bytes read from the remote stream per iteration
    $size = $file_info ['size'];
    $offset = 0;
    $end_pos = $size - 1;
    $length = $size;
    $partial = false;
    $req = array ();

    if (isset ( $_SERVER ['HTTP_RANGE'] ) && preg_match ( '/^bytes=(\d+)-(\d*)$/', trim ( $_SERVER ['HTTP_RANGE'] ), $m )) {
        // "+ 0" converts the digit strings to numbers without forcing a 32-bit int.
        $first = $m [1] + 0;
        // Compare against '' explicitly: an end position of "0" (bytes=0-0) is valid.
        $last = ($m [2] !== '') ? min ( $m [2] + 0, $size - 1 ) : $size - 1;
        if ($first >= $size) {
            // The requested range lies entirely past the end of the file.
            header ( 'HTTP/1.1 416 Requested Range Not Satisfiable' );
            header ( "Content-Range: bytes */$size" );
            return false;
        }
        if ($first <= $last) {
            $partial = true;
            $offset = $first;
            $end_pos = $last;
            $length = ($end_pos + 1) - $offset;
            // Forward the client's range unchanged so the backend serves the same bytes.
            $req ['range'] = $_SERVER ['HTTP_RANGE'];
        }
    }

    // Open the remote stream before sending any header, so a failed open does not
    // leave download headers on whatever response the caller produces instead.
    $remote_file = KDiskServer::open ( $remote_path, $req );
    if (! $remote_file) {
        return false;
    }

    if (ob_get_level () > 0) {
        ob_end_clean ();
    }
    header ( 'Cache-Control: max-age=86400' );
    header ( 'Expires: ' . gmdate ( 'D, d M Y H:i:s \G\M\T', time () + 86400 ) );
    header ( 'Content-Type: application/octet-stream' );
    if ($partial) {
        header ( "HTTP/1.1 206 Partial Content" );
        header ( "Content-Length: $length" );
        header ( "Content-Range: bytes $offset-$end_pos/$size" );
    } else {
        // First (non-resumed) request: announce the full size.
        header ( "Content-Length: " . $size );
    }
    header ( 'Content-Transfer-Encoding: binary' );
    header ( 'Content-Encoding: none' );

    $ua = isset ( $_SERVER ["HTTP_USER_AGENT"] ) ? $_SERVER ["HTTP_USER_AGENT"] : '';
    $encoded_filename = urlencode ( $file_name );
    $encoded_filename = str_replace ( "+", "%20", $encoded_filename );
    if (preg_match ( "/MSIE/", $ua )) {
        header ( 'Content-Disposition: attachment; filename="' . $encoded_filename . '"' );
    } else if (preg_match ( "/Firefox/", $ua )) {
        // Extended form: charset, two single quotes, then the percent-encoded name, unquoted.
        header ( "Content-Disposition: attachment; filename*=UTF-8''" . $encoded_filename );
    } else {
        header ( 'Content-Disposition: attachment; filename="' . $file_name . '"' );
    }

    ob_start ();
    // Stream the file content to the client.
    try {
        set_time_limit ( 0 ); // large files may take longer than the default limit
        while ( ! feof ( $remote_file ) && connection_status () == 0 ) {
            echo (fread ( $remote_file, $chunk ));
            ob_flush ();
            flush ();
        }
        $info = stream_get_meta_data ( $remote_file );
        fclose ( $remote_file );
        return $info ['timed_out'] ? false : true;
    } catch ( Exception $e ) {
        fclose ( $remote_file );
        return false;
    }
}