Http接口获取数据写入Hdfs

数据源类型:数组列表

[{field:value}, {field:value}, {field:value}, {field:value}]

1. 定义http数据源链接

package com.etl.datalink;

import java.util.Map;

/**
 * Describes an HTTP data source: the endpoint address plus the request
 * parameters that accompany it.
 *
 * <p>Plain mutable bean; both properties are {@code null} until set.
 */
public class LinkHttp {

    /** Address of the HTTP endpoint. */
    private String url;

    /** Request parameters, keyed by parameter name. */
    private Map<String, Object> params;

    /**
     * @return the endpoint address, or {@code null} when none has been set
     */
    public String getUrl() {
        return this.url;
    }

    /**
     * @param url the endpoint address to store
     */
    public void setUrl(String url) {
        this.url = url;
    }

    /**
     * @return the stored parameter map itself (not a copy), or {@code null}
     *         when none has been set
     */
    public Map<String, Object> getParams() {
        return this.params;
    }

    /**
     * @param params the parameter map to store; kept by reference
     */
    public void setParams(Map<String, Object> params) {
        this.params = params;
    }
}

2. 定义hdfs链接配置

package com.etl.datalink;

import org.apache.hadoop.conf.Configuration;

/**
 * Connection settings for an HDFS target: a Hadoop {@link Configuration}
 * together with the property name / URI pair that selects the file system.
 *
 * <p>The constructors and the {@code fsName}/{@code fsURI} setters keep the
 * configuration in step with the fields, so {@link #getConf()} always reflects
 * the most recently assigned file-system URI.
 */
public class LinkHdfs {

        private Configuration conf = new Configuration();
        /** Configuration key that carries the file-system URI. */
        private String fsName="fs.defaultFS";
        /** File-system URI stored under {@link #fsName}. */
        private String fsURI;


        /**
         * Creates a link that stores {@code fsURI} under the given property name.
         *
         * @param fsName configuration key to write the URI under
         * @param fsURI  file-system URI
         */
        public LinkHdfs(String fsName, String fsURI) {
                this.fsName = fsName;
                this.fsURI = fsURI;
                conf.set(this.fsName, this.fsURI);
        }

        /**
         * Creates a link that stores {@code fsURI} under the default key
         * {@code fs.defaultFS}.
         *
         * @param fsURI file-system URI
         */
        public LinkHdfs(String fsURI) {
                this.fsURI = fsURI;
                conf.set(this.fsName, this.fsURI);
        }

        public String getFsName() {
            return fsName;
        }

        /**
         * Changes the configuration key used for the file-system URI. The entry
         * written under the previous key is removed and the current URI is
         * written under the new key, so the configuration does not keep a stale
         * entry.
         *
         * @param fsName new configuration key
         */
        public void setFsName(String fsName) {
            String previous = this.fsName;
            this.fsName = fsName;
            if (conf != null && previous != null && !previous.equals(fsName)) {
                conf.unset(previous);
            }
            syncConf();
        }

        public String getFsURI() {
            return fsURI;
        }

        /**
         * Changes the file-system URI and writes it into the configuration.
         * Passing {@code null} removes the entry instead of storing a null value.
         *
         * @param fsURI new file-system URI, may be {@code null}
         */
        public void setFsURI(String fsURI) {
            this.fsURI = fsURI;
            syncConf();
        }

        public Configuration getConf() {
            return conf;
        }

        /**
         * Replaces the configuration object. The supplied instance is stored
         * as-is; the current {@code fsName}/{@code fsURI} pair is not copied
         * into it.
         *
         * @param conf configuration to use from now on
         */
        public void setConf(Configuration conf) {
            this.conf = conf;
        }

        /** Writes the current key/URI pair into the configuration, if both parts are usable. */
        private void syncConf() {
            if (conf == null || fsName == null) {
                return;
            }
            if (fsURI == null) {
                conf.unset(fsName);
            } else {
                conf.set(fsName, fsURI);
            }
        }

}

3. 定义泛型类用于传送http的内容到hdfs

   这里需要注意一点:由于接口返回的json是数组列表,需要逐条取出记录,在每条记录末尾加上换行符后再写入hdfs。这样在hive中查询时才能得到多条记录,否则整个数组会被当作一条记录。

/**
 * 通用的http抽取数据到hdfs文件中
 * @author KingWang
 * @date    2018-10-15
 * @description
 */
public class Api2Hdfs{
    
        private static Logger log = Logger.getLogger(Api2Hdfs.class);
        
