0

Spring Integration Aggregator Example

I talked about Spring Integration and file inbound-channel-adapter on my previous post. Now I will share a spring integration aggregator example.

Here is the link for my previous post : Spring Integration Example : file inbound-channel-adapter complex spring integration example

Spring Integration Aggregator Example

We will improve this implementation and will add a spring integration aggregator. Let’s remember the user requirements again:

  • We will receive 5 different trade files within 15 minutes time frame.
  • Process should wait for all 5 files and process them together.
  • Process will wait 5 files for a given time period. If cant receive all files within the given period, will raise an exception and move files to Error folder.
  • Each file should be processed only once.
  • Process should check filename and only process the files with correct filename prefix.

Let’s add more requirements:

  • After receiving 5 files successfully, We want to process each file and check “Type” column. We only want to process trades with valid types. (trade files are comma sepereted csv files.)
  • Out of all 5 trade files, we need to create one master file which has all valid trades.
  • Release the master file to spring integration flow for further trade processing.

Spring Integration Aggregator Example

Let’s see the configuration of spring integration aggregator:

<file:inbound-channel-adapter 
            id="channelAdapter"
            channel="fileAggregatorChannel"
            directory="C:/File/Inbound" 
            comparator="lastModifiedComparator"
            filter="compositeFileFilter"
            auto-startup="false"
            scanner="dirScanner">
   <poller default="false" max-messages-per-poll="5" >
       <interval-trigger interval="30000"></interval-trigger>
   </poller>
</file:inbound-channel-adapter>

<aggregator id="fileAggregator"
               input-channel="fileAggregatorChannel"
               discard-channel="fileProcessingChannel"
               output-channel="fileProcessingChannel"
               ref="fileAggregator"
               method="aggregateFiles"
               message-store="messageStore"
               send-partial-result-on-expiry="true" />

<beans:bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

<beans:bean id="fileAggregator" class="com.services.FileAggregator">
        <beans:property name="processFolder" value="C:/File/Process" />
        <beans:property name="waitingFileNumber" value="5" />
        <beans:property name="expectedTypes" value="Type1,Type2,Type3"/>
</beans:bean>

and here is the spring integration aggregator java code:

public class FileAggregator {

    private final Logger logger = LoggerFactory.getLogger(FileAggregator.class);
    private final String masterFileName       = "master_file";
    private final String masterFileNamePrefix = ".csv";

    // position index in the CSV file
    private int TYPE = 32;

    private String expectedTypes;
    private String processFolder;
    private int waitingFileNumber;

    private Map<String, File> filesToMove = new HashMap<String, File>();

    @CorrelationStrategy
    public String correlateBy(@Header("id") String id) {
        return "1";
    }

    @ReleaseStrategy
    public boolean release(List<Message<File>> messages) {
        return messages.size() == waitingFileNumber;
    }

    public File aggregateFiles(List<Message<File>> messages) throws IOException {

        logger.info("Aggregator releasing [{}] messages", messages.size() );
		for(Message<File> msg : messages) {
			logger.info("  - FileName[{}] ", msg.getPayload().getName());
		}
        logger.info(" as one file." );

        File masterFile = com.util.FileUtils.createFile(processFolder,
                                                        masterFileName+masterFileNamePrefix,
                                                        true);

        FileOutputStream os = com.util.FileUtils.openOutputStream(masterFile);

        logger.info("MasterFile creation is started!" );
        for(Message<File> msg : messages) {
	    File file = msg.getPayload();
            LineIterator iterator = FileUtils.lineIterator(file);
            while (iterator.hasNext()) {
                String line = iterator.nextLine();

                String[] record = line.split("\\,");
                String type = (record.length > TYPE) ? record[TYPE] : null;

                if (isExpectedType(type)) {
                    logger.info("Record accepted for type - " + type);
                    os.write(line.getBytes());
                    os.write(System.getProperty("line.separator").getBytes());
                }
            }
            iterator.close();
            filesToMove.put(file.getName(), file);
	}
        os.close();

        logger.info("MasterFile creation is completed!" );
        logger.info("MasterFile rows: "+com.util.FileUtils.readLines(masterFile).size());

        return masterFile;
    }

   private boolean isExpectedType(String type) {
       Set<String> expectedTypesSet = com.util.SetUtils.convertCsvStringToSet(expectedTypes);
       return expectedTypesSet.contains(type);
   }

   public void setProcessFolder(String processFolder) {
       this.processFolder = processFolder;
   }

   public void setWaitingFileNumber(int waitingFileNumber) {
       this.waitingFileNumber = waitingFileNumber;
   }

   public void setExpectedTypes(String expectedTypes) {
       this.expectedTypes= expectedTypes;
   }

For more reading about spring integration aggregator, please see this link on Spring Source website.

Filed in: Java, Software Development Tags: , , , , ,

Get Updates

Related Posts

Leave a Reply

Submit Comment
© Deha Solutions. All rights reserved.