+#!/usr/bin/env python
+'''\
+Check an SQS queue for ASG lifecycle notifications of new instances,
+and run the appropriate Ansible playbook against the host.
+'''
+
+import argparse
+import logging
+import boto3
+import json
+import sys
+import os
+import errno
+from subprocess import Popen, PIPE
+from tempfile import gettempdir
+from hashlib import sha256
+
+
+ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'
+
+
+def notify(subj, msg):
+ if topic:
+ u8msg = unicode(msg).encode('utf-8')
+ topic.publish(Subject=subj, Message=u8msg[:262144])
+ else:
+ print(msg)
+
+
+def handleEvent(message, event, ASGName, InstanceId):
+ notice = [' '.join([ASGName, InstanceId, event])]
+ if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
+ message.change_visibility(VisibilityTimeout=(60 * 15)) # hope config doesn't take more than 15m
+ cmd = [ ANSIBLE_PLAYBOOK_CMD, '-i', 'inventory', '--limit', InstanceId, ASGName + '.yml']
+ p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
+ (stdoutdata, stderrdata) = p.communicate()
+ retval = p.returncode
+ message.change_visibility(VisibilityTimeout=60)
+ if retval:
+ notice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata]
+ else:
+ notice += ['SUCCESS']
+ message.delete()
+ else:
+ notice += ['no action taken: no playbook for this ASG']
+ notify(notice[0], '\n'.join(notice))
+
+
+def processMessage(message):
+ '''Unpack the data we want from an SQS message.'''
+ try:
+ data = json.loads(json.loads(message.body)['Message'])
+ event = data['Event']
+ ASGName = data['AutoScalingGroupName']
+ InstanceId = data['EC2InstanceId']
+ except:
+ logging.debug('unparsable message %r', message.body)
+ message.delete()
+ else:
+ if event == 'autoscaling:EC2_INSTANCE_LAUNCH':
+ try:
+ instanceState = ec2r.Instance(InstanceId).state['Name']
+ except:
+ logging.debug('instance %s does not exist', InstanceId)
+ message.delete()
+ else:
+ if instanceState == 'running':
+ handleEvent(message, event, ASGName, InstanceId)
+ else:
+ logging.debug('instance %s is in state %s, will try again', InstanceId, instanceState)
+ else:
+ logging.debug('nothing to do for event %r', data)
+ message.delete()
+
+
+class PidFileSingleton:
+ '''Ensure that only one instance of this specific script runs at once.'''
+ def __init__(self):
+ self.pidfile = os.path.join(gettempdir(), sha256(os.path.abspath(sys.argv[0])).hexdigest() + '.pid')
+ try:
+ fd = os.open(self.pidfile, os.O_WRONLY|os.O_CREAT|os.O_EXCL, )
+ except OSError as e:
+ self.pidfile = None
+ if e.errno == errno.EEXIST:
+ logging.debug('An instance of this is already running.')
+ sys.exit(0)
+ raise e
+ with os.fdopen(fd, 'w') as f:
+ f.write(str(os.getpid()))
+ def __del__(self):
+ if self.pidfile:
+ os.unlink(self.pidfile)
+
+
+parser = argparse.ArgumentParser(description='act on SQS Notifications')
+parser.add_argument('--profile', metavar='PROFILE', dest='profile_name', help='AWS Profile (default: current IAM Role)')
+parser.add_argument('--region', metavar='REGION', dest='region_name', help='AWS Region')
+parser.add_argument('playbooks', metavar='directory', help='path containing playbooks et al')
+parser.add_argument('queue', help='SQS Queue')
+parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
+args = parser.parse_args()
+
+pidfile = PidFileSingleton()
+
+session = boto3.session.Session(**{k:v for k,v in vars(args).items() if k in ('profile_name', 'region_name')})
+queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
+topic = session.resource('sns').Topic(args.arn) if args.arn else None
+ec2r = session.resource('ec2')
+
+while True:
+ # long poll until there are no more messages
+ messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
+ if not len(messages):
+ break
+ for message in messages:
+ processMessage(message)