
Distributed Agent Job Chains – getting child results from Reduce() function

Geek alert: This blog post is highly technical and assumes a certain level of Oscript development skills.

In my last blog, I discussed improvements made to the Distributed Agents framework in Update 201506. As a result of that post, John Simon asked whether the improvements extended to capturing the results of child tasks in the Reduce() function. At the time, my research was inconclusive. Today I ran a little experiment on two CS10.0 instances, one at U201503 and the other at U201506. The upshot is that both behaved identically, so if the behaviour I'm citing is an improvement, it was introduced as early as U201503 (anyone from OpenText DEV care to comment?).

In a nutshell, it is possible to get results to bubble up from child task to master task. This happens in the Reduce() function. Unfortunately, the aggregated results don't seem to make it to the Finalize task, which is where most developers would like to see aggregated data about the tasks that were executed.

So what aggregation *can* be done?

The Reduce() function gets data from two main locations: the list of Assocs passed in as its one argument, and the .fTaskData feature on the Task object (i.e. your instance of your JobChain orphan). Tragically, .fTaskData means something completely different depending on which function you are in. Within Split(), it's the original Assoc you passed in with your call to $DistributedAgent.MapReducePkg.Map(), but it's something else again in Map(), Reduce(), and Finalize(). I summarize below:

Function     Contents of .fTaskData
---------    ----------------------
Split()      The original Assoc you passed to $DistributedAgent.MapReducePkg.Map()
Map()        The chunk Assoc created for this task by Split()
Reduce()     The rtnVal returned by this task's Map()
Finalize()   Something different again
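For reference, the kickoff call mentioned above looks something like the following sketch. The orphan name and the taskData contents are purely illustrative, and the exact argument list of MapReducePkg.Map() is an assumption on my part, so check the package source in your version before relying on it.

// The Assoc passed in is what Split() will see as .fTaskData
Assoc taskData = Assoc.CreateAssoc()
taskData.nodeID = 2000    // illustrative payload

// Argument list is an assumption; consult MapReducePkg.Map() in
// your version for the real signature
$DistributedAgent.MapReducePkg.Map( 'mymodule.MyJobChain', taskData )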

Yeah, it’s a bit messed up, and no wonder Oscript developers are often confused about what data they are getting in the different functions of the JobChain task object. Before I proceed with how to get child data to bubble up, let’s look at what happens when a JobChain is kicked off. The sequence of events is:

  • The master task executes the Split() function, which creates an Assoc for each portion of the job. This list of chunk task data is returned in the Split function’s “set” feature (i.e. rtnVal.set).
  • The master task executes the Map() function for the first taskData in the set from above (yeah, kind of weird).
  • Child tasks are assigned for each remaining taskData in the set. That means if your Split function only split the job into one chunk, everything gets executed in the master task.
  • Each child task executes its Map() function. Any data you want to reach Reduce() should go into rtnVal.Data of Map’s return value.
  • The master task executes Reduce() and complains that child tasks are outstanding (this sometimes creates an endless loop in Builder, but it works fine when the server is running).
  • Each child executes Reduce().
  • The master task re-executes Reduce() and now succeeds.
  • The master task executes Finalize().
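Putting that flow in skeleton form, a JobChain orphan implements these four functions. This is only a sketch; the Split(), Map(), and Finalize() signatures shown here are assumptions (only Reduce()’s childResults argument is confirmed later in this post), but it shows where each piece of data goes:

function Assoc Split()
    Assoc rtnVal = Assoc.CreateAssoc()
    // rtnVal.set = list of chunk Assocs, one per task
    rtnVal.ok = TRUE
    return rtnVal
end

function Assoc Map()
    Assoc rtnVal = Assoc.CreateAssoc()
    // rtnVal.Data = anything Reduce() should later see in .fTaskData
    rtnVal.ok = TRUE
    return rtnVal
end

function Assoc Reduce( List childResults )
    Assoc rtnVal = Assoc.CreateAssoc()
    // rtnVal.childStatus = anything the master Reduce() should aggregate
    rtnVal.ok = TRUE
    return rtnVal
end

function Assoc Finalize()
    Assoc rtnVal = Assoc.CreateAssoc()
    rtnVal.ok = TRUE
    return rtnVal
end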


Now, back to the question of how we aggregate data in the Reduce() function. As you can see from above, the master’s Reduce() tries to execute first (it contains the first taskData generated in Split()). It exits out because child tasks are still executing. The details of how master and child tasks map to one another are always subject to change; for our purposes we don’t need to know. The key thing is that in each child task’s run of Reduce(), any details you want to reach the master’s Reduce() go into rtnVal.childStatus. So let’s assume a Map() function along the lines of the following sketch, which returns an object count and a skipped count (i.e. # of objects processed and skipped) in a data Assoc:

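function Assoc Map()

    Assoc data = Assoc.CreateAssoc()
    Assoc rtnVal = Assoc.CreateAssoc()

    // Illustrative counters: a real Map() would compute these while
    // processing the chunk of work described by this task's .fTaskData
    Integer count = 0
    Integer skipped = 0

    // ...process this chunk, incrementing count and skipped...

    // Whatever goes into rtnVal.Data here is what Reduce() will see
    data.count = count
    data.skipped = skipped

    rtnVal.ok = TRUE
    rtnVal.Data = data

    return rtnVal
end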

The Reduce() function will get an .fTaskData that is the rtnVal from the function above. You can then copy the values from that data into Reduce’s rtnVal.childStatus Assoc, i.e.

Assoc childStatus = Assoc.CreateAssoc()
childStatus.data = .fTaskData.data
rtnVal.childStatus = childStatus

When the master task executes Reduce(), it gets a list of all the childStatus Assocs from all the child tasks. If you loop through them, you get the results of each Map task. These results, along with the results stored in .fTaskData, give you all your task results in one location.

There is, of course, one small problem. A child task and a master task for which no child tasks executed will appear identical – both will have an empty list for the childResults argument.

There is a cheap trick to solve this. We can take advantage of the fact that in a Job Chain, when our Split function splits the job up into smaller pieces, the master task always takes the first item in the list – even if it’s a list of 1. If we assign a row number to each chunk’s taskData in Split(), and return that number from our Map() function in the results, then in Reduce() we can always tell a master with no children from a child by the row number. In Split() you’d do something like this:

RecArray rows = ...some SQL to get back our distributed results
Record row
List chunkList = {}
Integer rowCount = 0

for row in rows
    Assoc chunkData = Assoc.Copy( .fTaskData )
    // ...set whatever else you need to on chunkData...
    chunkData.RowNumber = rowCount
    chunkList = { @chunkList, chunkData }
    rowCount += 1
end

rtnVal.set = chunkList

In the Map() function, you would add a line like

data.RowNumber = .fTaskData.RowNumber

Now your Reduce function would look something like:

/**
* This method will reduce the task results.
* 
* @param {List} childResults Results from execution of child tasks
*
* @return {Assoc}
* @returnFeature {Boolean} ok FALSE if an error occurred, TRUE otherwise
* @returnFeature {String} errMsg Error message, if an error occurred
* @returnFeature {Assoc} childStatus Assoc passing a child task's results up to the master Reduce()
* @returnFeature {Assoc} childStatus.data The count/skipped data Assoc returned from Map()
* @returnFeature {RecArray} errDetail Detailed information about errors
* 
**/

function Assoc Reduce( \
    List childResults )

    Assoc childStatus = Assoc.CreateAssoc()
    Assoc rtnVal = Assoc.CreateAssoc()
    Assoc result
    Assoc taskData = .fTaskData
    Assoc data = taskData.data

    Boolean ok = TRUE
    String errMsg = ""
    RecArray errDetail

    // Assuming we set this up in Split(), RowNumber will always be 0 for
    // the master task
    Boolean isMaster = ( IsDefined( data.RowNumber ) && data.RowNumber == 0 )

    // Get the results of the first task, which was executed as the master
    Integer count = data.count
    Integer skipped = data.skipped

    // Aggregate the rest of the results
    if IsDefined( childResults )
        for result in childResults
            // Assume our child counts are in data
            if IsDefined( result.data )
                count += result.data.count
                skipped += result.data.skipped
            end
        end
    end

    if isMaster
        // do something with the aggregated count/skipped integers
    else
        // For a child task, push this chunk's data up to the master
        childStatus.data = data
    end

    rtnVal.ok = ok
    rtnVal.errMsg = errMsg
    rtnVal.errDetail = errDetail
    rtnVal.childStatus = childStatus

    return rtnVal
end

Using the above Reduce() function, we know which instance of Reduce() is the master, and we receive the aggregated counts from the Map() tasks. Any other information could be passed the same way, such as error information from the child tasks. It is unfortunate that this information doesn’t bubble up to Finalize(); perhaps that is what OpenText is still working on.

If you’ve read this far, you must be a veteran Oscripter 🙂  Comments are welcome, even to tell me I have something completely wrong.

Distributed Agent improvements in CS 10.X U 201506

As promised in a LinkedIn status update, here is something about the changes to distributed agents in the latest updates from Open Text. Over the weekend, I installed the latest 10.0 release, and today I installed the latest 10.5 release. At first, it wasn’t obvious what had changed between them. The change is there, however, under System Administration -> Distributed Agent Status. Unfortunately it is only available in the 10.5 release of U201506, not the 10.0 release. On this page, which previously just listed the status of the worker threads, you can now set a black-out period. So what does this do for us exactly?

The handy thing about specifying a black-out period for DA’s is that the most processing-intensive ones won’t be consuming your system resources during business hours, when you want Content Server to be at its most responsive. The main DA’s that come with Content Server are the ones that rebuild your facet and custom column tables, and the ones that purge deleted items from DTree (now called DTreeCore) – all operations that are potentially database intensive.

Presumably, a task that is running when the outage window arrives would not suddenly halt, but rather finish what it’s doing and exit. This should be OK, as DA tasks are intended to be short (i.e. 1-10 seconds in duration). What is not clear is whether a job chain would simply pick up where it left off, or whether the job would need to be rescheduled. If it is the former, that would be pretty brilliant, and this would have little or no impact on 3rd party vendors.

This is a good feature, but the one drawback is that it is all or nothing. That is, it prevents all DA tasks from running during an outage period. It is conceivable that facet and column updates would need to continue during the day while the purge tasks are deferred to after hours, or that a system administrator would want to prevent facets from updating but need columns to update during business hours (or vice versa). Admittedly this is more of an issue for any 3rd party Technology Partner that develops a module for Content Server that uses Distributed Agents.

Prior to the introduction of this outage window for DA’s, partner developers needed to introduce their own semaphores to manage outages, such as not running during business hours. In my company’s product, BMUP, outages are managed through an administration screen which stores a weekly configuration in the KINI table, and a job table keeps track of each DA task that is launched by user actions. Any task launched during a time when DA tasks aren’t supposed to run is scheduled for later. For any job chain that is still running when an outage begins, each individual task detects the outage and exits, then the master task reschedules itself to carry on later. For the core DA job chains that Open Text provides, this level of control is probably overkill. After all, the column & facet jobs are automatically spawned, triggered by changes in Content Server.
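As a rough sketch of that detect-and-exit pattern (the ._IsInOutage() helper below is hypothetical; it is neither part of the DA framework nor BMUP’s actual API), each task’s Map() might begin like this:

function Assoc Map()

    Assoc rtnVal = Assoc.CreateAssoc()

    // Hypothetical helper that would consult the stored weekly
    // outage configuration (e.g. in the KINI table)
    if ._IsInOutage()
        // Do no work now; the master task is left to reschedule
        // the chain to resume after the outage window closes
        rtnVal.ok = TRUE
        return rtnVal
    end

    // ...normal processing of this chunk...

    rtnVal.ok = TRUE
    return rtnVal
end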

Perhaps this is already in the works, but the next change from Open Text to the DA framework might be to increase the granularity so that particular task types can have different outage periods.

I will later be writing more about programming with Distributed Agents, particularly because it is, in my view, an underrated and underutilized framework for doing some really good scalable development in Content Server. Kudos to OpenText for allowing DA’s to be restricted from running during pre-defined periods.

Welcome to the blog

This blog will be where I talk about various things in the ECM space, with a particular focus on OpenText Content Server. My angle will be development oriented: I will discuss Oscript, Web Services, REST, and WebReports development.