Hive基于UDF進(jìn)行文本分詞
本文轉(zhuǎn)載自微信公眾號(hào)「Java大數(shù)據(jù)與數(shù)據(jù)倉庫」,作者劉不二 。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java大數(shù)據(jù)與數(shù)據(jù)倉庫公眾號(hào)。
UDF 簡(jiǎn)介
Hive作為一個(gè)sql查詢引擎,自帶了一些基本的函數(shù),比如count(計(jì)數(shù)),sum(求和),有時(shí)候這些基本函數(shù)滿足不了我們的需求,這時(shí)候就要寫hive hdf(user defined funation),又叫用戶自定義函數(shù)。編寫Hive UDF的步驟:
- 添加相關(guān)依賴,創(chuàng)建項(xiàng)目,這里我用的管理工具是maven,所以我創(chuàng)建的也是一個(gè)maven 項(xiàng)目(這個(gè)時(shí)候你需要選擇合適的依賴版本,主要是Hadoop 和 Hive,可以使用hadoop version和hive --version 來分別查看版本)
- 繼承org.apache.hadoop.hive.ql.exec.UDF類,實(shí)現(xiàn)evaluate方法,然后打包;
- 使用 add方法添加jar 包到分布式緩存,如果jar包是上傳到$HIVE_HOME/lib/目錄以下,就不需要執(zhí)行add命令了;
- 通過create temporary function創(chuàng)建臨時(shí)函數(shù),不加temporary就創(chuàng)建了一個(gè)永久函數(shù);
- 在SQL 中使用你創(chuàng)建的UDF;
UDF分詞
這個(gè)是一個(gè)比較常見的場(chǎng)景,例如公司的產(chǎn)品有每天都會(huì)產(chǎn)生大量的彈幕或者評(píng)論,這個(gè)時(shí)候我們可能會(huì)想去分析一下大家最關(guān)心的熱點(diǎn)話題是什么,或者是我們會(huì)分析最近一段時(shí)間的網(wǎng)絡(luò)趨勢(shì)是什么,但是這里有一個(gè)問題就是你的詞庫建設(shè)的問題,因?yàn)槟闶褂猛ㄓ玫脑~庫可能不能達(dá)到很好的分詞效果,尤其有很多網(wǎng)絡(luò)流行用語它是不在詞庫里的,還有一個(gè)就是停用詞的問題了,因?yàn)楹芏鄷r(shí)候停用詞是沒有意義的,所以這里我們需要將其過濾,而過濾的方式就是通過停用詞詞表進(jìn)行過濾。
這個(gè)時(shí)候我們的解決方案主要有兩種,一種是使用第三方提供的一些詞庫,還有一種是自建詞庫,然后有專人去維護(hù),這個(gè)也是比較常見的一種情況。
最后一個(gè)就是我們使用的分詞工具,因?yàn)槟壳爸髁鞯姆衷~器很多,選擇不同的分詞工具可能對(duì)我們的分詞結(jié)果有很多影響。
分詞工具
1:Elasticsearch的開源中文分詞器 IK Analysis(Star:2471)
IK中文分詞器在Elasticsearch上的使用。原生IK中文分詞是從文件系統(tǒng)中讀取詞典,es-ik本身可擴(kuò)展成從不同的源讀取詞典。目前提供從sqlite3數(shù)據(jù)庫中讀取。es-ik-plugin-sqlite3使用方法:1. 在elasticsearch.yml中設(shè)置你的sqlite3詞典的位置:ik_analysis_db_path: /opt/ik/dictionary.db
2:開源的java中文分詞庫 IKAnalyzer(Star:343)
IK Analyzer 是一個(gè)開源的,基于java語言開發(fā)的輕量級(jí)的中文分詞工具包。從2006年12月推出1.0版開始, IKAnalyzer已經(jīng)推出了4個(gè)大版本。最初,它是以開源項(xiàng)目Luence為應(yīng)用主體的,結(jié)合詞典分詞和文法分析算法的中文分詞組件。從3.0版本開始,IK發(fā)展為面向Java的公用分詞組件,獨(dú)立于Lucene項(xiàng)目
3:java開源中文分詞 Ansj(Star:3019)
Ansj中文分詞 這是一個(gè)ictclas的java實(shí)現(xiàn).基本上重寫了所有的數(shù)據(jù)結(jié)構(gòu)和算法.詞典是用的開源版的ictclas所提供的.并且進(jìn)行了部分的人工優(yōu)化 分詞速度達(dá)到每秒鐘大約200萬字左右,準(zhǔn)確率能達(dá)到96%以上。
目前實(shí)現(xiàn)了.中文分詞. 中文姓名識(shí)別 . 詞性標(biāo)注、用戶自定義詞典,關(guān)鍵字提取,自動(dòng)摘要,關(guān)鍵字標(biāo)記等功能。
可以應(yīng)用到自然語言處理等方面,適用于對(duì)分詞效果要求高的各種項(xiàng)目.
4:結(jié)巴分詞 ElasticSearch 插件(Star:188)
elasticsearch官方只提供smartcn這個(gè)中文分詞插件,效果不是很好,好在國(guó)內(nèi)有medcl大神(國(guó)內(nèi)最早研究es的人之一)寫的兩個(gè)中文分詞插件,一個(gè)是ik的,一個(gè)是mmseg的
5:Java分布式中文分詞組件 - word分詞(Star:672)
word分詞是一個(gè)Java實(shí)現(xiàn)的分布式的中文分詞組件,提供了多種基于詞典的分詞算法,并利用ngram模型來消除歧義。能準(zhǔn)確識(shí)別英文、數(shù)字,以及日期、時(shí)間等數(shù)量詞,能識(shí)別人名、地名、組織機(jī)構(gòu)名等未登錄詞
6:Java開源中文分詞器jcseg(Star:400)
Jcseg是什么?Jcseg是基于mmseg算法的一個(gè)輕量級(jí)開源中文分詞器,同時(shí)集成了關(guān)鍵字提取,關(guān)鍵短語提取,關(guān)鍵句子提取和文章自動(dòng)摘要等功能,并且提供了最新版本的lucene, solr, elasticsearch的分詞接口, Jcseg自帶了一個(gè) jcseg.properties文件…
7:中文分詞庫Paoding
庖丁中文分詞庫是一個(gè)使用Java開發(fā)的,可結(jié)合到Lucene應(yīng)用中的,為互聯(lián)網(wǎng)、企業(yè)內(nèi)部網(wǎng)使用的中文搜索引擎分詞組件。Paoding填補(bǔ)了國(guó)內(nèi)中文分詞方面開源組件的空白,致力于此并希翼成為互聯(lián)網(wǎng)網(wǎng)站首選的中文分詞開源組件。Paoding中文分詞追求分詞的高效率和用戶良好體驗(yàn)。
8:中文分詞器mmseg4j
mmseg4j 用 Chih-Hao Tsai 的 MMSeg 算法(http://technology.chtsai.org/mmseg/ )實(shí)現(xiàn)的中文分詞器,并實(shí)現(xiàn) lucene 的 analyzer 和 solr 的TokenizerFactory 以方便在Lucene和Solr中使…
9:中文分詞Ansj(Star:3015)
Ansj中文分詞 這是一個(gè)ictclas的java實(shí)現(xiàn).基本上重寫了所有的數(shù)據(jù)結(jié)構(gòu)和算法.詞典是用的開源版的ictclas所提供的.并且進(jìn)行了部分的人工優(yōu)化 內(nèi)存中中文分詞每秒鐘大約100萬字(速度上已經(jīng)超越ictclas) 文件讀取分詞每秒鐘大約30萬字 準(zhǔn)確率能達(dá)到96%以上 目前實(shí)現(xiàn)了….
10:Lucene中文分詞庫ICTCLAS4J
ictclas4j中文分詞系統(tǒng)是sinboy在中科院張華平和劉群老師的研制的FreeICTCLAS的基礎(chǔ)上完成的一個(gè)java開源分詞項(xiàng)目,簡(jiǎn)化了原分詞程序的復(fù)雜度,旨在為廣大的中文分詞愛好者一個(gè)更好的學(xué)習(xí)機(jī)會(huì)。
代碼實(shí)現(xiàn)
第一步:引入依賴
這里我們引入了兩個(gè)依賴,其實(shí)是兩個(gè)不同分詞工具
- <dependency>
- <groupId>org.ansj</groupId>
- <artifactId>ansj_seg</artifactId>
- <version>5.1.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.janeluo</groupId>
- <artifactId>ikanalyzer</artifactId>
- <version>2012_u6</version>
- </dependency>
在開始之前我們先寫一個(gè)demo 玩玩,讓大家有個(gè)基本的認(rèn)識(shí)
- @Test
- public void testAnsjSeg() {
- String str = "我叫李太白,我是一個(gè)詩人,我生活在唐朝" ;
- // 選擇使用哪種分詞器 BaseAnalysis ToAnalysis NlpAnalysis IndexAnalysis
- Result result = ToAnalysis.parse(str);
- System.out.println(result);
- KeyWordComputer kwc = new KeyWordComputer(5);
- Collection<Keyword> keywords = kwc.computeArticleTfidf(str);
- System.out.println(keywords);
- }
輸出結(jié)果
- 我/r,叫/v,李太白/nr,,/w,我/r,是/v,一個(gè)/m,詩人/n,,/w,我/r,生活/vn,在/p,唐朝/t
- [李太白/24.72276098504223, 詩人/3.0502185968368885, 唐朝/0.8965677022546215, 生活/0.6892230219652541]
[李太白/24.72276098504223, 詩人/3.0502185968368885, 唐朝/0.8965677022546215, 生活/0.6892230219652541]
第二步:引入停用詞詞庫
因?yàn)槭峭S迷~詞庫,本身也不是很大,所以我直接放在項(xiàng)目里了,當(dāng)然你也可以放在其他地方,例如HDFS 上
第三步:編寫UDF
代碼很簡(jiǎn)單我就不不做詳細(xì)解釋了,需要注意的是GenericUDF 里面的一些方法的使用規(guī)則,至于代碼設(shè)計(jì)的好壞以及還有什么改進(jìn)的方案我們后面再說,下面兩套實(shí)現(xiàn)的思路幾乎是一致的,不一樣的是在使用的分詞工具上的不一樣
ansj的實(shí)現(xiàn)
- /**
- * Chinese words segmentation with user-dict in com.kingcall.dic
- * use Ansj(a java open source analyzer)
- */
- // 這個(gè)信息就是你每次使用desc 進(jìn)行獲取函數(shù)信息的時(shí)候返回的
- @Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using ansj. Return list of words.",
- extended = "Example: select _FUNC_('我是測(cè)試字符串') from src limit 1;\n"
- + "[\"我\", \"是\", \"測(cè)試\", \"字符串\"]")
- public class AnsjSeg extends GenericUDF {
- private transient ObjectInspectorConverters.Converter[] converters;
- private static final String userDic = "/app/stopwords/com.kingcall.dic";
- //load userDic in hdfs
- static {
- try {
- FileSystem fs = FileSystem.get(new Configuration());
- FSDataInputStream in = fs.open(new Path(userDic));
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- String line = null;
- String[] strs = null;
- while ((line = br.readLine()) != null) {
- line = line.trim();
- if (line.length() > 0) {
- strs = line.split("\t");
- strs[0] = strs[0].toLowerCase();
- DicLibrary.insert(DicLibrary.DEFAULT, strs[0]); //ignore nature and freq
- }
- }
- MyStaticValue.isNameRecognition = Boolean.FALSE;
- MyStaticValue.isQuantifierRecognition = Boolean.TRUE;
- } catch (Exception e) {
- System.out.println("Error when load userDic" + e.getMessage());
- }
- }
- @Override
- public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
- if (arguments.length < 1 || arguments.length > 2) {
- throw new UDFArgumentLengthException(
- "The function AnsjSeg(str) takes 1 or 2 arguments.");
- }
- converters = new ObjectInspectorConverters.Converter[arguments.length];
- converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
- if (2 == arguments.length) {
- converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
- }
- return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
- }
- @Override
- public Object evaluate(DeferredObject[] arguments) throws HiveException {
- boolean filterStop = false;
- if (arguments[0].get() == null) {
- return null;
- }
- if (2 == arguments.length) {
- IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
- if (1 == filterParam.get()) filterStop = true;
- }
- Text s = (Text) converters[0].convert(arguments[0].get());
- ArrayList<Text> result = new ArrayList<>();
- if (filterStop) {
- for (Term words : DicAnalysis.parse(s.toString()).recognition(StopLibrary.get())) {
- if (words.getName().trim().length() > 0) {
- result.add(new Text(words.getName().trim()));
- }
- }
- } else {
- for (Term words : DicAnalysis.parse(s.toString())) {
- if (words.getName().trim().length() > 0) {
- result.add(new Text(words.getName().trim()));
- }
- }
- }
- return result;
- }
- @Override
- public String getDisplayString(String[] children) {
- return getStandardDisplayString("ansj_seg", children);
- }
- }
ikanalyzer的實(shí)現(xiàn)
- @Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.",
- extended = "Example: select _FUNC_('我是測(cè)試字符串') from src limit 1;\n"
- + "[\"我\", \"是\", \"測(cè)試\", \"字符串\"]")
- public class IknalyzerSeg extends GenericUDF {
- private transient ObjectInspectorConverters.Converter[] converters;
- //用來存放停用詞的集合
- Set<String> stopWordSet = new HashSet<String>();
- @Override
- public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
- if (arguments.length < 1 || arguments.length > 2) {
- throw new UDFArgumentLengthException(
- "The function AnsjSeg(str) takes 1 or 2 arguments.");
- }
- //讀入停用詞文件
- BufferedReader StopWordFileBr = null;
- try {
- StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt"))));
- //初如化停用詞集
- String stopWord = null;
- for(; (stopWord = StopWordFileBr.readLine()) != null;){
- stopWordSet.add(stopWord);
- }
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- converters = new ObjectInspectorConverters.Converter[arguments.length];
- converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
- if (2 == arguments.length) {
- converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
- }
- return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
- }
- @Override
- public Object evaluate(DeferredObject[] arguments) throws HiveException {
- boolean filterStop = false;
- if (arguments[0].get() == null) {
- return null;
- }
- if (2 == arguments.length) {
- IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
- if (1 == filterParam.get()) filterStop = true;
- }
- Text s = (Text) converters[0].convert(arguments[0].get());
- StringReader reader = new StringReader(s.toString());
- IKSegmenter iks = new IKSegmenter(reader, true);
- List<Text> list = new ArrayList<>();
- if (filterStop) {
- try {
- Lexeme lexeme;
- while ((lexeme = iks.next()) != null) {
- if (!stopWordSet.contains(lexeme.getLexemeText())) {
- list.add(new Text(lexeme.getLexemeText()));
- }
- }
- } catch (IOException e) {
- }
- } else {
- try {
- Lexeme lexeme;
- while ((lexeme = iks.next()) != null) {
- list.add(new Text(lexeme.getLexemeText()));
- }
- } catch (IOException e) {
- }
- }
- return list;
- }
- @Override
- public String getDisplayString(String[] children) {
- return "Usage: evaluate(String str)";
- }
- }
第四步:編寫測(cè)試用例
GenericUDF 給我們提供了一些方法,這些方法可以用來構(gòu)建測(cè)試需要的環(huán)境和參數(shù),這樣我們就可以測(cè)試這些代碼了
- @Test
- public void testAnsjSegFunc() throws HiveException {
- AnsjSeg udf = new AnsjSeg();
- ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
- ObjectInspector[] init_args = {valueOI0, valueOI1};
- udf.initialize(init_args);
- Text str = new Text("我是測(cè)試字符串");
- GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
- GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
- GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
- ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
- System.out.println(res);
- }
- @Test
- public void testIkSegFunc() throws HiveException {
- IknalyzerSeg udf = new IknalyzerSeg();
- ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
- ObjectInspector[] init_args = {valueOI0, valueOI1};
- udf.initialize(init_args);
- Text str = new Text("我是測(cè)試字符串");
- GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
- GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
- GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
- ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
- System.out.println(res);
- }
我們看到加載停用詞沒有找到,但是整體還是跑起來了,因?yàn)樽x取不到HDFS 上的文件
但是我們第二個(gè)樣例是不需要從HDFS 上加載停用詞信息,所以可以完美的測(cè)試運(yùn)行
注 后來為了能在外部更新文件,我將其放在了HDFS 上,和AnsjSeg 中的代碼一樣
第五步:創(chuàng)建UDF 并使用
- add jar /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar;
- create temporary function ansjSeg as 'com.kingcall.bigdata.HiveUDF.AnsjSeg';
- select ansjSeg("我是字符串,你是啥");
- -- 開啟停用詞過濾
- select ansjSeg("我是字符串,你是啥",1);
- create temporary function ikSeg as 'com.kingcall.bigdata.HiveUDF.IknalyzerSeg';
- select ikSeg("我是字符串,你是啥");
- select ikSeg("我是字符串,你是啥",1);
上面方法的第二個(gè)參數(shù),就是是否開啟停用詞過濾,我們使用ikSeg函數(shù)演示一下
下面我們嘗試獲取一下函數(shù)的描述信息
如果沒有寫的話,就是下面的這樣的
其它應(yīng)用場(chǎng)景
通過編寫Hive UDF可以輕松幫我們實(shí)現(xiàn)大量常見需求,其它應(yīng)該場(chǎng)景還有:
- ip地址轉(zhuǎn)地區(qū):將上報(bào)的用戶日志中的ip字段轉(zhuǎn)化為國(guó)家-省-市格式,便于做地域分布統(tǒng)計(jì)分析;
- 使用Hive SQL計(jì)算的標(biāo)簽數(shù)據(jù),不想編寫Spark程序,可以通過UDF在靜態(tài)代碼塊中初始化連接池,利用Hive啟動(dòng)的并行MR任務(wù),并行快速導(dǎo)入大量數(shù)據(jù)到codis中,應(yīng)用于一些推薦業(yè)務(wù);
- 還有其它sql實(shí)現(xiàn)相對(duì)復(fù)雜的任務(wù),都可以編寫永久Hive UDF進(jìn)行轉(zhuǎn)化;
總結(jié)
這一節(jié)我們學(xué)習(xí)了一個(gè)比較常見的UDF,通過實(shí)現(xiàn)GenericUDF 抽象類來實(shí)現(xiàn),這一節(jié)的重點(diǎn)在于代碼的實(shí)現(xiàn)以及對(duì)GenericUDF類中方法的理解
上面的代碼實(shí)現(xiàn)上有一個(gè)問題,那就是關(guān)于停用詞的加載,就是我們能不能動(dòng)態(tài)加載停用詞呢?