自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

半小時(shí),將你的Spark SQL模型變?yōu)樵诰€服務(wù)

大數(shù)據(jù) Spark
第四范式已經(jīng)在很多行業(yè)落地了上萬個(gè)AI應(yīng)用,比如在金融行業(yè)的反欺詐,媒體行業(yè)的新聞推薦,能源行業(yè)管道檢測,而SparkSQL在這些AI應(yīng)用中快速實(shí)現(xiàn)特征變換發(fā)揮著重要的作用

SparkSQL在機(jī)器學(xué)習(xí)場景中應(yīng)用

第四范式已經(jīng)在很多行業(yè)落地了上萬個(gè)AI應(yīng)用,比如在金融行業(yè)的反欺詐,媒體行業(yè)的新聞推薦,能源行業(yè)管道檢測,而SparkSQL在這些AI應(yīng)用中快速實(shí)現(xiàn)特征變換發(fā)揮著重要的作用

 

半小時(shí),將你的Spark SQL模型變?yōu)樵诰€服務(wù)

SparkSQL在特征變換主要有一下幾類

  1. 多表場景,用于表之間拼接操作,比如交易信息表去拼接賬戶表
  2. 使用udf進(jìn)行簡單的特征變換,比如對時(shí)間戳進(jìn)行hour函數(shù)處理
  3. 使用時(shí)間窗口和udaf進(jìn)行時(shí)序類特征處理,比如計(jì)算一個(gè)人最近1天的消費(fèi)金額總和

SparkSQL到目前為止,解決很好的解決離線模型訓(xùn)練特征變換問題,但是隨著AI應(yīng)用的發(fā)展,大家對模型的期望不再只是得出離線調(diào)研效果,而是在真實(shí)的業(yè)務(wù)場景發(fā)揮出價(jià)值,而真實(shí)的業(yè)務(wù)場景是模型應(yīng)用場景,它需要高性能,需要實(shí)時(shí)推理,這時(shí)候我們就會(huì)遇到以下問題

  1. 多表數(shù)據(jù)離線到在線怎么映射,即批量訓(xùn)練過程中輸入很多表,到在線環(huán)境這些表該以什么形式存在,這點(diǎn)也會(huì)影響整個(gè)系統(tǒng)架構(gòu),做得好能夠提升效率,做得不好就會(huì)大大增加模型產(chǎn)生業(yè)務(wù)價(jià)值的成本
  2. SQL轉(zhuǎn)換成實(shí)時(shí)執(zhí)行成本高,因?yàn)樵诰€推理需要高性能,而數(shù)據(jù)科學(xué)家可能做出成千上萬個(gè)特征,每個(gè)特征都人肉轉(zhuǎn)換,會(huì)大大增加的工程成本
  3. 離線特征和在線特征保持一致困難,手動(dòng)轉(zhuǎn)換就會(huì)導(dǎo)致一致性能,而且往往很難一致
  4. 離線效果很棒但是在線效果無法滿足業(yè)務(wù)需求

在具體的反欺詐場景,模型應(yīng)用要求tp99 20ms去檢測一筆交易是否是欺詐,所以對模型應(yīng)用性能要求非常高

第四范式特征工程數(shù)據(jù)庫是如何解決這些問題

 

半小時(shí),將你的Spark SQL模型變?yōu)樵诰€服務(wù)

通過特征工程數(shù)據(jù)庫讓SparkSQL的能力得到了補(bǔ)充

  1. 以數(shù)據(jù)庫的形式,解決了離線表到在線的映射問題,我們對前面給出的答案就是離線表是怎么分布的,在線也就怎么分布
  2. 通過同一套代碼去執(zhí)行離線和在線特征轉(zhuǎn)換,讓在線模型效果得到了保證
  3. 數(shù)據(jù)科學(xué)家與業(yè)務(wù)開發(fā)團(tuán)隊(duì)的合作以sql為傳遞介質(zhì),而不再是手工去轉(zhuǎn)換代碼,大大提升模型迭代效率
  4. 通過llvm加速的sql,相比scala實(shí)現(xiàn)的spark2.x和3.x在時(shí)序復(fù)雜特征場景能夠加速2~3倍,在線通過in-memory的存儲(chǔ),能夠保證sql能夠在非常低延遲返回結(jié)果

