/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package moreTests;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
*
* @author shayzukerman
*/
public class ThreadWorkersManager
{
static volatile int workerCnt; // used to identify the thread:
// The Class's Client object.
ThreadWorkerClient workerClient = null;
int[] inpArr;
int N; // Size of the input array
// # of elements per Worker:
int elemsPerWorker;
// Result of evaluation:
private volatile int result = 0;
CyclicBarrier barrier = null;
List<Thread> workThreadsList;
public ThreadWorkersManager(ThreadWorkerClient workerClient, int numThreads)
{
this.workerClient = workerClient;
this.inpArr = workerClient.getArray();
this.N = inpArr.length;
barrier = new CyclicBarrier(numThreads, () ->
{
System.
out.
println("*** All Threads are done ***");
workerClient.notifyReady();
});
// Create the Threads per # of Element for each one of them:
workThreadsList = new ArrayList<>(numThreads);
}
public void addWorkerThread(int minVal, int maxVal, boolean startNow)
{
ArrEvalWorker worker= new ArrEvalWorker(inpArr, minVal, maxVal);
workThreadsList.add(workerThread);
if (startNow)
workerThread.start();
}
public ThreadWorkersManager(int[] inpArr, int elemsPerWorker)
{
this.inpArr = inpArr;
this.N = inpArr.length;
this.elemsPerWorker = elemsPerWorker;
// Get the # of Thread;
int numThreads = N / elemsPerWorker;
if (N % elemsPerWorker > 0)
numThreads++;
barrier = new CyclicBarrier(numThreads, () ->
{
System.
out.
println("*** All Threads are done ***");
});
// Create the Threads per # of Element for each one of them:
workThreadsList = new ArrayList<>(numThreads);
int minVal = 0;
while (minVal < N)
{
int maxVal = minVal + elemsPerWorker - 1;
if (maxVal >= N)
maxVal = N - 1;
ArrEvalWorker worker= new ArrEvalWorker(inpArr, minVal, maxVal);
workThreadsList.add(workerThread);
minVal = maxVal + 1;
}
}
// Activate the threads and wait for them to complete:
public void activateCalc()
{
// Start the Threads:
System.
out.
println("Starting: " + workThreadsList.
size() + " threads...");
for (Thread workerThread
: workThreadsList
)
workerThread.start();
// Wait for them to complete:
System.
out.
println("Waiting for their completion...");
for (Thread workerThread
: workThreadsList
)
{
try
{
workerThread.join();
}
{
System.
out.
println("Thread: "+ workerThread.
getName()
+ " was interrupted: " + ex.getMessage());
}
}
}
{
int[] arrRef; // Reference to the evaluatred array
// Range to evaluate:
int min, max;
public ArrEvalWorker(int[] arrRef, int min, int max)
{
this.arrRef = arrRef;
this.min = min;
this.max = max;
Thread.
currentThread().
setName("Worker # " + (workerCnt
++));
}
private ArrEvalWorker(int[] arrRef)
{
this(arrRef, 0, 0);
}
@Override
public void run()
{
try
{
System.
out.
format("%s - stating\n", currThreadId
);
workerClient.handleArray(min, max);
// Indicate that it's complete:
barrier.await();
}
{
System.
err.
println(currThreadId
+ " Exception: " + ex.
getMessage());
}
}
//@Override
public void ORG_run()
{
int tmpRes = 0;
int i = min;
while (i <= max)
tmpRes += arrRef[i++];
try
{
String logMessage
= String.
format("%s - sum: %d", currThreadId, tmpRes
);
printRange(min, max, logMessage);
// Indicate that it's complete:
barrier.await();
}
{
System.
err.
println(currThreadId
+ " Exception: " + ex.
getMessage());
}
}
private void printRange
(int min,
int max,
String msg
)
{
System.
out.
format("%s - min: %d max: %d - [", msg, min, max
);
while (min <= max)
{
System.
out.
format("%d%s", arrRef
[min
],
((min
< max
) ? ", " : ""));
min++;
}
}
}
public static void main
(String[] args
)
{
int[] inpArr = {1, 2, 4, 6, 20, 45, 38, 10, 76};
ThreadWorkersManager tcb = new ThreadWorkersManager(inpArr, 3);
tcb.activateCalc();
}
}