AWS IP更新をセキュリティグループへ取り込み

docs.aws.amazon.com

上記で公開されているIPをEC2のセキュリティグループに取り込む必要があったので、
Pythonでさっと書いてみました。

処理フローとしては、下記の通り。

  • IPアドレスのリストが変更される
  • SNSでメールとLambdaに通知される
  • Lambdaが実行される
  • 新規にセキュリティグループを作成する
  • セキュリティグループのルール(Egress)を変更する
  • EC2に付与されているセキュリティグループ一覧を取得する
  • EC2のセキュリティグループをアップデートする
  • 古いセキュリティグループを削除する

環境変数には下記を設定する必要があります。

    ## OS Environment Info
    # VpcId : (str) vpc-xxxxxxx
    # InstanceId : (str) i-xxxxxxxx
    # SgNamePrefix : (str) nameprefixForSg
    # SgNameDescription : (str) descriptionForSg

エラーハンドリングの追加など改善点はいろいろありますが、
まずは初版ということで。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import urllib.request
import boto3
import json
import os
import sys
import datetime
from datetime import timedelta, tzinfo

# Module-level EC2 client, created once at import time and shared by the
# helper functions in this file.
client = boto3.client('ec2')

class JST(tzinfo):
    """Fixed-offset time zone: nine hours ahead of UTC, no DST, named 'JST'."""

    # Constant values handed back by the tzinfo hooks below.
    _UTC_OFFSET = timedelta(seconds=9 * 60 * 60)
    _NO_DST = timedelta()
    _NAME = 'JST'

    def utcoffset(self, dt):
        """Return the fixed +09:00 offset; *dt* is ignored."""
        return self._UTC_OFFSET

    def dst(self, dt):
        """Return a zero adjustment; *dt* is ignored."""
        return self._NO_DST

    def tzname(self, dt):
        """Return the zone name; *dt* is ignored."""
        return self._NAME

def create_securitygroup(VpcIdArg,NewGroupNameArg,TargetSgNameDescriptionArg):
    """Create a security group and return its ID.

    Args:
        VpcIdArg: (str) ID of an existing VPC, e.g. vpc-xxxxxxx.
        NewGroupNameArg: (str) name for the new security group.
        TargetSgNameDescriptionArg: (str) description for the new security group.

    Returns:
        The 'GroupId' value from the create_security_group response.
    """
    request = {
        'VpcId': VpcIdArg,
        'GroupName': NewGroupNameArg,
        'Description': TargetSgNameDescriptionArg,
    }
    created = client.create_security_group(**request)
    return created['GroupId']
    
def update_securitygroup(TargetSgIdArg):
    """Authorize outbound tcp/443 to the published AMAZON ranges of ap-northeast-1.

    Downloads ip-ranges.json, keeps the IPv4 prefixes whose region is
    'ap-northeast-1' and whose service is 'AMAZON', and adds them as a single
    egress permission on the given security group.

    Args:
        TargetSgIdArg: (str) ID of the security group to update, e.g. sg-xxxxxxx.

    Raises:
        Any exception from the download, the JSON parsing or the boto3 call
        propagates unchanged to the caller.
    """
    url = "https://ip-ranges.amazonaws.com/ip-ranges.json"

    # Close the HTTP response deterministically, and bound the wait so a
    # stalled download cannot hang the caller indefinitely.
    with urllib.request.urlopen(url, timeout=10) as readObj:
        prefixes = json.load(readObj)['prefixes']

    CidrIpList = [
        {'CidrIp': attr['ip_prefix']}
        for attr in prefixes
        if attr['region'] == 'ap-northeast-1' and attr['service'] == 'AMAZON'
    ]

    # NOTE(review): this only adds rules. If the group still carries a default
    # allow-all egress rule, these rules do not restrict anything - confirm
    # whether that rule should be revoked here. Also confirm the number of
    # prefixes fits within the account's rules-per-group quota.
    ec2 = boto3.resource('ec2')
    security_group = ec2.SecurityGroup(TargetSgIdArg)
    security_group.authorize_egress(
        DryRun=False,
        IpPermissions=[
            {
                'IpProtocol': 'tcp',
                'FromPort': 443,
                'IpRanges': CidrIpList,
                'ToPort': 443
            }
        ]
    )

    
def delete_securitygroup(GroupIdArg):
    """Delete the security group with the given ID.

    Args:
        GroupIdArg: (str) ID of an existing security group, e.g. sg-xxxxxxx.

    Returns:
        None; the API response is discarded.
    """
    client.delete_security_group(GroupId=GroupIdArg)

