自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Python 語法校驗(yàn)接口異步非阻塞實(shí)現(xiàn)

開發(fā) MySQL
本文主要記錄線上案例中將同步阻塞代碼修改為異步非阻塞的過程,期間介紹 goInception 的基本使用、多進(jìn)程實(shí)現(xiàn)踩的坑、測試 tornado 中異步的多種實(shí)現(xiàn),最終使用協(xié)程,并對(duì)使用期間遇到的性能問題進(jìn)行了簡單的分析。

引言

本文主要記錄線上案例中將同步阻塞代碼修改為異步非阻塞的過程,期間介紹 goInception 的基本使用、多進(jìn)程實(shí)現(xiàn)踩的坑、測試 tornado 中異步的多種實(shí)現(xiàn),最終使用協(xié)程,并對(duì)使用期間遇到的性能問題進(jìn)行了簡單的分析。

現(xiàn)象

背景:SQL 工單應(yīng)用基于 tornado 框架實(shí)現(xiàn),其中實(shí)現(xiàn)多個(gè)接口,包括語法校驗(yàn)接口,其中語法校驗(yàn)基于開源項(xiàng)目 goInception 實(shí)現(xiàn)。對(duì)于超長 SQL 或多實(shí)例的場景,經(jīng)常出現(xiàn)語法校驗(yàn)超時(shí)的問題,原因是接口阻塞,IO 操作導(dǎo)致服務(wù) block。

需求:改造接口實(shí)現(xiàn),從同步阻塞修改為異步非阻塞,緩解語法校驗(yàn)超時(shí)的問題。

當(dāng)前實(shí)現(xiàn)

語法校驗(yàn)接口

class StartSyncCheckHandler(tornado.web.RequestHandler):
  
  def post(self):
   ...
      return_data = mysql_check(job_option)
      self.finish(return_data)

接口中調(diào)用 goinception 實(shí)現(xiàn)語法校驗(yàn),goinception 使用的主要流程如下所示。

conn = self._get_inception_server_connection()
  cursor = conn.cursor()
  cursor.execute(self._get_inception_check_statement())
  query_result = cursor.fetchall()
  cursor.close()

由于 goinception 支持使用 MySQL 客戶端連接,因此和 MySQL 的使用方式相同,主要包括:

  • 創(chuàng)建連接
  • 創(chuàng)建 cursor
  • 提交校驗(yàn)
  • 獲取校驗(yàn)結(jié)果

使用 pymysql 創(chuàng)建連接,其中指定的 goinception 服務(wù)的 IP 和端口。

def _get_inception_server_connection():
  return pymysql.connect(
      host=GoInceptionServerConfig.mysql_ip,
      user=GoInceptionServerConfig.MySQL_User,
      passwd=GoInceptionServerConfig.MySQL_Password,
      port=GoInceptionServerConfig.MySQL_Port,
      charset=GoInceptionServerConfig.MySQL_Charset,
      db=GoInceptionServerConfig.Database_Name
  )

執(zhí)行校驗(yàn)前生成提交給 goinception 的審核語句。

def _get_inception_check_statement(self):
        """
        獲取MySQL Inception使用的檢查語句
        :return:
        """
        backup_option = "--execute=0;--backup=0"
        run_option = "--check=1"
        inception_statement = """/*--user={inception_user};--password={inception_password};\
--host={inception_host};--port={inception_port};{run_option};{backup_option};*/ 
inception_magic_start;
{sql_script}
inception_magic_commit;
""".format(
            inception_user=self.mysql_user,
            inception_password=self.mysql_password,
            inception_host=self.mysql_ip,
            inception_port=self.mysql_port,
            run_option=run_option,
            backup_option=backup_option,
            sql_script=self.full_sql_script
        )
        return inception_statement

其中:

  • 要求使用/* */將提交的信息括起來,其中每個(gè)參數(shù)通過分號(hào)分隔,包括要審核或執(zhí)行的語句,包括use database語句也要加分號(hào),這一點(diǎn)與 MySQL 客戶端不同;
  • 參數(shù)中的 IP 和端口是要校驗(yàn)的 SQL 對(duì)應(yīng)的數(shù)據(jù)庫;
  • 定--check=1;--execute=0,表示使用審核,不使用執(zhí)行;
  • goinception 支持語句塊的審核,要求通過執(zhí)行接口將要審核的 SQL 一次性提交,內(nèi)部拆分后一條條審核。其中inception_magic_start;作為語句塊的開始,inception_magic_start;作為語句塊的結(jié)束。

