initial import
[awsible] / sqs-action.py
diff --git a/sqs-action.py b/sqs-action.py
new file mode 100755 (executable)
index 0000000..54f2eb7
--- /dev/null
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+'''\
+Check an SQS queue for ASG lifecycle notifications of new instances,
+and run the appropriate Ansible playbook against the host.
+'''
+
+import argparse
+import logging
+import boto3
+import json
+import sys
+import os
+import errno
+from subprocess import Popen, PIPE
+from tempfile import gettempdir
+from hashlib import sha256
+
+
+ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'  # executable that handleEvent() launches for each instance
+
+
+def notify(subj, msg):
+    if topic:
+        u8msg = unicode(msg).encode('utf-8')
+        topic.publish(Subject=subj, Message=u8msg[:262144])
+    else:
+        print(msg)
+
+
+def handleEvent(message, event, ASGName, InstanceId):
+    notice = [' '.join([ASGName, InstanceId, event])]
+    if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
+        message.change_visibility(VisibilityTimeout=(60 * 15)) # hope config doesn't take more than 15m
+        cmd = [ ANSIBLE_PLAYBOOK_CMD, '-i', 'inventory', '--limit', InstanceId, ASGName + '.yml']
+        p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
+        (stdoutdata, stderrdata) = p.communicate()
+        retval = p.returncode
+        message.change_visibility(VisibilityTimeout=60)
+        if retval:
+            notice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata]
+        else:
+            notice += ['SUCCESS']
+            message.delete()
+    else:
+        notice += ['no action taken: no playbook for this ASG']
+    notify(notice[0], '\n'.join(notice))
+
+
+def processMessage(message):
+    '''Unpack the data we want from an SQS message.'''
+    try:
+        data = json.loads(json.loads(message.body)['Message'])
+        event = data['Event']
+        ASGName = data['AutoScalingGroupName']
+        InstanceId = data['EC2InstanceId']
+    except:
+        logging.debug('unparsable message %r', message.body)
+        message.delete()
+    else:
+        if event == 'autoscaling:EC2_INSTANCE_LAUNCH':
+            try:
+                instanceState = ec2r.Instance(InstanceId).state['Name']
+            except:
+                logging.debug('instance %s does not exist', InstanceId)
+                message.delete()
+            else:
+                if instanceState == 'running':
+                    handleEvent(message, event, ASGName, InstanceId)
+                else:
+                    logging.debug('instance %s is in state %s, will try again', InstanceId, instanceState)
+        else:
+            logging.debug('nothing to do for event %r', data)
+            message.delete()
+
+
+class PidFileSingleton:
+    '''Ensure that only one instance of this specific script runs at once.'''
+    def __init__(self):
+        self.pidfile = os.path.join(gettempdir(), sha256(os.path.abspath(sys.argv[0])).hexdigest() + '.pid')
+        try:
+            fd = os.open(self.pidfile, os.O_WRONLY|os.O_CREAT|os.O_EXCL, )
+        except OSError as e:
+            self.pidfile = None
+            if e.errno == errno.EEXIST:
+                logging.debug('An instance of this is already running.')
+                sys.exit(0)
+            raise e
+        with os.fdopen(fd, 'w') as f:
+            f.write(str(os.getpid()))
+    def __del__(self):
+        if self.pidfile:
+            os.unlink(self.pidfile)
+
+
+parser = argparse.ArgumentParser(description='act on SQS Notifications')
+parser.add_argument('--profile', metavar='PROFILE', dest='profile_name', help='AWS Profile (default: current IAM Role)')
+parser.add_argument('--region', metavar='REGION', dest='region_name', help='AWS Region')
+parser.add_argument('playbooks', metavar='directory', help='path containing playbooks et al')
+parser.add_argument('queue', help='SQS Queue')
+parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
+args = parser.parse_args()
+
+pidfile = PidFileSingleton()
+
+session = boto3.session.Session(**{k:v for k,v in vars(args).items() if k in ('profile_name', 'region_name')})
+queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
+topic = session.resource('sns').Topic(args.arn) if args.arn else None
+ec2r = session.resource('ec2')
+
+while True:
+    # long poll until there are no more messages
+    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
+    if not len(messages):
+        break
+    for message in messages:
+        processMessage(message)