7. AppLoader and File Loader Utilities



7.1 File Loaders

The file loaders are programs that read the input file and load the graph into the memory of each host. A different version of the file loader exists for each graph problem, since the structure of the Node and Edge classes differs from problem to problem. For example, a file loader for the Graph Coloring problem is written in terms of that problem's specific semantics, i.e., GCNode, GCEdge, etc.

In all file loaders, the id of the host on which a node is loaded into memory is determined by a modulo function: hostId = nodeId % totalHosts
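
As a minimal illustration, this placement rule amounts to the following (the helper name destinationHost is hypothetical):

        // Hypothetical helper: maps a node id to the host that stores the node.
        static int destinationHost(long nodeId, int totalHosts) {
            return (int) (nodeId % totalHosts);
        }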

Here are a few file loaders:

  1. InputFileReaderV2

    This is one of the early file loaders and is used for the max flow problem. This file loader does not take the preprocessed (.pp) file as input; instead, it takes the fgraph file. The file loader creates new node objects, builds a list of Edge objects covering all the neighbors of each node, and stores the list in each node. The nodes are not batched before being flushed to the storage system. It is a single-threaded program (run by the master).
  2. InputFileReaderBatch

    This is a file loader for the Graph Coloring problem. This file loader does not take the preprocessed (.pp) file as input; instead, it takes the fgraph file. The file loader creates new node objects, builds a list of Edge objects covering all the neighbors of each node, and stores the list in each node. The nodes are batched, and when the batch size reaches 1000, the batch is sent to the storage server (see the batching sketch after this list).
  3. InputFileReader

    This is a file loader for the Page Rank problem. This file loader does not take the preprocessed (.pp) file as input; instead, it takes the fgraph file. The file loader creates new node objects, builds a list of Edge objects covering all the neighbors of each node, and stores the list in each node. The nodes are batched, and when the batch size reaches 100, the batch is sent to the storage server.
  4. InputFileReaderParallel

    This is a file loader for the Graph Coloring problem. The file loader takes a preprocessed (.pp) file as input. One thread on the master reads the input file, and ten additional threads (also run by the master) each create the node and edge objects, batch the node objects (batch size 100), and flush the batches to the storage system.
  5. InputFileReaderDistributed

    This is a file loader for the Graph Coloring problem. This file loader takes a preprocessed (.pp) file as input. Here each host reads the input file, and the host's storage server loads a node only if that host is the node's destination; otherwise the node is ignored. This prevents the bottleneck of one master node loading the data into remote servers. However, since all of the hosts read the entire file and load only a fraction of it into their local storage servers, there is a lot of unnecessary traffic, and hence this file loader does not scale well to larger graphs.
  6. InputFileReaderBucketed

    This is a file loader for the Graph Coloring problem. This file loader takes a preprocessed (.pp) file as input. Here the idea is that the input file is bucketized into smaller files, and each host then processes one of the many bucket files to load its respective nodes into its local storage system. The bucketized files are created from the .pp files using the FileBucketizer program (/project/cluster15/GraphGen/FileBucketizer).
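
The batch-and-flush pattern shared by InputFileReaderBatch and InputFileReader can be sketched as follows. This is only an illustration: StorageClient, GCNode, and parseNode below are hypothetical stand-ins for the actual Beehive types and parsing logic; only the batching pattern itself is taken from the loaders described above.

        import java.io.BufferedReader;
        import java.io.FileReader;
        import java.util.ArrayList;
        import java.util.List;

        public class BatchingLoaderSketch {
            // Hypothetical stand-ins for the Beehive storage client and node type.
            interface StorageClient { void store(List<GCNode> batch); }
            static class GCNode { /* node id, edge list, problem-specific fields */ }

            static final int BATCH_SIZE = 1000;   // InputFileReader uses 100

            static void load(String fgraphFile, StorageClient storage) throws Exception {
                List<GCNode> batch = new ArrayList<>();
                try (BufferedReader in = new BufferedReader(new FileReader(fgraphFile))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        batch.add(parseNode(line));       // build the node and its edge list
                        if (batch.size() == BATCH_SIZE) {
                            storage.store(batch);         // flush one full batch
                            batch = new ArrayList<>();
                        }
                    }
                }
                if (!batch.isEmpty()) {
                    storage.store(batch);                 // flush the final partial batch
                }
            }

            static GCNode parseNode(String line) {
                return new GCNode();                      // parsing details omitted
            }
        }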


7.2 AppLoader

AppLoader is a program that performs several operations before the actual problem execution begins. In Beehive, all test programs inherit from the BeehiveAppLoader class, which encapsulates the initialization and graph loading at the Beehive nodes. It contains the init() method, which performs the required initializations. The init() method takes as parameters the program arguments, the class name of the test program, a hashmap nodeClassNameMap that associates a byte identifier with each Node class name, and a hashmap nodeClassMap that associates the same identifiers with the class objects themselves. It also declares an abstract method loadGraphData() that contains the logic to invoke the required loader to read and load the graph data.
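
As a rough sketch of this structure (the exact signature of init() is an assumption based on the description above, and the actual BeehiveAppLoader API may differ), an application loader might look like:

        import java.util.HashMap;
        import java.util.Map;

        // Sketch of an application loader; the init() parameter list follows
        // the description above, but the real signature may differ.
        public class GCTest extends BeehiveAppLoader {

            public static void main(String[] args) throws Exception {
                Map<Byte, String> nodeClassNameMap = new HashMap<>();
                Map<Byte, Class<?>> nodeClassMap = new HashMap<>();
                nodeClassNameMap.put((byte) 1, GCNode.class.getName());
                nodeClassMap.put((byte) 1, GCNode.class);

                GCTest test = new GCTest();
                test.init(args, GCTest.class.getName(), nodeClassNameMap, nodeClassMap);
                // init() returns after the graph is loaded; the test program can
                // now create its worker threads and begin execution.
            }

            @Override
            public void loadGraphData() {
                // Invoke the InputFileReader variant configured for this run.
            }
        }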

When the AppLoader starts its execution, it first loads the configuration file to fetch the configuration details. It then begins the operation of loading the graph data into the Beehive cluster nodes, using a distributed coordination mechanism to coordinate all the Beehive cluster nodes. In this mechanism, the first host in the hostlist file is the coordinator. When execution starts, all the Beehive nodes read the hostlist file to determine which node is the coordinator of the Beehive cluster. Upon starting, the coordinator registers its handle in the RMI registry and waits for the other cluster nodes to start up. The other cluster nodes send an RMI message to inform the coordinator that they are online, and wait for an acknowledgement message from the coordinator. Once all the Beehive nodes have informed the coordinator that they are online, the coordinator sends an acknowledgement message to all the Beehive nodes to start loading the graph.

All the cluster nodes then start loading the graph data according to the type of InputFileReader specified in the configuration file. Some of the cluster nodes may take longer to load the graph than others due to differences in their hardware configurations. Once a cluster node has completed loading the graph, it again sends a message to the coordinator, and the coordinator waits for messages from all the Beehive nodes to ensure that every node has completed loading the graph. Once the graph loading is complete, the actual operation to solve the graph problem can start.
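
The coordination described above amounts to a two-phase barrier. A minimal sketch over RMI follows; the interface name LoaderCoordinator and its methods are hypothetical, and the coordinator's acknowledgement message is modeled here by letting each remote call block until every node has checked in.

        import java.rmi.Remote;
        import java.rmi.RemoteException;
        import java.rmi.server.UnicastRemoteObject;
        import java.util.concurrent.CountDownLatch;

        // Hypothetical remote interface for the two-phase loading barrier.
        interface LoaderCoordinator extends Remote {
            void reportOnline(String host) throws RemoteException;   // phase 1
            void reportLoaded(String host) throws RemoteException;   // phase 2
        }

        class LoaderCoordinatorImpl extends UnicastRemoteObject implements LoaderCoordinator {
            private final CountDownLatch onlineLatch;
            private final CountDownLatch loadedLatch;

            LoaderCoordinatorImpl(int clusterSize) throws RemoteException {
                this.onlineLatch = new CountDownLatch(clusterSize);
                this.loadedLatch = new CountDownLatch(clusterSize);
            }

            // Blocks until every cluster node is online; the return of this call
            // plays the role of the coordinator's acknowledgement message.
            public void reportOnline(String host) throws RemoteException {
                await(onlineLatch);
            }

            // Blocks until every cluster node has finished loading the graph.
            public void reportLoaded(String host) throws RemoteException {
                await(loadedLatch);
            }

            private static void await(CountDownLatch latch) throws RemoteException {
                latch.countDown();
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new RemoteException("interrupted while waiting", e);
                }
            }
        }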

The AppLoader also initializes the HashTableServer, the StorageSystem and the local Workpool. It then returns control to the test program so that the test program can create the required number of worker threads and begin execution.

7.2.1 Class Maps

Should the application use Thrift as its communication mechanism, the programmer must construct two class mappings for the StorageSystem to reduce communication costs. Both maps are keyed by unique byte identifiers: for each subclass of Beehive's Node class, one map associates a byte with the name of the class, and the other associates the same byte with the class object itself.

For the Graph Coloring problem, for example, the following lines would be added to the application's AppLoader:

        Map<Byte, String> nodeClassNameMap = new HashMap<>();
        Map<Byte, Class<?>> nodeClassMap = new HashMap<>();

        nodeClassNameMap.put((byte) 1, GCNode.class.getName());
        nodeClassMap.put((byte) 1, GCNode.class);
    

These two maps are then passed to the init() method to configure the StorageSystem for Thrift-based communication.


7.3 Separation of Beehive Servers

The Beehive framework can be configured to run the storage and computation services independently. In other words, one could have four computation servers, each on a different host, and two storage servers, each on a host separate from the computation servers. This provides the ability to scale out resources independently of each other without too much distribution of the data across Beehive. In this section, we discuss how to configure the system to run in this model and what changes are needed at the application level.

7.3.1 Configuration File Changes

To configure Beehive to run the compute and storage servers separately, the following lines must be added to the configuration file (an illustrative snippet follows the list):

  1. Model.Partition

    This describes the configuration model of Beehive. If it is set to true, Beehive will run under the separation model. If it is set to false, Beehive will run under the single model, where the compute and storage services exist on one server.
  2. Worker.STORAGE_WORKER_COUNT

    If Model.Partition is set to true, then this will determine how many workers are created on the storage servers. This feature is still in development; at present there should not be any workers on the storage servers, because the worker logic is identical to that of the compute workers.
  3. Worker.COMPUTE_WORKER_COUNT

    If Model.Partition is set to true, then this will determine how many workers are created on the compute servers.
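
Putting these settings together, the separation-model portion of a configuration file might look as follows. This is only an illustration: the key/value separator and the compute-worker count are assumptions, and the zero storage-worker count follows the note above.

        Model.Partition = true
        Worker.STORAGE_WORKER_COUNT = 0
        Worker.COMPUTE_WORKER_COUNT = 8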

7.3.2 Hostname File Changes

Under the separation model, programmers will have to provide three host files (an example follows the list):

  1. #-storage-nodes

    This lists the # hostnames that are to run the storage servers.
  2. #-compute-nodes

    This lists the # hostnames that are to run the compute servers.
  3. #-cluster-nodes

    This lists the # hostnames of the compute and storage servers that are within the application's cluster.
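
For example, with two storage hosts and four compute hosts (the hostnames below are placeholders, and the one-hostname-per-line layout is assumed from the hostlist file format), the file 2-storage-nodes would contain:

        storage-host-1.example.edu
        storage-host-2.example.edu

The file 4-compute-nodes would list the four compute hostnames in the same way, and 6-cluster-nodes would list all six hostnames.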

7.3.3 Script File Changes

Under the separation model, programmers will need to edit the application's run script, e.g., runGC. The only change is to pass an additional argument: the file containing the host names of the compute servers. The following is an example of runGC:

java -Xms4096m -Xmx8192m -XX:+UseG1GC TestProgs.GraphColoring.GCTest 4-storage-nodes configFile cs-spatial-301.cs.umn.edu $GRAPH_FGRAPH/fgraph-1000-100-100.pp 1000 configFile 6-compute-nodes 2>&1 &
    

If one decides to run under the single model, then the last host file can be omitted.

7.3.4 Code Changes

  1. AppLoader Changes

    Under the separation model, programmers have to alter the application code that extends BeehiveAppLoader, e.g., GCTest. The code changes needed are (a sketch follows this list):
    • The ability to accept either six arguments (to run under the single model) or seven arguments (to run under the separation model).
    • The ability to determine, when the application runs under the separation model, whether the server is a compute or a storage server. This is important for starting the appropriate workers, whose logic differs depending on the type of server.

  2. Worker Changes

    Under the separation model, there is no concept of local keys/nodes on the compute servers, as all of the nodes are stored on the storage servers. Therefore, any calls that attempt to retrieve local keys, nodes, and/or neighbors will not work, because compute servers host no nodes. As a result, one must either create new Workers specifically for the separation model or have workers intelligently decide whether or not to use the API calls related to retrieving locally hosted objects.
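
A rough sketch of the AppLoader argument handling described in item 1 follows. The argument positions are taken from the runGC example in Section 7.3.3, and the role check against the storage host file is an assumption about how a server could determine its type; the real GCTest may organize this differently.

        import java.net.InetAddress;
        import java.nio.file.Files;
        import java.nio.file.Paths;

        // Sketch of separation-model argument handling; structure is hypothetical.
        public class SeparationModelArgsSketch {
            public static void main(String[] args) throws Exception {
                // Six arguments: single model; seven arguments: separation model
                // (the extra argument is the compute-nodes host file, as in runGC).
                boolean separationModel = (args.length == 7);

                boolean isStorageServer = false;
                if (separationModel) {
                    // Assumed role check: a server is a storage server if its own
                    // hostname appears in the storage host file (the first argument).
                    String localHost = InetAddress.getLocalHost().getHostName();
                    isStorageServer = Files.readAllLines(Paths.get(args[0])).contains(localHost);
                }

                if (separationModel && isStorageServer) {
                    // start storage-side workers (currently none; see Section 7.3.1)
                } else {
                    // start compute workers
                }
            }
        }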