initial import
[awsible] / sqs-action.py
1 #!/usr/bin/env python
2 '''\
3 Check an SQS queue for ASG lifecycle notifications of new instances,
4 and run the appropriate Ansible playbook against the host.
5 '''
6
7 import argparse
8 import logging
9 import boto3
10 import json
11 import sys
12 import os
13 import errno
14 from subprocess import Popen, PIPE
15 from tempfile import gettempdir
16 from hashlib import sha256
17
18
# Absolute path of the ansible-playbook executable that handleEvent() runs.
ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'
20
21
def notify(subj, msg):
    '''Report the outcome of an action.

    Publishes to the module-level SNS ``topic`` when one is set; otherwise
    the message is printed to stdout and ``subj`` is not used.

    subj -- subject line for the SNS notification
    msg  -- notification body: text, UTF-8 bytes, or any printable object
    '''
    if not topic:
        print(msg)
        return

    # Captured command output can be raw bytes that are not plain ASCII;
    # decode leniently rather than letting an implicit ASCII decode raise.
    if isinstance(msg, bytes):
        text = msg.decode('utf-8', 'replace')
    else:
        text = u'{}'.format(msg)

    # SNS caps a message at 262144 bytes of UTF-8.  Cut at the byte limit,
    # then decode with 'ignore' so a multi-byte character split by the cut
    # is dropped instead of being sent as invalid UTF-8.
    body = text.encode('utf-8')[:262144].decode('utf-8', 'ignore')

    # SNS rejects subjects that are 100 characters or longer.
    topic.publish(Subject=subj[:99], Message=body)
28
29
def handleEvent(message, event, ASGName, InstanceId):
    '''Run the playbook named after the ASG against one instance.

    Looks for ``<ASGName>.yml`` in the playbook directory (``args.playbooks``).
    When present, ansible-playbook is run there, limited to ``InstanceId``.
    The SQS message is deleted only after a zero exit status; on failure it
    is left on the queue with a 60 second visibility timeout.  When there is
    no playbook the message is left untouched.  In every case a summary is
    passed to notify().

    message    -- SQS message object being processed
    event      -- lifecycle event name, used in the summary line
    ASGName    -- Auto Scaling group name; selects the playbook
    InstanceId -- EC2 instance id given to ansible-playbook's --limit
    '''
    headline = ' '.join([ASGName, InstanceId, event])
    report = [headline]
    playbook = ASGName + '.yml'

    if not os.path.isfile(os.path.join(args.playbooks, playbook)):
        report.append('no action taken: no playbook for this ASG')
    else:
        # Keep the message hidden from other pollers while the playbook
        # runs; hope config doesn't take more than 15m.
        message.change_visibility(VisibilityTimeout=(60 * 15))
        proc = Popen(
            [ANSIBLE_PLAYBOOK_CMD, '-i', 'inventory',
             '--limit', InstanceId, playbook],
            cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
        out, err = proc.communicate()
        # Shorten the timeout again so a failed run becomes visible soon.
        message.change_visibility(VisibilityTimeout=60)

        if proc.returncode:
            report.extend(
                ['FAILURE CODE {}'.format(proc.returncode), err, out])
        else:
            report.append('SUCCESS')
            message.delete()

    notify(headline, '\n'.join(report))
47
48
def processMessage(message):
    '''Unpack the data we want from an SQS message and act on it.

    The message body must be JSON whose 'Message' field is itself a JSON
    document carrying 'Event', 'AutoScalingGroupName' and 'EC2InstanceId'.

    Outcomes:
      * unparsable body           -> message deleted
      * event is not a launch     -> message deleted
      * instance lookup fails     -> message deleted
      * instance is 'running'     -> handed to handleEvent()
      * instance in another state -> message left on the queue for a retry

    message -- SQS message object providing ``body`` and ``delete()``
    '''
    try:
        data = json.loads(json.loads(message.body)['Message'])
        event = data['Event']
        ASGName = data['AutoScalingGroupName']
        InstanceId = data['EC2InstanceId']
    except (ValueError, KeyError, TypeError):
        # Only malformed JSON, missing keys, or a wrongly shaped document
        # count as "unparsable"; KeyboardInterrupt and SystemExit must not
        # be swallowed here.
        logging.debug('unparsable message %r', message.body)
        message.delete()
        return

    if event != 'autoscaling:EC2_INSTANCE_LAUNCH':
        logging.debug('nothing to do for event %r', data)
        message.delete()
        return

    try:
        instanceState = ec2r.Instance(InstanceId).state['Name']
    except Exception:
        # NOTE(review): any lookup failure is treated as "instance does not
        # exist" and the message is dropped -- confirm that is also wanted
        # for transient API errors.  exc_info records the real cause.
        logging.debug('instance %s does not exist', InstanceId, exc_info=True)
        message.delete()
        return

    if instanceState == 'running':
        handleEvent(message, event, ASGName, InstanceId)
    else:
        # Not ready yet: leave the message alone so it is redelivered later.
        logging.debug('instance %s is in state %s, will try again',
                      InstanceId, instanceState)
74
75
class PidFileSingleton(object):
    '''Ensure that only one instance of this specific script runs at once.

    Creating an instance atomically creates a pid file in the temp
    directory, named after the SHA-256 of the script's absolute path, and
    writes the current pid into it.  If the file already exists the process
    exits with status 0.  The file is removed when the instance is
    finalized, so the caller must keep a reference for the whole run.

    pidfile -- path of the pid file owned by this instance, or None when
               this instance did not create one
    '''

    def __init__(self):
        # Set first so __del__ is safe even if anything below raises.
        self.pidfile = None

        script = os.path.abspath(sys.argv[0])
        if not isinstance(script, bytes):
            # hashlib only accepts bytes; text paths must be encoded.
            script = script.encode('utf-8')
        path = os.path.join(gettempdir(), sha256(script).hexdigest() + '.pid')

        try:
            # O_EXCL makes creation the atomic "am I first?" test.
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
        except OSError as e:
            if e.errno == errno.EEXIST:
                logging.debug('An instance of this is already running.')
                sys.exit(0)
            raise

        # Only claim the file once it really is ours, so __del__ can never
        # remove a pid file that belongs to another process.
        self.pidfile = path
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

    def __del__(self, _unlink=os.unlink):
        # os.unlink is bound as a default argument because module globals
        # may already have been cleared when this runs at interpreter exit.
        pidfile = getattr(self, 'pidfile', None)
        if pidfile:
            self.pidfile = None
            _unlink(pidfile)
93
94
# Command line: the playbook directory and the queue name are required; the
# SNS topic ARN is optional.
parser = argparse.ArgumentParser(description='act on SQS Notifications')
parser.add_argument(
    '--profile', metavar='PROFILE', dest='profile_name',
    help='AWS Profile (default: current IAM Role)')
parser.add_argument(
    '--region', metavar='REGION', dest='region_name',
    help='AWS Region')
parser.add_argument(
    'playbooks', metavar='directory',
    help='path containing playbooks et al')
parser.add_argument('queue', help='SQS Queue')
parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
args = parser.parse_args()

# Exits here if another copy of this script already holds the pid file.
# The reference is kept so the file lives for the whole run.
pidfile = PidFileSingleton()

session = boto3.session.Session(
    profile_name=args.profile_name,
    region_name=args.region_name)
queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
if args.arn:
    topic = session.resource('sns').Topic(args.arn)
else:
    topic = None
ec2r = session.resource('ec2')

# Drain the queue: long poll until a receive comes back empty.
while True:
    batch = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
    if not batch:
        break
    for message in batch:
        processMessage(message)