#!/usr/bin/env python
"""
Vapi server
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2011-2014 VMware, Inc. All rights reserved. -- VMware Confidential'
import atexit
import locale
import logging
import logging.config
import os
import sys
from six.moves import configparser
import vmware.vapi.debug.livedump # pylint: disable=W0611
from vmware.vapi.lib.addr_url_parser import get_url_scheme
from vmware.vapi.l10n.constants import DEFAULT_LOCALE
from vmware.vapi.settings import config
from vmware.vapi.settings.sections import ENDPOINT, LOGGERS
# Note: Use logger only after configure_logging has been called!!!
logger = None
# Set english locale for vAPI server
locale.setlocale(locale.LC_ALL, DEFAULT_LOCALE)
[docs]def check_file_exists(filename):
"""
Check if name is a file and exists
:type :class:`str`
:param file name
"""
if not os.path.exists(filename) or not os.path.isfile(filename):
raise os.error(2, "No such file: '%s'" % filename)
[docs]def set_process_title(cfg):
"""
If setproctitle library is available, set the process title
:type cfg: :class:`configparser`
:param cfg: configparser object
"""
try:
import setproctitle
name = cfg.get(ENDPOINT, 'process.name')
if name:
setproctitle.setproctitle(name)
except Exception as e:
logger.warn('Could not set the process title because of %s', str(e))
[docs]def get_ssl_args(cfg, protocol_prefix):
"""
Extract the ssl arguments
:type cfg: :class:`configparser.SafeConfigParser`
:param cfg: Configuration
:type protocol_prefix: :class:`str`
:param protocol_prefix: Prefix of the protocol configuration
:rtype: :class:`dict`
:return: SSL arguments for this protocol configuration
"""
ssl_args = {}
ssl_prefix = '%s.ssl' % protocol_prefix
# Try to get the certdir from environment variable or .certdir option
certdir = os.environ.get('CERTDIR')
if certdir is None:
try:
certdir = cfg.get(ENDPOINT, '%s.certdir' % ssl_prefix)
except configparser.NoOptionError:
# But don't complain yet
pass
for ssl_key in ('ca_certs', 'certfile', 'keyfile'):
option = '%s.%s' % (ssl_prefix, ssl_key)
file_name = None
try:
file_name = cfg.get(ENDPOINT, option)
except configparser.NoOptionError:
# ca_certs, certfile and keyfile are optional, so don't complain
# if they are not present
pass
if file_name is not None:
if certdir is None:
# If one of ca_certs, certfile, keyfile is specified
# and certdir is not specified, then raise an error
logger.error('Specify certificate absolute directory path '
'either by setting environment variable CERTDIR '
'or by setting %s.certdir in the properties file',
ssl_prefix)
raise configparser.NoOptionError('%s.certdir' % ssl_prefix,
ENDPOINT)
else:
file_path = os.path.join(certdir, file_name)
check_file_exists(file_path)
ssl_args[ssl_key] = file_path
return ssl_args
[docs]def setup_provider_chain(cfg, singleton):
"""
Setup the API Provider chain
In the properties file, users would specify the order of ApiProviders
For ex: InterposerProvider, ApiAggregator. In this case all incoming
requests would first go to InterposerProvider and then forwarded to
ApiAggregator after processing.
This function initializes all these providers in the reverse order
and passes the reference of n+1th provider to nth provider.
:type cfg: :class:`configparser.SafeConfigParser`
:param cfg: Configuration
:type singleton: :class:`bool`
:param singleton: Specify whether to create new instances of Providers or
use existing ones
:rtype: :class:`list` of :class:`vmware.vapi.core.ApiProvider`
:return: List of API Providers
"""
# This import cannot be at the top as we need to wait for logger to get
# initialized with right handlers
from vmware.vapi.lib.load import dynamic_import
singleton_providers = {
'ApiAggregator': 'vmware.vapi.provider.aggregator.get_provider',
'LocalProvider': 'vmware.vapi.provider.local.get_provider',
'InterposerProvider': 'vmware.vapi.provider.interposer.get_provider',
'AuthenticationFilter': 'vmware.vapi.provider.authentication.get_provider',
}
providers = {
'ApiAggregator': 'vmware.vapi.provider.aggregator.AggregatorProvider',
'LocalProvider': 'vmware.vapi.provider.local.LocalProvider',
'InterposerProvider': 'vmware.vapi.provider.interposer.InterposerProvider',
'AuthenticationFilter': 'vmware.vapi.provider.authentication.AuthenticationFilter',
}
provider_types = cfg.get(ENDPOINT, 'provider.type').split(',')
provider_map = singleton_providers if singleton else providers
providers = []
for i, provider_type in enumerate(reversed(provider_types)):
provider_name = provider_map.get(provider_type)
if provider_name:
provider_constructor = dynamic_import(provider_name)
if provider_constructor is None:
raise ImportError('Could not import %s' % provider_name)
if i == 0:
# TODO: Add validation to make sure that the last provider
# can support registration of services
provider = provider_constructor()
provider.register_by_properties(cfg)
else:
provider = provider_constructor()
provider.next_provider = providers[i - 1]
providers.append(provider)
else:
logger.error('Could not load provider')
return []
return providers[::-1]
[docs]def create_servers(spec, singleton=True):
"""
Create RPC servers
:type spec: :class:`dict`
:param spec: Protocol configurations
:type singleton: :class:`bool`
:kwarg singleton: Specify whether to create new instances of Providers or
use existing ones
:rtype: :class:`list` of :class:`vmware.vapi.server.server_interface.ServerInterface`
:return: list of servers
"""
server_providers = {
'asyncore': 'vmware.vapi.server.asyncore_server.get_server',
'twisted': 'vmware.vapi.server.twisted_server.get_server',
'stdio': 'vmware.vapi.server.stdio_server.get_server',
'wsgi': 'vmware.vapi.server.wsgi_server.get_server',
}
cfg = configparser.SafeConfigParser()
cfg.read(spec)
# Set the singleton config object
config.cfg = cfg
# Configure logging
if LOGGERS in cfg.sections():
configure_logging(spec)
else:
configure_logging()
set_process_title(cfg)
providers = setup_provider_chain(cfg, singleton)
if not providers:
return providers
# Get the first API Provider to connect to RPC
provider = providers[0]
try:
protocol_configs = cfg.get(ENDPOINT, 'protocol').split(',')
except configparser.NoOptionError:
raise Exception('No protocol configurations specified')
# These import cannot be at the top as we need to wait for logger to get
# initialized with right handlers
from vmware.vapi.lib.load import dynamic_import
from vmware.vapi.protocol.server.msg.handler_factory import ProtocolHandlerFactory
from vmware.vapi.protocol.server.transport.async_server_adapter_factory import AsyncServerAdapterFactory
servers = {}
msg_handlers = {}
protocol_handlers = {}
for protocol_config in protocol_configs:
protocol_prefix = 'protocol.%s' % protocol_config
rpc_prefix = '%s.rpc' % protocol_prefix
url = cfg.get(ENDPOINT, rpc_prefix)
logger.info('url: %s', url)
transport_protocol = get_url_scheme(url)
logger.info('transport protocol: %s', url)
# Get request handler
msg_handler_factory = ProtocolHandlerFactory()
msg_protocol = cfg.get(ENDPOINT, '%s.msg' % protocol_prefix)
try:
msg_handler_class = cfg.get(ENDPOINT, '%s.msg.handler' % protocol_prefix)
msg_handler_reference = dynamic_import(msg_handler_class)
if msg_handler_reference:
msg_handler = msg_handler_reference(provider)
except configparser.NoOptionError:
logger.info('msg_protocol %s', msg_protocol)
msg_handler = msg_handlers.get((msg_protocol, provider))
if not msg_handler:
msg_handler = msg_handler_factory.get_handler(msg_protocol, provider)
msg_handlers[(msg_protocol, provider)] = msg_handler
# Get async server to msg handler adapter
adapter_factory = AsyncServerAdapterFactory()
protocol_handler = protocol_handlers.get((transport_protocol, msg_handler))
if not protocol_handler:
if url in ('stdio', 'wsgi'):
protocol_handler_factory = ProtocolHandlerFactory()
protocol_handler = \
protocol_handler_factory.get_handler(msg_protocol, provider)
else:
protocol_handler = adapter_factory.get_adapter(transport_protocol,
msg_handler)
protocol_handlers[(transport_protocol, msg_handler)] = protocol_handler
# Extract ssl arguments
ssl_args = get_ssl_args(cfg, protocol_prefix)
# Get server constructor
try:
rpc_server = cfg.get(ENDPOINT, '%s.server' % rpc_prefix)
except configparser.NoOptionError:
if transport_protocol in ('stdio'):
rpc_server = 'stdio'
elif transport_protocol in ('wsgi'):
rpc_server = 'wsgi'
else:
rpc_server = 'asyncore'
logger.info('rpc server: %s', rpc_server)
# Register handler
server = servers.get(rpc_server)
if not server:
server_name = server_providers.get(rpc_server)
if server_name is not None:
server_constructor = dynamic_import(server_name)
if server_constructor is None:
raise Exception('Could not find RPC constructor')
server = server_constructor(cfg)
servers[rpc_server] = server
server.register_handler(url, msg_protocol, protocol_handler, ssl_args)
return list(servers.values())
[docs]def shutdown_servers(servers):
"""
Shutdown all servers
:type :class:`list` of servers
:param List of servers to shutdown
"""
for server in servers:
server.shutdown()
[docs]def main():
""" main """
spec = sys.argv[1]
check_file_exists(os.path.abspath(spec))
servers = create_servers(spec)
if len(servers) == 0:
logger.info('No server available. Quit')
return
if len(servers) > 1:
threaded_servers = []
main_thread_server = None
for server in servers:
if getattr(server, 'main_thread_only', False):
if main_thread_server is None or main_thread_server == server:
main_thread_server = server
else:
raise Exception('More than one servers required running on main thread!')
else:
threaded_servers.append(server)
# Use thread pool to start servers
from vmware.vapi.lib.thread_pool import ThreadPool
pool = ThreadPool(max_workers=len(servers))
atexit.register(shutdown_servers, servers)
if main_thread_server:
for server in threaded_servers:
pool.queue_work(server.serve_forever)
main_thread_server.serve_forever()
else:
# Wait until all server die
pool.queue_works_and_wait([server.serve_forever for server in threaded_servers])
else:
server = servers[0]
server.serve_forever()
if __name__ == '__main__':
main()