多進(jìn)程啟動(dòng)

tornado 默認(rèn)使用單進(jìn)程啟動(dòng),因此首先改用多進(jìn)程啟動(dòng),具體實(shí)現(xiàn)是在啟動(dòng)時(shí)指定進(jìn)程數(shù)。

tornado.options.parse_command_line()
    app = Application()
    http_server = tornado.httpserver.HTTPServer(app)
    port = options.get("port", 8001)
    http_server.listen(port, "0.0.0.0")

    logging.warning("Server is running at http://0.0.0.0:%s" % port)

    tornado.ioloop.IOLoop.instance().start()
  
    # 多進(jìn)程啟動(dòng)
    # fork not available on windows。在windows上啟本動(dòng)服務(wù)需注釋掉下面這行
    http_server.start(8)  # Forks multiple sub-processes

但是很快就發(fā)現(xiàn)了這種實(shí)現(xiàn)的缺點(diǎn)。主要包括:

  • 并發(fā)數(shù)有上限,超過進(jìn)程數(shù)后依然會(huì)發(fā)生阻塞等待,比如分庫分表語法校驗(yàn);
  • 多個(gè)接口之間相互影響,當(dāng)其他比較慢的接口用完了進(jìn)程數(shù),單實(shí)例的語法校驗(yàn)也會(huì)發(fā)生阻塞等待。

下面是兩個(gè)案例的具體介紹。

案例1:并發(fā)數(shù)超過上限后語法校驗(yàn)慢

時(shí)間:2023-09-05 10:28:37

現(xiàn)象:分庫分表語法校驗(yàn)超時(shí),16 個(gè)實(shí)例,每個(gè)實(shí)例 4 個(gè) database,每個(gè) database 256 個(gè)表,一共 16,384 個(gè)表。

日志

其中:

  • 一批接收并處理 8 個(gè)請(qǐng)求,每個(gè)請(qǐng)求的執(zhí)行用時(shí)在 4s 左右;
  • 每當(dāng)一個(gè)請(qǐng)求返回后接收并處理下一個(gè)請(qǐng)求。

監(jiān)控顯示接口的 TP99 達(dá)到 9s,是接口實(shí)際執(zhí)行用時(shí)的兩倍左右。

監(jiān)控顯示 SQL 工單應(yīng)用服務(wù)器 CPU 打滿,持續(xù)時(shí)間 30s 左右。

案例 2:其他接口慢導(dǎo)致執(zhí)行接口調(diào)用慢,如果調(diào)用語法校驗(yàn),同樣也會(huì)慢

時(shí)間:20230802 20:02

現(xiàn)象:執(zhí)行接口調(diào)用慢,判斷原因是空間檢測進(jìn)程占用進(jìn)程所致。

監(jiān)控顯示同一時(shí)間空間檢測接口的 TP99 超過 10s。

執(zhí)行接口正常情況下接口調(diào)用很快,主要是執(zhí)行一條 update SQL。查看執(zhí)行接口的日志,其中關(guān)鍵字 'xbp_id': 6044322 表示單號(hào)。

root@sql-inception-c3acb574:~# head 6044322.log 
2023-08-02 20:01:12,807 [MainThread:140461759665984] [sql] [start_execute_job:post:44] [INFO]- request_body_dict: {'job_uuid': '28182d96-905e-4807-aa4d-134f840cfe86', 'execute_method': 1, 'xbp_id': 6044322}
2023-08-02 20:01:12,807 [MainThread:140461759665984] [sql] [start_execute_job:post:50] [WARNING]- job_uuid=28182d96-905e-4807-aa4d-134f840cfe86, xbp_id=6044322
2023-08-02 20:01:12,888 [MainThread:140461759665984] [sql] [start_execute_job:post:66] [INFO]- return_data={'code': 0, 'message': 'start execute job success'}

