33

In Spring Batch I need to pass the items read by an ItemReader to two different processors and writers. What I'm trying to achieve is that...

                        +---> ItemProcessor#1 ---> ItemWriter#1
                        |
ItemReader ---> item ---+
                        |
                        +---> ItemProcessor#2 ---> ItemWriter#2

This is needed because items written by ItemWriter#1 should be processed in a completely different way compared to the ones written by ItemWriter#2. Moreover, ItemReader reads items from a database, and the queries it executes are so computationally expensive that running the same query twice is not an option.

Any hint about how to achieve such a setup? Or, at least, a logically equivalent setup?

2
  • Hi @danidemi, I need to write into two different tables using the same writer. I am using Java-based configuration. How do I achieve this? Any help?
    – Bug
    Jun 11, 2020 at 16:11
  • Hi @sashikanta, there's really not enough room here to answer your question. Why don't you write a completely new question in order to get help from the community?
    – danidemi
    Jun 12, 2020 at 17:20

5 Answers

14

This solution is valid if your item should be processed by both processor #1 and processor #2.

You have to create a processor #0 with this signature:

class Processor0 implements ItemProcessor<Item, CompositeResultBean>

where CompositeResultBean is a bean defined as

class CompositeResultBean {
  Processor1ResultBean result1;
  Processor2ResultBean result2;
  // getters and setters omitted
}

In your Processor #0 just delegate the work to processors #1 and #2 and put the results in the CompositeResultBean:

public CompositeResultBean process(Item item) throws Exception {
  final CompositeResultBean r = new CompositeResultBean();
  r.setResult1(processor1.process(item));
  r.setResult2(processor2.process(item));
  return r;
}

Your own writer is then a CompositeItemWriter that delegates CompositeResultBean.result1 and CompositeResultBean.result2 to the respective writers (have a look at PropertyExtractingDelegatingItemWriter, it may help).
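
As a rough sketch of that writing side (assuming the List-based ItemWriter signature used elsewhere in this thread; the class, field and delegate names below are illustrative and not part of the original answer):

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemWriter;

// Sketch only: split each CompositeResultBean and hand result1/result2 to the
// dedicated delegate writers. Class and field names are illustrative.
public class CompositeResultBeanWriter implements ItemWriter<CompositeResultBean> {

    private final ItemWriter<Processor1ResultBean> writer1;
    private final ItemWriter<Processor2ResultBean> writer2;

    public CompositeResultBeanWriter(ItemWriter<Processor1ResultBean> writer1,
                                     ItemWriter<Processor2ResultBean> writer2) {
        this.writer1 = writer1;
        this.writer2 = writer2;
    }

    @Override
    public void write(List<? extends CompositeResultBean> items) throws Exception {
        List<Processor1ResultBean> firstResults = new ArrayList<>();
        List<Processor2ResultBean> secondResults = new ArrayList<>();
        for (CompositeResultBean item : items) {
            firstResults.add(item.getResult1());
            secondResults.add(item.getResult2());
        }
        // Each delegate writes only its part of the composite.
        writer1.write(firstResults);
        writer2.write(secondResults);
    }
}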

3
  • 1
    Sadly the requirement is indeed that the same item has to be processed by BOTH processors. Just to give you some more context: the reader reads one record from the db, then processor#1 should map field names to other field names and writer#1 will write the mapped item into another db. Processor#2, starting from the same original item, should perform a completely different elaboration and then write the item into a legacy system.
    – danidemi
    Sep 25, 2013 at 10:23
  • Ok, I have a solution for that problem as well, just wait for the edit. Sep 25, 2013 at 11:55
  • @bellabax - I'd be interested to see your solution. Might you have time to update it here? Jan 22, 2014 at 16:07
7

I followed Luca's suggestion to use PropertyExtractingDelegatingItemWriter as the writer, and I was able to work with two different entities in one single step.

First of all, what I did was to define a DTO that stores the two entities/results from the processor:

public class DatabaseEntry {
    private AccessLogEntry accessLogEntry;
    private BlockedIp blockedIp;

    public AccessLogEntry getAccessLogEntry() {
        return accessLogEntry;
    }

    public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
        this.accessLogEntry = accessLogEntry;
    }

    public BlockedIp getBlockedIp() {
        return blockedIp;
    }

    public void setBlockedIp(BlockedIp blockedIp) {
        this.blockedIp = blockedIp;
    }
}

Then I passed this DTO to the writer, a PropertyExtractingDelegatingItemWriter, where I defined two custom methods to write the entities into the database. See my writer code below:

@Configuration
public class LogWriter extends LogAbstract {
    @Autowired
    private DataSource dataSource;

    @Bean
    public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
        PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
        propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
        propertyExtractingDelegatingItemWriter.setTargetObject(this);
        propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
        return propertyExtractingDelegatingItemWriter;
    }

    public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
        writeAccessLogTable(accessLogEntry);
        if (blockedIp != null) {
            writeBlockedIp(blockedIp);
        }

    }

    private void writeBlockedIp(BlockedIp entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
        statement.setString(1, entry.getIp());
        // threshold, startDate and endDate are presumably fields inherited from LogAbstract (not shown here)
        statement.setInt(2, threshold);
        statement.setTimestamp(3, Timestamp.valueOf(startDate));
        statement.setTimestamp(4, Timestamp.valueOf(endDate));
        statement.setString(5, entry.getComment());
        statement.execute();
    }

    private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
        statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
        statement.setString(2, entry.getIp());
        statement.setString(3, entry.getRequest());
        statement.setString(4, entry.getStatus());
        statement.setString(5, entry.getUserAgent());
        statement.execute();
    }
}

With this approach you get the initially wanted behaviour: a single reader feeding multiple entities that are processed and saved in one single step.
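
For completeness, this is roughly how such a writer could be plugged into a chunk-oriented step. It is only a sketch: RawLogLine, the chunk size and the bean wiring are placeholders, and StepBuilderFactory is the pre-Spring-Batch-5 API that was current at the time of this thread.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical step definition: RawLogLine stands for whatever the reader
// produces; the processor builds the DatabaseEntry DTO shown above.
@Configuration
public class LogStepConfiguration {

    @Bean
    public Step logStep(StepBuilderFactory stepBuilderFactory,
                        ItemReader<RawLogLine> reader,
                        ItemProcessor<RawLogLine, DatabaseEntry> processor,
                        ItemWriter<DatabaseEntry> itemWriterAccessLogEntry) {
        return stepBuilderFactory.get("logStep")
                .<RawLogLine, DatabaseEntry>chunk(100)
                .reader(reader)
                .processor(processor)
                .writer(itemWriterAccessLogEntry)
                .build();
    }
}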

3

You can use a CompositeItemProcessor and a CompositeItemWriter.

It won't look exactly like your schema; it will be sequential, but it will do the job.
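
For reference, a minimal sketch of that wiring (MyItem and the injected delegates are placeholders; it also assumes both processors accept and return the same type so they can be chained, which is exactly the limitation discussed in the comment below):

import java.util.Arrays;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: MyItem and the injected delegates are placeholders.
@Configuration
public class CompositeConfiguration {

    // The delegates run one after the other: processor2 receives processor1's output.
    @Bean
    public CompositeItemProcessor<MyItem, MyItem> compositeProcessor(
            ItemProcessor<MyItem, MyItem> processor1,
            ItemProcessor<MyItem, MyItem> processor2) {
        CompositeItemProcessor<MyItem, MyItem> composite = new CompositeItemProcessor<>();
        composite.setDelegates(Arrays.asList(processor1, processor2));
        return composite;
    }

    // Both delegate writers receive the same (single) list of processed items.
    @Bean
    public CompositeItemWriter<MyItem> compositeWriter(ItemWriter<MyItem> writer1,
                                                       ItemWriter<MyItem> writer2) {
        CompositeItemWriter<MyItem> composite = new CompositeItemWriter<>();
        composite.setDelegates(Arrays.asList(writer1, writer2));
        return composite;
    }
}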

1
  • 1
    I don't think this is enough. I will just end up with a "complex" processor, but ultimately there will be just one processed item passed to just one ItemWriter! Instead I need ItemWriter#1 to write an item that is not the one that will be written by ItemWriter#2! A requirement is that the two ItemWriters act upon different items! Keep in mind that in spring-batch a step is ternary: one ItemReader, one ItemProcessor, one ItemWriter.
    – danidemi
    Sep 25, 2013 at 9:57
0

This is the solution I came up with.

So, the idea is to code a new writer that "contains" both an ItemProcessor and an ItemWriter. Just to give you an idea, we called it PreprocessorWriter, and here is the core code:

public class PreprocessorWriter<I, O> implements ItemWriter<I> {

    private final ItemWriter<O> writer;
    private final ItemProcessor<I, O> processor;

    public PreprocessorWriter(ItemProcessor<I, O> processor, ItemWriter<O> writer) {
        this.processor = processor;
        this.writer = writer;
    }

    @Override
    public void write(List<? extends I> items) throws Exception {
        // Process each item just before writing, then hand everything to the delegate writer.
        List<O> toWrite = new ArrayList<O>();
        for (I item : items) {
            toWrite.add(processor.process(item));
        }
        writer.write(toWrite);
    }
}

There are a lot of things left aside, the management of ItemStream for instance, but in our particular scenario this was enough.

So you can just combine multiple PreprocessorWriters with a CompositeItemWriter.
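
A rough sketch of that combination, reusing the PreprocessorWriter above (MyItem, MappedA, MappedB and the injected delegates are placeholders, not part of the original answer):

import java.util.Arrays;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;

// Sketch only: one PreprocessorWriter per branch, combined behind a single
// CompositeItemWriter so the step still has exactly one writer.
public class FanOutWriterFactory {

    public static CompositeItemWriter<MyItem> fanOutWriter(
            ItemProcessor<MyItem, MappedA> processor1, ItemWriter<MappedA> writer1,
            ItemProcessor<MyItem, MappedB> processor2, ItemWriter<MappedB> writer2) {

        CompositeItemWriter<MyItem> composite = new CompositeItemWriter<>();
        composite.setDelegates(Arrays.asList(
                new PreprocessorWriter<>(processor1, writer1),   // branch #1
                new PreprocessorWriter<>(processor2, writer2))); // branch #2
        return composite;
    }
}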

3
  • 2
    Mixing up processor and writer is a bad idea: these components were created separately to keep processing and writing as two separate concerns. Jan 29, 2014 at 14:15
  • I agree with you on the general concept. However, as you surely read in the original question, the problem was to read items once, and process/write them in two completely different ways. So, can you please share how you would solve the particular problem the question is about?
    – danidemi
    Feb 2, 2014 at 16:05
  • 1
    Check my answer for that. Processing is done using a composite processor taking a single item as input and a custom bean (CompositeResultBean) holding the results of the multiple processings. Writing uses delegation: CompositeResultBean is the input, and the correct delegated writer is called for CompositeResultBean.result1 and CompositeResultBean.result2. Read once, process/write with separate ItemProcessor/ItemWriter using composition and delegation. All done without mixing up concepts. Feb 2, 2014 at 17:24
0

There is another solution if you have a reasonable amount of items (like less than 1 GB): you can cache the result of your select into a collection wrapped in a Spring bean.

Then you can just read the collection twice at no extra cost.
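
A rough sketch of that idea (the bean, the item type, the query and the row mapping are all made up for illustration; the point is that the expensive select runs only once and both steps read from the cached list, for example through a ListItemReader):

import java.util.List;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

// Illustrative singleton bean that caches the result of the expensive query.
// MyItem, the SQL and the row mapping are placeholders, not from the answer.
@Component
public class CachedQueryResult {

    private final JdbcTemplate jdbcTemplate;
    private List<MyItem> items;

    public CachedQueryResult(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public synchronized List<MyItem> getItems() {
        if (items == null) {
            // The expensive select runs only the first time the cache is requested.
            items = jdbcTemplate.query("SELECT /* expensive query */ ...",
                    (rs, rowNum) -> new MyItem());
        }
        return items;
    }
}

Each of the two steps can then use something like new ListItemReader<>(cachedQueryResult.getItems()) as its reader, so the database is hit only once.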

2
  • Yes, that could be done, indeed. But what happens if the first writer fails while writing an item? If I got your idea correctly, I think the second writer won't get any chance to write any item. Is that correct?
    – danidemi
    Apr 24, 2015 at 8:25
  • Mmm no, the writers are not passing data to each other; they read the same in-memory collection from inside a Spring singleton bean.
    – Tristan
    Apr 25, 2015 at 9:21
