聊聊Python用rpc實(shí)現(xiàn)分布式系統(tǒng)調(diào)用的那些事
通俗的講rpc是什么?
rpc 一般俗稱,遠(yuǎn)程過程調(diào)用,把本地的函數(shù),放到遠(yuǎn)端去調(diào)用。
通常我們調(diào)用一個(gè)方法,譬如: sumadd(10, 20),sumadd方法的具體實(shí)現(xiàn)要么是用戶自己定義,要么存在于該語言的庫函數(shù)中,也就說在sumadd方法的代碼實(shí)現(xiàn)在本地,它是一個(gè)本地調(diào)用!
“遠(yuǎn)程調(diào)用”意思就是:被調(diào)用方法的具體實(shí)現(xiàn)不在程序運(yùn)行本地,而是在別的某個(gè)地方(分布到各個(gè)服務(wù)器),但是用起來像是在本地。
rpc遠(yuǎn)程調(diào)用原理 :
比如 A調(diào)用B提供的remoteAdd方法:
首先A與B之間建立一個(gè)TCP連接;
然后A把需要調(diào)用的方法名(這里是remoteAdd)以及方法參數(shù)(10, 20)序列化成字節(jié)流發(fā)送出去;
B接受A發(fā)送過來的字節(jié)流,然后反序列化得到目標(biāo)方法名,方法參數(shù),接著執(zhí)行相應(yīng)的方法調(diào)用(可能是localAdd)并把結(jié)果30返回;
A接受遠(yuǎn)程調(diào)用結(jié)果,然后do()。
RPC框架也就是把上線說的具體的細(xì)節(jié)封裝起來,給用戶好用的API使用(提示:有些遠(yuǎn)程調(diào)用選擇比較底層的socket協(xié)議,有些遠(yuǎn)程調(diào)用選擇比較上層的HTTP協(xié)議);
一般rpc配合http協(xié)議的多點(diǎn),也就是走h(yuǎn)ttp的多。 當(dāng)然還是看應(yīng)用,我曾經(jīng)一共的rpc框架是基于zeromq的zerorpc。速度是挺快,server和client都有python的gevent支持,速度沒道理慢。(有興趣的,可以看看有關(guān)zerorpc的文章 http://rfyiamcool.blog.51cto.com/1030776/1254000 )最少要比python本身的xml-rpc要快。 rpc over http(基于http的rpc)有兩種協(xié)議,一種是xml-rpc ,還有一個(gè)是 json-rpc。
XML-RPC:XML Remote Procedure Call,即XML遠(yuǎn)程方法調(diào)用,利用http+xml封裝進(jìn)行RPC調(diào)用?;趆ttp協(xié)議傳輸、XML作為信息編碼格式。一個(gè)xml-rpc消息就是一個(gè)請求體為xml的http-post請求,服務(wù)端執(zhí)行后也以xml格式編碼返回。這個(gè)標(biāo)準(zhǔn)面前已經(jīng)演變?yōu)橄旅娴腟OAP協(xié)議??梢岳斫釹OAP是XML-RPC的高級(jí)版本。
JSON-RPC:JSON Remote Procedure Call,即JSON遠(yuǎn)程方法調(diào)用 。類似于XML-RPC,不同之處是使用JSON作為信息交換格式
下面是一個(gè)例子,很簡單。我們是用python的rpc庫SimpleXMLRPCServer 做的測試,創(chuàng)建rpc server,然后注冊一些函數(shù),供應(yīng)別的客戶端去調(diào)用。
- from SimpleXMLRPCServer import SimpleXMLRPCServer
- 原文:xiaorui.cc
- def add(x,y):
- return x+y
- def subtract(x, y):
- return x-y
- def multiply(x, y):
- return x*y
- def divide(x, y):
- return x/y
- # A simple server with simple arithmetic functions
- server = SimpleXMLRPCServer(("localhost", 8000))
- print "Listening on port 8000..."
- server.register_multicall_functions()
- server.register_function(add, 'add')
- server.register_function(subtract, 'subtract')
- server.register_function(multiply, 'multiply')
- server.register_function(divide, 'divide')
- server.serve_forever()
- import xmlrpclib
- proxy = xmlrpclib.ServerProxy("http://localhost:8000/")
- multicall = xmlrpclib.MultiCall(proxy)
- multicall.add(7,3)
- multicall.subtract(7,3)
- multicall.multiply(7,3)
- multicall.divide(7,3)
- result = multicall()
- print "7+3=%d, 7-3=%d, 7*3=%d, 7/3=%d" % tuple(result)
rpc本來是單任務(wù)的,如果任務(wù)相對(duì)頻繁,可以設(shè)置成多線程的默認(rèn),你不用在調(diào)用threading模塊什么的,直接引用 。
- class AsyncXMLRPCServer(SocketServer.ThreadingMixIn,SimpleXMLRPCServer): pass
然后rpc初始化的方法換成。
- server = AsyncXMLRPCServer(('', 1111), SimpleXMLRPCRequestHandler)
這里再說下,和xmlrpc相似的jsonrpc,貌似現(xiàn)在用xmlrpc的,要比jsonrpc的多點(diǎn)。 有時(shí)候到國外的it論壇看帖子,xmlrpc用的交多點(diǎn)。其實(shí)現(xiàn)在較大的公司,一般干脆直接自己實(shí)現(xiàn)了rpc框架,像淘寶Dubbo(朋友有搞過,搞了半天,沒有對(duì)接成接口,說是有難度,不明覺厲?。俣鹊膞xx(忘名字了)。
- import jsonrpc
- server = jsonrpc.Server(jsonrpc.JsonRpc20(), jsonrpc.TransportTcpIp(addr=("127.0.0.1", 31415), logfunc=jsonrpc.log_file("myrpc.log")))
- #原文:xiaorui.cc
- # 注冊一個(gè)函數(shù)方法
- def echo(s):
- return s
- def search(number=None, last_name=None, first_name=None):
- sql_where = []
- sql_vars = []
- if number is not None:
- sql_where.append("number=%s")
- sql_vars.append(number)
- if last_name is not None:
- sql_where.append("last_name=%s")
- sql_vars.append(last_name)
- if first_name is not None:
- sql_where.append("first_name=%s")
- sql_vars.append(first_name)
- sql_query = "SELECT id, last_name, first_name, number FROM mytable"
- if sql_where:
- sql_query += " WHERE" + " AND ".join(sql_where)
- cursor = ...
- cursor.execute(sql_query, *sql_vars)
- return cursor.fetchall()
- server.register_function( echo )
- server.register_function( search )
- # start server
- server.serve()
- # 創(chuàng)建jsonrpc客戶端
- import jsonrpc
- server = jsonrpc.ServerProxy(jsonrpc.JsonRpc20(), jsonrpc.TransportTcpIp(addr=("127.0.0.1", 31415)))
- #調(diào)用遠(yuǎn)端的一個(gè)函數(shù)
- result = server.echo("hello world")
- found = server.search(last_name='Python')
我做過一些個(gè)壓力的測試,XMLRPCSERVER的開了async之后,每個(gè)連接特意堵塞5秒,他的并發(fā)在40個(gè)左右 。也就是每秒成功40個(gè)左右,剩下的還是在堵塞等待中。 其實(shí)他的瓶頸不是在于rpc的本身,是承載rpc的那個(gè)basehttpserver,太弱爆了。
接收請求,調(diào)用方法 !
現(xiàn)在開源社區(qū)這么發(fā)達(dá),有不少人都根據(jù)rpc的協(xié)議,重寫了承載rpc的web服務(wù)。 比如用flask,tornado,配合uwsgi,你猜咋招了。。。。如果不堵塞連接,那還可以,如果堵塞連接,uwsgi的廢材特色就顯出來了,以前有文章說過,uwsgi是prework,他會(huì)預(yù)先啟動(dòng)進(jìn)程,官方都推薦要根據(jù)你的cpu核數(shù)或者超線程來開啟進(jìn)程,如果開的太多,你會(huì)發(fā)現(xiàn),uwsgi他是駕馭不了那么多進(jìn)程的。還是看我大tornado,用了@gen.engine之后。輕易飆到500的并發(fā)連接。
(以上是我的吃飽又蛋疼測試,沒聽過誰會(huì)重復(fù)調(diào)用那么多的堵塞方法,自評(píng) sx行為)
不多說了,看flask實(shí)現(xiàn)xmlrpc服務(wù)端的代碼,看了下flask xmlrpc的源碼,實(shí)現(xiàn)的不難。
- from flask import Flask
- from flaskext.xmlrpc import XMLRPCHandler, Fault
- app = Flask(__name__)
- handler = XMLRPCHandler('api')
- handler.connect(app, '/api')
- @handler.register
- def woca(name="world"):
- if not name:
- raise Fault("fuck...fuck", "fuck shencan!")
- return "Hello, %s!" % name
- 原文:xiaorui.cc
- app.run()
對(duì)于每個(gè)連接的超時(shí),有多種的方法,如果你用的是flask,tornado做web server,那就寫個(gè)裝飾器single起來,只是性能不好。 或者是前面掛一個(gè)nginx,然后做個(gè)client_header_timeout,client_body_timeout,proxy_connect_timeout(你懂的。),如果用的python自帶的xml-rpc的話,需要引入socket。
- import socket
- socket.setdefaulttimeout()
再說下rpc安全的問題。
至于安全方面,有興趣就開個(gè)ssl,或者是在程序里面判斷下client ip,反正配置都是統(tǒng)一下發(fā)的,你重載daemon的時(shí)候,也就知道該判斷什么ip了。
我個(gè)人對(duì)于rpc的應(yīng)用,更加的傾向于基本資源的獲取和調(diào)用,畢竟單純的用socket或者是mq,你在程序里面還要做一個(gè)解析過來的數(shù)據(jù),然后根據(jù)過來的數(shù)據(jù)在做調(diào)用。 (alert: 我想觸發(fā) add() ,如果是rpc的話,我不用管,只是傳過去就行了,到那時(shí)mq和socket就需要eval調(diào)用函數(shù)了),一些復(fù)雜的應(yīng)用還是喜歡用面向資源的rest,也推薦大家用這個(gè),靠譜的。