CDAP source plugin for reading data from an SFTP server

Problem description:

I want to read a CSV file that is available on an SFTP server by using a CDAP source plugin.

I came across the FTP Batch Source plugin, which does the same thing. But when I run it, I get the exception below.

Caused by: java.io.IOException: No FileSystem for scheme: sftp
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798) ~[org.apache.hadoop.hadoop-common-2.8.0.jar:na]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2809) ~[org.apache.hadoop.hadoop-common-2.8.0.jar:na]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100) ~[org.apache.hadoop.hadoop-common-2.8.0.jar:na]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2848) ~[org.apache.hadoop.hadoop-common-2.8.0.jar:na]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2830) ~[org.apache.hadoop.hadoop-common-2.8.0.jar:na]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389) ~[org.apache.hadoop.hadoop-common-2.8.0.jar:na]
    at co.cask.hydrator.format.plugin.AbstractFileSource.prepareRun(AbstractFileSource.java:129) ~[na:na]
    at co.cask.hydrator.format.plugin.AbstractFileSource.prepareRun(AbstractFileSource.java:63) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource$1.call(WrappedBatchSource.java:53) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource$1.call(WrappedBatchSource.java:50) ~[na:na]
    at co.cask.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[na:na]
    at co.cask.cdap.etl.common.plugin.StageLoggingCaller.call(StageLoggingCaller.java:40) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource.prepareRun(WrappedBatchSource.java:50) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource.prepareRun(WrappedBatchSource.java:36) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource$1.call(WrappedBatchSource.java:53) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource$1.call(WrappedBatchSource.java:50) ~[na:na]
    at co.cask.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[na:na]
    at co.cask.cdap.etl.common.plugin.StageLoggingCaller.call(StageLoggingCaller.java:40) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource.prepareRun(WrappedBatchSource.java:50) ~[na:na]
    at co.cask.cdap.etl.common.plugin.WrappedBatchSource.prepareRun(WrappedBatchSource.java:36) ~[na:na]
    at co.cask.cdap.etl.common.submit.SubmitterPlugin$3.run(SubmitterPlugin.java:83) ~[na:na]
    at co.cask.cdap.internal.app.runtime.AbstractContext$2.run(AbstractContext.java:534) ~[na:na]
    at co.cask.cdap.data2.transaction.Transactions$CacheBasedTransactional.finishExecute(Transactions.java:224) ~[na:na]
    ... 18 common frames omitted

I am using the library versions below, which is also a restriction.

  1. Hadoop - 2.7.3
  2. Spark - 2.3.0

I also came across this question, which suggests using this and says that setting the property fs.sftp.impl to org.apache.hadoop.fs.sftp.SFTPFileSystem will solve the issue, but I am not sure how to use the above code and set this property.

When using SFTP as the protocol, you need to set the file system properties under the Advanced section:

{
  "fs.sftp.impl": "org.apache.hadoop.fs.sftp.SFTPFileSystem"
}
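
To see why this property makes the "No FileSystem for scheme: sftp" error go away, here is a minimal sketch of the lookup, written as a simplified model and not Hadoop's actual code. The class name SchemeLookupSketch and the helper resolveImpl are hypothetical, and a plain Map stands in for the Hadoop configuration. The real lookup also consults file systems registered through other means, which this sketch leaves out, and it still needs the SFTPFileSystem class to be on the classpath.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Simplified model of resolving a URI scheme to a FileSystem class name. Not Hadoop's code. */
class SchemeLookupSketch {

    // Hypothetical helper: looks up "fs.<scheme>.impl" in the given properties
    // and fails with the same message as in the stack trace when it is missing.
    static String resolveImpl(Map<String, String> conf, String scheme) throws IOException {
        String impl = conf.get("fs." + scheme + ".impl");
        if (impl == null) {
            throw new IOException("No FileSystem for scheme: " + scheme);
        }
        return impl;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> conf = new HashMap<>();

        // Without the property, the lookup fails.
        try {
            resolveImpl(conf, "sftp");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }

        // With the property from the answer, the scheme resolves to a class name.
        conf.put("fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem");
        System.out.println(resolveImpl(conf, "sftp"));
    }
}
```

In the plugin, the JSON entered under the Advanced section plays the role of the conf.put call in this sketch.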