工單 6044322 中有 37 個(gè)實(shí)例,調(diào)用執(zhí)行接口 37 次,全部日志顯示執(zhí)行接口調(diào)用的時(shí)間相差 20s,原因是接口阻塞。

root@sql-inception-c3acb574:~# cat /export/Logs/sql.log | grep "start_execute_job:post:44" | grep 6044322 | awk '{print $2}'
20:01:12,807
20:01:12,890
...
20:01:37,194
20:01:37,275
...
20:01:39,363
20:01:39,402

可以對(duì)比另一個(gè)工單 6051798 中有 64 個(gè)實(shí)例,調(diào)用執(zhí)行接口 64 次,全部日志顯示執(zhí)行接口調(diào)用的時(shí)間相差 0.2s。

root@sql-inception-c3acb574:~# cat /export/Logs/sql.log | grep "start_execute_job:post:44" | grep 6051798 | awk '{print $2}'
13:36:31,203
13:36:31,203
...
13:36:31,398
13:36:31,398

顯然,對(duì)于接口阻塞的問題,簡單實(shí)用多進(jìn)程無法解決該問題,因此下面測試將接口改為異步非阻塞模式。

測試

準(zhǔn)備

接口實(shí)現(xiàn),其中調(diào)用 time.sleep(2) 模擬阻塞。

import datetime
import json
import time

import tornado.web


class AsyncHandler(tornado.web.RequestHandler):
    def post(self):
        data = json.loads(self.request.body)
        number = data.get("number")
        receive_time = datetime.datetime.now()
        print("=== receive number={} in {}".format(number, receive_time))
        print("==== {} enter ====".format(number))
        
        time.sleep(2)
        
        data = {
            "code": 0
        }
        print("==== {} exit ====".format(number))
        self.finish(data)

接口調(diào)用,其中使用線程池并發(fā)調(diào)用接口測試是否阻塞。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import json

import requests

from common_helper.thread_pool import ThreadPool


def call_api(args):
    db_number = args[0] + 1
    print("send number={} in {}".format(db_number, datetime.datetime.now()))
    api = "async/"
    body = {
        "number": db_number
    }
    result = send_request(api, body=body)
    return result


def send_request(api, body):
    domain = "http://127.0.0.1:8000/"# 調(diào)用多接口時(shí)便于區(qū)分環(huán)境
    url = domain + api
    headers = {
        'content-type': "application/json",
    }
    result = requests.post(url=url, data=json.dumps(body), headers=headers)
    return result.content


def main():
    start_time = datetime.datetime.now()
    print(start_time)

    param_list = range(3)
    pool = ThreadPool(call_api, param_list)
    res_list = pool.run()
    print(res_list)
    end_time = datetime.datetime.now()
    print(end_time)
    use_time = end_time - start_time
    print("Use time={:.2f} seconds".format(use_time.total_seconds()))


if __name__ == '__main__':
    main()

阻塞

服務(wù)端日志

=== receive number=2 in2025-02-0920:03:15.001429
==== 2 enter ====
==== 2 exit ====
=== receive number=1in2025-02-0920:03:17.002924
==== 1 enter ====
==== 1 exit ====
=== receive number=3in2025-02-0920:03:19.008361
==== 3 enter ====
==== 3 exit ====

其中:

  • 每次調(diào)用返回后執(zhí)行下一次調(diào)用,表明接口阻塞;
  • 顯示 15、17、19 秒分別接收到請(qǐng)求,每個(gè)相差 2 秒。

客戶端日志

2025-02-0920:03:14.989916
send number=1in2025-02-0920:03:14.990300
send number=2in2025-02-0920:03:14.990630
send number=3in2025-02-0920:03:14.990811
[b'{"code": 0}', b'{"code": 0}', b'{"code": 0}']
2025-02-0920:03:21.012274
Use time=6.02 seconds

其中:

  • 并發(fā)請(qǐng)求同時(shí)發(fā)送,時(shí)間都是 14 秒,這里可以留一個(gè)問題,客戶端發(fā)送請(qǐng)求但是服務(wù)端阻塞未處理時(shí)請(qǐng)求保存在哪里?
  • 接口的 3 次調(diào)用總用時(shí) 6s,每次請(qǐng)求分別用時(shí) 2、4、6 秒。

