如何使用Python將MySQL表數(shù)據(jù)遷移到MongoDB集合
譯文【51CTO.com快譯】介紹
MySQL 是一個(gè) RDBMS 平臺(tái),它以規(guī)范化的方式以表格格式存儲(chǔ)數(shù)據(jù),而 MongoDB 是一個(gè) NoSQL 數(shù)據(jù)庫(kù),它以無(wú)模式的方式將信息存儲(chǔ)為按集合分組的文檔。數(shù)據(jù)的表示方式完全不同,因此將 MySQL 表數(shù)據(jù)遷移到 MongoDB 集合聽(tīng)起來(lái)可能是一項(xiàng)艱巨的任務(wù)。但是,Python 憑借其強(qiáng)大的連接性和數(shù)據(jù)處理能力讓這一切變得輕而易舉。
在本文中,將詳細(xì)的講解如何使用簡(jiǎn)單的 Python 腳本將 MySQL 表數(shù)據(jù)遷移到 MongoDB 集合所需的步驟。這些腳本是在 Windows 上使用 Python 3.9.5 開(kāi)發(fā)的。但是,它應(yīng)該適用于任何平臺(tái)上的任何 Python 3+ 版本一起使用。
步驟 1:安裝所需的模塊
第一步是安裝連接MySQL 和 MongoDB 數(shù)據(jù)庫(kù)實(shí)例所需的模塊。我們將使用mysql.connector連接到 MySQL 數(shù)據(jù)庫(kù)。對(duì)于 MongoDB,使用pymongo,這是從 Python 連接到 MongoDB 的推薦模塊。
如果所需模塊尚未安裝,請(qǐng)運(yùn)行以下PIP命令來(lái)安裝它們。
- pip 安裝 mysql 連接器 pip 安裝 pymongo
PIP 是 Python 包或模塊的包管理器。
步驟2:從MySQL表中讀取數(shù)據(jù)
第一步是從源 MySQL 表中讀取數(shù)據(jù),并以可用于將數(shù)據(jù)加載到目標(biāo) MongoDB 數(shù)據(jù)庫(kù)中的格式進(jìn)行準(zhǔn)備。MongoDB 是一個(gè) NoSQL 數(shù)據(jù)庫(kù),它將數(shù)據(jù)存儲(chǔ)為 JSON 文檔,因此最好以 JSON 格式生成源數(shù)據(jù)。值得一提的是,Python 具有強(qiáng)大的數(shù)據(jù)處理能力,可以輕松地將數(shù)據(jù)轉(zhuǎn)換為 JSON 格式。
- import mysql.connector
- mysqldb = mysql.connector.connect( host="localhost", database="employees", user="root", password="" )
- mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall()
- print(myresult)
當(dāng)腳本在沒(méi)有任何錯(cuò)誤的情況下完成時(shí),輸出的結(jié)果如下:
- [
- {
- "id":4,
- "name":"Medicine",
- "description":"<p>Medicine<br></p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":6,
- "name":"Food",
- "description":"<p>Food</p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":8,
- "name":"Groceries",
- "description":"<p>Groceries<br></p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":9,
- "name":"Cakes & Bakes",
- "description":"<p>Cakes & Bakes<br></p>",
- "created_at":d"",
- "updated_at":""
- }
- ]
請(qǐng)注意,輸出的是一個(gè) JSON 數(shù)組,因?yàn)槲覀儗?span style="background-color: rgb(153, 153, 153);">dictionary=True參數(shù)傳遞給了游標(biāo)。否則,結(jié)果將采用列表格式?,F(xiàn)在有了 JSON 格式的源數(shù)據(jù),就可以遷移到 MongoDB 集合。
步驟3:寫(xiě)入 MongoDB 集合
獲得 JSON 格式的源數(shù)據(jù)后,下一步是將數(shù)據(jù)插入到 MongoDB 集合中。集合是一組文檔,相當(dāng)于 RDBMS 中表(或關(guān)系)。我可以通過(guò)調(diào)用insert_many()集合類的方法來(lái)實(shí)現(xiàn),該方法返回插入文檔的對(duì)象 ID 列表。請(qǐng)注意,當(dāng)作為參數(shù)傳遞空列表時(shí),此方法將引發(fā)異常,因此在方法調(diào)用之前進(jìn)行長(zhǎng)度檢查。
- import pymongo
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- mycol = mydb["categories"]
- if len(myresult) > 0:
- x = mycol.insert_many(myresult) #myresult comes from mysql cursor
- print(len(x.inserted_ids))
完成此步驟后,檢查一下 MongoDB 實(shí)例,以驗(yàn)證數(shù)據(jù)庫(kù)和集合是否已創(chuàng)建,文檔是否已插入。注意MongoDB 是無(wú)模式的,這就意味著不必定義模式來(lái)插入文檔,模式是動(dòng)態(tài)推斷并自動(dòng)創(chuàng)建的。MongoDB 還可以創(chuàng)建代碼中引用的數(shù)據(jù)庫(kù)和集合(如果它們還不存在的話)。
步驟4:把數(shù)據(jù)放在一起
下面是從 MySQL 中讀取表并將其插入到 MongoDB 中集合的完整腳本。
- import mysql.connector
- import pymongo
- delete_existing_documents = True
- mysql_host="localhost"
- mysql_database="mydatabase"
- mysql_schema = "myschema"
- mysql_user="myuser"
- mysql_password="********"
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- mysqldb = mysql.connector.connect(
- host=mysql_host,
- database=mysql_database,
- user=mysql_user,
- password=mysql_password
- )
- mycursor = mysqldb.cursor(dictionary=True)
- mycursor.execute("SELECT * from categories;")
- myresult = mycursor.fetchall()
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- mycol = mydb["categories"]
- if len(myresult) > 0:
- x = mycol.insert_many(myresult) #myresult comes from mysql cursor
- print(len(x.inserted_ids))
步驟 5:增強(qiáng)腳本以加載 MySQL 架構(gòu)中的所有表
該腳本從 MySQL 中讀取一個(gè)表,并將結(jié)果加載到 MongoDB 集合中。然后,下一步是遍歷源數(shù)據(jù)庫(kù)中所有表的列表,并將結(jié)果加載到新的 MySQL 集合中。我們可以通過(guò)查詢information_schema.tables元數(shù)據(jù)表來(lái)實(shí)現(xiàn)這一點(diǎn),該表提供給定模式中的表列表。然后可以遍歷結(jié)果并調(diào)用上面的腳本來(lái)遷移每個(gè)表的數(shù)據(jù)。
- #Iterate through the list of tables in the schema
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,))
- tables = table_list_cursor.fetchall()
- for table in tables:
- #Execute the migration script for 'table'
也可以通過(guò)將遷移邏輯抽象為一個(gè)函數(shù)來(lái)實(shí)現(xiàn)這一點(diǎn)。
- #Function migrate_table
- def migrate_table(db, col_name):
- mycursor = db.cursor(dictionary=True)
- mycursor.execute("SELECT * FROM " + col_name + ";")
- myresult = mycursor.fetchall()
- mycol = mydb[col_name]
- if delete_existing_documents:
- #delete all documents in the collection
- mycol.delete_many({})
- #insert the documents
- if len(myresult) > 0:
- x = mycol.insert_many(myresult)
- return len(x.inserted_ids)
- else:
- return 0
步驟6:輸出腳本進(jìn)度并使其可讀
腳本的進(jìn)度是通過(guò)使用print 語(yǔ)句來(lái)傳達(dá)的。通過(guò)顏色編碼使輸出易于閱讀。例如,以將成功語(yǔ)句打印為綠色,將失敗語(yǔ)句打印為紅色。
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}")
- print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}")
- print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}")
最終結(jié)果
- 源 MySQL 數(shù)據(jù)庫(kù)
- 遷移后的目標(biāo) MongoDB 數(shù)據(jù)庫(kù)
- 在VSCode 中的 Python 腳本和輸出
完整的腳本
- import mysql.connector
- import pymongo
- import datetime
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- begin_time = datetime.datetime.now()
- print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}")
- delete_existing_documents = True;
- mysql_host="localhost"
- mysql_database="mydatabase"
- mysql_schema = "myschhema"
- mysql_user="root"
- mysql_password=""
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}")
- print("")
- #MySQL connection
- print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}")
- mysqldb = mysql.connector.connect(
- host=mysql_host,
- database=mysql_database,
- user=mysql_user,
- password=mysql_password
- )
- print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}")
- #MongoDB connection
- print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}")
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}")
- print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}")
- #Start migration
- print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}")
- dblist = myclient.list_database_names()
- if mongodb_dbname in dblist:
- print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}")
- else:
- print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}")
- #Function migrate_table
- def migrate_table(db, col_name):
- mycursor = db.cursor(dictionary=True)
- mycursor.execute("SELECT * FROM " + col_name + ";")
- myresult = mycursor.fetchall()
- mycol = mydb[col_name]
- if delete_existing_documents:
- #delete all documents in the collection
- mycol.delete_many({})
- #insert the documents
- if len(myresult) > 0:
- x = mycol.insert_many(myresult)
- return len(x.inserted_ids)
- else:
- return 0
- #Iterate through the list of tables in the schema
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,))
- tables = table_list_cursor.fetchall()
- total_count = len(tables)
- success_count = 0
- fail_count = 0
- for table in tables:
- try:
- print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}")
- inserted_count = migrate_table(mysqldb, table[0])
- print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}")
- success_count += 1
- except Exception as e:
- print(f"{bcolors.FAIL} {e} {bcolors.ENDC}")
- fail_count += 1
- print("")
- print("Migration completed.")
- print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}")
- if fail_count > 0:
- print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}")
- end_time = datetime.datetime.now()
- print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}")
- print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}")
警告
該腳本適用于中小型 MySQL 數(shù)據(jù)庫(kù),它有幾百個(gè)表,每個(gè)表有幾千行。對(duì)于具有數(shù)百萬(wàn)行的大型數(shù)據(jù)庫(kù),性能可能會(huì)受到影響。在開(kāi)始實(shí)際遷移之前,請(qǐng)?jiān)诒砹斜聿樵兒蛯?shí)際表select查詢上使用 LIMIT 關(guān)鍵字對(duì)有限行進(jìn)行檢測(cè)。
下載
帶點(diǎn)擊此處從 GitHub 下載整個(gè)腳本:
https://github.com/zshameel/MySQL2MongoDB
【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文譯者和出處為51CTO.com】