快速將spark sql 模型變成實(shí)時(shí)服務(wù)demo

demo的模型訓(xùn)練場景為預(yù)測一次打車行程到結(jié)束所需要的時(shí)間,這里我們將使用fedb ,pyspark,lightgbm等工具最終搭建一個(gè)http 模型推理服務(wù),這也會(huì)是spark在機(jī)器學(xué)習(xí)場景的實(shí)踐

 

半小時(shí),將你的Spark SQL模型變?yōu)樵诰€服務(wù)

整個(gè)demo200多行代碼,制作時(shí)間不超過半個(gè)小時(shí)

  • train_sql.py 特征計(jì)算與訓(xùn)練, 80行代碼
  • predict_server.py 模型推理http服務(wù), 129行代碼

場景數(shù)據(jù)和特征介紹

整個(gè)訓(xùn)練數(shù)據(jù)如下樣子

樣例數(shù)據(jù)

 

  1. id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration 
  2.  id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856  
  3. id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198  
  4. id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303  
  5. id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330  
  6. id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496  
  7. id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935  
  8. id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904  
  9. id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331  
  10. id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674 ` 

場景特征變換sql腳本

特征變換

 

  1. select trip_duration, passenger_count,  
  2. sum `(pickup_latitude) over w as vendor_sum_pl,`  
  3. max `(pickup_latitude) over w as vendor_max_pl,`  
  4. min `(pickup_latitude) over w as vendor_min_pl,`  
  5. avg `(pickup_latitude) over w as vendor_avg_pl,`  
  6. sum `(pickup_latitude) over w2 as pc_sum_pl,`  
  7. max `(pickup_latitude) over w2 as pc_max_pl,`  
  8. min `(pickup_latitude) over w2 as pc_min_pl,`  
  9. avg `(pickup_latitude) over w2 as pc_avg_pl ,`  
  10. count `(vendor_id) over w2 as pc_cnt,`  
  11. count `(vendor_id) over w as vendor_cnt`  
  12. from {}  
  13. window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),  
  14. w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW) ` 

我們選擇了vendor_id 和 passenger_count 兩個(gè)緯度做時(shí)序特征

 

  1. train_df = spark.sql(train_sql)  
  2. # specify your configurations as a dict  
  3. params = {  
  4. 'boosting_type' `: 'gbdt' ,  
  5. 'objective' `: 'regression' ,  
  6. 'metric' `: { 'l2' , 'l1' },  
  7. 'num_leaves' `: 31 ,  
  8. 'learning_rate' `: 0.05 ,  
  9. 'feature_fraction' `: 0.9 ,  
  10. 'bagging_fraction' `: 0.8 ,  
  11. 'bagging_freq' `: 5 ,  
  12. 'verbose' `: 0`  
  13.  
  14. print `( 'Starting training...' )`  
  15. gbm = lgb.train(params,  
  16. lgb_train,  
  17. num_boost_round `= 20 ,`  
  18. valid_sets `= lgb_eval,  
  19. early_stopping_rounds `= 5 )`  
  20. gbm.save_model( `'model.txt' )執(zhí)行模型訓(xùn)練過程,最終產(chǎn)生model.txt 

模型推理過程

