Load-Sensitive ThreadPool / Queue in Java

Here is a little code snippet (actually a ready-to-run JUnit TestCase) which might come in handy if you need a fairly open ThreadPool that is not primarily limited by the number of active threads but rather by a predicted load factor. The latter might be pretty much anything, such as CPU load or a total number of "items" allowed to be processed by the whole ThreadPool at a given time.

If the predicted load is not dynamic enough for you, you might want to add another monitoring thread looking at some indicators (CPU, RAM, I/O) and adjust the LoadTracker's currentLoad value accordingly. Another path would be to skip the monitoring thread and extend the canHandle(load) method of the LoadTracker to respect the current indicator states.

Oh, and please let me know if I am reinventing the wheel, sometimes it is difficult not to.

In retrospect, the same pattern could be applied to the Queue beneath the ThreadPool by coupling a LoadTrackableJob with a specific BlockingQueue. I guess you can always make the code / architecture prettier.

  1. public class TestThreadPool extends TestCase
  2. {
  3. private static Log log = LogFactory.getLog(TestThreadPool.class);
  4.  
  5. public void testLoadTracker() throws InterruptedException
  6. {
  7. int maxRunningThreads = 128;
  8. int maxLoad = 500;
  9.  
  10. LoadTracker load = new LoadTracker(maxLoad);
  11. ExecutorService pool = Executors.newFixedThreadPool(maxRunningThreads);
  12.  
  13. for (int i = 0; i < 500; i++)
  14. {
  15. // here you would create your real job and *predict* its impact on the load factor.
  16. // we choose the load to be random.
  17. int predictedJobLoad = (int) Math.round(Math.random() * 10l);
  18. MyJob aJob = new MyJob(load, predictedJobLoad,"job-"+i,this);
  19.  
  20. while (!load.canHandle(predictedJobLoad))
  21. {
  22. log.debug(String.format("WAIT: current load %d and new job is about to be %d", load.get(), predictedJobLoad));
  23. synchronized (this) { this.wait(1000); }
  24. }
  25.  
  26. log.debug(String.format("QUEUE: current load is %d and new job is about to be %d", load.get(), predictedJobLoad));
  27. pool.execute(aJob);
  28. }
  29. pool.shutdown();
  30. pool.awaitTermination(42,TimeUnit.DAYS);
  31. assertEquals(0, load.get());
  32. }
  33.  
  34. private class MyJob implements Runnable
  35. {
  36. private LoadTracker loadTracker;
  37. private int load;
  38. private String jobId;
  39. private Object monitor;
  40.  
  41. public MyJob(LoadTracker loadTracker, int load, String jobId, Object monitor)
  42. {
  43. this.jobId = jobId;
  44. this.loadTracker = loadTracker;
  45. this.load = load;
  46. this.monitor = monitor;
  47. loadTracker.add(load);
  48. }
  49.  
  50. @Override
  51. public void run()
  52. {
  53. log.debug(String.format("RUN: %s with a load of %d", jobId, load));
  54. try
  55. {
  56. Thread.sleep((int) Math.round(Math.random() * 5000l));
  57. }
  58. {
  59. e.printStackTrace();
  60. }
  61. log.debug(String.format("FIN: %s with a load of %d", jobId, load));
  62. loadTracker.remove(load);
  63. if (monitor != null) synchronized (monitor) { monitor.notify(); }
  64. }
  65. }
  66.  
  67. private class LoadTracker
  68. {
  69. private int currentLoad = 0;
  70. private int maxLoad = 0;
  71.  
  72. public LoadTracker(int maxLoad)
  73. {
  74. this.maxLoad = maxLoad;
  75. }
  76.  
  77. private synchronized void add(int load)
  78. {
  79. this.currentLoad += load;
  80. }
  81.  
  82. private synchronized void remove(int load)
  83. {
  84. this.currentLoad -= load;
  85. }
  86.  
  87. public synchronized int get()
  88. {
  89. return currentLoad;
  90. }
  91.  
  92. public synchronized boolean canHandle(int additionalLoad)
  93. {
  94. return ((this.get() + additionalLoad) < maxLoad);
  95. }
  96. }
  97. }