3 Check an SQS queue for ASG lifecycle notifications of new instances,
4 and run the appropriate Ansible playbook against the host.
10 import botocore
.exceptions
16 from subprocess
import Popen
, PIPE
17 from tempfile
import gettempdir
18 from hashlib
import sha256
# Absolute path of the ansible-playbook executable; handleEvent() uses it as
# the first element of every command line it runs.
ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'
def notify(subj, msg):
    '''Publish a notification to the configured SNS topic.

    subj -- subject line for the notification
    msg  -- message body; text, a byte string, or any object with a text form

    Reads the module-level ``topic``, which is None when no topic ARN was
    given on the command line. In that case nothing can be published, so the
    notification is written to the log instead.
    NOTE(review): the no-topic fallback is an assumption -- confirm it matches
    what operators expect.
    '''
    if topic is None:
        logging.info('%s\n%s', subj, msg)
        return

    # Normalise to text without assuming ASCII: playbook output may contain
    # arbitrary bytes, which must not be allowed to abort the notification.
    if isinstance(msg, bytes):
        text = msg.decode('utf-8', 'replace')
    else:
        text = u'{}'.format(msg)

    # 262144 bytes (256 KiB) is the cap applied to the UTF-8 encoded body.
    # Cut on the encoded form, then decode with 'ignore' so that a multi-byte
    # character split by the cut is dropped rather than sent as invalid UTF-8.
    max_bytes = 262144
    payload = text.encode('utf-8')[:max_bytes].decode('utf-8', 'ignore')

    topic.publish(Subject=subj, Message=payload)
def _runPlaybook(cmd):
    '''Run one ansible-playbook command line from the playbook directory.

    Returns a (returncode, stdout, stderr) tuple. Both streams are captured as
    text so they can be joined with the other notice lines.
    '''
    p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE,
              universal_newlines=True)
    stdoutdata, stderrdata = p.communicate()
    return p.returncode, stdoutdata, stderrdata


def handleEvent(message, event, ASGName, InstanceId):
    '''Run the playbook(s) for an ASG event and report the outcome.

    message    -- the SQS message being handled (visibility is adjusted here)
    event      -- event name taken from the notification
    ASGName    -- Auto Scaling group name; selects <ASGName>.yml and, if
                  present, <ASGName>-post.yml in the playbook directory
    InstanceId -- EC2 instance the main playbook is limited to

    The outcome of each playbook run is sent through notify(); the first line
    of each notice doubles as the subject.

    NOTE(review): three points of control flow are assumptions to confirm:
    the message is deleted once it has been handled successfully or there is
    no playbook; a failed run leaves the message on the queue for a retry; and
    the post playbook only runs after the main playbook succeeds.
    '''
    notice = [' '.join([ASGName, InstanceId, event])]
    postnotice = None

    if os.path.isfile(os.path.join(args.playbooks, ASGName + '.yml')):
        message.change_visibility(VisibilityTimeout=(60 * 15))  # hope config doesn't take more than 15m
        cmd = [ANSIBLE_PLAYBOOK_CMD, '--limit', InstanceId, ASGName + '.yml']
        retval, stdoutdata, stderrdata = _runPlaybook(cmd)
        # The long hold is no longer needed; if the run failed the message
        # becomes visible again after a minute.
        message.change_visibility(VisibilityTimeout=60)

        if retval:
            notice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata]
        else:
            notice += ['SUCCESS']
            message.delete()

            if os.path.isfile(os.path.join(args.playbooks, ASGName + '-post.yml')):
                # str.join takes a single iterable, so the parts must be
                # wrapped in a list.
                postnotice = [' '.join([ASGName, 'post', event])]
                cmd = [ANSIBLE_PLAYBOOK_CMD, ASGName + '-post.yml']
                retval, stdoutdata, stderrdata = _runPlaybook(cmd)
                if retval:
                    postnotice += ['FAILURE CODE {}'.format(retval), stderrdata, stdoutdata]
                else:
                    postnotice += ['SUCCESS']
    else:
        notice += ['no action taken: no playbook for this ASG']
        message.delete()

    notify(notice[0], '\n'.join(notice))
    # Only report on the post playbook when one was actually run.
    if postnotice:
        notify(postnotice[0], '\n'.join(postnotice))
