Task Aggregation
Motivations
For an application that generates a large number of tasks, some tasks may become redundant. As an example, consider the Single Source Shortest Path problem with two pending tasks, A and B. If computing Task B would improve the distance between two nodes more than computing Task A would, then maintaining and computing Task A is redundant. Task aggregation eliminates such redundancy by combining Task A and Task B into a single Task.
    public abstract class TaskComparator {
        public static final int IGNORE_NEW_TASK = 1;
        public static final int ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP = 2;
        public static final int ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP = 3;

        public abstract int combine(Task oldTask, Task newTask);
    }
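How the workpool acts on each return code is not spelled out here, so the following is only a minimal sketch of one plausible reading of the constant names. Everything in it is a hypothetical stand-in rather than the framework's own code: the `AggregatingPoolSketch` class, the `Combiner` interface (playing the role of `TaskComparator`), the trimmed `Task` with a string `key`, and the idea that the task map is keyed by that `key`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of a pool that consults a combine() rule before queueing a task. */
class AggregatingPoolSketch {
    // Return codes copied from TaskComparator.
    static final int IGNORE_NEW_TASK = 1;
    static final int ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP = 2;
    static final int ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP = 3;

    /** Stand-in for the framework's Task; the key field is an assumption. */
    static class Task {
        final String key;
        Task(String key) { this.key = key; }
    }

    /** Stand-in for TaskComparator, reduced to its single abstract method. */
    interface Combiner {
        int combine(Task oldTask, Task newTask);
    }

    final Queue<Task> taskPool = new ArrayDeque<>();     // tasks waiting to run
    final Map<String, Task> taskMap = new HashMap<>();   // pending task per key
    final Combiner combiner;

    AggregatingPoolSketch(Combiner combiner) {
        this.combiner = combiner;
    }

    /** Offers a new task to the pool and applies the combiner's decision. */
    void submit(Task newTask) {
        Task oldTask = taskMap.get(newTask.key);          // null if nothing is pending
        int decision = combiner.combine(oldTask, newTask);
        switch (decision) {
            case IGNORE_NEW_TASK:
                // The combiner already merged what it needed into oldTask.
                break;
            case ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP:
                taskPool.add(newTask);
                taskMap.put(newTask.key, newTask);
                break;
            case ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP:
                // Task type not subject to aggregation: queue it untracked.
                taskPool.add(newTask);
                break;
            default:
                throw new IllegalArgumentException("Unknown return code: " + decision);
        }
    }
}
```

Under this reading, a second task submitted with the same key as a pending one never enters the pool; it can only influence the task that is already queued.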
Limitations
Currently, when Task Aggregation is used, aggregation rules can be defined for only one type of Task within the context of the Application.
Shortest Path Example
Implementation
The following is an example implementation of Task aggregation for the Single Source Shortest Path problem.
    public class SPTaskComparator extends TaskComparator {

        @Override
        public int combine(Task oldTask, Task newTask) {
            Class<?> c = newTask.getClass();
            String[] strclassnamearr = c.toString().split(" ")[1].split("\\.");
            String newTaskClassName = strclassnamearr[strclassnamearr.length - 1];

            if (newTaskClassName.equals("SPTask")) {
                if (oldTask == null) {
                    return ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP;
                } else {
                    SPTask oldSPTask = (SPTask) oldTask;
                    SPTask newSPTask = (SPTask) newTask;
                    replace(oldSPTask, newSPTask);
                    return IGNORE_NEW_TASK;
                }
            } else {
                return ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP;
            }
        }

        public void replace(SPTask oldSPTask, SPTask newSPTask) {
            if ((oldSPTask.distFromSrc + oldSPTask.edgeWeight)
                    > (newSPTask.distFromSrc + newSPTask.edgeWeight)) {
                // Copy the contents of newTask into oldTask.
                oldSPTask.distFromSrc = newSPTask.distFromSrc;
                oldSPTask.edgeWeight = newSPTask.edgeWeight;
                oldSPTask.senderId = newSPTask.senderId;
                oldSPTask.TS = newSPTask.TS;
                oldSPTask.nodeId = newSPTask.nodeId;
                oldSPTask.txnInfo = newSPTask.txnInfo;
                oldSPTask.phase = newSPTask.phase;
                oldSPTask.affinityLevel = newSPTask.affinityLevel;
                oldSPTask.targetLocation = newSPTask.targetLocation;
                oldSPTask.status = newSPTask.status;
            }
        }
    }
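To make the effect of `replace` concrete, here is a small standalone sketch that applies the same comparison to a trimmed-down task. `MiniSPTask` and `SPAggregationDemo` are hypothetical stand-ins, not part of the framework. The sketch keeps only three of the `SPTask` fields, and their types (`double` for distances, `int` for the sender) are assumptions.

```java
/** Hypothetical demo of the merge rule used by SPTaskComparator.replace. */
class SPAggregationDemo {

    /** Trimmed-down stand-in for SPTask; field types are assumed. */
    static class MiniSPTask {
        double distFromSrc;  // distance from the source to the sending node
        double edgeWeight;   // weight of the edge to the target node
        int senderId;        // node that produced this task

        MiniSPTask(double distFromSrc, double edgeWeight, int senderId) {
            this.distFromSrc = distFromSrc;
            this.edgeWeight = edgeWeight;
            this.senderId = senderId;
        }
    }

    /** Overwrites oldTask with newTask only if newTask offers a strictly shorter path. */
    static void replace(MiniSPTask oldTask, MiniSPTask newTask) {
        if ((oldTask.distFromSrc + oldTask.edgeWeight)
                > (newTask.distFromSrc + newTask.edgeWeight)) {
            oldTask.distFromSrc = newTask.distFromSrc;
            oldTask.edgeWeight = newTask.edgeWeight;
            oldTask.senderId = newTask.senderId;
        }
    }

    public static void main(String[] args) {
        MiniSPTask pending = new MiniSPTask(10.0, 4.0, 1);   // candidate distance 14.0

        replace(pending, new MiniSPTask(6.0, 3.0, 2));       // 9.0 < 14.0: pending is overwritten
        replace(pending, new MiniSPTask(8.0, 5.0, 3));       // 13.0 > 9.0: pending is unchanged

        // prints "9.0 via sender 2"
        System.out.println((pending.distFromSrc + pending.edgeWeight)
                + " via sender " + pending.senderId);
    }
}
```

Three tasks were offered, but only one remains to be computed, and it carries the best candidate distance seen so far.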
Usage within the Workpool
To use Task aggregation within the workpool, set the following flag to true:
LocalWorkpool.enableTaskAggregation=true
The workpool also needs a reference to the task aggregation implementation, e.g. SPTaskComparator. To provide it, include the following lines in the Application startup class, e.g. SPTest:
    SPTest loader = new SPTest();
    loader.workpool.taskComparator = new SPTaskComparator();