AWS IP更新をセキュリティグループへ取り込み
上記で公開されているIPをEC2のセキュリティグループに取り込む必要があったので、
Pythonでさっと書いてみました。
処理フローとしては、下記の通り。
- IPアドレスのリストが変更される
- SNSでメールとLambdaに通知される
- Lambdaが実行される
- 新規にセキュリティグループを作成する
- セキュリティグループのルール(Egress)を変更する
- EC2に付与されているセキュリティグループ一覧を取得する
- EC2のセキュリティグループをアップデートする
- 古いセキュリティグループを削除する
環境変数には下記を設定する必要があります。
## OS Environment Info # VpcId : (str) vpc_xxxxxxx # InstanceId : (str) i-xxxxxxxx # SgNamePrefix : (str)nameprefixForSg # SgNameDescription : (str)descriptionForSg
エラーハンドリングの加筆とかいろいろありますが、
まずは初版ということで。
#!/usr/bin/env python # -*- coding: utf-8 -*- import urllib.request import boto3 import json import os import sys import datetime from datetime import timedelta, tzinfo client = boto3.client('ec2') class JST(tzinfo): def utcoffset(self, dt): return timedelta(hours=9) def dst(self, dt): return timedelta(0) def tzname(self, dt): return 'JST' def create_securitygroup(VpcIdArg,NewGroupNameArg,TargetSgNameDescriptionArg): ## Args Info # VpcIdArg(excisting) : (str)vpc-xxxxxxx # NewGroupNameArg(new) : (str) SomeNameforSG # TargetSgNameDescriptionArg(new) : (str) DescriptionforSG response = client.create_security_group( Description=TargetSgNameDescriptionArg, GroupName=NewGroupNameArg, VpcId=VpcIdArg ) return(response['GroupId']) def update_securitygroup(TargetSgIdArg): ## Args Info # TargetSgIdArg(made by this) : (str)sg-xxxxxxx url = "https://ip-ranges.amazonaws.com/ip-ranges.json" readObj = urllib.request.urlopen(url) response = json.load(readObj) iplist = response['prefixes'] CidrIpList = [] IpDict = {} for attr in iplist: if attr['region'] == 'ap-northeast-1' and attr['service'] == 'AMAZON': IpDict['CidrIp'] = attr['ip_prefix'] CidrIpList.append(IpDict) IpDict = {} try: ec2 = boto3.resource('ec2') security_group = ec2.SecurityGroup(TargetSgIdArg) response = security_group.authorize_egress( DryRun=False, IpPermissions=[ { 'IpProtocol': 'tcp', 'FromPort': 443, 'IpRanges': CidrIpList, 'ToPort': 443 } ] ) except Exception as e: raise e def delete_securitygroup(GroupIdArg): ## Args Info # GroupIdArg(excisting) : (str)sg-xxxxxxx response = client.delete_security_group( GroupId=GroupIdArg ) def modify_securitygroup(TargetOldSgIdsListArg,TargetInstanceIdArg): ## Args Info # TargetOldSgIdsListArg(excisting) : (list) ['sg-xxxxxxx','sg-xxxxxxx'] # TargetInstanceIdArg(excisting) : (str)i-xxxxxxxxx response = client.modify_instance_attribute( Groups=TargetOldSgIdsListArg, InstanceId=TargetInstanceIdArg, ) def describe_ec2securitygroups(TargetInstanceIdArg): ## Args Info # TargetInstanceIdArg(excisting) : (str)i-xxxxxxxxx response = client.describe_instance_attribute( Attribute='groupSet', InstanceId=TargetInstanceIdArg ) return(response['Groups']) def get_securitygroupname(TargetOldSgIdListArg,TargetSgNameArg): ## Args Info # TargetOldSgIdListArg(excisting) : (list) ['sg-xxxxxxx','sg-xxxxxxx'] # TargetSgNameArg(excisting) : (str)SgName_ response = client.describe_security_groups( GroupIds=TargetOldSgIdListArg ) for attr in response['SecurityGroups']: if attr['GroupName'].startswith(TargetSgNameArg): return(attr['GroupId']) def lambda_handler(event, context): ## OS Environment Info # VpcId : (str) vpc_xxxxxxx # InstanceId : (str) i-xxxxxxxx # SgNamePrefix : (str)nameprefixForSg # SgNameDescription : (str)descriptionForSg ## get osenviron TargetVpcId = os.environ['VpcId'] TargetInstanceId = os.environ['InstanceId'] TargetSgNamePrefix = os.environ['SgNamePrefix'] TargetSgNameDescription = os.environ['SgNameDescription'] ## define GroupName yyyymmddHHMM = datetime.datetime.now(tz=JST()).strftime('%Y%m%d%H%M') GroupName = TargetSgNamePrefix + yyyymmddHHMM ## describe securitygroup of Instance TargetOldSgIdList = describe_ec2securitygroups(TargetInstanceId) ## get SecurityGroupName TargetOldSgIdsList = [] for OldSgIdN in TargetOldSgIdList: TargetOldSgIdsList.append(OldSgIdN['GroupId']) TargetOldSgId = get_securitygroupname(TargetOldSgIdsList,TargetSgNamePrefix) ## create SecurityGroup TargetSgId=create_securitygroup(TargetVpcId,GroupName,TargetSgNameDescription) ## update SecurityGroup update_securitygroup(TargetSgId) ## attach SecurityGroup TargetOldSgIdsList.append(TargetSgId) modify_securitygroup(TargetOldSgIdsList,TargetInstanceId) ## detach SecurityGroup TargetOldSgIdsList.remove(TargetOldSgId) modify_securitygroup(TargetOldSgIdsList,TargetInstanceId) ##delete SecurityGroup delete_securitygroup(TargetOldSgId)