導(dǎo)入數(shù)據(jù)代碼

 

  1. import  
  2. def insert_row(line):  
  3. row = line.split( `',' )  
  4. row[ `2 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 2 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`  
  5. row[ `3 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 3 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`  
  6. insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" `% tuple (row)  
  7. driver.executeInsert( `'db_test' , insert 
  8. with open `( 'data/taxi_tour_table_train_simple.csv' , 'r' ) as fd:  
  9. idx = 0  
  10. for line in fd:  
  11. if idx = `= 0 :  
  12. idx = idx + 1  
  13. continue  
  14. insert_row(line.replace( `'n' , ''))  
  15. idx = idx + 1 `  
  16. 注:train.csv為訓(xùn)練數(shù)據(jù)csv格式版本 

模型推理邏輯

 

  1. predict.py  
  2. def` `post( self ):  
  3. row = json.loads( `self .request.body)  
  4. ok, req = fedb_driver.getRequestBuilder( `'db_test' , sql)  
  5. if not ok or not req:  
  6. self `.write( "fail to get req" )`  
  7. return  
  8. input_schema = req.GetSchema()  
  9. if not input_schema:  
  10. self `.write( "no schema found" )`  
  11. return  
  12. str_length = 0  
  13. for i in range `(input_schema.GetColumnCnt()):`  
  14. if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) = `= 'string' :  
  15. str_length = str_length + len `(row.get(input_schema.GetColumnName(i), ''))`  
  16. req.Init(str_length)  
  17. for i in range `(input_schema.GetColumnCnt()):`  
  18. tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))  
  19. if tname = `= 'string' :  
  20. req.AppendString(row.get(input_schema.GetColumnName(i), ''))  
  21. elif tname = `= 'int32' :  
  22. req.AppendInt32( `int (row.get(input_schema.GetColumnName(i),` `0 )))`  
  23. elif tname = `= 'double' :  
  24. req.AppendDouble( `float (row.get(input_schema.GetColumnName(i),` `0 )))`  
  25. elif tname = `= 'timestamp' :  
  26. req.AppendTimestamp( `int (row.get(input_schema.GetColumnName(i),` `0 )))`  
  27. else `:`  
  28. req.AppendNULL()  
  29. if not req.Build():  
  30. self `.write( "fail to build request" )`  
  31. return  
  32. ok, rs = fedb_driver.executeQuery( `'db_test' , sql, req)  
  33. if not ok:  
  34. self `.write( "fail to execute sql" )`  
  35. return  
  36. rs. `Next ()  
  37. ins = build_feature(rs)  
  38. self `.write( "----------------ins---------------\n" )`  
  39. self `.write( str (ins) + "n" )  
  40. duration = bst.predict(ins)  
  41. self `.write( "---------------predict trip_duration -------------\n" )`  
  42. self `.write( "%s s" % str (duration[ 0 ]))`` 

最終執(zhí)行效果

 

  1. python3 predict.py 
  2.  
  3. ----------------ins--------------- 
  4.  
  5. [[ 2. 40.774097 40.774097 40.774097 40.774097 40.774097 40.774097 
  6.  
  7. 40.774097 40.774097 1. 1. ]] 
  8.  
  9. ---------------predict trip_duration ------------- 
  10.  
  11. 859.3298781277192 s ` 

運(yùn)行demo請到 https://github.com/4paradigm/SparkSQLWithFeDB

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2013-04-01 10:04:46

2024-11-14 23:24:55

Shell腳本系統(tǒng)

2013-03-21 10:03:04

Perl

2019-10-22 18:00:00

MySQL基礎(chǔ)入門數(shù)據(jù)庫

2016-11-24 16:50:19

數(shù)據(jù)庫數(shù)據(jù)庫架構(gòu)1小時(shí)延時(shí)從

2015-09-29 17:42:42

云聯(lián)

2015-10-28 08:54:11

云聯(lián)

2014-01-24 13:39:28

IT半小時(shí)

2014-01-14 15:32:59

IT半小時(shí)

2009-10-30 09:48:56

2013-08-02 16:17:18

IT半小時(shí)

2023-02-21 10:04:32

MySQL機(jī)房

2013-07-15 15:26:14

IT半小時(shí)

2012-11-23 17:20:43

Linux服務(wù)器

2013-07-02 17:36:49

IT半小時(shí)

2013-07-05 16:17:41

IT半小時(shí)

2013-07-02 17:41:59

IT半小時(shí)

2023-03-12 09:22:58

2014-01-14 14:27:46

IT半小時(shí)

2014-01-24 13:34:57

IT半小時(shí)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)