Source code for shepherd.config

"""
shepherd.manager
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This modules contains the Config object itself, which is the
primary outward facing object for the package (kind of like app in Flask).

Notes:
    * Currently the builtin paths and plugins are hardcoded, we may want to
    inspect the plugins module and dirnames to create those variables.
    * See the common/config.schema for the schema of the settings dict.
    * the default settings dict is::

        {
            'manifest_path': '',
            'verbosity': 2,
            'retries': 120,
            'delay': 5,
            'vars': {},
            'storage': {
                'name': 'DynamoStorage',
                'settings': {}
            },
        }

"""
from __future__ import print_function

import os
import sys
import inspect
import fnmatch
import logging
import anyconfig

from os.path import dirname, join, abspath
from importlib import import_module
from yapsy.PluginManager import PluginManager
from yapsy.PluginFileLocator import PluginFileLocator
from yapsy.PluginFileLocator import IPluginFileAnalyzer
from yapsy.PluginFileLocator import PluginFileAnalyzerWithInfoFile
from attrdict import AttrDict

# from shepherd.common.exceptions import PluginError
from shepherd.common.plugins import Resource
from shepherd.common.plugins import Action
from shepherd.common.plugins import Storage
from shepherd.common.plugins import Parser
from shepherd.common.plugins import is_plugin
from shepherd.common.utils import validate_config, configure_logging

if sys.version > '3':
    from configparser import ConfigParser
else:
    from ConfigParser import ConfigParser

_PACKAGE_PATH = dirname(abspath(__file__))
_BUILTIN_PATHS = [
    join(_PACKAGE_PATH, 'resources'),
    join(_PACKAGE_PATH, 'actions'),
    join(_PACKAGE_PATH, 'storage'),
]
_DEFAULT_SETTINGS = AttrDict({
    'manifest_path': '',
    'verbosity': 2,
    'retries': 120,
    'delay': 5,
    'vars': {},
    'storage': {
        'name': 'DynamoStorage',
        'settings': {}
    },
})

logger = logging.getLogger(__name__)


