Skip to main content

Camel-Elasticsearch: create timestamped indices

One nice feature of the logstash-elasticsearch integration is that, by default, logstash will use timestamped indices when feeding data to elasticsearch.

This means that yesterday's data is in a separate index from today's data and from each other day's data, simplifying index management. For instance, suppose you only want to keep the last 30 days:

elasticsearch-remove-old-indices.sh -i 30

The Apache Camel Elasticsearch component provides no such feature out of the box, but luckily it is quite easy to implement (when you know what to do. /grin ).

As a matter of fact, it is enough to define the proper header on the message and the elasticsearch component will then use that header as the index name. Unfortunately this is not documented anywhere, but it can be understood by looking at the source. Once again: use the source, Luke.

So, let's suppose the route is something as simple as:

        <route autostartup="true" id="processMirthMessages-route">
            <from uri="sql:{{sql.selectMessage}}?consumer.delay=5000&consumer.onConsume={{sql.markMessage}}">
            <to uri="elasticsearch://mirth?operation=INDEX&indexType=mmsg">
        </to></from></route>

Then all that is needed to is to define a content enricher bean as follows:

        <route autostartup="true" id="processMirthMessages-route">
            <from uri="sql:{{sql.selectMessage}}?consumer.delay=5000&consumer.onConsume={{sql.markMessage}}">
            <bean method="process" ref="eSheaders">
            <to uri="elasticsearch://mirth?operation=INDEX&indexType=mmsg">
        </to></bean></from></route>

The bean is also pretty simple (imports omitted for brevity):

public class ESHeaders {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        DateFormat df=new SimpleDateFormat("YYYY.MM.dd");
        in.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, "mirth2-"+df.format(new Date()));
    }
}

Update: get timestamp index name from the message itself.

If the data to be indexed contains, as it should, a @timestamp field then the content enricher bean can be imrproved to use it as follows:

public void process(Exchange exchange) {
        Message in = exchange.getIn();
        String indexName=null;
        DateFormat df=new SimpleDateFormat("YYYY.MM.dd");
        try {
            Map body = (Map) in.getBody();
            if(body.containsKey("@timestamp")) {
                logger.trace("Computing indexName from @timestamp: "+body.get("@timestamp"));
                indexName = "mirth2-"+df.format((Date) body.get("@timestamp")));
            } else {
                indexName = "mirth2-"+df.format(new Date()));
            }
        } catch(Exception e) {
            logger.error("Cannot compute index name, failing back to default");
            indexName = "mirth2-"+df.format(new Date()));
        }
        in.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, indexName);
    }

Comments

Popular posts from this blog

Indexing Apache access logs with ELK (Elasticsearch+Logstash+Kibana)

Who said that grepping Apache logs has to be boring?

The truth is that, as Enteprise applications move to the browser too, Apache access logs are a gold mine, it does not matter what your role is: developer, support or sysadmin. If you are not mining them you are most likely missing out a ton of information and, probably, making the wrong decisions.
ELK (Elasticsearch, Logstash, Kibana) is a terrific, Open Source stack for visually analyzing Apache (or nginx) logs (but also any other timestamped data).

From 0 to ZFS replication in 5m with syncoid

The ZFS filesystem has many features that once you try them you can never go back. One of the lesser known is probably the support for replicating a zfs filesystem by sending the changes over the network with zfs send/receive.
Technically the filesystem changes don't even need to be sent over a network: you could as well dump them on a removable disk, then receive  from the same removable disk.

A not so short guide to ZFS on Linux

Updated Oct 16 2013: shadow copies, memory settings and links for further learning.
Updated Nov 15 2013: shadow copies example, samba tuning.

Unless you've been living under a rock you should have by now heard many stories about how awesome ZFS is and the many ways it can help with saving your bacon.

The downside is that ZFS is not available (natively) for Linux because the CDDL license under which it is released is incompatible with the GPL. Assuming you are not interested in converting to one of the many Illumos distributions or FreeBSD this guide might serve you as a starting point if you are attracted  by ZFS features but are reluctant to try it out on production systems.

Basically in this post I note down both the tought process and the actual commands for implementing a fileserver for a small office. The fileserver will run as a virtual machine in a large ESXi host and use ZFS as the filesystem for shared data.