Using Mosquitto as a Message Broker for MQTT in loklak Server

In the loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint, but polling requires a lot of HTTP requests, most of them redundant. Similarly, a user who wants to collect messages on a particular topic would need to make many requests over a period of time to gather enough data.

For GSoC 2017, my proposal was to introduce a streaming API in the loklak server, which would save clients from making too many requests and also enable many new use cases.

Mosquitto is an Eclipse project which acts as a message broker for the popular MQTT protocol. MQTT, based on the publish-subscribe model, is a lightweight and IoT-friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server.

Installation and Dependency for Mosquitto

The installation process of Mosquitto is very simple. On Ubuntu, it is available from the default package repositories –

sudo apt-get install mosquitto

Once the message broker is up and running, we can use clients to connect to it and publish/subscribe to channels. To add an MQTT client as a project dependency, we can add the following line to the dependencies section of build.gradle –

compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'


After this, we can use the client libraries in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak provides an interface to perform basic MQTT operations. The implementation uses an AsyncClientListener to connect to the Mosquitto broker –

AsyncClientListener listener = new AsyncClientListener() {
    // Override methods according to needs
};

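To illustrate how the listener and the broker address fit together, here is a minimal sketch using the xenqtt client (the client ID and thread pool size are illustrative; the exact wiring in loklak may differ) –

import net.sf.xenqtt.client.AsyncMqttClient;

// Create an asynchronous client which reports its events to the listener
// defined above; the last argument sizes the message handler thread pool.
AsyncMqttClient mqttClient = new AsyncMqttClient("tcp://127.0.0.1:1883", listener, 5);

// Connect with an illustrative client ID and a clean session.
mqttClient.connect("loklak_server", true);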

The publish method for the class can be used by other components of the project to publish messages on the desired channel –

public void publish(String channel, String message) {
    this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));
}


We also have methods which allow publishing multiple messages and publishing to multiple channels, to increase the functionality of the class.
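The actual implementations in loklak are not shown here; as a rough sketch, such convenience overloads could simply loop over the basic publish method –

// Hypothetical convenience overloads (names and signatures illustrative):
// publish one payload to many channels, or many payloads to one channel.
public void publish(String[] channels, String message) {
    for (String channel : channels) {
        this.publish(channel, message);
    }
}

public void publish(String channel, String[] messages) {
    for (String message : messages) {
        this.publish(channel, message);
    }
}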

Starting Publisher with Server

The flags that enable the streaming service in loklak live in conf/config.properties. These configurations are read while initializing the Data Access Object, and an MQTTPublisher is created if needed –

String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883");
streamEnabled = getConfig("stream.enabled", false);
if (streamEnabled) {
    mqttPublisher = new MQTTPublisher(mqttAddress);
}

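For reference, the corresponding entries in conf/config.properties would look something like this (values are the defaults from the snippet above) –

# enable or disable the streaming service
stream.enabled=false
# address of the MQTT broker to publish to
stream.mqtt.address=tcp://127.0.0.1:1883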

The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak also has a nice Kubernetes setup, it was very simple to add a new deployment for Mosquitto to it.

Changes in Dockerfile

The Dockerfile for the master deployment has to be modified so that loklak discovers the Mosquitto broker in the Kubernetes cluster. For this purpose, the corresponding flags in config.properties are rewritten at image build time –

sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \

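After these substitutions, the relevant lines in the image’s conf/config.properties read –

stream.enabled=true
stream.mqtt.address=mosquitto.mqtt:1883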

The Mosquitto broker is available at mosquitto.mqtt:1883 because of the service created for it (explained in a later section): in Kubernetes, a service named mosquitto in the namespace mqtt is resolvable as mosquitto.mqtt from inside the cluster.

Mosquitto Deployment

The Docker image used in the Kubernetes deployment of Mosquitto is taken from toke/docker-kubernetes. Two ports are exposed to the cluster, but no volumes are needed –

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: mosquitto
  namespace: mqtt
spec:
  ...
  template:
    ...
    spec:
      containers:
      - name: mosquitto
        image: toke/mosquitto
        ports:
        - containerPort: 9001
        - containerPort: 1883


Exposing Mosquitto to the Cluster

Now that we have the deployment running, we need to expose the required ports to the cluster so that other components may use it. Port 9001 appears as port 80 on the service, and port 1883 is exposed as-is –

apiVersion: v1
kind: Service
metadata:
  name: mosquitto
  namespace: mqtt
  ...
spec:
  ...
  ports:
  - name: mosquitto
    port: 1883
  - name: mosquitto-web
    port: 80
    targetPort: 9001


After creating the service using this configuration, we will be able to connect our clients to Mosquitto at address mosquitto.mqtt:1883.
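To quickly verify the setup from inside the cluster, the standard Mosquitto command line clients can be used (assuming the mosquitto-clients package is installed in some pod; the channel name test is illustrative) –

mosquitto_sub -h mosquitto.mqtt -p 1883 -t test
mosquitto_pub -h mosquitto.mqtt -p 1883 -t test -m "hello"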

Conclusion

In this blog post, I discussed the process of adding Mosquitto to the loklak server project. This is the first step towards introducing the stream API for messages collected in loklak.

These changes were introduced in pull requests loklak/loklak_server#1393 and loklak/loklak_server#1398 by @singhpratyush (me).


Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data says a lot more than a few individual instances can. But performing aggregations on each request can be very resource-intensive, so we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore cases where we require only a few classes or countries and instead store aggregations for all of them. So the fields that define which cache entry to look for are –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching is Java’s HashMap. It maps a string key, built from the fields above, to a wrapper object discussed in a later section.
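Reconstructed from how it is used later in this post, the declaration of the cache would be along these lines –

private static Map<String, JSONObjectWrapper> cacheMap = new HashMap<>();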

Key

The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}


In this way, we can handle requests where a user asks for every class there is without running the expensive aggregation job each time. This is because the key for such requests will be the same, as neither country nor class is considered when building it.
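For instance, with a hypothetical classifier emotion on the index messages, every request for the same period maps to the same key, no matter which classes or countries are asked for –

messages::::emotion::::2017-06-01::::2017-06-30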

Value

The object used as the value in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – The time, in milliseconds since the epoch, at which the object expires.

class JSONObjectWrapper {
    private JSONObject json; 
    private long expiry;
    ... 
}

Timeout

The timeout associated with a cache entry is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes (300000 ms) and is used to set the expiry of a cached JSONObject –

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}
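The isExpired method used in the cache hit check below is not shown in the original snippets; a minimal sketch of it would simply compare the stored expiry with the current time –

class JSONObjectWrapper {
    ...
    boolean isExpired() {
        return System.currentTimeMillis() > this.expiry;
    }
}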


Cache Hit

For searching in the cache, the key string described earlier is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.containsKey(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here contains the data for all classes and countries, we need to filter out those which were not requested.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass over the cached result and exclude whatever is not needed.

Since the number of fields to filter on, i.e. classes and countries, is not very high, this process is not very resource-intensive, and at the same time it saves us from running heavy aggregation tasks for every request.

Since the data about classes is nested inside the respective country field, we need to perform two levels of filtering –

JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        retJson.put(key, value);
    }
}
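The filterInnerClasses helper is not shown above; assuming each country key maps to a JSONArray of objects carrying a class name and its aggregated count, a hypothetical sketch could look like this –

// Hypothetical sketch: keep only the entries whose class was requested.
private static JSONArray filterInnerClasses(JSONArray countryData, List<String> classes) {
    JSONArray filtered = new JSONArray();
    for (int i = 0; i < countryData.length(); i++) {
        JSONObject entry = countryData.getJSONObject(i);
        if (classes.contains(entry.getString("class"))) {
            filtered.put(entry);
        }
    }
    return filtered;
}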

Cache Miss

In the case of a cache miss, helper functions in ElasticsearchClient.java are called to get the results. These results are then parsed from a HashMap into a JSONObject and stored in the cache for future use.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classes and all countries.

Conclusion

In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).


Refactor of Dropdown Menu in Susper

The first version of the Susper top menu provided links to resources and tutorials. For the next version, we were looking for a menu with more colorful icons, a cleaner UI design, and one that would appear on the homepage as well. In this blog, I will discuss refactoring the dropdown menu. This is how the earlier dropdown of Susper looked:

We decided to create a separate component for the menu, DropdownComponent.

At first, I created a dropdown menu with dimensions matching those that Google follows. Then I gave it padding: 28px to create a UI similar to the market leader’s, which makes for a clean dropdown design. I replaced the old icons with colorful ones. In the dropdown we have:

  • Added more projects of FOSSASIA like eventyay, loklak, susi and the main website of FOSSASIA. Here is how it looks now:

The main problem I faced was aligning the content inside the dropdown so that it would not get disturbed when the screen size changes.
I kept each icon’s dimensions at 48 x 48 inside the dropdown menu and arranged the icons in rows. It was easier to use div elements to create rows rather than the ul and li tags which were implemented earlier.

To create a horizontal grey line effect, I used the hr element. I made sure the padding remained the same above and below the horizontal line.

At the end of the dropdown menu, @mariobehling suggested that instead of writing ‘more’, it should redirect to the projects page of FOSSASIA.

This is how I worked on refactoring the dropdown menu and added it to the homepage as well.


Setting up Codecov in Susper repository hosted on Github

In this blog post, I’ll be discussing how we set up Codecov in Susper.

  • What is Codecov, and in which FOSSASIA projects is it being used?

Codecov is a popular code coverage tool. It can be easily integrated with services like Travis CI, and it provides additional features when used with services like Docker.

FOSSASIA projects like Open Event Orga Server, Loklak Search and Open Event Web App use Codecov. Recently, the code coverage tool has also been configured in the Susper project.

  • How do we set up Codecov in a project repository hosted on GitHub?

The simplest way to set up Codecov in a project repository is by installing codecov.io using the terminal command:

npm install --save-dev codecov.io

Susper is built on Angular 2 as its tech stack (we have recently upgraded it to Angular v4.1.3). Angular comes with Karma and Jasmine for testing purposes. There are many FOSSASIA repositories in which Codecov has been configured in this simple way, but with Angular the case is a little bit tricky. Using the following alone:

bash <(curl -s https://codecov.io/bash)

won’t generate code coverage, because of the presence of Karma and Jasmine. Two more packages are required: istanbul as a coverage reporter and jasmine as an HTML reporter. I have discussed them below.

Install these two packages:

  • karma-coverage-istanbul-reporter:

    npm install karma-coverage-istanbul-reporter --save-dev

  • karma-jasmine-html-reporter:

    npm install karma-jasmine-html-reporter --save-dev

After installing codecov.io, the devDependencies in package.json will be updated as follows:

"devDependencies": {
    "codecov": "^2.2.0",
    "karma-coverage-istanbul-reporter": "^1.3.0",
    "karma-jasmine-html-reporter": "^0.2.2"
}

Then add a script for testing:

"scripts": {
    "test": "ng test --single-run --code-coverage --reporters=coverage-istanbul"
}
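With this script in place, a coverage run can be triggered locally with:

npm test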

Generally, Codecov works best with Travis CI. With the one line bash <(curl -s https://codecov.io/bash), the code coverage can now be easily reported.

Here is a particular example of the .travis.yml from the project repository of Susper:

script:
 - ng test --single-run --code-coverage --reporters=coverage-istanbul
 - ng lint
 
after_success:
 - bash <(curl -s https://codecov.io/bash)
 - bash ./deploy.sh

Update karma.conf.js as well:

module.exports = function (config) {
  config.set({
    plugins: [
      require('karma-jasmine-html-reporter'),
      require('karma-coverage-istanbul-reporter')
    ],
    preprocessors: {
      'src/app/**/*.js': ['coverage']
    },
    client: {
      clearContext: false
    },
    coverageIstanbulReporter: {
      reports: ['html', 'lcovonly'],
      fixWebpackSourcePaths: true
    },
    reporters: config.angularCli && config.angularCli.codeCoverage
      ? ['progress', 'coverage-istanbul']
      : ['progress', 'kjhtml']
  })
}

This karma.conf.js is an example from the Susper project. Find out more here: https://github.com/fossasia/susper.com/pull/420
This is how we set up Codecov in the Susper repository. It can be set up in the same way in other repositories which use Angular 2 or 4 as their tech stack.
