5. Worker Class and Abstractions for Application Programming



Here we describe the Worker class, which is responsible for picking Tasks from the Workpool and executing them. We also go over the primitives that programmers need to be familiar with when writing applications with Beehive.


5.1 Worker

Beehive provides a Java class called Worker which represents one of the several worker threads on a Beehive system. Each Worker maintains the following information:

Programmers must extend this class, by writing a subclass of Worker, in order to program applications with Beehive, since the logic for task execution depends on the application. Depending on the application, there may be one or several types of Workers, each with its own execution logic. As a running example, consider the Graph Coloring problem. Below is a Graph Coloring-specific Worker:

        public class GCWorker extends Worker {
            // Application specific fields

            @Override
            public void doTask(Task tsk) {
                // Begin transaction
                // Application specific computation logic
                // Validate transaction
            }

            // Auxiliary methods
        }
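The subclassing pattern can be tried outside Beehive with minimal stand-ins. In the sketch below, `SketchTask`, `SketchWorker`, `execute`, and the two worker subclasses are hypothetical and are not Beehive's real `Task` and `Worker` APIs. The sketch only shows that the base class fixes how a task is handed over, while each worker type supplies its own `doTask` logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Beehive's Task: it only carries a node ID.
class SketchTask {
    final String nodeId;
    SketchTask(String nodeId) { this.nodeId = nodeId; }
}

// Hypothetical stand-in for Beehive's Worker: the base class decides
// when a task runs, and subclasses decide what running it means.
abstract class SketchWorker {
    final List<String> log = new ArrayList<>();

    // Application-specific execution logic, supplied by each subclass.
    public abstract void doTask(SketchTask tsk);

    // Simplified hand-off (the real Worker pulls tasks from the Workpool).
    public void execute(SketchTask tsk) {
        doTask(tsk);
    }
}

// One worker type per kind of application logic.
class ColoringWorker extends SketchWorker {
    @Override
    public void doTask(SketchTask tsk) {
        log.add("color " + tsk.nodeId);
    }
}

class StatsWorker extends SketchWorker {
    @Override
    public void doTask(SketchTask tsk) {
        log.add("stats " + tsk.nodeId);
    }
}
```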
    

In addition to subclassing the Worker class, programmers must also override the doTask method. There are three essential components to the doTask method:

  1. Starting the transaction: starts a transaction for the given task's computation.
  2. Computation: the computational phase of the task, i.e., where the application-specific logic is placed.
  3. Validation: validates the results of the computation by checking for read-write or write-write conflicts on the nodes used during the computation.

We will go through each component in detail in the sections that follow.


5.2 Begin Transaction

Since Beehive supports transactional computation, programmers have the option to run their application-specific computation inside a transaction. Each application-specific worker, e.g. GCWorker, inherits a method called beginTransaction(), which starts a transaction covering the computation that follows. Below is an example of how the GCWorker begins a transaction:

        public void doTask(Task tsk) {
            boolean commit = false;
            while (!commit) {
                // Begin transaction
                TransactionInfo txnInfo = beginTransaction();

                // Application specific computation logic
                // Validate transaction; set commit = true once it succeeds
            }
        }
    

Note that if a transaction is desired, both the while-loop and the validation step are necessary. If validation finds that the transaction cannot be committed because of read-write or write-write conflicts, the transaction must be retried unless the programmer decides to abort it.
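The retry-or-abort structure can be isolated in a small helper. This is a sketch, not Beehive code. The `Attempt` interface, the `runWithRetries` helper, and the `maxAttempts` limit are hypothetical, and `attempt.run()` stands in for one begin/compute/validate round that returns `true` when validation succeeds.

```java
// Hypothetical: one begin-compute-validate round. Returns true if the
// transaction validated and committed, false on a read-write or
// write-write conflict.
interface Attempt {
    boolean run();
}

final class RetryLoop {
    private RetryLoop() {}

    // Re-runs the transaction until it commits, or aborts after
    // maxAttempts conflicts. Returns the number of attempts used,
    // or -1 if the transaction was aborted.
    static int runWithRetries(Attempt attempt, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.run()) {
                return i;  // committed on the i-th attempt
            }
            // Conflict detected: loop around and start a fresh transaction.
        }
        return -1;  // programmer-chosen abort
    }
}
```

The doTask examples in this section retry without a limit. Capping the number of attempts is one way to implement the programmer's option to abort.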

More about TransactionInfo can be found at JAVADOC LINK. It is important to note that the computation logic that follows does not have to run within a single transaction. In fact, programmers can break it up into several smaller transactions.


5.3 Computation

The computation phase is where the application-specific logic is placed. Programmers can either place all of their logic in the doTask method or define auxiliary methods that are called from doTask. The following is an example of the computation phase of the GCWorker:

        public void doTask(Task tsk) {

            boolean commit = false;
            while (!commit) {
                // Begin transaction
                TransactionInfo txnInfo = beginTransaction();

                // Compute
                GCNode node = (GCNode) datastorage.getNode(tsk.getNodeId());
                Vector<Node> neighbors = getNeighbors(node);
                compute(tsk, node, neighbors);

                // Validation (see Section 5.4); sets commit = true on success
            }
        }

        public void compute(Task tsk, GCNode node, Vector<Node> neighbors) {
            try {
                GCNode clone = new GCNode(node); // LINE 1
                if (clone.marked) {
                    return;
                }
                // Collect the colors already used by marked neighbors
                Set<Integer> neighborColors = new HashSet<>();
                for (Node neighbor : neighbors) {
                    GCNode gcNeighbor = (GCNode) neighbor;
                    if (gcNeighbor.marked) {
                        neighborColors.add(gcNeighbor.color);
                    }
                    rwsetinfo.readSet.add(gcNeighbor.nodeId); // LINE 2
                }

                Vector<Integer> uniqueColors = new Vector<>(neighborColors);
                Collections.sort(uniqueColors);
                int targetColor = 1;

                for (Integer color : uniqueColors) {
                    if (color > targetColor) {
                        break;
                    } else if (color == targetColor) {
                        targetColor++;
                    }
                }

                clone.marked = true;
                clone.color = targetColor;
                updatedNodes.put(clone.nodeId, clone); // LINE 3
                rwsetinfo.writeSet.add(clone.nodeId);  // LINE 4
            } catch (Exception e) {
                // Excluded for the sake of the example
            }
        }
    

The example above shows the application logic defined in an auxiliary method called compute(), which is called from doTask(). We will not go over the specifics of the logic, since this is just an example of how to override the doTask() method of the Worker class.
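The color-selection step in compute() is not essential to the rest of this section, but it follows a simple greedy rule: pick the smallest color, starting at 1, that no marked neighbor already uses. The standalone sketch below restates that rule. `GreedyColor` and `smallestFreeColor` are hypothetical names, and the method is meant to be equivalent to the sort-and-scan loop over uniqueColors.

```java
import java.util.Set;

final class GreedyColor {
    private GreedyColor() {}

    // Returns the smallest color >= 1 not present in neighborColors,
    // i.e. the first gap in the colors used by marked neighbors.
    static int smallestFreeColor(Set<Integer> neighborColors) {
        int target = 1;
        while (neighborColors.contains(target)) {
            target++;
        }
        return target;
    }
}
```

For neighbor colors {1, 2, 4}, the rule yields 3. For {2, 3}, it yields 1, because color 1 is still free.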

Note the lines in compute() that are marked with comments (LINE 1 through LINE 4). They will be important for the discussion that follows.


5.4 Validation

If a computation is to be completed within a transaction, the transaction itself must be validated to guard against read-write and write-write conflicts on the nodes used in the computation. Beehive achieves this by providing each Worker with an rwsetinfo abstraction; more details can be found at JAVADOC LINK. At a high level, the ID of each node whose properties were read is added to the read set. Likewise, the ID of each node whose properties were updated is added to the write set, and the updated copy of that node is stored in updatedNodes. This can be seen in the lines marked LINE 2, LINE 3, and LINE 4.

Also note that in LINE 1, a deep copy of the node is created. If the node happens to be local to the Worker's StorageSystem, we do not want to modify its attributes unless the transaction is successfully validated.
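A deep copy matters when a node has mutable fields. The sketch below uses a hypothetical `SketchNode` as a stand-in for GCNode. The `neighborIds` list is an assumed extra field, since GCNode's full definition is not shown here. The copy constructor gives the clone its own list, so mutating the clone never changes the stored node.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for GCNode; neighborIds is an assumed field.
class SketchNode {
    final String nodeId;
    boolean marked;
    int color;
    final List<String> neighborIds;

    SketchNode(String nodeId, List<String> neighborIds) {
        this.nodeId = nodeId;
        this.neighborIds = new ArrayList<>(neighborIds);
    }

    // Copy constructor: copies scalar fields and allocates a fresh list,
    // so the clone shares no mutable state with the original.
    SketchNode(SketchNode other) {
        this.nodeId = other.nodeId;
        this.marked = other.marked;
        this.color = other.color;
        this.neighborIds = new ArrayList<>(other.neighborIds);
    }
}
```

A shallow copy that reused `other.neighborIds` would let changes made during a transaction that later fails validation leak into the locally stored node.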

Below is an example of how to validate a given transaction:

        public void doTask(Task tsk) {

            boolean commit = false;
            while (!commit) {
                // Begin transaction
                TransactionInfo txnInfo = beginTransaction();

                // Compute
                GCNode node = (GCNode) datastorage.getNode(tsk.getNodeId());
                Vector<Node> neighbors = getNeighbors(node);
                compute(tsk, node, neighbors);

                // Validation
                ValidationResponse res = workpool.validate(tsk, rwsetinfo);
                commit = res.commit;
                if (commit) {
                    tsk.txnInfo.commitTS = res.commitTS;
                }
            }
            commitTask(tsk);
        }
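doTask leaves the commit decision to workpool.validate, whose internals this section does not describe. The toy below is only an illustration, not Beehive's validation protocol. It implements one common form of optimistic concurrency control: a transaction fails validation if any node it read or wrote was committed by another transaction after it began. `ToyStore`, `ToyTxn`, and their methods are hypothetical. The read set, write set, and buffered updates play the roles of rwsetinfo and updatedNodes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical transaction record: start timestamp, read/write sets,
// and buffered updates (analogous to rwsetinfo and updatedNodes).
class ToyTxn {
    final long startTS;
    final Set<String> readSet = new HashSet<>();
    final Set<String> writeSet = new HashSet<>();
    final Map<String, Integer> updates = new HashMap<>();

    ToyTxn(long startTS) { this.startTS = startTS; }
}

class ToyStore {
    private long clock = 0;
    private final Map<String, Integer> values = new HashMap<>();
    private final Map<String, Long> lastCommitTS = new HashMap<>();

    synchronized ToyTxn begin() { return new ToyTxn(clock); }

    synchronized int read(ToyTxn txn, String id) {
        txn.readSet.add(id);
        return values.getOrDefault(id, 0);
    }

    void write(ToyTxn txn, String id, int value) {
        txn.writeSet.add(id);
        txn.updates.put(id, value);  // buffered; invisible until commit
    }

    // Returns the commit timestamp, or -1 if a node in the read or write
    // set was committed by someone else after this transaction began.
    synchronized long validateAndCommit(ToyTxn txn) {
        Set<String> touched = new HashSet<>(txn.readSet);
        touched.addAll(txn.writeSet);
        for (String id : touched) {
            if (lastCommitTS.getOrDefault(id, 0L) > txn.startTS) {
                return -1;  // conflict: caller must retry or abort
            }
        }
        long commitTS = ++clock;
        for (Map.Entry<String, Integer> e : txn.updates.entrySet()) {
            values.put(e.getKey(), e.getValue());
            lastCommitTS.put(e.getKey(), commitTS);
        }
        return commitTS;
    }
}
```

In the toy, a return value of -1 plays the role of res.commit being false, after which doTask starts a new transaction. A non-negative value plays the role of res.commitTS.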
    

5.5 run

Each Beehive worker implements Java's Runnable interface and therefore implements the run method. At a bare minimum, the worker's run method should query the workpool for a task and pass it to the doTask method. Other worker-related logic may also be defined in the run method. For example, if a worker finds no task in the workpool, it can start gathering statistics related to the application, or sleep for some period of time before querying the workpool again. Below is a simplified version of Beehive's GCWorker run method; the complete code can be found HERE:

        @Override
        public void run() {
            while (true) {
                try {
                    WorkStatus ws = workpool.getTask();
                    if (ws.status == StatusType.VALID_TASK) {
                        Task tsk = ws.task;

                        if (tsk == null) {
                            // Valid but null task, grab another task
                            continue;
                        }
                        if (tsk.phase.equals("GENERIC")) {
                            doTask(tsk);
                            tasksCreated.clear();
                        } else if (tsk.phase.equals("TERMINATION")) {
                            verifyResults();
                        }
                    } else if (ws.status == StatusType.NO_READY_TASK) {
                        Thread.sleep(100);
                    } else if (ws.status == StatusType.WORK_TERMINATED) {
                        if (master && !terminate) {
                            // If I am master and have not broadcasted a termination
                            cleanUp();
                            Task termTsk = new Task();
                            termTsk.nodeId = "TERMINATE_TASK";
                            termTsk.phase = "TERMINATION";
                            terminate = true;
                            workpool.broadcast(termTsk);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
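The poll, dispatch, and back-off flow of run() can be exercised without a cluster. In the sketch below, `PollStatus`, `PollResult`, `ToyPool`, `LoopWorker`, and `runUntilTerminated` are hypothetical stand-ins for StatusType, WorkStatus, the Workpool, and the worker's run loop. Unlike the simplified run() above, this loop returns once the pool reports termination, so it can be tested.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for StatusType.
enum PollStatus { VALID_TASK, NO_READY_TASK, WORK_TERMINATED }

// Hypothetical stand-in for WorkStatus: a status plus an optional task.
class PollResult {
    final PollStatus status;
    final String task;
    PollResult(PollStatus status, String task) {
        this.status = status;
        this.task = task;
    }
}

// Hypothetical workpool: reports "no ready task" for the first few polls,
// then hands out its tasks in order, then reports termination.
class ToyPool {
    private final Deque<String> ready = new ArrayDeque<>();
    private int notReadyPolls;

    ToyPool(List<String> tasks, int notReadyPolls) {
        ready.addAll(tasks);
        this.notReadyPolls = notReadyPolls;
    }

    synchronized PollResult getTask() {
        if (notReadyPolls > 0) {
            notReadyPolls--;
            return new PollResult(PollStatus.NO_READY_TASK, null);
        }
        if (ready.isEmpty()) {
            return new PollResult(PollStatus.WORK_TERMINATED, null);
        }
        return new PollResult(PollStatus.VALID_TASK, ready.poll());
    }
}

class LoopWorker {
    final List<String> done = new ArrayList<>();
    int idlePolls = 0;

    void doTask(String task) {
        done.add(task);
    }

    // Poll, dispatch, back off when idle, and stop on termination.
    void runUntilTerminated(ToyPool pool, long sleepMillis) {
        try {
            while (true) {
                PollResult res = pool.getTask();
                switch (res.status) {
                    case VALID_TASK:
                        if (res.task != null) {  // skip valid-but-null tasks
                            doTask(res.task);
                        }
                        break;
                    case NO_READY_TASK:
                        idlePolls++;
                        Thread.sleep(sleepMillis);  // wait before polling again
                        break;
                    case WORK_TERMINATED:
                        return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt flag
        }
    }
}
```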
    

5.6 Inline Transactions

Although application developers are encouraged to use task-centric transactions, Beehive workers can also start "inline transactions", that is, transactions that are independent of the task model. An inline transaction is started in the same way as one tied to a Task, by calling beginTransaction. The difference is in how the transaction is committed. A task-centric transaction is committed by invoking commitTask with the task as its argument, whereas an inline transaction is committed by invoking commitTransaction with the transaction info obtained from beginTransaction.
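The two commit paths differ only in what is passed to the commit call. The sketch below is hypothetical throughout. `TxnManager`, `TxnInfo`, and `TaskWithTxn` only mimic the names beginTransaction, commitTransaction, commitTask, and the task's txnInfo field used in this section. The idea that commitTask commits the transaction carried by the task is an assumption made for illustration, not documented Beehive behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transaction handle returned by beginTransaction().
class TxnInfo {
    final long id;
    long commitTS = -1;
    TxnInfo(long id) { this.id = id; }
}

// Hypothetical task that carries the transaction it was executed in.
class TaskWithTxn {
    final String nodeId;
    TxnInfo txnInfo;
    TaskWithTxn(String nodeId) { this.nodeId = nodeId; }
}

class TxnManager {
    private long nextId = 0;
    private long clock = 0;
    final List<Long> committed = new ArrayList<>();

    TxnInfo beginTransaction() { return new TxnInfo(++nextId); }

    // Inline transaction: the caller keeps the TxnInfo and commits it directly.
    void commitTransaction(TxnInfo txn) {
        txn.commitTS = ++clock;
        committed.add(txn.id);
    }

    // Task-centric transaction: the task is the argument; in this sketch
    // it is assumed to delegate to the transaction stored on the task.
    void commitTask(TaskWithTxn task) {
        commitTransaction(task.txnInfo);
    }
}
```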


5.7 Utilities

Beehive's Worker class provides the following primitives that may be useful in application programming:

  1. getNeighbors(Node u)
  2. getLocalNeighbors(Node u)
  3. getRemoteNeighbors(Vector nbrKeys)
  4. getNeighborIds(Node u)

A full listing of these primitives and their functionality can be found at JAVADOC.
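The method names suggest how these utilities relate: a node's neighbors are the locally stored ones plus those fetched remotely by key. The sketch below illustrates that reading with in-memory maps. The class `NeighborLookup`, its fields, the use of String IDs instead of Node objects, and the local/remote split are all assumptions for illustration, not the documented behavior of Beehive's methods.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;

class NeighborLookup {
    // Hypothetical graph and storage: adjacency by ID, plus nodes stored
    // on this worker's machine versus on other machines.
    final Map<String, List<String>> adjacency = new HashMap<>();
    final Map<String, String> localNodes = new HashMap<>();
    final Map<String, String> remoteNodes = new HashMap<>();

    Vector<String> getNeighborIds(String u) {
        return new Vector<>(adjacency.getOrDefault(u, List.of()));
    }

    // Neighbors that can be read from local storage.
    Vector<String> getLocalNeighbors(String u) {
        Vector<String> out = new Vector<>();
        for (String id : getNeighborIds(u)) {
            if (localNodes.containsKey(id)) {
                out.add(localNodes.get(id));
            }
        }
        return out;
    }

    // Neighbors fetched by key from (simulated) remote storage.
    Vector<String> getRemoteNeighbors(Vector<String> nbrKeys) {
        Vector<String> out = new Vector<>();
        for (String id : nbrKeys) {
            if (remoteNodes.containsKey(id)) {
                out.add(remoteNodes.get(id));
            }
        }
        return out;
    }

    // All neighbors: local ones first, then the non-local ones by key.
    Vector<String> getNeighbors(String u) {
        Vector<String> all = getLocalNeighbors(u);
        Vector<String> remoteKeys = new Vector<>();
        for (String id : getNeighborIds(u)) {
            if (!localNodes.containsKey(id)) {
                remoteKeys.add(id);
            }
        }
        all.addAll(getRemoteNeighbors(remoteKeys));
        return all;
    }
}
```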