Python automation: feeding EIP and CEN monitoring data into Grafana
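Overview
In day-to-day operations we sometimes need to keep an eye on the monitoring data of EIP and CEN in Alibaba Cloud, and logging in to the console every time to check it is inconvenient.
Instead, we can fetch the monitoring data through the API, write it to InfluxDB, and display it in Grafana for real-time monitoring and visualization.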
Step 1: Preparation
Before starting, make sure the following preparations are complete
Prepare the Alibaba Cloud EIP and CEN instances
This step is omitted
Understand how to fetch EIP and CEN data
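I fetch EIP data through the EIP product's own API; the debugging page is the DescribeEipMonitorData page on the Alibaba Cloud OpenAPI portal (https://api.aliyun.com/api/Vpc/2016-04-28/DescribeEipMonitorData)
After filling in the required parameters such as RegionId and AllocationId, copy the code generated by the portal and modify it; the modifications are described later in this article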
I could not find a dedicated API for CEN monitoring data, but it can be fetched just as easily through CloudMonitor; the link is as follows
https://api.aliyun.com/api/Cms/2019-01-01/DescribeMetricData
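The specific CEN metrics that can be queried are listed at https://cms.console.aliyun.com/metric-meta/acs_cen/cen_tr?spm=a2c4g.11186623.0.0.252476ab1Ldq0T
In fact, EIP data can also be fetched through CloudMonitor
Install Python and the required dependencies
The versions in the example below were the latest at the time of writing; in practice, check the Alibaba Cloud OpenAPI portal mentioned above for the current versions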
pip install alibabacloud_vpc20160428==5.1.0
pip install alibabacloud_cms20190101==2.0.11
Install InfluxDB and complete the initial configuration
1. For convenience, I run InfluxDB with Docker
cd /data/influxdb
# Generate the initial configuration file
docker run --rm influxdb:2.7.1 influxd print-config > config.yml
# Start the container
docker run --name influxdb -d -p 8086:8086 --volume `pwd`/influxdb2:/var/lib/influxdb2 --volume `pwd`/config.yml:/etc/influxdb2/config.yml influxdb:2.7.1
2. Once the container is running, log in to InfluxDB at http://ip:8086
3. Create a bucket
A single bucket is enough; a bucket is similar to a database in MySQL
4. Obtain an API token, which the Python code needs when writing data
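Before moving on, it can help to confirm from a script that the container started above is actually accepting requests. The following is a minimal sketch, assuming the InfluxDB 2.x `/health` endpoint, which reports `"status": "pass"` when the server is ready. The function names `is_ready` and `check_influxdb` and the default URL are my own, for illustration only.

```python
import json
import urllib.request


def is_ready(payload: bytes) -> bool:
    """Return True when a /health response body reports status "pass"."""
    try:
        return json.loads(payload).get("status") == "pass"
    except (ValueError, AttributeError):
        # Not JSON, or JSON that is not an object
        return False


def check_influxdb(base_url: str = "http://localhost:8086", timeout: float = 3.0) -> bool:
    """Query <base_url>/health and report whether InfluxDB looks ready."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
            return is_ready(resp.read())
    except OSError:
        # Connection refused, timeout, HTTP error, and similar failures
        return False
```

Calling `check_influxdb("http://ip:8086")` with the real address should return True once the initial setup in the web UI is done.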
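Install Grafana and complete the basic configuration
Omitted
Step 2: Obtain API access credentials
To access the EIP and CEN data in Alibaba Cloud through the API, we need access credentials. The steps are as follows
- Log in to the Alibaba Cloud console
- Create a RAM user and grant it the required permissions
- Obtain the RAM user's Access Key ID and Access Key Secret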
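Once the key pair exists, one option is to keep it out of the source code and read it from the environment. The sketch below assumes the two variable names that appear in the comments of the generated sample code, `ALIBABA_CLOUD_ACCESS_KEY_ID` and `ALIBABA_CLOUD_ACCESS_KEY_SECRET`. The helpers `load_access_key` and `mask` are hypothetical names of mine, not part of any SDK.

```python
import os

# Variable names as mentioned in the comments of the generated sample code
ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID"
SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"


def load_access_key(env=None):
    """Return (access_key_id, access_key_secret), failing fast if either is unset."""
    env = os.environ if env is None else env
    missing = [name for name in (ID_VAR, SECRET_VAR) if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return env[ID_VAR], env[SECRET_VAR]


def mask(secret: str, visible: int = 4) -> str:
    """Hide all but the first few characters, for safe logging."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return secret[:visible] + "*" * (len(secret) - visible)
```

The returned pair can then be passed to the constructor shown in the next step, and `mask` keeps the secret out of debug logs.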
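Step 3: Write the Python scripts
Write Python scripts that fetch the EIP and CEN monitoring data and store it in InfluxDB
This article shows only part of the code; for the complete code, please contact this official account~
Adapt the sample code copied from Alibaba Cloud
1. Modify the constructor so that access_key_id and access_key_secret can be passed in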
def __init__(self, access_key_id: str = access_key_id, access_key_secret: str = access_key_secret):
    # The defaults refer to module-level values defined elsewhere in the file
    self.access_key_id = access_key_id
    self.access_key_secret = access_key_secret
2. Modify the function that fetches EIP data
def get_eip_monitor_data(self, region_id: str, allocation_id: str, start_time: str, end_time: str):
    '''
    Yield the monitoring records of one EIP.

    Reference:
    https://api.aliyun.com/api/Vpc/2016-04-28/DescribeEipMonitorData?params={%22RegionId%22:%22cn-hangzhou%22}

    Args:
        region_id (str): region of the EIP, e.g. cn-hangzhou
        allocation_id (str): ID of the EIP instance
        start_time (str): start of the query window, UTC
        end_time (str): end of the query window, UTC

    Yields:
        One record per sampling point, including:
        eip_tx: outbound traffic, in bytes
        eip_rx: inbound traffic, in bytes
    '''
    # Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the runtime environment.
    # Leaked project code can expose the AccessKey and endanger every resource in the account. The sample obtains the AccessKey from environment variables and is for reference only; the more secure STS method is recommended. For other authentication methods, see: https://help.aliyun.com/document_detail/378659.html
    client = self.create_client(endpoint=f'vpc.{region_id}.aliyuncs.com', access_key_id=self.access_key_id, access_key_secret=self.access_key_secret)
    describe_eip_monitor_data_request = vpc_20160428_models.DescribeEipMonitorDataRequest(
        region_id=region_id,
        allocation_id=allocation_id,
        start_time=start_time,
        end_time=end_time
    )
    log.debug(msg=describe_eip_monitor_data_request)
    runtime = util_models.RuntimeOptions()
    log.debug(msg=runtime)
    try:
        # Print the API return value yourself when running the copied code
        results = client.describe_eip_monitor_data_with_options(describe_eip_monitor_data_request, runtime).body.eip_monitor_datas.eip_monitor_data
        for result in results:
            yield result
    except Exception as error:
        # Log the failure and end the generator; a value returned from a
        # generator would never reach a caller that iterates with a for loop
        log.error(msg=error)
        return
3. Modify the function that fetches CEN data
def get_cen_monitor_data(self, namespace, metric_name, start_time: str, end_time: str):
    # Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the runtime environment.
    # Leaked project code can expose the AccessKey and endanger every resource in the account. The sample obtains the AccessKey from environment variables and is for reference only; the more secure STS method is recommended. For other authentication methods, see: https://help.aliyun.com/document_detail/378659.html
    client = self.create_client(access_key_id=self.access_key_id, access_key_secret=self.access_key_secret)
    describe_metric_list_request = cms_20190101_models.DescribeMetricListRequest(
        namespace=namespace,
        metric_name=metric_name,
        start_time=start_time,
        end_time=end_time,
    )
    runtime = util_models.RuntimeOptions()
    try:
        # Print the API return value yourself when running the copied code
        return client.describe_metric_list_with_options(describe_metric_list_request, runtime).body.datapoints
    except Exception as error:
        # Print the error if needed; on failure the function returns None,
        # so callers should check the result before parsing it
        UtilClient.assert_as_string(error.message)
Write the InfluxDB code
Keeping the InfluxDB write code in its own module makes it easy to reuse from other jobs later
The code below reads the token from 1Password; adjust this as needed, for example by reading the token from an environment variable
#!/usr/bin/env python3
import time

import influxdb_client
from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS

from modules.onepassword import OnePassword

my1p = OnePassword()


class InfluxClient:
    # The token is looked up in 1Password once, when the class is defined
    token = my1p.get_item_by_title(title='my_influxdb')['api']

    def __init__(self, url: str = 'http://10.1.1.1:8086', org: str = 'tyun', token: str = token):
        self.url = url
        self.org = org
        self.token = token

    def create_client(self):
        return influxdb_client.InfluxDBClient(url=self.url, token=self.token, org=self.org)

    def write_aliyun_eip(self, bucket: str = 'example', table_name: str = 'test1', location: str = None, eip_tx: int = None, eip_rx: int = None, time_stamp: str = None):
        # One point per sample: location is a tag, eip_tx and eip_rx are fields
        write_api = self.create_client().write_api(write_options=SYNCHRONOUS)
        point = (
            Point(table_name)
            .tag("location", location)
            .field("eip_tx", eip_tx)
            .field("eip_rx", eip_rx)
            .time(time_stamp)
        )
        write_api.write(bucket=bucket, org=self.org, record=point)

    def write_cen(self, bucket: str = 'example', table_name: str = 'test1', location: str = None, tr_instance_id: str = None, value: int = None, time_stamp: str = None):
        # One point per sample: location and tr_instance_id are tags, value is the field
        write_api = self.create_client().write_api(write_options=SYNCHRONOUS)
        point = (
            Point(table_name)
            .tag("location", location)
            .tag("tr_instance_id", tr_instance_id)
            .field("value", value)
            .time(time_stamp)
        )
        write_api.write(bucket=bucket, org=self.org, record=point)


def main():
    # Smoke test: write five sample EIP points, one per second
    influx_client = InfluxClient()
    for i in range(5):
        influx_client.write_aliyun_eip(bucket='example', table_name='test1', location='hangzhou', eip_tx=i, eip_rx=i)
        time.sleep(1)


if __name__ == '__main__':
    main()
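To make it easier to picture what ends up in the bucket, here is a rough sketch of how one such record looks in InfluxDB line protocol, the text format the client library serializes points into. This is my own simplified rendering for illustration, not the library's implementation: the helper `to_line_protocol` is hypothetical, handles only integer and float fields, and expects the timestamp as nanoseconds since the epoch.

```python
def _escape(text: str, chars: str) -> str:
    """Backslash-escape every character of `chars` found in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value) -> str:
    """Integers carry an 'i' suffix in line protocol; floats are written as-is."""
    if isinstance(value, bool):
        raise TypeError("boolean fields are not handled in this sketch")
    if isinstance(value, int):
        return f"{value}i"
    return repr(float(value))


def to_line_protocol(measurement: str, tags: dict, fields: dict, timestamp_ns: int) -> str:
    """Render one point as: measurement,tag=value field=value timestamp"""
    tag_part = "".join(
        f",{_escape(key, ', =')}={_escape(str(val), ', =')}"
        for key, val in sorted(tags.items())
    )
    field_part = ",".join(
        f"{_escape(key, ', =')}={_format_field(val)}"
        for key, val in sorted(fields.items())
    )
    return f"{_escape(measurement, ', ')}{tag_part} {field_part} {timestamp_ns}"


line = to_line_protocol(
    "eip",
    {"location": "cn-hangzhou"},
    {"eip_rx": 1200, "eip_tx": 3400},  # made-up sample values
    1700000000000000000,
)
print(line)
```

Tags such as location are indexed and used for filtering, while eip_rx and eip_tx are the measured values, which matches the split between `.tag()` and `.field()` in the class above.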
Write the main programs
1. Fetch EIP data and insert it into InfluxDB
#!/usr/bin/env python3
from collections import namedtuple

from modules.aliyun.eip import Eip
from modules.database.influxdb.write import InfluxClient
from modules.tools.my_time import MyDatetime as my_time

eip = Eip()
influx_client = InfluxClient()


def insert_data(region_id, location, table_name, allocation_id, start_time, end_time):
    '''
    Fetch the monitoring data of one EIP and write it to InfluxDB.

    Args:
        region_id (str): region of the EIP, e.g. cn-hangzhou
        location (str): value of the location tag
        table_name (str): InfluxDB measurement name
        allocation_id (str): ID of the EIP instance
        start_time (str): start of the query window, UTC
        end_time (str): end of the query window, UTC
    '''
    eip_datas = eip.get_eip_monitor_data(region_id=region_id, allocation_id=allocation_id, start_time=start_time, end_time=end_time)
    for eip_data in eip_datas:
        # print(eip_data)
        influx_client.write_aliyun_eip(bucket='example',
                                       table_name=table_name,
                                       location=location,
                                       eip_rx=eip_data.eip_rx,
                                       eip_tx=eip_data.eip_tx,
                                       time_stamp=eip_data.time_stamp)


Instance = namedtuple('Instance', ['region_id', 'allocation_id', 'bandwidth', 'env'])
hangzhou = Instance(region_id='hangzhou', allocation_id='eip-xxxxxxxxx', bandwidth='100m', env='prod')
eip_site_list = [hangzhou]

# Query the last ten minutes for every EIP in the list
for eip_site in eip_site_list:
    insert_data(region_id=f'cn-{eip_site.region_id}',
                location=f'cn-{eip_site.region_id}',
                table_name='eip',
                allocation_id=eip_site.allocation_id,
                start_time=my_time.get_utc_now_str_offset(offset=-60*10),
                end_time=my_time.get_utc_now_str()
                )
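The `my_time` helpers used above are not shown in this article. As a hypothetical stand-in, the sketch below produces UTC time strings with an offset in seconds, assuming the API accepts the ISO 8601 form `YYYY-MM-DDThh:mm:ssZ`. The function name `utc_str` and its `now` parameter (useful for testing) are mine, not the original module's interface.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed time format: ISO 8601 in UTC, second precision
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def utc_str(offset_seconds: int = 0, now: Optional[datetime] = None) -> str:
    """Return the current UTC time shifted by offset_seconds, as a string."""
    base = datetime.now(timezone.utc) if now is None else now.astimezone(timezone.utc)
    return (base + timedelta(seconds=offset_seconds)).strftime(TIME_FORMAT)


# A ten-minute query window ending at a fixed reference time
reference = datetime(2024, 1, 1, 0, 5, 0, tzinfo=timezone.utc)
start_time = utc_str(-60 * 10, now=reference)
end_time = utc_str(now=reference)
print(start_time, end_time)
```

With `now` left out, `utc_str(-60 * 10)` and `utc_str()` would give the same ten-minute window as the two `my_time` calls in the script.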
2. Fetch CEN data and insert it into InfluxDB
#!/usr/bin/env python3
import json

from modules.aliyun.metrics import Metrics
from modules.database.influxdb.write import InfluxClient
from modules.tools.my_time import MyDatetime as my_time
from modules.logger.client import LogClient

metrics = Metrics()
influx_client = InfluxClient()
log = LogClient(app='example_traffic')


def tr_instance_id_to_location(tr_instance_id):
    # Map a transit router instance ID to a (location, bandwidth) pair
    if tr_instance_id == 'tr-xxxxxxxxxxxxx':
        location = 'hangzhou'
        bandwidth = '20m'
    else:
        location = 'none'
        bandwidth = 'none'  # previously unset here, which raised UnboundLocalError
    return location, bandwidth


metric_names = ['AttachmentOutRate', 'AttachmentInRate']
for metric_name in metric_names:
    results = metrics.get_cen_monitor_data(namespace='acs_cen',
                                           metric_name=metric_name,
                                           start_time=my_time.get_utc_now_str_offset(offset=-60*10),
                                           end_time=my_time.get_utc_now_str())
    log.debug(msg=results)
    if not results:
        # Nothing returned (or the request failed); skip this metric
        continue
    # The datapoints come back as a JSON-formatted string
    for result in json.loads(results):
        result['metric_name'] = metric_name
        tr_instance_id = result['trInstanceId']
        result['location'], result['bandwidth'] = tr_instance_id_to_location(tr_instance_id=tr_instance_id)
        time_str = my_time.timestamp_to_str(timestamp=result['timestamp'])
        log.info(msg=f"{metric_name} {time_str} {result['location']} {result['Value']}")
        influx_client.write_cen(bucket='example',
                                table_name=metric_name,
                                location=result['location'],
                                tr_instance_id=tr_instance_id,
                                value=result['Value'],
                                time_stamp=time_str)
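To show the shape of the data this loop works on, here is a small standalone sketch that parses a datapoints string into rows ready for writing. It assumes the string is a JSON array whose items carry `timestamp` in milliseconds, `trInstanceId` and `Value`, which are the keys the script above reads. The sample payload is made up for illustration, and `parse_datapoints` is a hypothetical helper.

```python
import json
from datetime import datetime, timezone


def parse_datapoints(raw: str) -> list:
    """Turn a datapoints JSON string into a list of simple row dicts."""
    rows = []
    for item in json.loads(raw):
        # Assumed: timestamp is milliseconds since the epoch
        moment = datetime.fromtimestamp(item["timestamp"] / 1000, tz=timezone.utc)
        rows.append({
            "tr_instance_id": item.get("trInstanceId"),
            "value": item.get("Value"),
            "time": moment.strftime("%Y-%m-%dT%H:%M:%SZ"),
        })
    return rows


# Made-up sample in the assumed shape, not real monitoring output
sample = '[{"timestamp": 1700000000000, "trInstanceId": "tr-example", "Value": 1234.5}]'
rows = parse_datapoints(sample)
print(rows)
```

Each row maps directly onto the arguments of `write_cen`: the instance ID becomes a tag, the value becomes the field, and the time string becomes the point's timestamp.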
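Step 4: Configure Grafana
In Grafana, configure an InfluxDB data source and create a dashboard to display the EIP and CEN monitoring data. The steps are as follows:
- Add an InfluxDB data source and configure the connection. I use the Flux query language; when configuring the data source, note the following:
Recommended data source name: InfluxDB-Flux, which makes it clear that this is a Flux-type data source
Under InfluxDB Details, fill in Organization, Token, and Default Bucket
HTTP authentication does not need to be filled in
- Create a dashboard and configure the queries for EIP and CEN
- EIP inbound traffic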
from(bucket: "example")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "eip")
|> filter(fn: (r) => r["_field"] == "eip_rx")
|> filter(fn: (r) => r["location"] == "cn-hangzhou")
|> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
|> map(fn: (r) => ({ r with _value: r._value / 8 }))
|> yield(name: "last")
- EIP outbound traffic
from(bucket: "example")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "eip")
|> filter(fn: (r) => r["_field"] == "eip_tx")
|> filter(fn: (r) => r["location"] == "cn-hangzhou")
|> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
|> map(fn: (r) => ({ r with _value: r._value / 8 }))
|> yield(name: "last")
- CEN outbound traffic
from(bucket: "example")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "AttachmentOutRate")
|> filter(fn: (r) => r["_field"] == "value")
|> filter(fn: (r) => r["location"] == "hangzhou")
|> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
|> yield(name: "last")
- CEN inbound traffic
from(bucket: "example")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "AttachmentInRate")
|> filter(fn: (r) => r["_field"] == "value")
|> filter(fn: (r) => r["location"] == "hangzhou")
|> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
|> yield(name: "last")
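- The unit for both the EIP and CEN data is bits/sec (SI)
- Configuring Thresholds on the Grafana panels is recommended
100M corresponds to 100000000; once configured, a red line is drawn on the panel, which makes bandwidth usage easier to see at a glance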
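The threshold value is simply the purchased bandwidth expressed in bits per second with SI prefixes. As a small sketch of that arithmetic, the helper below converts bandwidth labels such as the '100m' and '20m' used in the scripts above. The function names are hypothetical, and the suffixes k, m and g are assumed to mean 10^3, 10^6 and 10^9.

```python
# SI multipliers for the assumed suffixes
SI_UNITS = {"k": 10**3, "m": 10**6, "g": 10**9}


def bandwidth_to_bps(label: str) -> int:
    """Convert a label like '100m' to bits per second (100000000)."""
    text = label.strip().lower()
    if text and text[-1] in SI_UNITS:
        return round(float(text[:-1]) * SI_UNITS[text[-1]])
    return round(float(text))


def utilization(rate_bps: float, label: str) -> float:
    """Fraction of the purchased bandwidth that the given rate uses."""
    return rate_bps / bandwidth_to_bps(label)


print(bandwidth_to_bps("100m"))        # threshold for a 100M EIP
print(utilization(50_000_000, "100m"))  # a 50 Mbit/s rate on a 100M line
```

The first value is what goes into the panel's Thresholds field; the second shows how the same conversion could feed a percentage panel if one is wanted.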
Summary
Following the steps in this article, we can fetch the monitoring data of Alibaba Cloud EIP and CEN through the API, store it in InfluxDB, and monitor and visualize it in real time with Grafana. This gives us an automated way to monitor and manage Alibaba Cloud network resources.