java - Spring Batch: Tasklet with multi-threaded executor has very bad performance related to the throttling algorithm
Using Spring Batch 2.2.1, I have configured a Spring Batch job with the following approach.
The configuration is the following:
- The tasklet uses a ThreadPoolTaskExecutor limited to 15 threads
- throttle-limit is equal to the number of threads
- The chunk is used with:
  - a synchronized adapter around the JdbcCursorItemReader to allow its use by many threads, as per the Spring Batch documentation recommendation: "You can synchronize the call to read() and as long as the processing and writing is the most expensive part of the chunk your step may still complete much faster than in a single threaded configuration."
  - saveState set to false on the JdbcCursorItemReader
  - a custom ItemWriter based on JPA. Note that the processing time of one item can vary from a few millis to a few seconds (> 60 s).
- commit-interval is set to 1 (I know it could be better, but that's not the issue)
- All JDBC pools are fine, as per the Spring Batch documentation recommendations
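To make the synchronized reader adapter above concrete, here is a self-contained sketch. The ItemReader interface is a minimal stand-in for Spring Batch's (the real one declares checked exceptions), and the class names are mine: many threads share one non-thread-safe reader, and only read() is serialized.

```java
import java.util.*;
import java.util.concurrent.*;

public class SynchronizedReaderDemo {

    // Minimal stand-in for org.springframework.batch.item.ItemReader.
    interface ItemReader<T> { T read(); }

    // Non-thread-safe reader over a list, playing the role of a cursor-based reader.
    static class ListReader<T> implements ItemReader<T> {
        private final Iterator<T> it;
        ListReader(List<T> items) { this.it = items.iterator(); }
        public T read() { return it.hasNext() ? it.next() : null; }
    }

    // The "synchronized adapter": serializes read() so many threads can share one reader.
    static class SynchronizedReader<T> implements ItemReader<T> {
        private final ItemReader<T> delegate;
        SynchronizedReader(ItemReader<T> delegate) { this.delegate = delegate; }
        public synchronized T read() { return delegate.read(); }
    }

    // Drains nItems with nThreads concurrent workers; returns the set of items read.
    static Set<Integer> readAll(int nItems, int nThreads) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < nItems; i++) items.add(i);
        ItemReader<Integer> reader = new SynchronizedReader<>(new ListReader<>(items));
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int t = 0; t < nThreads; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) seen.add(item);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("items read: " + readAll(1000, 15).size());
    }
}
```

Every item is read exactly once even though the underlying iterator is not thread-safe, because the adapter's lock makes each read() call atomic.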
Running the batch leads to strange and bad results, due to the following:
- if some items take a long time in the writer, almost all the threads in the thread pool end up doing nothing instead of processing; only the slow writer is working.
Looking at the Spring Batch code, the root cause seems to be in this package:
- org/springframework/batch/repeat/support/
Is this way of working a feature, or a limitation/bug?
If it is a feature, is there a way by configuration to keep all threads busy without being starved by the long processing, without having to rewrite everything?
Note that if all items take roughly the same time, everything works fine and the multi-threading is OK, but as soon as the processing of one item takes much more time, the multi-threading is nearly useless for as long as the slow process runs.
Note: I also opened an issue about this:
As Alex said, it seems this behaviour is a contract, as per the javadocs:
"Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads."
Look at:
TaskExecutorRepeatTemplate#waitForResults
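The starvation effect can be reproduced with a toy model of that contract. This is not Spring Batch's actual code: the wave-based dispatcher below is an assumption that simply waits for all outstanding results before handing out more work, which is what the quoted javadoc describes.

```java
import java.util.*;
import java.util.concurrent.*;

public class ThrottleStarvationDemo {

    // Toy model: submit work in waves of `throttle` tasks and block on the whole
    // wave before dispatching more. One slow task then idles every other thread.
    static long runWaves(List<Long> costsMillis, int threads, int throttle) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            for (int i = 0; i < costsMillis.size(); i += throttle) {
                List<Future<?>> wave = new ArrayList<>();
                for (long cost : costsMillis.subList(i, Math.min(i + throttle, costsMillis.size()))) {
                    wave.add(pool.submit(() -> sleep(cost)));
                }
                // "waitForResults": do not continue until the whole wave is done.
                for (Future<?> f : wave) f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        // One 500 ms item per wave of 16, the rest at 10 ms: each wave costs roughly
        // the slow item's time, so 15 of the 16 threads sit idle most of the time.
        List<Long> costs = new ArrayList<>();
        for (int i = 0; i < 32; i++) costs.add(i % 16 == 0 ? 500L : 10L);
        System.out.println("elapsed ms: " + runWaves(costs, 16, 16));
    }
}
```

This matches the symptom in the question: throughput is capped by the slowest item in flight, not by the number of threads.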
Another option is to use partitioning:
- a TaskExecutorPartitionHandler that will execute items from the partitioned ItemReader, see below
- a Partitioner implementation that gives the ranges to be processed by the ItemReader, see ColumnRangePartitioner below
- a custom reader that reads the data using what the partitioner has filled in, see the myItemReader configuration below
Michael Minella explains this in chapter 11 of his book Pro Spring Batch:
    <batch:job id="batchWithPartition">
        <batch:step id="step1.master">
            <batch:partition partitioner="myPartitioner" handler="partitionHandler"/>
        </batch:step>
    </batch:job>

    <!-- This one creates the partitions from the number of lines / grid size -->
    <bean id="myPartitioner" class="....ColumnRangePartitioner"/>

    <!-- This one handles every partition in a thread -->
    <bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
        <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
        <property name="step" ref="step1" />
        <property name="gridSize" value="10" />
    </bean>

    <batch:step id="step1">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="myItemReader" writer="manipulatableWriterForTests"
                         commit-interval="1" skip-limit="30000">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>

    <!-- Step scope is critical here -->
    <bean id="myItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
        <property name="dataSource" ref="dataSource"/>
        <property name="sql">
            <value>
                <![CDATA[
                    select * from customers where id >= ? and id <= ?
                ]]>
            </value>
        </property>
        <property name="preparedStatementSetter">
            <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
                        <!-- minValue and maxValue are filled in by the partitioner for each partition in the ExecutionContext -->
                        <value>#{stepExecutionContext[minValue]}</value>
                        <value>#{stepExecutionContext[maxValue]}</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="rowMapper" ref="customerRowMapper"/>
    </bean>
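For reference, since Spring Batch 2.2 the same master step can also be declared in Java config. The sketch below is untested and only mirrors the XML above; the bean names (multiThreadedTaskExecutor, step1, myPartitioner) and the autowiring are assumptions, and the worker step and reader would be wired exactly as in the XML.

```java
@Configuration
@EnableBatchProcessing
public class PartitionedJobConfig {

    @Autowired private JobBuilderFactory jobs;
    @Autowired private StepBuilderFactory steps;

    @Bean
    public Job batchWithPartition(Step masterStep) {
        return jobs.get("batchWithPartition").start(masterStep).build();
    }

    // Same shape as the <batch:partition> element above.
    @Bean
    public Step masterStep(Partitioner myPartitioner, PartitionHandler partitionHandler) {
        return steps.get("step1.master")
                .partitioner("step1", myPartitioner)
                .partitionHandler(partitionHandler)
                .build();
    }

    // Equivalent of the partitionHandler bean: one thread per partition.
    @Bean
    public PartitionHandler partitionHandler(TaskExecutor multiThreadedTaskExecutor, Step step1) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(multiThreadedTaskExecutor);
        handler.setStep(step1);
        handler.setGridSize(10);
        return handler;
    }
}
```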
ColumnRangePartitioner.java:
    package ...;

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.jdbc.core.JdbcOperations;

    public class ColumnRangePartitioner implements Partitioner {

        private String column;
        private String table;

        // JdbcTemplate used to look up the min and max ids.
        private JdbcOperations jdbcTemplate;

        public Map<String, ExecutionContext> partition(int gridSize) {
            int min = jdbcTemplate.queryForInt("SELECT MIN(" + column + ") FROM " + table);
            int max = jdbcTemplate.queryForInt("SELECT MAX(" + column + ") FROM " + table);
            int targetSize = (max - min) / gridSize;
            System.out.println("Our partition size is " + targetSize);
            System.out.println("We have " + gridSize + " partitions");

            Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

            int number = 0;
            int start = min;
            int end = start + targetSize - 1;

            while (start <= max) {
                ExecutionContext value = new ExecutionContext();
                result.put("partition" + number, value);
                if (end >= max) {
                    end = max;
                }
                value.putInt("minValue", start);
                value.putInt("maxValue", end);
                System.out.println("minValue = " + start);
                System.out.println("maxValue = " + end);
                start += targetSize;
                end += targetSize;
                number++;
            }
            System.out.println("We are returning " + result.size() + " partitions");
            return result;
        }

        public void setColumn(String column) {
            this.column = column;
        }

        public void setTable(String table) {
            this.table = table;
        }

        public void setJdbcTemplate(JdbcOperations jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    }
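To see the range arithmetic of partition() in isolation, here is a database-free sketch of the same splitting loop (the class name and the plain int[][] return shape are mine, not part of the original):

```java
import java.util.ArrayList;
import java.util.List;

public class RangePartitionDemo {

    // Same loop as ColumnRangePartitioner.partition(), minus the database lookups:
    // split [min, max] into consecutive ranges of (max - min) / gridSize ids each.
    public static int[][] split(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize;
        List<int[]> ranges = new ArrayList<>();
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max;
            }
            ranges.add(new int[] { start, end });
            start += targetSize;
            end += targetSize;
        }
        return ranges.toArray(new int[0][]);
    }

    public static void main(String[] args) {
        // With min = 1, max = 20, gridSize = 4 the integer division gives
        // targetSize = 4, so the loop yields five ranges, one more than gridSize.
        for (int[] r : split(1, 20, 4)) {
            System.out.println("minValue = " + r[0] + ", maxValue = " + r[1]);
        }
    }
}
```

Because targetSize is floored by the integer division, the loop can return more partitions than gridSize; each extra partition simply becomes one more unit of work for the TaskExecutorPartitionHandler.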