Spring Integration Aggregator Example

In my previous post I covered Spring Integration and the file inbound-channel-adapter. In this post I will share a Spring Integration aggregator example.

Here is the link to my previous post: Spring Integration Example: file inbound-channel-adapter complex spring integration example

We will improve the implementation from the previous post by adding a Spring Integration aggregator. Let’s recall the user requirements:

  • We will receive 5 different trade files within a 15-minute time frame.
  • The process should wait for all 5 files and process them together.
  • The process will wait for the 5 files for a given time period. If it cannot receive all files within that period, it will raise an exception and move the files to the Error folder.
  • Each file should be processed only once.
  • The process should check the filename and only process files with the correct filename prefix.
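The waiting rule in these requirements can be read as a small decision function. The sketch below is plain Java with no Spring dependency; the class name `BatchDecision`, the `Outcome` values, and the constants are illustrative assumptions rather than code from the project.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the original project): decides what to do
// with the batch of trade files collected so far.
final class BatchDecision {

    enum Outcome { WAIT, RELEASE, EXPIRED }

    static final int EXPECTED_FILES = 5;
    static final long WINDOW_MILLIS = TimeUnit.MINUTES.toMillis(15);

    // receivedFiles      number of files collected so far
    // firstArrivalMillis time the first file of the batch arrived
    // nowMillis          current time, passed in so the rule is easy to test
    static Outcome decide(int receivedFiles, long firstArrivalMillis, long nowMillis) {
        if (receivedFiles >= EXPECTED_FILES) {
            return Outcome.RELEASE;   // all files are here: process them together
        }
        if (nowMillis - firstArrivalMillis > WINDOW_MILLIS) {
            return Outcome.EXPIRED;   // incomplete batch: raise an error, move files to the Error folder
        }
        return Outcome.WAIT;          // keep waiting for the remaining files
    }
}
```

For example, `BatchDecision.decide(3, start, start + 60_000L)` yields `WAIT`, because only three files have arrived and one minute has passed.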

Let’s add more requirements:

  • After receiving all 5 files successfully, we want to process each file and check the “Type” column. We only want to process trades with valid types. (Trade files are comma-separated CSV files.)
  • Out of the 5 trade files, we need to create one master file that contains all valid trades.
  • Release the master file to the Spring Integration flow for further trade processing.
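To make the “Type” check concrete, here is a minimal in-memory sketch of the filter-and-merge step. It works on lists of lines instead of real files so it can run on its own; `TradeFilter` and `mergeValidTrades` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: keeps only the CSV lines whose type column holds a valid type.
final class TradeFilter {

    // files      one list of CSV lines per trade file
    // typeIndex  zero-based position of the "Type" column
    // validTypes the types we accept
    static List<String> mergeValidTrades(List<List<String>> files, int typeIndex, Set<String> validTypes) {
        List<String> master = new ArrayList<>();
        for (List<String> lines : files) {
            for (String line : lines) {
                String[] record = line.split(",");
                // Lines that are too short have no type and are skipped.
                String type = record.length > typeIndex ? record[typeIndex].trim() : null;
                if (type != null && validTypes.contains(type)) {
                    master.add(line);
                }
            }
        }
        return master;
    }
}
```

The returned list plays the role of the master file: one collection holding the valid trades from every input.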

Let’s look at the configuration of the Spring Integration aggregator:

<poller default="false" max-messages-per-poll="5">
    <interval-trigger interval="30000"/>
</poller>

<aggregator id="fileAggregator"
               send-partial-result-on-expiry="true" />

<beans:bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

<beans:bean id="fileAggregator" class="com.services.FileAggregator">
    <beans:property name="processFolder" value="C:/File/Process" />
    <beans:property name="waitingFileNumber" value="5" />
    <beans:property name="expectedTypes" value="Type1,Type2,Type3"/>
</beans:bean>

And here is the Java code for the Spring Integration aggregator:

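import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Spring Integration 2.x packages; from 4.0 on, Message and @Header live in
// org.springframework.messaging and org.springframework.messaging.handler.annotation.
import org.springframework.integration.Message;
import org.springframework.integration.annotation.Header;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FileAggregator {

    private final Logger logger = LoggerFactory.getLogger(FileAggregator.class);
    private final String masterFileName       = "master_file";
    // despite its name, this holds the file extension
    private final String masterFileNamePrefix = ".csv";

    // zero-based position of the "Type" column in the CSV file
    private static final int TYPE = 32;

    private String expectedTypes;
    private String processFolder;
    private int waitingFileNumber;

    private Map<String, File> filesToMove = new HashMap<String, File>();

    // Every message gets the same correlation key, so all files join one group.
    public String correlateBy(@Header("id") String id) {
        return "1";
    }

    // Release the group once the expected number of files has arrived.
    public boolean release(List<Message<File>> messages) {
        return messages.size() == waitingFileNumber;
    }

    public File aggregateFiles(List<Message<File>> messages) throws IOException {

        logger.info("Aggregator releasing [{}] messages", messages.size());
        for (Message<File> msg : messages) {
            logger.info("  - FileName[{}] ", msg.getPayload().getName());
        }
        logger.info(" as one file.");

        // NOTE: the second argument was cut off in the original listing; building the
        // name from the two constants above is an assumption.
        File masterFile = com.util.FileUtils.createFile(processFolder,
                masterFileName + masterFileNamePrefix);

        FileOutputStream os = com.util.FileUtils.openOutputStream(masterFile);

        logger.info("MasterFile creation is started!");
        try {
            for (Message<File> msg : messages) {
                File file = msg.getPayload();
                LineIterator iterator = FileUtils.lineIterator(file);
                try {
                    while (iterator.hasNext()) {
                        String line = iterator.nextLine();

                        String[] record = line.split(",");
                        String type = (record.length > TYPE) ? record[TYPE] : null;

                        if (isExpectedType(type)) {
                            logger.info("Record accepted for type - {}", type);
                            // NOTE: the write step is missing from the original listing;
                            // appending the accepted line is an assumption.
                            os.write((line + "\n").getBytes());
                        }
                    }
                } finally {
                    iterator.close();
                }
                filesToMove.put(file.getName(), file);
            }
        } finally {
            os.close();
        }

        logger.info("MasterFile creation is completed!");
        logger.info("MasterFile rows: {}", com.util.FileUtils.readLines(masterFile).size());

        return masterFile;
    }

    private boolean isExpectedType(String type) {
        Set<String> expectedTypesSet = com.util.SetUtils.convertCsvStringToSet(expectedTypes);
        return expectedTypesSet.contains(type);
    }

    public void setProcessFolder(String processFolder) {
        this.processFolder = processFolder;
    }

    public void setWaitingFileNumber(int waitingFileNumber) {
        this.waitingFileNumber = waitingFileNumber;
    }

    public void setExpectedTypes(String expectedTypes) {
        this.expectedTypes = expectedTypes;
    }
}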
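
The `com.util.SetUtils.convertCsvStringToSet` helper called from `isExpectedType` is not shown in this post. Below is a minimal sketch of what such a helper might look like, assuming it splits on commas, trims whitespace, and ignores empty tokens. The class name `CsvSets` is hypothetical and the real utility may behave differently.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the project's own SetUtils helper.
final class CsvSets {

    // Turns "Type1,Type2,Type3" into a set of the three type names.
    static Set<String> convertCsvStringToSet(String csv) {
        Set<String> result = new LinkedHashSet<>();
        if (csv == null) {
            return result;            // no configured types: nothing is accepted
        }
        for (String token : csv.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }
}
```

Since `expectedTypes` does not change after the bean is configured, the set could also be built once in the setter instead of on every record.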

For more about the Spring Integration aggregator, please see the reference documentation on the SpringSource website.

Filed in: Java, Software Development

© Deha Solutions. All rights reserved.