Source code for prestoadmin.standalone.config

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module for setting and validating the presto-admin config
"""
import re

from fabric.api import env
from overrides import overrides

import prestoadmin.util.fabricapi as util
from prestoadmin import config
from prestoadmin.prestoclient import CERTIFICATE_ALIAS
from prestoadmin.util.base_config import BaseConfig, SingleConfigItem
from prestoadmin.util.exception import ConfigurationError
from prestoadmin.util.local_config_util import get_topology_path
from prestoadmin.util.validators import validate_username, validate_port, \
    validate_host

# Created by the presto-server RPM package.
PRESTO_STANDALONE_USER = 'presto'
PRESTO_STANDALONE_GROUP = 'presto'
PRESTO_STANDALONE_USER_GROUP = "%s:%s" % (PRESTO_STANDALONE_USER,
                                          PRESTO_STANDALONE_GROUP)

# CONFIG KEYS
USERNAME = 'username'
PORT = 'port'
COORDINATOR = 'coordinator'
WORKERS = 'workers'

STANDALONE_CONFIG_LOADED = 'standalone_config_loaded'

PRESTO_ADMIN_PROPERTIES = ['username', 'port', 'coordinator', 'workers',
                           'java8_home', CERTIFICATE_ALIAS]

DEFAULT_PROPERTIES = {USERNAME: 'root',
                      PORT: 22,
                      COORDINATOR: 'localhost',
                      WORKERS: ['localhost']}


[docs]def validate_coordinator(coordinator): validate_host(coordinator) return coordinator
[docs]def validate_workers_for_prompt(workers): return validate_workers(workers.split())
_TOPOLOGY_CONFIG = [ SingleConfigItem( USERNAME, 'Enter user name for SSH connection to all nodes:', default=DEFAULT_PROPERTIES[USERNAME], validate=validate_username), SingleConfigItem( PORT, 'Enter port number for SSH connections to all nodes:', default=DEFAULT_PROPERTIES['port'], validate=validate_port), SingleConfigItem( COORDINATOR, 'Enter host name or IP address for coordinator node. ' 'Enter an external host name or ip address if this is a multi-node ' 'cluster:', default=DEFAULT_PROPERTIES['coordinator'], validate=validate_coordinator), SingleConfigItem( WORKERS, 'Enter host names or IP addresses for worker nodes separated by spaces:', default=' '.join(DEFAULT_PROPERTIES['workers']), validate=validate_workers_for_prompt) ]
[docs]def validate_java8_home(java8_home): return java8_home
[docs]def validate(conf): for key in conf.keys(): if key not in PRESTO_ADMIN_PROPERTIES: raise ConfigurationError('Invalid property: ' + key) try: username = conf['username'] except KeyError: pass else: conf['username'] = validate_username(username) try: java8_home = conf['java8_home'] except KeyError: pass else: conf['java8_home'] = validate_java8_home(java8_home) try: coordinator = conf['coordinator'] except KeyError: pass else: conf['coordinator'] = validate_coordinator(coordinator) try: workers = conf['workers'] except KeyError: pass else: workers = [h for host in workers for h in _expand_host(host)] conf['workers'] = validate_workers(workers) try: port = conf['port'] except KeyError: pass else: conf['port'] = validate_port(port) return conf
[docs]def validate_workers(workers): if not isinstance(workers, list): raise ConfigurationError('Workers must be of type list. Found ' + str(type(workers)) + '.') if len(workers) < 1: raise ConfigurationError('Must specify at least one worker') for worker in workers: validate_host(worker) return workers
def _expand_host(host): match = re.match("(.*)\[(\d{1,})-(\d{1,})\](.*)", host) if match is not None and len(match.groups()) == 4: prefix, start, end, suffix = match.groups() if int(start) > int(end): raise ValueError("the range must be in ascending order") if len(start) == len(end) and len(start) > 1: number_format = "{0:0" + str(len(start)) + "d}" host_list = [number_format.format(i) for i in range(int(start), int(end) + 1)] return [_format_hostname(prefix, i, suffix) for i in host_list] else: return [_format_hostname(prefix, i, suffix) for i in range(int(start), int(end) + 1)] else: return [host] def _format_hostname(prefix, number, suffix): return "{prefix}{num}{suffix}".format(prefix=prefix, num=number, suffix=suffix)
[docs]class StandaloneConfig(BaseConfig): def __init__(self): super(StandaloneConfig, self).__init__(get_topology_path(), _TOPOLOGY_CONFIG) @overrides
[docs] def read_conf(self): conf = self._get_conf_from_file() config.fill_defaults(conf, DEFAULT_PROPERTIES) validate(conf) return conf
def _get_conf_from_file(self): return config.get_conf_from_json_file(self.config_path) @overrides
[docs] def is_config_loaded(self): return STANDALONE_CONFIG_LOADED in env and env[STANDALONE_CONFIG_LOADED]
@overrides
[docs] def set_config_loaded(self): env[STANDALONE_CONFIG_LOADED] = True
@overrides
[docs] def set_env_from_conf(self, conf): self.config = conf env.user = conf['username'] env.port = conf['port'] try: env.java8_home = conf['java8_home'] except KeyError: env.java8_home = None env.roledefs['coordinator'] = [conf['coordinator']] env.roledefs['worker'] = conf['workers'] env.roledefs['all'] = self._dedup_list(util.get_coordinator_role() + util.get_worker_role()) env.hosts = env.roledefs['all'][:] env.conf = conf
@staticmethod def _dedup_list(host_list): deduped_list = [] for item in host_list: if item not in deduped_list: deduped_list.append(item) return deduped_list