Ir al contenido principal

El Fork Join framework Java


Código fuente articulo:
https://www.dropbox.com/s/jci67120hmd0uce/Paralelismo.zip?dl=0

Este es el primer paso en paralelismo JAV...
Conque buscando eficienci...
Interesad@ en el multi...

Que dolor soy muy malo para escribir introducciones, así que aquí esta lo de Fork Join y no se diga mas.

Desde tiempos inmemorables ejecutar tareas en paralelo utilizando JAVA era un proceso oscuro y reservado para aquellos que llegaban al core del lenguaje mientras otros eran tristemente engañados con la interfaz Runnable que corría hilos de mentirillas encima del hilo de la maquina virtual , esta creciente necesidad dio pie a la introducción en JAVA 6 de los llamados executores y mas adelante en JAVA 7 al Fork Join framework, este ultimo es del que vamos a hablar y cual es la idea, simple tengo una tarea muy grande y necesito partirla entre varios hilos para que salga mas rapido.

Para que entiendas esto de forma simple vas a necesitar entender primero que hay una serie de  componentes en una aplicación que se ejecuta en paralelo.


  • Primero necesitas algo que contenga la tarea que vas a ejecutar para esto el framework ofrece dos tipos de tarea una que regresa un valor y otra que no regresa nada. Estas se llaman RecursiveAction y RecursiveTask. una acción(RecursiveAction) no regresa un valor y una tarea(RecursiveTask) regresa un valor y se ven algo así (fíjate que las dos clases se llaman Recursive mas adelante entenderás el por que )


public abstract class java.util.concurrent.RecursiveTask<T> extends java.util.concurrent.ForkJoinTask{....

 y

public abstract class java.util.concurrent.RecursiveAction extends java.util.concurrent.ForkJoinTask {


si te fijas la RecursiveTask (tarea) tiene un < > uo tipo genérico que le sirve para definir que tipo de Objeto va a regresar pj

public class MyRecursiveTask extends RecursiveTask<Long> {

la implementasion de RecursiveAction seria mas simple

public class MyRecursiveAction extends RecursiveAction {

