3 Check an SQS queue for ASG lifecycle notifications of new instances,
4 and run the appropriate Ansible playbook against the host.
14 from subprocess
import Popen
, PIPE
15 from tempfile
import gettempdir
16 from hashlib
import sha256
# Absolute path of the ansible-playbook executable launched for each new instance.
ANSIBLE_PLAYBOOK_CMD = '/usr/local/bin/ansible-playbook'
def notify(subj, msg):
    '''Send a notification to the SNS topic, or print it when none is configured.

    subj -- subject line of the notification
    msg  -- notification body, as text or UTF-8 encoded bytes

    Reads the module-level ``topic``, which is None when no topic ARN was
    given on the command line.
    '''
    # Normalise the body to text; undecodable bytes are replaced, not fatal.
    if isinstance(msg, bytes):
        text = msg.decode('utf-8', 'replace')
    else:
        text = u'{}'.format(msg)

    if not topic:
        # No SNS topic to publish to: fall back to standard output.
        print(text)
        return

    # The payload is capped at 262144 bytes of UTF-8.  Truncate on the encoded
    # form, then drop any multi-byte character the cut split in half so the
    # published message is still valid text.
    max_bytes = 262144
    payload = text.encode('utf-8')[:max_bytes].decode('utf-8', 'ignore')

    # SNS requires subjects to be shorter than 100 characters.
    topic.publish(Subject=subj[:99], Message=payload)
def handleEvent(message, event, ASGName, InstanceId):
    '''Run the ASG's playbook against one instance and report the outcome.

    message    -- the SQS message being handled; its visibility timeout is
                  adjusted around the run and it is deleted on success
    event      -- the event name, used only in the notification subject
    ASGName    -- Auto Scaling group name; the playbook is ``<ASGName>.yml``
                  inside the ``args.playbooks`` directory
    InstanceId -- EC2 instance id passed to ansible-playbook via ``--limit``

    A notification is always sent through notify(): the subject is
    "<ASGName> <InstanceId> <event>" and the body lists the outcome.
    '''
    notice = [' '.join([ASGName, InstanceId, event])]
    playbook = ASGName + '.yml'

    if os.path.isfile(os.path.join(args.playbooks, playbook)):
        # Keep the message invisible to other consumers while the playbook runs.
        message.change_visibility(VisibilityTimeout=(60 * 15))  # hope config doesn't take more than 15m
        cmd = [ANSIBLE_PLAYBOOK_CMD, '-i', 'inventory', '--limit', InstanceId, playbook]
        p = Popen(cmd, cwd=args.playbooks, stdout=PIPE, stderr=PIPE)
        (stdoutdata, stderrdata) = p.communicate()
        retval = p.returncode

        # Shorten the timeout again so a failed run is retried after a minute.
        message.change_visibility(VisibilityTimeout=60)

        if retval:
            # The pipes yield bytes; decode so the notice can be joined as text.
            notice += [
                'FAILURE CODE {}'.format(retval),
                stderrdata.decode('utf-8', 'replace'),
                stdoutdata.decode('utf-8', 'replace'),
            ]
        else:
            notice += ['SUCCESS']
            # Work is done: remove the message so it is not delivered again.
            message.delete()
    else:
        # NOTE(review): the message is left on the queue in this case, so it
        # will be redelivered once its visibility timeout expires - confirm
        # that is intended.
        notice += ['no action taken: no playbook for this ASG']

    notify(notice[0], '\n'.join(notice))
