×

Welcome to TagMyCode

Please log in or create an account to add a snippet.
0
0
 
0
Language: Java
Posted by: Shamir Yona
Added: Jun 18, 2016 1:47 PM
Views: 13
Manage worker threads by CyclicBarrier
  1. /*
  2.  * To change this license header, choose License Headers in Project Properties.
  3.  * To change this template file, choose Tools | Templates
  4.  * and open the template in the editor.
  5.  */
  6. package moreTests;
  7.  
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. import java.util.concurrent.BrokenBarrierException;
  11. import java.util.concurrent.CyclicBarrier;
  12.  
  13. /**
  14.  *
  15.  * @author shayzukerman
  16.  */
  17. public class ThreadWorkersManager
  18. {
  19.     static volatile int workerCnt; // used to identify the thread:
  20.    
  21.     // The Class's Client object.
  22.     ThreadWorkerClient workerClient = null;
  23.    
  24.     int[] inpArr;
  25.     int N; // Size of the input array
  26.    
  27.     // # of elements per Worker:
  28.     int elemsPerWorker;
  29.    
  30.     // Result of evaluation:
  31.     private volatile int result = 0;
  32.    
  33.     CyclicBarrier barrier = null;
  34.     List<Thread> workThreadsList;
  35.  
  36.     public ThreadWorkersManager(ThreadWorkerClient workerClient, int numThreads)
  37.     {
  38.         this.workerClient = workerClient;
  39.         this.inpArr = workerClient.getArray();
  40.         this.N = inpArr.length;
  41.  
  42.         barrier = new CyclicBarrier(numThreads, () ->
  43.         {
  44.             System.out.println("*** All Threads are done ***");
  45.             workerClient.notifyReady();
  46.         });
  47.         // Create the Threads per # of Element for each one of them:
  48.         workThreadsList = new ArrayList<>(numThreads);
  49.        
  50.     }
  51.    
  52.     public void addWorkerThread(int minVal, int maxVal, boolean startNow)
  53.     {
  54.         ArrEvalWorker worker= new ArrEvalWorker(inpArr, minVal, maxVal);
  55.         Thread workerThread = (new Thread(worker));
  56.         workThreadsList.add(workerThread);
  57.         if (startNow)
  58.             workerThread.start();
  59.     }
  60.    
  61.     public ThreadWorkersManager(int[] inpArr, int elemsPerWorker)
  62.     {
  63.         this.inpArr = inpArr;
  64.         this.N = inpArr.length;
  65.         this.elemsPerWorker = elemsPerWorker;
  66.        
  67.         // Get the # of Thread;
  68.         int numThreads = N / elemsPerWorker;
  69.         if (N % elemsPerWorker > 0)
  70.             numThreads++;
  71.        
  72.         barrier = new CyclicBarrier(numThreads, () ->
  73.         {
  74.             System.out.println("*** All Threads are done ***");
  75.         });
  76.        
  77.        
  78.         // Create the Threads per # of Element for each one of them:
  79.         workThreadsList = new ArrayList<>(numThreads);
  80.        
  81.         int minVal = 0;
  82.         while (minVal < N)
  83.         {
  84.             int maxVal = minVal + elemsPerWorker - 1;
  85.             if (maxVal >= N)
  86.                 maxVal = N - 1;
  87.             ArrEvalWorker worker= new ArrEvalWorker(inpArr, minVal, maxVal);
  88.             Thread workerThread = (new Thread(worker));
  89.             workThreadsList.add(workerThread);
  90.             minVal = maxVal + 1;
  91.         }
  92.        
  93.     }
  94.    
  95.     // Activate the threads and wait for them to complete:
  96.     public void activateCalc()
  97.     {
  98.         // Start the Threads:
  99.         System.out.println("Starting: " + workThreadsList.size() + " threads...");
  100.         for (Thread workerThread : workThreadsList)
  101.             workerThread.start();
  102.  
  103.         // Wait for them to complete:
  104.         System.out.println("Waiting for their completion...");
  105.         for (Thread workerThread : workThreadsList)
  106.         {
  107.             try
  108.             {
  109.                 workerThread.join();
  110.             }
  111.             catch (InterruptedException ex)
  112.             {
  113.                 System.out.println("Thread: "+ workerThread.getName()
  114.                          + " was interrupted: " + ex.getMessage());
  115.             }
  116.         }
  117.        
  118.     }
  119.    
  120.     class ArrEvalWorker implements Runnable
  121.     {
  122.        
  123.         int[] arrRef; // Reference to the evaluatred array
  124.         // Range to evaluate:
  125.         int min, max;
  126.  
  127.         public ArrEvalWorker(int[] arrRef, int min, int max)
  128.         {
  129.             this.arrRef = arrRef;
  130.             this.min = min;
  131.             this.max = max;
  132.             Thread.currentThread().setName("Worker # " + (workerCnt++));
  133.         }
  134.  
  135.         private ArrEvalWorker(int[] arrRef)
  136.         {
  137.             this(arrRef, 0, 0);
  138.         }
  139.         @Override
  140.         public void run()
  141.         {
  142.             String currThreadId = Thread.currentThread().getName();
  143.            
  144.             try
  145.             {
  146.                 System.out.format("%s - stating\n", currThreadId);
  147.                 workerClient.handleArray(min, max);
  148.                 // Indicate that it's complete:
  149.                 barrier.await();
  150.             }
  151.             catch (InterruptedException | BrokenBarrierException ex)
  152.             {
  153.                 System.err.println(currThreadId + " Exception: " + ex.getMessage());
  154.             }
  155.            
  156.         }
  157.        
  158.         //@Override
  159.         public void ORG_run()
  160.         {
  161.             String currThreadId = Thread.currentThread().getName();
  162.            
  163.             int tmpRes = 0;
  164.             int i = min;
  165.             while (i <= max)
  166.                 tmpRes += arrRef[i++];
  167.            
  168.             try
  169.             {
  170.                 String logMessage = String.format("%s - sum: %d", currThreadId, tmpRes);
  171.                 printRange(min, max, logMessage);
  172.                 // Indicate that it's complete:
  173.                 barrier.await();
  174.             }
  175.             catch (InterruptedException | BrokenBarrierException ex)
  176.             {
  177.                 System.err.println(currThreadId + " Exception: " + ex.getMessage());
  178.             }
  179.         }
  180.    
  181.         private void printRange(int min, int max, String msg)
  182.         {
  183.             System.out.format("%s - min: %d max: %d - [", msg, min, max);
  184.             while (min <= max)
  185.             {
  186.                 System.out.format("%d%s", arrRef[min], ((min < max) ? ", " : ""));
  187.                 min++;
  188.             }
  189.             System.out.println("]");
  190.         }
  191.        
  192.     }
  193.    
  194.     public static void main(String[] args)
  195.     {
  196.         int[] inpArr = {1, 2, 4, 6, 20, 45, 38, 10, 76};
  197.         ThreadWorkersManager tcb = new ThreadWorkersManager(inpArr, 3);
  198.         tcb.activateCalc();
  199.     }
  200.    
  201.    
  202. }
  203.