Source code for prestoadmin.catalog

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module for presto catalog configurations
"""
import errno
import logging

import fabric.utils
from fabric.api import task, env
from fabric.context_managers import hide
from fabric.contrib import files
from fabric.operations import sudo, os, get

from prestoadmin.deploy import secure_create_directory
from prestoadmin.standalone.config import StandaloneConfig, \
    PRESTO_STANDALONE_USER_GROUP
from prestoadmin.util import constants
from prestoadmin.util.base_config import requires_config
from prestoadmin.util.exception import ConfigFileNotFoundError, \
    ConfigurationError
from prestoadmin.util.fabricapi import put_secure
from prestoadmin.util.filesystem import ensure_directory_exists
from prestoadmin.util.local_config_util import get_catalog_directory

_LOGGER = logging.getLogger(__name__)

__all__ = ['add', 'remove']
COULD_NOT_REMOVE = 'Could not remove catalog'


# we deploy catalog files with 0600 permissions because they can contain passwords
# that should not be world readable
def deploy_files(filenames, local_dir, remote_dir, user_group, mode=0600):
    _LOGGER.info('Deploying configurations for ' + str(filenames))
    secure_create_directory(remote_dir, PRESTO_STANDALONE_USER_GROUP)
    for name in filenames:
        put_secure(user_group, mode, os.path.join(local_dir, name), remote_dir,
                   use_sudo=True)


def gather_catalogs(local_config_dir, allow_overwrite=False):
    local_catalog_dir = os.path.join(local_config_dir, env.host, 'catalog')
    if not allow_overwrite and os.path.exists(local_catalog_dir):
        fabric.utils.error("Refusing to overwrite %s. Use 'overwrite' "
                           "option to overwrite." % local_catalog_dir)
    ensure_directory_exists(local_catalog_dir)
    if files.exists(constants.REMOTE_CATALOG_DIR):
        return get(constants.REMOTE_CATALOG_DIR, local_catalog_dir, use_sudo=True)
    else:
        return []


def validate(filenames):
    for name in filenames:
        file_path = os.path.join(get_catalog_directory(), name)
        _LOGGER.info('Validating catalog configuration: ' + str(name))
        try:
            with open(file_path) as f:
                file_content = f.read()
            if 'connector.name' not in file_content:
                message = ('Catalog configuration %s does not contain '
                           'connector.name' % name)
                raise ConfigurationError(message)

        except IOError, e:
            fabric.utils.error(message='Error validating ' + file_path,
                               exception=e)
            return False

    return True


@task
@requires_config(StandaloneConfig)
[docs]def add(name=None): """ Deploy configuration for a catalog onto a cluster. E.g.: 'presto-admin catalog add tpch' deploys a configuration file for the tpch connector. The configuration is defined by tpch.properties in the local catalog directory, which defaults to ~/.prestoadmin/catalog. If no catalog name is specified, then configurations for all catalogs in the catalog directory will be deployed Parameters: name - Name of the catalog to be added """ catalog_dir = get_catalog_directory() if name: filename = name + '.properties' config_path = os.path.join(catalog_dir, filename) if not os.path.isfile(config_path): raise ConfigFileNotFoundError( config_path=config_path, message='Configuration for catalog ' + name + ' not found') filenames = [filename] elif not os.path.isdir(catalog_dir): message = ('Cannot add catalogs because directory %s does not exist' % catalog_dir) raise ConfigFileNotFoundError(config_path=catalog_dir, message=message) else: try: filenames = os.listdir(catalog_dir) except OSError as e: fabric.utils.error(e.strerror) return if not filenames: fabric.utils.warn( 'Directory %s is empty. No catalogs will be deployed' % catalog_dir) return if not validate(filenames): return filenames.sort() _LOGGER.info('Adding catalog configurations: ' + str(filenames)) print('Deploying %s catalog configurations on: %s ' % (', '.join(filenames), env.host)) deploy_files(filenames, catalog_dir, constants.REMOTE_CATALOG_DIR, PRESTO_STANDALONE_USER_GROUP)
@task @requires_config(StandaloneConfig)
[docs]def remove(name): """ Remove a catalog from the cluster. Parameters: name - Name of the catalog to be removed """ _LOGGER.info('[' + env.host + '] Removing catalog: ' + name) ret = remove_file(os.path.join(constants.REMOTE_CATALOG_DIR, name + '.properties')) if ret.succeeded: if COULD_NOT_REMOVE in ret: fabric.utils.error(ret) else: print('[%s] Catalog removed. Restart the server for the change ' 'to take effect' % env.host) else: fabric.utils.error('Failed to remove catalog ' + name + '.\n\t' + ret) local_path = os.path.join(get_catalog_directory(), name + '.properties') try: os.remove(local_path) except OSError as e: if e.errno == errno.ENOENT: pass else: raise
def remove_file(path): script = ('if [ -f %(path)s ] ; ' 'then rm %(path)s ; ' 'else echo "%(could_not_remove)s \'%(name)s\'. ' 'No such file \'%(path)s\'"; fi') with hide('stderr', 'stdout'): return sudo(script % {'path': path, 'name': os.path.splitext(os.path.basename(path))[0], 'could_not_remove': COULD_NOT_REMOVE})