  • Ahora que ya tienes algo que ejecutar necesitas algo en que correrlo. para esta tarea el Framework te ofrece un pool (una piscina llena de hilos) que esta implementada en una clase llamada ForkJoinPool, esta no es mas que un contenedor de hilos de ejecución, la idea es enviarle a este objeto una serie de tareas o acciones y dejarlo que las ejecute administrándolas entre los hilos disponibles. El ForkJoinPool se ve así.


ForkJoinPool forkJoinPool = new ForkJoinPool();

y se le encomienda una tarea enviándole un objeto que extienda de RecursiveAction o de RecursiveTask así:

forkJoinPool.invoke(tarea);

Hasta el momento todo bonito menos la parte en la que se supone que voy a ejecutar mil tareas a la vez pero solo estoy enviando un objeto al pool de hilos 😠😠😠!!!

Calma, esta es la parte en la que entra a jugar el termino Recursive, la idea de una accion/Tarea re-cursiva es que esta se divide a sí misma hasta tener pedazos del tamaño adecuado para ejecutarse, es decir, si voy multiplicar un millón de números, la forma mas eficiente de hacerlo es separarlos en grupos de cien mil números multiplicar cada grupo y después multiplicar los diez resultados (suponiendo que cada sub grupo se multiplica en un core diferente).

para ilustraros en esta ardua labor, aquí ofrezco este humilde ejemplo.

Supón que eres l@ encargad@ de ejecutar la suma de un arreglo que contiene dos millones de números, tarea de la cual depende el destino de la pobre humanidad agobiada y doliente. Para esta vital tarea has decidido utilizar el Fork Join framework Java porque, todo lo demas apesta y es completamente inutilizable.

Lo primero que harás sera decidir si este trabajo requiere retornar un valor..... la respuesta que busco es si.. entonces si.por eso el trabajo lo vas a encapsular dentro de una RecursiveTask ademas has descubierto que dada la magnitud del resultado mínimo tienes que guardarlo en un Long.
entonces crearas una clase que se vera así:

public class SumarEnteros extends RecursiveTask<Long>  {

que tendrá un variable que sera un arreglo donde guardar los numeros

private int[] numeros;

y que necesitara definir el tamaño de los pedazos que va a procesar

private static int pasos=500000;

y que necesitara controlar donde inicia el siguiente pedazo

private int inicial=0;

de entrada al constructor se le enviara el arreglo y la posición en que inicia cada pedazo.

public SumarEnteros(int[] numeros, int inicial){
this.numeros=numeros;
this.inicial=inicial;
}


ahora como estamos extendiendo una clase abstract estamos obligados a implementar sus métodos en este caso el método compute()

@Override
protected Long compute() {  

pero ¿ y como para que ?, veras cada vez que el ForkJoinPool decida ejecutar la tarea llamara a este método, por consiguiente este método es el encargado de decidir si parte la tarea en mas pedazos o que ya esta bueno con la partidera y es hora de ejecutar la tarea.

@Override
protected Long compute() {

//morder un pedozo para procesarlo
int[] parte=Arrays.copyOfRange(this.numeros, inicial, (inicial+pasos)>this.numeros.length?this.numeros.length:(inicial+pasos));


SumarEnteros subTask =null;
// mientras no llegue al final del arreglo
if(inicial<numeros.length){
//creo otra tarea para seguir partiendo el arreglo
subTask = new SumarEnteros(numeros, inicial+pasos);
//dejo la tarea disponible para que el pool la ejecute
subTask.fork();

}
//hago el trabajo de sumar la parte que guarde que es para lo que me
int i=0;
while(parte!=null && i<pasos && i+1<parte.length){
try {
System.out.println(parte[i]);
this.suma+=parte[i];
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
i++;
}
// le pido a mi subtarea que me regrese su resultado y se lo sumo a la cuenta que llevo
if(subTask!=null){
this.suma+=subTask.join();
}

return this.suma;
}

el secreto de esto esta en las dos funciones que estamos llamando .fork() y .join();
y quieren decir, .fork() ejecuta este trabajo y .join() dame el resultado del trabajo que te encargue.

Como es lógico el resultado que entregara .join() sera el definido entre < > es decir un Long, en caso de fallar sucederá algo terrible y es que .join() lanzara un Error en lugar de una Exception que matara todos los hilos relacionados a esta tarea. por eso se debe usar join() en los casos en los que se debe garantizar el 100% de la ejecución de todos los subprocesos. La alternativa a .join() se llama .get() y es como el tio mas relajado que vive en el sofa en caso de falla lanza una Exception que se puede manejar y que no afecta el resto de los hilos en ejecución... algo así como fracase pero sigo aquí viviendo de gratis con mi amada familia.

Ahora ya esta todo solo es llamarlo y ver que pasa. para eso necesitaras crear un ForkJoinPool y decirle que ejecute la tarea y todas las subtareas que esta genere.


public static void main(String[] args) {
                ///el arreglo original
int[] numerosASumar= new int[2000000];
                //llenamos el arrelo
Random ran= new Random();
for(int i=0;i<numerosASumar.length;i++){
numerosASumar[i]=ran.nextInt();
}
                //creamos el pool
ForkJoinPool forkJoinPool = new ForkJoinPool();
                //tomamos el tiempo
long start = System.currentTimeMillis();
                //iniciamos la ejecucion
System.out.println("Total "+forkJoinPool.invoke(new SumarEnteros(numerosASumar, 0)));
                //tomamos el tiempo que se demoro
float tiempo= (System.currentTimeMillis() - start);
                //y lo imprimimos
               System.out.println(tiempo);
}

Todo el código se vera así.


import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumarEnteros extends RecursiveTask<Long>  {

private int[] numeros;
private Long suma=new Long(0);
private int inicial=0;
private static int pasos=500000;

public SumarEnteros(int[] numeros, int inicial){
this.numeros=numeros;
this.inicial=inicial;
}

@Override
protected Long compute() {

//morder un pedozo para procesarlo
int[] parte=Arrays.copyOfRange(this.numeros, inicial, (inicial+pasos)>this.numeros.length?this.numeros.length:(inicial+pasos));


SumarEnteros subTask =null;
// mientras no llegue al final del arreglo
if(inicial<numeros.length){
//creo otra tarea para seguir partiendo el arreglo
subTask = new SumarEnteros(numeros, inicial+pasos);
//dejo la tarea disponible para que el pool la ejecute
subTask.fork();

}
//hago el trabajo de sumar la parte que guarde que es para lo que me
int i=0;
while(parte!=null && i<pasos && i+1<parte.length){
try {
System.out.println(parte[i]);
this.suma+=parte[i];
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
i++;
}
// le pido a mi subtarea que me regrese su resultado y se lo sumo a la cuenta que llevo
if(subTask!=null){
this.suma+=subTask.join();
}

return this.suma;
}

public static void main(String[] args) {
///el arreglo original
int[] numerosASumar= new int[2000000];
               //llenamos el arrelo
Random ran= new Random();
for(int i=0;i<numerosASumar.length;i++){
numerosASumar[i]=ran.nextInt();
}
               //creamos el pool
ForkJoinPool forkJoinPool = new ForkJoinPool();
               //tomamos el tiempo
long start = System.currentTimeMillis();
               //iniciamos la ejecucion
System.out.println("Total "+forkJoinPool.invoke(new SumarEnteros(numerosASumar, 0)));
               //tomamos el tiempo que se demoro
float tiempo= (System.currentTimeMillis() - start);
               //y lo imprimimos
              System.out.println(tiempo);

}

}


Como puedes ver el concepto de Fork Join es muy simple, el problema es implementarlo en una aplicación real, medir que si mejore el rendimiento, que no bloquee recursos, que no cueste mas partir una tarea que ejecutarla en un solo hilo. Espero te haya sido útil este Tutorial.

Comentarios

Entradas populares de este blog

Conectarse al LDAP (directorio activo) utilizando JAVA

Un LDAP es un sistema de autenticación estándar utilizado por muchas compañías para controlar el acceso a aplicaciones y recursos. Por lo general se espera que cualquier nueva aplicación haga uso del LDAP para realizar la autenticación y controlar los permisos en forma unificada, la seguridad es transversal a todos los procesos que realiza una organización. Una vez regado el cuento a lo que vinimos, como conectarse a un LDAP. Lo primero que debes saber es que no se requieren librerías adicionales, JAVA en su distribución estandar ya cuenta con todo lo que necesitas. primero tres siglas que tienes que tener en cuanta. CN  = Common Name OU  = Organizational Unit DC  = Domain Component Para conectarse primero necesitas es instanciar un Objeto de la clase LdapContext, este se encargara de manejar la conexión al LDAP y las peticiones que se hagan al mismo. por consiguiente necesitara que le entregues una serie de propiedades de conexión. Esto lo haras con un Map de la siguiente

Clases anónimas JAVA (Anonymous Classes)

Código fuente articulo: https://www.dropbox.com/s/pzw44ot0ji2metl/Lambda.zip?dl=0 Las Clases anónimas en JAVA son una solución rápida para implementar una clase que se va utilizar una vez y de forma inmediata. Por ejemplo el  EventHandler  para un botón se puede implementar en la misma asignación valiendonos de la interfaz  EventHandler  que ya esta definida. Pero mejor vamos con un ejemplo mas simple. De la definición anterior concluimos dos cosas la primera es que para crear una clase anónima es necesario haber definido una interfaz, una clase o una clase abstracta. La clase anónima lo que hará sera implementar la interfaz definida o sobre escribir los métodos definidos. Para ilustrar esto utilizaremos el ejemplo del JAVA Tutorial https://docs.oracle.com/javase/tutorial/java/javaOO/anonymousclasses.html . en este ejemplo tenemos que implementar clases que cumpliendo con la interfaz Saludo sean capaces de saludar en diferentes idiomas. El paso uno sera definir la int

Paralelismo en JAVA Executors (ExecutorService, Callables y Futures).

Código fuente articulo: https://www.dropbox.com/s/jci67120hmd0uce/Paralelismo.zip?dl=0 Para manejo de concurrencia Java desde la versión 5 presento el Concurrency API  este presento una mejora substancial en el manejo de hilos y procesos en paralelo, antes solo contabas con Thread y Runnable. lo que te obligaba a controlar la creación y el numero de hilos de ejecución, no te entregaba un resultado del procesamiento y no te dejaba controlar las Excepciones que lanzara un hilo...un hilo se lanzaba y amenos que le enviaras un un CallBack perdías todo contacto con el. Para solucionar este problema se crearon dos tipos de objetos Callables y Futures. estos dos te permiten encapsular una tarea asignándole un tipo de Objeto que sera el valor de retorno y hacer seguimiento a las tareas que ejecuto en paralelo pudiendo preguntar si ya termino, que resultado lanzo y que excepciones ocurrieron. Arranquemos con la implementación de Callable, Callable es una interfaz que te permite defini