Source code for vmware.vapi.server.vapid

#!/usr/bin/env python

"""
Vapi server
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2011-2014 VMware, Inc.  All rights reserved. -- VMware Confidential'

import atexit
import locale
import logging
import logging.config
import os
import sys

from six.moves import configparser

import vmware.vapi.debug.livedump  # pylint: disable=W0611
from vmware.vapi.lib.addr_url_parser import get_url_scheme
from vmware.vapi.l10n.constants import DEFAULT_LOCALE
from vmware.vapi.settings import config
from vmware.vapi.settings.sections import ENDPOINT, LOGGERS

# Note: Use logger only after configure_logging has been called!!!
logger = None

# Set english locale for vAPI server
locale.setlocale(locale.LC_ALL, DEFAULT_LOCALE)


def check_file_exists(filename):
    """
    Check that the given name exists and is a file

    :type filename: :class:`str`
    :param filename: File path to check
    """
    if not os.path.exists(filename) or not os.path.isfile(filename):
        raise os.error(2, "No such file: '%s'" % filename)

def set_process_title(cfg):
    """
    If the setproctitle library is available, set the process title

    :type cfg: :class:`configparser.SafeConfigParser`
    :param cfg: Configuration
    """
    try:
        import setproctitle
        name = cfg.get(ENDPOINT, 'process.name')
        if name:
            setproctitle.setproctitle(name)
    except Exception as e:
        logger.warning('Could not set the process title because of %s', str(e))

def configure_logging(log_config=None):
    """
    Configure logging using a properties file

    :type log_config: :class:`str`
    :kwarg log_config: File path of the properties file
    """
    if log_config is None:
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s %(levelname)-8s %(name)-15s %(message)s')
        handler.setFormatter(formatter)
        # Configure the root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(handler)
    else:
        check_file_exists(log_config)
        logging.config.fileConfig(log_config)
    global logger  # pylint: disable=W0603
    logger = logging.getLogger('vmware.vapi.server.vapid')

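# Illustrative sketch only: a minimal logging properties file that
# configure_logging() could pass to logging.config.fileConfig(). The
# handler/formatter names and the log file path below are hypothetical,
# not part of this module; the section layout follows the standard
# fileConfig format.
#
#   [loggers]
#   keys=root
#
#   [handlers]
#   keys=logfile
#
#   [formatters]
#   keys=simple
#
#   [logger_root]
#   level=INFO
#   handlers=logfile
#
#   [handler_logfile]
#   class=handlers.RotatingFileHandler
#   level=INFO
#   formatter=simple
#   args=('/var/log/vapi/endpoint.log', 'a', 10485760, 5)
#
#   [formatter_simple]
#   format=%(asctime)s %(levelname)-8s %(name)-15s %(message)s
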
def get_ssl_args(cfg, protocol_prefix):
    """
    Extract the ssl arguments

    :type cfg: :class:`configparser.SafeConfigParser`
    :param cfg: Configuration
    :type protocol_prefix: :class:`str`
    :param protocol_prefix: Prefix of the protocol configuration
    :rtype: :class:`dict`
    :return: SSL arguments for this protocol configuration
    """
    ssl_args = {}
    ssl_prefix = '%s.ssl' % protocol_prefix

    # Try to get the certdir from the environment variable or .certdir option
    certdir = os.environ.get('CERTDIR')
    if certdir is None:
        try:
            certdir = cfg.get(ENDPOINT, '%s.certdir' % ssl_prefix)
        except configparser.NoOptionError:
            # But don't complain yet
            pass

    for ssl_key in ('ca_certs', 'certfile', 'keyfile'):
        option = '%s.%s' % (ssl_prefix, ssl_key)
        file_name = None
        try:
            file_name = cfg.get(ENDPOINT, option)
        except configparser.NoOptionError:
            # ca_certs, certfile and keyfile are optional, so don't complain
            # if they are not present
            pass
        if file_name is not None:
            if certdir is None:
                # If one of ca_certs, certfile, keyfile is specified
                # and certdir is not specified, then raise an error
                logger.error('Specify certificate absolute directory path '
                             'either by setting environment variable CERTDIR '
                             'or by setting %s.certdir in the properties file',
                             ssl_prefix)
                raise configparser.NoOptionError('%s.certdir' % ssl_prefix,
                                                 ENDPOINT)
            else:
                file_path = os.path.join(certdir, file_name)
                check_file_exists(file_path)
                ssl_args[ssl_key] = file_path
    return ssl_args

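# Illustrative sketch only: the SSL-related options get_ssl_args() looks up
# for a protocol configuration named "https". The section name (assumed to
# be [endpoint] here) and the directory/file values are hypothetical; the
# option keys mirror the ones read above (certdir, ca_certs, certfile,
# keyfile). The certificate directory may instead come from the CERTDIR
# environment variable.
#
#   [endpoint]
#   protocol.https.ssl.certdir = /etc/vmware/vapi/certs
#   protocol.https.ssl.ca_certs = ca.pem
#   protocol.https.ssl.certfile = endpoint.crt
#   protocol.https.ssl.keyfile = endpoint.key
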
def setup_provider_chain(cfg, singleton):
    """
    Set up the API Provider chain

    In the properties file, users specify the order of ApiProviders,
    for example: InterposerProvider, ApiAggregator. In that case every
    incoming request first goes to InterposerProvider and is then forwarded
    to ApiAggregator after processing. This function initializes the
    providers in reverse order and passes a reference to the (n+1)th
    provider to the nth provider.

    :type cfg: :class:`configparser.SafeConfigParser`
    :param cfg: Configuration
    :type singleton: :class:`bool`
    :param singleton: Specify whether to create new instances of Providers
        or use existing ones
    :rtype: :class:`list` of :class:`vmware.vapi.core.ApiProvider`
    :return: List of API Providers
    """
    # This import cannot be at the top as we need to wait for logger to get
    # initialized with the right handlers
    from vmware.vapi.lib.load import dynamic_import

    singleton_providers = {
        'ApiAggregator': 'vmware.vapi.provider.aggregator.get_provider',
        'LocalProvider': 'vmware.vapi.provider.local.get_provider',
        'InterposerProvider': 'vmware.vapi.provider.interposer.get_provider',
        'AuthenticationFilter':
            'vmware.vapi.provider.authentication.get_provider',
    }
    providers = {
        'ApiAggregator': 'vmware.vapi.provider.aggregator.AggregatorProvider',
        'LocalProvider': 'vmware.vapi.provider.local.LocalProvider',
        'InterposerProvider':
            'vmware.vapi.provider.interposer.InterposerProvider',
        'AuthenticationFilter':
            'vmware.vapi.provider.authentication.AuthenticationFilter',
    }
    provider_types = cfg.get(ENDPOINT, 'provider.type').split(',')
    provider_map = singleton_providers if singleton else providers
    providers = []
    for i, provider_type in enumerate(reversed(provider_types)):
        provider_name = provider_map.get(provider_type)
        if provider_name:
            provider_constructor = dynamic_import(provider_name)
            if provider_constructor is None:
                raise ImportError('Could not import %s' % provider_name)
            if i == 0:
                # TODO: Add validation to make sure that the last provider
                # can support registration of services
                provider = provider_constructor()
                provider.register_by_properties(cfg)
            else:
                provider = provider_constructor()
                provider.next_provider = providers[i - 1]
            providers.append(provider)
        else:
            logger.error('Could not load provider %s', provider_type)
            return []
    return providers[::-1]

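# Illustrative sketch only (section name assumed to be [endpoint]): with
#
#   provider.type = InterposerProvider,ApiAggregator
#
# setup_provider_chain() builds ApiAggregator first and registers services
# on it via register_by_properties(), then builds InterposerProvider with
# its next_provider pointing at the aggregator, and finally returns
# [InterposerProvider, ApiAggregator] so requests flow left to right.
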
def create_servers(spec, singleton=True):
    """
    Create RPC servers

    :type spec: :class:`str`
    :param spec: File path of the properties file with the protocol
        configurations
    :type singleton: :class:`bool`
    :kwarg singleton: Specify whether to create new instances of Providers
        or use existing ones
    :rtype: :class:`list` of
        :class:`vmware.vapi.server.server_interface.ServerInterface`
    :return: list of servers
    """
    server_providers = {
        'asyncore': 'vmware.vapi.server.asyncore_server.get_server',
        'twisted': 'vmware.vapi.server.twisted_server.get_server',
        'stdio': 'vmware.vapi.server.stdio_server.get_server',
        'wsgi': 'vmware.vapi.server.wsgi_server.get_server',
    }
    cfg = configparser.SafeConfigParser()
    cfg.read(spec)

    # Set the singleton config object
    config.cfg = cfg

    # Configure logging
    if LOGGERS in cfg.sections():
        configure_logging(spec)
    else:
        configure_logging()

    set_process_title(cfg)

    providers = setup_provider_chain(cfg, singleton)
    if not providers:
        return providers

    # Get the first API Provider to connect to RPC
    provider = providers[0]

    try:
        protocol_configs = cfg.get(ENDPOINT, 'protocol').split(',')
    except configparser.NoOptionError:
        raise Exception('No protocol configurations specified')

    # These imports cannot be at the top as we need to wait for logger to get
    # initialized with the right handlers
    from vmware.vapi.lib.load import dynamic_import
    from vmware.vapi.protocol.server.msg.handler_factory import \
        ProtocolHandlerFactory
    from vmware.vapi.protocol.server.transport.async_server_adapter_factory \
        import AsyncServerAdapterFactory

    servers = {}
    msg_handlers = {}
    protocol_handlers = {}
    for protocol_config in protocol_configs:
        protocol_prefix = 'protocol.%s' % protocol_config
        rpc_prefix = '%s.rpc' % protocol_prefix
        url = cfg.get(ENDPOINT, rpc_prefix)
        logger.info('url: %s', url)
        transport_protocol = get_url_scheme(url)
        logger.info('transport protocol: %s', transport_protocol)

        # Get request handler
        msg_handler_factory = ProtocolHandlerFactory()
        msg_protocol = cfg.get(ENDPOINT, '%s.msg' % protocol_prefix)
        try:
            msg_handler_class = cfg.get(ENDPOINT,
                                        '%s.msg.handler' % protocol_prefix)
            msg_handler_reference = dynamic_import(msg_handler_class)
            if msg_handler_reference:
                msg_handler = msg_handler_reference(provider)
        except configparser.NoOptionError:
            logger.info('msg_protocol %s', msg_protocol)
            msg_handler = msg_handlers.get((msg_protocol, provider))
            if not msg_handler:
                msg_handler = msg_handler_factory.get_handler(msg_protocol,
                                                              provider)
                msg_handlers[(msg_protocol, provider)] = msg_handler

        # Get async server to msg handler adapter
        adapter_factory = AsyncServerAdapterFactory()
        protocol_handler = protocol_handlers.get(
            (transport_protocol, msg_handler))
        if not protocol_handler:
            if url in ('stdio', 'wsgi'):
                protocol_handler_factory = ProtocolHandlerFactory()
                protocol_handler = \
                    protocol_handler_factory.get_handler(msg_protocol,
                                                         provider)
            else:
                protocol_handler = adapter_factory.get_adapter(
                    transport_protocol, msg_handler)
            protocol_handlers[(transport_protocol, msg_handler)] = \
                protocol_handler

        # Extract ssl arguments
        ssl_args = get_ssl_args(cfg, protocol_prefix)

        # Get server constructor
        try:
            rpc_server = cfg.get(ENDPOINT, '%s.server' % rpc_prefix)
        except configparser.NoOptionError:
            if transport_protocol in ('stdio',):
                rpc_server = 'stdio'
            elif transport_protocol in ('wsgi',):
                rpc_server = 'wsgi'
            else:
                rpc_server = 'asyncore'
        logger.info('rpc server: %s', rpc_server)

        # Register handler
        server = servers.get(rpc_server)
        if not server:
            server_name = server_providers.get(rpc_server)
            if server_name is not None:
                server_constructor = dynamic_import(server_name)
                if server_constructor is None:
                    raise Exception('Could not find RPC constructor')
                server = server_constructor(cfg)
                servers[rpc_server] = server
        server.register_handler(url, msg_protocol, protocol_handler, ssl_args)

    return list(servers.values())

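# Illustrative sketch only: a minimal endpoint properties file that
# create_servers() could consume. The section name (assumed [endpoint]),
# the URL and the values are hypothetical; the option keys are the ones
# read above (provider.type, protocol, protocol.<name>.rpc,
# protocol.<name>.rpc.server, protocol.<name>.msg, process.name).
#
#   [endpoint]
#   process.name = vapi-endpoint
#   provider.type = LocalProvider
#   protocol = http
#   protocol.http.rpc = https://0.0.0.0:8443/api
#   protocol.http.rpc.server = twisted
#   protocol.http.msg = json
#   protocol.http.ssl.certdir = /etc/vmware/vapi/certs
#   protocol.http.ssl.certfile = endpoint.crt
#   protocol.http.ssl.keyfile = endpoint.key
#
# Usage sketch, assuming the file path above:
#   servers = create_servers('/etc/vmware/vapi/endpoint.properties')
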
def shutdown_servers(servers):
    """
    Shut down all servers

    :type servers: :class:`list` of
        :class:`vmware.vapi.server.server_interface.ServerInterface`
    :param servers: List of servers to shut down
    """
    for server in servers:
        server.shutdown()

def main():
    """ main """
    spec = sys.argv[1]
    check_file_exists(os.path.abspath(spec))
    servers = create_servers(spec)
    if len(servers) == 0:
        logger.info('No server available. Quit')
        return

    if len(servers) > 1:
        threaded_servers = []
        main_thread_server = None
        for server in servers:
            if getattr(server, 'main_thread_only', False):
                if main_thread_server is None or main_thread_server == server:
                    main_thread_server = server
                else:
                    raise Exception('More than one server requires running '
                                    'on the main thread!')
            else:
                threaded_servers.append(server)

        # Use a thread pool to start the servers
        from vmware.vapi.lib.thread_pool import ThreadPool
        pool = ThreadPool(max_workers=len(servers))
        atexit.register(shutdown_servers, servers)
        if main_thread_server:
            for server in threaded_servers:
                pool.queue_work(server.serve_forever)
            main_thread_server.serve_forever()
        else:
            # Wait until all servers die
            pool.queue_works_and_wait(
                [server.serve_forever for server in threaded_servers])
    else:
        server = servers[0]
        server.serve_forever()

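# Usage sketch: the server is started with the path to the endpoint
# properties file as its single command-line argument, e.g.
#
#   python -m vmware.vapi.server.vapid /etc/vmware/vapi/endpoint.properties
#
# The invocation style and file location are illustrative; main() only
# requires that sys.argv[1] points at an existing properties file.
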
if __name__ == '__main__':
    main()