Task Aggregation

Motivations

For an application that generates a large number of tasks, some tasks may become redundant. Consider the Single Source Shortest Path problem with two pending tasks, A and B. If computing Task B would improve the distance between two nodes more than computing Task A would, then maintaining and computing Task A is redundant. Task aggregation eliminates such redundancy by combining Task A and Task B into a single task.

Aggregation rules are defined by extending the following abstract class:

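public abstract class TaskComparator {
    // Return codes for combine(); each tells the workpool what to do with newTask.
    public static final int IGNORE_NEW_TASK = 1;
    public static final int ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP = 2;
    public static final int ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP = 3;

    // Compares newTask with the existing oldTask (null if there is none)
    // and returns one of the codes above.
    public abstract int combine(Task oldTask, Task newTask);
}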
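
The page does not show how the workpool reacts to each return code. As a rough sketch only, assume the workpool keeps a queue of pending tasks (the task pool) and a map from an aggregation key to the most recently recorded task (the task map). Under that assumption, the dispatch might look like the code below. SketchWorkpool, MinCostComparator, AggregationSketch, the key and cost fields, and the trimmed-down Task are hypothetical names introduced for illustration; they are not part of the framework.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the framework's Task; the real class has other fields.
class Task {
    final String key; // assumed aggregation key, e.g. a destination node id
    int cost;
    Task(String key, int cost) { this.key = key; this.cost = cost; }
}

// Same shape as the abstract class shown above.
abstract class TaskComparator {
    public static final int IGNORE_NEW_TASK = 1;
    public static final int ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP = 2;
    public static final int ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP = 3;
    public abstract int combine(Task oldTask, Task newTask);
}

// Hypothetical rule: keep the cheaper of two tasks by updating the old one in place.
class MinCostComparator extends TaskComparator {
    @Override
    public int combine(Task oldTask, Task newTask) {
        if (oldTask == null) {
            return ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP;
        }
        if (newTask.cost < oldTask.cost) {
            oldTask.cost = newTask.cost;
        }
        return IGNORE_NEW_TASK;
    }
}

// Hypothetical workpool: a queue of pending tasks plus a key -> task map.
class SketchWorkpool {
    final Deque<Task> taskPool = new ArrayDeque<>();
    final Map<String, Task> taskMap = new HashMap<>();
    TaskComparator taskComparator;

    void submit(Task newTask) {
        Task oldTask = taskMap.get(newTask.key);
        switch (taskComparator.combine(oldTask, newTask)) {
            case TaskComparator.ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP:
                taskPool.add(newTask);
                taskMap.put(newTask.key, newTask);
                break;
            case TaskComparator.ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP:
                taskPool.add(newTask);
                break;
            default:
                // IGNORE_NEW_TASK: the comparator already merged newTask into oldTask.
                break;
        }
    }
}

class AggregationSketch {
    public static void main(String[] args) {
        SketchWorkpool pool = new SketchWorkpool();
        pool.taskComparator = new MinCostComparator();
        pool.submit(new Task("B", 10)); // first task for key "B": pooled and recorded
        pool.submit(new Task("B", 7));  // merged into the pooled task, not queued again
        System.out.println(pool.taskPool.size() + " " + pool.taskMap.get("B").cost); // prints "1 7"
    }
}
```

In this sketch the second submission never enters the queue; the pooled task simply carries the better value, which is the saving that aggregation is meant to provide.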

Limitations

Currently, when Task Aggregation is enabled, aggregation rules can be defined for only one type of Task per application.


Shortest Path Example

Implementation

The following is an example implementation of task aggregation for the Single Source Shortest Path problem.

public class SPTaskComparator extends TaskComparator {

    @Override
    public int combine(Task oldTask, Task newTask) {
        // Unqualified class name of the new task, e.g. "SPTask".
        String newTaskClassName = newTask.getClass().getSimpleName();

        if (newTaskClassName.equals("SPTask")) {
            if (oldTask == null) {
                return ADD_TO_TASKPOOL_AND_UPDATE_TASKMAP;
            } else {
                SPTask oldSPTask = (SPTask) oldTask;
                SPTask newSPTask = (SPTask) newTask;
                replace(oldSPTask, newSPTask);
                return IGNORE_NEW_TASK;
            }
        } else {
            return ADD_TO_TASKPOOL_AND_DONT_UPDATE_TASKMAP;
        }
    }

    public void replace(SPTask oldSPTask, SPTask newSPTask) {
        if ((oldSPTask.distFromSrc + oldSPTask.edgeWeight) > (newSPTask.distFromSrc + newSPTask.edgeWeight)) {
            // The new task offers a shorter path: copy its contents into the old task.
            oldSPTask.distFromSrc = newSPTask.distFromSrc;
            oldSPTask.edgeWeight = newSPTask.edgeWeight;
            oldSPTask.senderId = newSPTask.senderId;
            oldSPTask.TS = newSPTask.TS;

            oldSPTask.nodeId = newSPTask.nodeId;
            oldSPTask.txnInfo = newSPTask.txnInfo;
            oldSPTask.phase = newSPTask.phase;
            oldSPTask.affinityLevel = newSPTask.affinityLevel;
            oldSPTask.targetLocation = newSPTask.targetLocation;
            oldSPTask.status = newSPTask.status;
        }
    }
}
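
To make the comparison in replace concrete, here is a small worked sketch with numbers. SketchSPTask is a hypothetical, trimmed-down stand-in for SPTask that keeps only three of its fields, and the int field types are an assumption, since the page does not show the real declarations. ReplaceSketch, replaceIfShorter, and tentativeDistance are likewise illustrative names.

```java
// Hypothetical stand-in for SPTask with only the fields the comparison reads.
class SketchSPTask {
    int distFromSrc; // assumed type; distance of the sender from the source
    int edgeWeight;  // assumed type; weight of the edge from the sender
    int senderId;

    SketchSPTask(int distFromSrc, int edgeWeight, int senderId) {
        this.distFromSrc = distFromSrc;
        this.edgeWeight = edgeWeight;
        this.senderId = senderId;
    }

    // Path length this task would propose for its target node.
    int tentativeDistance() {
        return distFromSrc + edgeWeight;
    }
}

class ReplaceSketch {
    // Mirrors the test in replace(): overwrite oldTask only when newTask
    // proposes a strictly shorter path. Returns true if oldTask was overwritten.
    static boolean replaceIfShorter(SketchSPTask oldTask, SketchSPTask newTask) {
        if (oldTask.tentativeDistance() > newTask.tentativeDistance()) {
            oldTask.distFromSrc = newTask.distFromSrc;
            oldTask.edgeWeight = newTask.edgeWeight;
            oldTask.senderId = newTask.senderId;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SketchSPTask pending = new SketchSPTask(4, 6, 1);     // proposes 4 + 6 = 10
        replaceIfShorter(pending, new SketchSPTask(5, 2, 2)); // 5 + 2 = 7, shorter: overwrites
        replaceIfShorter(pending, new SketchSPTask(3, 9, 3)); // 3 + 9 = 12, longer: no change
        System.out.println(pending.tentativeDistance() + " via " + pending.senderId); // prints "7 via 2"
    }
}
```

Only one task stays pending throughout, and it always holds the best proposal seen so far.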

Usage within the Workpool

To use task aggregation within the workpool, set the following flag to true:

LocalWorkpool.enableTaskAggregation=true

The workpool also needs a reference to the task aggregation implementation, e.g. SPTaskComparator. To provide it, include the following lines in the application startup class, e.g. SPTest:

SPTest loader = new SPTest();
loader.workpool.taskComparator = new SPTaskComparator();