Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data speaks a lot more than a few instances from it can say. But performing aggregations on each request can be very resource consuming. So we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore cases where we just require a few classes or countries and store aggregations for all of them instead. So the fields that will define the cache to look for will be –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching was Java’s HashMap. It would be used to map a special string key to a special object discussed in a later section.

Key

The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}

[SOURCE]

In this way, we can handle requests where a user makes a request for every class there is without running the expensive aggregation job every time. This is because the key for such requests will be same as we are not considering country and class for this purpose.

Value

The object used as key in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – It is the expiry of the object in milliseconds.

class JSONObjectWrapper {
    private JSONObject json; 
    private long expiry;
    ... 
}

Timeout

The timeout associated with a cache is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes and is used to set the eexpiryof a cached JSONObject –

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}

 

Cache Hit

For searching in the cache, the previously mentioned string is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.keySet().contains(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here would contain all the data, we would need to filter out the classes and countries which are not needed.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed.

Since the number of fields to filter out, i.e. classes and countries, would not be that high, this process would not be that resource intensive. And at the same time, would save us from requesting heavy aggregation tasks from the user.

Since the data about classes is nested inside the respective country field, we need to perform two level of filtering –

JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        retJson.put(key, value);
    }
}

Cache Miss

In the case of a cache miss, the helper functions are called from ElasticsearchClient.java to get results. These results are then parsed from HashMap to JSONObject and stored in the cache for future usages.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classifiers and all countries.

Conclusion

In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).

Resources

Caching Elasticsearch Aggregation Results in loklak Server

Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server

Loklak uses Elasticsearch to index Tweets and other social media entities. It also houses a classifier that classifies Tweets based on emotion, profanity and language. But earlier, this data was available only with the search API and there was no way to get aggregated data out of it. So as a part of my GSoC project, I proposed to introduce a new API endpoint which would allow users to access aggregated data from these classifiers.

In this blog post, I will be discussing how aggregations are performed on the Elasticsearch index of Tweets in the loklak server.

Structure of index

The ES index for Twitter is called messages and it has 3 fields related to classifiers –

  1. classifier_emotion
  2. classifier_language
  3. classifier_profanity

With each of these classifiers, we also have a probability attached which represents the confidence of the classifier for assigned class to a Tweet. The name of these fields is given by suffixing the emotion field by _probability (e.g. classifier_emotion_probability).

Since I will also be discussing aggregation based on countries in this blog post, there is also a field named place_country_code which saves the ISO 3166-1 alpha-2 code for the country of creation of Tweet.

Requesting aggregations using Elasticsearch Java API

Elasticsearch comes with a simple Java API which can be used to perform any desired task. To work with data, we need an ES client which can be built from a ES Node (if creating a cluster) or directly as a transport client (if connecting remotely to a cluster) –

// Transport client
TransportClient tc = TransportClient.builder()
                        .settings(mySettings)
                        .build();

// From a node
Node elasticsearchNode = NodeBuilder.nodeBuilder()
                            .local(false).settings(mySettings)
                            .node();
Client nc = elasticsearchNode.client();

[SOURCE]

Once we have a client, we can use ES AggregationBuilder to get aggregations from an index –

SearchResponse response = elasticsearchClient.prepareSearch(indexName)
                            .setSearchType(SearchType.QUERY_THEN_FETCH)
                            .setQuery(QueryBuilders.matchAllQuery())  // Consider every row
                            .setFrom(0).setSize(0)  // 0 offset, 0 result size (do not return any rows)
                            .addAggregation(aggr)  // aggr is a AggregatoinBuilder object
                            .execute().actionGet();  // Execute and get results

[SOURCE]

AggregationBuilders are objects that define the properties of an aggregation task using ES’s Java API. This code snippet is applicable for any type of aggregation that we wish to perform on an index, given that we do not want to fetch any rows as a response.

Performing simple aggregation for a classifier

In this section, I will discuss the process to get results from a given classifier in loklak’s ES index. Here, we will be targeting a class-wise count of rows and stats (average and sum) of probabilities.

Writing AggregationBuilder

An AggregationBuilder for this task will be a Terms AggregationBuilder which would dynamically generate buckets for all the different values of fields for a given field in index –

AggregationBuilder getClassifierAggregationBuilder(String classifierName) {
    String probabilityField = classifierName + "_probability";
    return AggregationBuilders.terms("by_class").field(classifierName)
        .subAggregation(
            AggregationBuilders.avg("avg_probability").field(probabilityField)
        )
        .subAggregation(
            AggregationBuilders.sum("sum_probability").field(probabilityField)
        );
}

[SOURCE]

Here, the name of aggregation is passed as by_class. This will be used while processing the results for this aggregation task. Also, sub-aggregation is used to get average and sum probability by the name of avg_probability and sum_probability respectively. There is no need to specify to count rows as this is done by default.

Processing results

Once we have executed the aggregation task and received the SearchResponse as sr (say), we can use the name of top level aggregation to get Terms aggregation object –

Terms aggrs = sr.getAggregations().get("by_class");

After that, we can iterate through the buckets to get results –

for (Terms.Bucket bucket : aggrs.getBuckets()) {
String key = bucket.getKeyAsString();
long docCount = bucket.getDocCount(); // Number of rows
// Use name of sub aggregations to get results
Sum sum = bucket.getAggregations().get("sum_probability");
Avg avg = bucket.getAggregations().get("avg_probability");
// Do something with key, docCount, sum and avg
}

[SOURCE]

So in this manner, results from aggregation response can be processed.

Performing nested aggregations for different countries

The previous section described the process to perform aggregation over a single field. For this section, we’ll aim to get results for each country present in the index given a classifier field.

Writing a nested aggregation builder

To get the aggregation required, AggregationBuilder from previous section can be added as a sub-aggregation for the AggregationBuilder for country code field –

AggregationBuilder aggrs = AggregationBuilders.terms("by_country").field("place_country_code")
    .subAggregation(
        AggregationBuilders.terms("by_class").field(classifierName)
            .subAggregation(
                AggregationBuilders.avg("avg_probability").field(probabilityField)
            )
            .subAggregation(
                AggregationBuilders.sum("sum_probability").field(probabilityField)
            );
    );

[SOURCE]

Processing the results

Again, we can get the results by processing the AggregationBuilders by name in a top-to-bottom fashion –

Terms aggrs = response.getAggregations().get("by_country");
for (Terms.Bucket bucket : aggr.getBuckets()) {
    String countryCode = bucket.getKeyAsString();
    Terms classAggrs = bucket.getAggregations().get("by_class");
    for (Terms.Bucket classBucket : classAggr.getBuckets()) {
        String key = classBucket.getKeyAsString();
        long docCount = classBucket.getDocCount();
        Sum sum = classBucket.getAggregations().get("sum_probability");
        Avg avg = classBucket.getAggregations().get("avg_probability");
        ...
    }
    ...
}

[SOURCE]

And we have the data about classifier for each country present in the index.

Conclusion

This blog post explained about Elasticsearch aggregations and their usage in the loklak server project. The changes discussed here were introduced over a series of patches to ElasticsearchClient.java by @singhpratyush (me).

Resources

Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server