def processMessage(message):
    '''Unpack the data we want from an SQS message and act on it.

    message -- SQS message whose body is a JSON envelope with a 'Message'
               field that is itself a JSON document.

    Instance-launch events for a running instance are passed to handleEvent().
    If the instance cannot be looked up or is not running yet, the message is
    hidden for two minutes so it is retried later.

    NOTE(review): reading the event name from the 'Event' key, and deleting
    messages that are unparsable or need no action, are assumptions -- confirm
    both against the deployed behaviour.
    '''
    try:
        data = json.loads(json.loads(message.body)['Message'])
        event = data['Event']
        ASGName = data['AutoScalingGroupName']
        InstanceId = data['EC2InstanceId']
    except (ValueError, KeyError, TypeError):
        # Not JSON, or JSON without the fields we need. It will never become
        # parsable, so drop it rather than receive it again forever.
        logging.warning('unparsable message %r', message.body)
        message.delete()
        return

    if event == 'autoscaling:EC2_INSTANCE_LAUNCH':
        try:
            instanceState = ec2r.Instance(InstanceId).state['Name']
        except (AttributeError, TypeError, botocore.exceptions.ClientError):
            # The lookup produced no usable state for this instance id.
            logging.warning('instance %s does not exist', InstanceId)
            message.change_visibility(VisibilityTimeout=60 * 2)
            return

        if instanceState == 'running':
            handleEvent(message, event, ASGName, InstanceId)
        else:
            logging.warning('instance %s is in state %s, will try again', InstanceId, instanceState)
            message.change_visibility(VisibilityTimeout=60 * 2)
    else:
        logging.warning('nothing to do for event %r', data)
        message.delete()
class PidFileSingleton:
    '''Ensure that only one instance of this specific script runs at once.

    Creating an instance exclusively creates a pid file in the temp directory,
    named after the SHA-256 of this script's absolute path, and writes the
    current process id into it. If the file already exists, another instance
    is taken to be running and this process exits.

    NOTE(review): exiting with status 0 in that case, and removing the file
    from __del__, are assumptions -- confirm.
    '''

    def __init__(self):
        # Set before anything can fail so __del__ never removes a pid file
        # that belongs to another process.
        self._owned = False

        script = os.path.abspath(sys.argv[0])
        # sha256() needs bytes; the path is text on Python 3.
        if not isinstance(script, bytes):
            script = script.encode('utf-8')
        self.pidfile = os.path.join(gettempdir(), sha256(script).hexdigest() + '.pid')

        try:
            # O_EXCL makes creation fail if the file is already there.
            fd = os.open(self.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        except OSError as e:
            if e.errno == errno.EEXIST:
                logging.debug('An instance of this is already running.')
                sys.exit(0)
            raise

        self._owned = True
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

    def __del__(self):
        # Remove the pid file only if this instance created it.
        if getattr(self, '_owned', False):
            self._owned = False
            try:
                os.unlink(self.pidfile)
            except OSError:
                # Already removed; nothing useful can be done in a destructor.
                pass
# Command line: optional AWS profile/region, then the playbook directory, the
# SQS queue name and, optionally, the ARN of an SNS topic for notifications.
parser = argparse.ArgumentParser(description='act on SQS Notifications')
parser.add_argument('--profile', metavar='PROFILE', dest='profile_name',
                    help='AWS Profile (default: current IAM Role)')
parser.add_argument('--region', metavar='REGION', dest='region_name',
                    help='AWS Region')
parser.add_argument('playbooks', metavar='directory',
                    help='path containing playbooks et al')
parser.add_argument('queue', help='SQS Queue')
parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
args = parser.parse_args()

# Keep a reference for the life of the script so the single-instance guard
# stays in place while messages are processed.
pidfile = PidFileSingleton()
# occasionally, small instances seem to briefly lose their iam credentials
import time  # local to this script section; used only for the retry pause

sessionTriesRemaining = 3
while sessionTriesRemaining:
    try:
        # Only the profile/region options are forwarded to the session.
        sessionOptions = {k: v for k, v in vars(args).items()
                          if k in ('profile_name', 'region_name')}
        session = boto3.session.Session(**sessionOptions)
        queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
        topic = session.resource('sns').Topic(args.arn) if args.arn else None
        ec2r = session.resource('ec2')
    except botocore.exceptions.NoCredentialsError:
        # Decrement first so the log reports the attempts that really remain.
        sessionTriesRemaining -= 1
        logging.debug('Trouble with credentials, will retry %s more times.', sessionTriesRemaining)
        if sessionTriesRemaining:
            # NOTE(review): a short pause gives the credentials a chance to
            # come back; the one-second length is an assumption -- tune it.
            time.sleep(1)
    else:
        break

if sessionTriesRemaining == 0:
    logging.error('Failed trying to use IAM credentials.')
    # NOTE(review): nothing below can work without a session; the exit status
    # of 1 is an assumption -- confirm.
    sys.exit(1)
# long poll until there are no more messages
while True:
    # Ask for up to 10 messages, waiting up to 20 seconds for any to arrive.
    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
    if not messages:
        # An empty batch after the long poll means the queue is drained.
        break
    for message in messages:
        processMessage(message)