        public static <T> void run(String[] args, Class<T> clazz) {
            
            //http
            String url = args[0];
            String method = args[1];
            String startTime = args[2];
            String endTime = args[3];
        
            //hdfs
            String fsName = args[4];
            String fsURI = args[5];
            String targetFilePath = args[6];
            //http config
            Map<String,Object> params = new HashMap<String,Object>();

            //....省略部分参数 
            params.put("timestamp", System.currentTimeMillis()/1000L);
            params.put("start_time", startTime);
            params.put("end_time", endTime);
            
            LinkHttp http = new LinkHttp();
            http.setUrl(url);
            http.setParams(params);
            
            //hdfs config
            LinkHdfs hdfs = new LinkHdfs(fsName, fsURI);
            try {
                Api2Hdfs.process(http, hdfs, targetFilePath, clazz);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }    
    
        private static  <T> void process(LinkHttp http,LinkHdfs hdfs, String hdfsFile, Class<T> clazz) throws Exception{
            
                if(null==http) {
                        log.error("请求参数http未设置");
                        throw new Exception("请求参数http未设置");
                } 
                if(null==hdfs) {
                        log.error("请求参数hdfs未设置");
                        throw new Exception("请求参数hdfs未设置");
                }
            
            //创建http请求
            String url = http.getUrl();
            Map<String,Object> params = http.getParams();
            OkHttpClient client = new OkHttpClient();

            //添加参数
            FormBody.Builder bodyParams=new FormBody.Builder();
            if(params!=null && params.size() > 0) {
                  Iterator<Map.Entry<String,Object>> it = params.entrySet().iterator();
                  while(it.hasNext()) {
                      Map.Entry<String, Object> entry = it.next();
                      bodyParams.add(entry.getKey(), entry.getValue().toString());
                  }
            }
            
            final Request request = new Request.Builder().url(url).post(bodyParams.build()).build();
            Call call = client.newCall(request);
            call.enqueue(new Callback() {
    
                //网络错误延迟处理
                @Override
                public void onFailure(Call call, IOException e) {
                    e.printStackTrace();
                    log.error(e.getMessage());
                }
    
                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    FileSystem fs = null;
                    try {
                             
                            Path dstPath = new Path(hdfsFile);
                            fs = FileSystem.get(hdfs.getConf());
                            DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                            if(response.isSuccessful()) {
                                
                                    //对后台返回的数据进行处理
                                    System.out.println(df.format(LocalDateTime.now()) +" response.code:" +response.code());
                                    if (200 == response.code()) {
                                             
                                            //注意:response.body().string()只能有效调用一次
                                            ResponseInfo info = JSONObject.parseObject(response.body().string(), ResponseInfo.class);
                                            
                                            //error不为空,则错误
                                            if(StringUtils.isNotBlank(info.getError())) {
                                                    log.error(info.getError());
                                            } else {
                                                
                                                String rspcode = info.getResult().getRsp();
                                                //写入hdfs
                                                if(rspcode.equalsIgnoreCase(ResultCode.SUCCESS.getCode())) {
                                                        System.out.println(info.getResult().getData());
                                                        if(info.getResult().getData().equals("[]")) {
                                                                System.out.println(df.format(LocalDateTime.now()) + " " + info.getResult().getMsg());        
                                                        } else {
                                                            List<T> objList = JSON.parseArray(info.getResult().getData(),clazz);
//                                                            byte[] bt = info.getResult().getData().getBytes();
                                                            FSDataOutputStream outputStream = fs.create(dstPath); 
                                                            int size = objList.size();
                                                            for(int i=0;i<size; i++) {
                                                                String orderstr = JSON.toJSONString(objList.get(i)) + '
';
                                                                System.out.println(orderstr);
                                                                outputStream.write(orderstr.getBytes());   
                                                                if(i % 1000==0) {
                                                                    outputStream.flush();
                                                                }
                                                            }
                                                            outputStream.flush();
                                                            outputStream.close();        
                                                            log.info("create file " + hdfsFile + " success!");    
                                                        }
                                                } else {
                                                        log.error(info.getResult().getMsg());
                                                }                                                    
                                            }
                                    } 
                                    //对后台返回200~300之间的错误进行处理
                                    else {
                                        log.error(response.message());
                                    }
                                    
                                    //fs.close();
                            }
                    }catch (Exception e){
                        e.printStackTrace();
                        log.error(e.getMessage());
                    }finally {
                        fs.close();
                        //关闭
                        if(response.body()!=null) {
                            response.body().close();
                        }
                    }
                    log.info("write hdfs file end: " + hdfsFile);
                }
            });
            
        }

}

4. 定义bean用于解析, 由于定义了泛型,可以针对不同的接口定义不同的bean。

   类似如下

Http接口获取数据写入Hdfs

5. 定义执行的每个接口主类:

/**
 * Command-line launcher for the member API: hands the arguments to the shared
 * {@code run} routine with {@code Member} as the element type.
 */
public class MemberApi extends Api2Hdfs {

    /**
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        // Static method inherited from Api2Hdfs.
        run(args, Member.class);
    }
}
/**
 * Command-line launcher for the order API: hands the arguments to the shared
 * {@code run} routine with {@code Order} as the element type.
 */
public class OrderApi extends Api2Hdfs {

    /**
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        // Static method inherited from Api2Hdfs.
        run(args, Order.class);
    }
}

6. 定义每个接口的shell脚本,执行即可。

# Start each extract in the background, appending stdout and stderr to the table's log.
# The trailing backslash keeps the argument list on the same command as `java`;
# quoting stops the shell from word-splitting or glob-expanding values such as URLs.
java -Djava.ext.dirs=lib com.etl.MemberApi \
  "${url}" "${method}" "${startDate}" "${endDate}" "${fsName}" "${fsURI}" "${targetFilePath}" "${salt}" >> "./logs/${table}.log" 2>&1 &
java -Djava.ext.dirs=lib com.etl.OrderApi \
  "${url}" "${method}" "${startDate}" "${endDate}" "${fsName}" "${fsURI}" "${targetFilePath}" "${salt}" >> "./logs/${table}.log" 2>&1 &