add initial vpc buildout role
[awsible] / sqs-action.py
1 #!/usr/bin/env python
2 '''\
3 Check an SQS queue for ASG lifecycle notifications of new instances,
4 and run the appropriate Ansible playbook against the host.
5 '''
6
7 import argparse
8 import logging
9 import boto3
10 import json
11 import sys
12 import os
13 import errno
14 from subprocess import Popen, PIPE
15 from tempfile import gettempdir
16 from hashlib import sha256
17
18
19 ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'
20
21
22 def notify(subj, msg):
23 if topic:
24 u8msg = unicode(msg).encode('utf-8')
25 topic.publish(Subject=subj, Message=u8msg[:262144])
26 else:
27 print(msg)
28
29
30 def handleEvent(message, event, ASGName, InstanceId):
31 notice = [' '.join([ASGName, InstanceId, event])]
32 if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
33 message.change_visibility(VisibilityTimeout=(60 * 15)) # hope config doesn't take more than 15m
34 cmd = [ ANSIBLE_PLAYBOOK_CMD, '-i', 'inventory', '--limit', InstanceId, ASGName + '.yml']
35 p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
36 (stdoutdata, stderrdata) = p.communicate()
37 retval = p.returncode
38 message.change_visibility(VisibilityTimeout=60)
39 if retval:
40 notice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata]
41 else:
42 notice += ['SUCCESS']
43 message.delete()
44 else:
45 notice += ['no action taken: no playbook for this ASG']
46 notify(notice[0], '\n'.join(notice))
47
48
49 def processMessage(message):
50 '''Unpack the data we want from an SQS message.'''
51 try:
52 data = json.loads(json.loads(message.body)['Message'])
53 event = data['Event']
54 ASGName = data['AutoScalingGroupName']
55 InstanceId = data['EC2InstanceId']
56 except:
57 logging.debug('unparsable message %r', message.body)
58 message.delete()
59 else:
60 if event == 'autoscaling:EC2_INSTANCE_LAUNCH':
61 try:
62 instanceState = ec2r.Instance(InstanceId).state['Name']
63 except:
64 logging.debug('instance %s does not exist', InstanceId)
65 message.change_visibility(VisibilityTimeout=60 * 2)
66 else:
67 if instanceState == 'running':
68 handleEvent(message, event, ASGName, InstanceId)
69 else:
70 logging.debug('instance %s is in state %s, will try again', InstanceId, instanceState)
71 message.change_visibility(VisibilityTimeout=60 * 2)
72 else:
73 logging.debug('nothing to do for event %r', data)
74 message.delete()
75
76
77 class PidFileSingleton:
78 '''Ensure that only one instance of this specific script runs at once.'''
79 def __init__(self):
80 self.pidfile = os.path.join(gettempdir(), sha256(os.path.abspath(sys.argv[0])).hexdigest() + '.pid')
81 try:
82 fd = os.open(self.pidfile, os.O_WRONLY|os.O_CREAT|os.O_EXCL, )
83 except OSError as e:
84 self.pidfile = None
85 if e.errno == errno.EEXIST:
86 logging.debug('An instance of this is already running.')
87 sys.exit(0)
88 raise e
89 with os.fdopen(fd, 'w') as f:
90 f.write(str(os.getpid()))
91 def __del__(self):
92 if self.pidfile:
93 os.unlink(self.pidfile)
94
95
96 parser = argparse.ArgumentParser(description='act on SQS Notifications')
97 parser.add_argument('--profile', metavar='PROFILE', dest='profile_name', help='AWS Profile (default: current IAM Role)')
98 parser.add_argument('--region', metavar='REGION', dest='region_name', help='AWS Region')
99 parser.add_argument('playbooks', metavar='directory', help='path containing playbooks et al')
100 parser.add_argument('queue', help='SQS Queue')
101 parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
102 args = parser.parse_args()
103
104 pidfile = PidFileSingleton()
105
106 session = boto3.session.Session(**{k:v for k,v in vars(args).items() if k in ('profile_name', 'region_name')})
107 queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
108 topic = session.resource('sns').Topic(args.arn) if args.arn else None
109 ec2r = session.resource('ec2')
110
111 while True:
112 # long poll until there are no more messages
113 messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
114 if not len(messages):
115 break
116 for message in messages:
117 processMessage(message)