add a basic cw alarm
[awsible] / sqs-action.py
1 #!/usr/bin/env python
2 '''\
3 Check an SQS queue for ASG lifecycle notifications of new instances,
4 and run the appropriate Ansible playbook against the host.
5 '''
6
7 import argparse
8 import logging
9 import boto3
10 import botocore.exceptions
11 import json
12 import sys
13 import os
14 import time
15 import errno
16 from subprocess import Popen, PIPE
17 from tempfile import gettempdir
18 from hashlib import sha256
19
20
21 ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'
22
23
def notify(subj, msg):
    '''Publish a notification to the SNS topic, or print it if there is none.

    subj -- subject line for the published message
    msg  -- message body: text, UTF-8 encoded bytes, or any object with a
            text representation

    Reads the module-level ``topic``; when it is falsy the message is
    printed to stdout instead of being published.
    '''
    if not topic:
        print(msg)
        return

    # Limits documented for the SNS Publish call: the message may be at most
    # 256 KiB of UTF-8, and the subject must be shorter than 100 characters.
    maxMessageBytes = 262144
    maxSubjectChars = 99

    if isinstance(msg, bytes):
        # Decode explicitly; an implicit ASCII decode raises on the first
        # non-ASCII byte.
        text = msg.decode('utf-8', 'replace')
    else:
        text = u'%s' % (msg,)

    # The size limit is in bytes, so cut the encoded form, then drop any
    # multi-byte character the cut split in half.
    payload = text.encode('utf-8')[:maxMessageBytes].decode('utf-8', 'ignore')
    topic.publish(Subject=subj[:maxSubjectChars], Message=payload)
30
31
def _runPlaybook(cmd):
    '''Run one ansible-playbook command from the playbook directory.

    Returns (retval, report): the exit status, and the notice lines that
    describe the outcome -- ['SUCCESS'], or the failure code followed by
    the captured stderr and stdout.
    '''
    p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
    stdoutdata, stderrdata = p.communicate()
    retval = p.returncode
    if not retval:
        return retval, ['SUCCESS']
    # The pipes hand back bytes; decode them here so the report can be joined
    # with the text parts of the notice whatever the playbook printed.
    return retval, [
        'FAILURE CODE {}'.format(retval),
        stderrdata.decode('utf-8', 'replace'),
        stdoutdata.decode('utf-8', 'replace'),
    ]


def handleEvent(message, event, ASGName, InstanceId):
    '''Run the playbook(s) for a launched instance and report the outcome.

    message    -- the SQS message that carried the notification
    event      -- the autoscaling event name
    ASGName    -- auto scaling group name; selects <ASGName>.yml
    InstanceId -- EC2 instance id; passed to ansible-playbook as --limit

    If <ASGName>.yml exists in the playbook directory it is run against the
    instance. On success the message is deleted and, if present,
    <ASGName>-post.yml is run as well. On failure the message is left on the
    queue to become visible again. Without a playbook the message is simply
    deleted. Each playbook run results in one notify() call.
    '''
    notice = [' '.join([ASGName, InstanceId, event])]
    postnotice = []
    if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
        # hope config doesn't take more than 15m
        message.change_visibility(VisibilityTimeout=(60 * 15))
        retval, report = _runPlaybook(
            [ANSIBLE_PLAYBOOK_CMD, '--limit', InstanceId, ASGName + '.yml'])
        # Shorten the timeout again so a failed run is redelivered soon.
        message.change_visibility(VisibilityTimeout=60)
        notice += report
        if not retval:
            message.delete()
            if os.path.isfile(os.path.join(args.playbooks, ASGName + '-post.yml')):
                postnotice = [' '.join([ASGName, 'post', event])]
                _, postreport = _runPlaybook(
                    [ANSIBLE_PLAYBOOK_CMD, ASGName + '-post.yml'])
                postnotice += postreport
    else:
        notice += ['no action taken: no playbook for this ASG']
        message.delete()
    notify(notice[0], '\n'.join(notice))
    if postnotice:
        notify(postnotice[0], '\n'.join(postnotice))
