使用AWS云端機(jī)器學(xué)習(xí),構(gòu)建無(wú)服務(wù)器新聞數(shù)據(jù)管道
譯文【51CTO.com快譯】作為一名分析師,我花很多時(shí)間來(lái)跟蹤新聞和行業(yè)最新資訊。我在休產(chǎn)假時(shí)考慮了這個(gè)問(wèn)題,決定構(gòu)建一個(gè)簡(jiǎn)單的應(yīng)用程序來(lái)跟蹤有關(guān)綠色技術(shù)和可再生能源的新聞。使用AWS Lambda及AWS的其他服務(wù)(比如EventBridge、SNS、DynamoDB和Sagemaker),可以非常輕松地上手,在幾天內(nèi)構(gòu)建好原型。
該應(yīng)用程序由一系列無(wú)服務(wù)器Lambda函數(shù)和作為SageMaker端點(diǎn)來(lái)部署的文本摘要機(jī)器學(xué)習(xí)模型提供支持。AWS EventBridge規(guī)則每24小時(shí)觸發(fā)一次Lambda函數(shù),從DynamoDB數(shù)據(jù)庫(kù)獲取新聞源(feed)。
然后這些新聞源作為SNS主題來(lái)發(fā)送,以觸發(fā)多個(gè)Lambda分析新聞源并提取新聞URL。每個(gè)網(wǎng)站每天更新RSS新聞源最多只更新幾篇文章,因此我們不會(huì)發(fā)送大批流量,不然可能會(huì)導(dǎo)致消耗任何特定新聞出版物的過(guò)多資源。
然而,一大問(wèn)題是提取文章的全文,因?yàn)槊總€(gè)網(wǎng)站不一樣。對(duì)我們來(lái)說(shuō)幸運(yùn)的是,goose3之類的庫(kù)通過(guò)運(yùn)用機(jī)器學(xué)習(xí)方法提取頁(yè)面正文來(lái)解決這個(gè)問(wèn)題。由于版權(quán)問(wèn)題,我無(wú)法存儲(chǔ)文章的全文,這就是為什么我運(yùn)用HuggingFace文本摘要轉(zhuǎn)換器模型來(lái)生成簡(jiǎn)短摘要。
下面詳細(xì)介紹了如何自行構(gòu)建基于機(jī)器學(xué)習(xí)的新聞聚合管道。
1. 設(shè)置擁有必要權(quán)限的IAM角色。
雖然這個(gè)數(shù)據(jù)管道很簡(jiǎn)單,但它連接許多AWS資源。想授予我們的函數(shù)訪問(wèn)所有所需資源的權(quán)限,我們需要設(shè)置IAM角色。該角色為函數(shù)賦予了使用云端其他資源的權(quán)限,比如DynamoDB、Sagemaker、CloudWatch和SNS。出于安全原因,最好不要為我們的IAM角色賦予全面的AWS管理訪問(wèn)權(quán)限,只允許它使用所需的資源。
2. 在RSS Dispatcher Lambda中從DynamoDB獲取RSS新聞源
使用AWS Lambda幾乎可以做任何事情,它是一種非常強(qiáng)大的無(wú)服務(wù)器計(jì)算服務(wù),非常適合短任務(wù)。對(duì)我而言,主要優(yōu)點(diǎn)在于很容易訪問(wèn)AWS生態(tài)系統(tǒng)中的其他服務(wù)。
我將所有RSS新聞源存儲(chǔ)在DynamoDB表中,使用boto3庫(kù)從Lambda訪問(wèn)它真的很容易。一旦從數(shù)據(jù)庫(kù)獲取所有新聞源后,我將它們作為SNS消息發(fā)送,以觸發(fā)新聞源解析Lambda。
- import boto3
- import json
- def lambda_handler(event, context):
- # Connect to DynamoDB
- dynamodb = boto3.resource('dynamodb')
- # Get table
- table = dynamodb.Table('rss_feeds')
- # Get all records from the table
- data = table.scan()['Items']
- rss = [y['rss'] for y in data]
- # Connect to SNS
- client = boto3.client('sns')
- # Send messages to the queue
- for item in rss:
- client.publish(TopicArn="arn:aws:sns:eu-west-1:802099603194:rss_to-parse", Message = item)
3. 使用必要的庫(kù)創(chuàng)建層
想在AWS Lambdas中使用一些特定庫(kù),您需要將它們作為層來(lái)導(dǎo)入。想準(zhǔn)備庫(kù)導(dǎo)入,它需要位于python.zip壓縮包中,然后我們可以將其上傳到AWS并在函數(shù)中使用。想創(chuàng)建層,只需cd進(jìn)入到Python文件夾中,運(yùn)行pip install命令,將其壓縮并準(zhǔn)備好上傳。
- pip install feedparser -t
然而,我將goose3庫(kù)作為一個(gè)層來(lái)部署時(shí)遇到了一些困難。簡(jiǎn)單的調(diào)查后發(fā)現(xiàn),LXML等一些庫(kù)需要在類似Lambda的環(huán)境(Linux)中加以編譯。因此如果庫(kù)在Windows上編譯,然后導(dǎo)入到函數(shù)中,就會(huì)發(fā)生錯(cuò)誤。為了解決這個(gè)問(wèn)題,在創(chuàng)建壓縮包之前,我們需要在Linux上安裝該庫(kù)。
這有兩種方法。首先,安裝在帶有Docker的模擬Lambda環(huán)境上。對(duì)我來(lái)說(shuō),最簡(jiǎn)單的方法是使用AWS sam build命令。函數(shù)構(gòu)建后,我只需從構(gòu)建文件夾中拷貝所需的包,并將它們作為層來(lái)上傳。
- sam build --use-container
4. 啟動(dòng)Lambda函數(shù)來(lái)解析新聞源
一旦我們將新聞URL作為主題發(fā)送到SNS,就可以觸發(fā)多個(gè)Lambda從RSS新聞源去獲取新聞文章。一些RSS新聞源不一樣,但新聞源解析器庫(kù)允許我們使用不同的格式。我們的URL是事件對(duì)象的一部分,所以我們需要通過(guò)key來(lái)提取它。
- import boto3
- import feedparser
- from datetime import datetime
- def lambda_handler(event, context):
- #Connect to DynamoDB
- dynamodb = boto3.resource('dynamodb')
- # Get table
- table = dynamodb.Table('news')
- # Get a url from from event
- url = event['Records'][0]['Sns']['Message']
- # Parse the rss feed
- feed = feedparser.parse(url)
- for item in feed['entries']:
- result = {
- "news_url": item['link'],
- "title": item['title'],
- "created_at": datetime.now().strftime('%Y-%m-%d') # so that dynamodb will be ok with our date
- }
- # Save the result to dynamodb
- table.put_item(Item=result, ConditionExpression='attribute_not_exists(news_url)') # store only unique urls
5. 在Sagemaker上創(chuàng)建和部署文本摘要模型
Sagemaker是一項(xiàng)服務(wù),可讓您輕松在AWS上編寫、訓(xùn)練和部署機(jī)器學(xué)習(xí)模型。 HuggingFace與AWS合作,使用戶更容易將其模型部署到云端。
這里我在Jupiter notebook中編寫了一個(gè)簡(jiǎn)單的文本摘要模型,并使用deploy()命令來(lái)部署它。
- from sagemaker.huggingface import HuggingFaceModel
- import sagemaker
- role = sagemaker.get_execution_role()
- hub = {
- 'HF_MODEL_ID':'facebook/bart-large-cnn',
- 'HF_TASK':'summarization'
- }
- # Hugging Face Model Class
- huggingface_model = HuggingFaceModel(
- transformers_version='4.6.1',
- pytorch_version='1.7.1',
- py_version='py36',
- env=hub,
- role=role,
- )
- # deploy model to SageMaker Inference
- predictor = huggingface_model.deploy(
- initial_instance_count=1, # number of instances
- instance_type='ml.m5.xlarge' # ec2 instance type
- )
一旦部署完畢,我們可以從Sagemaker -> Inference -> Endpoint configuration獲取端點(diǎn)信息,并用在我們的Lamdas中。
6. 獲取文章的全文、摘要文章并將結(jié)果存儲(chǔ)在DynamoDB中
由于版權(quán)我們不會(huì)存儲(chǔ)全文,這就是為什么所有處理工作都在一個(gè)Lambda中進(jìn)行。一旦URL進(jìn)入到Dynamo DB表,我啟動(dòng)了文本處理Lambda。為此,我創(chuàng)建了DynamoDB項(xiàng)生成,作為啟動(dòng)Lambda的觸發(fā)器。我創(chuàng)建了批大小,那樣Lambda每次只處理一篇文章。
- import json
- import boto3
- from goose3 import Goose
- from datetime import datetime
- def lambda_handler(event, context):
- # Get url from DynamoDB record creation event
- url = event['Records'][0]['dynamodb']['Keys']['news_url']['S']
- # fetch article full text
- g = Goose()
- article = g.extract(url=url)
- body = article.cleaned_text # clean article text
- published_date = article.publish_date # from meta desc
- # Create a summary using our HuggingFace text summary model
- ENDPOINT_NAME = "your_model_endpoint"
- runtime= boto3.client('runtime.sagemaker')
- response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME, ContentType='application/json', Body=json.dumps(data))
- #extract a summary
- summary = json.loads(response['Body'].read().decode())
- #Connect to DynamoDB
- dynamodb = boto3.resource('dynamodb')
- # Get table
- table = dynamodb.Table('news')
- # Update item stored in dynamoDB
- update = table.update_item(
- Key = { "news_url": url }
- ,
- ConditionExpression= 'attribute_exists(news_url) ',
- UpdateExpression='SET summary = :val1, published_date = :val2'
- ExpressionAttributeValues={
- ':val1': summary,
- ':val2': published_date
- }
- )
這就是我們?nèi)绾问褂肁WS工具,構(gòu)建并部署一個(gè)簡(jiǎn)單的無(wú)服務(wù)器數(shù)據(jù)管道以讀取最新新聞的過(guò)程。
原文標(biāo)題:Build a Serverless News Data Pipeline using ML on AWS Cloud,作者:Maria Zentsova
【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文譯者和出處為51CTO.com】