Java读写HDFS文件
一、依赖包maven路径
二、针对HDFS文件的操作类HDFSOperate
分类:
IT文章
•
2022-06-10 21:09:35
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <scope>runtime</scope>
</dependency>
二、针对HDFS文件的操作类HDFSOperate
-
package com.hdfs.util;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-
- /**
- * 针对HDFS文件的操作类
- */
-
public class HDFSOperate {
-
- /**
- * 新增(创建)HDFS文件
- * @param hdfs
- */
-
public void createHDFS(String hdfs){
-
try {
-
Configuration conf = new Configuration();
-
conf.setBoolean("dfs.support.append", true);
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
-
FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
-
Path path = new Path(hdfs);
- //判断HDFS文件是否存在
-
if(fs.exists(path)){
- //System.out.println(hdfs + "已经存在!!!");
-
}else{
-
FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
-
hdfsOutStream.close();
- }
-
fs.close();
-
} catch (Exception e) {
- // TODO: handle exception
-
e.printStackTrace();
- }
- }
-
- /**
- * 在HDFS文件后面追加内容
- * @param hdfs
- * @param appendContent
- */
-
public void appendHDFS(String hdfs,String appendContent){
-
try {
-
Configuration conf = new Configuration();
-
conf.setBoolean("dfs.support.append", true);
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
-
FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
-
Path path = new Path(hdfs);
- //判断HDFS文件是否存在
-
if(fs.exists(path)){
- //System.out.println(hdfs + "已经存在!!!");
-
}else{
-
FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
-
hdfsOutStream.close();
- }
-
FSDataOutputStream hdfsOutStream = fs.append(new Path(hdfs));
-
byte [] str = appendContent.getBytes("UTF-8");//防止中文乱码
-
hdfsOutStream.write(str);
-
hdfsOutStream.close();
-
fs.close();
-
} catch (Exception e) {
- // TODO: handle exception
-
e.printStackTrace();
- }
- }
-
- /**
- * 修改HDFS文件内容 / 删除就是替换为空
- * @param hdfs : hdfs文件路径
- * @param sourceContent :要修改的hdfs文件内容
- * @param changeContent :需要修改成的文件内容
- */
-
public void change(String hdfs,String sourceContent,String changeContent){
-
try {
-
Configuration conf = new Configuration();
-
conf.setBoolean("dfs.support.append", true);
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
-
-
FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
-
Path path = new Path(hdfs);
- //判断HDFS文件是否存在
-
if(fs.exists(path)){
- //System.out.println(hdfs + "已经存在!!!");
-
FSDataInputStream in = fs.open(path);
-
BufferedReader bf=new BufferedReader(new InputStreamReader(in));//防止中文乱码
-
String totalString = "";
-
String line = null;
-
while ((line = bf.readLine()) != null) {
-
totalString += line;
- }
-
String changeString = totalString.replace(sourceContent, changeContent);
-
FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
-
byte [] str = changeString.getBytes("UTF-8");
-
hdfsOutStream.write(str);
-
hdfsOutStream.close();
-
}else{
- //System.out.println(hdfs + "不存在,无需操作!!!");
- }
-
fs.close();
-
} catch (Exception e) {
- // TODO: handle exception
-
e.printStackTrace();
- }
- }
-
- /**
- * 判断要追加的内容是否存在
- * @param hdfs
- * @param appendContent
- * @return
- */
-
public Boolean isContentExist(String hdfs,String appendContent){
-
try {
-
Configuration conf = new Configuration();
-
conf.setBoolean("dfs.support.append", true);
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
-
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
-
-
FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
-
Path path = new Path(hdfs);
- //判断HDFS文件是否存在
-
if(fs.exists(path)){
- //System.out.println(hdfs + "已经存在!!!");
-
FSDataInputStream in = fs.open(path);
-
BufferedReader bf=new BufferedReader(new InputStreamReader(in));//防止中文乱码
-
String totalString = "";
-
String line = null;
-
while ((line = bf.readLine()) != null) {
-
totalString += line;
- }
-
if(totalString.contains(appendContent)){
-
return true;
- }
-
}else{
- //System.out.println(hdfs + "不存在,无需操作!!!");
- }
-
fs.close();
-
} catch (Exception e) {
- // TODO: handle exception
-
e.printStackTrace();
- }
-
return false;
- }
-
-
public static void main(String[] args) throws IOException {
-
String hdfs = "hdfs://192.168.168.200:9000/test/tes.txt";
-
HDFSOperate hdfsOperate = new HDFSOperate();
-
hdfsOperate.createHDFS(hdfs);
-
hdfsOperate.appendHDFS(hdfs,"测试新增内容");
- //hdfsOperate.change(hdfs, "测试新增内容", "测试修改成功");
- }
- }