64
65
def processMessage(message):
    '''Unpack the data we want from an SQS message and act on it.

    The message body is expected to be a JSON envelope whose 'Message' field
    is itself a JSON document with 'Event', 'AutoScalingGroupName' and
    'EC2InstanceId' keys.

    - Unparsable messages and events other than an instance launch are
      logged and deleted.
    - For a launch, the instance state is looked up through the module-level
      ``ec2r``. A running instance is passed to handleEvent(); otherwise the
      message is hidden for two minutes so it is retried later.
    '''
    try:
        data = json.loads(json.loads(message.body)['Message'])
        event = data['Event']
        ASGName = data['AutoScalingGroupName']
        InstanceId = data['EC2InstanceId']
    except (ValueError, KeyError, TypeError):
        # Bad JSON, a missing key, or a payload of the wrong shape.
        logging.warning('unparsable message %r', message.body)
        message.delete()
        return

    if event != 'autoscaling:EC2_INSTANCE_LAUNCH':
        logging.warning('nothing to do for event %r', data)
        message.delete()
        return

    try:
        instanceState = ec2r.Instance(InstanceId).state['Name']
    except Exception as e:
        # Deliberately broad: any lookup failure means "retry later". Unlike
        # a bare except, this lets SystemExit and KeyboardInterrupt through.
        logging.warning('instance %s does not exist (%s)', InstanceId, e)
        message.change_visibility(VisibilityTimeout=60 * 2)
        return

    if instanceState == 'running':
        handleEvent(message, event, ASGName, InstanceId)
    else:
        logging.warning('instance %s is in state %s, will try again', InstanceId, instanceState)
        message.change_visibility(VisibilityTimeout=60 * 2)
92
93
class PidFileSingleton:
    '''Ensure that only one instance of this specific script runs at once.

    Creating an instance atomically creates a pidfile in the temp directory,
    named after the SHA-256 of the script's absolute path, and writes the
    current pid into it. If the file already exists the process exits with
    status 0. The file is removed by release(), which also runs at
    interpreter exit and from __del__.
    '''

    def __init__(self):
        import atexit

        # Set first so cleanup is safe even if construction fails part-way.
        self.pidfile = None

        script = os.path.abspath(sys.argv[0])
        if not isinstance(script, bytes):
            # hashlib only accepts bytes; on Python 3 the path is text.
            script = script.encode('utf-8')
        path = os.path.join(gettempdir(), sha256(script).hexdigest() + '.pid')

        try:
            # O_EXCL makes "create if absent" one atomic step.
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
        except OSError as e:
            if e.errno == errno.EEXIST:
                logging.debug('An instance of this is already running.')
                sys.exit(0)
            raise

        # Only now do we own the file and become responsible for removing it.
        self.pidfile = path
        # __del__ is not guaranteed to run for objects still alive when the
        # interpreter exits, so register the cleanup explicitly as well.
        atexit.register(self.release)
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

    def release(self):
        '''Remove the pidfile if this instance owns one; safe to call twice.'''
        path, self.pidfile = getattr(self, 'pidfile', None), None
        if not path:
            return
        try:
            os.unlink(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def __del__(self):
        self.release()
111
112
# Command line: optional AWS profile and region, then the playbook directory,
# the SQS queue name and, optionally, the ARN of an SNS topic to notify.
parser = argparse.ArgumentParser(description='act on SQS Notifications')
parser.add_argument(
    '--profile',
    dest='profile_name',
    metavar='PROFILE',
    help='AWS Profile (default: current IAM Role)',
)
parser.add_argument(
    '--region',
    dest='region_name',
    metavar='REGION',
    help='AWS Region',
)
parser.add_argument(
    'playbooks',
    metavar='directory',
    help='path containing playbooks et al',
)
parser.add_argument(
    'queue',
    help='SQS Queue',
)
parser.add_argument(
    'arn',
    nargs='?',
    default=None,
    help='ARN of SNS topic',
)
args = parser.parse_args()

# Take the single-instance lock; the script exits here if another run of it
# already holds the pidfile.
pidfile = PidFileSingleton()
122
# occasionally, small instances seem to briefly lose their iam credentials,
# so setting up the AWS handles is attempted a few times before giving up
sessionTriesRemaining = 3
while sessionTriesRemaining:
    try:
        session = boto3.session.Session(
            profile_name=args.profile_name,
            region_name=args.region_name,
        )
        queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
        topic = session.resource('sns').Topic(args.arn) if args.arn else None
        ec2r = session.resource('ec2')
    except botocore.exceptions.NoCredentialsError:
        sessionTriesRemaining -= 1
        if sessionTriesRemaining:
            # Report the attempts actually left, and only wait when there
            # is another attempt to wait for.
            logging.debug('Trouble with credentials, will retry %s more times.', sessionTriesRemaining)
            time.sleep(5)
    else:
        break
else:
    # The loop ran out of attempts without ever reaching the break.
    logging.error('Failed trying to use IAM credentials.')
    sys.exit(1)
140
# Drain the queue: long poll for a batch, handle every message in it, and
# stop as soon as a poll comes back empty.
pollOptions = dict(MaxNumberOfMessages=10, WaitTimeSeconds=20)
messages = queue.receive_messages(**pollOptions)
while messages:
    for message in messages:
        processMessage(message)
    messages = queue.receive_messages(**pollOptions)