Introducing Stream Servlet in loklak Server

A major part of my GSoC proposal was adding stream API to loklak server. In a previous blog post, I discussed the addition of Mosquitto as a message broker for MQTT streaming. After testing this service for a few days and some minor improvements, I was in a position to expose the stream to outside users using a simple API.

In this blog post, I will be discussing the addition of /api/stream.json endpoint to loklak server.

HTTP Server-Sent Events

Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C.

Wikipedia

This API is supported by all major browsers except Microsoft Edge. For loklak, the plan was to use this event system to send messages, as they arrive, to the connected users. Apart from browsers, the EventSource API can also be used with many other technologies.
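
For instance, a non-browser client can consume the stream with a plain HTTP connection that keeps reading lines. Below is a minimal sketch of such a consumer in Java; the host, port and channel name used here are illustrative assumptions, not values prescribed by the API –

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class StreamClient {
    public static void main(String[] args) throws Exception {
        // Assumed local loklak instance and channel name; adjust to your setup
        URL url = new URL("http://localhost:9000/api/stream.json?channel=all");
        URLConnection connection = url.openConnection();
        connection.setRequestProperty("Accept", "text/event-stream");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // SSE payload lines are prefixed with "data:"
                if (line.startsWith("data:")) {
                    System.out.println(line.substring(5).trim());
                }
            }
        }
    }
}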

Jetty Eventsource Plugin

For Java, we can use Jetty’s EventSource plugin to send events to clients. It is similar to other Jetty servlets when it comes to processing arguments, handling requests, etc., but it provides a simple interface to send events to connected users as they occur.

Adding Dependency

To use this plugin, we can add the following line to Gradle dependencies –

compile group: 'org.eclipse.jetty', name: 'jetty-eventsource-servlet', version: '1.0.0'

[SOURCE]

The Event Source

An EventSource is the object which an EventSourceServlet requires in order to send events. All the logic for emitting events needs to be defined in the related class. To link a servlet with an EventSource, we need to override the newEventSource method –

public class StreamServlet extends EventSourceServlet {
    @Override
    protected EventSource newEventSource(HttpServletRequest request) {
        String channel = request.getParameter("channel");
        if (channel == null) {
            return null;
        }
        if (channel.isEmpty()) {
            return null;
        }
        return new MqttEventSource(channel);
    }
}

[SOURCE]

If no channel is provided, the EventSource object will be null and the request will be rejected. Here, the MqttEventSource would be used to handle the stream of Tweets as they arrive from the Mosquitto message broker.

Cross Site Requests

Since requests to this endpoint can’t be of JSONP type, it is necessary to allow cross-site requests on it. This can be done by overriding the doGet method of the servlet –

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    response.setHeader("Access-Control-Allow-Origin", "*");
    super.doGet(request, response);
}

[SOURCE]

Adding MQTT Subscriber

When a request for events arrives, the constructor of MqttEventSource is called. At this stage, we need to connect to the stream from Mosquitto for the requested channel. To achieve this, we can make the class an MqttCallback and configure the client appropriately –

public class MqttEventSource implements MqttCallback {
    ...
    MqttEventSource(String channel) {
        this.channel = channel;
        ...
        this.mqttClient = new MqttClient(address, "loklak_server_subscriber");
        this.mqttClient.connect();
        this.mqttClient.setCallback(this);
        this.mqttClient.subscribe(this.channel);
        ...
    }
    ...
}

[SOURCE]

By setting the callback to this, we can override the messageArrived method to handle the arrival of a new message on the channel. Just to mention, the client library used here is Eclipse Paho.

Connecting MQTT Stream to SSE Stream

Now that we have subscribed to the channel we wish to send events from, we can implement EventSource and use its Emitter to send out messages as they arrive –

public class MqttEventSource implements EventSource, MqttCallback {
    private Emitter emitter;


    @Override
    public void onOpen(Emitter emitter) throws IOException {
        this.emitter = emitter;
        ...
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        this.emitter.data(message.toString());
    }
}

[SOURCE]

Closing Stream on Disconnecting from User

When a client disconnects from the stream, it doesn’t make sense to stay connected to the broker. We can use the onClose method to disconnect the subscriber from the MQTT broker –

@Override
public void onClose() {
    try {
        // Disconnect before closing; close() fails for a client that is still connected
        this.mqttClient.disconnect();
        this.mqttClient.close();
    } catch (MqttException e) {
        // Log some warning
    }
}

[SOURCE]

Conclusion

In this blog post, I discussed connecting the MQTT stream to an SSE stream using Jetty’s EventSource plugin. Once in place, this event system would save us from making too many requests to collect and visualize data. The possibilities for applications of such a feature are huge.

This feature can be seen in action at the World Mood Tracker app.

The changes were introduced in pull request loklak/loklak_server#1474 by @singhpratyush (me).

Resources

Introducing Stream Servlet in loklak Server

Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method would require a lot of HTTP requests, most of them being redundant. Also, if a user would like to collect messages for a particular topic, he would need to make a lot of requests over a period of time to get enough data.

For GSoC 2017, my proposal was to introduce stream API in the loklak server so that we could save ourselves from making too many requests and also add many use cases.

Mosquitto is an Eclipse project which acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IoT-friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server.

Installation and Dependency for Mosquitto

The installation process of Mosquitto is very simple. For Ubuntu, it is available from the default package repositories –

sudo apt-get install mosquitto

Once the message broker is up and running, we can use clients to connect to it and publish/subscribe to channels. To add the MQTT client as a project dependency, we can add the following line to the Gradle dependencies file –

compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'

[SOURCE]

After this, we can use the client libraries in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak provides an interface to perform basic MQTT operations. The implementation uses AsyncClientListener to connect to the Mosquitto broker –

AsyncClientListener listener = new AsyncClientListener() {
    // Override methods according to needs
};

[SOURCE]

The publish method for the class can be used by other components of the project to publish messages on the desired channel –

public void publish(String channel, String message) {
    this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));
}

[SOURCE]

We also have methods which allow publishing of multiple messages to multiple channels in order to increase the functionality of the class.
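
For illustration, such overloads could simply loop over the basic publish method shown above. The signatures below are assumptions for the sake of the sketch, not necessarily the exact ones used in loklak –

public void publish(String channel, String[] messages) {
    // Publish every message to a single channel
    for (String message : messages) {
        this.publish(channel, message);
    }
}

public void publish(String[] channels, String message) {
    // Publish the same message to several channels
    for (String channel : channels) {
        this.publish(channel, message);
    }
}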

Starting Publisher with Server

The flags which signal the use of the streaming service in loklak are located in conf/config.properties. These configurations are read while initializing the Data Access Object, and an MQTTPublisher is created if needed –

String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883");
streamEnabled = getConfig("stream.enabled", false);
if (streamEnabled) {
    mqttPublisher = new MQTTPublisher(mqttAddress);
}

[SOURCE]

The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak also has a nice Kubernetes setup, it was very simple to introduce a new deployment for Mosquitto to it.

Changes in Dockerfile

The Dockerfile for the master deployment has to be modified so that loklak can discover the Mosquitto broker in the Kubernetes cluster. For this purpose, the corresponding flags in config.properties are updated while building the image –

sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \

[SOURCE]

The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created for it (explained in a later section).

Mosquitto Deployment

The Docker image used in Kubernetes deployment of Mosquitto is taken from toke/docker-kubernetes. Two ports are exposed for the cluster but no volumes are needed –

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: mosquitto
  namespace: mqtt
spec:
  ...
  template:
    ...
    spec:
      containers:
      - name: mosquitto
        image: toke/mosquitto
        ports:
        - containerPort: 9001
        - containerPort: 8883

[SOURCE]

Exposing Mosquitto to the Cluster

Now that we have the deployment running, we need to expose the required ports to the cluster so that other components may use them. Port 9001 is mapped to port 80 of the service, and port 1883 is exposed directly –

apiVersion: v1
kind: Service
metadata:
  name: mosquitto
  namespace: mqtt
  ...
spec:
  ...
  ports:
  - name: mosquitto
    port: 1883
  - name: mosquitto-web
    port: 80
    targetPort: 9001

[SOURCE]

After creating the service using this configuration, we will be able to connect our clients to Mosquitto at address mosquitto.mqtt:1883.

Conclusion

In this blog post, I discussed the process of adding Mosquitto to the loklak server project. This is the first step towards introducing the stream API for messages collected in loklak.

These changes were introduced in pull requests loklak/loklak_server#1393 and loklak/loklak_server#1398 by @singhpratyush (me).

Resources

Using Mosquitto as a Message Broker for MQTT in loklak Server

Deploying loklak Server on Kubernetes with External Elasticsearch

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

kubernetes.io

Kubernetes is an awesome cloud platform which ensures that containerized applications run reliably. It provides automated rollouts and rollbacks, health checks, simple scaling and a lot more.

So as a part of GSoC, I worked on taking the loklak server to Kubernetes on Google Cloud Platform. In this blog post, I will be discussing the approach followed to deploy the development branch of loklak on Kubernetes.

New Docker Image

Since Kubernetes deployments work on Docker images, we needed one for the loklak project. The existing image was not suitable for Kubernetes as it declared volumes and exposed ports in the Dockerfile itself. So I wrote a new Docker image which could be used in Kubernetes.

The image simply clones the loklak server repository, builds the project and starts the server as the CMD –

FROM alpine:latest

ENV LANG=en_US.UTF-8
ENV JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8

WORKDIR /loklak_server

RUN apk update && apk add openjdk8 git bash && \
    git clone https://github.com/loklak/loklak_server.git /loklak_server && \
    git checkout development && \
    ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport && \
    # Some Configurations and Cleanups

CMD ["bin/start.sh", "-Idn"]

[SOURCE]

This image doesn’t declare any volumes or expose any ports, so we are free to configure them in the Kubernetes configuration files (discussed in a later section).

Building and Pushing Docker Image using Travis

To automatically build and push the Docker image on a commit, the Travis build is used. In the after_success section, a call to push the Docker image is made.

Travis environment variables hold the username and password for Docker hub and are used for logging in –

docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD

[SOURCE]

We needed checks there to ensure that we are on the right branch for the push and we are not handling a pull request –

# Build and push Kubernetes Docker image
KUBERNETES_BRANCH=loklak/loklak_server:latest-kubernetes-$TRAVIS_BRANCH
KUBERNETES_COMMIT=loklak/loklak_server:kubernetes-$TRAVIS_COMMIT
  
if [ "$TRAVIS_BRANCH" == "development" ]; then
    docker build -t loklak_server_kubernetes kubernetes/images/development
    docker tag loklak_server_kubernetes $KUBERNETES_BRANCH
    docker push $KUBERNETES_BRANCH
    docker tag $KUBERNETES_BRANCH $KUBERNETES_COMMIT
    docker push $KUBERNETES_COMMIT
elif [ "$TRAVIS_BRANCH" == "master" ]; then
    # Build and push master
else
    echo "Skipping Kubernetes image push for branch $TRAVIS_BRANCH"
fi

[SOURCE]

Kubernetes Configurations for loklak

A Kubernetes cluster can be completely configured using configurations written in YAML format. The deployment of loklak uses the previously built image. Initially, the image tagged as latest-kubernetes-development is used –

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: server
  namespace: web
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: server
    spec:
      containers:
      - name: server
        image: loklak/loklak_server:latest-kubernetes-development
        ...

[SOURCE]

Readiness and Liveness Probes

Probes act as the top-level health checks for a deployment in Kubernetes. They are performed periodically to ensure that things are working fine, and appropriate steps are taken if they fail.

When a new image is rolled out, the older pod keeps running and serving requests. It is replaced by the new one only when the probes on the new pod succeed; otherwise, the update is rolled back.

In loklak, the /api/status.json endpoint gives information about the status of the deployment and hence is a good target for probes –

livenessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3
readinessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3

[SOURCE]

These probes are performed periodically and the server is restarted if they fail (non-success HTTP status code or takes more than 3 seconds).

Ports and Volumes

In the configurations, port 80 is exposed as this is where Jetty serves inside loklak –

ports:
- containerPort: 80
  protocol: TCP

[SOURCE]

Notice that this is the same port we used for running the probes. Since the development branch deployment holds no dumps, we didn’t need to specify any explicit volumes for persistence.

Load Balancer Service

While creating the configurations, a new public IP is assigned to the deployment using Google Cloud Platform’s load balancer. It starts listening on port 80 –

ports:
- containerPort: 80
  protocol: TCP

[SOURCE]

Since this service creates a new public IP, it is recommended not to replace or recreate it, as doing so would result in the creation of a new public IP. Other components can be updated individually.
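
For reference, a Kubernetes service obtains a public IP from the cloud provider when it is of type LoadBalancer. The snippet below is only an illustrative sketch of such a service – the name, namespace and selector follow the deployment shown earlier, but it is not necessarily the exact configuration used in loklak –

apiVersion: v1
kind: Service
metadata:
  name: server
  namespace: web
spec:
  type: LoadBalancer
  selector:
    app: server
  ports:
  - port: 80
    protocol: TCP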

Kubernetes Configurations for Elasticsearch

To maintain a persistent index, this deployment requires an external Elasticsearch cluster. loklak can connect itself to an external Elasticsearch cluster by changing a few configurations.

Docker Image and Environment Variables

The image used for Elasticsearch is taken from pires/docker-elasticsearch-kubernetes. It allows easy configuration of Elasticsearch properties through environment variables. Out of the long list of configurable variables, we needed just a few of them for our task –

image: quay.io/pires/docker-elasticsearch-kubernetes:2.0.0
env:
- name: KUBERNETES_CA_CERTIFICATE_FILE
  value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
- name: NAMESPACE
  valueFrom:
    fieldRef:
      fieldPath: metadata.namespace
- name: "CLUSTER_NAME"
  value: "loklakcluster"
- name: "DISCOVERY_SERVICE"
  value: "elasticsearch"
- name: NODE_MASTER
  value: "true"
- name: NODE_DATA
  value: "true"
- name: HTTP_ENABLE
  value: "true"

[SOURCE]

Persistent Index using Persistent Cloud Disk

To make the index last even after the deployment is stopped, we needed a stable place to store all that data. Here, Google Compute Engine’s standard persistent disk was used. The disk can be created using the GCP web portal or the gcloud CLI.
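
For example, a disk matching the pdName used below could be created with the gcloud CLI roughly as follows; the size and zone here are placeholders, not values from the actual deployment –

gcloud compute disks create data-index-disk --size=100GB --zone=us-central1-a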

Before attaching the disk, we need to declare a volume where we could mount it –

volumeMounts:
- mountPath: /data
  name: storage

[SOURCE]

Now that we have a volume, we can simply mount the persistent disk on it –

volumes:
- name: storage
  gcePersistentDisk:
    pdName: data-index-disk
    fsType: ext4

[SOURCE]

Now, whenever we deploy these configurations, we can reuse the previous index.

Exposing Elasticsearch to the Cluster

The HTTP and transport clients are enabled on ports 9200 and 9300 respectively. They can be exposed to the rest of the cluster using the following service –

apiVersion: v1
kind: Service
...
spec:
  ...
  ports:
  - name: http
    port: 9200
    protocol: TCP
  - name: transport
    port: 9300
    protocol: TCP

[SOURCE]

Once deployed, other deployments can access the cluster API from ports 9200 and 9300.

Connecting loklak to Elasticsearch

To connect loklak to external Elasticsearch cluster, TransportClient Java API is used. In order to enable these settings, we simply need to make some changes in configurations.

Since the service is named “elasticsearch” and lives in the namespace “elasticsearch”, we can access the cluster at elasticsearch.elasticsearch:9200 (HTTP) and elasticsearch.elasticsearch:9300 (transport).

To confine these changes to the Kubernetes deployment only, we can use the sed command while building the image (in the Dockerfile) –

sed -i.bak 's/^\(elasticsearch_transport.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(elasticsearch_transport.addresses\).*/\1=elasticsearch.elasticsearch:9300/' conf/config.properties && \

[SOURCE]

Now when we create the deployments in the Kubernetes cluster, loklak automatically connects to the external Elasticsearch cluster and creates indices if needed.

Verifying persistence of the Elasticsearch Index

In order to see that the data persists, we can completely delete the deployment or even the cluster if we want. Later, when we recreate the deployment, we can see all the messages already present in the index.

[2017-07-29 09:42:51,804][INFO ][node                     ] [Hellion] initializing ...
[2017-07-29 09:42:52,024][INFO ][plugins                  ] [Hellion] loaded [cloud-kubernetes], sites []
[2017-07-29 09:42:52,055][INFO ][env                      ] [Hellion] using [1] data paths, mounts [[/data (/dev/sdb)]], net usable_space [84.9gb], net total_space [97.9gb], spins? [possibly], types [ext4]
[2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] initialized
[2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] starting ...
[2017-07-29 09:42:53,620][INFO ][transport                ] [Hellion] publish_address {10.8.1.13:9300}, bound_addresses {10.8.1.13:9300}
[2017-07-29 09:42:53,633][INFO ][discovery                ] [Hellion] loklakcluster/cJtXERHETKutq7nujluJvA
[2017-07-29 09:42:57,866][INFO ][cluster.service          ] [Hellion] new_master {Hellion}{cJtXERHETKutq7nujluJvA}{10.8.1.13}{10.8.1.13:9300}{master=true}, reason: zen-disco-join(elected_as_master, [0] joins received)
[2017-07-29 09:42:57,955][INFO ][http                     ] [Hellion] publish_address {10.8.1.13:9200}, bound_addresses {10.8.1.13:9200}
[2017-07-29 09:42:57,955][INFO ][node                     ] [Hellion] started
[2017-07-29 09:42:58,082][INFO ][gateway                  ] [Hellion] recovered [8] indices into cluster_state

In the last line from the logs, we can see that indices already present on the disk were recovered. Now if we head to the public IP assigned to the cluster, we can see that the message count is restored.

Conclusion

In this blog post, I discussed how we utilised the Kubernetes setup to shift loklak to Google Cloud Platform. The deployment is active and can be accessed from the link provided under the wiki section of the loklak/loklak_server repo.

I introduced these changes in pull request loklak/loklak_server#1349 with the help of @niranjan94, @uday96 and @chiragw15.

Resources

Deploying loklak Server on Kubernetes with External Elasticsearch

Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data says a lot more than a few individual instances can. But performing aggregations on each request can be very resource-intensive, so we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore cases where we just require a few classes or countries and store aggregations for all of them instead. So the fields that will define the cache to look for will be –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching is Java’s HashMap. It maps a special string key to a wrapper object, both of which are discussed in the following sections.
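
A minimal sketch of such a cache declaration could look like the following; the field name cacheMap matches its usage later in this post, and JSONObjectWrapper is the wrapper type described below –

// Maps the composed key string to the cached, timestamped aggregation result
private static Map<String, JSONObjectWrapper> cacheMap = new HashMap<>();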

Key

The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}

[SOURCE]

In this way, we can handle requests where a user asks for every class without running the expensive aggregation job every time. This is because the key for such requests will be the same, as we are not considering country and class while building it.

Value

The object used as the value in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – It is the time, in milliseconds since epoch, at which the object expires.

class JSONObjectWrapper {
    private JSONObject json; 
    private long expiry;
    ... 
}

Timeout

The timeout associated with a cache entry is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes and is used to set the expiry of a cached JSONObject –

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}
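
The wrapper also needs to report whether it has expired; the isExpired method used in the cache-hit snippet below could be a simple check like this (a sketch based on the expiry field set in the constructor above) –

class JSONObjectWrapper {
    ...
    boolean isExpired() {
        // Stale once the current time passes the stored expiry timestamp
        return System.currentTimeMillis() > this.expiry;
    }
}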

 

Cache Hit

For searching in the cache, the previously mentioned key string is composed from the parameters of the user’s request. Checking for a cache hit can then be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.keySet().contains(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here would contain all the data, we would need to filter out the classes and countries which are not needed.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed.

Since the number of fields to filter out, i.e. classes and countries, would not be that high, this process is not very resource-intensive. At the same time, it saves us from running heavy aggregation tasks for every user request.

Since the data about classes is nested inside the respective country field, we need to perform two levels of filtering –

JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        retJson.put(key, value);
    }
}
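
The filterInnerClasses helper is not shown in the snippet above. A possible sketch of it is given below, assuming each country key maps to a JSONArray of per-class objects keyed by a “class” field; the actual structure in loklak may differ –

private static JSONArray filterInnerClasses(JSONArray classCounts, List<String> classes) {
    JSONArray filtered = new JSONArray();
    for (int i = 0; i < classCounts.length(); i++) {
        JSONObject classCount = classCounts.getJSONObject(i);
        // Keep only the entries whose class was requested (or everything if no filter is given)
        if (classes.isEmpty() || classes.contains(classCount.optString("class"))) {
            filtered.put(classCount);
        }
    }
    return filtered;
}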

Cache Miss

In the case of a cache miss, the helper functions in ElasticsearchClient.java are called to get results. These results are then converted from a HashMap to a JSONObject and stored in the cache for future use.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classes and all countries.

Conclusion

In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).

Resources

Caching Elasticsearch Aggregation Results in loklak Server

Making loklak Server’s Kaizen Harvester Extendable

Harvesting strategies in loklak are something that the end users can’t see, but they play a vital role in deciding the way in which messages are collected with loklak. One of the strategies in loklak is defined by the Kaizen Harvester, which generates queries from collected messages.

The original strategy used a simple hash queue which drops queries once it is full. This effect is not desirable as we tend to lose important queries in this process if they come up late while harvesting. To overcome this behaviour without losing important search queries, we needed to come up with new harvesting strategy(ies) that would provide a better approach for harvesting. In this blog post, I am discussing the changes made in the kaizen harvester so it can be extended to create different flavors of harvesters.

What can be different in extended harvesters?

To make the Kaizen harvester extendable, we first needed to decide which parts of the original Kaizen harvester could be changed to make the strategy different (and probably better).

Since one of the most crucial parts of the Kaizen harvester is the way it stores queries to be processed, it was one of the most obvious things to change. Another thing that should be configurable across strategies is the decision of whether to harvest the queries from the query list or to grab more suggestions instead.

Query storage with KaizenQueries

To allow different methods of storing the queries, the KaizenQueries class was introduced in loklak. It declares the basic methods that any query-storing technique needs to provide. A query-storing technique can be any data structure that we can use to store search queries for the Kaizen harvester.

public abstract class KaizenQueries {

     public abstract boolean addQuery(String query);

     public abstract String getQuery();

     public abstract int getSize();

     public abstract int getMaxSize();

     public boolean isEmpty() {
         return this.getSize() == 0;
     }
}

[SOURCE]

Also, a default implementation of KaizenQueries was introduced for use in the original Kaizen harvester. It exposes the same interface as the original queue which was used in the harvester.
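
The default implementation itself is not shown in this post. A rough sketch of a hash-queue-backed KaizenQueries that silently drops queries once full (mirroring the behaviour described above; the class name and internals are assumptions, not the exact loklak code) could look like –

public class DefaultKaizenQueries extends KaizenQueries {

    // Insertion-ordered set acting as a simple hash queue with de-duplication
    private final LinkedHashSet<String> queries = new LinkedHashSet<>();
    private final int maxSize;

    public DefaultKaizenQueries(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public boolean addQuery(String query) {
        if (this.queries.size() >= this.maxSize || this.queries.contains(query)) {
            return false;  // drop when full or duplicate
        }
        return this.queries.add(query);
    }

    @Override
    public String getQuery() {
        Iterator<String> it = this.queries.iterator();
        if (!it.hasNext()) {
            return null;
        }
        String query = it.next();
        it.remove();
        return query;
    }

    @Override
    public int getSize() {
        return this.queries.size();
    }

    @Override
    public int getMaxSize() {
        return this.maxSize;
    }
}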

Another constructor was introduced in the Kaizen harvester which allows setting the KaizenQueries instance for its derived classes. This provides the KaizenQueries interface inside the Kaizen harvester so that it can be used by any inherited strategy –

private KaizenQueries queries = null;

public KaizenHarvester(KaizenQueries queries) {
    ...
    this.queries = queries;
    ...
}

public void someMethod() {
    ...
    this.queries.addQuery(someQuery);
    ...
}

[SOURCE]

With this in place, getting or adding queries becomes simple. We just need to use the getQuery() and addQuery() methods without worrying about the internal implementations.

Configurable decision for harvesting

As mentioned earlier, the decision taken for harvesting should also be configurable. For this, a protected method was implemented and used in the harvest() method –

protected boolean shallHarvest() {
    float targetProb = random.nextFloat();
    float prob = 0.5F;
    if (this.queries.getMaxSize() > 0) {
        prob = queries.getSize() / (float)queries.getMaxSize();
    }
    return !this.queries.isEmpty() && targetProb < prob;
}

@Override
public int harvest() {
    if (this.shallHarvest()) {
        return harvestMessages();
    }

    grabSuggestions();

    return 0;
}

[SOURCE]

The shallHarvest() method can be overridden by derived classes to allow any type of harvesting decision that they want. For example, we can configure it in such a way that it harvests whenever there are any queries in the queue, or use a different probability distribution instead of a linear one (maybe Gaussian around 1).
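
As a tiny illustration, a strategy that harvests whenever its query store is non-empty could look like the sketch below. The class name is hypothetical, and the KaizenQueries reference is kept locally since the field in the base class snippet above is private –

public class EagerKaizenHarvester extends KaizenHarvester {

    private final KaizenQueries myQueries;

    public EagerKaizenHarvester(KaizenQueries queries) {
        super(queries);
        this.myQueries = queries;
    }

    @Override
    protected boolean shallHarvest() {
        // Harvest as long as there is at least one query waiting
        return !this.myQueries.isEmpty();
    }
}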

Conclusion

This blog post explained the changes made in loklak’s Kaizen harvester which allow other harvesting strategies to be built on top of it. It discussed the two components changed and how they make it easy to inherit from the original Kaizen harvester. These changes were proposed in PR loklak/loklak_server#1203 by @singhpratyush (me).

Resources

Making loklak Server’s Kaizen Harvester Extendable

Introducing Priority Kaizen Harvester for loklak server

In the previous blog post, I discussed the changes made in loklak’s Kaizen harvester so it could be extended and other harvesting strategies could be introduced. Those changes made it possible to introduce a new harvesting strategy as PriorityKaizen harvester which uses a priority queue to store the queries that are to be processed. In this blog post, I will be discussing the process through which this new harvesting strategy was introduced in loklak.

Background, motivation and approach

Before jumping into the changes, we first need to understand why we need this new harvesting strategy. Let us start by discussing the issue with the Kaizen harvester.

The producer-consumer imbalance in the Kaizen harvester

Kaizen uses a simple hash queue to store queries. When the queue is full, new queries are dropped. But the number of queries produced after searching for one query is much higher than the consumption rate, i.e. the queries are bound to overflow and newly arriving queries get dropped. (See loklak/loklak_server#1156)

Learnings from an attempt to add a blocking queue for queries

As a solution to this problem, I first tried to use a blocking queue to store the queries. In this implementation, the producers would get blocked before putting queries into a full queue and would wait until there is space for more. This way, we would have a good balance between consumers and producers, as the producers would be waiting until the consumers free up space for them –

public class BlockingKaizenHarvester extends KaizenHarvester {
   ...
   public BlockingKaizenHarvester() {
       super(new KaizenQueries() {
           ...
           private BlockingQueue<String> queries = new ArrayBlockingQueue<>(maxSize);

           @Override
           public boolean addQuery(String query) {
               if (this.queries.contains(query)) {
                   return false;
               }
               try {
                   this.queries.offer(query, this.blockingTimeout, TimeUnit.SECONDS);
                   return true;
               } catch (InterruptedException e) {
                   DAO.severe("BlockingKaizen Couldn't add query: " + query, e);
                   return false;
               }
           }
           @Override
           public String getQuery() {
               try {
                   return this.queries.take();
               } catch (InterruptedException e) {
                   DAO.severe("BlockingKaizen Couldn't get any query", e);
                   return null;
               }
           }
           ...
       });
   }
}

[SOURCE, loklak/loklak_server#1210]

But there is an issue here. The consumers are themselves producers, and at an even higher rate. When a search is performed, queries are requested to be appended to the KaizenQueries instance of the object (which here implements a blocking queue). Now let us consider the case where the queue is full and a thread takes a query from the queue and scrapes data. When the scraping finishes, many new queries are requested to be inserted, and most of these insertions get blocked (because the queue becomes full again after a single query is consumed).

Therefore, using a blocking queue in KaizenQueries is not a good thing to do.

Other considerations

After the failure of introducing the Blocking Kaizen harvester, we looked for other alternatives for storing queries. We came across multilevel queues, persistent disk queues and priority queues.

Multilevel queues sounded like a good idea at first, where we would have multiple queues for storing queries. But eventually, this would just boil down to how much total queue size we allow, and the queries would still get dropped.

Persistent disk queues would allow us to store a greater number of queries, but the major disadvantage is lookup time. It would be terribly slow to check whether a query already exists in the disk queue when the queue is large. Also, since the number of queries would practically always increase, the disk queue would also go out of hand at some point in time.

So by now, we were clear that not dropping queries was not an option. Instead, we had to use the limited-size queue smartly so that we do not drop queries that are important.

Solution: Priority Queue

So a good solution to our problem was a priority queue. We could assign a higher score to queries that come from more popular Tweets; they would go higher in the queue and not drop off until we have even higher priority queries in the queue.

Assigning score to a Tweet

The score for a Tweet was decided using the following formulae –

α = 5 * (retweet count) + (favourite count)

score = α / (α + 10 * exp(-0.01 * α))

This equation generates a score between zero and one from the retweet and favourite count of a Tweet. This normalisation of score would ensure we do not assign an insanely large score to Tweets with a high retweet and favourite count. You can see the behaviour for the second mentioned equation here.
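
As a small worked example, 2 retweets and 5 favourites give α = 5 * 2 + 5 = 15 and a score of roughly 15 / (15 + 10 * exp(-0.15)) ≈ 0.64. A sketch of the same computation as a Java helper (the method name is only for illustration) –

private static double score(long retweetCount, long favouriteCount) {
    double alpha = 5 * retweetCount + favouriteCount;
    // Normalised into (0, 1): popular Tweets approach 1, unpopular ones stay low
    return alpha / (alpha + 10 * Math.exp(-0.01 * alpha));
}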

Changes required in existing Kaizen harvester

To take the score into account, it became necessary to extend the interface so that addQuery() in KaizenQueries also accepts a score as a parameter. Also, not all queries can have a score associated with them – for example, if we add a query that would search for Tweets older than the oldest in the current timeline, giving it a score wouldn’t be possible as it is not associated with a single Tweet. To tackle this, a default score of 0.5 is given to such queries –

public abstract class KaizenQueries {

   public boolean addQuery(String query) {
       return this.addQuery(query, 0.5);
   }

   public abstract boolean addQuery(String query, double score);
   ...
}

[SOURCE]

Defining appropriate KaizenQueries object

The KaizenQueries object for a priority queue had to define a wrapper class that would hold the query and its score together so that they could be inserted in a queue as a single object.

ScoreWrapper and comparator

The ScoreWrapper is a simple class that stores score and query object together –

private class ScoreWrapper {

   private double score;
   private String query;

   ScoreWrapper(String m, double score) {
       this.query = m;
       this.score = score;
   }

}

[SOURCE]

In order to define a way to sort the ScoreWrapper objects in the priority queue, we need to define a Comparator for it –

private Comparator<ScoreWrapper> scoreComparator =
        (scoreWrapper, t1) -> Double.compare(t1.score, scoreWrapper.score);  // higher scores come out of the queue first

[SOURCE]

Putting things together

Now that we have all the ingredients to declare our priority queue, we can also define the getQuery and addQuery strategies in the corresponding KaizenQueries object –

public class PriorityKaizenHarvester extends KaizenHarvester {

   private static class PriorityKaizenQueries extends KaizenQueries {
       ...
       private Queue<ScoreWrapper> queue;
       private int maxSize;

       public PriorityKaizenQueries(int size) {
           this.maxSize = size;
           queue = new PriorityQueue<>(size, scoreComparator);
       }

       @Override
       public boolean addQuery(String query, double score) {
           ScoreWrapper sw = new ScoreWrapper(query, score);
           if (this.queue.contains(sw)) {
               return false;
           }
           try {
               this.queue.add(sw);
               return true;
           } catch (IllegalStateException e) {
               return false;
           }
       }

       @Override
       public String getQuery() {
           return this.queue.poll().query;
       }
       ...
}

[SOURCE]

Conclusion

In this blog post, I discussed the process in which PriorityKaizen harvester was introduced to loklak. This strategy is a flavour of Kaizen harvester which uses a priority queue to store queries that are to be processed. These changes were possible because of a previous patch which allowed extending of Kaizen harvester.

The changes were introduced in pull request loklak/loklak#1240 by @singhpratyush (me).

Resources

Introducing Priority Kaizen Harvester for loklak server

Fetching URL for Embedded Twitter Videos in loklak server

The primary web service that loklak scrapes is Twitter. Being a news and social networking service, Twitter allows its users to post videos directly to Twitter, and these videos often convey more than text can. But for an automated scraper, getting the links is not a simple task.

Let us see what problems we faced with videos and how we solved them in the loklak server project.

Previous setup and embedded videos

In the previous version of loklak server, the TwitterScraper searched for videos in two ways –

  1. YouTube links
  2. HTML5 video links

To fetch the video URL from an HTML5 video tag, the following snippet was used –

if ((p = input.indexOf("<source video-src")) >= 0 && input.indexOf("type=\"video/") > p) {
   String video_url = new prop(input, p, "video-src").value;
   videos.add(video_url);
   continue;
}

Here, input is the current line of the raw HTML being processed, and prop is a class defined in loklak that is useful for parsing HTML attributes. In this way, the HTML5 videos were extracted.

The Problem – Embedded videos

Though the previous setup worked fine for directly embedded HTML5 videos, it was not enough, as Twitter embeds its videos in an iFrame which can’t be fetched using simple HTML5 tag extraction.

If we take a Tweet containing an embedded video as an example, the requested HTML from the search page contains the video in the following format –

<iframe src="https://twitter.com/i/videos/tweet/881946694413422593?embed_source=clientlib&player_id=0&rpc_init=1" allowfullscreen="" id="player_tweet_881946694413422593" style="width: 100%; height: 100%; position: absolute; top: 0; left: 0;"></iframe>

So we needed to come up with a better technique to get those videos.

Parsing video URL from iFrame

The <div> which contains the video is marked with the AdaptiveMedia-videoContainer class. So if a Tweet has an iFrame containing a video, it will also have the mentioned class.

Also, the source of iFrame is of the form https://twitter.com/i/videos/tweet/{Tweet-ID}. So now we can programmatically go to any Tweet’s video and parse it to get results.

Extracting video URL from iFrame source

Now that we have the source of iFrame, we can easily get the video source using the following flow –

public final static Pattern videoURL = Pattern.compile("video_url\\\":\\\"(.*?)\\\"");

private static String[] fetchTwitterIframeVideos(String iframeURL) {
   // Read from iframeURL line by line into BufferedReader br
   while ((line = br.readLine()) != null ) {
       int index;
       if ((index = line.indexOf("data-config=")) >= 0) {
           String jsonEscHTML = (new prop(line, index, "data-config")).value;
           String jsonUnescHTML = HtmlEscape.unescapeHtml(jsonEscHTML);
           Matcher m = videoURL.matcher(jsonUnescHTML);
           if (!m.find()) {
               return new String[]{};
           }
           String url = m.group(1);
           url = url.replace("\\/", "/");  // Clean URL
           /*
            * Play with url and return results
            */
       }
   }
}

MP4 and M3U8 URLs

If we encounter an mp4 URL, we’re fine, as it is the direct link to the video. But if we encounter an m3u8 URL, we need to process it further before we can actually get to the videos.

For Twitter, the hosted m3u8 playlists contain links to further m3u8 playlists of different resolutions. These playlists in turn contain links to various .ts files that hold the actual video in parts of 3 seconds each, to support a better streaming experience on the web.

To resolve videos in such a setup, we need to recursively parse m3u8 files and collect all the .ts videos.

private static String[] extractM3u8(String url) {
   return extractM3u8(url, "https://video.twimg.com/");
}

private static String[] extractM3u8(String url, String baseURL) {
   // Read from baseURL + url line by line
   while ((line = br.readLine()) != null) {
       if (line.startsWith("#")) {  // Skip comments in m3u8
           continue;
       }
       String currentURL = (new URL(new URL(baseURL), line)).toString();
       if (currentURL.endsWith(".m3u8")) {
           String[] more = extractM3u8(currentURL, baseURL);  // Recursively add all
           Collections.addAll(links, more);
       } else {
           links.add(currentURL);
       }
   }
   return links.toArray(new String[links.size()]);
}

And then in fetchTwitterIframeVideos, we can return all the .ts URLs for the video –

if (url.endsWith(".mp4")) {
   return new String[]{url};
} else if (url.endsWith(".m3u8")) {
   return extractM3u8(url);
}

Putting things together

Finally, the TwitterScraper can discover the video links by tweaking a little –

if (input.indexOf("AdaptiveMedia-videoContainer") > 0) {
   // Fetch Tweet ID
   String tweetURL = props.get("tweetstatusurl").value;
   int slashIndex = tweetURL.lastIndexOf('/');
   if (slashIndex < 0) {
       continue;
   }
   String tweetID = tweetURL.substring(slashIndex + 1);
   String iframeURL = "https://twitter.com/i/videos/tweet/" + tweetID;
   String[] videoURLs = fetchTwitterIframeVideos(iframeURL);
   Collections.addAll(videos, videoURLs);
}

Conclusion

This blog post explained the process of extracting video URLs from Twitter and the problems faced. The discussed change enabled loklak to extract and serve URLs to videos for Tweets. It was introduced in PR loklak/loklak_server#1193 by me (@singhpratyush).

The service was further enhanced to collect a single mp4 link for each video (see PR loklak/loklak_server#1206), which is discussed in another blog post.

Resources

Fetching URL for Embedded Twitter Videos in loklak server

Implementing Loklak APIs in Java using Reflections

Loklak server provides a large API to play with the data scraped by it. Java methods can be implemented to use these API endpoints. A common approach to implementing methods for API endpoints is to create the request URL from the values passed to the method and then send a GET/POST request. Creating the request URL in every method can be tiresome, and in the long run maintaining a library implemented this way will require a lot of effort. For example, assume a method is to be implemented for the suggest API endpoint, which has many parameters; for creating the request URL, a lot of conditionals need to be written to check whether each parameter is provided or not.

Well, the methods to call API endpoints can be implemented with less and easier-to-maintain code using Reflection in Java. The post ahead elaborates the problem, the approach to solve it and finally the solution which is implemented in loklak_jlib_api.

Let’s say the status API endpoint needs to be implemented; a simple approach can be:

public class LoklakAPI {

    public static JSONObject status(String baseUrl) {
        String requestUrl = baseUrl + "/api/status.json";

        // GET request using requestUrl, parse the response and return it as a JSONObject
    }

    public static void main(String[] argv) {
        JSONObject result = status("https://api.loklak.org");
    }
}

This one is easy, isn’t it, as the status API endpoint requires no parameters. But just imagine if a method implements an API endpoint that has a lot of parameters, most of them optional. As a developer, you would like to provide methods that cover all the parameters of the API endpoint. For example, here is how a method looks if it implements the suggest API endpoint – the old SuggestClient implementation in loklak_jlib_api does exactly that:

public static ResultList<QueryEntry> suggest(
           final String hostServerUrl,
           final String query,
           final String source,
           final int count,
           final String order,
           final String orderBy,
           final int timezoneOffset,
           final String since,
           final String until,
           final String selectBy,
           final int random) throws JSONException, IOException {
       ResultList<QueryEntry>  resultList = new ResultList<>();
       String suggestApiUrl = hostServerUrl
               + SUGGEST_API
               + URLEncoder.encode(query.replace(' ', '+'), ENCODING)
               + PARAM_TIMEZONE_OFFSET + timezoneOffset
               + PARAM_COUNT + count
               + PARAM_SOURCE + (source == null ? PARAM_SOURCE_VALUE : source)
               + (order == null ? "" : (PARAM_ORDER + order))
               + (orderBy == null ? "" : (PARAM_ORDER_BY + orderBy))
               + (since == null ? "" : (PARAM_SINCE + since))
               + (until == null ? "" : (PARAM_UNTIL + until))
               + (selectBy == null ? "" : (PARAM_SELECT_BY + selectBy))
               + (random < 0 ? "" : (PARAM_RANDOM + random))
               + PARAM_MINIFIED + PARAM_MINIFIED_VALUE;
 
                // GET request using suggestApiUrl
   }
}

A lot of conditionals!!! The targeted users may also get irritated if they need to provide all the parameters every time, even if they don’t need them. The obvious solution to that is overloading the methods. But then again, for each overloaded method the same repetitive conditionals need to be written – a form of code duplication!! And what if you have to implement some 30 API endpoints and maintain them in the future? A single thought of it is scary and a nightmare for developers.

Approach to the problem

To reduce the code size, one obvious thought is to implement static methods that send GET/POST requests given the request URL and the required data. These methods are implemented in loklak_jlib_api by JsonIO and NetworkIO.

You might have noticed that the method names, i.e. status and suggest, are also part of the request URL. So, the common pattern that we get is:

base_url + "/api/" + methodName + ".json" + "?" + firstParameterName + "=" + firstParameterValue + "&" + secondParameterName + "=" + secondParameterValue ...
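
For example, following this pattern, a call like search("FOSSAsia", 10) against the public API host used later in this post would map to a URL of the form –

https://api.loklak.org/api/search.json?q=FOSSAsia&count=10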

Reflection API to the rescue

Using the Reflection API, classes, interfaces, methods and variables can be inspected – and yes, you guessed it right, if we can inspect we can get the method and parameter names. It is also possible to implement interfaces and create objects at runtime.

So, an interface is created and the API endpoint methods are defined in it. Using the Reflection API, the interface is implemented lazily at runtime. The LoklakAPI interface defines all the API endpoint methods. Some of the defined methods are:

public interface LoklakAPI {
 
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface GET {}
 
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface POST {}
 
    @GET
    JSONObject search(String q);
 
    @GET
    JSONObject search(String q, int count);
 
    @GET
    JSONObject search(String q, int timezoneOffset, String since, String until);
 
    @GET
    JSONObject search(String q, int timezoneOffset, String since, String until, int count);
 
    @GET
    JSONObject peers();
 
    @GET
    JSONObject hello();
 
    @GET
    JSONObject status();
 
    @POST
    JSONObject push(JSONObject data);
}

Here, the GET and POST annotations are used to mark the methods which use GET and POST requests respectively, so that the appropriate static method from JsonIO can be used – loadJson for GET requests and pushJson for POST requests.

A private static inner class ApiInvocationHandler is created in APIGenerator, which implements InvocationHandler – used for implementing interfaces at runtime.

private static class ApiInvocationHandler implements InvocationHandler {
 
   private String mBaseUrl;
 
   public ApiInvocationHandler(String baseUrl) {
       this.mBaseUrl = baseUrl;
   }
 
   @Override
   public Object invoke(Object o, Method method, Object[] values) throws Throwable {
       Parameter[] params = method.getParameters();
       Object[] paramValues = values;
 
       /*
       format of annotation name:
       @packageWhereAnnotationIsDeclared$AnnotationName()
       Example: @org.loklak.client.LoklakAPI$GET()
       */
       Annotation annotation = method.getAnnotations()[0];
       String annotationName = annotation.toString().toLowerCase();
 
       String apiUrl = createGetRequestUrl(mBaseUrl, method.getName(),
               params, paramValues);
       if (annotationName.contains("get")) { // GET REQUEST
           return loadJson(apiUrl);
       } else { // POST REQUEST
           JSONObject jsonObjectToPush = (JSONObject) paramValues[0];
           String postRequestUrl = createPostRequestUrl(mBaseUrl, method.getName());
           return pushJson(postRequestUrl, params[0].getName(), jsonObjectToPush);
       }
   }
}

The base URL is provided while creating the object, in the constructor. The invoke method is called whenever a method defined in the interface is called in actual code. This way, the interface is implemented at runtime. The parameters of the invoke method are:

  • Object o – the object on which to call the method.
  • Method method – method which is called, parameters and annotations of the method are obtained using getParameters and getAnnotations methods respectively which are required to create our request url.
  • Object[] values – an array of parameter values which are passed to the method when calling it.

createGetRequestUrl is used to create the request URL for sending GET requests.

private static String createGetRequestUrl(
       String baseUrl, String methodName, Parameter[] params, Object[] paramValues) {
   String apiEndpointUrl = baseUrl.replaceAll("/+$", "") + "/api/" + methodName + ".json";
   StringBuilder url = new StringBuilder(apiEndpointUrl);
   if (params.length > 0) {
       String queryParamAndVal = "?" + params[0].getName() + "=" + paramValues[0];
       url.append(queryParamAndVal);
       for (int i = 1; i < params.length; i++) {
           String paramAndVal = "&" + params[i].getName()
                   + "=" + String.valueOf(paramValues[i]);
           url.append(paramAndVal);
       }
   }
   return url.toString();
}

Similarly, createPostRequestUrl is used to create request URL for sending POST requests.

private static String createPostRequestUrl(String baseUrl, String methodName) {
   baseUrl = baseUrl.replaceAll("/+$", "");
   return baseUrl + "/api/" + methodName + ".json";
}

Finally, Proxy.newProxyInstance is used to implement the interface at runtime. An instance of ApiInvocationHandler class is passed to the newProxyInstance method. This is all done by static method createApiMethods.

public static <T> T createApiMethods(Class<T> service, final String baseUrl) {
   ApiInvocationHandler apiInvocationHandler = new ApiInvocationHandler(baseUrl);
   return (T) Proxy.newProxyInstance(
           service.getClassLoader(), new Class<?>[]{service}, apiInvocationHandler);
}

Where:

  • Class<T> service – is the interface where API endpoint methods are defined, here LoklakAPI.class.
  • String baseUrl – web address of the server where Loklak server is hosted.

NOTE: For all this to work, the library must be built using the “-parameters” compiler flag, else the parameter names can’t be obtained. Since loklak_jlib_api uses Maven, “-parameters” is provided in the pom.xml file.

A small example:

String baseUrl = "https://api.loklak.org";
LoklakAPI loklakAPI = APIGenerator.createApiMethods(LoklakAPI.class, baseUrl);
 
JSONObject searchResult = loklakAPI.search("FOSSAsia");

Resources

Implementing Loklak APIs in Java using Reflections