Vetted - tweaking importer performance

Since the last post, I’ve started working on an importer to load data from the existing Access database. Work to date is on GitHub.

In the current domain model, there is a single aggregate root, the Client. The importer is written as a command line application which interacts directly with the domain, assuming an empty database (I might get to incremental imports in the future). At a high level, the importer currently:

  • Creates the clients
  • Adds any existing ‘notes’ about the client
    • Notes are freeform text about a client, unrelated to any particular patient or transaction
    • The existing application is a little limited in what can be entered into the main form, so notes have been used to make up the slack (e.g. in the existing data, there are numerous clients which have an email address or fax number in the notes field, as there is no first class input for these values)
  • Adds home and mobile phone numbers
  • Adds the ‘most common travel distance’ as a note

These steps are visible in the implementation of the importer:

override fun run(vararg args: String?) {
    val rows = accessDb.clientTableRows

    val newClientIds: Map<String, UUID> = createClients(
        rows,
        accessDb::postCodeFor,
        accessDb::stateFor,
        commandGateway
    )

    allOf(
        addClientNotes(rows, newClientIds, commandGateway),
        addPhoneNumbers(rows, newClientIds, commandGateway),
        addMostCommonDistance(rows, newClientIds, commandGateway)
    ).get()
}

First, clients are created, producing a Map of the old client ID to the new client ID. Once all clients have been created, all the other updates are applied (potentially concurrently).

Without giving away too much information about the existing data, the order of magnitude of the existing number of clients is 3, and the total number of events generated with the current importer implementation is at most 5x the number of clients (one ClientMigratedEvent, up to two ClientNoteAddedEvents and up to two ClientPhoneNumberAddedEvents).

My first pass at the importer was taking around 80 seconds to import everything into a PostgreSQL database. I know that premature optimization is the root of all evil, and that I don’t have anything resembling a working product at the moment, but this seemed far too high. Also, it was impacting my ability to iterate quickly with ‘production’ data, which is enough of a reason to look for improvements.

After looking at the generated schema and doing some sampling with VisualVM, I decided there were three options to investigate:

  • Asynchronous processing of commands
  • Serialisation format changes
  • Generated schema changes

In order to compare a full run of the importer pre and post optimisations, I want to be able to toggle the optimisations on/off from the command line. The following script has the toggle properties in place, and in the sections below I will use Spring config management to read these properties.

PASSWORD=$(uuidgen)

docker stop vetted-postgres ; docker rm vetted-postgres

docker run \
    --publish 5432:5432/tcp \
    --name vetted-postgres \
    --env POSTGRES_PASSWORD=$PASSWORD \
    --detach \
    postgres

./gradlew build

java \
    -jar importer/build/libs/vetted-importer-0.0.1-SNAPSHOT.jar \
    --axon.use-async-command-bus=false \
    --axon.use-cbor-serializer=false \
    --spring.jpa.database-platform=org.hibernate.dialect.PostgreSQL95Dialect \
    --spring.datasource.password=$PASSWORD

The script above will allow me to evaluate the impact of any changes I make in a repeatable fashion.

Option 1 - Asynchronous processing of commands

I’m using the Axon framework, which handles a lot of the plumbing of building an application based on DDD & CQRS principles. By default when using the Spring auto-configuration, a SimpleCommandBus is used which processes commands on the calling thread.

I added some configuration to use a AsynchronousCommandBus with a configurable number of threads:

@Bean
@ConditionalOnProperty(
    value = ["axon.use-async-command-bus"],
    matchIfMissing = true
)
fun bus(
    transactionManager: TransactionManager,
    @Value("\${axon.command-bus.executor.pool-size}") poolSize: Int
): CommandBus {
    val bus = AsynchronousCommandBus(
        Executors.newFixedThreadPool(poolSize)
    )

    val tmi = TransactionManagingInterceptor(transactionManager)
    bus.registerHandlerInterceptor(tmi)

    return bus
}

I initially tried this configuration out with a pool size of 10. This reduced the time for the import to around 30 seconds, which is an improvement from 80 seconds but short of an order of magnitude improvement which should be possible. This led me to believe that there was either contention somewhere else, or that some of the constant factors are just too high at the moment.

Option 2 - Serialisation format changes

By default, Axon will use XStream to serialise events, which uses an XML representation. XML is quite verbose, and the Axon documentation even suggests using a different serializer.

Overriding the serializer is thankfully quite easy:

@Primary
@Bean
@ConditionalOnProperty(
    value = ["axon.use-cbor-serializer"],
    matchIfMissing = true
)
fun serializer(): Serializer {
    val objectMapper = ObjectMapper(CBORFactory())
    objectMapper.findAndRegisterModules()
    objectMapper.setSerializationInclusion(NON_ABSENT)
    return JacksonSerializer(objectMapper)
}

I opted for using Jackson with a ‘Concise Binary Object Representation’ (CBOR) JsonFactory. This resulted in a ~70% reduction in size for the serialized payload for most events. With XML:

postgres=# select avg(length(loread(lo_open(payload::int, x'40000'::int), x'40000'::int))) from domain_event_entry;
     avg
--------------
 433.69003053

and with CBOR:

     avg
--------------
 111.54379774

This didn’t have a huge impact on the run time of the importer, but is still a worthwhile optimisation.

Option 3 - Generated schema changes

You may have noticed in the SQL statments above that the current schema is using the PostgreSQL large objects functionality. From the PostgreSQL docs:

PostgreSQL has a large object facility, which provides stream-style access to user data that is stored in a special large-object structure. Streaming access is useful when working with data values that are too large to manipulate conveniently as a whole.

If we inspect the schema that’s being generated:

postgres=# \d domain_event_entry
     Table "public.domain_event_entry"
      Column      | Type | Nullable | Default
------------------+------+----------+---------
 meta_data        | oid  |          |
 payload          | oid  | not null |
 ...

The oid type here is an object identifier - a reference to a large object which is stored externally from the table. The events we’re writing are small enough that the overhead of reading them as separate streams is hurting performance rather than helping.

At least two people have had the same issue when using Axon with PostgreSQL, as evidenced by the questions on Google Groups and StackOverflow. The suggestion to customise the PostgreSQL dialect used by Hibernate seems to work, and further reduced the runtime to around 8 seconds.

Conclusion

Based on my very rough benchmarking, the three changes above have reduced the run time of the importer from around 80 seconds to 8 seconds. The code is all at the link above, and the optimisations are on by default.

There is surely more that can be done to improve performance, but that’s fast enough for now!