FDMEE Parallelism Gone Wild? Rein it in with custom throttling!

If you're like me, you've noticed (perhaps despairingly) that FDMEE's processing modes can be a bit extreme. Need to tightly control the order and concurrency of your data loads? Sequential to the rescue! Oh wait, now you need to allow multiple loads to process at the same time? Parallel to the rescue! Oh wait again: neither mode can handle users who do, or expect, the following:

  • Conspire against you by launching 50 batches in a short window, crippling your system performance
  • Accidentally execute the same batch two or more times in a row (maybe because they didn't wait long enough for the confirmation popup, perhaps thanks to the system latency from the point above)
  • Have certain processes which need sequential loading
  • Have other processes that can run in parallel and should finish as quickly as possible

On its face, the solution is pretty straightforward: create batches for these different processes. Where order matters, configure the batch for sequential processing. Where order is irrelevant, configure the batch for parallel processing. Flip the switch in your ODI topology to allow parallel processing and you're all good...or mostly good...

You have the perfect plan save for one wrinkle, namely the need to limit total concurrent processing. If you allow full-bore parallel processing, you can quickly find that it's like drinking from a fire hose. Having a lot of data flowing concurrently through your TDATASEG_T table, especially when there are a lot of mapping rules, can really gum things up.

If only there were a way to control how many concurrent jobs FDMEE will allow. Not just within a batch, but across all batches...or manually executed DLRs...basically across the whole system. The FDMEE user guide, system settings, application settings: nope, nothing there fits the bill.

So what's a consultant to do? Roll a custom throttling solution, of course! In fact, it turns out it can be done pretty simply. For extra credit, I'll even include a few different versions of the code that allow different options, restrictions, and configurations.

My implementation relies on the BefImport event scripts to check whether the current process should be allowed to run. If so, along it goes. If not, it waits a random interval of time before checking again, on and on until it gets the green light.

Getting to the actual solution: first, you need to have modified FDMEE and ODI to allow parallel processing. Then you need to have added a BefImport script for each of your target applications. Once those are in place, you can get to work by following three simple steps:

  1. Edit BefImport to import your required libraries (note that I'm assuming you have some sort of custom shared library for all your Jython goodies):
import MainLib as mainLib
import time
import random
  2. Edit BefImport to check your throttling function for permission to run (I wait a random amount of time between 15 and 45 seconds; obviously you can change that to your liking):
#Wait until the process is allowed to continue
while not mainLib.AllowDataImport(fdmContext["LOADID"], fdmAPI):
	time.sleep(random.randint(15, 45))
  3. Edit your custom shared library to add your throttling function, which might look like one of the following:

a) This (a simple first in, first out, with an optional parameter for the max number of concurrent jobs):

def AllowDataImport(loadid, api, totalMax = 10):
	# Allow this load only if it is among the oldest <totalMax> RUNNING
	# data loads (first in, first out); otherwise the caller keeps waiting
	sql = """
		SELECT p.process_id
		FROM (
		  SELECT p.process_id
		  FROM (
			SELECT p.process_id
			FROM aif_processes p
			WHERE p.STATUS = 'RUNNING'
			  AND p.RULE_TYPE = 'DATA'
			ORDER BY p.process_ID ASC
		  ) p
		  WHERE rownum <= ?
		) p
		WHERE p.process_id = ?
	""" 
	params = [totalMax,loadid]
	rs = api.executeQuery(sql, params)
	# isBeforeFirst() is True only when the query returned a row,
	# i.e. this process made the concurrency cut
	retVal = rs.isBeforeFirst()
	rs.close()
	return retVal
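
If the default ceiling of ten concurrent jobs isn't right for a given target application, pass the optional parameter from that application's BefImport loop. A quick illustration (the 5 here is purely my example; the rest matches step 2 above):

#Same wait loop as step 2, but with a tighter concurrency ceiling
while not mainLib.AllowDataImport(fdmContext["LOADID"], fdmAPI, totalMax = 5):
	time.sleep(random.randint(15, 45))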

b) Or this (a slightly more complex first in, first out that also prevents a DLR from running more than once at a time, again with an optional parameter for the max number of concurrent jobs):

def AllowDataImport(loadid, api, totalMax = 10):
	# Same FIFO check as option (a), except the innermost query keeps only
	# the earliest RUNNING process per rule_id, so a DLR launched twice
	# queues behind its own first run
	sql = """
		SELECT p.process_id
		FROM (
			SELECT p.process_id
			FROM (
				SELECT p.process_id
				FROM (
					SELECT p.rule_id, min(p.process_id) as process_id
					FROM aif_processes p
					WHERE p.STATUS = 'RUNNING'
						AND p.RULE_TYPE = 'DATA'
					GROUP BY p.rule_id  
				) p
				ORDER BY p.process_ID ASC
			) p
			WHERE rownum <= ?  
		) p
		WHERE p.process_id = ?
	""" 
	params = [totalMax,loadid]
	rs = api.executeQuery(sql, params)
	retVal = rs.isBeforeFirst()
	rs.close()
	return retVal
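
As an aside, it can help to see how backed up things are while jobs wait their turn. Here's a minimal sketch of a companion helper for your shared library (CountRunningDataImports is a hypothetical name of my own, and I'm assuming executeQuery is happy with an empty parameter list):

def CountRunningDataImports(api):
	#Hypothetical helper: how many data loads are RUNNING right now
	sql = """
		SELECT COUNT(*) AS cnt
		FROM aif_processes p
		WHERE p.STATUS = 'RUNNING'
		  AND p.RULE_TYPE = 'DATA'
	"""
	rs = api.executeQuery(sql, [])
	rs.next()
	retVal = rs.getInt("cnt")
	rs.close()
	return retVal

You could log that count (say, via fdmAPI.logInfo) each time a job loops in BefImport, which makes it easy to tell whether your caps are set too low.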

c) Or, if you're really getting tricky, like this (still first in, first out, but this one lets you limit concurrency by target application type, in addition to total processing across all loads):

def AllowDataImport(loadid, api, hfmMax = 5, plnMax = 5, customMax = 5, totalMax = 10):
	# FIFO with per-target ceilings: the UNION takes the oldest hfmMax HFM
	# loads, plnMax Planning loads, and customMax custom loads, then the
	# outer query applies the overall totalMax cap
	sql = """
		SELECT p.process_id
		FROM (
		  SELECT p.process_id
		  FROM (
			SELECT p.process_id
			FROM (
			  SELECT p.process_id
			  FROM (
				SELECT p.process_id
				FROM aif_processes p
				  INNER JOIN aif_target_applications ta on p.APPLICATION_ID = ta.APPLICATION_ID
				WHERE p.STATUS = 'RUNNING'
				  AND p.RULE_TYPE = 'DATA'
				  AND ta.TARGET_APPLICATION_TYPE = 'HFM'
				ORDER BY p.process_ID ASC
			  ) p
			  WHERE rownum <= ?
				UNION
			  SELECT p.process_id
			  FROM (
				SELECT p.process_id
				FROM aif_processes p
				  INNER JOIN aif_target_applications ta on p.APPLICATION_ID = ta.APPLICATION_ID
				WHERE p.STATUS = 'RUNNING'
				  AND p.RULE_TYPE = 'DATA'
				  AND ta.TARGET_APPLICATION_TYPE = 'HPL'
				ORDER BY p.process_ID ASC
			  ) p
			  WHERE rownum <= ?
				UNION
			  SELECT p.process_id
			  FROM (
				SELECT p.process_id
				FROM aif_processes p
				  INNER JOIN aif_target_applications ta on p.APPLICATION_ID = ta.APPLICATION_ID
				WHERE p.STATUS = 'RUNNING'
				  AND p.RULE_TYPE = 'DATA'
				  AND ta.TARGET_APPLICATION_TYPE = 'CUSTOM'
				ORDER BY p.process_ID ASC    
			  ) p
			  WHERE rownum <= ?  
			) p
			ORDER BY p.process_ID ASC
		  ) p
		  WHERE rownum <= ? 
		) p
		WHERE p.process_id = ?
	""" 
	params = [hfmMax,plnMax,customMax,totalMax,loadid]
	rs = api.executeQuery(sql, params)
	retVal = rs.isBeforeFirst()
	rs.close()
	return retVal
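
One final hedge worth considering: the step 2 loop will spin forever if an upstream process hangs in a RUNNING state. Here's a sketch of a variant with a safety valve, where maxWaitSeconds is a made-up knob and whether you proceed or abort after the timeout is entirely your call:

#Wait for the green light, but stop polling after a configurable ceiling
maxWaitSeconds = 1800	#hypothetical cap; tune to your environment
waited = 0
while not mainLib.AllowDataImport(fdmContext["LOADID"], fdmAPI):
	if waited >= maxWaitSeconds:
		fdmAPI.logWarn("Throttle wait exceeded %d seconds; proceeding anyway" % maxWaitSeconds)
		break
	pause = random.randint(15, 45)
	time.sleep(pause)
	waited += pause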

And that's it. You can still enable parallel processing where it makes sense, but now with some limits in place. Rein it in, so to speak (and so goes the title of this post). Enjoy.