用 Python 分析微信群聊記錄,是怎樣一種體驗?
1. 場景
前幾天,有一位小伙伴在后臺給我留言,說自己有幾十個微信群,自己精力有限,沒法看過來,想要篩選一些高質(zhì)量的群,讓我是否能幫忙想想辦法。
其實,微信群里的所有聊天記錄都在手機本地文件夾內(nèi),只需要導(dǎo)出來進行解密,然后來一波數(shù)據(jù)分析,就可以幫他篩選出高質(zhì)量的社群。
本篇文章將帶大家用 Python 一步步來實現(xiàn)這個功能。
2. 實現(xiàn)步驟
第 1 步,導(dǎo)出微信聊天記錄數(shù)據(jù)庫
首先,我們使用一部 Root 后的手機或者模擬器登錄微信,找到微信聊天記錄數(shù)據(jù)庫,然后導(dǎo)出到本地。
數(shù)據(jù)庫文件的完整路徑如下:
- # 微信聊天記錄數(shù)據(jù)庫完整路徑
- /data/data/com.tencent.mm/MicroMsg/[當(dāng)前登錄微信的隨機字符串]/EnMicroMsg.db
需要注意的是,如果當(dāng)前設(shè)備沒有 Root,可以選擇群聊消息進行一次遷移,然后從 Root 設(shè)備或模擬器中導(dǎo)出數(shù)據(jù)庫。
第 2 步,獲取數(shù)據(jù)庫的密碼
微信數(shù)據(jù)庫的密碼組成形式為:手機 IMEI + 微信 UIN,然后 md5 加密(32 位小寫)的前 7 個數(shù)字。
其中,手機的 IMEI 可以通過 *#06# 獲取,如果是雙卡手機,需要自己做一下判斷。
微信的 UIN 在下面配置文件中,找到 name 屬性為 default_uin 的 value 值,即為 UIN
- # 當(dāng)前登錄微信的配置文件
- /data/data/com.tencent.mm/shared_prefs/system_config_prefs.xml
最后,然后將 IMET 和 UIN 組成字符串,然后利用 MD5 進行加密,取 32 位小寫的前 7 位即為微信數(shù)據(jù)庫的密碼。
第 3 步,破解數(shù)據(jù)庫
由于微信數(shù)據(jù)庫是使用 SQLCipher 生成,所以要先安裝 sqlcipher 命令行文件
- # 安裝sqlcipher命令行(Mac)
- brew install sqlcipher
- # Win可以去下載sqlcipher命令行文件
然后,輸入數(shù)據(jù)庫的密碼及解密方式等,導(dǎo)出破解后的數(shù)據(jù)庫。
第 4 步,分析數(shù)據(jù)庫
推薦使用 SQLiteSutdio 打開并分析上面破解后的數(shù)據(jù)庫,重點查看 message、rcontact、chatroom 這 3 張表。
微信所有的文字聊天記錄都存放在 mesage 數(shù)據(jù)表中,包含:聊天內(nèi)容、發(fā)送者、消息類型、創(chuàng)建時間等
rcontact 為微信通訊錄表,包含:微信 ID、昵稱、備注名等
chatroom 是群聊信息表,包含:群聊 ID、成員列表等
第 5 步,Python 打開數(shù)據(jù)庫并封裝
使用 sqlite3 連接本地數(shù)據(jù)庫文件,獲取數(shù)據(jù)庫對象和游標(biāo)對象
- import sqlite3
- def __init__(self, db_path="./weixin.db"):
- """
- 本地數(shù)據(jù)庫初始化
- """
- self.db = sqlite3.connect(db_path)
- self.cursor = self.db.cursor()
接著,對數(shù)據(jù)庫常用的操作,包含:增刪改查,進行封裝操作。
- def execute(self, sql, param=None):
- """
- sql: Sql語句,包含:增、刪、改
- param:數(shù)據(jù),可以為列表、字典,也可以為空
- """
- try:
- if param is None:
- self.cursor.execute(sql)
- else:
- if type(param) is list:
- self.cursor.executemany(sql, param)
- else:
- self.cursor.execute(sql, param)
- count = self.db.total_changes
- self.db.commit()
- except Exception as e:
- print(e)
- return False, e
- # 返回結(jié)果
- return True if count > 0 else False
- def query(self, sql, param=None):
- """
- 查詢語句
- sql:Sql語句
- param:參數(shù),可以包含空
- retutn:成功返回True
- """
- if param is None:
- self.cursor.execute(sql)
- else:
- self.cursor.execute(sql, param)
- # 返回查詢的結(jié)果
- return self.cursor.fetchall()
第 6 步,通過群聊名稱獲取群聊 ID
根據(jù)群聊昵稱,使用 Sql 語句查詢 rcontact 表,可以獲取群聊的 ID 值
- def __get_chartroom_id(self):
- """
- 獲取群聊的id
- :return:
- """
- res = self.db.query('select username from rcontact where nickname=?;', (self.chatroom_name,))
- # 群聊id
- chatroom_id = res[0][0]
- return chatroom_id
第 7 步,獲取群聊消息
擁有群聊 ID 之后,緊接著查詢 message 表,獲取當(dāng)前群聊的所有消息內(nèi)容。
- # message表:聊天記錄表
- # isSend=0:對方發(fā)送的;isSend=1:自己發(fā)送的
- sql = "SELECT content FROM message WHERE talker='{}' and isSend=0".format(chatroom_id)
- # 查詢表,獲取所有的聊天記錄
- result = self.db.query(sql)
為了獲取有效的消息內(nèi)容,可以清洗掉自己發(fā)送的消息、系統(tǒng)消息、紅包消息等內(nèi)容
- # 循環(huán)查詢到的所有的消息
- for item in result:
- # 過濾數(shù)據(jù)
- if not item or not item[0] or item[0].find('xml') != -1 or item[0].find('sysmsg') != -1 or item[0].find(
- '<msg>') != -1 or item[0].find('chatroom') != -1 or item[0].find('weixinhongbao') != -1:
- continue
- # 過濾掉自己發(fā)送的內(nèi)容,不包含:
- temps = item[0].split(':')
- if len(temps) < 2:
- # print('自己發(fā)送的內(nèi)容:' + item[0])
- continue
- # 每一條聊天記錄,過濾掉發(fā)送者,只保留消息正文
- # 發(fā)送者
- send_from = item[0].split(':')[0]
- # 發(fā)送內(nèi)容
- send_msg = "".join(item[0].split(':')[1:]).strip().replace("\"", "")
- # 過長的消息,也過濾掉
- if len(send_msg) > 200:
- continue
對于群其他成員發(fā)送的內(nèi)容,再過濾掉消息內(nèi)容的前半部分,只保留消息正文
第 8 步,生成詞云
使用 jieba 對群內(nèi)有效的消息進行分詞,然后使用 wordcloud 生成詞云圖。
- def generate_wordcloud(self, word):
- """
- 生成詞云
- :param word:
- :return:
- """
- img = WordCloud(font_path="./DroidSansFallbackFull.ttf", width=2000, height=2000,
- margin=2, collocations=False).generate(word)
- plt.imshow(img)
- plt.axis("off")
- plt.show()
- # 保存圖片
- img.to_file("{}.png".format("群聊"))
- # 分詞
- temp = " ".join(jieba.cut(words, cut_all=True))
- # 生成詞云
- generate_wordcloud(temp)
第 9 步,新建排名表,插入數(shù)據(jù)
為了統(tǒng)計群聊活躍度排名,我們需要新建一張表,包含:id、微信昵稱、消息內(nèi)容 3 個字段。
- def __create_top_table(self):
- """
- 創(chuàng)建Top表
- :return:
- """
- # 創(chuàng)建Top表,如果存在就不重新創(chuàng)建
- result = self.db.execute(
- "CREATE TABLE IF NOT EXISTS top(uid integer primary key,name varchar(200),msg varchar(200))")
接著,將上一步的每一條消息中的發(fā)送者 ID、發(fā)送內(nèi)容 2 個字段插入到新建的 Top 表內(nèi)
- # 定義一個列表,加入所有要統(tǒng)計的數(shù)據(jù)
- msg_pre = []
- for item in result:
- # 發(fā)送者
- send_from = item[0].split(':')[0]
- # 發(fā)送內(nèi)容
- send_msg = "".join(item[0].split(':')[1:]).strip().replace("\"", "")
- msg_pre.append((send_from, send_msg))
- # 把要統(tǒng)計的數(shù)據(jù),插入到top表中
- self.db.execute("insert into top(uid,name,msg) values (NULL,?,?);", msg_pre)
第 10 步,獲取活躍度排名并可視化
從 Top 數(shù)據(jù)表中,通過微信昵稱查詢出每一位成員發(fā)言的次數(shù),并保存到一個列表中
- def get_top_partner(self):
- """
- 排名前15的成員
- :return:
- """
- sql = "SELECT name as 姓名,COUNT(*) as times FROM top GROUP BY name ORDER BY times DESC limit %d;" % self.top_num
- result = self.db.query(sql)
- for item in result:
- # 用戶id
- id = item[0]
- # 發(fā)言次數(shù)
- count = item[1]
- # 獲取用戶的昵稱,即:微信昵稱
- username = self.get_username(id)
- self.top_data.append({
- 'username': username,
- 'count': count
- })
最后,去除微信昵稱的特殊符號,使用 pyecharts 將數(shù)據(jù)可視化。
- def draw_image(self):
- """
- 數(shù)據(jù)可視化
- :return:
- """
- usernames = []
- counts = []
- for user in self.top_data:
- # 去除昵稱中的特殊符號
- usernames.append(get_ava_string(user.get('username').strip())[0:8])
- counts.append(user.get('count'))
- def bar_chart() -> Bar:
- c = (
- Bar()
- .add_xaxis(usernames)
- .add_yaxis("活躍度", counts)
- .reversal_axis()
- .set_series_opts(label_opts=opts.LabelOpts(position="right"))
- .set_global_opts(title_opts=opts.TitleOpts(title="最活躍的%d個小伙伴" % self.top_num))
- )
- return c
- # 需要安裝 snapshot-selenium 或者 snapshot-phantomjs
- make_snapshot(driver, bar_chart().render(), "bar.png")
3. 最后
上面的操作,通過生成的詞云了解到當(dāng)前群聊過去一段時間都在聊的話題及價值,通過對聊天記錄的數(shù)據(jù)分析,獲取到微信群聊活躍度排名。
當(dāng)然,也可以分析群成員潛水排名及某一位群成員的數(shù)據(jù)分析。