Indexing for multiscrapers in Loklak Server

I recently added a multiscraper system that can fetch data through web scrapers like YoutubeScraper, QuoraScraper, GithubScraper, etc. As scraping is a costly task, it is important to improve its efficiency. One approach is to index the scraped data in a cache; TwitterScraper already uses multiple sources in this way to optimize efficiency.

This system uses the Post message-holder object to store data and PostTimeline (a specialized iterator) to iterate over the data objects. Since these data structures differ from the ones used by TwitterScraper, a different approach is needed to implement indexing of data to ElasticSearch (currently under review).

These are the changes I made while implementing data indexing in the project.

1) Writing of data is invoked only using PostTimeline iterator

In TwitterScraper, the data is written through the message holder TwitterTweet, so tweets are written to the index as soon as they are created. Here, writing of the posts is initiated only after the data has been scraped. Since scraping is the heavy part of the process, this approach keeps resource usage lower under average traffic on the server.

protected Post putData(Post typeArray, String key, Timeline2 postList) {
    // Index the scraped posts unless they were served from the cache
    if (!"cache".equals(this.source)) {
        postList.writeToIndex();
    }
    return this.putData(typeArray, key, postList.toArray());
}

2) One object for holding a message

During the implementation, I kept the same message holder Post and post-iterator PostTimeline all the way from scraping to indexing of data. This keeps the structure uniform: the earlier approach involved converting between different types of message wrappers along the way, while this one cuts out the extra looping and data-structure transitions.
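As a minimal sketch (not the actual Loklak code), the uniform flow looks roughly like this, using the Timeline2 post-iterator and the Post holder that appear in the snippets throughout this post:

// Sketch only: one message-holder type and one iterator type from scraping to indexing
Timeline2 postList = new Timeline2(order);   // the same iterator the scraper fills
postList.addPost(new Post(true));            // each scraped message is a Post
postList.writeToIndex();                     // the very same objects are handed to the indexer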

3) Index a list, not a message

In TwitterScraper, messages are enqueued one by one to be indexed in bulk. In this approach, I instead enqueue complete lists of posts, which delays indexing until the scraper is done processing the HTML.

Creating the queue of postlists:

// Add post-lists to queue to be indexed
queueClients.incrementAndGet();
try {
    postQueue.put(postList);
} catch (InterruptedException e) {
    DAO.severe(e);
}
queueClients.decrementAndGet();

 

Indexing of the posts in postlists:

// Start indexing of data in post-lists
for (Timeline2 postList: postBulk) {
    if (postList.size() < 1) continue;
    if(postList.dump) {
        // Dumping of data in a file
        writeMessageBulkDump(postList);
    }
    // Indexing of data to ElasticSearch
    writeMessageBulkNoDump(postList);
}
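For completeness, the consumer side that builds postBulk from the queue could look roughly like the sketch below; it assumes postQueue is a java.util.concurrent.BlockingQueue<Timeline2>, and the actual buffering logic is not shown in this post:

// Sketch: collect queued post-lists into a bulk before indexing
List<Timeline2> postBulk = new ArrayList<>();
try {
    // block until at least one post-list has been queued by a scraper
    postBulk.add(postQueue.take());
} catch (InterruptedException e) {
    DAO.severe(e);
}
// grab whatever else is already waiting, without blocking
postQueue.drainTo(postBulk);
// postBulk is then handed to the indexing loop shown above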

 

4) Categorizing the input parameters

While searching the index, I divided the query parameters coming from the scraper into three categories. The input parameters are added to these categories (implemented using the Map data structure) and data is fetched from the index accordingly. The categories are:

// Declaring the QueryBuilder
BoolQueryBuilder query = new BoolQueryBuilder();

 

a) Get the parameter – the results must contain the fields in the map getMap.

// Result must have these fields. Acts as AND operator
if(getMap != null) {
    for(Map.Entry<String, String> field : getMap.entrySet()) {
        query.must(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}

 

b) Don’t get the parameter – the results must not contain the fields in the map notGetMap.

// Result must not have these fields.
if(notGetMap != null) {
    for(Map.Entry<String, String> field : notGetMap.entrySet()) {
        query.mustNot(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}

 

c) Get if possible – prefer results that also contain the fields in the map mayAlsoGetMap, if they are present in the index.

// Result may preferably also get these fields. Acts as OR operator
if(mayAlsoGetMap != null) {
    for(Map.Entry<String, String> field : mayAlsoGetMap.entrySet()) {
        query.should(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}
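Once the three categories are applied, the combined query can be executed against the index. The snippet below is only a sketch of that step; the client instance, the index name "messages" and the count variable are assumptions, not Loklak’s actual wiring:

// Sketch: run the assembled BoolQueryBuilder against ElasticSearch
SearchResponse response = elasticsearchClient.prepareSearch("messages")
        .setQuery(query)        // must / mustNot / should clauses built above
        .setSize(count)         // maximum number of posts to fetch
        .execute().actionGet();
for (SearchHit hit : response.getHits().getHits()) {
    // every hit's source is one indexed post as JSON
    String postJson = hit.getSourceAsString();
}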

 

With these changes, the scrapers shift from indexing single messages to indexing lists of messages. This keeps the load on RAM low, but the aggregation of the latest scraped data may be affected, so a workaround will be needed during scraping itself to address this.

References

Indexing for multiscrapers in Loklak Server

Setting Loklak Server with SSL

Loklak Server is based on an embedded Jetty server, which can work both with and without SSL encryption. Lately, there was a need to set up Loklak Server with SSL. Though that need was eventually met by CloudFlare, there are two other ways to set up Loklak Server with SSL:

1) Default Jetty Implementation

There is a pre-existing implementation based on the Jetty libraries. The HTTP mode can be set in the configuration file. There are four modes in which Loklak Server can work: http mode, https mode, only-https mode and redirect-to-https mode. Loklak Server listens on port 9000 in http mode and on port 9443 in https mode.

An SSL certificate is also needed, which is added through the configuration file.
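A minimal sketch of what the relevant entries in the configuration file could look like is given below; the key names are illustrative assumptions rather than the shipped config, while the ports match the ones mentioned above:

# http/https mode: off, on, only or redirect
https.mode = on
port.http = 9000
port.https = 9443
# Java keystore holding the SSL certificate
keystore.name = keystore.jks
keystore.password = changeit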

2) Getting SSL certificate with Kube-Lego on Kubernetes Deployment

I got to know about Kube-Lego from @niranjan94; it is implemented in Open-Event-Orga-Server. The approach is to use:

a) Nginx as ingress controller

To set up the Nginx ingress controller, a yml file is needed that downloads and configures the server.

The configuration for data requests and responses is:

proxy-connect-timeout: "15"
proxy-read-timeout: "600"
proxy-send-timeout: "600"
hsts-include-subdomains: "false"
body-size: "64m"
server-name-hash-bucket-size: "256"
server-tokens: "false"

Nginx is configured in service.yml to work on both the http and https ports:

ports:
- port: 80
  name: http
- port: 443
  name: https

 

b) Kube-Lego for fetching SSL certificates from Let’s Encrypt

Kube-Lego was set up with the default values in its yml file. It uses the host name, email address and secret name of the deployment to validate the URL and fetch an SSL certificate from Let’s Encrypt.
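A minimal sketch of such a Kube-Lego ConfigMap is shown below; the email address is a placeholder and the exact layout is an assumption rather than the deployed file:

apiVersion: v1
kind: ConfigMap
metadata:
  name: kube-lego
  namespace: kube-lego
data:
  # contact address registered with Let's Encrypt (placeholder)
  lego.email: "admin@example.org"
  # Let's Encrypt API endpoint used to issue the certificates
  lego.url: "https://acme-v01.api.letsencrypt.org/directory"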

c) Set up configurations related to TLS and non-TLS connections

These configuration files mention the paths and service ports of the Nginx server through which requests are forwarded to the backend Loklak Server. For both non-TLS and TLS requests, the requests are forwarded directly to port 80 of the Loklak Server container.

rules:
- host: staging.loklak.org
  http:
    paths:
    - path: /
      backend:
        serviceName: server
        servicePort: 80

For TLS requests, the secret name is also mentioned. Kube-Lego fetches the host name and secret name from here for the certificate:

tls:
- hosts:
  - staging.loklak.org
  secretName: loklak-api-tls

d) Loklak Server, ElasticSearch and Mosquitto at backend

These containers work at the backend. ElasticSearch and Mosquitto are accessible only to Loklak Server, while Loklak Server itself can be accessed through the Nginx server. Loklak Server is configured to work in http mode and is exposed on port 80.

ports:
- port: 80
  protocol: TCP
  targetPort: 80

To deploy Loklak Server, all of these components are deployed in separate pods which interact through service ports. For deployment, we use the deploy script:

# For elasticsearch, accessible only to api-server
kubectl create -R -f ${path-to-config-file}/elasticsearch/

# For mqtt, accessible only to api-server
kubectl create -R -f ${path-to-config-file}/mosquitto/

# Start KubeLego deployment for TLS certificates
kubectl create -R -f ${path-to-config-file}/lego/
kubectl create -R -f ${path-to-config-file}/nginx/

# Create web namespace, this acts as bridge to Loklak Server
kubectl create -R -f ${path-to-config-file}/web/

# Create API server deployment and expose the services
kubectl create -R -f ${path-to-config-file}/api-server/

# Get the IP address of the deployment to be used
kubectl get services --namespace=nginx-ingress

References

Setting Loklak Server with SSL

Backend Scraping in Loklak Server

Loklak Server is a peer-to-peer distributed scraping system. It scrapes data from websites and also maintains other sources like peers, storage and a backend server. Maintaining these other sources has the benefit of avoiding costly requests to the websites, with no scraping and no cleaning of data required.

Loklak Server can maintain a secondary Loklak Server (or backend server) tuned for storing a large amount of data. This enables the primary Loklak Server to fetch data from the backend in return for pushing all of its scraped data to it.

Lately there was a bug in backend search: a new tweet-filtering feature had been added to scraping and indexing, but not to backend search. To fix this issue, I backtracked through the backend search codebase and fixed it.

Let us discuss how backend search works:

1) When a query is made from the search endpoint with:

a) source=all

When source is set to all, TwitterScraper and messages from the local search server are preferred first. If the scraped messages are not enough, or no output has been returned for a specific amount of time, then backend search is initiated.

b) source=backend

SearchServlet fetches data directly from the backend server.
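For example, a backend-only search can be requested like this (the query value is illustrative):

curl 'https://api.loklak.org/api/search.json?q=fossasia&source=backend'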

2) Fetching data from Backend Server

The input parameters fetched from the client are fed into the DAO.searchBackend method, and the list of backend servers is fetched from the config file. Using these input parameters and backend servers, the required data is scraped and returned to the client.

In the DAO.searchOnOtherPeers method, the request is sent to multiple servers, which are arranged in order of better response rates. This method invokes the SearchServlet.search method to send the request to the mentioned servers.

List<String> remote = getBackendPeers();
if (remote.size() > 0) {
    // condition deactivated because we need always at least one peer
    Timeline tt = searchOnOtherPeers(remote, q, filterList, order, count, timezoneOffset, where, SearchServlet.backend_hash, timeout);
    if (tt != null) tt.writeToIndex();
    return tt;
}

 

3) Creation of request url and sending requests

The request URL is created according to the input parameters passed to the SearchServlet.search method, and the request is then sent to the respective servers to fetch the required messages.

// URL creation
urlstring = protocolhostportstub + "/api/search.json?q="
        + URLEncoder.encode(query.replace(' ', '+'), "UTF-8") + "&timezoneOffset="
        + timezoneOffset + "&maximumRecords=" + count + "&source="
        + (source == null ? "all" : source) + "&minified=true&shortlink=false&timeout="
        + timeout;
if (!"".equals(filterString = String.join(", ", filterList))) {
    urlstring = urlstring + "&filter=" + filterString;
}
// Download data
byte[] jsonb = ClientConnection.downloadPeer(urlstring);
if (jsonb == null || jsonb.length == 0) throw new IOException("empty content from " + protocolhostportstub);
String jsons = UTF8.String(jsonb);
JSONObject json = new JSONObject(jsons);
if (json == null || json.length() == 0) return tl;
// Final data fetched to be returned
JSONArray statuses = json.getJSONArray("statuses");
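For instance, with illustrative parameter values and a backend peer such as https://api.loklak.org, the resulting request would look like https://api.loklak.org/api/search.json?q=fossasia&timezoneOffset=0&maximumRecords=100&source=all&minified=true&shortlink=false&timeout=10000.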

References

Backend Scraping in Loklak Server

Configuring Youtube Scraper with Search Endpoint in Loklak Server

YoutubeScraper is one of the interesting web scrapers of Loklak Server, with a unique implementation of its data scraping and data-key creation (using RDF). It could not be accessed before, as it did not have any URL endpoint. I configured it to be used both as a separate endpoint (/api/youtubescraper) and through the search endpoint (/api/search.json).

Usage:

  1. YoutubeScraper Endpoint: /api/youtubescraper
     Example: http://api.loklak.org/api/youtubescraper?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube
  2. SearchServlet Endpoint: /api/search.json

Example: http://api.loklak.org/api/search.json?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

The configurations added in Loklak Server are:

1) Endpoint

We can access YoutubeScraper using the endpoint /api/youtubescraper. Like the other scrapers, I have used the BaseScraper class as the superclass for this functionality.

2) PrepareSearchUrl

The prepareSearchUrl method creates the YouTube search URL that is used to scrape the YouTube webpage. YoutubeScraper takes a URL as input, but a YouTube link could also be a shortened one; that is why the video id is stored as the query. This approach optimizes the scraper and makes it possible to add more scrapers to it.

Currently YoutubeScraper scrapes YouTube’s video webpages, but scrapers for the search and channel webpages can also be added.

URIBuilder url = null;
String midUrl = "search/";
try {
    switch (type) {
        case "search":
            midUrl = "search/";
            url = new URIBuilder(this.baseUrl + midUrl);
            url.addParameter("search_query", this.query);
            break;
        case "video":
            midUrl = "watch/";
            url = new URIBuilder(this.baseUrl + midUrl);
            url.addParameter("v", this.query);
            break;
        case "user":
            midUrl = "channel/";
            url = new URIBuilder(this.baseUrl + midUrl + this.query);
            break;
        default:
            url = new URIBuilder("");
            break;
    }
} catch (URISyntaxException e) {
    DAO.log("Invalid Url: baseUrl = " + this.baseUrl + ", mid-URL = " + midUrl
            + ", query = " + this.query + ", type = " + type);
    return "";
}
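For instance, with type set to "video" and the query xZ-m55K3FhQ, and assuming baseUrl points to the YouTube base URL, this builds a URL of the form https://www.youtube.com/watch/?v=xZ-m55K3FhQ, which resolves to the corresponding video page.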

 

3) Get-Data-From-Connection

The getDataFromConnection method fetches the BufferedReader object and passes it to the scrape method. In YoutubeScraper, this method has been overridden to prevent using the default implementation, i.e. type=all.

@Override
public Post getDataFromConnection() throws IOException {
    String url = this.prepareSearchUrl(this.type);
    return getDataFromConnection(url, this.type);
}

 

4) Set scraper parameters from the get-parameters

The Map data structure of get-parameters fetched by the scraper provides the type and the query. For a URL input, the video hash code is separated from the URL and then used as the query.

this.query = this.getExtraValue("query");
// YouTube video ids are 11 characters long, so keep only the trailing id
this.query = this.query.substring(this.query.length() - 11);

 

5) Scrape Method

The scrape method runs the different scraper methods (in YoutubeScraper there is only one), iterates over the results using PostTimeline and wraps them in a Post object for the output. This simple structure improves the flexibility of the scraper to scrape different pages concurrently.

Post out = new Post(true);
Timeline2 postList = new Timeline2(this.order);
postList.addPost(this.parseVideo(br, type, url));
out.put("videos", postList.toArray());

 

References

Configuring Youtube Scraper with Search Endpoint in Loklak Server

Scraping Concurrently with Loklak Server

At present, SearchScraper in Loklak Server uses numerous threads to scrape the Twitter website. The fetched data is cleaned and more data is extracted from it. But scraping just Twitter leaves this machinery under-used.

Concurrent scraping of other websites like Quora, Youtube, Github, etc. can be added to diversify the application. In this way, the single endpoint search.json can serve multiple services.

As this feature is still being refined, we will discuss only the basic structure of the system with the new changes. I tried to implement a more abstract way of scraping by:

1) Fetching the input data in SearchServlet

Instead of selecting individual input get-parameters and passing them around, the complete Map object is now referenced, which makes it possible to add more functionality based on the input get-parameters. The dataArray object (a JSONArray) is fetched from the DAO.scrapeLoklak method and is embedded in the output under the key results.

    // start a scraper
    inputMap.put("query", query);
    DAO.log(request.getServletPath() + " scraping with query: "
           + query + " scraper: " + scraper);
    dataArray = DAO.scrapeLoklak(inputMap, true, true);

 

2) Scraping the selected Scrapers concurrently

In DAO.java, the useful get-parameters of inputMap are fetched and cleaned. They are used to choose the scrapers to run, via the getScraperObjects() method.

Timeline2.Order order = getOrder(inputMap.get("order"));
Timeline2 dataSet = new Timeline2(order);
List<String> scraperList = Arrays.asList(inputMap.get("scraper").trim().split("\\s*,\\s*"));

 

Threads are created to fetch data from the different scrapers, according to the size of the list of scraper objects. The input map is passed as an argument to the scrapers so they can read further get-parameters relevant to them and shape their output accordingly.

List<BaseScraper> scraperObjList = getScraperObjects(scraperList, inputMap);
ExecutorService scraperRunner = Executors.newFixedThreadPool(scraperObjList.size());

try {
    for (BaseScraper scraper : scraperObjList) {
        scraperRunner.execute(() -> {
            dataSet.mergePost(scraper.getData());
        });
    }
} finally {
    scraperRunner.shutdown();
    try {
        scraperRunner.awaitTermination(24L, TimeUnit.HOURS);
    } catch (InterruptedException e) { }
}

 

3) Fetching the selected Scraper Objects in DAO.java

Here a variable of the abstract class BaseScraper (the superclass of all search scrapers) is used to build the list of scrapers to run. Each scraper’s constructor is fed the input map so it can scrape accordingly.

List<BaseScraper> scraperObjList = new ArrayList<BaseScraper>();
BaseScraper scraperObj = null;

if (scraperList.contains("github") || scraperList.contains("all")) {
    scraperObj = new GithubProfileScraper(inputMap);
    scraperObjList.add(scraperObj);
}
.
.
.

 

References:

Scraping Concurrently with Loklak Server

Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data in order to optimize future requests. The data fetched for different users is stored as a cache, which allows data for recurring queries to be retrieved directly from it. When users search for the same queries, the load on Loklak Server is reduced by returning indexed data, thus optimizing its operations.

Application

Loklak Server depends on ElasticSearch for indexing the cached data (as JSON). The data fetched by different users is stored as a cache, which helps in serving the same queries directly from the cache. When users search for the same queries, the load on Loklak Server is reduced and its operation is optimized by outputting indexed data instead of scraping the same data again.

When is data indexing done?

The indexing of data is done when:

1) Data is scraped:

When data is scraped, it is indexed concurrently with the cleaning of data in the TwitterTweet data object. For this task, the static addScheduler method of IncomingMessageBuffer is used, which acts as an abstraction between the scraping of data and its storing and indexing.

The following is the call from TwitterScraper (from here). Here writeToIndex is the boolean flag that decides whether or not to index the data.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from backend:

When data is fetched from the backend, it is indexed through the Timeline iterator, which calls the above method to index the data concurrently.

The following is the definition of the writeToIndex() method from Timeline.java (from here). When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}

How?

When the static addScheduler method of IncomingMessageBuffer is called, a thread is started that indexes all the buffered data. As the messagequeue data structure fills up with messages, indexing continues.
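A rough sketch of that loop is shown below; apart from DAO.writeMessageBulk (shown further down), the field and method names here are assumptions and not the actual IncomingMessageBuffer code:

// Sketch: background thread draining the message queue and indexing in bulk
while (shallRun) {
    if (messageQueue.isEmpty()) {
        try {
            Thread.sleep(10000);            // nothing buffered yet, wait a bit
        } catch (InterruptedException e) {
            break;
        }
    } else {
        List<MessageWrapper> bulk = new ArrayList<>();
        messageQueue.drainTo(bulk);         // take everything buffered so far
        DAO.writeMessageBulk(bulk);         // dump + index in one pass (see below)
    }
}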

See here. The DAO method writeMessageBulk is called there to write the data. The data is then written to the following streams:

1) Dump: the fetched data is dumped into a file in the Import directory. This dump can also be fetched by other peers.

2) Index: the fetched data is checked against the index, and any data that isn’t indexed yet gets indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }

    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    createdIDs.addAll(writeMessageBulkDump(dump));

    // Does also do a writeMessageBulkNoDump internally
    return createdIDs;
}

 

The above code snippet is from DAO.java. The method call writeMessageBulkNoDump(noDump) indexes the data to ElasticSearch; the definition of this method can be seen here.

For dumping the data, writeMessageBulkDump(dump) is called. It is defined here.

Resources:

Data Indexing in Loklak Server

Some Other Services in Loklak Server

Loklak Server isn’t just a scraper system; it provides numerous other services that perform other interesting functions like link unshortening (the reverse of link shortening) and video fetching, as well as administrative tasks like fetching the status of a Loklak deployment (for analysis during Loklak development) and many more. Some of these are implemented internally and the rest can be used through HTTP endpoints. There are also some services which aren’t complete yet and are still in the development stage.

Let’s go through some of them to know a bit about them and how they can be used.

1) VideoUrlService

This service extracts the video from a website that has a streaming video and outputs the video file link. It is still in the development stage but is functional. Presently, it can fetch Twitter video links and output them in different video qualities.

Endpoint: /api/videoUrlService.json

Implementation Example:

curl 'api.loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025'

2) Link Unshortening Service

This service is used to unshorten links. Shortened URLs are often used by websites to track Internet users. To prevent this, the link unshortening service unshortens the link and returns the final, untrackable link to the user.

Currently this service is used in TwitterScraper to unshorten the fetched URLs. It has methods to get the redirect link and also to get the final URL from a chain of shortened links.

Implementation Example from TwitterScraper.java [LINK]:

Matcher m = timeline_link_pattern.matcher(text);

if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(m.group(2));
    text = m.replaceFirst(" " + expanded);
    continue;
}
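The unshortener can also be called directly; a one-line sketch (the short link here is just a placeholder):

// Sketch: resolve a shortened link to its final destination
String finalUrl = RedirectUnshortener.unShorten("https://t.co/example");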

 

Further, it can be exposed as a service and used directly. New features like fetching the featured image from links can be added to it, though these ideas are still under discussion and enthusiastic contributions are most welcome.

3) StatusService

This service outputs all the data related to a Loklak Server deployment’s configuration. To access this information, the api endpoint status.json is used.

It outputs the following data:

a) The number of messages it scrapes in an interval of a second, a minute, an hour, a day, etc.

b) The configuration of the server, like RAM, assigned memory, used memory, number of CPU cores, CPU load, etc.

c) Other configurations related to the application, like the size of the ElasticSearch shards and their specifications, client request headers, number of running threads, etc.

Endpoint: /api/status.json

Implementation Example:

curl api.loklak.org/api/status.json

Resources:

 

Some Other Services in Loklak Server