Tasks

The following is a simplified version of the Scala definition:

  trait Task[I,O] {
    type Params = Map[String,Any]

    def name:String
    def params:Params
    def id:ID = ID(s"$name")
    def trace:Trace = Trace(s"$name($params)")
  }

A task has the generic input type I and the generic output type O (the accumulator type A only appears in aggregators, described below). One can therefore define and use a task by specifying the appropriate input and output types at compile time. To facilitate tracking task execution, a task has a name and a map of parameters, params, which are collected twice: once during the compilation phase and later during the execution phase. Note that the parameter map is untyped (its values have type Any) because it is only used for tracking. If you need to store the parameters, make sure they are correctly typed. Both name and params must be defined (or, in Scala parlance, overridden).
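To make the tracking members concrete, the following self-contained sketch restates the simplified Task trait and implements it with an anonymous class. ID and Trace are hypothetical case-class stand-ins for the library's own types, and the toString override is an assumption that mirrors the REPL output shown later (such as concat_name(Map(ap -> _x))).

```scala
// Hypothetical stand-ins for the library's ID and Trace types.
final case class ID(value: String)
final case class Trace(value: String)

// Simplified Task, following the definition above.
trait Task[I, O] {
  type Params = Map[String, Any]

  def name: String
  def params: Params
  def id: ID = ID(s"$name")
  def trace: Trace = Trace(s"$name($params)")
  // Assumption: a task prints as its trace.
  override def toString: String = trace.value
}

// A task only has to provide its name and its (untyped) parameter map.
val upper = new Task[String, String] {
  val n: Int = 1
  override val name: String = "upper"
  override val params: Params = Map("n" -> n)
}

println(upper.id)    // ID(upper)
println(upper.trace) // Trace(upper(Map(n -> 1)))
```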

Creating a Transformer

The following is a simplified version of the Scala definition:

  trait Transformer[I,O] extends Task[I,O] {
    def pre(i:I): Either[ADWError,Transformer[I,O]] = Right(this)
    def f(i:I): Either[ADWError,O]
  }

A transformer is a specific task that takes an input and maps it to an output.

To create your own task you will need to define some or all of the remaining trait members and methods (see note 1).
The pre(i:I) method is called just before f(i:I). Here we can perform any checks and initializations that may be required (for example, determining parameters dynamically based on the input). The task must then return a copy of its initialized self. By default, if pre(i:I) is not overridden, it returns the task as is. Note that the return type is an Either so that, if an error occurs, the task is free to return an error (for example, Left(ADWError("An error occurred."))). In such cases the pipeline execution is terminated immediately.

The f(i:I) method is the workhorse of both input (loading) tasks and transformation tasks that consume a prior task's output. Here we consume the data i:I and generate the output O. As with the previous method, the return type is an Either[ADWError,O] so that, if an error occurs, the task is free to return an error (for example, Left(ADWError("Execution failed."))). In such cases the pipeline execution is terminated immediately.
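The "terminated immediately" behavior follows naturally from chaining Either values with flatMap. The sketch below is an assumption about how a runner could behave, not the library's pipeline executor: the hypothetical helpers run and runAll call pre and then f for each task, and once a Left appears the remaining tasks are never invoked.

```scala
// Hypothetical stand-in for pt.inescn.utils.ADWError.
final case class ADWError(msg: String)

trait Transformer[I, O] {
  def pre(i: I): Either[ADWError, Transformer[I, O]] = Right(this)
  def f(i: I): Either[ADWError, O]
}

// Hypothetical helper: run one task (pre, then f on the initialized copy).
def run[I, O](t: Transformer[I, O], i: I): Either[ADWError, O] =
  t.pre(i).flatMap(ready => ready.f(i))

// Hypothetical helper: run String => String tasks in sequence.
// flatMap on a Left does nothing, so tasks after the first error are skipped.
def runAll(ts: List[Transformer[String, String]], input: String): Either[ADWError, String] =
  ts.foldLeft[Either[ADWError, String]](Right(input)) { (acc, t) =>
    acc.flatMap(i => run(t, i))
  }

val append = new Transformer[String, String] {
  def f(i: String): Either[ADWError, String] = Right(i + "_x")
}
val failing = new Transformer[String, String] {
  def f(i: String): Either[ADWError, String] = Left(ADWError("Execution failed."))
}

println(runAll(List(append, append), "in"))          // Right(in_x_x)
println(runAll(List(append, failing, append), "in")) // Left(ADWError(Execution failed.))
```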

Creating an Aggregator

The following is a simplified version of the Scala definition:

  trait Aggregator[I,A] extends Task[I,A] {
    def zero: A
    def collect(acc: A, i: Either[ADWError,I], ta:Long, track: Executing): A
  }

Tasks may also be used to collect the output of one or more pipelines. We say that such a task is an aggregator, or that it performs aggregation. An aggregator is expected to behave as a monoid. This means that two properties are required of the aggregator: an identity element (the zero:A member) and an associative binary operation (collect(acc: A, i: Either[ADWError,I], ta:Long, track: Executing): A). Neither of these properties is enforced; it is the client's responsibility to ensure that they hold. The identity is the first value of the accumulator passed to the collect method; on every subsequent call, the accumulator is the output of the previous call to collect. The second parameter, i: Either[ADWError,I], is the output of one of the executed pipelines, which may have produced an error. The parameter ta:Long is the wall time (in nanoseconds) that it took to execute the pipeline that generated the input i (see note 2). The last parameter, track: Executing, holds information on all of the tasks (and their parameters) that were executed by the pipeline that provided the input i.

The aggregator can be used, for example, to select the maximum output of a set of pipelines or to compute the sum of those outputs. Both operations are associative and commutative, so the result does not depend on the order in which the pipeline outputs arrive. This matters because, when the pipelines are executed in parallel, the order in which their outputs are generated cannot be guaranteed.
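The sum and maximum examples can be sketched as follows. ADWError, Executing and the simplified Aggregator are hypothetical stand-ins for the library types, and failed pipelines are simply ignored here (an assumption; a real aggregator might record them). Folding every permutation of the outputs checks that the result is independent of arrival order.

```scala
// Hypothetical stand-ins for the library types.
final case class ADWError(msg: String)
final case class Executing()

trait Aggregator[I, A] {
  def zero: A
  def collect(acc: A, i: Either[ADWError, I], ta: Long, track: Executing): A
}

// Sum of successful outputs: 0 is the identity.
val sumAgg = new Aggregator[Int, Int] {
  def zero: Int = 0
  def collect(acc: Int, i: Either[ADWError, Int], ta: Long, track: Executing): Int =
    i.fold(_ => acc, v => acc + v)
}

// Maximum of successful outputs: None is the identity.
val maxAgg = new Aggregator[Int, Option[Int]] {
  def zero: Option[Int] = None
  def collect(acc: Option[Int], i: Either[ADWError, Int], ta: Long, track: Executing): Option[Int] =
    i.fold(_ => acc, v => Some(acc.fold(v)(a => math.max(a, v))))
}

// Simulated pipeline outputs, including one failure.
val outputs: List[Either[ADWError, Int]] =
  List(Right(3), Left(ADWError("failed")), Right(7), Right(5))

def aggregate[A](agg: Aggregator[Int, A], xs: List[Either[ADWError, Int]]): A =
  xs.foldLeft(agg.zero)((acc, x) => agg.collect(acc, x, 0L, Executing()))

// Every arrival order yields the same result.
val sums = outputs.permutations.map(p => aggregate(sumAgg, p)).toSet
val maxs = outputs.permutations.map(p => aggregate(maxAgg, p)).toSet
println(sums) // Set(15)
println(maxs) // Set(Some(7))
```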

Manually defining a Transformer Task

We now use the Scala REPL (Read-Eval-Print Loop) to show how to create and call tasks. Once you have added the library to your project, you too may use the REPL to replicate these experiments. We start by importing the Tasks classes and objects. These include implicit Scala definitions and methods that make it easier to create and use tasks. We also need to import the definitions of the ADWError (contains the error messages) and Executing (tracks the execution of the tasks) classes.

import pt.inescn.search.stream.Tasks._
// import pt.inescn.search.stream.Tasks._

import pt.inescn.utils.ADWError
// import pt.inescn.utils.ADWError

import pt.inescn.search.stream.Pipes.Executing
// import pt.inescn.search.stream.Pipes.Executing

When tasks are executed, their execution trace is recorded. This allows us to:

  • Debug what is happening in a pipe
  • Collect results to be processed later (for example experiments using different parameters)
  • Control execution flow (for example, execute the next task based on the previous results)

This means we should record each task's name and its parameters' names and values. To avoid repeatedly writing out this boilerplate, Scala macros (meta-programming) are used to generate this code. To use these macros transparently, the following imports are required:

import pt.inescn.macros.MacroCore.TaskInfo
// import pt.inescn.macros.MacroCore.TaskInfo

import pt.inescn.macros.TaskMacro._
// import pt.inescn.macros.TaskMacro._

We then define an anonymous class to create the task object and assign it to concat.

val concat = new Transformer[String,String] {
    val ap: String = "_x"

    override val name: String = "concat_name"
    override val params: Params = Map("ap" -> ap)
    
    override def f(i:String):Either[ADWError, String] = Right(i + ap)
  }
// concat: pt.inescn.search.stream.Tasks.Transformer[String,String]{val ap: String; val name: String; val params: this.Params} = concat_name(Map(ap -> _x))

The task above takes a String as input and outputs a String. Note that we left the pre(i:I) member with its default implementation. The params map records the task's parameters and their values. As we will see later, the macros referred to above can set up the task's name and params members automatically for you. We can now use the task as follows:

val input = "input"
// input: String = input

val output = concat.f(input)
// output: Either[pt.inescn.utils.ADWError,String] = Right(input_x)

val concat1 = concat.pre(input)
// concat1: Either[pt.inescn.utils.ADWError,pt.inescn.search.stream.Tasks.Transformer[String,String]] = Right(concat_name(Map(ap -> _x)))

println(concat1)
// Right(concat_name(Map(ap -> _x)))

val sameOutput = concat1.right.get.f(input)
// sameOutput: Either[pt.inescn.utils.ADWError,String] = Right(input_x)

If we want to parameterize and reuse a task then it is best to define a Transformer class as follows:

case class Concat(ap: String = "_x") extends Transformer[String,String] {
    override val name: String = "concat_name"
    override val params: Params = Map("ap" -> ap)
    
    def f(i:String):Either[ADWError, String] = Right(i + ap)
  }
// defined class Concat

Now we have to explicitly instantiate the Concat task before we can use it. Here we create two instances of the task, each using a different parameter:

val input = "input"
// input: String = input

val concatOne = Concat("_1")
// concatOne: Concat = concat_name(Map(ap -> _1))

val output1 = concatOne.f(input)
// output1: Either[pt.inescn.utils.ADWError,String] = Right(input_1)

val concatTwo = Concat("_2")
// concatTwo: Concat = concat_name(Map(ap -> _2))

val output2 = concatTwo.f(input)
// output2: Either[pt.inescn.utils.ADWError,String] = Right(input_2)
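Because Concat is an ordinary case class, creating one task per parameter value is just a map over the values, instances with the same parameter compare equal, and copy creates a variant. The sketch below is self-contained, so ADWError and the trimmed-down Transformer are hypothetical stand-ins for the library types.

```scala
// Hypothetical stand-ins for the library types.
final case class ADWError(msg: String)

trait Transformer[I, O] {
  def name: String
  def params: Map[String, Any]
  def f(i: I): Either[ADWError, O]
}

final case class Concat(ap: String = "_x") extends Transformer[String, String] {
  override val name: String = "concat_name"
  override val params: Map[String, Any] = Map("ap" -> ap)
  def f(i: String): Either[ADWError, String] = Right(i + ap)
}

// One task instance per parameter value.
val tasks = List("_1", "_2", "_3").map(Concat(_))
val outputs = tasks.map(_.f("input"))
println(outputs) // List(Right(input_1), Right(input_2), Right(input_3))

// Case-class equality compares constructor parameters; copy changes one of them.
println(Concat("_1") == tasks.head)            // true
println(tasks.head.copy(ap = "_9").f("input")) // Right(input_9)
```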

Manually defining an Aggregator Task

We proceed as we did for the Transformer, but now we define the zero and collect members of the Aggregator class. Let's assume that we want to aggregate the results of the various instances of the Concat task defined above, storing all of the outputs in a list. Here is a simple implementation that collects the Concat String output into a List[String]:

val allConcats = new Aggregator[String,List[String]] {
    type Acc = List[String]

    override val name: String = "all_concats_name"
    // No parameters
    override val params: Params = Map()
    
    override def zero: Acc = List()
    override def collect(acc: Acc, i: Either[ADWError,String], ta:Long, track: Executing): Acc = 
    i match {
      case(Right(s)) => s :: acc
      case _ => acc
    }
  }
// allConcats: pt.inescn.search.stream.Tasks.Aggregator[String,List[String]]{type Acc = List[String]; val name: String; val params: this.Params} = all_concats_name(Map())

In the example below we simulate the execution of one and then of two Concat tasks. In the second case we place the tasks in a list and fold over it, executing each task and having the aggregator collect its output. Because collect prepends each new output, the outputs appear in reverse order of execution.

val all = List(concatOne, concatTwo)
// all: List[Concat] = List(concat_name(Map(ap -> _1)), concat_name(Map(ap -> _2)))

// Simulating the execution of a task that took no time
val ex1 = allConcats.collect(allConcats.zero, all.head.f(input), 0, Executing())
// ex1: List[String] = List(input_1)

// Simulating the execution of a set of tasks that each took no time
val ex = all.foldLeft(allConcats.zero){ case(acc,c) => allConcats.collect(acc, c.f(input), 0, Executing()) }
// ex: List[String] = List(input_2, input_1)
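The example above passes 0 as the wall time. In a real run, ta would be measured around the pipeline's execution. The sketch below does this with System.nanoTime; the timedRun helper, the pipelines list and the stand-in types are hypothetical and only illustrate how a measured time could reach collect.

```scala
// Hypothetical stand-ins for the library types.
final case class ADWError(msg: String)
final case class Executing()

trait Aggregator[I, A] {
  def zero: A
  def collect(acc: A, i: Either[ADWError, I], ta: Long, track: Executing): A
}

// Records each successful output together with the time it took.
val timed = new Aggregator[String, List[(String, Long)]] {
  def zero: List[(String, Long)] = Nil
  def collect(acc: List[(String, Long)], i: Either[ADWError, String], ta: Long,
              track: Executing): List[(String, Long)] =
    i.fold(_ => acc, s => (s, ta) :: acc)
}

// Hypothetical helper: evaluate a pipeline and measure its wall time in nanoseconds.
def timedRun[O](pipeline: => Either[ADWError, O]): (Either[ADWError, O], Long) = {
  val start = System.nanoTime()
  val out = pipeline
  (out, System.nanoTime() - start)
}

// Two simulated pipelines.
val pipelines: List[String => Either[ADWError, String]] =
  List(i => Right(i + "_1"), i => Right(i + "_2"))

val result = pipelines.foldLeft(timed.zero) { (acc, p) =>
  val (out, ta) = timedRun(p("input"))
  timed.collect(acc, out, ta, Executing())
}
println(result.map(_._1)) // List(input_2, input_1)
```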

Automatically defining a Transformer Task

When we instantiate a transformer task via an anonymous class and use the default pre(i:I) definition, we must repeatedly write the same boilerplate. To make things easier, it is possible to use Scala implicits to find and create the appropriate task by supplying only a function that implements f(i:I), together with its parameter values. In the following example we create a new version of the concat instance using this technique:

    concatOne.getClass.getInterfaces
// res3: Array[Class[_]] = Array(interface pt.inescn.search.stream.Tasks$Transformer, interface scala.Product, interface scala.Serializable)

    // The function
    def concatFunc(ap:String, i:String):Either[ADWError, String] = Right(i + ap)
// concatFunc: (ap: String, i: String)Either[pt.inescn.utils.ADWError,String]

    // Its parameter value
    val ap = "_3"
// ap: String = _3

    // Create a task semi-automatically
    val concat0 = T(nameFunc(concatFunc _), ap)
// concat0: pt.inescn.search.stream.Pipes.Func[String,String] = Func(concatFunc(Map(ap -> _3)))

    concat0.t.f(input)
// res7: Either[pt.inescn.utils.ADWError,String] = Right(input_3)

    // Create the task
    val concat =  T1(concatFunc _, ap)
// concat: pt.inescn.search.stream.Pipes.Func[String,String] = Func(concatFunc(Map(ap -> _3)))

    concat.t.f(input)
// res9: Either[pt.inescn.utils.ADWError,String] = Right(input_3)

    concat.getClass.getInterfaces
// res10: Array[Class[_]] = Array(interface pt.inescn.search.stream.Pipes$Op, interface scala.Product, interface scala.Serializable)

There are some important details regarding the task type and the use of parameters. First, note that the new concat we created does not have the expected type (compare the first and last lines of the example: Tasks$Transformer versus Pipes$Op). This is because we usually create tasks to be used within a Pipe, so the shorthand above immediately wraps the task in the appropriate pipe construct. To access the task, we need to dereference it with the .t member. In the next section we will see another shorthand for creating multiple tasks with different parameters that is also geared for use with pipes.

Another issue is that this shorthand uses an anonymous class, so it is not possible to provide parameters to a constructor. Notice that here the parameter ap = "_3" is passed directly to the task builder, which passes it on to concatFunc. By convention, all leading parameters of the function are recorded as the task's parameters, while the last function parameter is always the input i:I. To allow for multiple parameter combinations, we also have a shorthand to create multiple tasks that use the same transformer function f(i:I) applied to different parameter values.

NOTE: As of this writing (Scala version 2.12.8), the last example above does not work if we use the T method instead of T1: type inference fails (see the question posted to the Scala user group). We have created T1 to circumvent this problem; otherwise, T should be used.
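The parameter convention described above (leading function parameters become task parameters, the last one is the input) amounts to partially applying the function. The sketch below illustrates the idea with a hypothetical fromFunc helper; it is not how the library's T or T1 are implemented (those rely on implicits and macros and wrap the result in a pipe construct), and the stand-in types are simplified.

```scala
// Hypothetical stand-ins for the library types.
final case class ADWError(msg: String)

trait Transformer[I, O] {
  def name: String
  def params: Map[String, Any]
  def f(i: I): Either[ADWError, O]
}

// Hypothetical helper (not the library's T/T1): fix the leading parameter of a
// two-argument function and expose the last argument as the task input.
def fromFunc[P, I, O](fname: String, pname: String, p: P)(
    g: (P, I) => Either[ADWError, O]): Transformer[I, O] =
  new Transformer[I, O] {
    override val name: String = fname
    override val params: Map[String, Any] = Map(pname -> p)
    override def f(i: I): Either[ADWError, O] = g(p, i)
    override def toString: String = s"$name($params)"
  }

def concatFunc(ap: String, i: String): Either[ADWError, String] = Right(i + ap)

val concat = fromFunc("concatFunc", "ap", "_3")(concatFunc _)
println(concat)            // concatFunc(Map(ap -> _3))
println(concat.f("input")) // Right(input_3)
```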

Automatically defining Parameterized Transformer

The goal of setting up Pipes is to automatically generate pipelines of task functions that can be executed and evaluated using different hyper-parameters. To this end we provide a Pipe construct that groups tasks that have the same input and output types (we are, however, free to use different functions). To evaluate a function using various parameters, for example, we need only generate several tasks that use the same function with different parameter values (one task for every parameter combination). The next example shows how we can test the same concat function using various parameters:

    concatOne.getClass.getInterfaces
// res11: Array[Class[_]] = Array(interface pt.inescn.search.stream.Tasks$Transformer, interface scala.Product, interface scala.Serializable)

    // The function
    def concatFunc(ap:String, i:String):Either[ADWError, String] = Right(i + ap)
// concatFunc: (ap: String, i: String)Either[pt.inescn.utils.ADWError,String]

    // The parameter values we want to evaluate
    val ap = Iterable("_1", "_2", "_3")
// ap: Iterable[String] = List(_1, _2, _3)

    // Create the task
    val concat =  T(concatFunc _, ap)
// concat: pt.inescn.search.stream.Pipes.All[String,String] = All(Tasks(Func(concatFunc(Map(ap -> _1))), Func(concatFunc(Map(ap -> _2))), Func(concatFunc(Map(ap -> _3)))))

    import pt.inescn.search.stream.Pipes
// import pt.inescn.search.stream.Pipes

    val op: Pipes.Op[String, String] = concat.f.head
// op: pt.inescn.search.stream.Pipes.Op[String,String] = Func(concatFunc(Map(ap -> _1)))

    val func = op.asInstanceOf[Pipes.Func[String, String]]
// func: pt.inescn.search.stream.Pipes.Func[String,String] = Func(concatFunc(Map(ap -> _1)))

    func.t.f(input)
// res15: Either[pt.inescn.utils.ADWError,String] = Right(input_1)

    concat.getClass.getInterfaces
// res16: Array[Class[_]] = Array(interface pt.inescn.search.stream.Pipes$Op, interface scala.Product, interface scala.Serializable)

Note that we can use iterators to enumerate the parameter values. This means that we can potentially generate an infinite number of tasks. When constructing the Pipe we can then sample the pipelines using, for example, a set of random hyper-parameters. In addition, when we have two or more parameters, we must stipulate how these parameters are combined. For example, we may test all combinations of hyper-parameters, which is equivalent to performing a grid search. To define such a search strategy we use a set of search operators.
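The two combination strategies mentioned above can be illustrated in plain Scala. This is not the library's search-operator API: decorate is a hypothetical two-parameter function, and the sketch only shows a grid search (the Cartesian product of the parameter values, produced lazily by iterators) and a random search (a fixed number of combinations drawn from an infinite iterator).

```scala
// Hypothetical stand-in for pt.inescn.utils.ADWError.
final case class ADWError(msg: String)

// Hypothetical function: the leading parameters are the task parameters,
// the last one is the input.
def decorate(prefix: String, suffix: String, i: String): Either[ADWError, String] =
  Right(prefix + i + suffix)

val prefixes = Vector("<", "[")
val suffixes = Vector(">", "]")

// Grid search: every (prefix, suffix) pair, i.e. the Cartesian product.
// Iterators keep the enumeration lazy: combinations are produced as they are consumed.
val grid: Iterator[(String, String)] =
  for {
    p <- prefixes.iterator
    s <- suffixes.iterator
  } yield (p, s)

val gridResults = grid.map { case (p, s) => (p, s) -> decorate(p, s, "x") }.toList
gridResults.foreach(println)

// Random search: take a fixed number of combinations from an infinite iterator.
val rnd = new scala.util.Random(42)
def pick(values: Vector[String]): String = values(rnd.nextInt(values.size))
val randomCombos: List[(String, String)] =
  Iterator.continually((pick(prefixes), pick(suffixes))).take(3).toList
val randomResults = randomCombos.map { case (p, s) => decorate(p, s, "x") }
```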

Automatically defining Aggregator Task

As of this writing, no automated or shorthand means exists to create these tasks.
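Until such a shorthand exists, a small factory of your own can remove most of the boilerplate. The aggregator helper below is hypothetical and not part of the library; it builds an aggregator from a name, an identity element and a function that folds one successful output, leaving the accumulator unchanged on errors. The stand-in types are simplified versions of the library's.

```scala
// Hypothetical stand-ins for the library types.
final case class ADWError(msg: String)
final case class Executing()

trait Aggregator[I, A] {
  def name: String
  def zero: A
  def collect(acc: A, i: Either[ADWError, I], ta: Long, track: Executing): A
}

// Hypothetical helper, not part of the library.
def aggregator[I, A](aggName: String, z: A)(op: (A, I) => A): Aggregator[I, A] =
  new Aggregator[I, A] {
    override val name: String = aggName
    override def zero: A = z
    override def collect(acc: A, i: Either[ADWError, I], ta: Long, track: Executing): A =
      i.fold(_ => acc, v => op(acc, v)) // errors leave the accumulator unchanged
  }

// The allConcats aggregator from above, and a sum aggregator.
val allConcats = aggregator[String, List[String]]("all_concats_name", Nil)((acc, s) => s :: acc)
val total = aggregator[Int, Int]("sum", 0)(_ + _)

val one = total.collect(total.zero, Right(1), 0L, Executing())
val stillOne = total.collect(one, Left(ADWError("failed")), 0L, Executing())
println(stillOne) // 1
```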


  1. Suggestion: define your task as a case class. This facilitates coding and debugging because Scala case classes automatically get a readable toString, structural equality (equals and hashCode) that makes it easy to compare tasks, and an unapply method for pattern matching.

  2. Note: when aggregating, no pre(i:I) method exists.