Configuration Parameters
Global Validation
This section describes the global validation service, which executes in a single process (i.e., a non-replicated service). By default it uses Java RMI, but it can be configured to use Thrift, as described later in this section. The Global Validation Service can run as either a single-threaded or a multi-threaded server. Setting the flag concurrentValidator to true makes the server multi-threaded, which allows more concurrency in processing validation requests. If this flag is not specified, the server is single-threaded by default.
If this flag is set to true, one should also specify the number of locks. The following flag sets the number of locks to 256:
The following flag is used only by the single-threaded server. It enables speculative checking of queued validation requests that are destined to be aborted because the current request has committed.
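The text does not say how the multi-threaded server uses its locks. One common technique that fits a configurable lock count is lock striping. Each node key maps to one of N locks, so validation requests that touch disjoint stripes can proceed in parallel. The class below is a hypothetical sketch of that technique, not the GlobalWorkpoolImpl code. StripedLocks and its methods are illustrative names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical lock-striping sketch: numLocks locks guard all node keys.
class StripedLocks {
    private final ReentrantLock[] locks;

    StripedLocks(int numLocks) {
        if (numLocks <= 0) {
            throw new IllegalArgumentException("numLocks must be positive");
        }
        locks = new ReentrantLock[numLocks];
        for (int i = 0; i < numLocks; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    // Map a (possibly negative) key to a stripe index in [0, numLocks).
    int stripeFor(long key) {
        return (int) Math.floorMod(key, (long) locks.length);
    }

    ReentrantLock lockFor(long key) {
        return locks[stripeFor(key)];
    }

    // Distinct stripes for a read/write set, sorted so that every thread
    // acquires locks in the same global order (prevents deadlock).
    int[] stripesFor(long[] keys) {
        return Arrays.stream(keys).mapToInt(this::stripeFor).distinct().sorted().toArray();
    }

    // Lock all stripes touched by the request, run the check, then unlock in reverse order.
    boolean validateUnderLocks(long[] keys, BooleanSupplier check) {
        int[] stripes = stripesFor(keys);
        for (int s : stripes) {
            locks[s].lock();
        }
        try {
            return check.getAsBoolean();
        } finally {
            for (int i = stripes.length - 1; i >= 0; i--) {
                locks[stripes[i]].unlock();
            }
        }
    }
}
```

With 256 locks, keys 1 and 257 share a stripe and are serialized. Keys 1 and 2 can be validated concurrently.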
Enabling Truncation
For large graph problems, the internal table maintained by the GlobalWorkpool can grow as large as the number of nodes in the graph, which can exhaust memory. To avoid this, the truncable option should be enabled as follows:
With this flag set, table entries whose update time is less than STS - 1000 are deleted periodically. The window size of 1000 is fixed in this configuration; in the replicated validation service it can be set to a different value.
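The truncation rule above can be sketched as a periodic sweep. The sweep removes every entry whose last update time falls below STS - windowSize. This is a hypothetical illustration, not the GlobalWorkpool table. The assumptions are the class name, keying the table by a long node id, and storing a single update timestamp per node.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of periodic truncation of the validator's table.
class TruncatingTable {
    // nodeId -> time of the most recent committed update to that node
    private final Map<Long, Long> lastUpdate = new ConcurrentHashMap<>();
    private final long windowSize;

    TruncatingTable(long windowSize) {
        this.windowSize = windowSize;
    }

    // Record an update, keeping the newest timestamp seen for the node.
    void recordUpdate(long nodeId, long timestamp) {
        lastUpdate.merge(nodeId, timestamp, Long::max);
    }

    // Delete every entry with updateTime < sts - windowSize; return how many were removed.
    int truncate(long sts) {
        long cutoff = sts - windowSize;
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoff) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return lastUpdate.size();
    }
}
```

With a window of 1000 and STS = 2200, the cutoff is 1200. An entry last updated at time 100 is dropped, and entries updated at 1500 or later are kept.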
Thrift Based Communication
It is possible to use Thrift RPC instead of Java RMI for validation calls. Set the following flags to run the Global Validation Service (GlobalWorkpoolImpl) and the LocalWorkpoolImpl clients on all cluster nodes with Thrift-based validation calls:
# When the flag below is "false", we should start the
# GlobalValidator, which is a single Java process on a host, which is
# in this example
Validator.useReplicated=false
# Validator.RPC_MECHANISM=RMI will use Java RMI for both client-side and peer interface
Validator.RPC_MECHANISM=THRIFT
# Single Validator Flags
Validator.GlobalWorkpoolImpl_THRIFT_PORT=9001
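These settings use Java properties syntax. Any line whose first non-blank character is `#` is a comment, so each flag must sit on its own line to take effect. The hypothetical reader below uses `java.util.Properties` to show how a client could interpret the single-validator flags. The class name, field names, and the `-1` "unset" port value are assumptions of this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical reader for the single-validator flags.
class ValidatorConfig {
    final boolean replicated;
    final String rpcMechanism;
    final int thriftPort; // -1 means "not configured" (an assumption of this sketch)

    ValidatorConfig(Properties p) {
        replicated = Boolean.parseBoolean(p.getProperty("Validator.useReplicated", "false").trim());
        // Java RMI is the documented default when RPC_MECHANISM is absent.
        rpcMechanism = p.getProperty("Validator.RPC_MECHANISM", "RMI").trim();
        thriftPort = Integer.parseInt(
                p.getProperty("Validator.GlobalWorkpoolImpl_THRIFT_PORT", "-1").trim());
    }

    static ValidatorConfig parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text)); // '#' and '!' lines are skipped as comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ValidatorConfig(p);
    }

    boolean usesThrift() {
        return "THRIFT".equals(rpcMechanism);
    }
}
```

If a comment and a flag end up on the same line, as in `# comment Validator.RPC_MECHANISM=THRIFT`, the whole line is treated as a comment and the default (RMI) applies.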
Replicated Validation
For scalability experiments, we developed a replicated/partitioned validation service. Like the single validator, it can be configured to use either Java RMI or Thrift. The Replicated Validation Service is configured with the following parameters:
Validator.useReplicated=true
# Replicated Validator Flags
# Replicated Server Selection Policy : RANDOM, KEY_BASED
Validator.serverIdentifyPolicy=RANDOM
Validator.truncation.allowed=true
Validator.truncation.scanLimit=1000
Validator.truncation.windowSize=2000
Validator.writeStatsToFile=false
Validator.nodeSize=100
#Default will use Java RMI for both client-side and peer interface
# Below is Thrift based configuration
#Validator.RPC_MECHANISM=RMI
Validator.RPC_MECHANISM=THRIFT
# clientPort and serverPort are the starting port numbers, increasing
# with validation service replica index number
Validator.thrift.clientPort=5000
Validator.thrift.serverPort=6000
Validator.clientThriftConnectionPool=20
Validator.serverThriftConnectionPool=20
Validator.clusteredGraph=src/TestProgs/GraphColoring/fgraph-20-10-10.pp_bkt_dir
# Validator testing flags
Validator.debug=false
Validator.MSI=175
Validator.ServerReplica=2
Validator.rwSetLengthCount=8
Validator.acrossCluster=0
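Two details of this configuration lend themselves to a small sketch. First, per-replica Thrift ports start at clientPort/serverPort and increase with the replica index. Second, a replica is chosen under the RANDOM or KEY_BASED policy. The configuration does not define the step between ports or the KEY_BASED mapping. This hypothetical sketch assumes a step of 1 and `key mod replicas`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of replica port assignment and replica selection.
class ReplicaSelector {
    enum Policy { RANDOM, KEY_BASED }

    private final int clientPortBase;
    private final int serverPortBase;
    private final int replicas;
    private final Policy policy;

    ReplicaSelector(int clientPortBase, int serverPortBase, int replicas, Policy policy) {
        if (replicas <= 0) {
            throw new IllegalArgumentException("replicas must be positive");
        }
        this.clientPortBase = clientPortBase;
        this.serverPortBase = serverPortBase;
        this.replicas = replicas;
        this.policy = policy;
    }

    // Assumed: ports increase by 1 per replica index.
    int clientPort(int replicaIndex) {
        return clientPortBase + replicaIndex;
    }

    int serverPort(int replicaIndex) {
        return serverPortBase + replicaIndex;
    }

    // KEY_BASED (assumed: key mod replicas) always sends the same key to the same replica;
    // RANDOM picks any replica uniformly.
    int select(long key) {
        if (policy == Policy.KEY_BASED) {
            return (int) Math.floorMod(key, (long) replicas);
        }
        return ThreadLocalRandom.current().nextInt(replicas);
    }
}
```

With the values above (clientPort=5000, serverPort=6000, ServerReplica=2), this sketch would place replica 1 on client port 5001 and server port 6001.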
Local Validation
Depending on the application, a single remote validator can become a performance bottleneck. For example, if there are many transactions with small, non-overlapping read/write sets, validation traffic can overwhelm the centralized validator. To avoid such a bottleneck, the local validation scheme decentralizes validation so that each HashtableServer is responsible for validating the nodes it stores.
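The first step of decentralized validation can be sketched as splitting a transaction's read/write set by the HashtableServer that stores each node. Each server can then check only its own subset. This sketch makes two assumptions. Placement follows INTEGER_MODULO (node id mod number of servers), and a transaction passes only if every owning server accepts its subset. The actual commit protocol is not described here, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiPredicate;

// Hypothetical sketch: partition a read/write set by owning server.
class LocalValidationSplit {
    // Assumes INTEGER_MODULO placement: server index = node id mod numServers.
    static Map<Integer, List<Long>> byOwner(List<Long> nodeIds, int numServers) {
        Map<Integer, List<Long>> groups = new TreeMap<>();
        for (long id : nodeIds) {
            int owner = (int) Math.floorMod(id, (long) numServers);
            groups.computeIfAbsent(owner, k -> new ArrayList<>()).add(id);
        }
        return groups;
    }

    // Assumed rule: the transaction passes only if every owning server accepts its subset.
    static boolean validateAll(Map<Integer, List<Long>> groups,
                               BiPredicate<Integer, List<Long>> serverCheck) {
        return groups.entrySet().stream()
                .allMatch(e -> serverCheck.test(e.getKey(), e.getValue()));
    }
}
```

Servers that own none of a transaction's nodes receive no validation traffic for it. This is how non-overlapping read/write sets avoid a central bottleneck.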
To enable local validation, one has to toggle the following flag:
Local Workpool
The following flag, modelType, defines whether the execution model is transaction-based or barrier-based. It can be set to TRANSACTION, BARRIER, or BARRIER-JOIN; the last two are the two options for the barrier model. With the BARRIER model, the Beehive framework automatically detects the completion of each barrier phase. This can slow down program execution because of delays in detecting phase completion. With the BARRIER-JOIN model, each host in the cluster explicitly notifies the master host (the first host in the hostlist file) when it completes its current phase. BARRIER-JOIN has lower execution-time overhead, but it is applicable only when each host knows how many tasks it will execute in a given phase. For application programs where the number of tasks executed in a phase is not known a priori, the BARRIER model should be used.
# LocalWorkpool.modelType=BARRIER
# LocalWorkpool.modelType=BARRIER-JOIN
LocalWorkpool.modelType=TRANSACTION
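The explicit-notification idea behind BARRIER-JOIN can be sketched with a countdown latch. Each host reports once when it has finished its known number of tasks for the phase, and the master proceeds when all hosts have reported. In Beehive the notifications are remote calls. Here they are plain method calls, and PhaseJoin is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of one BARRIER-JOIN phase on the master host.
class PhaseJoin {
    private final CountDownLatch remaining;
    private final Set<String> reported = ConcurrentHashMap.newKeySet();

    PhaseJoin(int numHosts) {
        remaining = new CountDownLatch(numHosts);
    }

    // Called by a host when its phase work is done; duplicate reports are ignored.
    void notifyDone(String host) {
        if (reported.add(host)) {
            remaining.countDown();
        }
    }

    // Master side: wait until every host has reported, or give up after the timeout.
    boolean awaitPhase(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }
}
```

Because completion is signalled explicitly, the master does not have to infer it. The trade-off is that each host must know when its share of the phase is finished.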
The LocalWorkpool can be configured to periodically print the number of tasks committed and aborted, both cumulatively and for the most recent interval. This is helpful for observing execution progress. The following flag specifies the printing interval. Below we are setting it to 30 seconds.
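Cumulative and incremental statistics can both come from one set of running counters. Each report subtracts the values recorded at the previous report. The sketch below is hypothetical; CommitStats and its methods are illustrative names. It also includes one plausible definition of an interval abort rate, aborts / (commits + aborts), as used by progressMonitorInterval in the next paragraph. The exact formula Beehive uses is an assumption here.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of cumulative vs. per-interval commit/abort statistics.
class CommitStats {
    private final AtomicLong commits = new AtomicLong();
    private final AtomicLong aborts = new AtomicLong();
    private long lastCommits = 0; // cumulative value at the previous report
    private long lastAborts = 0;

    void onCommit() {
        commits.incrementAndGet();
    }

    void onAbort() {
        aborts.incrementAndGet();
    }

    // Called once per interval; returns
    // {cumulativeCommits, cumulativeAborts, intervalCommits, intervalAborts}.
    synchronized long[] report() {
        long c = commits.get();
        long a = aborts.get();
        long[] r = {c, a, c - lastCommits, a - lastAborts};
        lastCommits = c;
        lastAborts = a;
        return r;
    }

    // Assumed definition: fraction of finished tasks in the interval that aborted.
    static double abortRate(long intervalCommits, long intervalAborts) {
        long total = intervalCommits + intervalAborts;
        return total == 0 ? 0.0 : (double) intervalAborts / total;
    }
}
```

In practice `report()` would be driven by a timer, for example `ScheduledExecutorService.scheduleAtFixedRate` with a 30-second period.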
Another flag, progressMonitorInterval, is used to periodically compute the abort rate, which can drive adaptive task scheduling. It is set to 20 seconds in the example below:
The following flag causes the STS value to be retrieved from the Global Validation Service if it has not been updated for more than STS_DELAY seconds:
The following flag is meaningful only if global validation is enabled. When it is set to true, each cluster node's workpool attempts to validate a transaction locally before sending it to the global validator, and the transaction is sent for global validation only if local validation fails. It is advisable to omit this flag from the configuration file or set it to false, since various experiments showed that local validation decreases performance.
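The STS_DELAY rule described above amounts to a time-bounded cache. The locally known STS is reused until it is older than STS_DELAY seconds, and is then refreshed from the Global Validation Service. In this hypothetical sketch, a `LongSupplier` stands in for the remote call and an injectable clock makes the behaviour testable. Advancing the cached STS through `update()` (for example, from a validation reply) is also an assumption.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of refreshing a cached STS after STS_DELAY seconds.
class StsCache {
    private final LongSupplier fetchFromValidator; // stands in for the remote STS call
    private final LongSupplier clockMillis;        // current time in milliseconds
    private final long delayMillis;
    private long sts;
    private long lastUpdateMillis;
    private boolean initialized;

    StsCache(LongSupplier fetchFromValidator, LongSupplier clockMillis, long stsDelaySeconds) {
        this.fetchFromValidator = fetchFromValidator;
        this.clockMillis = clockMillis;
        this.delayMillis = stsDelaySeconds * 1000L;
    }

    // Record a newer STS (never moves backwards) and reset the staleness timer.
    synchronized void update(long newSts) {
        sts = Math.max(sts, newSts);
        lastUpdateMillis = clockMillis.getAsLong();
        initialized = true;
    }

    // Return the cached STS, refreshing it first if it is older than STS_DELAY.
    synchronized long get() {
        long now = clockMillis.getAsLong();
        if (!initialized || now - lastUpdateMillis > delayMillis) {
            update(fetchFromValidator.getAsLong());
        }
        return sts;
    }
}
```

With STS_DELAY = 2, a value fetched at t = 0 ms is still reused at t = 2000 ms. It is refetched at t = 2001 ms.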
There are several other flags that are currently not used. There is no need to set them; we list them below for convenience.
LocalWorkpool.GAP_TIME=50
LocalWorkpool.SLEEP_TIME=50
LocalWorkpool.pullTask=true
LocalWorkpool.logging=false
Task Distribution
When a task creates new tasks, the worker thread deposits them in the LocalWorkpool by calling the addTask method. These tasks are then distributed to different cluster nodes based on the task distribution policies and the affinity level specified in each task. There are two versions of the LoadDistributor module: V1 and V2. V1, the original version, had many unnecessary complexities and distributed tasks synchronously, one at a time. V2 is a simplified and more efficient version in which tasks are batched and distributed asynchronously by a daemon thread.
# Version can be V1 or V2.
# V2 is simplified form of distributor using batching and buddy distribution
LocalWorkpool.LoadDistribution.Version=V2
# Distribution Policies for V1
# 1) KWAYSPLIT-(kwaySplitType - a) RANDOM, b) LDAWARE),
# 2) ROUNDROBIN, 3) LCLDAWARE ( Locality Aware , Load Aware)
LocalWorkpool.distributionPolicy=KWAYSPLIT
LocalWorkpool.kwaySplitType=LDAWARE
LocalWorkpool.ksplit=3
# Distribution Policies for V2
# 1) RANDOM, 2) ROUND_ROBIN, 3) MIN_QUEUE, 4) MAX_QUEUE
# Current distribution scheme selects only one other buddy node to split new tasks
# Strong-affinity tasks are always distributed according to the target node
# The LocalTaskDepositRatio=0.8 flag will put 80% of the new tasks in the local workpool
LocalWorkpool.LoadDistribution.LocalTaskDepositRatio=0.8
LocalWorkpool.LoadDistribution.Policy=RANDOM
LocalWorkpool.LoadDistribution.QueueDispatchSize=20
LocalWorkpool.LoadDistribution.QueueDispatchTimeout=1000
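Two V2 ideas from these flags can be sketched together. First, LocalTaskDepositRatio decides whether a new task stays local or goes to the buddy node. Second, outgoing tasks are batched and dispatched when QueueDispatchSize tasks have accumulated or QueueDispatchTimeout milliseconds have passed. This is a hypothetical illustration, not the LoadDistributor code. It assumes the timeout is measured from the first task in the current batch, and it ignores buddy selection and strong-affinity tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of ratio-based local deposit plus size/timeout batching.
class BuddyDistributor<T> {
    private final double localRatio;          // e.g. LocalTaskDepositRatio=0.8
    private final int dispatchSize;           // e.g. QueueDispatchSize=20
    private final long dispatchTimeoutMillis; // e.g. QueueDispatchTimeout=1000
    private final Random random;
    private final List<T> local = new ArrayList<>();
    private final List<T> pending = new ArrayList<>();          // batch bound for the buddy node
    private final List<List<T>> dispatched = new ArrayList<>(); // stands in for remote sends
    private long batchStartMillis = -1;

    BuddyDistributor(double localRatio, int dispatchSize, long dispatchTimeoutMillis, Random random) {
        this.localRatio = localRatio;
        this.dispatchSize = dispatchSize;
        this.dispatchTimeoutMillis = dispatchTimeoutMillis;
        this.random = random;
    }

    // Keep a task locally with probability localRatio; otherwise add it to the outgoing batch.
    synchronized void addTask(T task, long nowMillis) {
        if (random.nextDouble() < localRatio) {
            local.add(task);
            return;
        }
        if (pending.isEmpty()) {
            batchStartMillis = nowMillis;
        }
        pending.add(task);
        if (pending.size() >= dispatchSize) {
            flush();
        }
    }

    // Called periodically by the daemon thread: send a partial batch once it is old enough.
    synchronized void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - batchStartMillis >= dispatchTimeoutMillis) {
            flush();
        }
    }

    private void flush() {
        dispatched.add(new ArrayList<>(pending));
        pending.clear();
    }

    synchronized int localCount() { return local.size(); }
    synchronized int pendingCount() { return pending.size(); }
    synchronized List<List<T>> batches() { return new ArrayList<>(dispatched); }
}
```

Batching amortizes the cost of one remote call over many tasks. The timeout bounds how long a task can wait in a partially filled batch.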
The following parameter specifies the number of worker threads on each cluster host. This is an important parameter.
If there are no tasks available, then a worker thread sleeps for the number of milliseconds specified by the flag below before attempting to fetch a task again:
StorageSystem and HashtableServer
The StorageSystem object provides the abstraction of a global data storage service, which is implemented by the HashtableServers executing on different hosts. The system can be configured to use either Java RMI (the default) or Thrift for remote calls:
# THRIFT configs
# System.RPC_MECHANISM options are RMI and THRIFT
# Setting the THRIFT option will cause all NodeData operations to go through Thrift interface
System.RPC_MECHANISM=THRIFT
System.HashtableServer_THRIFT_NUM_HANDLES=24
System.HashtableServer_THRIFT_PORT=9002
Note that when RPC_MECHANISM is set to THRIFT, all NodeData-related operations are routed through Thrift calls, whereas all Node-related operations are still performed using RMI. When the mechanism is set to RMI, all operations, both Node and NodeData, are performed using RMI.
The following flag specifies whether hostnames are DNS names or addresses in the private IP address space. For example, this flag must be set to false when running on the MSI cluster, and to true on clusters with DNS hostnames:
# canonicalHostNames should be true on our research clusters with DNS names
# canonicalHostNames should be false on MSI Itasca
Server.canonicalHostNames=true
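How Beehive uses this flag internally is not described here. One plausible reading is that it selects the identifier a host advertises to its peers: a DNS name or a raw IP address. The hypothetical helper below uses two real `java.net.InetAddress` methods to show the difference. `getCanonicalHostName()` returns the fully qualified domain name on a best-effort basis, and `getHostAddress()` returns the textual IP address.

```java
import java.net.InetAddress;

// Hypothetical sketch: choose the host identifier based on a canonicalHostNames-style flag.
class HostId {
    static String advertisedName(InetAddress addr, boolean canonicalHostNames) {
        // true: DNS name via reverse lookup (best effort; may fall back to the IP text)
        // false: raw IP address, for clusters whose hosts lack usable DNS names
        return canonicalHostNames ? addr.getCanonicalHostName() : addr.getHostAddress();
    }
}
```

On a cluster with private addresses and no reverse DNS, resolving names can be slow or yield unusable names. Advertising the IP address avoids that.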
Node objects can be placed on the HashtableServers of the cluster nodes according to one of several schemes, selected with the following flag:
#Hashing scheme : INTEGER_MODULO, GRAPH_PARTITION, STRING_MODULO
Server.NODE_HASHING_SCHEME=INTEGER_MODULO
# INTEGER_MODULO is used when the Node keys (Ids) are integers, and the distribution
# is in round-robin manner.
#
# GRAPH_PARTITION is used for storing integer-valued keys in a range-partitioned manner.
#
# STRING_MODULO scheme hashes string key (id) for Nodes to an integer, and then stores
# them in round-robin (modulo) fashion.
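The three schemes can be illustrated as functions that map a node key to a server index. This sketch is hypothetical. It assumes equal-sized contiguous ranges for GRAPH_PARTITION, whereas a real graph partition may be uneven. It uses `String.hashCode()` only as an example hash for STRING_MODULO.

```java
// Hypothetical sketch of the three NODE_HASHING_SCHEME placements.
class NodePlacement {
    // INTEGER_MODULO: integer keys spread round-robin across servers.
    static int integerModulo(long key, int numServers) {
        return (int) Math.floorMod(key, (long) numServers);
    }

    // GRAPH_PARTITION: keys in [0, numKeys) split into contiguous ranges
    // (equal-sized ranges are an assumption of this sketch).
    static int rangePartition(long key, long numKeys, int numServers) {
        long rangeSize = (numKeys + numServers - 1) / numServers; // ceiling division
        return (int) Math.min(key / rangeSize, numServers - 1);
    }

    // STRING_MODULO: hash the string key to an integer, then take it modulo the server count.
    static int stringModulo(String key, int numServers) {
        return Math.floorMod(key.hashCode(), numServers);
    }
}
```

With 10 keys and 3 servers, round-robin placement puts key 7 on server 1. Range partitioning puts keys 0 to 3 on server 0, 4 to 7 on server 1, and 8 to 9 on server 2, so neighbouring keys stay together.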
The following flag lets an application program select the code path that uses NodeData instead of Node-based remote calls. Although it is currently designated as a StorageSystem flag, it is used exclusively by the application program and Worker code:
# useNodeData flag is set to true to make NodeData based remote data access
StorageSystem.useNodeData=true
Note that this flag is used only at the application level. It helps the application developer decide which type of worker to use: one that uses only Node operations, or one that uses both Node and NodeData operations. GCWorker and GCWorkerReflection provide an example of this distinction.
The following flag enables optimized execution paths in the HashtableServer code for NodeData, based on the type of access requested. We found its impact to be mostly marginal, so it can usually be ignored:
It is important to note that if this flag is set to false, then it is the application developer's responsibility to override the toNodeData method of every subclass of Node used in the application program.
The following flags are deprecated.
The following flags were introduced for Node caching in the StorageSystem and the HashtableServer. We currently rely mostly on fine-grained remote operations using NodeData, so caching is not used. Node caching and NodeData-based access cannot currently be used together:
# Server Caching flags
# The useCaching flag is used to turn on/off Node caching
# Currently we are turning it off and relying on NodeData
Server.useCaching=false
# More details needed for the following flags
Server.CacheInvalidationScheme=2
Server.CacheTableSize=1000
Server.CacheCleanupTarget=0.8
Server.CacheWindow=5000
StorageSystem.WriteThroughCache=false
In the StorageSystem object, we experimented with different options for dispatching vector calls that get or put multiple nodes: parallel calls, serial calls (one at a time per server), and Java's ExecutorService. The serial option performed best:
# Storage System Configs
# Remote Calls Type : 0 (Parallel), 1 (Serial), 2 (Executor Service)
StorageSystem.remoteCalls=1
# The following flag controls the maximum number of attempts when a remote call fails
StorageSystem.MAX_ATTEMPTS=3
# The following flags were introduced for bucketization and batching of NodeData related calls to amortize the
# remote access costs. This led to significant performance gains:
# Call Bucketization and Caching Parameters for NodeData
# More details need to be added here
# The following timeout period (msec) is to dispatch a batch
StorageSystem.BatchDispatchTimeout=5
StorageSystem.BatchCountThreshold=3
# NodeDataRequestDispatchPoolSize indicates the size of the thread-pool
StorageSystem.NodeDataRequestDispatchPoolSize=8
StorageSystem.NodeDataCacheSize=10000
StorageSystem.NodeDataCachePurgeThreshold=0.9
StorageSystem.NodeDataCachePurgePeriod=500000
StorageSystem.NodeDataCacheCleanupFraction=0.1
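StorageSystem.MAX_ATTEMPTS bounds how many times a failing remote call is tried. A minimal, hypothetical retry helper in that spirit is sketched below. It assumes MAX_ATTEMPTS counts total attempts, including the first, and that any exception counts as a failure. The real StorageSystem may distinguish error types or wait between attempts.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: retry a remote call up to maxAttempts times in total.
class Retry {
    static <T> T withMaxAttempts(Callable<T> remoteCall, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return remoteCall.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed; surface the most recent error
    }
}
```

With MAX_ATTEMPTS=3, a call that fails twice and then succeeds returns normally. A call that fails three times propagates its last exception to the caller.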