X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=sqs-action.py;fp=sqs-action.py;h=54f2eb7969cd48b4a879eeac8829dc4315d85b84;hb=164fb4ac7aebf84ca89433aeee8d16771fb8b7da;hp=0000000000000000000000000000000000000000;hpb=eff6951aff36874ec094e80849ee3095800b7e76;p=awsible diff --git a/sqs-action.py b/sqs-action.py new file mode 100755 index 0000000..54f2eb7 --- /dev/null +++ b/sqs-action.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +'''\ +Check an SQS queue for ASG lifecycle notifications of new instances, +and run the appropriate Ansible playbook against the host. +''' + +import argparse +import logging +import boto3 +import json +import sys +import os +import errno +from subprocess import Popen, PIPE +from tempfile import gettempdir +from hashlib import sha256 + + +ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook' + + +def notify(subj, msg): + if topic: + u8msg = unicode(msg).encode('utf-8') + topic.publish(Subject=subj, Message=u8msg[:262144]) + else: + print(msg) + + +def handleEvent(message, event, ASGName, InstanceId): + notice = [' '.join([ASGName, InstanceId, event])] + if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')): + message.change_visibility(VisibilityTimeout=(60 * 15)) # hope config doesn't take more than 15m + cmd = [ ANSIBLE_PLAYBOOK_CMD, '-i', 'inventory', '--limit', InstanceId, ASGName + '.yml'] + p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE) + (stdoutdata, stderrdata) = p.communicate() + retval = p.returncode + message.change_visibility(VisibilityTimeout=60) + if retval: + notice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata] + else: + notice += ['SUCCESS'] + message.delete() + else: + notice += ['no action taken: no playbook for this ASG'] + notify(notice[0], '\n'.join(notice)) + + +def processMessage(message): + '''Unpack the data we want from an SQS message.''' + try: + data = json.loads(json.loads(message.body)['Message']) + event = data['Event'] + ASGName = data['AutoScalingGroupName'] + InstanceId = data['EC2InstanceId'] + except: + logging.debug('unparsable message %r', message.body) + message.delete() + else: + if event == 'autoscaling:EC2_INSTANCE_LAUNCH': + try: + instanceState = ec2r.Instance(InstanceId).state['Name'] + except: + logging.debug('instance %s does not exist', InstanceId) + message.delete() + else: + if instanceState == 'running': + handleEvent(message, event, ASGName, InstanceId) + else: + logging.debug('instance %s is in state %s, will try again', InstanceId, instanceState) + else: + logging.debug('nothing to do for event %r', data) + message.delete() + + +class PidFileSingleton: + '''Ensure that only one instance of this specific script runs at once.''' + def __init__(self): + self.pidfile = os.path.join(gettempdir(), sha256(os.path.abspath(sys.argv[0])).hexdigest() + '.pid') + try: + fd = os.open(self.pidfile, os.O_WRONLY|os.O_CREAT|os.O_EXCL, ) + except OSError as e: + self.pidfile = None + if e.errno == errno.EEXIST: + logging.debug('An instance of this is already running.') + sys.exit(0) + raise e + with os.fdopen(fd, 'w') as f: + f.write(str(os.getpid())) + def __del__(self): + if self.pidfile: + os.unlink(self.pidfile) + + +parser = argparse.ArgumentParser(description='act on SQS Notifications') +parser.add_argument('--profile', metavar='PROFILE', dest='profile_name', help='AWS Profile (default: current IAM Role)') +parser.add_argument('--region', metavar='REGION', dest='region_name', help='AWS Region') +parser.add_argument('playbooks', metavar='directory', help='path containing playbooks et al') +parser.add_argument('queue', help='SQS Queue') +parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic') +args = parser.parse_args() + +pidfile = PidFileSingleton() + +session = boto3.session.Session(**{k:v for k,v in vars(args).items() if k in ('profile_name', 'region_name')}) +queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue) +topic = session.resource('sns').Topic(args.arn) if args.arn else None +ec2r = session.resource('ec2') + +while True: + # long poll until there are no more messages + messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20) + if not len(messages): + break + for message in messages: + processMessage(message)