Get data from a JSON object inside a CSV using Apache NiFi
Question:
My CSV contains:
date,name,department
2020-2-4,sachith,{dep_name:computer,location:2323,3434}
2020-2-5,nalaka,{dep_name:engineering,location:3343,5454}
The final CSV should look like this:
date,name,dep_name,lat,lot
2020-2-4,sachith,computer,2323,3434
2020-2-5,nalaka,engineering,3343,5454
Here lat and lot come from the location value (for example, location:3343,5454).
I tried to use the UpdateRecord processor for this, with an expression such as ${field.value:join(','):substringAfter('dep_name:')}, but it does not work. How can I do this with Apache NiFi?
Answer:
Plain Groovy first, so the script can be tested in groovyConsole:
import groovy.json.*
def parser = new JsonSlurper().setType(JsonParserType.LAX) // LAX to accept strings without double quotes
def w = System.out
new StringReader('''date,name,department
2020-2-4,sachith,{"dep_name":"computer","location":"2323,3434"}
2020-2-5,nalaka,{"dep_name":"engineering","location":"3343,5454"}''').withReader { r ->
    r.eachLine { line, lineNum ->
        if (lineNum == 1) {
            // first line: write the header requested in the question
            w << 'date,name,dep_name,lat,lot' << '\n'
        } else {
            def row = line.split(',')        // split the line by comma
            def json = row[2..-1].join(',')  // join back into one string, starting from the 3rd element
            json = parser.parseText(json)
            // location already holds "lat,lon", so it fills the last two columns
            w << "${row[0]},${row[1]},${json.dep_name},${json.location}" << '\n'
        }
    }
}
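The split-then-join step above exists because the JSON column itself contains commas. As a minimal sketch of the same idea, the per-line conversion can be pulled into a small function that is easy to check in groovyConsole. The name convertLine is hypothetical (it is not part of the script above), and the sketch assumes the first two columns never contain a comma.

```groovy
import groovy.json.JsonParserType
import groovy.json.JsonSlurper

// Hypothetical helper, not part of the original script: converts one data line.
def convertLine(String line, JsonSlurper parser) {
    // A limit of 3 keeps everything after the second comma in one piece,
    // so the JSON column does not have to be joined back together.
    def (date, name, dept) = line.split(',', 3)
    def json = parser.parseText(dept)
    return "${date},${name},${json.dep_name},${json.location}".toString()
}

def parser = new JsonSlurper().setType(JsonParserType.LAX)
println convertLine('2020-2-4,sachith,{"dep_name":"computer","location":"2323,3434"}', parser)
```

Using String.split with a limit is only a design alternative to row[2..-1].join(','); both approaches rely on the JSON being the last column.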
Now the same script, modified for the NiFi ExecuteGroovyScript processor:
import groovy.json.*
def ff = session.get()
if (!ff) return
def parser = new JsonSlurper().setType(JsonParserType.LAX)
ff.write { streamIn, streamOut ->
    streamIn.withReader('UTF-8') { r ->        // wrap the input stream in a reader
        streamOut.withWriter('UTF-8') { w ->   // wrap the output stream in a writer
            // go line by line
            r.eachLine { line, lineNum ->
                if (lineNum == 1) {
                    // first line: write the header requested in the question
                    w << 'date,name,dep_name,lat,lot' << '\n'
                } else {
                    def row = line.split(',')        // split the line by comma
                    def json = row[2..-1].join(',')  // join back into one string, starting from the 3rd element
                    json = parser.parseText(json)
                    w << "${row[0]},${row[1]},${json.dep_name},${json.location}" << '\n'
                }
            }
        }
    }
}
REL_SUCCESS << ff
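Note that the sample input in the scripts quotes the JSON keys and values, while the CSV in the question does not. Whether the LAX parser accepts the fully unquoted form {dep_name:computer,location:2323,3434} is not confirmed here. If it does not, one possible fallback is a regular expression. The sketch below assumes the column always has exactly the shape {dep_name:...,location:...}; parseDepartment is a hypothetical name.

```groovy
// Hypothetical helper: parses the unquoted form {dep_name:X,location:A,B}
// without a JSON parser. Assumes dep_name itself contains no comma.
def parseDepartment(String dept) {
    def m = dept =~ /\{dep_name:([^,]*),location:(.*)\}/
    if (!m.matches()) {
        throw new IllegalArgumentException('Unexpected department value: ' + dept)
    }
    // group 1 is the department name, group 2 is the "lat,lon" pair
    return [dep_name: m.group(1), location: m.group(2)]
}

def d = parseDepartment('{dep_name:computer,location:2323,3434}')
println "${d.dep_name},${d.location}"
```

The returned map has the same two keys the scripts read from the parsed JSON, so it could stand in for parser.parseText(json) on such input.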