#!/usr/bin/env python3
'''\
Check an SQS queue for ASG lifecycle notifications of new instances,
and run the appropriate Ansible playbook against the host.
'''
import argparse
import errno
import json
import logging
import os
import sys
from hashlib import sha256
from subprocess import PIPE, Popen
from tempfile import gettempdir

import boto3

ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'


def notify(subj, msg):
    '''Publish a report to the SNS topic, or print it if no topic was given.'''
    if topic:
        # SNS message bodies are limited to 262144 bytes; truncate on a
        # UTF-8 byte boundary rather than letting the publish call fail.
        u8msg = str(msg).encode('utf-8')[:262144].decode('utf-8', 'ignore')
        topic.publish(Subject=subj, Message=u8msg)
    else:
        print(msg)


def handleEvent(message, event, ASGName, InstanceId):
    '''Run the playbook named after the ASG against the new instance.'''
    notice = [' '.join([ASGName, InstanceId, event])]
    if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
        # Give the playbook up to 15 minutes before the message becomes
        # visible to another worker again.
        message.change_visibility(VisibilityTimeout=60 * 15)
        cmd = [
            ANSIBLE_PLAYBOOK_CMD,
            '-i', 'inventory',
            '--limit', InstanceId,
            ASGName + '.yml']
        p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE,
                  universal_newlines=True)
        stdoutdata, stderrdata = p.communicate()
        retval = p.returncode
        message.change_visibility(VisibilityTimeout=60)
        if retval:
            # Leave the message in the queue so the run is retried once
            # the visibility timeout expires.
            notice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata]
        else:
            notice += ['SUCCESS']
            message.delete()
    else:
        notice += ['no action taken: no playbook for this ASG']
        message.delete()
    notify(notice[0], '\n'.join(notice))


def processMessage(message):
    '''Unpack the data we want from an SQS message.'''
    try:
        data = json.loads(json.loads(message.body)['Message'])
        event = data['Event']
        ASGName = data['AutoScalingGroupName']
        InstanceId = data['EC2InstanceId']
    except (ValueError, KeyError, TypeError):
        logging.debug('unparsable message %r', message.body)
        message.delete()
    else:
        if event == 'autoscaling:EC2_INSTANCE_LAUNCH':
            try:
                instanceState = ec2r.Instance(InstanceId).state['Name']
            except Exception:
                # The instance may already be gone, or the lookup failed;
                # try again in a couple of minutes.
                logging.debug('instance %s does not exist', InstanceId)
                message.change_visibility(VisibilityTimeout=60 * 2)
            else:
                if instanceState == 'running':
                    handleEvent(message, event, ASGName, InstanceId)
                else:
                    logging.debug('instance %s is in state %s, will try again',
                                  InstanceId, instanceState)
                    message.change_visibility(VisibilityTimeout=60 * 2)
        else:
            logging.debug('nothing to do for event %r', data)
            message.delete()


class PidFileSingleton:
    '''Ensure that only one instance of this specific script runs at once.'''

    def __init__(self):
        # One pidfile per absolute script path, kept in the system temp dir.
        self.pidfile = os.path.join(
            gettempdir(),
            sha256(os.path.abspath(sys.argv[0]).encode('utf-8')).hexdigest() + '.pid')
        try:
            fd = os.open(self.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        except OSError as e:
            self.pidfile = None
            if e.errno == errno.EEXIST:
                logging.debug('An instance of this script is already running.')
                sys.exit(0)
            raise
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

    def __del__(self):
        if self.pidfile:
            os.unlink(self.pidfile)


parser = argparse.ArgumentParser(description='act on SQS notifications')
parser.add_argument('--profile', metavar='PROFILE', dest='profile_name',
                    help='AWS profile (default: current IAM role)')
parser.add_argument('--region', metavar='REGION', dest='region_name',
                    help='AWS region')
parser.add_argument('playbooks', metavar='directory',
                    help='path containing playbooks et al.')
parser.add_argument('queue', help='SQS queue name')
parser.add_argument('arn', nargs='?', default=None,
                    help='ARN of SNS topic to publish results to')
args = parser.parse_args()

pidfile = PidFileSingleton()

session = boto3.session.Session(**{k: v for k, v in vars(args).items()
                                   if k in ('profile_name', 'region_name')})
queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
topic = session.resource('sns').Topic(args.arn) if args.arn else None
ec2r = session.resource('ec2')

# Long-poll until the queue is drained, then exit.
while True:
    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
    if not messages:
        break
    for message in messages:
        processMessage(message)
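
# Example invocation (a sketch only: the script filename, playbook directory,
# queue name, region, and topic ARN below are placeholders, not values defined
# anywhere in this script):
#
#   ./asg-playbook-runner.py --region us-east-1 \
#       /etc/ansible/asg-playbooks asg-lifecycle-queue \
#       arn:aws:sns:us-east-1:123456789012:asg-config-reports
#
# Because the PidFileSingleton guard makes overlapping invocations exit
# immediately, the script is suited to being triggered periodically, e.g.
# from cron or a systemd timer.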