3.   Configuration Parameters for Program Execution



A configuration file is used to set the values of various configuration parameters for the Beehive framework at runtime. Some of these parameters are critical for program execution, e.g., the number of worker threads or the execution mode (transaction-based or barrier-based). Several other parameters are used for performance tuning and experimental evaluation of Beehive mechanisms. The configuration file needs to be created in the same directory where the application program code is stored. Typically, we name it configFile in our example applications.
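
The file uses simple key=value lines, with "#" for comments, as seen in the examples throughout this chapter. As a minimal sketch (not the Beehive configuration loader itself, which has its own mechanism), such a file can be parsed with the standard java.util.Properties class; the two keys read below are parameters described later in this chapter.

        // Minimal sketch: reading a key=value configuration file with java.util.Properties.
        // This is only an illustration; Beehive uses its own configuration loading code.
        import java.io.FileInputStream;
        import java.io.IOException;
        import java.util.Properties;

        public class ConfigSketch {
            public static void main(String[] args) throws IOException {
                Properties config = new Properties();
                try (FileInputStream in = new FileInputStream("configFile")) {
                    config.load(in);
                }
                // Two of the parameters described later in this chapter
                int workerCount = Integer.parseInt(config.getProperty("Worker.Worker_COUNT", "4"));
                String modelType = config.getProperty("LocalWorkpool.modelType", "TRANSACTION");
                System.out.println("workers=" + workerCount + ", model=" + modelType);
            }
        }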

The parameter values specified in the configuration file are related to different components of the Beehive framework, such as the GlobalWorkpool (i.e., the Global Validation Service), the LocalWorkpool, the task distributor, the Worker thread-pool, the file loaders, etc.


3.1 Global Validation Service

This section describes the global validation service executing as a single process (i.e., a non-replicated service). By default it is based on Java RMI, but it can be configured to use Thrift, as described later in this section.

The Global Validation Service can be configured to run either as a single-threaded or a multi-threaded server. By setting the flag concurrentValidator to true, the server executes as a multi-threaded server, providing more concurrency in processing validation requests. If this flag is not specified, the default configuration is a single-threaded server.

        GlobalWorkpool.concurrentValidator=true
    

If this flag is set to true, one should also specify the number of locks. The following flag sets the number of locks to 256.

        ConcurrentValidator.nLocks=256
    

The following flag is used only for the single-threaded server. It enables speculative checking of queued validation requests that are destined to be aborted due to the commitment of the current request:

        GlobalWorkpool.speculativeChecking=true
    

3.1.1 Enabling Truncation

For large graph problems, the internal table maintained by the GlobalWorkpool can grow as large as the number of nodes in the graph, causing out-of-memory problems. To avoid this, truncation should be enabled as follows:

        GlobalWorkpool.truncationEnabled=true
    

With this flag set, items in the table with update time less than (STS - 1000) are deleted periodically. For example, when the STS value is 5000, entries with update time less than 4000 are removed. The window size of 1000 is fixed in this configuration; in the replicated validation service, it can be set to a different value.
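
The effect of this truncation rule can be sketched as follows. This is only an illustration (not the actual GlobalWorkpool code), assuming the table maps node identifiers to their last update timestamps:

        // Illustrative sketch of the truncation rule: entries whose update time is
        // less than (STS - window) are removed periodically. Not the GlobalWorkpool code;
        // the table layout below is a simplifying assumption.
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;

        class TruncationSketch {
            static final long WINDOW = 1000;                 // fixed window size in this configuration

            // Hypothetical table: node id -> last update timestamp
            final Map<Long, Long> updateTimes = new ConcurrentHashMap<>();

            void truncate(long sts) {
                long cutoff = sts - WINDOW;
                // Delete every entry whose update time falls below the cutoff
                updateTimes.entrySet().removeIf(e -> e.getValue() < cutoff);
            }
        }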

3.1.2 Thrift Based Configuration

It is possible to use Thrift RPC, instead of Java RMI, for making validation calls. The following flags should be set so that the Global Validation Service (GlobalWorkpoolImpl) and the clients in the LocalWorkpoolImpl on all cluster nodes use Thrift for the validation calls.

        # When the flag below is "false", we should start the
        # GlobalValidator, which is a single Java process on a host
        # (kepler.cs.umn.edu in this example)
        Validator.useReplicated=false

        # Validator.RPC_MECHANISM=RMI will use Java RMI for both the client-side
        # and peer interfaces
        Validator.RPC_MECHANISM=THRIFT

        # Single Validator Flags
        Validator.GlobalWorkpoolImpl_THRIFT_HOST=cs-kepler.cs.umn.edu
        Validator.GlobalWorkpoolImpl_THRIFT_PORT=9001
    

3.2 Replicated Validation Service

For scalability experiments, we developed a replicated/partitioned validation service. It can be configured to use either Java RMI or Thrift.

The following parameters are specified for the Replicated Validation Service configuration:

        Validator.useReplicated=true

        # Replicated Validator Flags
        # Replicated Server Selection Policy : RANDOM, KEY_BASED
        Validator.serverIdentifyPolicy=RANDOM
        Validator.truncation.allowed=true
        Validator.truncation.scanLimit=1000
        Validator.truncation.windowSize=2000
        Validator.writeStatsToFile=false
        Validator.nodeSize=100

        # Default will use Java RMI for both the client-side and peer interfaces
        # Below is the Thrift-based configuration
        #Validator.RPC_MECHANISM=RMI
        Validator.RPC_MECHANISM=THRIFT

        # clientPort and serverPort are the starting port numbers, increasing
        # with the validation service replica index number
        Validator.thrift.clientPort=5000
        Validator.thrift.serverPort=6000
        Validator.clientThriftConnectionPool=20
        Validator.serverThriftConnectionPool=20
        Validator.clusteredGraph=src/TestProgs/GraphColoring/fgraph-20-10-10.pp_bkt_dir

        # Validator testing flags
        Validator.debug=false
        Validator.MSI=175
        Validator.ServerReplica=2
        Validator.rwSetLengthCount=8
        Validator.acrossCluster=0
    

3.3 Local Workpool

The following flag, named modelType, defines whether the execution model is transaction-based or barrier-based. It can be set to TRANSACTION, BARRIER, or BARRIER-JOIN. In the barrier model there are two options. With the BARRIER model, the completion of each barrier phase is automatically detected by the Beehive framework. This can cause the program to execute slowly because of delays in the automatic detection of phase completion. The BARRIER-JOIN model uses explicit notification by each host in the cluster to the master host (which is the first host in the hostlist file) on completion of its current phase. The BARRIER-JOIN model is more efficient in terms of execution time overhead, but it is applicable only when each host has a known number of tasks to execute in a given phase. For application programs where the number of tasks executed in a phase is not known a priori, one should use the BARRIER model.

        # LocalWorkpool.modelType=BARRIER
        # LocalWorkpool.modelType=BARRIER-JOIN
        LocalWorkpool.modelType=TRANSACTION
    

The LocalWorkpool can be configured to periodically print the number of tasks committed and aborted. It gives cumulative as well as incremental statistics over the specified interval. The following flag specifies this periodic printing interval, which is helpful for observing execution progress. In the example below, it is set to 30 seconds (30000 milliseconds).

        LocalWorkpool.printProgressInterval=30000
    

Another flag, called progressMonitorInterval, is used to periodically compute the abort rate, which can be used for adaptive scheduling of tasks. It is set to 20 seconds in the example below:

        LocalWorkpool.progressMonitorInterval=20000
    

The following flag causes the STS value to be retrieved from the Global Validation Service if it has not been updated for more than the time (in seconds) specified by STS_DELAY:

        LocalWorkpool.STS_DELAY=10
    

There are several other flags which are currently not used. There is no need to set them; we list them below for convenience.

        LocalWorkpool.GAP_TIME=50
        LocalWorkpool.SLEEP_TIME=50
        LocalWorkpool.pullTask=true
        LocalWorkpool.logging=false
    

3.4 Task Distribution Parameters

When new tasks are created by a task, the worker thread deposits them in the LocalWorkpool by calling the addTask method. These tasks are then distributed to different cluster nodes based on the task distribution policies and the affinity level specified in each task. There are two versions of the LoadDistributor module: V1 and V2. V1 is the original version; it had many unnecessary complexities and performed synchronous distribution of tasks, one at a time. Version 2 (V2) is a simplified and more efficient version in which tasks are batched and distributed asynchronously by a daemon thread.

        # Version can be V1 or V2.
        # V2 is a simplified form of the distributor using batching and buddy distribution
        LocalWorkpool.LoadDistribution.Version=V2

        # Distribution Policies for V1
        # 1) KWAYSPLIT (kwaySplitType: a) RANDOM, b) LDAWARE),
        # 2) ROUNDROBIN, 3) LCLDAWARE (Locality Aware, Load Aware)
        LocalWorkpool.distributionPolicy=KWAYSPLIT
        LocalWorkpool.kwaySplitType=LDAWARE
        LocalWorkpool.ksplit=3

        # Distribution Policies for V2
        # 1) RANDOM, 2) ROUND_ROBIN, 3) MIN_QUEUE, 4) MAX_QUEUE
        # The current distribution scheme selects only one other buddy node to split new tasks
        # Strong-affinity tasks are always distributed according to their target node
        # The LocalTaskDepositRatio=0.8 flag will put 80% of the new tasks in the local workpool
        LocalWorkpool.LoadDistribution.LocalTaskDepositRatio=0.8
        LocalWorkpool.LoadDistribution.Policy=RANDOM
        LocalWorkpool.LoadDistribution.QueueDispatchSize=20
        LocalWorkpool.LoadDistribution.QueueDispatchTimeout=1000
    
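
The V2 splitting behavior described by the comments above can be sketched as follows. This is only an illustration under stated assumptions (a single buddy node, probabilistic local deposit); it is not the actual LoadDistributor code, and all names are hypothetical:

        // Illustrative sketch of the V2 split: strong-affinity tasks go to their target
        // host; other tasks stay local with probability LocalTaskDepositRatio, and the
        // remainder are batched to a single buddy host chosen by the configured policy.
        // Not the actual LoadDistributor code; all names here are hypothetical.
        import java.util.Random;

        class DistributionSketch {
            final double localDepositRatio = 0.8;   // LocalWorkpool.LoadDistribution.LocalTaskDepositRatio
            final Random rng = new Random();

            int chooseHost(boolean strongAffinity, int targetHost, int localHost, int buddyHost) {
                if (strongAffinity) {
                    return targetHost;              // strong-affinity tasks always go to their target
                }
                if (rng.nextDouble() < localDepositRatio) {
                    return localHost;               // keep ~80% of new tasks in the local workpool
                }
                return buddyHost;                   // remaining tasks are sent to the buddy host
            }
        }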

3.5 Worker Thread-pool Parameters

The following parameter specifies the number of worker threads on each cluster host. This is an important parameter.

        IMPORTANT
        Worker.Worker_COUNT=4
    

If there are no tasks available, then a worker thread sleeps for the number of milliseconds specified by the flag below before attempting to fetch a task again:

        Worker.THREADSLEEPTIME=10
    
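
A minimal sketch of the polling behavior implied by this flag is shown below (illustrative only; the loop and the getTask name are hypothetical, not the Worker class API):

        // Illustrative worker polling loop: sleep THREADSLEEPTIME milliseconds when no
        // task is available, then try again. Hypothetical names; not the Worker code.
        class WorkerLoopSketch {
            static final long THREAD_SLEEP_TIME = 10;    // Worker.THREADSLEEPTIME (msec)

            interface TaskSource { Runnable getTask(); } // hypothetical local workpool interface

            void run(TaskSource workpool) throws InterruptedException {
                while (true) {
                    Runnable task = workpool.getTask();
                    if (task == null) {
                        Thread.sleep(THREAD_SLEEP_TIME); // back off before polling again
                        continue;
                    }
                    task.run();
                }
            }
        }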

3.6 StorageSystem and HashtableServer

The StorageSystem object provides the abstraction of a global data storage service, which is implemented by the HashtableServers executing on different hosts. It is possible to configure the system to use either Java RMI (the default) or Thrift for remote calls:

        # THRIFT configs
        # System.RPC_MECHANISM options are RMI and THRIFT
        # Setting the THRIFT option will cause all NodeData operations to go through the Thrift interface
        System.RPC_MECHANISM=THRIFT
        System.HashtableServer_THRIFT_NUM_HANDLES=24
        System.HashtableServer_THRIFT_PORT=9002
    

The following flag indicates whether the hostnames are DNS names or addresses in a private IP address space. For example, when running on the MSI cluster, this flag must be set to false; on clusters with DNS hostnames, it is set to true:

        IMPORTANT
        # canonicalHostNames should be true on our research clusters with DNS names
        # canonicalHostNames should be false on MSI Itasca
        Server.canonicalHostNames=true
    

The storage of Node objects on the HashtableServers of the cluster nodes can be based on one of several schemes, which can be specified using the following flag:

        #Hashing scheme : INTEGER_MODULO, GRAPH_PARTITION, STRING_MODULO
        Server.NODE_HASHING_SCHEME=INTEGER_MODULO

        # INTEGER_MODULO is used when the Node keys (Ids) are integers, and the distribution
        #   is in a round-robin manner.
        #
        # GRAPH_PARTITION is used for storing integer-valued keys in a range-partitioned manner.
        #
        # The STRING_MODULO scheme hashes the string key (Id) of a Node to an integer, and then
        #   stores the Nodes in round-robin (modulo) fashion.
    
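
A minimal sketch of the placement implied by these schemes is shown below. It assumes that "round-robin (modulo)" means the key (or its hash) modulo the number of HashtableServers and that GRAPH_PARTITION assigns contiguous key ranges; the actual Beehive code may differ, and the helper names and server counts are hypothetical.

        // Illustrative placement sketch for the three hashing schemes; not the
        // HashtableServer code. Helper names and server counts are hypothetical.
        public class PlacementSketch {
            // INTEGER_MODULO: integer keys spread over servers in round-robin (modulo) order
            static int integerModulo(long nodeId, int serverCount) {
                return (int) Math.floorMod(nodeId, (long) serverCount);
            }

            // STRING_MODULO: hash the string id to an integer, then apply modulo
            static int stringModulo(String nodeId, int serverCount) {
                return Math.floorMod(nodeId.hashCode(), serverCount);
            }

            // GRAPH_PARTITION: contiguous (range-partitioned) key ranges per server
            static int graphPartition(long nodeId, long nodesPerServer) {
                return (int) (nodeId / nodesPerServer);
            }

            public static void main(String[] args) {
                int servers = 4;                                      // hypothetical cluster size
                System.out.println(integerModulo(10, servers));       // -> 2
                System.out.println(stringModulo("v10", servers));
                System.out.println(graphPartition(10, 25));           // -> 0
            }
        }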

When an application program wants to select the code path that uses NodeData instead of Node-based remote calls, the following flag is used. Although it is currently designated as a StorageSystem flag, it is used exclusively in the application program and Worker code:

        # useNodeData flag is set to true to make NodeData based remote data access
        StorageSystem.useNodeData=true
    

The following flag enables an optimized execution path in the HashtableServer code for NodeData, based on the type of access that is requested. We found its impact to be mostly marginal, so it can be ignored:

        Server.useNodeDataReflection=true
    

The following flags were introduced for Node caching mechanisms in the StorageSystem and the HashtableServer. Currently we are mostly relying on fine-grained remote operations using NodeData, so caching is not being used. Note that these two mechanisms cannot be used simultaneously:

        # Server Caching flags
        # The useCaching flag is used to turn on/off Node caching
        # Currently we are turning it off and relying on NodeData
        Server.useCaching=false

        # More details needed for the following flags
        Server.CacheInvalidationScheme=2
        Server.CacheTableSize=1000
        Server.CacheCleanupTarget=0.8
        Server.CacheWindow=5000
        StorageSystem.WriteThroughCache=false
    

In the StorageSystem object, we experimented with different options for dispatching vector calls for getting/putting multiple nodes. The choices were parallel calls, serial calls (one at a time per server), and Java's ExecutorService. The serial option performed best:

        # Storage System Configs
        # Remote Calls Type : 0 (Parallel), 1 (Serial), 2 (Executor Service)
        StorageSystem.remoteCalls=1

        # The following flag controls the maximum number of attempts when a remote call fails
        StorageSystem.MAX_ATTEMPTS=3
    
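
The serial and ExecutorService options can be sketched as follows (illustrative only; the request/response types and fetchFromServer are hypothetical, not the StorageSystem API). The parallel option (remoteCalls=0) would instead spawn one thread per server.

        // Illustrative sketch of dispatching a vector get across servers: serially
        // (remoteCalls=1) or via an ExecutorService (remoteCalls=2). Not the
        // StorageSystem code; fetchFromServer and the types are hypothetical.
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.*;

        class VectorCallSketch {
            List<String> fetchFromServer(int server, List<Long> keys) {
                return new ArrayList<>();          // placeholder for one remote call to one server
            }

            // remoteCalls=1: serial, one server at a time
            List<String> serial(List<List<Long>> keysPerServer) {
                List<String> results = new ArrayList<>();
                for (int s = 0; s < keysPerServer.size(); s++) {
                    results.addAll(fetchFromServer(s, keysPerServer.get(s)));
                }
                return results;
            }

            // remoteCalls=2: one task per server submitted to an ExecutorService
            List<String> withExecutor(List<List<Long>> keysPerServer, ExecutorService pool)
                    throws InterruptedException, ExecutionException {
                List<Future<List<String>>> futures = new ArrayList<>();
                for (int s = 0; s < keysPerServer.size(); s++) {
                    final int server = s;
                    futures.add(pool.submit(() -> fetchFromServer(server, keysPerServer.get(server))));
                }
                List<String> results = new ArrayList<>();
                for (Future<List<String>> f : futures) {
                    results.addAll(f.get());       // wait for and collect each server's reply
                }
                return results;
            }
        }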

The following flags were introduced for bucketization and batching of NodeData-related calls to amortize remote access costs. This led to significant performance gains:

        # Call Bucketization and Caching Parameters for NodeData
        # More details need to be added here
        # The following timeout period (msec) is used to dispatch a batch
        StorageSystem.BatchDispatchTimeout=5
        StorageSystem.BatchCountThreshold=3

        # NodeDataRequestDispatchPoolSize indicates the size of the thread-pool
        StorageSystem.NodeDataRequestDispatchPoolSize=8
        StorageSystem.NodeDataCacheSize=10000
        StorageSystem.NodeDataCachePurgeThreshold=0.9
        StorageSystem.NodeDataCachePurgePeriod=500000
        StorageSystem.NodeDataCacheCleanupFraction=0.1
    
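
The count-or-timeout dispatch behavior implied by BatchCountThreshold and BatchDispatchTimeout can be sketched as follows (illustrative only, not the StorageSystem code; the class and method names are hypothetical):

        // Illustrative sketch of count-or-timeout batch dispatch for NodeData requests.
        // Not the actual StorageSystem code; class and method names are hypothetical.
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;

        class BatchDispatcherSketch<R> {
            private final int countThreshold;      // StorageSystem.BatchCountThreshold
            private final List<R> pending = new ArrayList<>();
            private final ScheduledExecutorService timer =
                    Executors.newSingleThreadScheduledExecutor();

            BatchDispatcherSketch(int countThreshold, long timeoutMillis) {
                this.countThreshold = countThreshold;
                // Periodically flush whatever has accumulated (StorageSystem.BatchDispatchTimeout)
                timer.scheduleAtFixedRate(this::flush, timeoutMillis, timeoutMillis,
                        TimeUnit.MILLISECONDS);
            }

            synchronized void add(R request) {
                pending.add(request);
                if (pending.size() >= countThreshold) {
                    flush();                       // dispatch early once the batch is full
                }
            }

            private synchronized void flush() {
                if (pending.isEmpty()) return;
                List<R> batch = new ArrayList<>(pending);
                pending.clear();
                dispatch(batch);                   // one remote call for the whole batch
            }

            void dispatch(List<R> batch) {
                // Placeholder: in Beehive this would be a batched NodeData call to a HashtableServer.
                System.out.println("dispatching batch of " + batch.size() + " requests");
            }
        }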

3.7 Logger Configuration

Logger configuration parameters are specified as shown below:

        # Logger Configs
        Logger.BUF_SIZE=8000
        Logger.PrintToFile=false
        Logger.PrintToConsole=false
        Logger.logLevels=debug
    


