- 3.1 Global Validation Service
- 3.2 Replicated Validation Service
- 3.3 Local Workpool
- 3.4 Task Distribution Parameters
- 3.5 Worker Thread Pool Parameters
- 3.6 StorageSystem and HashtableServer
- 3.7 Logger Configuration
A configuration file is used to set the values of various configuration parameters for the Beehive framework at runtime. Some of these parameters are critical for program execution, e.g. the number of worker threads, or whether the execution mode is transaction-based or barrier-based. Several other parameters are used for performance tuning and experimental evaluation of Beehive mechanisms. The configuration file needs to be created in the same directory where the application program code is stored. Typically we name it configFile in our example applications. The parameter values specified in the configuration file relate to different components of the Beehive framework, such as the GlobalWorkpool (i.e. the Global Validation Service), LocalWorkpool, task distributor, worker thread-pool, file-loaders, etc.
This section describes the global validation service executing in a single process (i.e. a non-replicated service). By default it is based on Java RMI, but it can be configured to run using Thrift, as described later in this section. The Global Validation Service can be configured to run as either a single-threaded or a multi-threaded server. Setting the flag concurrentValidator to true makes the server multi-threaded, providing more concurrency in processing validation requests. If this flag is not specified, the default configuration is a single-threaded server.
If this flag is set to true, then one should also specify the number of locks. The following flag sets the number of locks to 256.
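The flag lines themselves do not appear here; a plausible form of these settings, with the Validator prefix and the numLocks property name assumed rather than confirmed, would be:

```
# Run the validator as a multi-threaded server
Validator.concurrentValidator=true
# Number of locks used by the multi-threaded validator (property name assumed)
Validator.numLocks=256
```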
The following flag is used only for the single-threaded server. It enables speculative checking of queued validation requests that are destined to be aborted due to the commitment of the current request.
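A sketch of how this setting might look; the property name speculativeChecking is hypothetical, as the actual flag line is not shown in this text:

```
# Speculatively check queued validation requests against the committing
# request (single-threaded server only; property name hypothetical)
Validator.speculativeChecking=true
```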
For large graph problems, the internal table maintained by the GlobalWorkpool can grow as large as the number of nodes in the graph, causing insufficient-memory problems. To avoid this, truncation should be enabled as follows:
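A plausible form of this setting, mirroring the truncation flag shown for the replicated validation service; whether the single-server configuration uses this exact property name is an assumption:

```
# Enable periodic truncation of the validator's internal table
Validator.truncation.allowed=true
```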
With this flag set, items in the table with update time less than (STS-1000) are deleted periodically. The window size 1000 is fixed in this configuration. In the replicated validation service, it can be set to a different value.
It is possible to use Thrift RPC, instead of Java RMI, for making validation calls. The following flags should be set to run the Global Validation Service (GlobalWorkpoolImpl) and the clients in the LocalWorkpoolImpl on all cluster nodes using Thrift for the validation calls.
# When the flag below is "false", we should start the
# GlobalValidator which is a single Java process on a host, which is
# kepler.cs.umn.edu in this example
Validator.useReplicated=false
# Validator.RPC_MECHANISM=RMI will use Java RMI for both client-side and peer interface
Validator.RPC_MECHANISM=THRIFT
# Single Validator Flags
Validator.GlobalWorkpoolImpl_THRIFT_HOST=cs-kepler.cs.umn.edu
Validator.GlobalWorkpoolImpl_THRIFT_PORT=9001
For scalability experiments, we developed a replicated/partitioned validation service. It can be configured to run using either Java RMI or Thrift. The following parameters are specified for the Replicated Validation Service configuration:
Validator.useReplicated=true
# Replicated Validator Flags
# Replicated Server Selection Policy : RANDOM, KEY_BASED
Validator.serverIdentifyPolicy=RANDOM
Validator.truncation.allowed=true
Validator.truncation.scanLimit=1000
Validator.truncation.windowSize=2000
Validator.writeStatsToFile=false
Validator.nodeSize=100
# Default will use Java RMI for both client-side and peer interface
# Below is Thrift based configuration
# Validator.RPC_MECHANISM=RMI
Validator.RPC_MECHANISM=THRIFT
# clientPort and serverPort are the starting port numbers, increasing
# with validation service replica index number
Validator.thrift.clientPort=5000
Validator.thrift.serverPort=6000
Validator.clientThriftConnectionPool=20
Validator.serverThriftConnectionPool=20
Validator.clusteredGraph=src/TestProgs/GraphColoring/fgraph-20-10-10.pp_bkt_dir
# Validator testing flags
Validator.debug=false
Validator.MSI=175
Validator.ServerReplica=2
Validator.rwSetLengthCount=8
Validator.acrossCluster=0
The following flag, named modelType, defines whether the execution model is transaction-based or barrier-based. It can be set to TRANSACTION, BARRIER, or BARRIER-JOIN. In the barrier model there are two options. With the BARRIER model, the completion of each barrier phase is automatically detected by the Beehive framework. This can cause the program to execute slowly because of delays in the automatic detection of phase completion. The BARRIER-JOIN model uses explicit notification by each host in the cluster to the master host (which is the first host in the hostlist file) on completion of its current phase. The BARRIER-JOIN model is more efficient in terms of execution-time overheads, but it is applicable only when each host has a known number of tasks to be executed in a given phase. For application programs where the number of tasks executed in a phase is not known a priori, one should use the BARRIER model.
# LocalWorkpool.modelType=BARRIER
# LocalWorkpool.modelType=BARRIER-JOIN
LocalWorkpool.modelType=TRANSACTION
The LocalWorkpool can be configured to periodically print the number of tasks committed and aborted, giving both cumulative and incremental statistics over the specified interval. The following flag specifies this periodic printing interval, which is helpful for observing execution progress. Below we set it to 30 seconds.
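The flag line itself is not shown here; a plausible entry, with the property name printInterval assumed rather than confirmed, would be:

```
# Interval (seconds) for printing committed/aborted task statistics
# (property name assumed)
LocalWorkpool.printInterval=30
```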
Another flag, called progressMonitorInterval, is used to periodically compute the abort rate, which can be used for adaptive scheduling of tasks. It is set to 20 seconds in the example below:
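The flag name comes from the text above; the LocalWorkpool prefix is an assumption based on the naming of the other workpool flags:

```
# Interval (seconds) for computing the task abort rate
LocalWorkpool.progressMonitorInterval=20
```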
The following flag is used to retrieve the STS value from the Global Validation Service if it has not been updated for more than the time value specified by STS_DELAY in seconds:
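The STS_DELAY name comes from the text above; the LocalWorkpool prefix and the example value are assumptions:

```
# Refresh the STS value from the Global Validation Service if it has
# not been updated for this many seconds (example value)
LocalWorkpool.STS_DELAY=10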
There are several other flags which are currently not used. There is no need to set them; we list them below for convenience.
LocalWorkpool.GAP_TIME=50
LocalWorkpool.SLEEP_TIME=50
LocalWorkpool.pullTask=true
LocalWorkpool.logging=false
When new tasks are created by a task, the worker thread deposits them in the LocalWorkpool by calling the addTask method. These tasks are then distributed to different cluster nodes based on the task distribution policies and the affinity level specified in each task. There are two versions of the LoadDistributor module: V1 and V2. V1 is the original version; it had many unnecessary complexities and performed synchronous distribution of tasks, one at a time. Version 2 (V2) is a simplified and more efficient version in which tasks are batched and asynchronously distributed by a daemon thread.
# Version can be V1 or V2.
# V2 is simplified form of distributor using batching and buddy distribution
LocalWorkpool.LoadDistribution.Version=V2
# Distribution Policies for V1
# 1) KWAYSPLIT (kwaySplitType - a) RANDOM, b) LDAWARE),
# 2) ROUNDROBIN, 3) LCLDAWARE (Locality Aware, Load Aware)
LocalWorkpool.distributionPolicy=KWAYSPLIT
LocalWorkpool.kwaySplitType=LDAWARE
LocalWorkpool.ksplit=3
# Distribution Policies for V2
# 1) RANDOM, 2) ROUND_ROBIN, 3) MIN_QUEUE, 4) MAX_QUEUE
# Current distribution scheme selects only one other buddy node to split new tasks
# Strong-affinity tasks are always distributed according to the target node
# The LocalTaskDepositRatio=0.8 flag will put 80% of the new tasks in the local workpool
LocalWorkpool.LoadDistribution.LocalTaskDepositRatio=0.8
LocalWorkpool.LoadDistribution.Policy=RANDOM
LocalWorkpool.LoadDistribution.QueueDispatchSize=20
LocalWorkpool.LoadDistribution.QueueDispatchTimeout=1000
The following parameter specifies the number of worker threads on each cluster host. This is an important parameter.
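The parameter line itself is not shown here; a plausible entry, with both the Worker prefix and the property name assumed, would be:

```
# Number of worker threads on each cluster host (property name assumed)
Worker.numThreads=8
```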
If there are no tasks available, then a worker thread sleeps for the number of milliseconds specified by the flag below before attempting to fetch a task again:
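The flag line is not shown here; a plausible entry, with the Worker prefix and property name assumed (note that a similarly named LocalWorkpool.SLEEP_TIME appears among the unused flags above), would be:

```
# Worker thread sleep time (milliseconds) when no task is available
# (property name assumed)
Worker.SLEEP_TIME=50
```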
The StorageSystem object provides the abstraction of a global data storage service, which is implemented by the HashtableServers executing on different hosts. The system can be configured to use either Java RMI (the default) or Thrift for remote calls:
# THRIFT configs
# System.RPC_MECHANISM options are RMI and THRIFT
# Setting the THRIFT option will cause all NodeData operations to go through Thrift interface
System.RPC_MECHANISM=THRIFT
System.HashtableServer_THRIFT_NUM_HANDLES=24
System.HashtableServer_THRIFT_PORT=9002
The following flag indicates whether the hostnames are DNS names or in the private IP address space. For example, when running on the MSI cluster, this flag must be set to false; on clusters with DNS hostnames, it is set to true:
# IMPORTANT
# canonicalHostNames should be true on our research clusters with DNS names
# canonicalHostNames should be false on MSI Itasca
Server.canonicalHostNames=true
The storage of Node objects on the HashtableServers of cluster nodes can be based on one of several schemes, which can be specified using the following flag:
# Hashing scheme : INTEGER_MODULO, GRAPH_PARTITION, STRING_MODULO
Server.NODE_HASHING_SCHEME=INTEGER_MODULO
# INTEGER_MODULO is used when the Node keys (Ids) are integers, and the distribution
# is in round-robin manner.
#
# GRAPH_PARTITION is used for storing integer-valued keys in a range-partitioned manner.
#
# STRING_MODULO scheme hashes a string key (id) for Nodes to an integer, and then stores
# them in round-robin (modulo) fashion.
The following flag is used when an application program wants to select the code path that uses NodeData instead of Node-based remote calls. Although it is currently designated as a StorageSystem flag, its function is exclusively in the application program and Worker code:
# useNodeData flag is set to true to make NodeData based remote data access
StorageSystem.useNodeData=true
The following flag enables an optimized execution path in the HashtableServer code with NodeData, based on the type of access that is requested. We found its impact to be mostly marginal, so it can be ignored:
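The flag line itself is not shown here; a sketch with an entirely hypothetical property name would be:

```
# Enable the NodeData-optimized access path in the HashtableServer
# (property name hypothetical; impact is marginal, so this can be ignored)
Server.useNodeDataOptimizedPath=false
```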
The following flags were introduced for Node caching mechanisms in the StorageSystem and the HashtableServer. Currently we mostly rely on fine-grained remote operations using NodeData, so caching is not being used. These two mechanisms cannot be used simultaneously:
# Server Caching flags
# The useCaching flag is used to turn on/off Node caching
# Currently we are turning it off and relying on NodeData
Server.useCaching=false
# More details needed for the following flags
Server.CacheInvalidationScheme=2
Server.CacheTableSize=1000
Server.CacheCleanupTarget=0.8
Server.CacheWindow=5000
StorageSystem.WriteThroughCache=false
In the StorageSystem object, we experimented with different options for dispatching vector calls for getting/putting multiple nodes: parallel calls, serial calls (one at a time per server), and Java's Executor service. The serial option performed best:
# Storage System Configs
# Remote Calls Type : 0 (Parallel), 1 (Serial), 2 (Executor Service)
StorageSystem.remoteCalls=1
# The following flag controls the maximum attempts when a remote call fails
StorageSystem.MAX_ATTEMPTS=3
The following flags were introduced for bucketization and batching of NodeData related calls to amortize the remote access costs. This led to significant performance gains:
# Call Bucketization and Caching Parameters for NodeData
# More details need to be added here
# The following timeout period (msec) is to dispatch a batch
StorageSystem.BatchDispatchTimeout=5
StorageSystem.BatchCountThreshold=3
# NodeDataRequestDispatchPoolSize indicates the size of the thread-pool
StorageSystem.NodeDataRequestDispatchPoolSize=8
StorageSystem.NodeDataCacheSize=10000
StorageSystem.NodeDataCachePurgeThreshold=0.9
StorageSystem.NodeDataCachePurgePeriod=500000
StorageSystem.NodeDataCacheCleanupFraction=0.1
Logger configuration parameters are specified as shown below:
# Logger Configs
Logger.BUF_SIZE=8000
Logger.PrintToFile=false
Logger.PrintToConsole=false
Logger.logLevels=debug