def modify_securitygroup(TargetOldSgIdsListArg,TargetInstanceIdArg):
    """Set the security groups of an instance via modify_instance_attribute.

    Args:
        TargetOldSgIdsListArg: (list) security group IDs to pass as Groups,
            e.g. ['sg-xxxxxxx', 'sg-xxxxxxx'].
        TargetInstanceIdArg: (str) ID of an existing instance, e.g. i-xxxxxxxxx.

    Returns:
        None; the API response is discarded.
    """
    client.modify_instance_attribute(
        InstanceId=TargetInstanceIdArg,
        Groups=TargetOldSgIdsListArg,
    )
    
def describe_ec2securitygroups(TargetInstanceIdArg):
    """Return the security groups attached to an instance.

    Args:
        TargetInstanceIdArg: (str) ID of an existing instance, e.g. i-xxxxxxxxx.

    Returns:
        The 'Groups' value of the describe_instance_attribute response for the
        'groupSet' attribute. The caller reads a 'GroupId' key from each entry.
    """
    attribute = client.describe_instance_attribute(
        InstanceId=TargetInstanceIdArg,
        Attribute='groupSet',
    )
    attached_groups = attribute['Groups']
    return attached_groups

def get_securitygroupname(TargetOldSgIdListArg,TargetSgNameArg):
    """Find the first listed security group whose name starts with a prefix.

    Despite the function name, the value returned is the group's ID.

    Args:
        TargetOldSgIdListArg: (list) security group IDs to look up,
            e.g. ['sg-xxxxxxx', 'sg-xxxxxxx'].
        TargetSgNameArg: (str) name prefix to match, e.g. SgName_.

    Returns:
        The 'GroupId' of the first matching group in response order, or None
        when no group name starts with the prefix.
    """
    described = client.describe_security_groups(GroupIds=TargetOldSgIdListArg)
    matching_ids = (
        group['GroupId']
        for group in described['SecurityGroups']
        if group['GroupName'].startswith(TargetSgNameArg)
    )
    return next(matching_ids, None)

def lambda_handler(event, context):
    """Rotate the prefix-named security group on the configured instance.

    Steps:
        1. Create a new security group named SgNamePrefix + a JST timestamp
           (YYYYmmddHHMM).
        2. Populate it through update_securitygroup.
        3. Attach it to the instance alongside the groups already attached.
        4. If a previously attached group has a name starting with
           SgNamePrefix, detach that group and delete it.

    Environment variables read:
        VpcId: (str) vpc-xxxxxxx
        InstanceId: (str) i-xxxxxxxx
        SgNamePrefix: (str) name prefix for the security group
        SgNameDescription: (str) description for the security group

    Args:
        event: Lambda event payload; not used.
        context: Lambda context object; not used.

    Raises:
        KeyError: if one of the environment variables is not set.
        Exceptions raised by the helper functions propagate unchanged.
    """
    ## read configuration from the environment
    TargetVpcId = os.environ['VpcId']
    TargetInstanceId = os.environ['InstanceId']
    TargetSgNamePrefix = os.environ['SgNamePrefix']
    TargetSgNameDescription = os.environ['SgNameDescription']

    ## build the new group name: prefix + current JST time to the minute
    yyyymmddHHMM = datetime.datetime.now(tz=JST()).strftime('%Y%m%d%H%M')
    GroupName = TargetSgNamePrefix + yyyymmddHHMM

    ## collect the IDs of the groups currently attached to the instance
    TargetOldSgIdList = describe_ec2securitygroups(TargetInstanceId)
    TargetOldSgIdsList = [OldSgIdN['GroupId'] for OldSgIdN in TargetOldSgIdList]

    ## find the previously generated group (None when there is none yet)
    TargetOldSgId = get_securitygroupname(TargetOldSgIdsList,TargetSgNamePrefix)

    ## create and populate the new group
    TargetSgId = create_securitygroup(TargetVpcId,GroupName,TargetSgNameDescription)
    update_securitygroup(TargetSgId)

    ## attach the new group while the old one is still in place
    TargetOldSgIdsList.append(TargetSgId)
    modify_securitygroup(TargetOldSgIdsList,TargetInstanceId)

    ## Nothing to retire when no attached group matched the prefix, e.g. on
    ## the first run. Without this guard list.remove(None) raises ValueError.
    if TargetOldSgId is None:
        return

    ## detach the old group
    TargetOldSgIdsList.remove(TargetOldSgId)
    modify_securitygroup(TargetOldSgIdsList,TargetInstanceId)

    ## delete the old group now that nothing references it from this instance
    delete_securitygroup(TargetOldSgId)