Source code for shepherd.resources.aws.instance

from __future__ import print_function

import boto

from boto.ec2.blockdevicemapping import BlockDeviceMapping
from boto.ec2.blockdevicemapping import BlockDeviceType

from arbiter import create_task
from arbiter.sync import run_tasks

from shepherd.common.plugins import Resource
from shepherd.common.utils import tasks_passed
from shepherd.resources.aws import get_security_group

SPOT_REQUEST_ACTIVE = 'active'
SPOT_REQUEST_FULFILLED = 'fulfilled'
INST_RUNNING_STATE = 'running'
INST_REACHABLE_STATE = 'passed'


def get_block_device_mapping():
    """Build the block device mapping that exposes the instance-store volumes.

    Maps the devices ``/dev/sdb`` through ``/dev/sde`` to the ephemeral stores
    ``ephemeral0`` through ``ephemeral3`` respectively, one distinct store per
    device.

    Returns:
        A ``BlockDeviceMapping`` with four entries, inserted in device order.
    """
    devices = ('/dev/sdb', '/dev/sdc', '/dev/sdd', '/dev/sde')

    mapping = BlockDeviceMapping()
    for index, device in enumerate(devices):
        # Each device needs its own BlockDeviceType; reusing one object for
        # several devices would expose the same ephemeral store repeatedly
        # and leave the remaining stores unmapped.
        ephemeral = BlockDeviceType()
        ephemeral.ephemeral_name = 'ephemeral{}'.format(index)
        mapping[device] = ephemeral
    return mapping
