3.5 Congestion Charge - Messaging
The Congestion Charge example shows how to model and implement transactional messaging in CloudTran.
For simplicity, the default setup uses http protocol for messaging, the application was also tested with the following communication protocols: TCP, Amazon Simple Queue Service, OpenSpaces queue provider.
Mule 2.2.1 is used as ESB (Enterprise Service Bus).
This application is configured to use CloudTran transactions.
The CloudTran transaction is started as a message is received from the ESB.
All the program actions are done inside a transaction, so they are all committed or rolled back together.
This includes the sending of any outbound messages.
When a message is processed, it is always removed from the ESB queue.
The example starts the Mule server in the CongestionZoneProcessor PU (Processing Unit).
It is also possible to run Mule in a standalone server, separate from the CloudTran processing grid.
The Congestion Charge example is located at
<EclipseDir>/plugins/com.cloudtran.builder_x.y.z/jeewiz/examples/CloudTran/CongestionCharge/workspace
|
where
<EclipseDir> is your Eclipse directory and
X, Y and Z represent the CloudTran product major, minor and release numbers.
The application model is the CongestionCharge.osm file in the CongestionCharge project.
3.5.1 The Scenario
|
The Congestion Charge scheme was introduced in London in 2003.
Motorists who drive into central London during working hours have to pay a charge.
All routes into the zone are monitored by cameras that can read number plates to enforce the charge.
On opening the example model (CongestionCharge.osm in the CongestionCharge project) you will see three PUs
- Congestion Zone - the central London charging zone. Cars driving into this zone trigger a charging event.
- Road Users - motorists! This PU has a feeder class that sends Vehicle objects into the Congestion Zone
- DVLA - the Driver and Vehicle Licensing Agency
When the Road Users feeder class sends a Vehicle into the Congestion Zone, it triggers a charging event.
The business logic determines if the congestion charge has been paid and if it has it issues a Receipt to the Vehicle owner.
This is delivered by another Message Queue back to the Road User feeder.
In the same transaction the Receipt is persisted to a data source.
In this example, the Vehicles are fed into the Congestion Zone indirectly, via messages.
(The Vehicles could just as well be fed into the zone by service calls.)
3.5.2 Message Queues and Endpoints
|
Vehicles arriving at the congestion zone maps onto the application objects and messaging infrastructure as follows:
- The vehicle is represented by a Vehicle object and becomes the payload in a message
- From the point of view of Mule, the feeder sends the message via a client outbound endpoint.
- The Vehicle object is fed into the Congestion Zone PU via an inbound endpoint.
The receipt for the payment of the Congestion Charge is also sent via Mule.
In this interaction:
- The receipt is the payload in a message, send via the outbound endpoint of the CloudTran application
- The receipt reaches the Client through a client inbound endpoint.
3.5.3 Detailed Message flow
|
The previous section introduced the Mule's messaging terminology. This section describes the message-passing flow in detail.
- The client sends the zone entry indication (i.e. the Vehicle instance) as a Mule message using http protocol
- When an entry request arrives to Mule it puts it into the CongestionZoneProcessor PU space.
- When a Vehicle is put into the space, it triggers the Receiver.
In other words, for each Vehicle that arrives in the queue, the Receiver's handler method -
in this case, 'handleChargePayment()' - is called.
This method is in CongestionChargeApp_CongestionZoneProcessor/impl directory.
The class is
com.cc.zone.CongestionZoneProcessor_Actions.
As well as the object sent in by Mule, the handler method also has a DistributedTxAttributes parameter.
This represents the transaction started by the wrapper method and should be used on service calls and persistence methods.
- handleChargePayment() validates the license by calling the DVLA LicenseService method 'isLicenceValid()'.
In this sample, isLicenseValid() always returns true.
The reason this is there is to demonstrate calling a remote service.
There are predefined proxies you can use defined in 'CtProxies', which gives access to services like 'LicenseService':
Boolean isItValid = CtProxies.LicenceService.isLicenceValid
- handleChargePayment() constructs a receipt and
- saves it to the persistent data source
- sends a copy back to the client via Mule. To do this, it uses a Sender object that wraps the Mule interface.
Wrapping the Mule interface means that if you need to migrate your application to another ESB,
you just need to change the configuration. The CloudTran generation framework is set up to handle this sort of variation.
- When the message handler method returns abnormally - with an exception - there are two cases.
-
First, the exception can be TransactionExceptionRetriable.
This indicates that something caused the transaction to timeout.
This could be permanent - all the replicas of a PU have gone offline and contacting them fails -
but it is more likely to be transient, such as a reconfiguration (e.g. a scale-out or scale-up event),
which is worth retrying.
In this case, the transaction started by the wrapper is aborted and the
handler method is called once (and only once) more after 5 seconds, with a new transaction.
If the retry fails - another TransactionExceptionRetriable is received -
the exception handling described next is executed.
-
When two TransactionExceptionRetriable's, or any other exception, are thrown,
the handler is aborted. The caller can distinguish between retriable or not:
TransactionExceptionRetriable is retriable; all other exceptions indicate
a business problem (e.g. a validation
failure in the data presented) or an error in the implementation,
so the transaction started by the wrapper is aborted.
In this case, the transaction started by the framework is aborted
and the failure handler, described below, is called.
Note that the message has been consumed from the queue.
Normally, an application will propagate the exception to the client.
If this is not sufficient, any further retries must be implemented by the failure handler.
- When the message handler method returns normally,
the wrapper method generated by CloudTran commits the transaction started by the framework.
This will encompass the sending of any messages out of the application and entity persistence.
The processing continues with the next mainline step, documented after the exception case, below.
- Mule puts the response object into it's internal queue.
- The client polls Mule's internal queue and takes out the receipt.
3.5.4 Receivers
|
The modelling of the Endpoints is easy,
as long as you remember that this is a model of the Congestion Charge System,
so any Endpoints should be view from the perspective of a particular PU in the Congestion Charge System.
The Vehicle's final destination is the Congestion Zone and so is very much part of the system, triggered from the Client.
The Congestion Zone PU needs to receive this message.
The Vehicle's origin, as mentioned previously, is of no interest to the Congestion Zone.
So, the only Endpoint that needs to be modelled is a 'Receiver' -
the inbound endpoint, at which the Vehicle is 'received' in the Congestion Zone.
The Receiver is a child of the PU, which defines the PU that will process the message.
It also defines the type of the received object - in this case, the Vehicle.
The name of the Receiver must be unique.
The business method has the same name as the modelled Receiver.
In our example, it is in the
CongestionZoneProcessor project, in the impl directory, in the com.cc.zone.CongestionZoneProcessor_Actions class.
This is where you have to add business logic to handle incoming messages (vehicles in our example).
In this example, the business method creates a new Receipt and sends it back to the client via the Sender, and
in the same transaction saves it to persistent storage (with a save() call).
public void handleChargePayment( final Vehicle bean, final DistributedTxAttributes distAttr )
{
/* method uid:27445744-6940-4aa7-18: .......... method code goes below this comment
*******************************************************************************************/
// process the vehicle
log.info( " ------- processVehicle() A VEHICLE IS IN THE SPACE - " + bean);
Boolean isItValid = CtProxies.LicenceService.isLicenceValid( bean.getLicencePlate() );
log.info( " ------- processVehicle() isItValid = " + isItValid );
if ( !isItValid.booleanValue() )
{
log.info( " ------- processVehicle() The Vehicle " + bean.getLicencePlate() +
" is not a valid vehicle - call the police ");
}
try
{
receipt.save( distAttr );
if( Trace.DEBUG ) Trace.trace("receit: " + receipt );
}
catch (TransactionExceptionRetriable e)
{
Utils.error( "Failed to save entity " + receipt, e );
}
...
//Send the response
/* method end:27445744-6940-4aa7-18: .......... method code goes above this comment
==========================================================================================*/
}
|
This is an example of implementation code that is 'round-tripped' - the main body of code is what you edit.
If there are any changes in the application, such as renaming classes or methods, then
CloudTran will move your implemementation to the correct place after the renaming.
3.5.5 Senders
|
Senders are the opposite of Receivers. They are endpoints into the messaging infrastructure for outbound messages.
In this example, the Sender sends a Receipt object.
This can be seen in the 'handleChargePayment' method: it instantiates a Receipt and calls the 'sendReceipt' method, via the Sender.
public void handleChargePayment( final Vehicle bean, final DistributedTxAttributes distAttr )
{
/* method uid:27445744-6940-4aa7-18: .......... method code goes below this comment
*******************************************************************************************/
// process the vehicle
...
try
{
sendReceipt( receiptBean, distAttr );
if( Trace.DEBUG ) Trace.trace("receipt: " + receiptBean );
}
catch (TransactionExceptionRetriable e)
{
throw new TransactionExceptionRetriable( "Retriable exception sendinding receipt bean " + receiptBean + " for Vehicle=" + bean, e );
}
catch( Exception e )
{
// This logs the error and then throws a RuntimeException, which will abort the transaction.
Utils.error( "Failed to send receipt: " + receipt, e );
}
/* method end:27445744-6940-4aa7-18: .......... method code goes above this comment
==========================================================================================*/
}
|
This Sender is modelled, together with the type of the outbound object to be sent in the message.
In the same way as the originating endpoint of the Vehicle message is not relevant to the Congestion Charge System so the final destination
Endpoint of the returning Receipt message is not relevant to the Congestion Charge System, so the modeller only need model the Endpoint in this system.
Like the Receiver, the Sender requires a unique name and an object type. The generated code (shown below) provides a public method based on the Sender's name which
can be used by the developer to send messages through the endpoint.
3.5.5.1 Sending Failure Indications
|
The sender has a modelled object - the Receipt object, in our example.
This is used to generate a message for the happy path - to send a result to the outbound endpoint when the handler executes successfully.
But when there is an exception, the sender also provides a method - with a "_Exception" extension - to send back the exception.
For example
public static void sendReceipt_Exception( final Object response )
{
CtProxies.SenderService_SendReceipt.sendException( response );
}
|
As it can be seen the "sendReceipt_Exception" method accepts any kind of object, this means that the developer is not restricted to send back only exception in case of error.
A typical usage of the above method is to call it when there is an error during processing the request. In that case a generated error handling method is called with a - "_Failure" extension - to hanlde the error.
public void handleChargePayment_Failure( final Vehicle bean, final Exception exception )
{
/* method uid:27445744-6940-4aa7-18_Failure: .......... method code goes below this comment
*******************************************************************************************/
sendReceipt_Exception( exception );
Trace.log( "Unrecoverable error in 'handleChargePayment', exception is sent back to the client, exception:", exception );
/* method end:27445744-6940-4aa7-18_Failure: .......... method code goes above this comment
*******************************************************************************************/
}
|
This method receives two parameters: the input object of the request and the exception what occured during the request processing.
3.5.6 Mule Configuration Files
|
In addition to generating the code CloudTran will generate some of the ESB (Mule in this case) configuration too. The generated configuration is written
to a file called
This can be found at the top level of the main project - in this case CongestionCharge.
This file is not the completed Mule configuration but it will be included is the Endpoints modelled by the Receivers or Senders.
The deployer is responsible for providing any further configuration.(see Mule Configuration Properties)
All the generated Mule configuration is based on the viewpoint of Mule and not the system being modelled. Whilst the system is only concerned with
the endpoints that touch the system Mule has to know about both (Inbound and Outbound) Endpoints.
Where the Receiver models (from the system's point of view) an Inbound Message, Mule will still need a start (Inbound) Endpoint and a finish (Outbound) Endpoint to handle
this message.
So, the generated Mule configuration for the Receiver is actually (from Mule's viewpoint) the Outbound Endpoint of the message. This configuration is shown below
<spring:beans>
<os-core:space id="CongestionZone"
url="jini://*/*/CongestionZone"
lookup-groups="${LOOKUPGROUPS}"
/>
<os-core:giga-space id="CongestionZoneGigaSpace"
space="CongestionZone"
/>
...
</spring:beans>
<os-eventcontainer:endpoint name="handleChargePaymentCongestionZoneEndpoint"
address="os-eventcontainer://CongestionZoneGigaSpace"
/>
|
Essentially this is defines a single Endpoint in the Congestion Zone. The deployer will need to define some more configuration (like that shown below) that
references this Endpoint as the Outbound Endpoint.
<service name="MessageHandleChargePayment">
<inbound>
<inbound-endpoint ref="<some-queue-reference>" />
</inbound>
<outbound>
<pass-through-router>
<outbound-endpoint ref="handleChargePaymentCongestionZoneEndpoint" />
</pass-through-router>
</outbound>
</service>
|
Those watching carefully will have noticed that the generated code that listens on this Endpoint is actually a Polling Container waiting for Vehicle object.
The Polling Container is not defined in this configuration because it is within the Polling Container code that we need to start the transaction.
The Sender models an Outbound Message; from Mule's point of view there will be a start (Inbound) Endpoint and a finish (Outbound) Endpoint to handle
this message.
So, the generated Mule configuration for the Sender is actual (from Mule's viewpoint) the Inbound Endpoint of the message. This configuration is shown below
<spring:beans>
<os-core:space id="CongestionZone"
url="jini://*/*/CongestionZone"
lookup-groups="${LOOKUPGROUPS}"
/>
<os-core:giga-space id="CongestionZoneGigaSpace"
space="CongestionZone"
/>
<!-- SpaceToClient -->
<os-events:polling-container id="sendReceiptdataProcessorPollingEventContainer"
giga-space="CongestionZoneGigaSpace"
receive-timeout="60000"
>
<os-events:receive-operation-handler>
<spring:bean class="com.cloudtran.util.CTSingleTakeReceiverOperationHandlerForMule">
<spring:property name="nonBlocking"
value="true"
/>
<spring:property name="nonBlockingFactor"
value="10"
/>
</spring:bean>
</os-events:receive-operation-handler>
<!-- Template object for the polling container -->
<os-core:template>
<spring:bean class="com.cc.zone.ReceiptBean"
>
</spring:bean>
</os-core:template>
</os-events:polling-container>
</spring:beans>
|
In this case this defines an Endpoint which, like the Receiver, is listening in the Congestion Zone for a ReceiptBean. The Polling Container in this
case is written in the configuration and Mule will use this to take the ReceiptBean out of the space and put it on the message queue.
The deployer will need to provide some further configuration to handle the Outbound Endpoint of the message queue. An example is shown below
<service name="SpaceToClient">
<inbound>
<inbound-endpoint ref="sendReceiptCongestionZonePollingContainerEndpoint" />
</inbound>
<outbound>
<pass-through-router>
<!-- This is a queue which will be read by the receiver of the message -->
<outbound-endpoint ref="<some-queue-reference>" />
</pass-through-router>
</outbound>
</service>
|
The aim of generating this configuration is to aid the deployer.
The deployer can include the mule-generated.xml in their own configuration, but
remember any change to the Receiver or Senders within the model may result in this file changing.
3.5.7 Mule Configuration Properties
|
The deployment and configuration of the application is the same as described in the Chapter 5 and Chapter 7. There are however a number of additional
configuration options which may be provided for the Mule ESB. They can be added to the config.properties of the Application, Deployment-Option or the PU Deployment-Option,
with the same overriding structure as the current system (PU Deployment-Options override Deployment-Options which override Application options).
- muleQueueProvider
- muleEmbedded
- muleConfig(optional)
Taking the configuration options in turn:-
The muleQueueProvider option is used to indicate that Mule should use a Space to hold the Queues. The muleQueueProvider therefore needs to
point to either a Space or a ProcessingUnit. If a ProcessingUnit is chosen then the first space in the PU is the space used by Mule to hold its
Adding the muleQueueProvider adds some additional configuration to the mule-generated.xml file, which for this example is shown below. The os-queue:endpoints
are peers of the os-eventcontainer:endpoints.
<os-queue:endpoint name="handleChargePaymentMuleInputQueue"
path="handleChargePaymentMuleInputQueue"
connector-ref="queueConnector"
/>
....
....
....
<os-queue:endpoint name="sendReceiptMuleOutputQueue"
path="sendReceiptMuleOutputQueue"
connector-ref="queueConnector"
/>
|
The muleEmbedded property indicates that Mule should be started in a PU. Therefore the PU set in the property must exist in the
model. It can be started in an existing PU. However if you wish for it to be started in a completely separate PU,
then a new PU must be modelled. In this example the muleEmbedded points to the DVLA PU. Setting the muleEmbedded adds additional
configuration to the pu.xml of the chosen PU.
<bean class="com.cloudtran.shared.CTSpacesMuleContextLoader"
init-method="start">
<property name="location"
value="..path to mule files"/>
</bean>
|
By default when the application is generated all the files what are in the main project directory and matches with the pattern mule*.xml are considered mule
configuration files and automatically will be put into the puDirectory/META-INF/spring path. Where the puDirectory is the processing unit directory where the
mule server will run as an embedded server( defined by the muleEmbedded property). The embedded mule server will will be started up with those files.
It is possible to provide the the mule configuration file location at startup using the -DmuleConfig property. If several files defined with the muleConfig property
than the pathes should be comma separated e.g.:
-DmuleConfig=/mule-generated.xml,/muleConfig1.xml,/muleConfig2.xml
|
If the developer uses this property than the files in the main project directory with the pattern mule*.xml will not be used(but still will be copied).
Notice
The files with the mule*.xml pattern are copied from the main project directory to the puDirectory/META-INF/spring directory, but first any file what matches
that pattern in the puDirectory/META-INF/spring are deleted.
|
 |
|
 |
Copyright (c) 2001-2011 CloudTran Inc.
|