Message channel support

Message channels represent the in memory messaging solution in Citrus. Producer and consumer components are linked via channels exchanging messages in memory. As this transport mechanism comes from Spring Integration API (http://www.springsource.org/spring-integration) and Citrus itself uses a lot of Spring APIs, especially those from Spring Integration you are able to connect to all Spring messaging adapters via these in memory channels.

Citrus offers a channel components that can be used both by Citrus and Spring Integration. The conclusion is that Citrus supports the sending and receiving of messages both to and from Spring Integration message channel components. This opens up a lot of great possibilities to interact with the Spring Integration transport adapters for FTP, TCP/IP and so on. In addition to that the message channel support provides us a good way to exchange messages in memory.

Citrus provides support for sending and receiving JMS messages. We have to separate between synchronous and asynchronous communication. So in this chapter we explain how to setup JMS message endpoints for synchronous and asynchronous outbound and inbound communication

Note The message channel configuration components use the default "citrus" configuration namespace and schema definition. Include this namespace into your Spring configuration in order to use the Citrus configuration elements. The namespace URI and schema location are added to the Spring configuration XML file as follows.

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:citrus-jms="http://www.citrusframework.org/schema/config"
        xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.citrusframework.org/schema/config
        http://www.citrusframework.org/schema/config/citrus-config.xsd">

    [...]

</beans>

After that you are able to use customized Citrus XML elements in order to define the Spring beans.

Channel endpoint

Citrus offers a channel endpoint component that is able to create producer and consumer components. Producer and consumer send and receive messages both to and from a channel endpoint. By default the endpoint is asynchronous when configured in the Citrus application context. With this component you are able to access message channels directly:

<citrus:channel-endpoint id="helloEndpoint" channel="helloChannel"/>

<si:channel id="helloChannel"/>

The Citrus channel endpoint references a Spring Integration channel directly. Inside your test case you can reference the Citrus endpoint as usual to send and receive messages. We will see this later in some example code listings.

Note The Spring Integration configuration components use a specific namespace that has to be included into your Spring application context. You can use the following template which holds all necessary namespaces and schema locations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:citrus="http://www.citrusframework.org/schema/config"
    xmlns:si="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.citrusframework.org/schema/config 
            http://www.citrusframework.org/schema/config/citrus-config.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">
</beans>

The Citrus channel endpoint also supports a customized message channel template that will actually send the messages. The customized template might give you access to special configuration possibilities. However it is optional, so if no message channel template is defined in the configuration Citrus will create a default template.

<citrus:channel-endpoint id="helloEndpoint"
                            channel="helloChannel"
                            message-channel-template="myMessageChannelTemplate"/>

The message sender is now ready to publish messages to the defined channel. The communication is supposed to be asynchronous, so the producer is not able to process a reply message. We will deal with synchronous communication and reply messages later in this chapter. The message producer just publishes messages to the channel and is done. Interacting with the endpoints in a test case is quite easy. Just reference the id of the endpoint in your send and receive test actions

<send endpoint="helloEndpoint">
    <message>
        <payload>
            <v1:HelloRequest xmlns:v1="http://citrusframework.org/schemas/HelloService.xsd">
                <v1:Text>Hello World!</v1:Text>
            </v1:HelloRequest>
        </payload>
    </message>
</send>

<receive endpoint="helloEndpoint">
    <message>
        <payload>
            <v1:HelloResponse xmlns:v1="http://citrusframework.org/schemas/HelloService.xsd">
                <v1:Text>Hello Citrus!</v1:Text>
            </v1:HelloResponse>
        </payload>
    </message>
</receive>

As you can see Citrus is also able to receive messages from the same Spring Integration message channel destination. We just references the same channel-endpoint in the receive action.

As usual the receiver connects to the message destination and waits for messages to arrive. The user can set a receive timeout which is set to 5000 milliseconds by default. In case no message was received in this time frame the receiver raises timeout errors and the test fails.

Synchronous channel endpoints

The synchronous channel producer publishes messages and waits synchronously for the response to arrive on some reply channel destination. The reply channel name is set in the message's header attributes so the counterpart in this communication can send its reply to that channel. The basic configuration for a synchronous channel endpoint component looks like follows:

<citrus:channel-sync-endpoint id="helloSyncEndpoint"
                            channel="helloChannel"
                            reply-timeout="1000"
                            polling-interval="1000"/>

Synchronous message channel endpoints usually do poll for synchronous reply messages for processing the reply messages. The poll interval is an optional setting in order to manage the amount of reply message handshake attempts. Once the endpoint was able to receive the reply message synchronously the test case can receive the reply. In case all message handshake attempts do fail because the reply message is not available in time we raise some timeout error and the test will fail.

Note By default the channel endpoint uses temporary reply channel destinations. The temporary reply channels are only used once for a single communication handshake. After that the reply channel is deleted again. Static reply channels are not supported as it has not been in scope yet.

When sending a message to this endpoint in the first place the producer will wait synchronously for the response message to arrive on the reply destination. You can receive the reply message in your test case using the same endpoint component. So we have two actions on the same endpoint, first send then receive.

<send endpoint="helloSyncEndpoint">
    <message>
        <payload>
            <v1:HelloRequest xmlns:v1="http://citrusframework.org/schemas/HelloService.xsd">
                <v1:Text>Hello World!</v1:Text>
            </v1:HelloRequest>
        </payload>
    </message>
</send>

<receive endpoint="helloSyncEndpoint">
    <message>
        <payload>
            <v1:HelloResponse xmlns:v1="http://citrusframework.org/schemas/HelloService.xsd">
                <v1:Text>Hello Citrus!</v1:Text>
            </v1:HelloResponse>
        </payload>
    </message>
</receive>

In the last section we saw that synchronous communication is based on reply messages on temporary reply channels. We saw that Citrus is able to publish messages to channels and wait for reply messages to arrive on temporary reply channels. This section deals with the same synchronous communication over reply messages, but now Citrus has to send dynamic reply messages to temporary channels.

The scenario we are talking about is that Citrus receives a message and we need to reply to a temporary reply channel that is stored in the message header attributes. We handle this synchronous communication with the same synchronous channel endpoint component. When initiating the communication by receiving a message from a synchronous channel endpoint you are able to send a synchronous response back. Again just use the same endpoint reference in your test case. The handling of temporary reply destinations is done automatically behind the scenes. So we have again two actions in our test case, but this time first receive then send.

<receive endpoint="helloSyncEndpoint">
    <message>
        <payload>
            <v1:HelloRequest xmlns:v1="http://citrusframework.org/schemas/HelloService.xsd">
                <v1:Text>Hello World!</v1:Text>
            </v1:HelloRequest>
        </payload>
    </message>
</receive>

<send endpoint="helloSyncEndpoint">
    <message>
        <payload>
            <v1:HelloResponse xmlns:v1="http://citrusframework.org/schemas/HelloService.xsd">
                <v1:Text>Hello Citrus!</v1:Text>
            </v1:HelloResponse>
        </payload>
    </message>
</send>

The synchronous message channel endpoint will handle all reply channel destinations and provide those behind the scenes.

Message selectors on channels

Unfortunately Spring Integration message channels do not support message selectors on header values as described in message-selector. With Citrus version 1.2 we found a way to also add message selector support on message channels. We had to introduce a special queue message channel implementation. So first of all we use this new message channel implementation in our configuration.

<citrus:channel id="orderChannel" capacity="5"/>

The Citrus message channel implementation extends the queue channel implementation from Spring Integration. So we can add a capacity attribute for this channel. That's it! Now we use the message channel that supports message selection. In our test we define message selectors on header values as described in message-selectorand you will see that it works.

In addition to that we have implemented other message filter possibilities on message channels that we discuss in the next sections.

Root QName Message Selector

You can use the XML root QName of your message as selection criteria. Let's see how this works in a small example:

We have two different XML messages on a message channel waiting to be picked up by a consumer.

<HelloMessage xmlns="http://citrusframework.org/schema">Hello Citrus</HelloMessage>
<GoodbyeMessage xmlns="http://citrusframework.org/schema">Goodbye Citrus</GoodbyeMessage>

We would like to pick up the GoodbyeMessage in our test case. The HelloMessage should be left on the message channel as we are not interested in it right now. We can define a root qname message selector in the receive action like this:

<receive endpoint="orderChannelEndpoint">
    <selector>
        <element name="root-qname" value="GoodbyeMessage"/>
    </selector>
    <message>
        <payload>
            <GoodbyeMessage xmlns="http://citrusframework.org/schema">Goodbye Citrus</GoodbyeMessage>
        </payload>
    </message>
</receive>

The Citrus receiver picks up the GoodbyeMessage from the channel selected via the root qname of the XML message payload. Of course you can also combine message header selectors and root qname selectors as shown in this example below where a message header sequenceId is added to the selection logic.

<selector>
    <element name="root-qname" value="GoodbyeMessage"/>
    <element name="sequenceId" value="1234"/>
</selector>

As we deal with XML qname values, we can also use namespaces in our selector root qname selection.

<selector>
    <element name="root-qname" value="{http://citrusframework.org/schema}GoodbyeMessage"/>
</selector>

XPath Evaluating Message Selector

It is also possible to evaluate some XPath expression on the message payload in order to select a message from a message channel. The XPath expression outcome must match an expected value and only then the message is consumed form the channel.

The syntax for the XPath expression is to be defined as the element name like this:

<selector>
    <element name="xpath://Order/status" value="pending"/>
</selector>

The message selector looks for order messages with status="pending" in the message payload. This means that following messages would get accepted/declined by the message selector.

<Order><status>pending</status></Order> = ACCEPTED
<Order><status>finished</status></Order> = NOT ACCEPTED

Of course you can also use XML namespaces in your XPath expressions when selecting messages from channels.

<selector>
    <element name="xpath://ns1:Order/ns1:status" value="pending"/>
</selector>

Namespace prefixes must match the incoming message - otherwise the XPath expression will not work as expected. In our example the message should look like this:

<ns1:Order xmlns:ns1="http://citrus.org/schema"><ns1:status>pending</ns1:status></ns1:Order>

Knowing the correct XML namespace prefix is not always easy. If you are not sure which namespace prefix to choose Citrus ships with a dynamic namespace replacement for XPath expressions. The XPath expression looks like this and is most flexible:

<selector>
    <element name="xpath://{http://citrus.org/schema}:Order/{http://citrus.org/schema}:status" 
                value="pending"/>
</selector>

This will match all incoming messages regardless the XML namespace prefix that is used.