接下來分別測試通過多種方法將阻塞接口修改為非阻塞。

async + await

將代碼中的阻塞方法 time.sleep 修改為非阻塞方法 tornado.gen.sleep。

在 Tornado 中,tornado.gen.sleep(2) 是一個(gè)協(xié)程,它會(huì)暫停當(dāng)前協(xié)程的執(zhí)行,等待指定的時(shí)間(在本例中是 2 秒)后再恢復(fù)執(zhí)行。為了使用這個(gè)協(xié)程,我們需要在調(diào)用它的函數(shù)前面加上 async def 關(guān)鍵字,并在調(diào)用 tornado.gen.sleep(2) 時(shí)使用 await 關(guān)鍵字。

async def post(self):
      ...
        # time.sleep(2)
        await tornado.gen.sleep(2)
        ...
        # self.finish(data)  # Coroutine 'finish' is not awaited
        await self.finish(data)

服務(wù)端日志顯示接口處理過程中可以正常接收請(qǐng)求。

=== receive number=2 in2025-02-0920:28:59.343644
==== 2 enter ====
=== receive number=1in2025-02-0920:28:59.343680
==== 1 enter ====
=== receive number=3in2025-02-0920:28:59.343702
==== 3 enter ====
==== 2 exit ====
==== 1 exit ====
==== 3 exit ====

客戶端日志

2025-02-0920:28:59.332715
send number=1in2025-02-0920:28:59.333061
send number=2in2025-02-0920:28:59.333485
send number=3in2025-02-0920:28:59.333753
[b'{"code": 0}', b'{"code": 0}', b'{"code": 0}']
2025-02-0920:29:01.346989
Use time=2.01 seconds

測試顯示通過將阻塞方法 time.sleep 修改為非阻塞方法 tornado.gen.sleep 可以實(shí)現(xiàn)非阻塞。

@tornado.gen.coroutine

tornado 中可以使用 gen.coroutine 裝飾器實(shí)現(xiàn)異步,用于將生成器函數(shù)轉(zhuǎn)換成協(xié)程,其中使用 yield 關(guān)鍵字來暫停和恢復(fù)執(zhí)行。

import tornado.web
import tornado.gen


class AsyncHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine  # 異步、協(xié)程處理;增加并發(fā)量
    def post(self):
       ...
        # time.sleep(2)
        yield tornado.gen.sleep(2)
        ...
        self.finish(data)

其中:

  • 用 tornado.gen.sleep 替換 time.sleep,fininsh 方法前不需要加 await,這種異步的實(shí)現(xiàn)可以像同步函數(shù)一樣編寫,便于理解與維護(hù)。

服務(wù)端日志

=== receive number=1 in 2025-02-09 20:42:11.404081
==== 1 enter ====
=== receive number=3 in 2025-02-09 20:42:11.404847
==== 3 enter ====
=== receive number=2 in 2025-02-09 20:42:11.404895
==== 2 enter ====
==== 1 exit ====
==== 3 exit ====
==== 2 exit ====

其中:

  • 每次調(diào)用返回前開始執(zhí)行下一次調(diào)用,因此是非阻塞;

客戶端日志顯示同樣可以實(shí)現(xiàn)異步非阻塞。

2025-02-09 20:42:11.388831
send number=1 in 2025-02-09 20:42:11.389133
send number=2 in 2025-02-09 20:42:11.389564
send number=3 in 2025-02-09 20:42:11.389789
[b'{"code": 0}', b'{"code": 0}', b'{"code": 0}']
2025-02-09 20:42:13.407953
Use time=2.02 seconds

需要注意的是單純的 yield 并不能實(shí)現(xiàn)異步非阻塞,要求 yield 掛起的函數(shù)必須是非阻塞函數(shù),比如這里如果還是使用 time.sleep 時(shí)依然阻塞。

@tornado.concurrent.run_on_executor

tornado 中的另一種實(shí)現(xiàn)是使用線程池后臺(tái)執(zhí)行,其中線程池使用單例模式。

from concurrent.futures import ThreadPoolExecutor

import tornado.web
import tornado.gen
import tornado.concurrent


