SNS(Simple Notification Service)是AWS的服务之一,用于发送信息,将publisher和subscriber进行解耦。下面的实验是创建一个Email自动发送的demo程序,包括两大部分:
- 创建topic,设置目标的邮箱地址,并认证地址以便邮箱正常接收邮件
- 根据topic配置邮件并发送。
1 设置Topic和邮箱
- Topic是SNS的基础单位,后面发邮件的时候,会根据Topic的arn来选择发送,算是一个publisher。
- subscription是subscriber,单位是邮箱地址,subscriber可以配置filter来根据attribute过滤一些邮件。
- 导入boto3和对应的client
import boto3
sns=boto3.client("sns")
- 创建Topic
topic_response = sns.create_topic(
Name='test002'
)
- 根据topic的arn来创建subscription
subscription_response = sns.subscribe(
TopicArn=topic_response['TopicArn'],
Protocol='email',
Endpoint='目标邮箱地址',
ReturnSubscriptionArn=True
)
- 创建subscription之后,目标邮箱地址会收到一封确认邮件,点进链接之后就会自动confirm。confirm之后就可以正常收到邮件了。
2 发送邮件
- 首先导入需要使用的Library
import boto3
import json
from datetime import datetime, timedelta
from datetime import timezone
- 定义函数来配置主题Subject和内容Messages
def get_subject():
subject = '主题:测试SNS Email'
return subject
- 由于Python发送Email的时候,无论是使用
\n
,<br>
,.\n
,%0a
,
都不能实现换行效果。因此,使用array的形式,后面会用json.dumps(message, indent=2)
来实现换行效果。
def get_messages():
utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
bj_dt = utc_dt
subject = '主题:测试SNS Email'
messages = ['本邮件是SNS测试邮件:Status : Success',
'日期时间 : {year:04}年 {month:02}月 {day:02}日 {hour:02}:{minute:02}'.format(year=bj_dt.year, month=bj_dt.month, day=bj_dt.day, hour=bj_dt.hour, minute=bj_dt.minute)]
return messages
- 定义函数来发送Email,使用
sns.publish(TargetArn='SNS对应的Topic Arn', Message='正文', Subject='主题')
def publish_to_sns(subject, messages):
topic_arn = "topic arn"
sns=boto3.client("sns")
message=messages
response=sns.publish(
TargetArn=topic_arn,
Message=json.dumps({'default': json.dumps(message, ensure_ascii=False, indent=2)}),
Subject=subject,
MessageStructure='json',
)
return response
- 最终,发送
subject, messages = get_subject(), get_messages()
response = publish_to_sns(subject, messages)
完整代码:
import boto3
import json
from datetime import datetime, timedelta
from datetime import timezone
def publish_to_sns(subject, messages):
topic_arn = "arn:aws:sns:ap-northeast-1:337058716437:test001"
sns=boto3.client("sns")
message=messages
response=sns.publish(
TargetArn=topic_arn,
Message=json.dumps({'default': json.dumps(message, ensure_ascii=False, indent=2)}),
Subject=subject,
MessageStructure='json',
)
return response
def get_subject():
subject = '主题:测试SNS Email'
return subject
def get_messages():
utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
bj_dt = utc_dt
subject = '主题:测试SNS Email'
messages = ['本邮件是SNS测试邮件:Status : Success',
'日期时间 : {year:04}年 {month:02}月 {day:02}日 {hour:02}:{minute:02}'.format(year=bj_dt.year, month=bj_dt.month, day=bj_dt.day, hour=bj_dt.hour, minute=bj_dt.minute)]
return messages
subject, messages = get_subject(), get_messages()
response = publish_to_sns(subject, messages)