A Summary of Python Coding Practices
I have been writing quite a few Python scripts lately, so here is a summary of what I learned from them. Some points are Python-specific; others apply to programming in general:
Consider using a logger (think about how to configure it and which information to output — a useful trick is to reason backwards: when you read this log line later, what will you want to know?)
Think carefully about the data structures you pass around (does the caller need prior knowledge to use them? For example, returning a tuple forces the caller to know the order of its elements — in such cases, consider wrapping it in a class). Once the data structures are clearly defined, much of the rest falls into place.
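For instance, a `collections.namedtuple` is a lightweight middle ground between a bare tuple and a full class. A minimal sketch (the `Job` type and its fields are hypothetical, for illustration only):

```python
from collections import namedtuple

# A bare tuple forces callers to remember positions:
job = ("app_42", "etl-job", "alice")          # which field is which?

# A namedtuple (or a small class) makes the fields self-describing.
Job = namedtuple("Job", ["app_id", "app_name", "user"])
named = Job(app_id="app_42", app_name="etl-job", user="alice")

print(named.user)      # callers refer to fields by name, not position
```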
How to operate on a database (sqlalchemy is worth learning, including both its Core and ORM APIs)
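As a dependency-free sketch of the same "run a query, fetch all rows" pattern used later in the script, here is the stdlib `sqlite3` equivalent (the table and data are made up for illustration; with sqlalchemy Core the query would go through an engine instead of a raw connection):

```python
import sqlite3

conn = sqlite3.connect(":memory:")          # throwaway in-memory database
conn.execute("create table jobs (app_id text, user text)")
conn.execute("insert into jobs values ('app_1', 'alice')")

def get_result_from_db(sql, params=()):
    # Parameterized queries keep values out of the SQL string.
    cursor = conn.execute(sql, params)
    return cursor.fetchall()

rows = get_result_from_db("select app_id, user from jobs where user=?", ("alice",))
print(rows)   # [('app_1', 'alice')]
```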
How to handle exceptions (catch different exception types separately, so you know exactly which situation caused the failure; after catching, log a message explaining what went wrong, and re-raise or send an alert if the situation is severe)
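Catching exception types separately might look like this (the `parse_payload` helper is hypothetical; each failure mode gets its own `except` clause so the log tells you exactly what went wrong):

```python
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example")

def parse_payload(raw):
    try:
        payload = json.loads(raw)
        return payload["apps"]
    except ValueError as e:
        # The response was not JSON at all.
        logger.error("Response is not valid JSON: %s", e)
    except KeyError as e:
        # The JSON was valid but malformed; in a real script this might
        # be severe enough to re-raise or trigger an alert.
        logger.error("JSON is missing expected key: %s", e)
    return []

print(parse_payload('not json'))        # ValueError branch -> []
print(parse_payload('{"apps": [1]}'))   # -> [1]
```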
Every place that acquires a resource should be checked: (a) what happens if the resource is not obtained at all? (b) what happens if the resource obtained is invalid?
Every place that operates on a resource should verify that the operation succeeded.
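The two resource rules above can be sketched with file access: check that the resource exists before using it, and check the result of the operation afterwards (`read_first_line` and the demo file are hypothetical):

```python
import os
import tempfile

def read_first_line(path):
    # Check acquisition: does the resource exist at all?
    if not os.path.exists(path):
        return None          # or log and raise, depending on severity
    with open(path) as f:
        line = f.readline()
    # Check the operation's result: an empty file yields "".
    if not line:
        return None
    return line.strip()

# Hypothetical demo file
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("hello\n")
    name = tmp.name

print(read_first_line(name))            # hello
print(read_first_line("/no/such/file")) # None
os.unlink(name)
```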
Every function should be short; split any function that grows too long (a common guideline is 20-30 lines per function — after following it once, you will see how much it helps).
When writing a class, consider implementing the __str__ method for user-facing printing (if __str__ is not implemented, __repr__ is called as a fallback); if objects will be placed in a collection, implement __repr__ so that printing the whole collection produces readable output.
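The __str__/__repr__ point, sketched on a minimal hypothetical class:

```python
class Job(object):
    def __init__(self, app_id, user):
        self.app_id = app_id
        self.user = user

    def __str__(self):
        # Used by print(job) / str(job): friendly, user-facing text.
        return "job %s owned by %s" % (self.app_id, self.user)

    def __repr__(self):
        # Used when the object sits inside a collection, e.g. print([job]).
        # Without it, a list prints as [<__main__.Job object at 0x...>].
        return "Job(app_id=%r, user=%r)" % (self.app_id, self.user)

job = Job("app_1", "alice")
print(job)     # job app_1 owned by alice
print([job])   # [Job(app_id='app_1', user='alice')]
```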
If a resource may change over time, extract it into its own function, so that call sites will not need to change later.
Attached below is a Python 2.7 example (with some private details redacted):
```python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
import logging
from logging.config import fileConfig

import requests
import Client  # private module

fileConfig("logging_config.ini")
logger = logging.getLogger("killduplicatedjob")

# Configuration; could live in a separate module
DB_USER = "xxxxxxx"
DB_PASSWORD = "xxxxxxxx"
DB_PORT = 111111
DB_HOST = "xxxxxxxxxx"
DB_DATA_BASE = "xxxxxxxxxxx"
REST_API_URL = "http://sample.com"

engine = create_engine("mysql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_DATA_BASE))

# This class exists so that callers do not need to know the order of the
# attributes when the data is passed between functions; it could also live
# in a separate module.
class DuplicatedJobs(object):
    def __init__(self, app_id, app_name, user):
        self.app_id = app_id
        self.app_name = app_name
        self.user = user

    def __repr__(self):
        return '[appid:%s, app_name:%s, user:%s]' % (self.app_id, self.app_name, self.user)


def find_duplicated_jobs():
    logger.info("starting find duplicated jobs")
    (running_apps, app_name_to_user) = get_all_running_jobs()
    all_apps_on_yarn = get_apps_from_yarn_with_queue(get_resource_queue())
    duplicated_jobs = []
    for app in all_apps_on_yarn:
        (app_id, app_name) = app
        if app_id not in running_apps:
            if not app_name.startswith("test"):
                logger.info("find a duplicated job, prefixed_name[%s] with appid[%s]" % (app_name, app_id))
                user = app_name_to_user[app_name]
                duplicated_jobs.append(DuplicatedJobs(app_id, app_name, user))
            else:
                logger.info("Job[%s] is a test job, would not kill it" % app_name)
    logger.info("Find duplicated jobs [%s]" % duplicated_jobs)
    return duplicated_jobs


def get_apps_from_yarn_with_queue(queue):
    param = {"queue": queue}
    r = requests.get(REST_API_URL, params=param)
    apps_on_yarn = []
    try:
        jobs = r.json().get("apps")
        app_list = jobs.get("app", [])
        for app in app_list:
            app_id = app.get("id")
            name = app.get("name")
            apps_on_yarn.append((app_id, name))
    except Exception as e:  # better: catch each exception type separately and handle it accordingly
        logger.error("Get apps from Yarn Error, message[%s]" % e.message)
    logger.info("Fetch all apps from Yarn [%s]" % apps_on_yarn)
    return apps_on_yarn


def get_all_running_jobs():
    job_infos = get_result_from_mysql("select * from xxxx where xx=yy")
    app_ids = []
    app_name_to_user = {}
    for (topology_id, topology_name) in job_infos:
        status_set = get_result_from_mysql("select * from xxxx where xx=yy")
        application_id = status_set[0][0]
        if "" != application_id:
            configed_resource_queue = get_result_from_mysql("select * from xxxx where xx=yy")
            app_ids.append(application_id)
            app_name_to_user[topology_name] = configed_resource_queue[0][0].split(".")[1]
    logger.info("All running jobs appids[%s] topology_name2user[%s]" % (app_ids, app_name_to_user))
    return app_ids, app_name_to_user


def kill_duplicated_jobs(duplicated_jobs):
    for job in duplicated_jobs:
        app_id = job.app_id
        app_name = job.app_name
        user = job.user
        logger.info("try to kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))
        try:
            Client.kill_job(app_id, user)
            logger.info("Job[%s] with appid[%s] for user[%s] has been killed" % (app_name, app_id, user))
        except Exception as e:
            logger.error("Can't kill job[%s] with appid[%s] for user[%s], message[%s]" % (app_name, app_id, user, e.message))


def get_result_from_mysql(sql):
    a = engine.execute(sql)
    return a.fetchall()


# The resource below may change and may involve extra logic,
# so it is extracted into a function of its own.
def get_resource_queue():
    return "xxxxxxxxxxxxx"


if __name__ == "__main__":
    kill_duplicated_jobs(find_duplicated_jobs())
```
The logger configuration file is shown below (the official Python documentation on logging is excellent — worth reading once and trying in practice):
```ini
[loggers]
keys=root, simpleLogger

[handlers]
keys=consoleHandler, logger_handler

[formatters]
keys=formatter

[logger_root]
level=WARN
handlers=consoleHandler

[logger_simpleLogger]
level=INFO
handlers=logger_handler
propagate=0
qualname=killduplicatedjob

[handler_consoleHandler]
class=StreamHandler
level=WARN
formatter=formatter
args=(sys.stdout,)

[handler_logger_handler]
class=logging.handlers.RotatingFileHandler
level=INFO
formatter=formatter
args=("kill_duplicated_streaming.log", "a", 52428800, 3,)

[formatter_formatter]
format=%(asctime)s %(name)-12s %(levelname)-5s %(message)s
```