1) Lambda 생성
- 기존에 root가 member의 권한을 모두 가져왔으므로 Lambda를 통해 원격으로 EC2를 시작 및 종료할 수 있다.
- Lambda 실행 역할은 기존에 만들어 놓았던 Role을 적용한다. (ou_user_access_role)
ec2 명 : access_another_user_ec2_test
import boto3
import botocore
import logging
import sys
import datetime
from datetime import date, datetime, timezone, timedelta
def setup_logging():
    """Reset the root logger to a single stdout handler at INFO level.

    AWS Lambda Log formatter:
    https://gist.github.com/niranjv/fb95e716151642e8ca553b0e38dd152e

    Every handler currently attached to the root logger is removed and one
    ``logging.StreamHandler`` writing to ``sys.stdout`` is installed instead.

    Returns:
        logging.Logger: the reconfigured root logger.
    """
    logger = logging.getLogger()
    # Iterate over a snapshot: removing entries from ``logger.handlers`` while
    # looping over that same list skips every other handler, which would leave
    # stale handlers attached.
    for existing in list(logger.handlers):
        logger.removeHandler(existing)

    handler = logging.StreamHandler(sys.stdout)
    # use whatever format you want here
    log_format = '%(asctime)s %(levelname)s %(lineno)d %(message)s'
    handler.setFormatter(logging.Formatter(log_format))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
# Module-level handle on the root logger (getLogger() with no name), set to
# INFO; the logger.debug(...) calls in this module are therefore not emitted
# unless the level is lowered.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_sts_credential(account_id: str):
    """Assume the ``ec2ff_for_root`` role in *account_id* via STS.

    Args:
        account_id: ID of the account whose role should be assumed.

    Returns:
        The ``Credentials`` entry of the ``assume_role`` response, or ``None``
        when the call raises ``botocore.exceptions.ClientError`` (the error is
        logged, not re-raised).
    """
    logger.debug(f"trying get sts credential: {account_id}")
    role_arn = f"arn:aws:iam::{account_id}:role/ec2ff_for_root"
    try:
        # Assume the role
        response = boto3.client('sts').assume_role(
            RoleArn=role_arn,
            RoleSessionName="AssumeRoleSession1",
        )
    except botocore.exceptions.ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code != 'AccessDenied':
            logger.error(err)
        else:
            logger.error(f"Access denied assume role in account {account_id}")
        return None

    logger.info(f"{account_id} assume role success")
    return response['Credentials']
def load_instance_list(account_id: str, sts_credential, region_name: str = 'ap-northeast-2'):
    """List the EC2 instance IDs of one account using assumed-role credentials.

    Args:
        account_id: Account ID; used only in log messages.
        sts_credential: Mapping providing ``AccessKeyId``, ``SecretAccessKey``
            and ``SessionToken`` (the value returned by
            ``get_sts_credential``), or ``None`` when no credentials exist.
        region_name: Region the EC2 client is created for. Defaults to
            ``ap-northeast-2``, the region that was previously hard-coded.

    Returns:
        tuple: ``(ec2_client, instance_ids)``. ``(None, [])`` is returned when
        *sts_credential* is ``None`` or when a
        ``botocore.exceptions.ClientError`` is raised (the error is logged).
    """
    logger.debug(f"trying load instance list: account id-{account_id}")
    if sts_credential is None:
        logger.error(f"No credentials available for account {account_id}")
        return None, []

    try:
        # Use the assumed role credentials to list EC2 instances
        ec2_client = boto3.client(
            'ec2',
            aws_access_key_id=sts_credential['AccessKeyId'],
            aws_secret_access_key=sts_credential['SecretAccessKey'],
            aws_session_token=sts_credential['SessionToken'],
            region_name=region_name,
        )
        # Page through DescribeInstances instead of relying on one
        # unpaginated call, so accounts with many instances are listed fully.
        paginator = ec2_client.get_paginator('describe_instances')
        instance_list = []
        for page in paginator.paginate():
            for reservation in page['Reservations']:
                instance_list.extend(
                    instance['InstanceId'] for instance in reservation['Instances']
                )
        logger.debug(f"start checking {instance_list}'s tags")
        return ec2_client, instance_list
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'AccessDenied':
            logger.error(f"Access denied when trying to list instances in account {account_id}")
        else:
            logger.error(e)
        return None, []
