3 Check an SQS queue for ASG lifecycle notifications of new instances,
4 and run the appropriate Ansible playbook against the host.
14 from subprocess
import Popen
, PIPE
15 from tempfile
import gettempdir
16 from hashlib
import sha256
19 ANSIBLE_PLAYBOOK_CMD
= '/usr/local/bin/ansible-playbook'
22 def notify(subj
, msg
):
24 u8msg
= unicode(msg
).encode('utf-8')
25 topic
.publish(Subject
=subj
, Message
=u8msg
[:262144])
30 def handleEvent(message
, event
, ASGName
, InstanceId
):
31 notice
= [' '.join([ASGName
, InstanceId
, event
])]
32 if os
.path
.isfile(os
.path
.join(args
.playbooks
, ASGName
+ '.yml')):
33 message
.change_visibility(VisibilityTimeout
=(60 * 15)) # hope config doesn't take more than 15m
34 cmd
= [ ANSIBLE_PLAYBOOK_CMD
, '-i', 'inventory', '--limit', InstanceId
, ASGName
+ '.yml']
35 p
= Popen(cmd
, cwd
=args
.playbooks
, stdout
=PIPE
, stderr
=PIPE
)
36 (stdoutdata
, stderrdata
) = p
.communicate()
38 message
.change_visibility(VisibilityTimeout
=60)
40 notice
+= ['FAILURE CODE {}'.format(retval
), stderrdata
, stdoutdata
]
45 notice
+= ['no action taken: no playbook for this ASG']
47 notify(notice
[0], '\n'.join(notice
))
50 def processMessage(message
):
51 '''Unpack the data we want from an SQS message.'''
53 data
= json
.loads(json
.loads(message
.body
)['Message'])
55 ASGName
= data
['AutoScalingGroupName']
56 InstanceId
= data
['EC2InstanceId']
58 logging
.warning('unparsable message %r', message
.body
)
61 if event
== 'autoscaling:EC2_INSTANCE_LAUNCH':
63 instanceState
= ec2r
.Instance(InstanceId
).state
['Name']
65 logging
.warning('instance %s does not exist', InstanceId
)
66 message
.change_visibility(VisibilityTimeout
=60 * 2)
68 if instanceState
== 'running':
69 handleEvent(message
, event
, ASGName
, InstanceId
)
71 logging
.warning('instance %s is in state %s, will try again', InstanceId
, instanceState
)
72 message
.change_visibility(VisibilityTimeout
=60 * 2)
74 logging
.warning('nothing to do for event %r', data
)
78 class PidFileSingleton
:
79 '''Ensure that only one instance of this specific script runs at once.'''
81 self
.pidfile
= os
.path
.join(gettempdir(), sha256(os
.path
.abspath(sys
.argv
[0])).hexdigest() + '.pid')
83 fd
= os
.open(self
.pidfile
, os
.O_WRONLY|os
.O_CREAT|os
.O_EXCL
, )
86 if e
.errno
== errno
.EEXIST
:
87 logging
.debug('An instance of this is already running.')
90 with os
.fdopen(fd
, 'w') as f
:
91 f
.write(str(os
.getpid()))
94 os
.unlink(self
.pidfile
)
97 parser
= argparse
.ArgumentParser(description
='act on SQS Notifications')
98 parser
.add_argument('--profile', metavar
='PROFILE', dest
='profile_name', help='AWS Profile (default: current IAM Role)')
99 parser
.add_argument('--region', metavar
='REGION', dest
='region_name', help='AWS Region')
100 parser
.add_argument('playbooks', metavar
='directory', help='path containing playbooks et al')
101 parser
.add_argument('queue', help='SQS Queue')
102 parser
.add_argument('arn', nargs
='?', default
=None, help='ARN of SNS topic')
103 args
= parser
.parse_args()
105 pidfile
= PidFileSingleton()
107 session
= boto3
.session
.Session(**{k:v for k,v in vars(args).items() if k in ('profile_name', 'region_name')}
)
108 queue
= session
.resource('sqs').get_queue_by_name(QueueName
=args
.queue
)
109 topic
= session
.resource('sns').Topic(args
.arn
) if args
.arn
else None
110 ec2r
= session
.resource('ec2')
113 # long poll until there are no more messages
114 messages
= queue
.receive_messages(MaxNumberOfMessages
=10, WaitTimeSeconds
=20)
115 if not len(messages
):
117 for message
in messages
:
118 processMessage(message
)