rerun.me

Camel CXF Service With Multiple Query Parameters

While the awesome Apache Camel team is busy fixing the handling of multiple query parameters, here’s a workaround. Hopefully, this post will become obsolete with the next versions of Camel. (I currently use 2.7.5.)

Problem

Every query parameter after the first is passed as null into a Camel-CXF service. Say the URL has four query parameters, as in

name=arun&email=arun@arunma.com&age=10&phone=123456

only the first one gets populated when you do a

MultiQueryParams
@GET
@Path("search")
@Produces(MediaType.APPLICATION_JSON)
public String sourceResultsFromTwoSources(@QueryParam("name") String name, @QueryParam("age") String age,
                                          @QueryParam("phone") String phone,@QueryParam("email") String email
);

All other parameters are null.

Final Output

For url

http://localhost:8181/cxf/karafcxfcamel/search?name=arun&email=arun@arunma.com&age=31&phone=232323

the result expected is :

Workaround

Interestingly, the entire query string is still available as a header on the incoming message.

QueryStringHeader
String queryString = exchange.getIn().getHeader(Exchange.HTTP_QUERY, String.class);

We could then do a

ExtractingParams
MultivaluedMap<String, String> queryMap = JAXRSUtils.getStructuredParams(queryString, "&", false, false);

to get the query parameters as a multivalued map.

Each query parameter can then be set as a property on the Exchange and read anywhere along the route.
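
If you would like to see what that map looks like without pulling in Camel or CXF, here is a minimal, stdlib-only sketch. QueryParams and parse are hypothetical names made up for illustration (they are not part of either library), and the sketch does no URL decoding. In the real route, each entry of such a map is what gets copied onto the Exchange as a property.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Camel or CXF: splits a raw query string
// into a map of parameter name -> list of values (no URL decoding).
final class QueryParams {

    static Map<String, List<String>> parse(String queryString) {
        Map<String, List<String>> params = new LinkedHashMap<>();
        if (queryString == null || queryString.isEmpty()) {
            return params;
        }
        for (String pair : queryString.split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate stray separators such as "a=1&&b=2"
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            // A repeated key accumulates values, hence the list
            params.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(parse("name=arun&email=arun@arunma.com&age=10&phone=123456"));
    }
}
```

Every value is a list because a key may legally appear more than once in a query string, which is also why the service implementation later picks the first entry of each list.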

Code

The entire code can be downloaded from GitHub.

Please note that I am running Camel as part of OSGi inside the Karaf container. The workaround itself does not depend on the environment in which you run Camel-CXF, but keep this in mind when you download the code from GitHub, and look at the Blueprint XML files for the Camel configuration.

The most important piece here is the Router

Router

RestToBeanRouter
package me.rerun.karafcxfcamel.camel.beans;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.cxf.jaxrs.utils.JAXRSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.MultivaluedMap;
import java.util.List;
import java.util.Map;

public class RestToBeanRouter extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(RestToBeanRouter.class);

    @Override
    public void configure() throws Exception {

        from ("cxfrs://bean://rsServer")
                .process(new ParamProcessor())
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new ResultAggregator())
                .beanRef("restServiceImpl", "getNameEmailResult")
                .beanRef("restServiceImpl", "getAgePhoneResult")
                .end()
                .marshal().json(JsonLibrary.Jackson)
                .to("log://camelLogger?level=DEBUG");
    }

    private class ParamProcessor implements Processor {

        @Override
        public void process(Exchange exchange) throws Exception {
            String queryString = exchange.getIn().getHeader(Exchange.HTTP_QUERY, String.class);

            MultivaluedMap<String, String> queryMap = JAXRSUtils.getStructuredParams(queryString, "&", false, false);

            for (Map.Entry<String, List<String>> eachQueryParam : queryMap.entrySet()) {
                exchange.setProperty(eachQueryParam.getKey(), eachQueryParam.getValue());
            }


        }

    }
}

Interface

RestService
package me.rerun.karafcxfcamel.rest;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

public interface RestService {

    @GET
    @Path("search")
    @Produces(MediaType.APPLICATION_JSON)
    public String sourceResultsFromTwoSources();

}
  

Implementation

RestServiceImpl
package me.rerun.karafcxfcamel.rest;

import me.rerun.karafcxfcamel.model.AgePhoneResult;
import me.rerun.karafcxfcamel.model.NameEmailResult;
import me.rerun.karafcxfcamel.service.base.AgePhoneService;
import me.rerun.karafcxfcamel.service.base.NameEmailService;
import me.rerun.karafcxfcamel.service.impl.AgePhoneServiceImpl;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RestServiceImpl implements RestService {

    private static final Logger logger = LoggerFactory.getLogger(RestServiceImpl.class);

    private NameEmailService nameEmailService;
    private AgePhoneService agePhoneService;

    public RestServiceImpl(){
    }

    //Do nothing. Camel intercepts and routes the requests
    public String sourceResultsFromTwoSources() {
        return null;
    }


    public NameEmailResult getNameEmailResult(Exchange exchange){
        logger.info("Invoking getNameEmailResult from RestServiceImpl");

        String name=getFirstEntrySafelyFromList(exchange.getProperty("name", List.class));
        String email=getFirstEntrySafelyFromList(exchange.getProperty("email", List.class));

        return nameEmailService.getNameAndEmail(name, email);
    }


    public AgePhoneResult getAgePhoneResult(Exchange exchange){
        logger.info("Invoking getAgePhoneResult from RestServiceImpl");

        String age=getFirstEntrySafelyFromList(exchange.getProperty("age", List.class));
        String phone=getFirstEntrySafelyFromList(exchange.getProperty("phone", List.class));

        return agePhoneService.getAgePhoneResult(age, phone);
    }

    public NameEmailService getNameEmailService() {
        return nameEmailService;
    }

    public AgePhoneService getAgePhoneService() {
        return agePhoneService;
    }

    public void setNameEmailService(NameEmailService nameEmailService) {
        this.nameEmailService = nameEmailService;
    }

    public void setAgePhoneService(AgePhoneService agePhoneService) {
        this.agePhoneService = agePhoneService;
    }

    private String getFirstEntrySafelyFromList(List<String> list){

        if (list!=null && !list.isEmpty()){
            return list.get(0);
        }
        return null;
    }
}

Reference

Camel Mailing List Question

SLF4J Binding for ADFLogger - the Missing Piece

For reasons best left untold, in my day job, I was expected to provide an SLF4J adapter for ADFLogger, the Oracle ADF logger. Not surprisingly, SLF4J does not ship a binding for ADFLogger, but since ADFLogger is just a thin wrapper over java.util.logging, it took a little over an hour to fill that gap.

The test cases (more like main programs) in the repository confirm that the adapter plays well with Oracle Diagnostic Logging without breaking the format of the log messages (which is more or less the only advantage ADFLogger provides).

You can download the entire codebase from the repository on GitHub. If you are interested only in the binding jar, please download it here.

Log Levels :

ADFLogger has more log levels (7) than SLF4J (5), so a compromise was made on some of the ADFLogger levels. The mapping is as follows :

NOTE : Support for the log levels FINER and CONFIG was dropped during this adaptation.
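
As a rough sketch, the mapping can be written down as a small enum. The pairs are inferred from the matching adfLogger/slfLogger calls in the test code in this post (finest/trace, fine/debug, info/info, warning/warn, severe/error). Slf4jLevel and toJul are hypothetical names, and the actual adapter code in the repository may be organised differently.

```java
import java.util.logging.Level;

// Hypothetical enum for illustration: the five SLF4J levels, each paired with
// the java.util.logging level that the wrapped logger would be called at.
// The pairing is inferred from the test calls in this post, not copied from
// the adapter itself.
enum Slf4jLevel {
    TRACE(Level.FINEST),
    DEBUG(Level.FINE),
    INFO(Level.INFO),
    WARN(Level.WARNING),
    ERROR(Level.SEVERE);

    private final Level julLevel;

    Slf4jLevel(Level julLevel) {
        this.julLevel = julLevel;
    }

    // The java.util.logging level this SLF4J level is delegated to
    Level toJul() {
        return julLevel;
    }
}
```

FINER and CONFIG have no entry here, which matches the note above about those two levels being dropped.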

Usage :

Just go ahead and replace your ADFLogger instantiations with SLF4J ones.

private static final Logger slfLogger = LoggerFactory.getLogger(LoggingChecker.class);

instead of

private static final ADFLogger adfLogger= ADFLogger.createADFLogger(LoggingChecker.class);

Logging methods

a. Logging to various levels

@Test
public void testLoggerLevels() {

    adfLogger.finest("finest Message from ADF Logger");
    slfLogger.trace("finest Message from SLF Logger");

    adfLogger.fine("fine Message from ADF Logger");
    slfLogger.debug("fine Message from SLF Logger");

    adfLogger.info("info Message from ADF Logger");
    slfLogger.info("info Message from SLF Logger");

    adfLogger.warning("warning Message from ADF Logger");
    slfLogger.warn("warning Message from SLF Logger");

    adfLogger.severe("severe Message from ADF Logger");
    slfLogger.error("severe Message from SLF Logger");
}

b. Logging exceptions

where dummyException is an instance of Throwable

c. Logging parameters
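
The placeholder syntax is the main thing to watch when switching: the ADFLogger calls in this post use indexed placeholders ({0}, {1}), while the SLF4J calls use positional {} placeholders. Assuming ADFLogger messages are expanded the way java.util.logging expands them, with java.text.MessageFormat, the ADF side can be approximated with the JDK alone. PlaceholderDemo and julStyle are hypothetical names for illustration.

```java
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: expands indexed {0}, {1} placeholders with
// java.text.MessageFormat, which is what java.util.logging uses.
final class PlaceholderDemo {

    static String julStyle(String pattern, Object... params) {
        return MessageFormat.format(pattern, params);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Rock", "Paper", "Scissors");
        List<Integer> numbers = Arrays.asList(21, 22, 23);

        // One parameter, referenced as {0}
        System.out.println(julStyle("Param :{0}", words));
        // Two parameters, referenced by index
        System.out.println(julStyle("Param 1 :[{0}] \n [{1}]", words, numbers));
    }
}
```

The SLF4J equivalent of the second pattern would be "[{}] \n [{}]" with the two lists passed in the same order.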

Development Notes :

  1. The ADF Logger Adapter framework consists of two core classes (ADFLoggerFactory.java and ADFLoggerAdapter.java) and three other helper classes.
  2. The Adapter can support multiple loggers including the ROOT logger of the ADFLogger (which is the default and has an empty string as its name).
  3. It is no surprise that SLF4J uses the ADFLoggerFactory.java to instantiate the ADFLoggerAdapter.
  4. For each Logger instance, the ADFLoggerAdapter.java holds an instance of ADFLogger (bound to the logger name) and delegates the call to the ADFLogger itself.
  5. The fillCallerData method of the ADFLoggerAdapter.java filters the stack frames of the ADFLoggerAdapter out of the logging stack, so that the caller class and method name reported are those of the host application and not of the Adapter framework classes.
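
The stack-frame filtering in point 5 can be sketched with plain JDK calls. This is not the adapter's actual fillCallerData, just a minimal illustration of the idea under the assumption that the wrapper is a single class. CallerFinder, DemoAdapter and DemoApp are hypothetical names.

```java
// Hypothetical sketch: walks the current stack and returns the first frame
// found after the frames that belong to the logging wrapper class.
final class CallerFinder {

    static StackTraceElement findCaller(String wrapperClassName) {
        StackTraceElement[] stack = new Throwable().getStackTrace();
        boolean seenWrapper = false;
        for (StackTraceElement frame : stack) {
            if (frame.getClassName().equals(wrapperClassName)) {
                seenWrapper = true;   // still inside the wrapper
            } else if (seenWrapper) {
                return frame;         // first frame after leaving the wrapper
            }
        }
        return null;                  // wrapper was not on the stack
    }
}

// Stand-in for a logging adapter that wants to report its caller
final class DemoAdapter {
    static StackTraceElement log() {
        return CallerFinder.findCaller(DemoAdapter.class.getName());
    }
}

// Stand-in for the host application
final class DemoApp {
    static StackTraceElement doWork() {
        return DemoAdapter.log();
    }

    public static void main(String[] args) {
        StackTraceElement caller = doWork();
        System.out.println(caller.getClassName() + "." + caller.getMethodName());
    }
}
```

The frame returned is DemoApp.doWork rather than DemoAdapter.log, which is the behaviour you want in the log record: the host application's class and method, not the adapter's.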

Testcase (well, sort of)

import java.util.ArrayList;
import java.util.List;

import oracle.adf.share.logging.ADFLogger;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestClass {

    private static final ADFLogger adfLogger = ADFLogger.createADFLogger(TestClass.class);
    private static final Logger slfLogger = LoggerFactory.getLogger(TestClass.class);

    private List<String> dummyList1 = null;
    private List<Integer> dummyList2 = null;

    private Exception dummyException = null;

    @Before
    public void setUp() {

        dummyList1 = new ArrayList<String>();
        dummyList1.add("Rock");
        dummyList1.add("Paper");
        dummyList1.add("Scissors");

        dummyList2 = new ArrayList<Integer>();
        dummyList2.add(21);
        dummyList2.add(22);
        dummyList2.add(23);

        dummyException = new Exception("Mind blowing Exception");
    }

    // Each ADFLogger call is paired with its SLF4J counterpart
    @Test
    public void testLoggerLevels() {

        adfLogger.finest("finest Message from ADF Logger");
        slfLogger.trace("finest Message from SLF Logger");

        adfLogger.fine("fine Message from ADF Logger");
        slfLogger.debug("fine Message from SLF Logger");

        adfLogger.info("info Message from ADF Logger");
        slfLogger.info("info Message from SLF Logger");

        adfLogger.warning("warning Message from ADF Logger");
        slfLogger.warn("warning Message from SLF Logger");

        adfLogger.severe("severe Message from ADF Logger");
        slfLogger.error("severe Message from SLF Logger");
    }

    @Test
    public void testException() {

        adfLogger.severe("severe Message from ADF Logger", dummyException);
        slfLogger.error("severe Message from SLF Logger", dummyException);
    }

    // ADFLogger uses indexed {0} placeholders, SLF4J uses positional {}
    @Test
    public void testParameters() {

        adfLogger.severe("severe Message from ADF Logger Param :{0}", dummyList1);
        slfLogger.error("severe Message from SLF Logger Param :{}", dummyList1);

        adfLogger.severe("severe Message from ADF Logger Param 1 :[{0}] \n [{1}]", new Object[]{dummyList1, dummyList2});
        slfLogger.error("severe Message from SLF Logger Param :{} \n {} ", dummyList1, dummyList2);
        slfLogger.error("severe Message from SLF Logger Param :[{}] \n [{}] ", new Object[]{dummyList1, dummyList2});
    }
}

Output :

Logger levels :

Jul 01, 2013 4:21:18 PM org.slf4j.test.TestClass testLoggerLevels
INFO: info Message from ADF Logger
Jul 01, 2013 4:21:18 PM org.slf4j.test.TestClass testLoggerLevels
INFO: info Message from SLF Logger
Jul 01, 2013 4:21:18 PM org.slf4j.test.TestClass testLoggerLevels
WARNING: warning Message from ADF Logger
Jul 01, 2013 4:21:18 PM org.slf4j.test.TestClass testLoggerLevels
WARNING: warning Message from SLF Logger
Jul 01, 2013 4:21:18 PM org.slf4j.test.TestClass testLoggerLevels
SEVERE: severe Message from ADF Logger
Jul 01, 2013 4:21:18 PM org.slf4j.test.TestClass testLoggerLevels
SEVERE: severe Message from SLF Logger

Testing Parameters :

Jul 01, 2013 4:32:04 PM org.slf4j.test.TestClass testParameters
SEVERE: severe Message from ADF Logger Param :[Rock, Paper, Scissors]
Jul 01, 2013 4:32:04 PM org.slf4j.test.TestClass testParameters
SEVERE: severe Message from SLF Logger Param :[Rock, Paper, Scissors]
Jul 01, 2013 4:32:04 PM org.slf4j.test.TestClass testParameters
SEVERE: severe Message from ADF Logger Param 1 :[[Rock, Paper, Scissors]]
 [[21, 22, 23]]
Jul 01, 2013 4:32:04 PM org.slf4j.test.TestClass testParameters
SEVERE: severe Message from SLF Logger Param :[Rock, Paper, Scissors]
 [21, 22, 23]
Jul 01, 2013 4:32:04 PM org.slf4j.test.TestClass testParameters
SEVERE: severe Message from SLF Logger Param :[[Rock, Paper, Scissors]]
 [[21, 22, 23]]


Testing Exception :

Jul 01, 2013 4:32:28 PM org.slf4j.test.TestClass testException
SEVERE: severe Message from ADF Logger
java.lang.Exception: Mind blowing Exception
  at org.slf4j.test.TestClass.setUp(TestClass.java:38)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  at java.lang.reflect.Method.invoke(Unknown Source)
  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:27)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
  at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
  at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

Jul 01, 2013 4:32:28 PM org.slf4j.test.TestClass testException
SEVERE: severe Message from SLF Logger
java.lang.Exception: Mind blowing Exception
  at org.slf4j.test.TestClass.setUp(TestClass.java:38)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  at java.lang.reflect.Method.invoke(Unknown Source)
  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:27)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
  at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
  at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

  

Building Camel-CXF REST Service in OSGi for Karaf - Multicasting and Aggregation

Please check out my other post on building plain CXF services (without Camel) in OSGi on Karaf.

This is a basic tutorial on how to

  1. create a CXF REST service
  2. multicast (and parallelize) the incoming request using Camel
  3. source data from two different services
  4. aggregate the response and
  5. finally return the consolidated result as JSON to the end user.

You can download the entire codebase from GitHub.

What this application does, in simple terms

The result expected from this service is a hardcoded response which looks like

As you can see from the image, the top portion of the response is sourced from a service called NameEmailService and the second portion from a service called AgePhoneService. The two calls are made concurrently, and their results are combined into a single consolidated result entity, ConsolidatedSearchResult.

The Project structure looks like this :

There are two baby steps for Step 1.

Step 1.a - Create a CXF REST Service

As you might have guessed, there’s nothing complicated in this step. Just an interface and an implementation.

Interface

RestService
@Path("rest")
public interface RestService {


    @GET
    @Path("query/{queryString}")
    @Produces(MediaType.APPLICATION_JSON)
    public String sourceResultsFromTwoSources(@PathParam("queryString") String queryString);

}

Implementation

RestServiceImpl
public class RestServiceImpl implements RestService {

    private static final Logger logger = LoggerFactory.getLogger(RestServiceImpl.class);

    private NameEmailService nameEmailService;
    private AgePhoneService agePhoneService;

    public RestServiceImpl(){
    }

    //Do nothing. Camel intercepts and routes the requests
    public String sourceResultsFromTwoSources(String queryString) {
        return null;
    }


    public NameEmailResult getNameEmailResult(String queryString){
        logger.info("Invoking getNameEmailResult from RestServiceImpl");
        return nameEmailService.getNameAndEmail(queryString);
    }


    public AgePhoneResult getAgePhoneResult(String queryString){
        logger.info("Invoking getAgePhoneResult from RestServiceImpl");
        return agePhoneService.getAgePhoneResult(queryString);
    }

    public NameEmailService getNameEmailService() {
        return nameEmailService;
    }

    public AgePhoneService getAgePhoneService() {
        return agePhoneService;
    }

    public void setNameEmailService(NameEmailService nameEmailService) {
        this.nameEmailService = nameEmailService;
    }

    public void setAgePhoneService(AgePhoneService agePhoneService) {
        this.agePhoneService = agePhoneService;
    }
}

Note that the implementation of sourceResultsFromTwoSources returns null. The truth is that this method doesn’t even get called when making a REST call. Camel intercepts all requests to the URL and routes them to various endpoints (in our case, it calls two methods: getNameEmailResult() and getAgePhoneResult()).

Step 1.b - Create the Service Implementation

Toy implementations of the NameEmailService and the AgePhoneService are below :

NameEmailServiceImpl
public class NameEmailServiceImpl implements NameEmailService {

    public NameEmailResult getNameAndEmail(String queryString){

        return new NameEmailResult("Arun", "arun@arunma.com");

    }

}

AgePhoneServiceImpl
public class AgePhoneServiceImpl implements AgePhoneService {

    public AgePhoneResult getAgePhoneResult(String queryString){
        return new AgePhoneResult(32, "111-222-333");
    }
}

Steps 2, 3, 4 & 5

Well, I lied when I implied that 2, 3, 4 and 5 were four separate steps. They are all done in a single step using Camel routing and its Enterprise Integration Pattern implementations.

RestToBeanRouter
public class RestToBeanRouter extends RouteBuilder {


    @Override
    public void configure() throws Exception {

        from ("cxfrs://bean://rsServer")
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new ResultAggregator())
                .beanRef("restServiceImpl", "getNameEmailResult")
                .beanRef("restServiceImpl", "getAgePhoneResult")
                .end()
                .marshal().json(JsonLibrary.Jackson)
                .to("log://camelLogger?level=DEBUG");
    }
}

Our Routing explained

Simply put, our RouteBuilder does the following:

1) from ("cxfrs://bean://rsServer") intercepts all requests to the JAX-RS server endpoint defined in rest-blueprint.xml as

rest-blueprint.xml
      <cxf:rsServer id="rsServer" address="/karafcxfcamel"
                  serviceClass="me.rerun.karafcxfcamel.rest.RestServiceImpl"
                  loggingFeatureEnabled="true" />


2) The .multicast() forwards the original request untouched to

1. `getNameEmailResult`  &
2. `getAgePhoneResult` methods in `RestServiceImpl` 

3) The .parallelProcessing() places concurrent calls to the methods.

4) The .aggregationStrategy(new ResultAggregator()) specifies how the results from various multicasted sources should be, well, aggregated.

Our aggregator looks like :

ResultAggregator
public class ResultAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {


        ConsolidatedSearchResult consolidatedSearchResult=null;

        if (oldExchange==null){
            consolidatedSearchResult=new ConsolidatedSearchResult();
        }
        else{
            consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult.class);
        }


        NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult.class);
        AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult.class);

        if (nameEmailResult!=null){
            consolidatedSearchResult.setNameEmailResult(nameEmailResult);
        }

        if (agePhoneResult!=null){
            consolidatedSearchResult.setAgePhoneResult(agePhoneResult);
        }

        newExchange.getIn().setBody(consolidatedSearchResult);

        return newExchange;
    }
}

Our Aggregator explained

The aggregate method in our ResultAggregator is a little crude but does the job.

  1. The aggregate method is called once for each multicast endpoint as its result comes in.

  2. The first time, oldExchange is null. We take that as an opportunity to construct the consolidated result entity that we want to return to the user.

  3. We check whether the incoming newExchange carries the result of a call to NameEmailService or AgePhoneService and populate the consolidated entity accordingly.

  4. Finally, we return the exchange holding the consolidated entity. Returning it does two jobs.

    1. It comes back in as oldExchange on the next call to aggregate (a form of chaining: whatever one call returns is handed to the next call as oldExchange).

    2. It is returned to the user if this is the last call to aggregate (all multicast endpoint calls are complete).
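
To see the chaining outside of Camel, here is a JDK-only sketch of the same shape: run every endpoint concurrently, then fold the replies through an aggregate function whose first argument is null on the first call. MulticastSketch and its methods are hypothetical stand-ins, not Camel APIs, and the sketch simply aggregates replies in list order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical stand-in for multicast + aggregation, using only the JDK.
// Each Supplier plays the role of one multicast endpoint.
final class MulticastSketch {

    static <T> T multicast(List<Supplier<T>> endpoints, BinaryOperator<T> aggregate) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, endpoints.size()));
        try {
            // Submit every endpoint first so that they run concurrently
            List<CompletableFuture<T>> replies = new ArrayList<>();
            for (Supplier<T> endpoint : endpoints) {
                replies.add(CompletableFuture.supplyAsync(endpoint, pool));
            }
            // Fold the replies: 'old' is null on the first call, and each
            // return value becomes 'old' for the next call
            T old = null;
            for (CompletableFuture<T> reply : replies) {
                old = aggregate.apply(old, reply.join());
            }
            return old; // the last returned value goes back to the caller
        } finally {
            pool.shutdown();
        }
    }

    static String demo() {
        List<Supplier<String>> endpoints = new ArrayList<>();
        endpoints.add(() -> "name-email");
        endpoints.add(() -> "age-phone");
        return multicast(endpoints, (old, reply) -> old == null ? reply : old + "+" + reply);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The lambda passed to multicast has the same responsibilities as ResultAggregator.aggregate: handle the null first argument, merge the incoming reply, and return the merged value so that it can be chained into the next call.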