[docs]class Config(object): """ The :class:`Config <Config>` is responsible for managing the plugins and executing given tasks. """ _configs = [] logging_verbosity = 0 def __init__(self, settings, name): """Summary Args: settings (dict): the dictionary of settings name (str): the config name """ assert settings is not None assert name is not None self._name = name self._settings = settings self._inspect_analyzer = None self._default_analyzer = None self._categories = None self._paths = _BUILTIN_PATHS self._plugins = None self._stacks = [] validate_config(settings) if Config.logging_verbosity < settings['verbosity']: configure_logging(settings['verbosity']) Config.logging_verbosity = settings['verbosity'] logger.info( 'Increased logging verbosity from %s to %s with the new config...', Config.logging_verbosity, settings['verbosity'] ) self._configure_plugins() def _configure_plugins(self): """ Handles initialization of the :class:`Config <Config>`. This method shouldn't be called outside of this class. """ logger.debug('Configuring Config') # Setup the locators # Inspection analyzer, mostly for builtin plugins # (resources, tasks, etc) self._inspect_analyzer = PluginFileAnalyzerInspection( 'inspector', _BUILTIN_PATHS ) # The default analyzer for any extension paths that we don't trust. self._default_analyzer = PluginFileAnalyzerWithInfoFile( 'default', extensions='plugin' ) # The order of the analyzers could matter. self._locator = PluginFileLocator( analyzers=[ self._inspect_analyzer, self._default_analyzer, ] ) # Create the categories filter dict self._categories = { "Resource": Resource, "Action": Action, "Storage": Storage, "Parser": Parser, } # Setup the search paths if self._settings and "extension_paths" in self._settings: self._paths.extend(self._settings["extension_paths"]) # Actually create the PluginManager self._plugins = PluginManager( categories_filter=self._categories, directories_list=self._paths, plugin_locator=self._locator ) # Collect the plugins self._plugins.collectPlugins() @property def name(self): return self._name @property def settings(self): return self._settings @classmethod
[docs] def make(cls, settings=None, name=""): """ When first setting up the Config you should call this class method. Args: settings (dict, optional): desire settings values overriding the defaults. name (str, optional): the name of the config Returns: the created config obj """ logger.debug('Creating Config named "%s"', name) config_settings = _DEFAULT_SETTINGS if settings: config_settings.update(settings) assert config_settings is not None new_config = Config(config_settings, name) for index, config in enumerate(cls._configs): if config.name == name: logger.warn('Recreating Config named %s', name) cls._configs[index] = new_config break else: cls._configs.append(new_config) return new_config
@classmethod
[docs] def make_from_file(cls, filename, name=""): """ Loads the settings dict from a file and passes it to Config.make. Args: filename (str): name of the file to load name (str, optional): the name of the config Returns: Config: the created config obj """ settings = anyconfig.load(filename, safe=True) return cls.make(settings=settings, name=name)
@classmethod
[docs] def get(cls, name=""): """ Use this to access your desired Config. Args: name (str, optional): the unique name of the config you want returned. Returns: the config obj Raises: KeyError: if a config by that name does't exist. """ logger.debug('Retrieving Config named "%s"', name) for config in cls._configs: if config.name == name: return config else: raise KeyError('No config with the name {} exists'.format(name))
[docs] def get_plugins(self, category_name=None, plugin_name=None): """ get_plugins returns a deepcopy of all the plugins fitting the search criteria. While this isn't very memory efficient our plugins should be small and few between enough that it'll be worth getting independent copies of them. For example we will likely want to work with multiple copies of the Same Resource plugin. Args: category_name (str, optional): a category to search for plugins in. plugin_name (str, optional): the name of the plugin to look for. Returns: list: of the plugins that match the criteria. """ results = [] if category_name and plugin_name: plugin_info = self._plugins.getPluginByName( plugin_name, category=category_name ) if plugin_info: results.append(plugin_info.plugin_object.__class__()) elif category_name and not plugin_name: plugin_infos = self._plugins.getPluginsOfCategory(category_name) for plugin_info in plugin_infos: results.append(plugin_info.plugin_object.__class__()) elif plugin_name and not category_name: for category in self._plugins.getCategories(): plugin_info = self._plugins.getPluginByName( plugin_name, category=category ) if plugin_info: results.append(plugin_info.plugin_object.__class__()) elif not category_name and not plugin_name: plugin_infos = self._plugins.getAllPlugins() for plugin_info in plugin_infos: results.append(plugin_info.plugin_object.__class__()) return results
[docs]class PluginFileAnalyzerInspection(IPluginFileAnalyzer): """ This PluginFileAnalyzer determines the plugins via inspection. If the module contains a class that subclasses """ def __init__(self, name, paths): """Summary Args: name (str): name of the Analyzer [requirement of yapsy] paths (list): the paths search through for loadable plugins """ IPluginFileAnalyzer.__init__(self, name) self.module_paths = {} self.getModulePaths(paths)
[docs] def isValidPlugin(self, filename): """ Checks if the given filename is a valid plugin for this Strategy Args: filename (str): name of the file to inspect. """ result = False if self.getPluginClass(filename): result = True return result
[docs] def getInfosDictFromPlugin(self, dirpath, filename): """ Returns the extracted plugin informations as a dictionary. This function ensures that "name" and "path" are provided. Args: dirpath (str): the directory of the file filename (str): the filename """ infos = {} infos["name"] = self.getPluginClass(filename) infos["path"] = os.path.join(dirpath, filename) logger.debug('Name: %s, Path: %s', infos['name'], infos['path']) cf_parser = ConfigParser() cf_parser.add_section("Core") cf_parser.set("Core", "Name", infos["name"]) cf_parser.set("Core", "Module", infos["path"]) return infos, cf_parser
[docs] def getPluginClass(self, filename): """ Extracts the plugin class from the given file. Args: filename (str): name of file to inspect Returns: str: name of the plugin class """ plugin_class = None if filename in self.module_paths: sys.path.insert(0, self.module_paths[filename]) mod_name = os.path.splitext(filename)[0] mod = import_module(mod_name) for name, obj in inspect.getmembers(mod, inspect.isclass): if is_plugin(obj): plugin_class = name break sys.path.remove(self.module_paths[filename]) return plugin_class
[docs] def getModulePaths(self, paths): """ Extracts the module_paths from the provided lis of paths. Args: paths (list): list of paths to walk """ for path in paths: for root, _, filenames in os.walk(path): for filename in fnmatch.filter(filenames, '*.py'): self.module_paths[filename] = os.path.abspath(root)