class Instance(Resource):
    """AWS EC2 instance resource, requested either on demand or as a spot instance.

    A spot instance is requested when ``spot_price`` is set; otherwise an
    on-demand instance is started. Provisioning and teardown are expressed as
    dependent tasks built with ``create_task`` and executed by ``run_tasks``.
    """

    def __init__(self):
        super(Instance, self).__init__('aws')
        # Settings exposed through the attribute map below.
        self._availability_zone = None
        self._image_id = None
        self._instance_type = None
        self._security_groups = None
        self._key_name = None
        self._spot_price = None
        self._volumes = []
        self._user_data = None

        # State filled in while the instance is provisioned.
        self._instance_id = None
        self._security_group_ids = []
        self._spot_instance_request = None
        self._ip = None
        self._reservation = None
        self._block_device_map = get_block_device_mapping()
        # Starts out True; flipped to False once the instance is seen running.
        self._terminated = True

        # Public attribute name -> private field name.
        # NOTE(review): presumably consumed by Resource for (de)serialization
        # -- confirm against shepherd.common.plugins.Resource.
        self._attributes_map.update({
            'availability_zone': '_availability_zone',
            'image_id': '_image_id',
            'instance_type': '_instance_type',
            'security_groups': '_security_groups',
            'key_name': '_key_name',
            'spot_price': '_spot_price',
            'volumes': '_volumes',
            'user_data': '_user_data',
            'instance_id': '_instance_id',
            'spot_instance_request': '_spot_instance_request',
            'terminated': '_terminated',
        })

    def get_dependencies(self):
        """Return the stack resources this instance depends on.

        Looks up every configured volume (by its ``'VolumeId'`` entry) and
        every configured security group name in the stack; names that do not
        resolve to a stack resource are skipped.

        Returns:
            list: the resolved volume resources followed by the resolved
            security group resources.
        """
        deps = []

        for volume_dict in self._volumes:
            # Use the same 'VolumeId' key that attach_volumes() reads.
            volume = self.stack.get_resource_by_name(volume_dict['VolumeId'])
            if volume:
                deps.append(volume)

        # _security_groups defaults to None, so guard the iteration.
        for group_name in self._security_groups or ():
            security_group = self.stack.get_resource_by_name(group_name)
            if security_group:
                deps.append(security_group)

        return deps

    @property
    def ip(self):
        """IP address recorded when the instance was last seen running."""
        return self._ip

    @Resource.validate_create()
    def create(self):
        """
        Handles creating spot or on demand instances.

        Info: task order is:
            1. get_security_group_ids
            2. type_specific_tasks: request_demand or request_spot and
               check_spot (last task should be labelled 'get_instance_id')
            3. check_running
            4. create_tags
            5. attach_volumes
            6. check_initialized
            7. ssh_accessible (no such task is registered below yet)

        The outcome of the task run is stored in ``self._available``.
        """
        retries = self.stack.settings['retries']
        delay = self.stack.settings['delay']

        common_tasks = (
            create_task('get_security_group_ids', self._get_security_group_ids),
            create_task(
                'check_running',
                self._check_running,
                ('get_instance_id',),
                retries=retries,
                delay=delay
            ),
            create_task('create_tags', self._create_tags, ('check_running',)),
            create_task('attach_volumes', self.attach_volumes, ('check_running',)),
            create_task(
                'check_initialized',
                self._check_reachable,
                ('check_running',),
                retries=retries,
                delay=delay
            ),
            # Test ssh accessibility
        )

        # Whichever branch is taken must end with a task named
        # 'get_instance_id', because 'check_running' depends on it.
        if self._spot_price:
            type_specific_tasks = (
                create_task('request_spot', self._request_spot, ('get_security_group_ids',)),
                create_task(
                    'get_instance_id',
                    self._check_spot,
                    ('request_spot',),
                    retries=retries,
                    delay=delay
                ),
            )
        else:
            type_specific_tasks = (
                create_task(
                    'get_instance_id',
                    self._request_demand,
                    ('get_security_group_ids',)
                ),
            )

        tasks = common_tasks + type_specific_tasks
        results = run_tasks(tasks)

        self._available = tasks_passed(
            results,
            self._logger,
            msg='Failed to provision instance {}'.format(self._local_name)
        )

    @Resource.validate_destroy()
    def destroy(self):
        """Cancel any outstanding spot request and terminate the instance.

        Returns:
            The result of ``tasks_passed`` for the terminate/check task run.
        """
        conn = boto.connect_ec2()

        if self._spot_instance_request:
            # The EC2 call takes a list of request ids, not a request object.
            # NOTE(review): the getattr fallback tolerates a bare id in case
            # the attribute was restored from stored state -- confirm.
            request = self._spot_instance_request
            conn.cancel_spot_instance_requests([getattr(request, 'id', request)])
            self._spot_instance_request = None

        tasks = (
            create_task('terminate', self._terminate_instance),
            create_task(
                'check',
                self._check_terminated,
                ('terminate',),
                retries=self.stack.settings['retries'],
                delay=self.stack.settings['delay']
            ),
        )
        results = run_tasks(tasks)

        return tasks_passed(
            results,
            self._logger,
            msg='Failed to de provision instance {}'.format(self._local_name)
        )

    # Might want to organize this and create tags better
    def attach_volumes(self):
        """Attach every configured volume to the instance.

        Each entry of ``self._volumes`` supplies the stack name of the volume
        under ``'VolumeId'`` and the device to attach it as under ``'Device'``.
        Nothing is attached when no instance id is known.

        Returns:
            bool: always True.
        """
        if self._instance_id:
            conn = boto.connect_ec2()
            for volume_dict in self._volumes:
                volume_id = self._get_volume_id(volume_dict['VolumeId'])
                mountpoint = volume_dict['Device']

                self._logger.debug(
                    'Attaching volume %s to %s an %s',
                    volume_id, self._instance_id, mountpoint
                )
                conn.attach_volume(
                    volume_id=volume_id,
                    instance_id=self._instance_id,
                    device=mountpoint
                )

        # NOTE(review): returned unconditionally, mirroring
        # _get_security_group_ids(); confirm this matches the intended result
        # when no instance id is set.
        return True

    def _request_demand(self):
        """Start a single on-demand instance and record its id.

        Returns:
            bool: True once the instance id has been stored.

        Raises:
            AssertionError: if the reservation does not hold exactly one
                instance.
        """
        self._logger.debug('Requesting demand instance %s', self._local_name)
        conn = boto.connect_ec2()

        reservation = conn.run_instances(
            image_id=self._image_id,
            instance_type=self._instance_type,
            key_name=self._key_name,
            user_data=self._user_data,
            security_group_ids=self._security_group_ids,
            placement=self._availability_zone,
            block_device_map=self._block_device_map
        )

        assert len(reservation.instances) == 1
        self._instance_id = reservation.instances[0].id
        return True

    def _request_spot(self):
        """Submit a one-time spot request and keep the first returned request.

        Returns:
            bool: True once the request has been stored.
        """
        self._logger.debug('Requesting spot instance %s', self._local_name)
        conn = boto.connect_ec2()

        self._spot_instance_request = conn.request_spot_instances(
            image_id=self._image_id,
            price=self._spot_price,
            type='one-time',
            instance_type=self._instance_type,
            key_name=self._key_name,
            user_data=self._user_data,
            security_group_ids=self._security_group_ids,
            placement=self._availability_zone,
            block_device_map=self._block_device_map
        )[0]
        return True

    def _terminate_instance(self):
        """Ask EC2 to terminate the instance unless it is already marked terminated.

        Returns:
            bool: the value of ``self._terminated`` after the call.
        """
        conn = boto.connect_ec2()

        if self._instance_id:
            if not self._terminated:
                self._logger.debug('Terminating instance %s', self._local_name)
                conn.terminate_instances(instance_ids=[self._instance_id])
                self._terminated = True

        return self._terminated

    def _check_running(self):
        """Check whether the instance has reached the running state.

        When it has, the instance is marked as not terminated and its IP
        address is recorded.

        Returns:
            bool: True if the instance state is ``INST_RUNNING_STATE``.

        Raises:
            AssertionError: if no instance id is set or EC2 does not return
                exactly one instance.
        """
        self._logger.debug('Checking if instance %s is running', self._local_name)
        resp = False
        assert self._instance_id

        conn = boto.connect_ec2()
        instances = conn.get_only_instances(instance_ids=[self._instance_id])
        assert len(instances) == 1

        instance = instances[0]
        if instance.state == INST_RUNNING_STATE:
            self._terminated = False
            self._ip = instance.ip_address
            resp = True

        return resp

    def _check_reachable(self):
        """Check whether the instance's system reachability check has passed.

        Returns:
            bool: True if a status is reported and its reachability detail
            equals ``INST_REACHABLE_STATE``; False otherwise, including when
            no status is reported.

        Raises:
            AssertionError: if no instance id is set.
        """
        self._logger.debug('Checking if instance %s is reachable', self._local_name)
        resp = False
        assert self._instance_id

        conn = boto.connect_ec2()
        status = conn.get_all_instance_status(instance_ids=[self._instance_id])

        if len(status) > 0:
            reachability = status[0].system_status.details['reachability']
            if reachability == INST_REACHABLE_STATE:
                resp = True
            else:
                self._logger.debug('Reachability Status = %s', reachability)

        return resp

    def _check_terminated(self):
        """Check whether the instance has reached the terminated state.

        When it has, the resource is marked unavailable and the instance id is
        cleared.

        Returns:
            bool: ``not self._available`` after the check.
        """
        conn = boto.connect_ec2()

        if self._instance_id:
            reservation = conn.get_all_instances(
                instance_ids=[self._instance_id]
            )[0]
            instance = reservation.instances[0]

            if instance.state == 'terminated':
                self._available = False
                self._instance_id = None

        return not self._available

    def _check_spot(self):
        """Check whether the spot request is fulfilled and record the instance id.

        Returns:
            bool: True if the request is active, its status code is fulfilled
            and it carries an instance id.

        Raises:
            AssertionError: if EC2 does not return exactly one request.
        """
        self._logger.debug(
            'Checking if spot request %s is fulfilled',
            self._spot_instance_request.id
        )
        resp = False

        conn = boto.connect_ec2()
        requests = conn.get_all_spot_instance_requests(
            request_ids=[self._spot_instance_request.id]
        )
        assert len(requests) == 1

        request = requests[0]
        if (request.state == SPOT_REQUEST_ACTIVE and
                request.status.code == SPOT_REQUEST_FULFILLED and
                request.instance_id):
            self._instance_id = request.instance_id
            resp = True

        return resp

    def _create_tags(self):
        """Merge the stack's tags into this resource's tags and apply them.

        Returns:
            bool: always True.
        """
        conn = boto.connect_ec2()
        self._logger.debug('Creating tags for instance %s', self._local_name)

        self._tags.update(self.stack.tags)
        conn.create_tags([self._instance_id], self._tags)
        return True

    def _get_security_group_ids(self):
        """Resolve the configured security group names once and cache the results.

        The lookup is skipped when ``self._security_group_ids`` is already
        populated.

        Returns:
            bool: always True.
        """
        if not self._security_group_ids:
            # _security_groups defaults to None, so guard the iteration.
            for sg in self._security_groups or ():
                self._security_group_ids.append(
                    get_security_group(group_name=sg, stack=self.stack)
                )

        return True

    def _get_volume_id(self, volume_name):
        """Return the ``volume_id`` of the named stack resource.

        Args:
            volume_name: name of the volume resource in the stack.

        Returns:
            The resource's ``volume_id``, or None if the stack has no such
            resource.
        """
        volume_id = None

        vol = self.stack.get_resource_by_name(volume_name)
        if vol:
            volume_id = vol.volume_id

        return volume_id