hadoop hdfs 断点续传-下载

hadoop hdfs 断点续传--下载

我们做了一个类似 WebHDFS 的服务,通过 REST API 存储和读取 HDFS 上的文件。这两天实现了 HDFS 文件的断点续传下载,数据经由应用层返回给客户端:

hdfs-->java代理服务-->php应用-->浏览器等客户端


要实现断点续传,读取文件时需要支持按偏移量(offset)和长度(length)读取,也就是要支持 seek。HDFS 本身就支持从指定偏移量读取文件(FSDataInputStream.seek):

// HDFS input streams are seekable: open the file, then jump straight to the
// byte position the client wants to resume from.
FSDataInputStream in = fs.open(new Path(path));
long offset = 1024; // resume position in bytes (hard-coded here for illustration)
in.seek(offset);

然后按 length 读取相应字节数即可。注意:read() 可能返回少于请求的字节数(到达文件末尾时返回 -1),因此必须按实际读到的字节数累计。以下代码仅供参考:

long readLength = 0; // bytes copied so far
int buf_size = 4096;
byte[] buf = new byte[buf_size];
long length = 0; // number of bytes to send (e.g. derived from the Range header); set before use
OutputStream out = new BufferedOutputStream(response.getOutputStream()); // HttpServletResponse
// read() may return fewer bytes than requested, or -1 at end of file, so
// advance by the count actually read and never request more than remains.
while (readLength < length) {
    int n = in.read(buf, 0, (int) Math.min(buf_size, length - readLength));
    if (n < 0) {
        break; // file ended before `length` bytes were available
    }
    out.write(buf, 0, n);
    readLength += n;
}
out.flush();




下面附上用 Java 编写的类似 WebHDFS 服务的代码片段:

(参考网上的)

  /**
     * Streams a file to the HTTP client with support for resumable downloads.
     *
     * A single byte range may be supplied in the {@code range} query
     * parameter or, if that is empty, in the {@code Range} request header.
     * Accepted forms are {@code bytes=first-}, {@code bytes=first-last} and
     * {@code bytes=-suffix} (the {@code bytes=} prefix is optional). A
     * satisfiable range is answered with 206 and {@code Content-Range}; an
     * unsatisfiable one with 416 and {@code Content-Range: bytes *}{@code /len};
     * an unparseable one is ignored and the whole file is sent with 200.
     *
     * @param appId    application id, validated with {@code checkAppId}
     * @param path     path of the file to download
     * @param range    optional byte range (see above)
     * @param request  servlet request, source of the {@code Range} header
     * @param response servlet response the file bytes are written to
     */
    @GET
    @Path("/download")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public void download(@QueryParam("app_id") String appId,
            @QueryParam("path") String path, @QueryParam("range") String range,
            @Context HttpServletRequest request,
            @Context HttpServletResponse response) {
        if (logger.isDebugEnabled()) {
            logger.debug("path: {}", path);
        }
        if (!checkAppId(appId)) {
            return;
        }
        if (null == path || "".equals(path)) {
            return;
        }

        // Look up the file metadata; a non-zero or missing code means failure.
        Map<String, Object> map = kdisk.info(appId, path);
        if (!Integer.valueOf(0).equals(map.get("code"))) {
            return;
        }
        // The query parameter takes precedence over the Range header.
        if (null == range || "".equals(range)) {
            range = request.getHeader("Range");
        }

        OutputStream out = null;
        FSDataInputStream in = null;
        try {
            FileDesc file = (FileDesc) map.get("data");
            long fileLength = file.getSize();

            long start = 0;
            long end = fileLength - 1;
            boolean partial = false;
            long[] bounds = (range == null) ? null : parseByteRange(range, fileLength);
            if (bounds != null) {
                if (bounds[0] < 0 || bounds[0] > bounds[1]) {
                    response.reset();
                    response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
                    response.setHeader("Content-Range", "bytes */" + fileLength);
                    return;
                }
                start = bounds[0];
                end = bounds[1];
                partial = true;
            }
            long contentLength = end - start + 1;

            // Open the source before touching the response so a failure can
            // still be reported as an error instead of a truncated body.
            in = (FSDataInputStream) kdisk.getFileInputStream(appId, path, start);
            if (in == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("get donwload file fail.");
                }
                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                return;
            }

            // reset() clears the status code and all headers, so it must run
            // before any of them are set (otherwise 206/Content-Range are lost).
            response.reset();
            response.setContentType("application/octet-stream");
            response.setHeader("Accept-Ranges", "bytes");
            response.addHeader("Content-Disposition", "attachment;filename=\""
                    + URLEncoder.encode(file.getName(), "utf-8").replace("+", "%20") + "\"");
            response.addHeader("Content-Length", String.valueOf(contentLength));
            if (partial) {
                response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
                response.setHeader("Content-Range",
                        "bytes " + start + "-" + end + "/" + fileLength);
            }

            // Stream exactly contentLength bytes from the source to the client.
            out = new BufferedOutputStream(response.getOutputStream());
            long copied = copyRange(in, out, contentLength, 4096);
            out.flush();
            if (copied < contentLength) {
                logger.warn("download of " + path + " ended early: " + copied
                        + "/" + contentLength + " bytes");
            }
        } catch (Exception e) {
            logger.warn("download of {} failed", path, e);
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(e.getMessage());
                    }
                }
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(e.getMessage());
                    }
                }
            }
        }
    }

    /**
     * Parses a single byte-range spec ("bytes=a-b", "bytes=a-", "bytes=-n";
     * the "bytes=" prefix is optional) against a file of {@code fileLength}
     * bytes.
     *
     * @return {@code {first, last}} with {@code last} clamped to
     *         {@code fileLength - 1}, or {@code null} if the spec is malformed
     *         (e.g. multiple ranges). The caller must still check
     *         {@code first <= last} to detect an unsatisfiable range.
     */
    private static long[] parseByteRange(String range, long fileLength) {
        String spec = range.trim();
        if (spec.startsWith("bytes=")) {
            spec = spec.substring("bytes=".length());
        }
        int dash = spec.indexOf('-');
        if (dash < 0) {
            return null;
        }
        String first = spec.substring(0, dash).trim();
        String last = spec.substring(dash + 1).trim();
        try {
            if (first.length() == 0) {
                // Suffix range "-n": the final n bytes of the file.
                long suffix = Long.parseLong(last);
                return new long[] {Math.max(0, fileLength - suffix), fileLength - 1};
            }
            long start = Long.parseLong(first);
            long end = (last.length() == 0)
                    ? fileLength - 1
                    : Math.min(Long.parseLong(last), fileLength - 1);
            return new long[] {start, end};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Copies at most {@code count} bytes from {@code in} to {@code out},
     * honouring short reads and stopping early at end of stream.
     *
     * @return the number of bytes actually copied
     */
    private static long copyRange(java.io.InputStream in, OutputStream out,
            long count, int bufSize) throws IOException {
        byte[] buf = new byte[bufSize];
        long remaining = count;
        while (remaining > 0) {
            int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
            if (n < 0) {
                break;
            }
            out.write(buf, 0, n);
            remaining -= n;
        }
        return count - remaining;
    }





下面附上 PHP 代码片段:它调用我们自研的类似 WebHDFS 的服务,并将文件返回给浏览器等客户端:

        /**
	 * Download a file through the browser, with resume support.
	 *
	 * A single byte range from $_SERVER['HTTP_RANGE'] ("bytes=a-b", "bytes=a-"
	 * or "bytes=-n") is answered with 206 + Content-Range, an unsatisfiable
	 * range with 416, and a missing or unparseable header with the whole file.
	 * The normalised range is forwarded to the storage service, so the bytes
	 * streamed match the Content-Length / Content-Range that were announced.
	 *
	 * @param string $remote_path path of the file on the storage service
	 * @param string $file_name   file name offered to the browser
	 * @return bool true if the whole requested byte span was sent
	 */
	static function download_client($remote_path, $file_name) {
		$file_info = self::info ( $remote_path );
		if (! $file_info) {
			return false;
		}

		$chunk = 16384;
		$size = ( int ) $file_info ['size'];
		$offset = 0;
		$end_pos = $size - 1;
		$partial = false;

		// Only a single range is supported; anything else is ignored and the
		// whole file is sent (RFC 7233 permits ignoring a Range header).
		if (isset ( $_SERVER ['HTTP_RANGE'] )
				&& preg_match ( '/^\s*bytes\s*=\s*(\d*)\s*-\s*(\d*)\s*$/i', $_SERVER ['HTTP_RANGE'], $m )
				&& ($m [1] !== '' || $m [2] !== '')) {
			if ($m [1] === '') {
				// "bytes=-n": the last n bytes of the file.
				$offset = max ( 0, $size - ( int ) $m [2] );
			} else {
				// "bytes=a-" or "bytes=a-b" (0-based, inclusive); b is clamped to the last byte.
				$offset = ( int ) $m [1];
				if ($m [2] !== '') {
					$end_pos = min ( ( int ) $m [2], $size - 1 );
				}
			}
			if ($offset > $end_pos) {
				header ( 'HTTP/1.1 416 Requested Range Not Satisfiable' );
				header ( "Content-Range: bytes */$size" );
				return false;
			}
			$partial = true;
		}
		$length = $end_pos - $offset + 1;

		// Open the remote stream before sending any header, so a failure does
		// not leave the client with headers promising a body that never comes.
		$req = array ();
		if ($partial) {
			$req ['range'] = "bytes=$offset-$end_pos";
		}
		$remote_file = KDiskServer::open ( $remote_path, $req );
		if (! $remote_file) {
			return false;
		}

		// Discard anything already buffered so it cannot corrupt the file body.
		if (ob_get_level () > 0) {
			ob_end_clean ();
		}

		header ( 'Cache-Control: max-age=86400' );
		header ( 'Expires: ' . gmdate ( 'D, d M Y H:i:s \G\M\T', time () + 86400 ) );
		header ( 'Content-Type: application/octet-stream' );
		header ( 'Accept-Ranges: bytes' );
		if ($partial) {
			header ( 'HTTP/1.1 206 Partial Content' );
			header ( "Content-Range: bytes $offset-$end_pos/$size" );
		}
		header ( "Content-Length: $length" );
		header ( 'Content-Transfer-Encoding: binary' );
		header ( 'Content-Encoding: none' );

		$ua = isset ( $_SERVER ['HTTP_USER_AGENT'] ) ? $_SERVER ['HTTP_USER_AGENT'] : '';
		// rawurlencode() encodes spaces as %20 rather than "+".
		$encoded_filename = rawurlencode ( $file_name );
		if (preg_match ( '/MSIE|Trident/', $ua )) {
			header ( 'Content-Disposition: attachment; filename="' . $encoded_filename . '"' );
		} else if (preg_match ( '/Firefox/', $ua )) {
			// RFC 5987 extended value: charset''percent-encoded, unquoted.
			header ( "Content-Disposition: attachment; filename*=UTF-8''" . $encoded_filename );
		} else {
			header ( 'Content-Disposition: attachment; filename="' . addcslashes ( $file_name, '"\\' ) . '"' );
		}

		ob_start ();
		set_time_limit ( 0 ); // large files may outlast max_execution_time
		$sent = 0;
		try {
			while ( $sent < $length && connection_status () == CONNECTION_NORMAL && ! feof ( $remote_file ) ) {
				$data = fread ( $remote_file, min ( $chunk, $length - $sent ) );
				if ($data === false || $data === '') {
					// Read error, timeout or premature EOF: stop instead of spinning.
					break;
				}
				echo $data;
				$sent += strlen ( $data );
				ob_flush ();
				flush ();
			}
			$info = stream_get_meta_data ( $remote_file );
			fclose ( $remote_file );
		} catch ( Exception $e ) {
			fclose ( $remote_file );
			return false;
		}
		return ! $info ['timed_out'] && $sent === $length;
	}