class Executor(ThreadPoolExecutor):
    """
    單例模式實(shí)現(xiàn)線程池。大小為10
    """
    _instance = None

    def __new__(cls, *args, **kwargs):
        ifnot getattr(cls, "_instance", None):
            cls._instance = ThreadPoolExecutor(max_workers=10)
        return cls._instance


class AsyncHandler(tornado.web.RequestHandler):

    executor = Executor()

    @tornado.gen.coroutine  # 異步、協(xié)程處理;增加并發(fā)量
    def post(self):
       ...
        # time.sleep(2)
        yield self._process()
        ...
        self.finish(data)

    @tornado.concurrent.run_on_executor
    def _process(self):
        # RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
        # tornado.gen.sleep(2)
        time.sleep(2)

其中:

  • 將 time.sleep 封裝在 _process 函數(shù)中, _process 函數(shù)調(diào)用前要加 yield,否則后臺(tái)執(zhí)行直接返回,不會(huì)等待結(jié)束;
  • 不能使用 tornado.gen.sleep,否則報(bào)錯(cuò) RuntimeError: There is no current event loop in thread。

服務(wù)端日志

=== receive number=1 in2025-02-0920:49:27.726765
==== 1 enter ====
=== receive number=2in2025-02-0920:49:27.727211
==== 2 enter ====
=== receive number=3in2025-02-0920:49:27.727406
==== 3 enter ====
==== 2 exit ====
==== 3 exit ====
==== 1 exit ====

客戶端日志

2025-02-0920:49:27.716155
send number=1in2025-02-0920:49:27.716381
send number=2in2025-02-0920:49:27.716467
send number=3in2025-02-0920:49:27.716828
[b'{"code": 0}', b'{"code": 0}', b'{"code": 0}']
2025-02-0920:49:29.734317
Use time=2.02 seconds

測試顯示這種方式雖然可以實(shí)現(xiàn)異步非阻塞,但是本質(zhì)上還是線程池,因此無法滿足需求。

到這里可以再次明確需求,接口實(shí)現(xiàn)中有 IO 操作,需求是接口異步非阻塞,且需要等待 IO 返回結(jié)果,因此適合使用協(xié)程,原因是協(xié)程允許在執(zhí)行過程中暫停和恢復(fù),從而實(shí)現(xiàn)異步編程。

而 tornado 中支持兩種異步的實(shí)現(xiàn)方式,包括 yield 掛起函數(shù)與類線程池,這里使用前者。

優(yōu)化

測試

class AsyncInceptionHandler(tornado.web.RequestHandler):
    def post(self):
        data = json.loads(self.request.body)
        number = data.get("number")
        receive_time = datetime.datetime.now()
        print("=== receive number={} in {}".format(number, receive_time))
        print("==== {} enter ====".format(number))

        inception_main()

        data = {
            "code": 0
        }
        print("==== {} exit ====".format(number))
        self.finish(data)
        
        
def inception_main():
    start_time = datetime.datetime.now()
    database_name = "cctest"
    sql_script = "select 1;"
  inception_test(database_name, sql_script)
    end_time = datetime.datetime.now()
    print(end_time.second - start_time.second)

    
def inception_test(database_name, sql_script):
    # 調(diào)用 goinception
    my_inception = GoInception(
        mysql_ip="x.x.x.x",
        mysql_port=3358,
        database_name=database_name,
        sql_script=sql_script,
    )

    check_data = my_inception.check_sql()
    return check_data


class GoInception(object):
    def check_sql(self):
          check_result = self._get_inception_check_result()
          returun check_result

    def _get_inception_check_result(self):
            """
            獲取MySQL Inception對(duì)腳本進(jìn)行check操作后的結(jié)果
            :return:
            """
            cursor = self._get_inception_server_connection().cursor()
            cursor.execute(self._get_inception_check_statement())
            query_result = cursor.fetchall()
            return query_result

  @staticmethod
    def _get_inception_server_connection():
        """
        獲取MySQL Inception的連接
        :return:
        """
        return pymysql.connect(
            host=GoInceptionServerConfig.mysql_ip,
            user=GoInceptionServerConfig.mysql_user,
            passwd=GoInceptionServerConfig.mysql_password,
            port=GoInceptionServerConfig.mysql_port,
            charset=GoInceptionServerConfig.mysql_charset,
            db=GoInceptionServerConfig.database_name
        )

