#!/usr/bin/env python
'''\
Check an SQS queue for ASG lifecycle notifications of new instances,
and run the appropriate Ansible playbook against the host.
'''

import argparse
import logging
import boto3
import botocore.exceptions
import json
import sys
import os
import time
import errno
from subprocess import Popen, PIPE
from tempfile import gettempdir
from hashlib import sha256

ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'

# Size cap, in bytes of UTF-8, applied to the message body sent to SNS.
SNS_MAX_MESSAGE_BYTES = 262144


def _native_str(data):
    '''Return subprocess output as the interpreter's native ``str`` type.

    On Python 2 byte strings already are ``str`` and pass through untouched;
    on Python 3 ``bytes`` are decoded as UTF-8, replacing undecodable bytes so
    that a report can always be assembled with ``'\\n'.join``.
    '''
    if isinstance(data, str):
        return data
    if isinstance(data, bytes):
        return data.decode('utf-8', 'replace')
    # Python 2 ``unicode``: fold back into a native byte string.
    return data.encode('utf-8')


def notify(subj, msg):
    '''Publish a report to the SNS topic, or print it when no topic is set.

    subj -- subject line of the notification
    msg  -- message body (a string); truncated to SNS_MAX_MESSAGE_BYTES
            bytes of UTF-8 before publishing

    Reads the module-level ``topic``.
    '''
    if topic:
        raw = msg if isinstance(msg, bytes) else msg.encode('utf-8')
        # Cut on the byte limit, then decode with 'ignore' so a character
        # split by the cut (or any invalid byte) is dropped rather than
        # raising; dropping never makes the re-encoded body longer.
        body = raw[:SNS_MAX_MESSAGE_BYTES].decode('utf-8', 'ignore')
        topic.publish(Subject=subj, Message=body)
    else:
        print(msg)


def _run_playbook(cmd):
    '''Run one ansible-playbook command line and return its report lines.

    The command runs with the playbook directory (``args.playbooks``) as its
    working directory.  Returns ``(returncode, lines)`` where ``lines`` is
    ``['SUCCESS']`` for a zero exit status, otherwise the failure code
    followed by the captured stderr and stdout.
    '''
    p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
    stdoutdata, stderrdata = p.communicate()
    retval = p.returncode
    if retval:
        return retval, ['FAILURE CODE {}'.format(retval),
                        _native_str(stderrdata), _native_str(stdoutdata)]
    return retval, ['SUCCESS']


def handleEvent(message, event, ASGName, InstanceId):
    '''Run the playbook(s) for a launched instance and report the outcome.

    message    -- the SQS message that announced the event
    event      -- the event name taken from the notification
    ASGName    -- Auto Scaling group name; selects ``<ASGName>.yml``
    InstanceId -- instance id, passed to ansible-playbook as ``--limit``

    The message is deleted when there is no playbook for the group or when
    the playbook succeeds.  After a failure it is left on the queue with a
    60 second visibility timeout.
    '''
    notice = [' '.join([ASGName, InstanceId, event])]
    postnotice = []
    if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
        # hope config doesn't take more than 15m
        message.change_visibility(VisibilityTimeout=(60 * 15))
        cmd = [ANSIBLE_PLAYBOOK_CMD, '--limit', InstanceId, ASGName + '.yml']
        retval, report = _run_playbook(cmd)
        message.change_visibility(VisibilityTimeout=60)
        notice += report
        if not retval:
            message.delete()
            # NOTE(review): the original indentation was lost; the optional
            # post playbook is assumed to run only after a successful host
            # playbook -- confirm.
            post = ASGName + '-post.yml'
            if os.path.isfile(os.path.join(args.playbooks, post)):
                postnotice = [' '.join([ASGName, 'post', event])]
                _, report = _run_playbook([ANSIBLE_PLAYBOOK_CMD, post])
                postnotice += report
    else:
        notice += ['no action taken: no playbook for this ASG']
        message.delete()
    notify(notice[0], '\n'.join(notice))
    if postnotice:
        notify(postnotice[0], '\n'.join(postnotice))