def stop_instances(ec2_client, instance_ids: list, now=None):
    """Stop the instances whose ``AutoShutDown`` tag names the current hour.

    The current hour in KST (UTC+9) is formatted on a 12-hour clock as
    ``<hour><AM|PM>`` (``9AM``, ``12PM``, ``4PM``, ``12AM`` ...). Each instance
    in *instance_ids* that carries an ``AutoShutDown`` tag with exactly that
    value is stopped.

    Args:
        ec2_client: EC2 client exposing ``describe_tags`` and
            ``stop_instances``; ``None`` aborts immediately.
        instance_ids: Instance IDs to inspect.
        now: Optional ``datetime`` used as the reference time instead of the
            wall clock. It is converted to KST with ``astimezone``, so pass a
            timezone-aware value (a naive one is interpreted as system local
            time). Defaults to the current time.

    Returns:
        bool: ``False`` when *ec2_client* is ``None``, otherwise ``True``.
        ``ClientError`` failures while filtering or stopping are logged and do
        not change the return value.
    """
    if ec2_client is None:
        logger.error("EC2 client is not available")
        return False

    # Derive the tag value to match from the current KST hour. The hour is
    # mapped onto a 12-hour clock with an explicit AM/PM suffix; subtracting 12
    # unconditionally yields values such as "-3PM" for morning hours.
    kst = timezone(timedelta(hours=9))
    current = datetime.now(kst) if now is None else now.astimezone(kst)
    suffix = "AM" if current.hour < 12 else "PM"
    hour_12 = current.hour % 12 or 12
    stop_time_value = f"{hour_12}{suffix}"

    try:
        # Keep only the instances tagged AutoShutDown=<current hour>.
        filtered_instances = []
        for instance_id in instance_ids:
            response = ec2_client.describe_tags(Filters=[
                {
                    'Name': 'resource-id',
                    'Values': [instance_id]
                },
                {
                    'Name': 'key',
                    'Values': ['AutoShutDown']
                },
                {
                    'Name': 'value',
                    'Values': [stop_time_value]  # current hour
                }
            ])
            if response['Tags']:
                filtered_instances.append(instance_id)

        # Stop the filtered instances one by one so that a failure on one
        # instance does not prevent the others from being stopped.
        for instance_id in filtered_instances:
            logger.debug(f"trying to stop instance: {instance_id}")
            try:
                ec2_client.stop_instances(InstanceIds=[instance_id])
                logger.info(f"Instance stopped due to matching tag: {instance_id}")
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == 'AccessDenied':
                    logger.error(f"Access denied when trying to stop instance {instance_id} due to matching tag")
                else:
                    logger.error(e)
    except botocore.exceptions.ClientError as e:
        logger.error(f"Error when filtering instances by tag: {e}")
    return True
# Fetch the IDs of every member account (used by the earlier code)
def get_member_account_ids() -> list:
    """Return the ID of every account listed by AWS Organizations.

    Walks all pages of the ``list_accounts`` paginator and collects the
    ``Id`` field of each account entry.

    Returns:
        list: account IDs in the order the pages returned them.
    """
    paginator = boto3.client('organizations').get_paginator('list_accounts')
    account_ids = []
    for page in paginator.paginate():
        for account in page['Accounts']:
            account_ids.append(account['Id'])
    logger.info(f"accound ids loaded: {account_ids}")
    return account_ids
def lambda_handler(event, context):
    """Lambda entry point: run the tag-based EC2 stop for every account.

    For each account ID returned by ``get_member_account_ids`` the handler
    obtains credentials, lists the account's instances and hands them to
    ``stop_instances``. Accounts without a usable client or without
    instances are only logged.

    Args:
        event: Invocation event; not read by this handler.
        context: Invocation context; not read by this handler.
    """
    log = setup_logging()

    # Load every member account, then process them one at a time.
    for account_id in get_member_account_ids():
        credential = get_sts_credential(account_id)
        ec2_client, instance_list = load_instance_list(account_id, credential)
        if not (ec2_client and instance_list):
            log.info(f"no target EC2 resourece to stop in {account_id}")
            continue
        stop_instances(ec2_client, instance_list)
2) Lambda 정책 설정
- Lambda 실행에 10초 정도 걸리므로 Timeout 시간을 충분히 늘려준다.
- Configuration -> Permissions
Edit을 눌러 Timeout 시간을 수정한다.
Existing role : ou_user_access_role 인지 확인한다.
그 이후 Deploy를 눌러 배포를 실행하면 코드에 따라 작업이 시작된다.
'Cloud Infra Architecture (AWS) > AWS SAP-C02' 카테고리의 다른 글
AWS Certified Solutions Architecture – Professional 합격후기 (1) | 2023.12.05 |
---|---|
[Organization] Root <-> Member Tag별 ec2 종료 자동화 (EventBridge) (0) | 2023.12.04 |
[Organization] Root <-> Member - CloudFormation Template - S3 생성 및 배포 (2) | 2023.12.04 |
[Organization] Root <-> Member - CloudFormation Template (0) | 2023.12.04 |
[Organization] Root <-> Member (0) | 2023.12.04 |