測試顯示接口單次調(diào)用執(zhí)行用時(shí) 5.4s

2025-02-16 21:46:56.962885
send number=1 in 2025-02-16 21:46:56.963171
[b'{"code": 0}']
2025-02-16 21:47:02.366053
Use time=5.40 seconds

服務(wù)端日志

Request received: /block_inception/
=== receive number=1 in 2025-02-16 21:46:56.972630
==== 1 enter ====
cursor = <pymysql.cursors.Cursor object at 0x7fad806061f0>
execute done
get result
error:
-54
==== 1 exit ====
Request processed: /block_inception/, Elapsed time: 5.39 seconds

線程池并發(fā) 3 次調(diào)用用時(shí) 6.25s,基本上等于單次調(diào)用用時(shí)的三倍。

2025-02-16 21:48:34.918864
send number=1 in 2025-02-16 21:48:34.919095
send number=2 in 2025-02-16 21:48:34.919488
send number=3 in 2025-02-16 21:48:34.919667
[b'{"code": 0}', b'{"code": 0}', b'{"code": 0}']
2025-02-16 21:48:52.591335
Use time=17.67 seconds

服務(wù)端日志顯示串行執(zhí)行,阻塞,單次執(zhí)行用時(shí)穩(wěn)定在 5s 左右。

Request received: /block_inception/
=== receive number=3in2025-02-1621:48:34.928843
==== 3 enter ====
cursor = <pymysql.cursors.Cursor object at 0x7f7de8836370>
execute done
get result
error:
8
==== 3 exit ====
Request processed: /block_inception/, Elapsed time: 7.51 seconds
Request received: /block_inception/
=== receive number=1in2025-02-1621:48:42.440652
==== 1 enter ====
cursor = <pymysql.cursors.Cursor object at 0x7f7de8816e20>
execute done
get result
error:
6
==== 1 exit ====
Request processed: /block_inception/, Elapsed time: 5.79 seconds
Request received: /block_inception/
=== receive number=2in2025-02-1621:48:48.230043
==== 2 enter ====
cursor = <pymysql.cursors.Cursor object at 0x7f7de8816d90>
execute done
get result
error:
4
==== 2 exit ====
Request processed: /block_inception/, Elapsed time: 4.36 seconds

下面將同步阻塞代碼修改為異步非阻塞模式。

aiomysql

使用 aiomyqsl + async + await

class AsyncInceptionHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self):
        data = json.loads(self.request.body)
        number = data.get("number")
        receive_time = datetime.datetime.now()
        print("=== receive number={} in {}".format(number, receive_time))
        print("==== {} enter ====".format(number))

        yield inception_main()

        data = {
            "code": 0
        }
        print("==== {} exit ====".format(number))
        self.finish(data)


asyncdef inception_main():
    start_time = datetime.datetime.now()
    database_name = "cctest"
    sql_script = "select 1;"
    await inception_test(database_name, sql_script)
    end_time = datetime.datetime.now()
    print(end_time.second - start_time.second)


asyncdef inception_test(database_name, sql_script):
    # 調(diào)用 goinception
    my_inception = GoInception(
        mysql_ip="x.x.x.x",
        mysql_port=3358,
        database_name=database_name,
        sql_script=sql_script,
    )

    check_data = await my_inception.check_sql()
    return check_data
    

class GoInception(object):
    asyncdef check_sql(self):
        check_result = await self._get_inception_check_result()
        return check_result
        
    asyncdef _get_inception_check_result(self):
        """
        獲取MySQL Inception對(duì)腳本進(jìn)行check操作后的結(jié)果
        :return:
        """
        cursor = await self._get_inception_server_cursor()
        await cursor.execute(self._get_inception_check_statement())
        query_result = await cursor.fetchall()
        await cursor.close()
        return query_result
      
    @staticmethod
    asyncdef _get_inception_server_cursor():
        """
        獲取MySQL Inception的連接
        :return:
        """
        conn = await aiomysql.connect(
            host=GoInceptionServerConfig.mysql_ip,
            user=GoInceptionServerConfig.mysql_user,
            password=GoInceptionServerConfig.mysql_password,
            port=GoInceptionServerConfig.mysql_port,
            charset=GoInceptionServerConfig.mysql_charset,
            db=GoInceptionServerConfig.database_name
        )
        cursor = await conn.cursor()
        return cursor

