Distributed Timestamps for MVCC

Timestamps are important for CloudTran MVCC (details on MVCC in the previous post). They are recorded at transaction managers (a) when a transaction starts and (b) when it commits.

As noted in that post, timestamps are typically used to get consistent reads: the start time of a reading transaction is compared to the commit time of the transactions that committed values into the cache. They can also be used for strict demarcation of transactions, as discussed later.


Time has to be coordinated across the grid: in other words, we need distributed time synchronization, encompassing many machines and JVMs. We are assuming the grid is on a LAN, not a WAN. We’ll come back to WANs in a later post.

Out of the box, CloudTran provides two plug-ins for synchronizing time; you can add your own too (e.g. if you have a 1588/PTP driver).

The obvious method for time sync uses a singleton service that returns its local time (i.e. System.nanoTime() in Java). Time therefore runs at the rate of the crystal on the machine hosting the singleton service. In the case of failover, a replacement node takes over the service.

The singleton approach is easy to understand and gives you definitive ordering of events… but at the expense of two RPC calls per transaction. This adds significant latency – a couple of milliseconds per transaction on a GigE network – and also adds extra load to the machine running the singleton service, which can become a bottleneck and limit scalability.

The other approach is to estimate time on each transaction manager JVM (CloudTran normally runs many transaction managers). This polls the singleton service a few times a second but, rather than use the timestamps directly, interposes a service thread that deduces the global time. The main challenges in this calculation are:

  • differences in the frequencies of the crystals on the machines – differences of 1-100 ppm (parts per million) are common between different types of computers
  • temperature variations, which change the generated frequency. Although this effect is likely to be much smaller than the base frequency effect, it means you can never stop the service because the master frequency can change
  • the delays introduced by the networking stack, OS scheduling and Java GCs.

The distributed time service works as follows:

  • get the timestamp ‘globalTime’ from the singleton service. We also get the local time before and after. The difference between the before and after times is the Round-Trip Time (RTT); the average of these two gives us ‘myTime’ on the local JVM. Putting them together, we have a {myTime, globalTime} data point.
  • by collecting lots of these {myTime, globalTime} points, we can deduce a straight-line graph
         globalTime = m * myTime + c
  • when we need to calculate a time, get myTime from nanoTime() and use the equation to calculate globalTime.
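
As a rough illustration of the steps above, here is a minimal Java sketch. The class names TimeSample and GlobalClock, and the two LongSupplier parameters standing in for the local clock and the call to the singleton service, are assumptions made for this example, not CloudTran APIs. The sketch uses double arithmetic relative to a reference point, which is simpler than the high-precision arithmetic a real service would want.

```java
import java.util.function.LongSupplier;

/** One {myTime, globalTime} data point plus the round trip it cost (all in ns). */
final class TimeSample {
    final long myTime;      // midpoint of the local before/after readings
    final long globalTime;  // time returned by the singleton service
    final long rtt;         // after - before

    TimeSample(long myTime, long globalTime, long rtt) {
        this.myTime = myTime;
        this.globalTime = globalTime;
        this.rtt = rtt;
    }

    /**
     * Takes one sample. localClock would normally be System::nanoTime;
     * globalTimeService is a hypothetical stand-in for the remote call.
     */
    static TimeSample take(LongSupplier localClock, LongSupplier globalTimeService) {
        long before = localClock.getAsLong();
        long global = globalTimeService.getAsLong();
        long after = localClock.getAsLong();
        long rtt = after - before;
        return new TimeSample(before + rtt / 2, global, rtt);
    }
}

/** Applies the straight line globalTime = m * myTime + c, anchored at a reference point. */
final class GlobalClock {
    private final long refMyTime;      // a local time on the fitted line
    private final long refGlobalTime;  // the global time the line gives at refMyTime
    private final double m;            // slope: global ns per local ns

    GlobalClock(long refMyTime, long refGlobalTime, double m) {
        this.refMyTime = refMyTime;
        this.refGlobalTime = refGlobalTime;
        this.m = m;
    }

    /** Converts a local nanoTime reading to estimated global time. */
    long toGlobal(long myTime) {
        return refGlobalTime + Math.round(m * (myTime - refMyTime));
    }

    /** Estimated global time right now, with no network call. */
    long now() {
        return toGlobal(System.nanoTime());
    }
}
```

Anchoring the line at a reference point keeps the numbers multiplied by the slope small, so the double rounding error stays well below a nanosecond over a collection window of minutes.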

There are many details to getting an accurate estimate, of which the main ones are:

  • a longer distance on the X axis reduces the variability (just like long wheel-base cars); 30 minutes is a typical collection length, but the estimate is good enough for MVCC after a few minutes.
  • better estimates of time come from shorter round trips, so we filter out timings with longer roundtrips
  • if we did this filtering across the board, it would throw away long sequences of readings, because round-trip times go through quick and slow phases (i.e. get better or worse over tens of seconds). If the discarded readings were at the beginning (very likely) or the end, the X axis length, and therefore the accuracy, would be reduced. To avoid this, we break the graph into (e.g.) 10 pieces and do the filtering on each piece individually
  • we use a least-squares linear regression to get y=mx+c, using 128-bit arithmetic (those nanoseconds are important!)
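
The filtering and fitting steps above could be sketched in Java along the following lines. This is an illustration under stated assumptions rather than the CloudTran implementation: the names Reading, LineFit and TimeFitter are invented for the example, the "keep the shortest round trips" rule is expressed as a simple fraction per piece, and the regression uses doubles relative to the first reading instead of 128-bit arithmetic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** A {myTime, globalTime} point with the round-trip time it took to obtain (ns). */
final class Reading {
    final long myTime;
    final long globalTime;
    final long rtt;

    Reading(long myTime, long globalTime, long rtt) {
        this.myTime = myTime;
        this.globalTime = globalTime;
        this.rtt = rtt;
    }
}

/** Fitted line y = m*x + c, where x and y are offsets from (x0, y0). */
final class LineFit {
    final long x0;
    final long y0;
    final double m;
    final double c;

    LineFit(long x0, long y0, double m, double c) {
        this.x0 = x0;
        this.y0 = y0;
        this.m = m;
        this.c = c;
    }

    long estimate(long myTime) {
        return y0 + Math.round(m * (myTime - x0) + c);
    }
}

final class TimeFitter {
    /**
     * Splits the readings (assumed ordered by myTime) into consecutive pieces and
     * keeps, from each piece, the given fraction with the shortest round trips.
     */
    static List<Reading> filterBySegment(List<Reading> readings, int pieces, double keepFraction) {
        List<Reading> kept = new ArrayList<>();
        int n = readings.size();
        for (int p = 0; p < pieces; p++) {
            int from = (int) ((long) n * p / pieces);
            int to = (int) ((long) n * (p + 1) / pieces);
            if (from == to) {
                continue; // more pieces than readings: this piece is empty
            }
            List<Reading> segment = new ArrayList<>(readings.subList(from, to));
            segment.sort(Comparator.comparingLong(r -> r.rtt));
            int take = Math.max(1, (int) Math.round(segment.size() * keepFraction));
            kept.addAll(segment.subList(0, take));
        }
        return kept;
    }

    /** Ordinary least-squares fit of globalTime against myTime. */
    static LineFit fit(List<Reading> readings) {
        int n = readings.size();
        if (n < 2) {
            throw new IllegalArgumentException("need at least two readings");
        }
        long x0 = readings.get(0).myTime;
        long y0 = readings.get(0).globalTime;
        double sx = 0, sy = 0, sxx = 0, sxy = 0;
        for (Reading r : readings) {
            double x = r.myTime - x0;
            double y = r.globalTime - y0;
            sx += x;
            sy += y;
            sxx += x * x;
            sxy += x * y;
        }
        double denom = n * sxx - sx * sx;
        if (denom == 0) {
            throw new IllegalArgumentException("readings must span more than one myTime");
        }
        double m = (n * sxy - sx * sy) / denom;
        double c = (sy - m * sx) / n;
        return new LineFit(x0, y0, m, c);
    }
}
```

Filtering piece by piece means a slow phase of the network costs only the worst readings within that phase, so the fitted line still spans the full collection window.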

The net result of this approach is that time can be served locally to an accuracy of about +/-10 microsecs on GigE networks, or 20 microsecs variance between two nodes. The crucial thing for our discussion is that this is much less than the round-trip time (which, on our GigE network, within Coherence is greater than 700 microsecs). Between nodes on the same physical machine, the distributed time accuracy was +/-4 microsecs (or 8 microsecs variance) and the round-trip time was >200 microsecs.

Apart from their use in MVCC, these timestamps are useful for distributed logs because they are cheap (getting a time from the distributed service takes around 300ns on one core of a commodity Linux machine) and their accuracy (much better than the RTT) means they can be used to deduce the order of events for a multi-node operation.

Application in MVCC

Distributed timestamps are good enough for use in MVCC. How can that be?

First, let’s address the easy one – timestamps for reading. How do we get a consistent view of a few objects, or indeed the whole grid, for example to get the units sold and the quantity to order for each product? Regardless of whether the transaction is distributed, the key to consistent reporting is to use transactions. They guarantee atomic and consistent state changes. If a particular set of objects is being changed at the time the read transaction starts, the new values should be used if they are committed before the start time; in other words, this boils down to ‘read committed’. For a grid-wide report, all the transactions must have completed; this can be guaranteed by starting a transaction with an exceptionally long timeout, waiting for the normal maximum transaction timeout, then reading the values.

For most business requirements, whether the actual timestamp from our distributed time server is highly accurate is not important – it is not directly related to the time of day anyway. The business will be more concerned that the report gets run, and then that it gets run on time. And consistency is served by having a “happens-before” metric – to decide whether the commit happened before the ask (read transaction) time. So our accuracy of a few microseconds is quite adequate for this.

Now let’s consider how to implement the update policy that protects against write-write conflicts (e.g. eCommerce) or read-write conflicts (e.g. moving money around).

As noted in the previous post, there are two lists attached to each entry in the cache: a “pending list” holding the updates (and reads, sometimes) that may be committed by the transaction; and a committed list, with the new value and timestamp information for each commit.

CloudTran can immediately fail a transaction’s proposed update/read if another transaction already has a conflicting entry in the list – it doesn’t need timestamps to figure this out. But, when there are delays or long-running transactions and an overlapping transaction has already committed, we need to compare timestamps to check for conflict.

For example, here are some events (ordered by global time!) which interact at cache Entry X:

Start Transaction{A}
Read Entry X{A}
Start Transaction{B}
Update Entry X{B}
Commit Transaction{B}
Update Entry X{A}

If the policy for Transaction A prevents write-write conflicts, CloudTran must fail Update Entry X{A}, because the transactions overlap – Update Entry X{B} has occurred during transaction A. For example, in an eCommerce application, Entry X could represent all orders for a given customer. If we let Update Entry X{A} through, when Transaction A commits we will lose the changes from Transaction B (e.g. we’d lose the orders added by it).

The point of this example is that transaction B’s update will now be in the committed list, so CloudTran must also examine the timestamps of entries in the committed list to check for conflict. Diagrammatically, the conflict arises as follows:

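[Diagram: timeline in which Update Entry X{B} and Commit Transaction{B} fall between Read Entry X{A} and Update Entry X{A}]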

So the check for a conflict in this case boils down to the test

     GlobalTime(Start Transaction{A}) < GlobalTime(Commit Transaction{B})

If the timestamps were inaccurate enough, we could have the situation where

     GlobalTime(Commit Transaction{B}) < GlobalTime(Start Transaction{A})

incorrectly implying the following order – the two transactions did not overlap:

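[Diagram: timeline in which Transaction B commits before Transaction A starts, so Read Entry X{A} and Update Entry X{A} both follow the commit]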

So, how accurate do distributed timestamps have to be to guarantee that the conflict will be detected?

The answer is that the error has to be less than two round-trip times. This follows from the backups that need to be written for Start Transaction{A} and Update Entry X{B}. The detailed (tedious, even) derivation of this result is given below.

The case described above is the nastiest edge case, and gives quite a relaxed requirement on distributed time. Although this has been developed for CloudTran-Coherence, the general approach should be applicable to any LAN-connected system needing ordering/consistency where backups (and the delays they bring) can be assumed.
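
To make the committed-list check from the example concrete, here is a minimal Java sketch. The names CommittedVersion and WriteConflictCheck are hypothetical and the committed list is modelled as a plain List; the only part taken from the discussion above is the comparison of the updating transaction's start time with the commit times of other transactions.

```java
import java.util.List;

/** One entry in the committed list: who committed, and at what global time. */
final class CommittedVersion {
    final String txId;
    final long commitTime; // global time, ns

    CommittedVersion(String txId, long commitTime) {
        this.txId = txId;
        this.commitTime = commitTime;
    }
}

final class WriteConflictCheck {
    /**
     * Returns true if another transaction committed a value to this entry after
     * the updating transaction started, i.e. the two transactions overlapped.
     */
    static boolean conflicts(String txId, long txStartTime, List<CommittedVersion> committed) {
        for (CommittedVersion v : committed) {
            boolean otherTransaction = !v.txId.equals(txId);
            // The test from the text: GlobalTime(Start{A}) < GlobalTime(Commit{B})
            if (otherTransaction && txStartTime < v.commitTime) {
                return true;
            }
        }
        return false;
    }
}
```

With Transaction A starting at global time 100 and Transaction B committing at 250, Update Entry X{A} is rejected; had B committed at 50, before A started, the update would be allowed.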

Derivation of “Two Roundtrip Times” result

We need to establish the minimum time between Time(Start Transaction{A}) and Time(Commit Transaction{B}), so we can be sure we don’t get these the wrong way round using distributed time across different nodes.

Start Transaction{A}
  • Occurs on Tx A’s manager node, possibly (but not necessarily) different from Tx B’s manager node and the primary data cache node.
  • Timestamp taken at start.
  • Elapsed time: 1 RTT to backup.
  • Happens before Read Entry X{A} (because the read needs the transaction).

Read Entry X{A}
  • Occurs on the data node holding X.
  • Elapsed time may be negligible if on the same node as the client.
  • Must happen before Update Entry X{B} because, on Coherence, accesses to the cache lock the entry, and after that the update is present. (If we don’t do the Read at this point, then the read gets the updated value and we don’t have a conflict.)

Update Entry X{B}
  • Occurs on the data node holding X.
  • Happens after Read Entry X{A} (because the Read would fail if checking for write conflicts and Update Entry X{B} were present) and Read Entry X{B}. This orders transactions A and B.
  • Elapsed time: 1 RTT to backup.
  • Happens before Commit Transaction{B}.

Commit Transaction{B}
  • Occurs on Tx B’s manager node.
  • Happens after Update Entry X{B}.
  • Elapsed time: negligible – timestamp taken at start of commit processing.

There will be other round trips in typical transactions, but for this analysis we’ve ignored them. All the steps here have a happens-before ordering, with an absolute minimum of 2 RTTs delay. These RTTs will be between physical machines, because that is the safe deployment for backups. So, for distributed timestamps to cause us to deduce the wrong order, the inaccuracy would have to be greater than 2 RTTs. As the method has an accuracy of 1/25 or 1/30 of an RTT, this is impossible.

MVCC For Coherence – GA!

We’ve just released the GA version of CloudTran V2.0 for Oracle Coherence. The main feature of this release is MVCC (Multi-Version Concurrency Control), which is the focus of this post.

The new release also includes the following features, discussed in later posts:
  • distributed time synchronization in support of MVCC
  • transactional replication between data centers

CloudTran is an add-on transaction management product for data grids (e.g. Coherence, GigaSpaces) that provides high-performance ACID transactions across arbitrary numbers of clients, servers and data values. The aim is to simplify development in applications that need consistency, durability or strong ordering semantics with minimal programming effort, which is achieved by providing a transactional wrapper for a data grid cache. It uses the underlying grid features to provide scalability of the transaction manager service, so the complete system (transactions and processing/storage) can scale up and down to meet the expected demand. Transaction managers also support failover, which avoids the “unresolved transaction” problem.

Transactions are optionally durable. So, if you want a cache-only transaction facility, CloudTran works quite happily. If (maybe later in a product’s life) you want to add persistence of grid data to datastores, you can without changing existing code.

Previously, CloudTran has provided pessimistic locking which allows multiple readers of committed values outside of a transaction, but only one active transaction per object. This supports two use cases:

  • pessimistic writing, guaranteeing isolation and deadlock protection
  • consistent reads (albeit with locking).

The problem with pessimistic locking is the need to effectively lock entries. This slows down simple reads and in some use cases, like financial risk, can cause an unacceptable percentage of transactions failing due to lock conflicts.

MVCC transactions avoid locking problems, and are therefore preferred, for these use cases:

  • reporting: read a set of values from different nodes as of a point in time
  • volatile pricing: continuous update of hot values, like the price of a financial instrument or a bet
  • mostly-read, one writer: prevent simultaneous updates of a given value, for example in eCommerce.

There is also a mode available to prevent the ‘missed update’ problem (aka write skew anomaly – see http://en.wikipedia.org/wiki/Snapshot_isolation) which should be used for moving money around. This is very similar to the strong isolation provided by pessimistic locking.

The current implementation does not maintain indexes that are aware of transactional values, so inconsistencies are possible. For example, a transactional update followed by a search using an index will be inconsistent. Similarly, starting a transaction and then doing a search will retrieve the appropriate value based on the last committed value, rather than the value as of the start time of the transaction. We have yet to see a compelling use-case that justifies the additional overhead and complexity of supporting MVCC-aware indexes. (No doubt we will hear of one soon.)

The data to support MVCC is held in parallel with the entry in the data grid (e.g. in Coherence, using a decoration), meaning that non-transactional readers see the latest committed value. The transactional data for each entry under MVCC consists of

  • a list of committed values and their commit time, so a transactional read can retrieve the committed value appropriate to the transaction (i.e. the newest value committed by another application, as long as it was committed before the start of this transaction)
  • a list of pending updates, identifying the transaction and the update to perform. When the transaction is secured against missed updates, a ‘read’ entry is also placed into this list, to prevent it being written by another transaction.

For a transactional read, CloudTran first examines the updates list to see if there is an existing value for this transaction; this means that multiple clients can use the cache to coordinate. If there is no update for this transaction, CloudTran uses the list of committed values and returns the latest one committed before the reading transaction started; this provides snapshot isolation on reads. For example, if values were committed at 11:00, 11:01 and 11:03 and the reading transaction started at 11:02, then the 11:01 value is returned. Finally, if the parallel information is missing or does not yield an appropriate value, the non-transactional value is returned.

Therefore, “read your own writes” is guaranteed; and a client never sees another transaction’s dirty value.
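
The read path described above can be sketched in Java as follows. MvccEntry and its methods are hypothetical names for this illustration, and a TreeMap keyed by commit time stands in for the committed list; the real data lives alongside the grid entry. The sketch assumes commits arrive in commit-time order, so the most recent commit is also the non-transactional value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class MvccEntry<V> {
    private final Map<String, V> pending = new HashMap<>();     // txId -> pending update
    private final TreeMap<Long, V> committed = new TreeMap<>(); // commit time -> value
    private V nonTransactionalValue;                            // what plain readers see

    MvccEntry(V initialValue) {
        this.nonTransactionalValue = initialValue;
    }

    void putPending(String txId, V value) {
        pending.put(txId, value);
    }

    /** Moves a transaction's pending update to the committed list. */
    void commit(String txId, long commitTime) {
        V value = pending.remove(txId);
        if (value != null) {
            committed.put(commitTime, value);
            nonTransactionalValue = value; // assumes commits arrive in time order
        }
    }

    /** Transactional read for the transaction txId, which started at txStartTime. */
    V read(String txId, long txStartTime) {
        // 1. Read your own writes.
        V own = pending.get(txId);
        if (own != null) {
            return own;
        }
        // 2. Latest value committed strictly before this transaction started.
        Map.Entry<Long, V> snapshot = committed.lowerEntry(txStartTime);
        if (snapshot != null) {
            return snapshot.getValue();
        }
        // 3. No suitable version: fall back to the non-transactional value.
        return nonTransactionalValue;
    }
}
```

Using minutes since midnight as timestamps, values committed at 660, 661 and 663 (11:00, 11:01, 11:03) and a reader that started at 662 (11:02) reproduce the example in the text: the reader gets the 11:01 value.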

For an update, the transaction manager applies the current update policy (e.g. allow concurrent updates) and if the update is allowed, adds a pending update. Note that this approach means that writing the new value is combined with the Prepare of two-phase commit: by the time all the updates have successfully been done, the only thing left to do is to commit.

It is the transaction manager’s responsibility to

  • remove pending updates for a transaction if the transaction aborts and
  • abort transactions if a client fails.

If a node fails while managing a transaction, another node will become the owner of that transaction and take responsibility for completing the transaction lifecycle (unless the whole grid crashes – in which case we have bigger problems). This avoids the problem of pending updates never getting resolved because a server machine crashed.

The parallel MVCC data needs to be garbage collected. First, as described so far, committed values will just get added into the list and never be deleted. In reality, they are only useful up to the timeout point for the longest-lived transaction. While there are ways to determine this, for now we use a pragmatic approach: use a grid-wide definition of the maximum allowable transaction time. This means that the committed values can be garbage-collected at every access to the MVCC information if they are obsolete.

Updates will also need to be garbage collected if the client crashes before committing or aborting a transaction with some updates placed directly in the cache. Every transaction has a timeout, and that is recorded in the transaction information in the update list; the timeout is used to lazily garbage collect obsolete updates.
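
A minimal Java sketch of both kinds of lazy garbage collection follows. The class MvccGc, its method names and the map layouts are assumptions for illustration only. The sketch also makes one design choice that the text does not specify: it keeps the newest committed value older than the cutoff, so that a transaction that started exactly at the cutoff still finds a version to read.

```java
import java.util.Map;
import java.util.TreeMap;

final class MvccGc {
    /**
     * Drops committed values that no live transaction can need.
     * committed maps commit time to value; maxTxTime is the grid-wide
     * maximum allowable transaction time.
     */
    static <V> void pruneCommitted(TreeMap<Long, V> committed, long now, long maxTxTime) {
        long cutoff = now - maxTxTime; // no live transaction started before this
        Long newestObsolete = committed.lowerKey(cutoff);
        if (newestObsolete != null) {
            // Remove everything strictly older than the newest pre-cutoff version.
            committed.headMap(newestObsolete, false).clear();
        }
    }

    /**
     * Drops pending updates whose transaction has timed out.
     * pendingDeadlines maps a transaction id to the time its timeout expires.
     */
    static void prunePending(Map<String, Long> pendingDeadlines, long now) {
        pendingDeadlines.values().removeIf(deadline -> deadline <= now);
    }
}
```

Both methods are cheap enough to call on every access to the MVCC information, which matches the lazy approach described above.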

Finally, a word on the overhead of using transactions. Distributed transactions have a reputation for being slow because of the overhead of two-phase commit. CloudTran minimises this overhead by being a member of the grid, so it can take advantage of the grid topology and the operations being performed. We mentioned above the technique of combining the ‘Prepare’ with the cache write; this reduces the overhead per changed value to one backed-up write (i.e. to do the ‘Commit’). The other aspect to consider is the cost of calls to the manager to start, commit and complete the transaction; CloudTran can also reduce (but not entirely eliminate) the overhead here, given the appropriate deployment structure.

CloudTran V2 – MVCC and Transactional Replication

Previously on Coherence we’ve supported TopLink Grid and a cache-level API with Pessimistic Locking.  For the 2.0 release, we’ve added two major new features to CloudTran – while continuing to support the previous features.

First,  MVCC (Multi-Version Concurrency Control) is added as an alternative to Pessimistic Locking on the cache-level API.  It supports:

-        consistent reads across multiple distributed cache entries or services.  This effectively gives a snapshot, with all the values being consistent with the time the reading transaction started.  For example, if you start a transaction at 9:01 and you want to read the price of an item which was updated at 9:00 and 9:02, you will get the 9:00 price – being the correct value as of 9:01.

-        multiple simultaneous updates.  If you want to calculate a new price and note it as “the current price” regardless of whether other threads are also calculating the price, then you can turn off update checking – so the price can be continuously updated, using a consistent snapshot of input values for each calculation.

-        non-conflicting updates.  You can specify write conflict checking – i.e. no intervening write can have occurred on any cache entry changed in a transaction.  You can also do the same, but for any entry read by the transaction as well; this is the MVCC equivalent of pessimistic locking and prevents a “write skew anomaly”.

Second, CloudTran 2.0 introduces transactional replication between data centers.  The key here is “transactional” across multiple cache entries (in different partitions); if you don’t need this, the Coherence push replicator or GoldenGate will make more sense depending on your system of reference.

By doing transactional replication, you know that the secondary grid will be consistent – updates are done atomically – so if the primary grid goes down, you can switch over to the secondary grid immediately the outage is detected.

One of the big issues in replication is the time to send to the other data center, which is typically 100ms or more.  It doesn’t make sense in a grid environment to wait that long, so the CloudTran replicator saves the transactions to SSD at the sending data center, and then relays them to the other grid while the transaction carries on locally.

Writing to SSDs only takes 13ms from our Java code, which is likely to be acceptable in most applications as the price of knowing the transaction will be replicated eventually (even if the replicator node or data center goes down).  The replicator supports failover and dual-ported SSD drives, avoiding a single point of failure – it’s like a message appliance, but built on Coherence.

Data center replication for less than you thought…

As we work with development teams on building out data grid architectures, a common requirement has surfaced, which is to provide backup to a remote data center, supporting Disaster Recovery (DR) and High Availability (HA) for the complete application. With more than one source of data, consistency and synchronization between the data centers are important issues — but even more important is the requirement to avoid data loss during replication.

Recently, we’ve been developing our new CloudTran Replicator Service. This addresses the requirement above using an active-passive configuration: one data center is primary, the other is backup. It may be possible to organize the application into separate domains (where transactions only involve data from one domain), which allows an active-active configuration: one data center is primary for half of the data set and backup for the other half.

Our approach makes transactions on the order of 10 to 100 times faster than typical “data center backup” approaches that reconcile transactional inconsistencies using a distributed voting procedure. This clearly impacts the response time to the user. Less obvious is impact on server capacity: reducing the length of time for a transaction reduces the amount of information that must be tracked in the server.

Ultimately, the CloudTran approach to data center replication results in lower costs to buy and run servers… a great thing for most enterprises these days.  For those with more than a casual interest, many of the technical details of our replicator approach can be found in this newly published white paper.

VoltDB and Millions of Operations Per Second … So?

I’ve been looking at VoltDB recently, and thinking about comparisons with Coherence (and CloudTran of course, although what follows is purely about Coherence).

The first thing that hits you about VoltDB is the huge performance numbers. For example, this piece about Node.js and VoltDB quotes 695,000 transactions/sec on 8 clients and 12 servers. As each transaction did three reads and one write, this amounted to 2.8 million data operations per second. Each of the 12 servers had 8 cores for a total of 96 server cores, which means the test ran at just under 30,000 data operations per server core per second.

When you talk about databases, there is an assumption of durability – you assume that the data is getting saved to permanent storage. However, VoltDB is an in-memory database, and in fact in the above piece, the headline test was done with “k-factor=0” – which means, no backups at all (and presumably no persistent storage, as disk or SSD wasn’t mentioned). In other words, this is a lot like a data cache – you send a request to the server, cleverly routed to the correct partition, do some calculations with the targeted value plus related data, then store the result. Which raises the question – if you do this sequence of operations in a data cache, do you get similar performance?

To test this out, we loaded up some reference data, then sent a request (in Coherence, an entry processor) which read the targeted value, read two items of reference data, and then stored a computed result. We only had 7 ropey old development boxes available, so we wrote the application so that both client and server applications ran in each JVM, with the client load spread equally across the servers.

The bottom line was that our Coherence rig was more than 1.5X faster than VoltDB *per core* – 44,000 data operations per second versus 29,000 – even though the Coherence test was running client and server on the same machine.

For good measure, we tried “k-factor=1” – i.e. one backup – and got 28,300 operations per second, which is very close to VoltDB with no backups.

You’ll have to forgive me for not publishing more detailed information, but we’d need to get Oracle’s permission to publish a ‘benchmark’ – and this wasn’t a proper benchmark, just a few hours of running through a few scenarios and pretty much straight out of the box config. However, if you do want more detail, you can run it yourself: I’ve attached the Eclipse project and scripts for server-side deployment here… CoherencePerformance.zip


The point of this article is not the numbers themselves, but that a data grid like Coherence and an in-memory database are likely to give very similar performance when they are doing relatively simple operations, because it is everything else in the system that costs the time. In fact, it is uncanny how similar these numbers are to tests we did on another IMDG a few years ago on the same equipment.

In other words, VoltDB may be superfast compared to database systems that store information to disk, but not compared to data grids storing all information in memory.

I hope I remember this now…

If Coherence is giving you an error message like:

    This cluster node failed to deserialize the config
    java.io.IOException: message contains more data then expected

then, assuming you use the same cache-config etc., the answer is that the members disagree on the POF setting:

    Member 1 has tangosol.pof.enabled=true.
    Member n has tangosol.pof.enabled=false.

Only the third time I’ve made this mistake.  This is not as stupid as it may seem – the new low-level CloudTran API will add Spring and AspectJ to Coherence, EclipseLink and TopLink Grid so it’s easy to overlook one line of config.

The root cause of this error is probably that you have asked Coherence for a cache before you have read the CloudTran config (e.g. when copy-and-pasting JUnit test suites … easy to end up skipping init code).  Try setting a breakpoint when you initialize the program and see whether the CloudTran config properties trace appears (search for “loading config.properties”).  If not, reorganise so you initialize the CloudTran config properties first.  This will then set tangosol.pof.enabled=true, which is required by CloudTran.
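
One way to catch this earlier is a fail-fast check before the first cache is requested. The following Java sketch is only a suggestion: PofGuard is a hypothetical helper, not part of Coherence or CloudTran, and the only thing taken from the text above is the tangosol.pof.enabled system property. It uses nothing beyond the JDK.

```java
final class PofGuard {
    /** System property named in the text above. */
    static final String POF_PROPERTY = "tangosol.pof.enabled";

    /** True only if the property is set to "true" (case-insensitive). */
    static boolean pofEnabled() {
        return Boolean.getBoolean(POF_PROPERTY);
    }

    /** Call before asking for the first cache; throws if POF is not enabled. */
    static void requirePof() {
        if (!pofEnabled()) {
            throw new IllegalStateException(
                POF_PROPERTY + " is not true; initialize the config properties "
                    + "before requesting a cache");
        }
    }
}
```

Calling PofGuard.requirePof() at the top of a JUnit setup method would turn the deserialization error into an immediate, self-explanatory failure on the member that skipped its init code.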