Tasks
The following is a simplified version of the Scala definition:
trait Task[I,O] {
type Params = Map[String,Any]
def name:String
def params:Params
def id:ID = ID(s"$name")
def trace:Trace = Trace(s"$name($params)")
}
A task has the generic input type I and the generic output type O. One can therefore define and use a task specifying the appropriate input and output types at compile time. (Aggregators, described below, additionally have a generic accumulator type A.) In order to facilitate task execution tracking, a task has a name and a map of parameters params that will be collected twice: once during the compilation phase and later during the execution phase. Note how the parameter map is untyped (its values have type Any). This map is only used for tracking. If you need to store the parameters, make sure they are correctly typed. Both name and params should be defined (or, in Scala parlance, overridden).
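The tracking members described above can be sketched without the library. In this minimal, self-contained sketch the library's ID and Trace wrappers are replaced by plain Strings, and the Scale task with its factor parameter is hypothetical. It shows the recommended split: the parameter is kept in a typed field for computation, while params only records it for tracking.

```scala
// Hypothetical stand-in for the simplified Task trait (not the library's definition):
// the ID and Trace wrappers are replaced by plain Strings.
trait SimpleTask[I, O] {
  type Params = Map[String, Any]
  def name: String
  def params: Params
  def id: String = name
  def trace: String = s"$name($params)"
}

// Hypothetical task that multiplies its input by a factor.
// `factor` is a typed field used for computation; `params` only records it for tracking.
case class Scale(factor: Double) extends SimpleTask[Double, Double] {
  override val name: String = "scale"
  override val params: Params = Map("factor" -> factor)
  def run(i: Double): Double = i * factor
}

val t = Scale(2.5)
println(t.id)    // scale
println(t.trace) // scale(Map(factor -> 2.5))
```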
Creating a Transformer
The following is a simplified version of the Scala definition:
trait Transformer[I,O] extends Task[I,O] {
def pre(i:I): Either[ADWError,Task[I,O]] = Right(this)
def f(i:I): Either[ADWError,O]
}
A transformer is a specific task that takes an input and maps it to an output.
In order to create your own task you will need to define some or all of the remaining trait members and methods.¹
The pre(i:I) method is called just before f(i:I) is called. Here we can perform any checks and initializations that may be required (for example, determining parameters dynamically based on the input). The task must then return an initialized copy of itself. If pre(i:I) is not overridden, by default it returns the task as is. Note that the return type is Either[ADWError,Task[I,O]] so that if an error occurs the task is free to return an error (for example Left(ADWError("An error occurred."))). In such cases the pipeline execution is terminated immediately.
The f(i:I) method is the workhorse of both input (loading) tasks and transformation tasks that consume a prior task's output. Here we consume the data i:I and generate the output O. As with the previous method, the return type is Either[ADWError,O] so that if an error occurs the task is free to return an error (for example Left(ADWError("Execution failed."))). In such cases the pipeline execution is terminated immediately.
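The contract of pre and f can be sketched without the library. In this sketch ADWError is replaced by a hypothetical case class Err, and PadTo is a hypothetical transformer whose pre determines its width parameter from the input and rejects empty input. Because both methods return Either, chaining them with a for-comprehension stops at the first Left, which mirrors how a pipeline is described as terminating on error.

```scala
// Hypothetical stand-ins, not the library's types.
final case class Err(msg: String)

trait SimpleTransformer[I, O] {
  def name: String
  // Default pre: return the task unchanged.
  def pre(i: I): Either[Err, SimpleTransformer[I, O]] = Right(this)
  def f(i: I): Either[Err, O]
}

// Pads its input with '*' up to `width` characters.
// width = 0 means "not yet initialised"; pre sets it from the input.
final case class PadTo(width: Int = 0) extends SimpleTransformer[String, String] {
  val name = "pad_to"
  override def pre(i: String): Either[Err, SimpleTransformer[String, String]] =
    if (i.isEmpty) Left(Err("Empty input."))
    else Right(copy(width = i.length * 2)) // return an initialised copy of itself
  def f(i: String): Either[Err, String] = Right(i.padTo(width, '*'))
}

// Run pre, then f on the initialised copy; the first Left short-circuits.
def runTask[I, O](t: SimpleTransformer[I, O], i: I): Either[Err, O] =
  for {
    ready <- t.pre(i)
    out   <- ready.f(i)
  } yield out

println(runTask(PadTo(), "abc")) // Right(abc***)
println(runTask(PadTo(), ""))    // Left(Err(Empty input.))
```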
Creating an Aggregator
The following is a simplified version of the Scala definition:
trait Aggregator[I,A] {
def zero: A
def collect(acc: A, i: Either[ADWError,I], ta:Long, track: Executing): A
}
Tasks may also be used to collect the output of one or more pipelines. We say that such a task is an aggregator or that it performs aggregation. An aggregator is a monoid. This means that two properties are required of the aggregator: an identity element (the zero:A member) and an associative binary operation (collect(acc: A, i: Either[ADWError,I], ta:Long, track: Executing): A). Neither property is enforced; it is the client's responsibility to ensure that both hold. The identity is the first value of the accumulator passed to the collect method; on each subsequent call, the accumulator is the output of the previous call to collect. The second parameter, i: Either[ADWError,I], is the output of one of the executed pipelines, which may have produced an error. The timing parameter ta:Long represents the wall time (in nanoseconds) that it took to execute the pipeline that generated the input i and the f(i:I) of the aggregator, respectively.² The last parameter, track: Executing, has information on all of the tasks (and their parameters) that were executed by the pipeline that provides the input i.
The aggregator can be used, for example, to select the maximum output of a set of pipelines or to compute the sum of those outputs. Note that these two operations are both associative and commutative, so the result does not depend on the order in which the pipelines' outputs arrive. This matters because when the pipelines are executed in parallel, the order in which their outputs are generated cannot be guaranteed.
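A self-contained sketch of the two aggregators mentioned above. SimpleAggregator is a hypothetical stand-in for Aggregator: ADWError becomes a plain String and the Executing tracker is dropped. Ignoring failed pipelines (Left values) is a design choice of this sketch. Folding the same outputs in every possible order yields a single result, which is the property that makes these aggregators safe under parallel execution.

```scala
// Hypothetical stand-in for the simplified Aggregator trait.
trait SimpleAggregator[I, A] {
  def zero: A
  def collect(acc: A, i: Either[String, I], ta: Long): A
}

// Keeps the largest successful output; failed pipelines are ignored.
object MaxAgg extends SimpleAggregator[Int, Option[Int]] {
  def zero: Option[Int] = None
  def collect(acc: Option[Int], i: Either[String, Int], ta: Long): Option[Int] =
    i match {
      case Right(v) => Some(acc.fold(v)(math.max(_, v)))
      case Left(_)  => acc
    }
}

// Sums the successful outputs; failed pipelines are ignored.
object SumAgg extends SimpleAggregator[Int, Int] {
  def zero: Int = 0
  def collect(acc: Int, i: Either[String, Int], ta: Long): Int =
    i match {
      case Right(v) => acc + v
      case Left(_)  => acc
    }
}

// Outputs of four simulated pipelines, one of which failed.
val outputs: List[Either[String, Int]] = List(Right(3), Left("failed"), Right(7), Right(5))

// Start from zero and fold every output into the accumulator.
def run[A](agg: SimpleAggregator[Int, A], xs: List[Either[String, Int]]): A =
  xs.foldLeft(agg.zero)((acc, i) => agg.collect(acc, i, 0L))

// Every arrival order yields the same aggregate.
val maxes = outputs.permutations.map(run(MaxAgg, _)).toSet
val sums  = outputs.permutations.map(run(SumAgg, _)).toSet
println(maxes) // Set(Some(7))
println(sums)  // Set(15)
```

By contrast, the List-based allConcats aggregator defined later prepends each output, so its result does depend on arrival order.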
Manually defining a Transformer Task
We now use the Scala REPL (Read-Eval-Print Loop) to show how to create and call tasks. Once you have imported the library, you too may use the REPL to replicate these experiments. We start off by importing the Tasks classes and objects. This includes implicit Scala definitions and methods that make it easier to create and use tasks. We will also need to import the definitions of the ADWError (contains the error messages) and Executing (tracks the execution of the tasks) classes.
import pt.inescn.search.stream.Tasks._
// import pt.inescn.search.stream.Tasks._
import pt.inescn.utils.ADWError
// import pt.inescn.utils.ADWError
import pt.inescn.search.stream.Pipes.Executing
// import pt.inescn.search.stream.Pipes.Executing
When tasks are executed, their execution trace is recorded. This allows us to:
- Debug what is happening in a pipe
- Collect results to be processed later (for example experiments using different parameters)
- Control execution flow (for example, execute the next task based on the previous results)

This means we should record each task's name and its parameters' names and values. In order to avoid the need to repeatedly write out this boilerplate, Scala macros (meta-programming) are used to generate this code. To be able to use these macros transparently, the following imports are required.
import pt.inescn.macros.MacroCore.TaskInfo
// import pt.inescn.macros.MacroCore.TaskInfo
import pt.inescn.macros.TaskMacro._
// import pt.inescn.macros.TaskMacro._
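Why recording name and params helps with debugging can be illustrated without the library. The sketch below is hypothetical throughout: Step and runWithTrace are not the library's Executing class or pipeline runner. Each executed step appends its name(params) trace, and execution stops at the first Left, so the trace ends at the step that failed.

```scala
// Hypothetical sketch: a named, parameterized step with a String => Either function.
final case class Step(name: String, params: Map[String, Any], f: String => Either[String, String]) {
  def trace: String = s"$name($params)"
}

// Applies the steps in order, recording each one that runs; stops at the first Left.
def runWithTrace(steps: List[Step], input: String): (List[String], Either[String, String]) = {
  val start: (List[String], Either[String, String]) = (Nil, Right(input))
  val (traces, result) = steps.foldLeft(start) {
    case ((done, Right(v)), s) => (s.trace :: done, s.f(v))
    case (failed, _)           => failed // an earlier step failed: skip the rest
  }
  (traces.reverse, result)
}

val steps = List(
  Step("concat", Map("ap" -> "_x"), s => Right(s + "_x")),
  Step("maxLen", Map("max" -> 5), s => if (s.length > 5) Left("too long") else Right(s)),
  Step("upper", Map(), s => Right(s.toUpperCase))
)

val (trace, out) = runWithTrace(steps, "in")
println(trace) // List(concat(Map(ap -> _x)), maxLen(Map(max -> 5)), upper(Map()))
println(out)   // Right(IN_X)
```

Running the same steps on "input" would stop at maxLen, and the trace would show exactly which parameterized step rejected the data.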
We then define an anonymous class to create the task object assigned to concat.
val concat = new Transformer[String,String] {
val ap: String = "_x"
override val name: String = "concat_name"
override val params: Params = Map("ap" -> ap)
override def f(i:String):Either[ADWError, String] = Right(i + ap)
}
// concat: pt.inescn.search.stream.Tasks.Transformer[String,String]{val ap: String; val name: String; val params: this.Params} = concat_name(Map(ap -> _x))
The task above takes a String as input and outputs a String. Note that we left the pre(i:I) member with its default implementation. The params map records the task's parameters and their values. As we will see later, the macros referred to above can set up the task's name and params members automatically for you. We can now use the task as follows:
val input = "input"
// input: String = input
val output = concat.f(input)
// output: Either[pt.inescn.utils.ADWError,String] = Right(input_x)
val concat1 = concat.pre(input)
// concat1: Either[pt.inescn.utils.ADWError,pt.inescn.search.stream.Tasks.Transformer[String,String]] = Right(concat_name(Map(ap -> _x)))
println(concat1)
// Right(concat_name(Map(ap -> _x)))
val sameOutput = concat1.right.get.f(input)
// sameOutput: Either[pt.inescn.utils.ADWError,String] = Right(input_x)
If we want to parameterize and reuse a task, then it is best to define a class that extends Transformer, as follows:
case class Concat(ap: String = "_x") extends Transformer[String,String] {
override val name: String = "concat_name"
override val params: Params = Map("ap" -> ap)
def f(i:String):Either[ADWError, String] = Right(i + ap)
}
// defined class Concat
Now we have to explicitly instantiate the Concat task before we can use it. Here we create two instances of the task, each using a different parameter:
val input = "input"
// input: String = input
val concatOne = Concat("_1")
// concatOne: Concat = concat_name(Map(ap -> _1))
val output1 = concatOne.f(input)
// output1: Either[pt.inescn.utils.ADWError,String] = Right(input_1)
val concatTwo = Concat("_2")
// concatTwo: Concat = concat_name(Map(ap -> _2))
val output2 = concatTwo.f(input)
// output2: Either[pt.inescn.utils.ADWError,String] = Right(input_2)
Manually defining an Aggregator Task
We proceed as we did for the Transformer, but now we define the zero and collect members of the Aggregator class.
Let's assume that we want to aggregate the results of the various instances of the Concat task that we defined above. We will store all of the outputs in a list. Here is a simple implementation that collects the String output of each Concat task into a List[String].
val allConcats = new Aggregator[String,List[String]] {
type Acc = List[String]
override val name: String = "all_concats_name"
// No parameters
override val params: Params = Map()
override def zero: Acc = List()
override def collect(acc: Acc, i: Either[ADWError,String], ta:Long, track: Executing): Acc =
i match {
case(Right(s)) => s :: acc
case _ => acc
}
}
// allConcats: pt.inescn.search.stream.Tasks.Aggregator[String,List[String]]{type Acc = List[String]; val name: String; val params: this.Params} = all_concats_name(Map())
In the example below we simulate the pipeline execution of one Concat task and then of two Concat tasks. For the second case we place these Concat tasks into a list and fold over this list, so that each task is executed and the aggregator collects its output.
val all = List(concatOne, concatTwo)
// all: List[Concat] = List(concat_name(Map(ap -> _1)), concat_name(Map(ap -> _2)))
// Simulating the execution of a task that took no time
val ex1 = allConcats.collect(allConcats.zero, all.head.f(input), 0, Executing())
// ex1: List[String] = List(input_1)
// Simulating the execution of a set of tasks that each took no time
val ex = all.foldLeft(allConcats.zero){ case(acc,c) => allConcats.collect(acc, c.f(input), 0, Executing()) }
// ex: List[String] = List(input_2, input_1)
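The allConcats aggregator above silently drops failed pipelines and ignores the timing parameter. A variant could also count failures and accumulate the reported wall time. The sketch below is self-contained and hypothetical: Summary, ConcatsSummary and the simplified collect signature (String in place of ADWError, no Executing) are assumptions for illustration, and the timings are simulated values.

```scala
// Hypothetical accumulator: successful outputs, number of failures, total time (ns).
final case class Summary(outputs: List[String], failures: Int, totalNanos: Long)

object ConcatsSummary {
  val zero: Summary = Summary(Nil, 0, 0L)

  // Successful outputs are prepended; failures are counted; time is always added.
  def collect(acc: Summary, i: Either[String, String], ta: Long): Summary =
    i match {
      case Right(s) => acc.copy(outputs = s :: acc.outputs, totalNanos = acc.totalNanos + ta)
      case Left(_)  => acc.copy(failures = acc.failures + 1, totalNanos = acc.totalNanos + ta)
    }
}

// Simulated pipeline results paired with simulated execution times in ns.
val results: List[(Either[String, String], Long)] =
  List((Right("input_1"), 120L), (Left("Execution failed."), 40L), (Right("input_2"), 95L))

val summary = results.foldLeft(ConcatsSummary.zero) { case (acc, (r, t)) =>
  ConcatsSummary.collect(acc, r, t)
}
println(summary) // Summary(List(input_2, input_1),1,255)
```

The failure count and the total time are sums, so they do not depend on arrival order; the outputs list, like allConcats, does.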
Automatically defining a Transformer Task
When instantiating a transformer task via an anonymous class that uses the default pre(i:I) definition, we need to repeatedly write the same boilerplate. To make things easier, it is possible to use Scala implicits to find and create the appropriate task by only furnishing a function with the same signature as f(i:I). In the following example we create a new version of the concat instance using this technique:
concatOne.getClass.getInterfaces
// res3: Array[Class[_]] = Array(interface pt.inescn.search.stream.Tasks$Transformer, interface scala.Product, interface scala.Serializable)
// The function
def concatFunc(ap:String, i:String):Either[ADWError, String] = Right(i + ap)
// concatFunc: (ap: String, i: String)Either[pt.inescn.utils.ADWError,String]
// Its parameter value
val ap = "_3"
// ap: String = _3
// Create a task semi-automatically
val concat0 = T(nameFunc(concatFunc _), ap)
// concat0: pt.inescn.search.stream.Pipes.Func[String,String] = Func(concatFunc(Map(ap -> _3)))
concat0.t.f(input)
// res7: Either[pt.inescn.utils.ADWError,String] = Right(input_3)
// Create the task
val concat = T1(concatFunc _, ap)
// concat: pt.inescn.search.stream.Pipes.Func[String,String] = Func(concatFunc(Map(ap -> _3)))
concat.t.f(input)
// res9: Either[pt.inescn.utils.ADWError,String] = Right(input_3)
concat.getClass.getInterfaces
// res10: Array[Class[_]] = Array(interface pt.inescn.search.stream.Pipes$Op, interface scala.Product, interface scala.Serializable)
Some important details regarding the task type and the use of parameters follow. First, note that the new concat we created does not have the expected type (compare the first and last lines of output). This is because we usually create tasks to be used within a Pipe, so the shorthand above immediately wraps the Task in the appropriate pipe construct. In order to access the task, we need to dereference it with the .t method. In the next section we will see another shorthand for creating multiple tasks with different parameters that is also geared for use with pipes.
Another issue is that this shorthand means of creating a task uses an anonymous class, so it is not possible to provide parameters to a constructor. Notice that here the parameter ap = "_3" is passed directly to the task builder, which passes it on to concatFunc. As a result, and per convention, all leading parameters of the function are assigned as the task's parameters; the last function parameter, however, is always the input i:I. In order to allow for the use of multiple parameter combinations, we also have a shorthand to create multiple tasks using the same transformer function f(i:I) but applied to different parameter values.
NOTE: As of this writing (Scala version 2.12.8), the example above does not work if we use the T method: type inference fails (see the question posted to the Scala user group). We have created T1 to circumvent this problem, but T should otherwise be used.
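The convention described earlier (leading function parameters become task parameters, the last one is the input) can be illustrated with a hypothetical builder. fromFunc2 below is not the library's T or T1 and does not use implicits or macros; it is a sketch that fixes the first argument of a two-argument function, records it in params, and returns an anonymous task. Here the function and parameter names must be supplied by hand, which is the boilerplate the library's macros are described as generating.

```scala
// Hypothetical stand-in; not the library's Transformer.
trait FuncTask[I, O] {
  def name: String
  def params: Map[String, Any]
  def f(i: I): Either[String, O]
  override def toString: String = s"$name($params)"
}

// Hypothetical builder: fix the leading parameter p of `fn`; the last parameter remains the input.
def fromFunc2[P, I, O](fnName: String, pName: String, fn: (P, I) => Either[String, O], p: P): FuncTask[I, O] =
  new FuncTask[I, O] {
    val name: String = fnName
    val params: Map[String, Any] = Map(pName -> p)
    def f(i: I): Either[String, O] = fn(p, i)
  }

def concatFunc(ap: String, i: String): Either[String, String] = Right(i + ap)

val concat3 = fromFunc2("concatFunc", "ap", concatFunc _, "_3")
println(concat3)            // concatFunc(Map(ap -> _3))
println(concat3.f("input")) // Right(input_3)
```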
Automatically defining Parameterized Transformers
Our goal in setting up Pipes is to automatically generate pipelines of task functions that can be executed and evaluated using different hyper-parameters. As a result we provide a Pipe construct that groups tasks that have the same input and output types (we are, however, free to use different functions). In order to evaluate, for example, a function using various parameters, we need only generate several tasks using the same function but different parameter values (one task for every parameter combination). The next example shows how we can test the same concatFunc function using various parameters:
concatOne.getClass.getInterfaces
// res11: Array[Class[_]] = Array(interface pt.inescn.search.stream.Tasks$Transformer, interface scala.Product, interface scala.Serializable)
// The function
def concatFunc(ap:String, i:String):Either[ADWError, String] = Right(i + ap)
// concatFunc: (ap: String, i: String)Either[pt.inescn.utils.ADWError,String]
// The parameter values we want to evaluate
val ap = Iterable("_1", "_2", "_3")
// ap: Iterable[String] = List(_1, _2, _3)
// Create the task
val concat = T(concatFunc _, ap)
// concat: pt.inescn.search.stream.Pipes.All[String,String] = All(Tasks(Func(concatFunc(Map(ap -> _1))), Func(concatFunc(Map(ap -> _2))), Func(concatFunc(Map(ap -> _3)))))
import pt.inescn.search.stream.Pipes
// import pt.inescn.search.stream.Pipes
val op: Pipes.Op[String, String] = concat.f.head
// op: pt.inescn.search.stream.Pipes.Op[String,String] = Func(concatFunc(Map(ap -> _1)))
val func = op.asInstanceOf[Pipes.Func[String, String]]
// func: pt.inescn.search.stream.Pipes.Func[String,String] = Func(concatFunc(Map(ap -> _1)))
func.t.f(input)
// res15: Either[pt.inescn.utils.ADWError,String] = Right(input_1)
concat.getClass.getInterfaces
// res16: Array[Class[_]] = Array(interface pt.inescn.search.stream.Pipes$Op, interface scala.Product, interface scala.Serializable)
Note that we can use iterators to enumerate the parameter values. This means that we can potentially generate an infinite number of tasks. When constructing the Pipe we can then sample the pipelines using, for example, a set of random hyper-parameters. In addition, when we have two or more parameters, we must stipulate how these parameters are combined. For example, we may test all combinations of hyper-parameters, which is equivalent to performing a grid search. To define such a search strategy we use a set of search operators.
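The two ways of combining parameters mentioned above can be sketched in plain Scala, without the library's search operators. The function join and the parameter names ap and sep with their values are hypothetical. A grid search takes the Cartesian product of finite value lists; a random search draws a fixed number of combinations from a potentially infinite iterator, which is why lazily enumerated parameter values are useful.

```scala
import scala.util.Random

// Hypothetical two-parameter task function: the last parameter is the input.
def join(ap: String, sep: String, i: String): String = i + sep + ap

val aps  = List("_1", "_2", "_3")
val seps = List("-", "+")

// Grid search: one combination per element of the Cartesian product (3 x 2 = 6).
val grid: List[(String, String)] = for { a <- aps; s <- seps } yield (a, s)
val gridOutputs = grid.map { case (a, s) => join(a, s, "input") }

// Random search: an infinite iterator of random combinations, of which only n are used.
val rnd = new Random(42) // fixed seed for reproducibility
val randomCombos: Iterator[(String, String)] =
  Iterator.continually((aps(rnd.nextInt(aps.size)), seps(rnd.nextInt(seps.size))))
val sampled = randomCombos.take(4).toList

println(gridOutputs) // List(input-_1, input+_1, input-_2, input+_2, input-_3, input+_3)
println(sampled.size) // 4
```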
Automatically defining an Aggregator Task
As of this writing, no automated or shorthand means exist to create these tasks.