def processMessage(message):
    '''Unpack the data we want from an SQS message and act on it.

    Messages that cannot be parsed, or that carry an event other than an
    instance launch, are logged and deleted.  For a launch, the instance
    state is looked up through the module-level ``ec2r``: a running instance
    is handed to handleEvent(); otherwise the message is hidden for two
    minutes so that it is looked at again later.
    '''
    try:
        data = json.loads(json.loads(message.body)['Message'])
        event = data['Event']
        ASGName = data['AutoScalingGroupName']
        InstanceId = data['EC2InstanceId']
    except Exception:
        # Still best-effort, but no longer swallows KeyboardInterrupt or
        # SystemExit the way a bare ``except:`` did.
        logging.warning('unparsable message %r', message.body)
        message.delete()
        return

    if event != 'autoscaling:EC2_INSTANCE_LAUNCH':
        logging.warning('nothing to do for event %r', data)
        message.delete()
        return

    try:
        instanceState = ec2r.Instance(InstanceId).state['Name']
    except Exception:
        logging.warning('instance %s does not exist', InstanceId)
        message.change_visibility(VisibilityTimeout=60 * 2)
        return

    if instanceState == 'running':
        handleEvent(message, event, ASGName, InstanceId)
    else:
        logging.warning('instance %s is in state %s, will try again',
                        InstanceId, instanceState)
        message.change_visibility(VisibilityTimeout=60 * 2)


class PidFileSingleton(object):
    '''Ensure that only one instance of this specific script runs at once.

    A pid file named after the SHA-256 of the script's absolute path is
    created exclusively in the temp directory.  If it already exists the
    process exits with status 0; the file is removed again when the object
    is finalized.
    '''

    def __init__(self):
        script = os.path.abspath(sys.argv[0])
        if not isinstance(script, bytes):
            # hashlib needs bytes; Python 2 paths already are.
            script = script.encode('utf-8', 'replace')
        self.pidfile = os.path.join(gettempdir(),
                                    sha256(script).hexdigest() + '.pid')
        try:
            fd = os.open(self.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        except OSError as e:
            # Not our file: make sure __del__ does not remove it.
            self.pidfile = None
            if e.errno == errno.EEXIST:
                logging.debug('An instance of this is already running.')
                sys.exit(0)
            raise
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

    def __del__(self):
        if self.pidfile:
            try:
                os.unlink(self.pidfile)
            except OSError:
                # Already gone; nothing left to clean up.
                pass
            self.pidfile = None


parser = argparse.ArgumentParser(description='act on SQS Notifications')
parser.add_argument('--profile', metavar='PROFILE', dest='profile_name',
                    help='AWS Profile (default: current IAM Role)')
parser.add_argument('--region', metavar='REGION', dest='region_name',
                    help='AWS Region')
parser.add_argument('playbooks', metavar='directory',
                    help='path containing playbooks et al')
parser.add_argument('queue', help='SQS Queue')
parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
args = parser.parse_args()

pidfile = PidFileSingleton()

# occasionally, small instances seem to briefly lose their iam credentials
sessionTriesRemaining = 3
while sessionTriesRemaining:
    try:
        session = boto3.session.Session(
            **{k: v for k, v in vars(args).items()
               if k in ('profile_name', 'region_name')})
        queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
        topic = session.resource('sns').Topic(args.arn) if args.arn else None
        ec2r = session.resource('ec2')
    except botocore.exceptions.NoCredentialsError:
        # Count this attempt first so the log shows the retries really left.
        sessionTriesRemaining -= 1
        logging.debug('Trouble with credentials, will retry %s more times.',
                      sessionTriesRemaining)
        if sessionTriesRemaining:
            time.sleep(5)
        continue
    break

if sessionTriesRemaining == 0:
    logging.error('Failed trying to use IAM credentials.')
    sys.exit(1)

while True:
    # long poll until there are no more messages
    messages = queue.receive_messages(MaxNumberOfMessages=10,
                                      WaitTimeSeconds=20)
    if not messages:
        break
    for message in messages:
        processMessage(message)