
Configuration Parameters

Validation

Global Validation

This section describes the Global Validation Service executing in a single process (i.e., as a non-replicated service). By default it is based on Java RMI, but it can be configured to use Thrift, as described later in this section. The Global Validation Service can run as either a single-threaded or a multi-threaded server. Setting the concurrentValidator flag to true makes the server multi-threaded, which provides more concurrency in processing validation requests. If this flag is not specified, the server defaults to single-threaded.

GlobalWorkpool.concurrentValidator=true

If this flag is set to true, one should also specify the number of locks. The following flag sets the number of locks to 256:

ConcurrentValidator.nLocks=256
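The page does not say how the concurrent validator uses its locks. One common way to use a fixed pool of locks is lock striping, where each item key is hashed to one of the nLocks locks. The following is a minimal sketch under that assumption; the class StripedLocks and its methods are hypothetical and are not part of Beehive.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of lock striping; not the Beehive implementation.
final class StripedLocks {
    private final ReentrantLock[] locks;

    StripedLocks(int nLocks) {
        if (nLocks <= 0) {
            throw new IllegalArgumentException("nLocks must be positive");
        }
        locks = new ReentrantLock[nLocks];
        for (int i = 0; i < nLocks; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    // Maps a key to a stripe index; floorMod keeps the index non-negative.
    int stripeOf(Object key) {
        return Math.floorMod(key.hashCode(), locks.length);
    }

    ReentrantLock lockFor(Object key) {
        return locks[stripeOf(key)];
    }

    public static void main(String[] args) {
        // 256 mirrors ConcurrentValidator.nLocks=256 from the configuration.
        StripedLocks striped = new StripedLocks(256);
        ReentrantLock lock = striped.lockFor("node-42");
        lock.lock();
        try {
            System.out.println("holding stripe " + striped.stripeOf("node-42"));
        } finally {
            lock.unlock();
        }
    }
}
```

Under this assumption, a larger nLocks value reduces the chance that two unrelated keys contend for the same lock, at the cost of more lock objects.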

The following flag applies only to the single-threaded server. It enables speculative checking of queued validation requests that are destined to be aborted because of the commitment of the current request:

GlobalWorkpool.speculativeChecking=true

Enabling Truncation

For large graph problems, the internal table maintained by the GlobalWorkpool can grow as large as the number of nodes in the graph and exhaust available memory. To avoid this, truncation should be enabled as follows:

GlobalWorkpool.truncationEnabled=true

With this flag set, items in the table with update time less than (STS-1000) are deleted periodically. The window size 1000 is fixed in this configuration. In the replicated validation service, it can be set to a different value.
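The truncation rule can be illustrated with a minimal sketch. It assumes the table maps an item key to its last update time and that STS is available as a long; the class TruncationSketch and the table layout are hypothetical, and only the (STS-1000) cutoff comes from the description above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of window-based truncation; not the Beehive code.
final class TruncationSketch {
    // Fixed window size described for the single-process service.
    static final long WINDOW = 1000;

    // Removes entries whose update time is less than (sts - WINDOW).
    // Returns the number of entries removed.
    static int truncate(Map<Integer, Long> updateTimes, long sts) {
        long cutoff = sts - WINDOW;
        int removed = 0;
        Iterator<Map.Entry<Integer, Long>> it = updateTimes.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoff) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = new ConcurrentHashMap<>();
        table.put(1, 100L);   // older than the cutoff for sts = 5000
        table.put(2, 4500L);  // inside the window
        int removed = truncate(table, 5000L);
        System.out.println("removed=" + removed + ", remaining=" + table.keySet());
    }
}
```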

Thrift Based Communication

It is possible to use Thrift RPC instead of Java RMI for making validation calls. The following flags should be set to run the Global Validation Service (GlobalWorkpoolImpl) and the clients in the LocalWorkpoolImpl on all cluster nodes using Thrift for the validation calls:

# When the flag below is "false", we should start the
# GlobalValidator, which is a single Java process on one host
# (the host named by GlobalWorkpoolImpl_THRIFT_HOST below)
Validator.useReplicated=false

# Validator.RPC_MECHANISM=RMI will use Java RMI for both client-side and
# peer interface
Validator.RPC_MECHANISM=THRIFT

# Single Validator Flags
Validator.GlobalWorkpoolImpl_THRIFT_HOST=cs-kepler.cs.umn.edu
Validator.GlobalWorkpoolImpl_THRIFT_PORT=9001

Replicated Validation

For scalability experiments, we developed a replicated/partitioned validation service. It can be configured to use either Java RMI or Thrift. The following parameters configure the Replicated Validation Service:

Validator.useReplicated=true

# Replicated Validator Flags
# Replicated Server Selection Policy : RANDOM, KEY_BASED
Validator.serverIdentifyPolicy=RANDOM
Validator.truncation.allowed=true
Validator.truncation.scanLimit=1000
Validator.truncation.windowSize=2000
Validator.writeStatsToFile=false
Validator.nodeSize=100

# By default, Java RMI is used for both the client-side and peer interfaces
# Below is the Thrift-based configuration
#Validator.RPC_MECHANISM=RMI
Validator.RPC_MECHANISM=THRIFT

# clientPort and serverPort are the starting port numbers, increasing
# with the validation service replica index number
Validator.thrift.clientPort=5000
Validator.thrift.serverPort=6000
Validator.clientThriftConnectionPool=20
Validator.serverThriftConnectionPool=20
Validator.clusteredGraph=src/TestProgs/GraphColoring/fgraph-20-10-10.pp_bkt_dir

# Validator testing flags
Validator.debug=false
Validator.MSI=175
Validator.ServerReplica=2
Validator.rwSetLengthCount=8
Validator.acrossCluster=0
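The comment on clientPort and serverPort says the configured values are starting port numbers that increase with the replica index. A minimal sketch of that arithmetic follows, assuming a step of one port per replica index (the step is an assumption; the page does not state it). The class ReplicaPorts is hypothetical.

```java
// Hypothetical illustration of per-replica port assignment; the step of 1 is assumed.
final class ReplicaPorts {
    // Returns the port for a replica, given the configured starting port.
    static int portFor(int basePort, int replicaIndex) {
        if (replicaIndex < 0) {
            throw new IllegalArgumentException("replicaIndex must be non-negative");
        }
        return basePort + replicaIndex;
    }

    public static void main(String[] args) {
        int clientBase = 5000; // Validator.thrift.clientPort
        int serverBase = 6000; // Validator.thrift.serverPort
        int replicas = 2;      // illustrative replica count, not read from any flag
        for (int i = 0; i < replicas; i++) {
            System.out.println("replica " + i
                    + ": client port " + portFor(clientBase, i)
                    + ", server port " + portFor(serverBase, i));
        }
    }
}
```

When several replicas run on the same host, the two port ranges should not overlap; with the values above they stay 1000 ports apart.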

Local Validation

Depending on the application context, having a single remote validator can decrease performance. For example, if there are many transactions with small, non-overlapping read-write sets, validation traffic can overwhelm the centralized validator. To avoid such a bottleneck, the local validation scheme decentralizes validation so that each HashtableServer is responsible for validating the nodes it stores.

To enable local validation, set the following flag to false:

Validator.globalValidationEnable=false


Local Workpool

The following flag, named modelType, defines whether the execution model is transaction-based or barrier-based. It can be set to TRANSACTION, BARRIER, or BARRIER-JOIN. The barrier model has two options. With the BARRIER model, the completion of each barrier phase is automatically detected by the Beehive framework. This can slow down program execution because of delays in the automatic detection of phase completion. The BARRIER-JOIN model instead uses explicit notification from each host in the cluster to the master host (the first host in the hostlist file) when it completes its current phase. The BARRIER-JOIN model has lower execution time overhead, but it is applicable only when each host has a known number of tasks to execute in a given phase. Application programs in which the number of tasks executed in a phase is not known a priori should use the BARRIER model.

# LocalWorkpool.modelType=BARRIER
# LocalWorkpool.modelType=BARRIER-JOIN
LocalWorkpool.modelType=TRANSACTION

The LocalWorkpool can be configured to periodically print the number of tasks committed and aborted, which helps in observing execution progress. It reports cumulative statistics as well as incremental statistics over the specified interval. The following flag specifies the printing interval in milliseconds; below it is set to 30 seconds:

LocalWorkpool.printProgressInterval=30000

Another flag, progressMonitorInterval, sets the interval (in milliseconds) at which the abort rate is computed; the abort rate can be used for adaptive scheduling of tasks. It is set to 20 seconds in the example below:

LocalWorkpool.progressMonitorInterval=20000
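The page does not define how the abort rate is computed. A minimal sketch follows, assuming the rate is the fraction of aborted tasks among all tasks finished since the previous sample, derived from cumulative counters. The class AbortRateMonitor is hypothetical and is not part of Beehive.

```java
// Hypothetical illustration of an incremental abort-rate computation.
final class AbortRateMonitor {
    private long lastCommitted;
    private long lastAborted;

    // Takes cumulative counters and returns aborted / (committed + aborted)
    // for the tasks finished since the previous call; 0.0 if none finished.
    double sample(long committedTotal, long abortedTotal) {
        long committed = committedTotal - lastCommitted;
        long aborted = abortedTotal - lastAborted;
        lastCommitted = committedTotal;
        lastAborted = abortedTotal;
        long finished = committed + aborted;
        return finished == 0 ? 0.0 : (double) aborted / finished;
    }

    public static void main(String[] args) {
        AbortRateMonitor monitor = new AbortRateMonitor();
        // Each call stands for one progressMonitorInterval tick.
        System.out.println("interval 1 abort rate: " + monitor.sample(80, 20));
        System.out.println("interval 2 abort rate: " + monitor.sample(170, 30));
    }
}
```

An adaptive scheduler could compare the sampled rate against a threshold of its choosing and, for example, reduce the number of concurrently executing tasks when the rate is high.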

The following flag is used to retrieve the STS value from the Global Validation Service if it has not been updated for more than the time value specified by STS_DELAY in seconds:

LocalWorkpool.STS_DELAY=10

The following flag is meaningful only if global validation is enabled. When it is set to true, each cluster node's workpool attempts to validate a transaction locally before sending it to the global validator. The transaction is sent for global validation only if the local validation fails. It is advisable to omit this flag from the configuration file or set it to false, since various experiments showed that local validation decreases performance.

LocalWorkpool.useLocalValidator=false

There are several other flags that are currently not used. There is no need to set them; we list them below for convenience.

LocalWorkpool.GAP_TIME=50
LocalWorkpool.SLEEP_TIME=50
LocalWorkpool.pullTask=true
LocalWorkpool.logging=false


Task Distribution

When new tasks are created by a task, the worker thread deposits them in the LocalWorkpool by calling the addTask method. These tasks are then distributed to different cluster nodes based on the task distribution policies and the affinity level specified in each task. There are two versions of the LoadDistributor module: V1 and V2. V1 is the original version; it had many unnecessary complexities and distributed tasks synchronously, one at a time. V2 is a simplified and more efficient version in which tasks are batched and distributed asynchronously by a daemon thread.

# Version can be V1 or V2.
# V2 is a simplified form of the distributor using batching and buddy distribution
LocalWorkpool.LoadDistribution.Version=V2

# Distribution Policies for V1
# 1) KWAYSPLIT (kwaySplitType: a) RANDOM, b) LDAWARE),
# 2) ROUNDROBIN, 3) LCLDAWARE (Locality Aware, Load Aware)
LocalWorkpool.distributionPolicy=KWAYSPLIT
LocalWorkpool.kwaySplitType=LDAWARE
LocalWorkpool.ksplit=3

# Distribution Policies for V2
# 1) RANDOM, 2) ROUND_ROBIN, 3) MIN_QUEUE, 4) MAX_QUEUE
# The current distribution scheme selects only one other buddy node with which to split new tasks
# Strong-affinity tasks are always distributed according to the target node
# The LocalTaskDepositRatio=0.8 flag will put 80% of the new tasks in the local workpool
LocalWorkpool.LoadDistribution.LocalTaskDepositRatio=0.8
LocalWorkpool.LoadDistribution.Policy=RANDOM
LocalWorkpool.LoadDistribution.QueueDispatchSize=20
LocalWorkpool.LoadDistribution.QueueDispatchTimeout=1000
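The effect of LocalTaskDepositRatio on a batch of new tasks can be sketched as follows. This assumes a deterministic split in which the rounded share of the batch stays local and the remainder goes to the single buddy node; the actual distributor may apply the ratio differently (for example, probabilistically per task). The class DepositSplit is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of splitting a batch between the local workpool and one buddy.
final class DepositSplit {
    // Number of tasks from the batch that stay in the local workpool.
    static int localCount(int batchSize, double localRatio) {
        if (localRatio < 0.0 || localRatio > 1.0) {
            throw new IllegalArgumentException("localRatio must be in [0, 1]");
        }
        return (int) Math.round(batchSize * localRatio);
    }

    // Returns two lists: index 0 holds the local tasks, index 1 the buddy's tasks.
    static <T> List<List<T>> split(List<T> batch, double localRatio) {
        int k = localCount(batch.size(), localRatio);
        List<List<T>> parts = new ArrayList<>();
        parts.add(new ArrayList<>(batch.subList(0, k)));
        parts.add(new ArrayList<>(batch.subList(k, batch.size())));
        return parts;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            batch.add("task-" + i);
        }
        List<List<String>> parts = split(batch, 0.8); // LocalTaskDepositRatio=0.8
        System.out.println("local: " + parts.get(0).size() + ", buddy: " + parts.get(1).size());
    }
}
```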


Workers

The following parameter specifies the number of worker threads on each cluster host. This is an important parameter.

Worker.Worker_COUNT=4

If there are no tasks available, then a worker thread sleeps for the number of milliseconds specified by the flag below before attempting to fetch a task again:

Worker.THREADSLEEPTIME=10

StorageSystem and HashtableServer

The StorageSystem object provides an abstraction of a global data storage service, which is implemented by the HashtableServers executing on different hosts. It is possible to configure the system to use either Java RMI (default) or Thrift for remote calls:

# THRIFT configs
# System.RPC_MECHANISM options are RMI and THRIFT
# Setting the THRIFT option will cause all NodeData operations to go through Thrift interface
System.RPC_MECHANISM=THRIFT
System.HashtableServer_THRIFT_NUM_HANDLES=24
System.HashtableServer_THRIFT_PORT=9002

Note

When RPC_MECHANISM is set to THRIFT, all NodeData-related operations are routed through Thrift calls, whereas all Node-related operations are still performed using RMI. If the mechanism is set to RMI, all operations, on both Node and NodeData, are performed using RMI.

The following flag indicates whether the hostnames are DNS names or addresses in the private IP address space. For example, when running on the MSI cluster, this flag must be set to false; on clusters with DNS hostnames, it is set to true:

# canonicalHostNames should be true on our research clusters with DNS names
# canonicalHostNames should be false on MSI Itasca
Server.canonicalHostNames=true

Node objects can be stored on the HashtableServers of the cluster nodes according to one of several schemes, selected using the following flag:

# Hashing scheme: INTEGER_MODULO, GRAPH_PARTITION, STRING_MODULO
Server.NODE_HASHING_SCHEME=INTEGER_MODULO

# INTEGER_MODULO is used when the Node keys (ids) are integers, and the distribution
# is in a round-robin manner.
#
# GRAPH_PARTITION is used for storing integer-valued keys in a range-partitioned manner.
#
# STRING_MODULO hashes the string key (id) of a Node to an integer, and then stores
# the Nodes in a round-robin (modulo) fashion.
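The three schemes can be illustrated with a minimal sketch that maps a Node key to a server index. The details are assumptions: the range partition here splits keys 0..totalKeys-1 into equal contiguous blocks, and the string scheme uses String.hashCode; the real HashtableServer code may partition and hash differently. The class NodeHashing is hypothetical.

```java
// Hypothetical illustration of key-to-server placement; not the Beehive implementation.
final class NodeHashing {
    // INTEGER_MODULO: integer keys spread round-robin across servers.
    static int integerModulo(int key, int numServers) {
        return Math.floorMod(key, numServers);
    }

    // GRAPH_PARTITION (assumed form): keys in [0, totalKeys) split into
    // contiguous, equally sized ranges, one range per server.
    static int rangePartition(int key, int totalKeys, int numServers) {
        if (key < 0 || key >= totalKeys) {
            throw new IllegalArgumentException("key out of range");
        }
        int blockSize = (totalKeys + numServers - 1) / numServers; // ceiling division
        return key / blockSize;
    }

    // STRING_MODULO: hash the string key to an integer, then take the modulo.
    static int stringModulo(String key, int numServers) {
        return Math.floorMod(key.hashCode(), numServers);
    }

    public static void main(String[] args) {
        int servers = 4;
        System.out.println("INTEGER_MODULO 10 -> server " + integerModulo(10, servers));
        System.out.println("GRAPH_PARTITION 10 of 100 -> server " + rangePartition(10, 100, servers));
        System.out.println("STRING_MODULO \"v10\" -> server " + stringModulo("v10", servers));
    }
}
```

The practical difference is locality: modulo placement scatters neighboring keys across servers, while range placement keeps consecutive keys on the same server.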

When an application program needs to select the code path that uses NodeData-based remote calls instead of Node-based remote calls, the following flag is used. Although it is currently designated as a StorageSystem flag, it is used exclusively in the application program and Worker code:

# useNodeData flag is set to true to make NodeData based remote data access
StorageSystem.useNodeData=true

It is important to note that this flag is used only at the application level. It helps the application developer choose which type of worker to use, i.e., one that uses only Node operations, or one that uses both Node and NodeData operations. An example is provided by GCWorker vs. GCWorkerReflection.

The following flag enables an optimized execution path in the HashtableServer code for NodeData, based on the type of access that is requested. We found that the impact is mostly marginal, so this flag can generally be ignored:

Server.useNodeDataReflection=true

It is important to note that if this flag is set to false, then it is the application developer's responsibility to override the toNodeData method of every subclass of Node used in the application program.

The following flags are deprecated

The following flags were introduced for Node caching mechanisms in the StorageSystem and the HashtableServer. We currently rely mostly on fine-grained remote operations using NodeData, so caching is not being used. At present, Node caching and NodeData-based operations cannot be used simultaneously:

# Server Caching flags
# The useCaching flag is used to turn on/off Node caching
# Currently we are turning it off and relying on NodeData
Server.useCaching=false

# More details needed for the following flags
Server.CacheInvalidationScheme=2
Server.CacheTableSize=1000
Server.CacheCleanupTarget=0.8
Server.CacheWindow=5000
StorageSystem.WriteThroughCache=false

In the StorageSystem object, we experimented with different options for dispatching vector calls for getting/putting multiple nodes. The choices were parallel calls, serial calls (one at a time per server), and Java's Executor service. The serial option performed best:

# Storage System Configs
# Remote Calls Type : 0 (Parallel), 1 (Serial), 2 (Executor Service)
StorageSystem.remoteCalls=1

# The following flag controls the maximum number of attempts when a remote call fails
StorageSystem.MAX_ATTEMPTS=3
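A bounded retry governed by MAX_ATTEMPTS can be sketched as below. This is a minimal illustration that retries immediately on any exception; whether the StorageSystem waits between attempts or retries only on specific failures is not stated on this page. The class RetrySketch and the method callWithRetry are hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of a bounded retry loop; not the Beehive implementation.
final class RetrySketch {
    // Invokes the call up to maxAttempts times and returns the first successful
    // result; rethrows the last failure if every attempt fails.
    static <T> T callWithRetry(Callable<T> remoteCall, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return remoteCall.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated remote call that fails twice before succeeding.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("simulated remote failure");
            }
            return "ok";
        }, 3); // StorageSystem.MAX_ATTEMPTS=3
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```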

# The following flags were introduced for bucketization and batching of NodeData related calls to amortize the
# remote access costs. This led to significant performance gains:

# Call Bucketization and Caching Parameters for NodeData
# More details need to be added here
# The following timeout period (msec) is to dispatch a batch
StorageSystem.BatchDispatchTimeout=5
StorageSystem.BatchCountThreshold=3

# NodeDataRequestDispatchPoolSize indicates the size of the thread-pool
StorageSystem.NodeDataRequestDispatchPoolSize=8
StorageSystem.NodeDataCacheSize=10000
StorageSystem.NodeDataCachePurgeThreshold=0.9
StorageSystem.NodeDataCachePurgePeriod=500000
StorageSystem.NodeDataCacheCleanupFraction=0.1
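The interaction of BatchDispatchTimeout and BatchCountThreshold can be sketched as follows, assuming a batch is dispatched as soon as it holds BatchCountThreshold requests or once BatchDispatchTimeout milliseconds have passed since its first request, whichever comes first. The count-based trigger is an assumption (the page documents only the timeout), and the class BatchBuffer is hypothetical. Time is passed in explicitly so that the logic stays deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of count- or timeout-triggered batching.
final class BatchBuffer<T> {
    private final int countThreshold;
    private final long timeoutMillis;
    private final List<T> pending = new ArrayList<>();
    private long firstEnqueuedAt;

    BatchBuffer(int countThreshold, long timeoutMillis) {
        this.countThreshold = countThreshold;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a request; returns the batch to dispatch if the count threshold
    // is reached, otherwise an empty list.
    List<T> add(T request, long nowMillis) {
        if (pending.isEmpty()) {
            firstEnqueuedAt = nowMillis;
        }
        pending.add(request);
        if (pending.size() >= countThreshold) {
            return drain();
        }
        return Collections.emptyList();
    }

    // Called periodically; returns the pending batch if it has waited
    // at least timeoutMillis, otherwise an empty list.
    List<T> poll(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstEnqueuedAt >= timeoutMillis) {
            return drain();
        }
        return Collections.emptyList();
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        // Mirrors BatchCountThreshold=3 and BatchDispatchTimeout=5.
        BatchBuffer<String> buffer = new BatchBuffer<>(3, 5);
        buffer.add("get(1)", 0);
        buffer.add("get(2)", 1);
        System.out.println("dispatched on count: " + buffer.add("get(3)", 2));
        buffer.add("get(4)", 10);
        System.out.println("dispatched on timeout: " + buffer.poll(15));
    }
}
```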