測試結(jié)果

服務(wù)端日志顯示請(qǐng)求非阻塞,但是單次執(zhí)行用時(shí)穩(wěn)定在 9s 左右,顯示比阻塞請(qǐng)求的用時(shí)更長。

Request received: /inception/
=== receive number=1in2025-02-1621:50:16.826816
==== 1 enter ====
Request received: /inception/
=== receive number=3in2025-02-1621:50:16.828231
==== 3 enter ====
Request received: /inception/
=== receive number=2in2025-02-1621:50:16.828826
==== 2 enter ====
cursor = <aiomysql.cursors.Cursor object at 0x7f7de8836880>
cursor = <aiomysql.cursors.Cursor object at 0x7f7de8836640>
cursor = <aiomysql.cursors.Cursor object at 0x7f7de8836490>
execute done
get result
error:
8
==== 3 exit ====
Request processed: /inception/, Elapsed time: 7.76 seconds
execute done
get result
error:
10
==== 1 exit ====
Request processed: /inception/, Elapsed time: 9.38 seconds
execute done
get result
error:
11
==== 2 exit ====
Request processed: /inception/, Elapsed time: 10.68 seconds

客戶端日志

2025-02-1621:50:16.815495
send number=1in2025-02-1621:50:16.815727
send number=2in2025-02-1621:50:16.815810
send number=3in2025-02-1621:50:16.815895
[b'{"code": 0}', b'{"code": 0}', b'{"code": 0}']
2025-02-1621:50:27.509801
Use time=10.69 seconds

測試顯示優(yōu)化后不阻塞了,但是第一個(gè)請(qǐng)求需要等待最后一個(gè)請(qǐng)求結(jié)束才返回,整體執(zhí)行反倒變慢。

因此問題就是前面的請(qǐng)求為什么發(fā)生等待?

goinception

查看 goinception 的日志,其中顯示請(qǐng)求同時(shí)進(jìn)入,但是也幾乎同時(shí)返回,因此懷疑慢不是應(yīng)用的原因。

time="2025/02/16 21:50:17.334" level=info msg="con:79 new connection 100.124.212.72:48060" file=server.go func=onConn line=319
time="2025/02/16 21:50:17.340" level=info msg="con:80 new connection 100.124.212.72:48062" file=server.go func=onConn line=319
time="2025/02/16 21:50:17.341" level=info msg="con:81 new connection 100.124.212.72:48064" file=server.go func=onConn line=319
time="2025/02/16 21:50:24.848" level=info msg="con:80 close connection" file=server.go func=func1 line=321
time="2025/02/16 21:50:26.504" level=info msg="con:81 close connection" file=server.go func=func1 line=321
time="2025/02/16 21:50:27.753" level=info msg="con:79 close connection" file=server.go func=func1 line=321

使用 top 命令查看機(jī)器負(fù)載

其中:

  • MySQL 進(jìn)程 CPU 使用率 92%;
  • goIncetpion 進(jìn)程 CPU 使用率 54%。

上線后測試顯示機(jī)器負(fù)載不高的前提下可以實(shí)現(xiàn)異步非阻塞,但是 CPU 使用率高的問題還有待分析與解決。

下面使用 perf 分析 CPU。

首先使用 perf rerord 命令記錄程序運(yùn)行期間的性能時(shí)間,默認(rèn)將性能事件保存到 perf.data 文件中。

[root@test ~]# perf record -g -a sleep 15
[ perf record: Woken up 33 times to write data ]
[ perf record: Captured and wrote 8.452 MB perf.data (51875 samples) ]
[root@test ~]# 
[root@test ~]# ll -rth
-rw------- 1 root root 8.5M Feb 16 21:50 perf.data

然后使用 perf report 命令分析 perf record 保存到性能事件。

perf report -g

