使用DaemonSet實(shí)現(xiàn)heapdump文件自動(dòng)化管理
一、引言
1、為什么要獲取heapdump文件
heapdump文件是Java應(yīng)用遭遇OOM后的診斷報(bào)告,記錄了某一時(shí)刻 JVM 堆中對(duì)象的詳細(xì)使用情況,是 JVM 堆內(nèi)存的一個(gè)快照。通過(guò)分析 heapdump 文件,我們可以深入了解到內(nèi)存中究竟存在哪些對(duì)象,它們占用了多少內(nèi)存空間,以及對(duì)象之間的引用關(guān)系如何。這對(duì)于定位內(nèi)存泄漏問(wèn)題至關(guān)重要。
2、為什么使用DaemonSet實(shí)現(xiàn)
之前在SRE運(yùn)維筆記公眾號(hào)中看到一篇文章《運(yùn)維救星!一鍵開(kāi)啟k8s微服務(wù)OOM heapdump自動(dòng)化之旅》,其實(shí)現(xiàn)思路通過(guò)在應(yīng)用容器中增加dump腳本,然后通過(guò)java參數(shù)-XX:OnOutOfMemoryError配置腳本,它的作用是當(dāng)內(nèi)存溢出的時(shí)候,會(huì)調(diào)用這個(gè)參數(shù)配置的腳本做一些后續(xù)處理,比如文章中的dump腳本,也可以是重啟應(yīng)用的腳本等。
上述方法對(duì)應(yīng)用有一定的侵入性,另外,如果文件太大,會(huì)出現(xiàn)容器退出導(dǎo)致上傳失敗的情況。結(jié)合實(shí)際情況,準(zhǔn)備使用DaemonSet部署一個(gè)heapdump-watcher應(yīng)用,通過(guò)它來(lái)監(jiān)聽(tīng)heapdump.prof文件實(shí)現(xiàn)自動(dòng)化管理。
Tips:該方法僅適合將heapdump.prof持久化到K8s節(jié)點(diǎn)的場(chǎng)景。但是具有一定的參考意義。
3、實(shí)施前提
該方案需要以下前提:
- heapdump.prof文件持久化到K8s節(jié)點(diǎn)。
- 持久化的目錄具備相同規(guī)則,比如:/mnt/logs/<APP_NAME>/logs/heapdump.prof,如果需要避免沖突,目錄可以改造成/mnt/logs/<APP_NAME>/logs/<POD_NAME>heapdump.prof。
- 具備阿里云OSS操作權(quán)限。
- 具備一個(gè)可用的企業(yè)微信機(jī)器人。
二、整體思路
圖片
OOM事件觸發(fā)通過(guò)Java啟動(dòng)參數(shù)配置,增加-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/logs/heapdump.hprof,當(dāng)應(yīng)用觸發(fā)OOM,則會(huì)在/mnt/logs目錄下自動(dòng)生成heapdump.prof文件。
我們通過(guò)fsnotify來(lái)監(jiān)聽(tīng)文件的變化,當(dāng)heapdump.prof生成完后,fsnotify就會(huì)迅速捕捉到這個(gè)事件,我們通過(guò)阿里云OSS的SDK實(shí)現(xiàn)文件上傳,將heapdump.prof文件壓縮后上傳到阿里云OSS。為了節(jié)約節(jié)點(diǎn)磁盤(pán)空間,當(dāng)heapdump.prof文件上傳完成后清理本地文件。
為了讓相關(guān)開(kāi)發(fā)人員了解到新的heapdump.prof文件已經(jīng)生成,我們通過(guò)企業(yè)微信機(jī)器人通知到對(duì)應(yīng)的開(kāi)發(fā)群。
三、具體實(shí)現(xiàn)
(1)初始化部分
func init() {
// 獲取環(huán)境
env = getEnv("ENV", "prod")
var err error
watcher, err = fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Failed to create fsnotify watcher: %v", err)
}
// 加載配置文件
config, err = loadConfig(configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// 初始化OSS客戶(hù)端
ossClient, err := oss.New(config.OSS.Endpoint, config.OSS.AccessID, config.OSS.AccessKey)
if err != nil {
log.Fatalf("Failed to create OSS client: %v", err)
}
client, _ = ossClient.Bucket(config.OSS.Bucket)
if config.WatchPods {
// 初始化Kubernetes客戶(hù)端
kubeClient, err = createKubeClient()
if err != nil {
log.Fatalf("Failed to create Kubernetes client: %v", err)
}
// 獲取當(dāng)前節(jié)點(diǎn)的IP
nodeIP, err = getNodeIP()
if err != nil {
log.Fatalf("Failed to get node IP: %v", err)
}
}
// 初始化信號(hào)通道
signalChan = make(chan os.Signal, 1)
stopChan = make(chan struct{})
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
}
在這段初始化代碼中,首先通過(guò)getEnv函數(shù)獲取環(huán)境變量ENV的值,如果未設(shè)置則默認(rèn)為prod。接著創(chuàng)建一個(gè)fsnotify.Watcher,用于監(jiān)聽(tīng)文件系統(tǒng)的變化。然后從指定路徑configPath加載配置文件,配置文件中包含了 OSS、企業(yè)微信 Webhook 以及白名單等相關(guān)配置信息。
隨后,利用配置信息初始化阿里云 OSS 客戶(hù)端,通過(guò)提供的Endpoint、AccessID和AccessKey創(chuàng)建ossClient,并獲取指定的Bucket,以便后續(xù)進(jìn)行文件上傳操作。
如果配置中WatchPods字段為true,表示會(huì)監(jiān)聽(tīng)Pod的變化(因?yàn)镻od會(huì)重建,如果日志目錄包含POD_NAME,重建后就不應(yīng)該再監(jiān)聽(tīng)原來(lái)Pod目錄),則會(huì)初始化 Kubernetes 客戶(hù)端。通過(guò)createKubeClient函數(shù)創(chuàng)建kubeClient,用于與 Kubernetes 集群進(jìn)行交互。還會(huì)獲取當(dāng)前節(jié)點(diǎn)的 IP 地址,以便后續(xù)監(jiān)聽(tīng)該節(jié)點(diǎn)上的 Pod 變化。
最后,初始化兩個(gè)通道signalChan和stopChan。signalChan用于接收操作系統(tǒng)發(fā)送的信號(hào),如SIGINT(中斷信號(hào),通常由用戶(hù)按下 Ctrl+C 觸發(fā))和SIGTERM(終止信號(hào),用于正常終止進(jìn)程),以便程序能夠在接收到這些信號(hào)時(shí)進(jìn)行優(yōu)雅退出;stopChan則用于停止 Informer,當(dāng)程序接收到終止信號(hào)時(shí),通過(guò)關(guān)閉stopChan來(lái)通知 Informer 停止工作。
(2)文件監(jiān)聽(tīng)
func watchFiles() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Create == fsnotify.Create {
// 檢測(cè)到新文件創(chuàng)建
if strings.HasSuffix(event.Name, "heapdump.prof") {
log.Printf("New heapdump file detected: %s", event.Name)
// 等待文件寫(xiě)入完成
if err := waitForFileCompletion(event.Name); err != nil {
log.Printf("Failed to wait for file completion: %v", err)
continue
}
// 上傳文件到OSS
appName := filepath.Base(filepath.Dir(filepath.Dir(event.Name)))
err := uploadFileToOSS(event.Name, appName)
if err != nil {
log.Printf("Failed to upload file to OSS: %v", err)
} else {
log.Printf("File uploaded to OSS successfully: %s", event.Name)
// 發(fā)送企業(yè)微信告警通知
err = sendWechatAlert(appName)
if err != nil {
log.Printf("Failed to send WeChat alert: %v", err)
}
}
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Printf("Error: %v", err)
}
}
}
watchFiles函數(shù)是實(shí)現(xiàn)文件監(jiān)聽(tīng)的核心部分。它通過(guò)一個(gè)無(wú)限循環(huán)for { }和select語(yǔ)句來(lái)監(jiān)聽(tīng)watcher.Events通道和watcher.Errors通道。
當(dāng)watcher.Events通道有事件發(fā)生時(shí),會(huì)檢查事件類(lèi)型是否為文件創(chuàng)建(event.Op&fsnotify.Create == fsnotify.Create)。如果是新文件創(chuàng)建,且文件后綴為heapdump.prof,則表示檢測(cè)到了新的 heapdump 文件。
此時(shí),會(huì)調(diào)用waitForFileCompletion函數(shù)等待文件寫(xiě)入完成。該函數(shù)通過(guò)不斷檢查文件大小是否變化來(lái)判斷文件是否寫(xiě)入完成,設(shè)置了最大檢查時(shí)長(zhǎng)為 30 秒,檢查間隔為 2 秒。如果文件在規(guī)定時(shí)間內(nèi)大小不再變化,則認(rèn)為文件寫(xiě)入完成;否則,返回錯(cuò)誤并繼續(xù)監(jiān)聽(tīng)下一個(gè)事件。
文件寫(xiě)入完成后,獲取文件所在目錄的應(yīng)用名稱(chēng),然后調(diào)用uploadFileToOSS函數(shù)將文件上傳到 OSS。上傳成功后,會(huì)調(diào)用sendWechatAlert函數(shù)發(fā)送企業(yè)微信告警通知,告知相關(guān)人員新的 heapdump 文件已生成并上傳。
(3)Pod狀態(tài)監(jiān)聽(tīng)
該方法主要是針對(duì)heapdump.prof所存放的目錄有POD_NAME變量,希望實(shí)現(xiàn)的是當(dāng)原Pod銷(xiāo)毀會(huì)取消監(jiān)聽(tīng)原Pod目錄,當(dāng)新Pod創(chuàng)建會(huì)監(jiān)聽(tīng)新Pod目錄。
func watchPods() {
// 獲取當(dāng)前節(jié)點(diǎn)上的Pod列表
for _, appName := range config.Whitelist {
pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", appName),
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeIP),
})
if err != nil {
log.Printf("Failed to list pods for app %s: %v", appName, err)
continue
}
for _, pod := range pods.Items {
addPodWatch(appName, pod.Name)
}
}
// 監(jiān)聽(tīng)Pod變化
_, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)
return kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)
return kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
appName := pod.Labels["app"]
if isWhitelisted(appName) {
log.Printf("Pod added: %s/%s", pod.Namespace, pod.Name)
addPodWatch(appName, pod.Name)
}
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
appName := pod.Labels["app"]
if isWhitelisted(appName) {
log.Printf("Pod deleted: %s/%s", pod.Namespace, pod.Name)
removePodWatch(appName, pod.Name)
}
},
},
)
controller.Run(stopChan) // 使用 stopChan 來(lái)停止 Informer}
watchPods函數(shù)負(fù)責(zé)監(jiān)聽(tīng) Pod 的變化。首先,遍歷配置中的白名單應(yīng)用名稱(chēng),通過(guò) Kubernetes 客戶(hù)端kubeClient獲取當(dāng)前節(jié)點(diǎn)上屬于這些應(yīng)用的 Pod 列表。使用LabelSelector來(lái)篩選出特定應(yīng)用的 Pod,F(xiàn)ieldSelector來(lái)指定只獲取當(dāng)前節(jié)點(diǎn)上的 Pod。
對(duì)于獲取到的每個(gè) Pod,調(diào)用addPodWatch函數(shù)為其添加文件監(jiān)聽(tīng)。addPodWatch函數(shù)會(huì)根據(jù)應(yīng)用名稱(chēng)和 Pod 名稱(chēng)構(gòu)建日志目錄路徑,并使用watcher.Add方法將該目錄添加到文件監(jiān)聽(tīng)列表中,以便后續(xù)能及時(shí)監(jiān)聽(tīng)到該 Pod 生成的 heapdump 文件。
然后,通過(guò)cache.NewInformer創(chuàng)建一個(gè) Informer,用于監(jiān)聽(tīng) Pod 的變化。Informer是 Kubernetes 客戶(hù)端中的一個(gè)重要組件,它通過(guò)ListWatch機(jī)制定期從 Kubernetes API Server 獲取 Pod 列表,并監(jiān)聽(tīng) Pod 的變化事件。
ListFunc和WatchFunc分別定義了獲取 Pod 列表和監(jiān)聽(tīng) Pod 變化的方法,都通過(guò)kubeClient.CoreV1().Pods(metav1.NamespaceAll)來(lái)操作所有命名空間下的 Pod,并根據(jù)當(dāng)前節(jié)點(diǎn) IP 進(jìn)行篩選。
ResourceEventHandlerFuncs定義了 Informer 在接收到 Pod 添加和刪除事件時(shí)的處理邏輯。當(dāng)有新 Pod 添加時(shí),如果該 Pod 的應(yīng)用名稱(chēng)在白名單中,會(huì)調(diào)用addPodWatch函數(shù)為其添加文件監(jiān)聽(tīng);當(dāng)有 Pod 被刪除時(shí),如果應(yīng)用名稱(chēng)在白名單中,會(huì)調(diào)用removePodWatch函數(shù)移除對(duì)該 Pod 的文件監(jiān)聽(tīng)。
最后,啟動(dòng) Informer 并傳入stopChan,當(dāng)stopChan被關(guān)閉時(shí),Informer 會(huì)停止運(yùn)行,實(shí)現(xiàn)了優(yōu)雅停止的功能。
(4)文件上傳
func uploadFileToOSS(filePath string, appName string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// 創(chuàng)建臨時(shí)文件用于存儲(chǔ)壓縮后的文件
tempFile, err := os.CreateTemp("", "heapdump-*.zip")
if err != nil {
return err
}
defer tempFile.Close()
defer os.Remove(tempFile.Name()) // 刪除臨時(shí)文件
// 創(chuàng)建 zip.Writer
zipWriter := zip.NewWriter(tempFile)
defer zipWriter.Close()
// 添加文件到 zip
zipFileWriter, err := zipWriter.Create(filepath.Base(filePath))
if err != nil {
return err
}
_, err = io.Copy(zipFileWriter, file)
if err != nil {
return err
}
// 確保 zip 文件寫(xiě)入完成
err = zipWriter.Close()
if err != nil {
return err
}
// 重新打開(kāi)臨時(shí)文件用于上傳
tempFile.Seek(0, 0)
tempFileReader := io.Reader(tempFile)
// 構(gòu)建上傳路徑
timestamp := time.Now().Format("20060102150405")
objectName := fmt.Sprintf("heapdump/%s/heapdump_%s.zip", appName, timestamp)
// 設(shè)置文件元數(shù)據(jù)
expires := time.Now().Add(24 * time.Hour) // 設(shè)置過(guò)期時(shí)間為24小時(shí)后
options := []oss.Option{
oss.Expires(expires),
}
err = client.PutObject(objectName, tempFileReader, options...)
if err != nil {
return err
}
// 生成預(yù)簽名URL
ossURL, err = client.SignURL(objectName, oss.HTTPGet, expires.Unix()-time.Now().Unix())
if err != nil {
log.Fatalf("Failed to generate presigned URL: %v", err)
}
// 文件上傳成功后,刪除本地文件
log.Printf("Deleting local file: %s", filePath)
if err := os.Remove(filePath); err != nil {
log.Printf("Failed to delete local file: %v", err)
}
return nil
}
這一步先將heapdump.prof進(jìn)行zip壓縮,然后再將其上傳到OSS,上傳成功后刪除本地文件。
(5)發(fā)送通知
func sendWechatAlert(appName string) error {
// 構(gòu)建 Markdown 格式的消息
markdownContent := fmt.Sprintf(`# JAVA OOM DUMP 文件生成
> 應(yīng)用:%s
> 環(huán)境:%s
> 文件:[下載地址](%s)
> *Tips*: 文件只保留1天,請(qǐng)及時(shí)下載`, appName, env, ossURL)
payload := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]string{
"content": markdownContent,
},
}
_, body, errs := gorequest.New().Post(config.Wechat.WebhookURL).Send(payload).End()
if errs != nil {
return fmt.Errorf("failed to send WeChat alert: %v", errs)
}
log.Printf("WeChat alert response: %s", body)
return nil
}
該步驟將產(chǎn)生heapdump的信息發(fā)送到對(duì)應(yīng)的告警群。
四、部署驗(yàn)證
(1)制作鏡像
將應(yīng)用打包成Docker鏡像。
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /heapdump-watcher
FROM alpine:3.18
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /heapdump-watcher ./heapdump-watcher
CMD ["/heapdump-watcher"]
(2)在K8s中部署應(yīng)用
apiVersion: v1
kind: ServiceAccount
metadata:
name: heapdump-watcher
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
name: heapdump-watcher-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ConfigMap
metadata:
name: heapdump-config
namespace: default
data:
config.yaml: |
oss:
endpoint: your-oss-endpoint
bucket: your-oss-bucket
accessID: your-oss-access-id
accessKey: your-oss-access-key
wechat:
webhookURL: your-wechat-webhook-url
whitelist:
- app1
- app2
- app3
watchPods: false # 控制是否監(jiān)聽(tīng) Pod 變化
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: heapdump-watcher
namespace: default
spec:
selector:
matchLabels:
app: heapdump-watcher
template:
metadata:
labels:
app: heapdump-watcher
spec:
serviceAccountName: heapdump-watcher
containers:
- name: heapdump-watcher
image: your-docker-image:latest
volumeMounts:
- name: logs
mountPath: /mnt/logs
readOnly: false
- name: config
mountPath: /app/config.yaml
subPath: config.yaml
readOnly: true
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ENV
value: prod
volumes:
- name: logs
hostPath:
path: /mnt/logs
type: Directory
- name: config
configMap:
name: heapdump-config
items:
- key: config.yaml
path: config.yaml
(3)驗(yàn)證
當(dāng)應(yīng)用產(chǎn)生告警后會(huì)通知到對(duì)應(yīng)的企業(yè)微信,如下:
圖片
五、最后
當(dāng)前功能已經(jīng)初步實(shí)現(xiàn),但仍有許多可以?xún)?yōu)化和擴(kuò)展的方向??梢钥紤]擴(kuò)展支持更多類(lèi)型的云存儲(chǔ),如騰訊云 COS、AWS S3 等,以滿(mǎn)足不同用戶(hù)的需求。這樣一來(lái),用戶(hù)可以根據(jù)自己的實(shí)際情況和偏好,選擇最適合自己的云存儲(chǔ)服務(wù),提高方案的通用性和靈活性。
另外在通知內(nèi)容和方式上,可以進(jìn)一步豐富通知內(nèi)容,不僅包含應(yīng)用名稱(chēng)、環(huán)境和文件下載鏈接,還可以增加更多關(guān)于內(nèi)存問(wèn)題的詳細(xì)信息,如內(nèi)存使用峰值、OOM 發(fā)生的時(shí)間點(diǎn)等。在通知方式上,可以增加對(duì)其他通信工具的支持,如釘釘、飛書(shū)等,讓用戶(hù)能夠根據(jù)自己團(tuán)隊(duì)的使用習(xí)慣選擇合適的通知方式,確保通知能夠及時(shí)、準(zhǔn)確地傳達(dá)給相關(guān)人員。
還可以引入更智能的分析功能,在上傳 heapdump 文件后,自動(dòng)對(duì)文件進(jìn)行初步分析,提取關(guān)鍵信息,如內(nèi)存泄漏的疑似對(duì)象、內(nèi)存占用過(guò)高的類(lèi)等,并將分析結(jié)果一并通知給相關(guān)人員。這樣可以幫助開(kāi)發(fā)人員更快地定位問(wèn)題,提高問(wèn)題解決的效率,為 Java 應(yīng)用的穩(wěn)定運(yùn)行提供更強(qiáng)大的支持。