SNS(Simple Notification Service)是AWS的服务之一,用于发送信息,将publisher和subscriber进行解耦。下面的实验是创建一个Email自动发送的demo程序,包括两大部分:

  • 创建topic,设置目标的邮箱地址,并认证地址以便邮箱正常接收邮件
  • 根据topic配置邮件并发送。

1 设置Topic和邮箱

  • Topic是SNS的基础单位,后面发邮件的时候,会根据Topic的arn来选择发送,算是一个publisher。
  • subscription是subscriber,单位是邮箱地址,subscriber可以配置filter来根据attribute过滤一些邮件。
  1. 导入boto3和对应的client
import boto3
sns=boto3.client("sns")
  1. 创建Topic
topic_response = sns.create_topic(
    Name='test002'
)
  1. 根据topic的arn来创建subscription
subscription_response = sns.subscribe(
    TopicArn=topic_response['TopicArn'],
    Protocol='email',
    Endpoint='目标邮箱地址',
    ReturnSubscriptionArn=True
)
  1. 创建subscription之后,目标邮箱地址会收到一封确认邮件,点进链接之后就会自动confirm。confirm之后就可以正常收到邮件了。

2 发送邮件

  1. 首先导入需要使用的Library
import boto3
import json
from datetime import datetime, timedelta
from datetime import timezone
  1. 定义函数来配置主题Subject和内容Messages
def get_subject():
    subject = '主题:测试SNS Email'
    return subject
  1. 由于Python发送Email的时候,无论是使用\n, <br> , .\n,%0a, &#10; 都不能实现换行效果。因此,使用array的形式,后面会用json.dumps(message, indent=2)来实现换行效果。
def get_messages():

    utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
    bj_dt = utc_dt

    subject = '主题:测试SNS Email'
    messages = ['本邮件是SNS测试邮件:Status : Success',
            '日期时间 : {year:04}年 {month:02}月 {day:02}日 {hour:02}:{minute:02}'.format(year=bj_dt.year, month=bj_dt.month, day=bj_dt.day, hour=bj_dt.hour, minute=bj_dt.minute)]
    return messages
  1. 定义函数来发送Email,使用sns.publish(TargetArn='SNS对应的Topic Arn', Message='正文', Subject='主题')
def publish_to_sns(subject, messages):
    topic_arn = "topic arn"
    sns=boto3.client("sns")
    message=messages
    response=sns.publish(
        TargetArn=topic_arn,
        Message=json.dumps({'default': json.dumps(message, ensure_ascii=False, indent=2)}),
        Subject=subject,
        MessageStructure='json',
    )
    
    return response
  1. 最终,发送
subject, messages = get_subject(), get_messages()

response = publish_to_sns(subject, messages)

完整代码:

import boto3
import json
from datetime import datetime, timedelta
from datetime import timezone

def publish_to_sns(subject, messages):
    topic_arn = "arn:aws:sns:ap-northeast-1:337058716437:test001"
    sns=boto3.client("sns")
    message=messages
    response=sns.publish(
        TargetArn=topic_arn,
        Message=json.dumps({'default': json.dumps(message, ensure_ascii=False, indent=2)}),
        Subject=subject,
        MessageStructure='json',
    )
    
    return response

def get_subject():
    subject = '主题:测试SNS Email'
    return subject

def get_messages():

    utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
    bj_dt = utc_dt

    subject = '主题:测试SNS Email'
    messages = ['本邮件是SNS测试邮件:Status : Success',
            '日期时间 : {year:04}年 {month:02}月 {day:02}日 {hour:02}:{minute:02}'.format(year=bj_dt.year, month=bj_dt.month, day=bj_dt.day, hour=bj_dt.hour, minute=bj_dt.minute)]
    return messages

subject, messages = get_subject(), get_messages()

response = publish_to_sns(subject, messages)
最后修改:2021 年 06 月 01 日 02 : 24 PM
如果觉得我的文章对你有用,请随意赞赏