其中:

  • Symbol 表示函數(shù)名,其中 [.] 表示用戶空間函數(shù),[k] 表示內(nèi)核空間;
  • Shared Object 表示函數(shù)所在的共享庫或所在的程序;
  • Command 表示進(jìn)程名;
  • Children 表示該函數(shù)的 CPU 使用率;
  • Self 表示該函數(shù)的子函數(shù)的 CPU 使用率。

perf report --sort cpu -g --stdio

結(jié)合 top 與 perf 的結(jié)果,判斷接口返回慢的原因是機(jī)器負(fù)載高,機(jī)器負(fù)載高的原因主要包括 MySQL 連接處理與 goInception SQL 解析,具體有待進(jìn)一步分析。

下面是用到的命令與參數(shù)。

其中:

  • perf record -g -a sleep 15
  • -g 表示保存函數(shù)調(diào)用的堆棧關(guān)系;
  • -a 表示記錄所有 CPU 上的數(shù)據(jù);
  • sleep 15 表示 perf record 命令之后要運(yùn)行的命令,sleep 15 命令會(huì)讓進(jìn)程休眠 10 秒鐘,perf record 記錄執(zhí)行期間的所有事件。
  • perf report -g
  • -g 表示顯示調(diào)用堆棧,其中快捷鍵 E 展開,C 收起。

  • perf report --sort cpu -g --stdio

  • --sort cpu 表示按照 CPU 使用率排序,默認(rèn)倒序;

  • --stdio 表示以文本模式顯示報(bào)告。

結(jié)論

SQL 工單應(yīng)用中遇到語法校驗(yàn)超時(shí)的問題,原因是接口同步阻塞,語法校驗(yàn)最耗時(shí)的是 IO 操作,期間服務(wù) block。

最開始的優(yōu)化方案是將應(yīng)用啟動(dòng)方式從單進(jìn)程修改為多進(jìn)程,但事實(shí)證明這種方式并不合理,原因是超過上限后依然阻塞,甚至多個(gè)接口之間相互影響。

因此將代碼從同步阻塞修改為異步非阻塞,tornado 中支持兩種異步的實(shí)現(xiàn)方式,包括 yield 掛起函數(shù)與類線程池,這里使用前者。當(dāng)然要求 yield 掛起的函數(shù)支持非阻塞,這里使用 aiomysql 替換 pymysql、async def 替換 def。

修改后測試顯示接口不阻塞,但是第一個(gè)請(qǐng)求需要等待最后一個(gè)請(qǐng)求結(jié)束才返回,整體執(zhí)行反倒變慢。結(jié)合 top 與 perf 的結(jié)果,判斷接口返回慢的原因是機(jī)器負(fù)載高,機(jī)器負(fù)載高的原因主要包括 MySQL 連接處理與 goInception SQL 解析,具體有待進(jìn)一步分析。

關(guān)于異步非阻塞與 CPU 使用率高的分析方法還需要深入學(xué)習(xí),本文中暫未介紹。

責(zé)任編輯:華軒 來源: 丹柿小院
相關(guān)推薦

2019-07-23 11:01:57

Python同步異步

2022-06-22 08:16:29

異步非阻塞框架

2024-09-23 17:15:28

Python并發(fā)并行

2021-06-04 18:14:15

阻塞非阻塞tcp

2012-10-10 10:00:27

同步異步開發(fā)Java

2012-02-22 21:15:41

unixIO阻塞

2018-03-28 08:52:53

阻塞非阻塞I

2021-02-27 16:08:17

Java異步非阻塞

2023-12-06 07:28:47

阻塞IO異步IO

2016-11-28 09:08:43

java系統(tǒng)異步非阻塞

2015-07-03 10:12:04

編程同步非阻塞

2021-01-10 11:21:33

JavaScript語言開發(fā)

2024-09-05 09:41:57

2021-03-04 08:34:55

同步阻塞非阻塞

2024-12-02 00:57:17

非阻塞異步編程

2022-09-22 10:51:32

服務(wù)端開發(fā)者異步非阻塞編程

2024-11-26 10:37:19

2024-08-05 09:16:54

2017-03-01 16:40:12

Linux驅(qū)動(dòng)技術(shù)設(shè)備阻塞

2019-05-05 08:50:42

阻塞非阻塞BIO
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)