def processMessage(message):
    '''Unpack the data we want from an SQS message and act on it.

    The message body is expected to be a JSON document whose 'Message'
    field is itself a JSON document carrying 'Event',
    'AutoScalingGroupName' and 'EC2InstanceId'.

    - Unparsable messages and events other than an instance launch are
      deleted from the queue.
    - For a launch event, handleEvent() is called once the instance reports
      the 'running' state; until then the message is hidden for two minutes
      so that it is retried later.
    '''
    try:
        data = json.loads(json.loads(message.body)['Message'])
        event = data['Event']
        ASGName = data['AutoScalingGroupName']
        InstanceId = data['EC2InstanceId']
    except (ValueError, KeyError, TypeError):
        # Not JSON, or not the expected structure: it can never be processed.
        logging.debug('unparsable message %r', message.body)
        message.delete()
        return

    if event != 'autoscaling:EC2_INSTANCE_LAUNCH':
        logging.debug('nothing to do for event %r', data)
        message.delete()
        return

    try:
        instanceState = ec2r.Instance(InstanceId).state['Name']
    except Exception:
        # NOTE(review): any lookup failure is reported as "does not exist";
        # the message is retried either way.
        logging.debug('instance %s does not exist', InstanceId)
        message.change_visibility(VisibilityTimeout=60 * 2)
        return

    if instanceState == 'running':
        handleEvent(message, event, ASGName, InstanceId)
    else:
        logging.debug('instance %s is in state %s, will try again', InstanceId, instanceState)
        message.change_visibility(VisibilityTimeout=60 * 2)
class PidFileSingleton:
    '''Ensure that only one instance of this specific script runs at once.

    Creating an instance exclusively creates a pid file in the temporary
    directory, named after the SHA-256 of the script's absolute path, and
    writes the current process id into it.  If the file already exists the
    process exits with status 1.  The file is removed when the instance
    that created it is destroyed.
    '''

    def __init__(self):
        # Set before anything can fail, so __del__ never removes a pid file
        # that belongs to another process.
        self._owned = False

        script = os.path.abspath(sys.argv[0])
        if not isinstance(script, bytes):
            # hashlib only accepts bytes
            script = script.encode('utf-8')
        self.pidfile = os.path.join(gettempdir(), sha256(script).hexdigest() + '.pid')

        try:
            # O_EXCL makes creation fail if the file is already there.
            fd = os.open(self.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        except OSError as e:
            if e.errno == errno.EEXIST:
                logging.debug('An instance of this is already running.')
                sys.exit(1)
            raise

        self._owned = True
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

    def __del__(self):
        # Only the instance that created the pid file may remove it.
        if not getattr(self, '_owned', False):
            return
        self._owned = False
        try:
            os.unlink(self.pidfile)
        except OSError as e:
            # Already gone is fine; anything else is worth surfacing.
            if e.errno != errno.ENOENT:
                raise
# Command line: optional AWS profile/region, the playbook directory, the SQS
# queue name, and an optional SNS topic ARN for notifications.
parser = argparse.ArgumentParser(description='act on SQS Notifications')
parser.add_argument(
    '--profile',
    metavar='PROFILE',
    dest='profile_name',
    help='AWS Profile (default: current IAM Role)',
)
parser.add_argument(
    '--region',
    metavar='REGION',
    dest='region_name',
    help='AWS Region',
)
parser.add_argument(
    'playbooks',
    metavar='directory',
    help='path containing playbooks et al',
)
parser.add_argument('queue', help='SQS Queue')
parser.add_argument('arn', nargs='?', default=None, help='ARN of SNS topic')
args = parser.parse_args()

# Held in a module-level name so the pid file lives as long as the script.
pidfile = PidFileSingleton()

# Both options default to None when not given on the command line.
session = boto3.session.Session(
    profile_name=args.profile_name,
    region_name=args.region_name,
)
queue = session.resource('sqs').get_queue_by_name(QueueName=args.queue)
# Notifications go to SNS only when a topic ARN was supplied.
if args.arn:
    topic = session.resource('sns').Topic(args.arn)
else:
    topic = None
ec2r = session.resource('ec2')
# long poll until there are no more messages
while True:
    # Fetch up to 10 messages, waiting up to 20 seconds for any to arrive.
    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
    if not messages:
        # An empty long poll means the queue is drained: stop.
        break
    for message in messages:
        processMessage(message)