
Friday, October 16, 2020

Design a Web Crawler Using 10k Hacked Machines

Recently, a friend of mine asked me a Facebook system design question: design a web crawler.  The requirements are: 

  1. One billion URLs from 1 seed URL,
  2. No duplicated crawling,
  3. Distribute the load evenly,
  4. 10k hacked machines,
  5. Minimize the communication between machines.

Before we start to answer this question, let's see how a standard crawler is supposed to work.  Usually, crawler systems are centralized, which means some components are shared by all crawler machines.  For example, a simplified crawler system looks like the one below:


The URL Frontier is usually implemented with a message queue, and a centralized database is used for URL de-duplication.  The above system is a typical centralized crawler.

Let's revisit the requirements: #1 and #2 are pretty clear and easy to understand.  What do requirements #4 and #5 really mean?  Sometimes requirement #4 is expressed as:

4a. All machines are deployed with the same software.

Requirement #4 actually implies that you are not supposed to design a centralized crawler system.  With hacked machines, you are not able to set up a centralized database or a message queue.  With this in mind, we need to design a decentralized crawler system.  Requirement 4a makes it even clearer: every machine runs the same software, which means every machine does exactly the same job.  You won't be able to have a dedicated database or message queue.

What does requirement #5 mean?  It implies which crawler assignment strategy to use.  A centralized crawler system can use a central component (the URL Frontier in the graph above) to assign URLs based on some predefined strategy.  In contrast, a decentralized crawler system has no central point of control: each crawler knows the assignment strategy and forwards each discovered URL to the crawler responsible for it.

One of the criteria for evaluating assignment strategies is communication overhead [1].  It is defined as follows:

Communication overhead is the total number of URLs that the crawlers have exchanged divided by the total number of pages downloaded by the system.  The smaller the communication overhead, the better the assignment strategy.

With a centralized crawler, a URL is transferred a few times before its page is downloaded: the crawler first sends the URL to the database to check whether it has been crawled; if not, the crawler sends it to the message queue; then one crawler consumes it from the queue and starts downloading.  So to download one page, the same URL is transferred at least 3 times within the system.

Another criterion is balance: each crawler should be assigned approximately the same number of URLs, which is essentially requirement #3.

With the above information, we know we will design a decentralized crawler system.  The issue then becomes finding an appropriate assignment strategy.  The easiest assignment strategy is to hash each URL and take the result modulo the number of machines, assuming all 10k machines are always up.  Below is the design:

The key point of the above design is that each machine does exactly the same job.

Here is the flow:

  1. The URL Frontier receives URLs and checks whether they are already in the cache.  If not, it adds them to the queue and to the cache.
  2. The crawler consumes the messages, downloads the HTML, extracts URLs from the documents, hashes each URL, and assigns it to the corresponding machine using mod.

With the above design, URLs are evenly distributed across the 10k machines, and each page download requires only one URL transfer between machines, so the communication between machines is minimized.  The in-memory HashSet (the cache) guarantees no duplicate crawling.  So all requirements are met; the assignment step is sketched below.
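A minimal sketch of that assignment step, assuming a fixed fleet of 10,000 machines and CRC32 as the hash function (class and method names are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class UrlAssigner {
    private static final int NUM_MACHINES = 10_000;

    // Returns the index of the machine responsible for crawling this URL.
    public static int machineFor(String url) {
        CRC32 crc = new CRC32();
        crc.update(url.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % NUM_MACHINES);
    }
}

Every machine runs the same function, so any machine can compute the owner of any URL without talking to a coordinator.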

The above design transfers URLs immediately after they are discovered.  A crawler could instead wait for a while and send URLs in a batch, which further reduces the communication overhead.  For example, de-duplication can be applied before assignment so that already-seen URLs are never sent again.

One assumption we made is that all 10k machines are always up, so we can assign URLs with a simple mod.  What if this assumption doesn't hold?  The above design still works, but we need a different hashing algorithm: consistent hashing.  If you don't know what it is, please see the Wikipedia article on consistent hashing.
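A minimal consistent-hashing sketch, assuming MD5 as the hash and a fixed number of virtual nodes per machine (all names are illustrative):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashRing {
    private static final int VIRTUAL_NODES = 100;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public void addMachine(String machineId) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(machineId + "#" + i), machineId);
        }
    }

    public void removeMachine(String machineId) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.remove(hash(machineId + "#" + i));
        }
    }

    // The owner of a URL is the first machine clockwise from the URL's position on the ring.
    // Assumes at least one machine has been added.
    public String machineFor(String url) {
        SortedMap<Long, String> tail = ring.tailMap(hash(url));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xff);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}

When a machine leaves the ring, only the URLs that mapped to its virtual nodes move to neighboring machines, instead of nearly every URL changing owners as with plain mod.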

However, with consistent hashing, the no-duplicate-crawl guarantee can no longer be maintained: when a machine goes down, the record of URLs it has crawled (its in-memory cache) is lost.  You need to discuss this trade-off with your interviewer.


References

[1]: Junghoo Cho, Hector Garcia-Molina. Parallel Crawlers. WWW2002, May 7-11, 2002, Honolulu, Hawaii, USA.

Monday, October 12, 2020

Micrometer with Dynamic Tags

With Micrometer, it is easy to create a counter with the Builder:

Counter okCounter = Counter.builder("http.status.count")
    .tag("status", "200")
    .description("The number of OK count")
    .register(meterRegistry);
Counter notFoundCounter = Counter.builder("http.status.count")
    .tag("status", "404")
    .description("The number of not found count")
    .register(meterRegistry);

However, it is not feasible to hand-create a counter for every HTTP status.  We can use a Java HashMap to cache the counters and create them on demand with computeIfAbsent:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.Map;
import org.springframework.http.HttpStatus;

public class HttpStatusCounter {
    private static final String HTTP_STATUS_COUNT = "http.status.count";
    private final MeterRegistry meterRegistry;
    private Map<Integer, Counter> httpStatusCounterMap;

    public HttpStatusCounter(MeterRegistry meterRegistry) {
        httpStatusCounterMap = new HashMap<>();
        this.meterRegistry = meterRegistry;
    }

    public void increment(HttpStatus status, int amount) {
        Counter statusCounter = httpStatusCounterMap.computeIfAbsent(
            status.value(), k -> Counter.builder(HTTP_STATUS_COUNT)
                .tag("status", String.valueOf(status.value()))
                .register(meterRegistry));
        statusCounter.increment(amount);
    }
}
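A hypothetical usage, assuming a meterRegistry is available (for example, injected by Spring Boot):

HttpStatusCounter httpStatusCounter = new HttpStatusCounter(meterRegistry);
httpStatusCounter.increment(HttpStatus.OK, 1);
httpStatusCounter.increment(HttpStatus.NOT_FOUND, 1);

Both calls report to the same http.status.count meter, distinguished only by the status tag.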

Friday, September 4, 2020

Elasticsearch with Geopoint

Install Elasticsearch on Mac

Install the tap

brew tap elastic/tap

Install Elasticsearch

brew install elastic/tap/elasticsearch-full

Start Elasticsearch

elasticsearch

Confirm the installation works

curl http://localhost:9200

Something similar to the following should be shown:

{
  "name" : "MACC02Y753HJGH5",
  "cluster_name" : "elasticsearch_zhentao.li",
  "cluster_uuid" : "mkwsHWW8SoCcPzXg_KOyuQ",
  "version" : {
    "number" : "7.9.1",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "083627f112ba94dffc1232e8b42b73492789ef91",
    "build_date" : "2020-09-01T21:22:21.964974Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

Search by Geo Distance

Define the index for Geo Type

PUT http://localhost:9200/regions
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text"
      },
      "location": {
        "type": "geo_point"
      }
    }
  }
}

With the following response:

{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "regions"
}

Download Geo data file

curl https://restcountries.eu/rest/v1/all | jq -c '.[] | {"index": {"_index": "regions", "_id": .alpha3Code}}, {name: .name, location: [.latlng[1], .latlng[0]] }' > regions

or download it from here.

Import the file

curl -H 'Content-Type: application/json'  -s -XPUT localhost:9200/_bulk --data-binary "@regions"
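To verify the import, you can check the document count (the _count API is standard Elasticsearch; the exact number depends on the data file):

curl http://localhost:9200/regions/_count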

List all regions

GET http://localhost:9200/regions/_search?size=100

or, with a request body:

GET /regions/_search
{
  "size": 100,
  "query": {
    "match_all": {}
  }
}

Filter by Geo distance

{
  "size": 50,
  "query": {
    "bool": {
      "must": {
        "match_all": {}
      },
      "filter": {
        "geo_distance": {
          "distance": "3000km",
          "location": {
            "lat": 35,
            "lon": 105
          }
        }
      }
    }
  },
  "sort": [
    {
      "_geo_distance": {
        "location": {
          "lat": 35,
          "lon": 105
        },
        "order": "asc",
        "unit": "km",
        "distance_type": "arc"
      }
    }
  ]
}

The result is like below

{
"took": 15,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 19,
"relation": "eq"
},
"max_score": null,
"hits": [
{
"_index": "regions",
"_type": "_doc",
"_id": "CHN",
"_score": null,
"_source": {
"name": "China",
"location": [
105,
35
]
},
"sort": [
0.0
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "MNG",
"_score": null,
"_source": {
"name": "Mongolia",
"location": [
105,
46
]
},
"sort": [
1223.1458751104453
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "MMR",
"_score": null,
"_source": {
"name": "Myanmar",
"location": [
98,
22
]
},
"sort": [
1597.9864780876323
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "BTN",
"_score": null,
"_source": {
"name": "Bhutan",
"location": [
90.5,
27.5
]
},
"sort": [
1608.420695556179
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "MAC",
"_score": null,
"_source": {
"name": "Macau",
"location": [
113.55,
22.16666666
]
},
"sort": [
1651.5104987054583
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "HKG",
"_score": null,
"_source": {
"name": "Hong Kong",
"location": [
114.16666666,
22.25
]
},
"sort": [
1674.4580484734988
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "LAO",
"_score": null,
"_source": {
"name": "Laos",
"location": [
105,
18
]
},
"sort": [
1890.3163582803475
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "BGD",
"_score": null,
"_source": {
"name": "Bangladesh",
"location": [
90,
24
]
},
"sort": [
1894.1562153610287
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "TWN",
"_score": null,
"_source": {
"name": "Taiwan",
"location": [
121,
23.5
]
},
"sort": [
2006.2988706499348
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "PRK",
"_score": null,
"_source": {
"name": "North Korea",
"location": [
127,
40
]
},
"sort": [
2012.9111514604588
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "KOR",
"_score": null,
"_source": {
"name": "South Korea",
"location": [
127.5,
37
]
},
"sort": [
2031.4778900496392
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "VNM",
"_score": null,
"_source": {
"name": "Vietnam",
"location": [
107.83333333,
16.16666666
]
},
"sort": [
2113.0731526216714
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "NPL",
"_score": null,
"_source": {
"name": "Nepal",
"location": [
84,
28
]
},
"sort": [
2132.4138804176705
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "THA",
"_score": null,
"_source": {
"name": "Thailand",
"location": [
100,
15
]
},
"sort": [
2279.3258680765903
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "KHM",
"_score": null,
"_source": {
"name": "Cambodia",
"location": [
105,
13
]
},
"sort": [
2446.291756434398
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "KGZ",
"_score": null,
"_source": {
"name": "Kyrgyzstan",
"location": [
75,
41
]
},
"sort": [
2697.507030320444
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "RUS",
"_score": null,
"_source": {
"name": "Russia",
"location": [
100,
60
]
},
"sort": [
2803.2803033336695
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "JPN",
"_score": null,
"_source": {
"name": "Japan",
"location": [
138,
36
]
},
"sort": [
2975.112941971521
]
},
{
"_index": "regions",
"_type": "_doc",
"_id": "PHL",
"_score": null,
"_source": {
"name": "Philippines",
"location": [
122,
13
]
},
"sort": [
2983.948851295192
]
}
]
}
}

When you search by distance, Elasticsearch won't return the distance itself, as it isn't a real field.  If you need the distance, Elasticsearch's advice is to sort by it and read the value from each hit's sort field, as in the results above.



Sunday, February 21, 2016

Spring with Spark Framework for Microservices

Recently, my team decided to use the Spark web framework to build our new REST API.  Spark seems easy and requires less coding.  We still wanted to use Spring for dependency injection, and we also wanted to use Tomcat as the standalone web container.  After some research, we figured out how to build the REST API with Spring, Spark, and Tomcat.
  1. Create class SpringSparkFilter, which extends SparkFilter, and override the method getApplication.
  2. Create class MySparkApplication, which implements SparkApplication, and implement the init method.
  3. Configure web.xml to use SpringSparkFilter (a combined sketch of the three steps is shown below).
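A minimal sketch of the three steps, assuming the Spark version in use exposes SparkFilter.getApplication(FilterConfig) and that AppConfig is a hypothetical Spring @Configuration class that defines MySparkApplication as a bean:

// SpringSparkFilter.java
import javax.servlet.FilterConfig;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import spark.servlet.SparkApplication;
import spark.servlet.SparkFilter;

public class SpringSparkFilter extends SparkFilter {
    @Override
    protected SparkApplication getApplication(FilterConfig filterConfig) {
        // Bootstrap the Spring context and let it wire the Spark application
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(AppConfig.class);
        return context.getBean(MySparkApplication.class);
    }
}

// MySparkApplication.java
import java.util.Date;
import spark.servlet.SparkApplication;
import static spark.Spark.get;

public class MySparkApplication implements SparkApplication {
    @Override
    public void init() {
        // Register the route served at /current-date
        get("/current-date", (request, response) -> new Date().toString());
    }
}

And web.xml maps the filter to all requests (the package name is illustrative):

<filter>
    <filter-name>SpringSparkFilter</filter-name>
    <filter-class>com.example.SpringSparkFilter</filter-class>
</filter>
<filter-mapping>
    <filter-name>SpringSparkFilter</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>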
Then either build a war file and deploy it to Tomcat, or use the tomcat7-maven-plugin for a test run.  I usually use the tomcat7-maven-plugin for fast development.  Here is the command:

mvn tomcat7:run

Then access http://localhost:8080/current-date.  The complete example can be found here.

Thursday, September 18, 2014

HBase, Thrift2 and Erlang on Mac

There are not too many documents showing how Erlang can connect to HBase using the Thrift protocol.  The most useful link I found was published more than 5 years ago and discussed Thrift1.  Based on Lars George's post, Thrift2 will eventually replace Thrift1.  The following shows what I did to get Erlang working with HBase over Thrift on a Mac machine.  It should be similar on a Linux machine.  Note that only Java is supported as a first-class citizen for HBase.  If you have the choice, I suggest using Java directly.
  1. Download thrift-0.9.0 and follow this tutorial to install it.
    cp -r thrift-0.9.0/lib/erl/*  /usr/local/opt/erlang/lib/erlang/lib/thrift-0.9.0 
    (homebrew installed erlang to /usr/local/opt/erlang.  Find out your own erlang directory.)
  2. Download the latest stable HBase and decompress it.  I installed it to /usr/local/hbase-0.98.6.1-hadoop2, and will refer to it as $HBASE_FOLDER.
  3. Download hbase.thrift and generate erlang bindings.
    thrift -gen erl hbase.thrift
    It generates erlang bindings in gen-erl folder.
  4. Start the HBase server:
    Before starting the HBase server, update conf/hbase-site.xml:
    <configuration>
      <property>
        <name>hbase.rootdir</name>
        <value>file:///some-folder-hbase</value>
      </property>
      <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/some-folder-zookeeper</value>
      </property>
    </configuration>

    Then start hbase server:
    $HBASE_FOLDER/bin/start-hbase.sh
  5. Start Thrift2:
    $HBASE_FOLDER/bin/hbase-daemon.sh start thrift2
  6. Create table 'ups' with column family 'ui' using HBase Shell (Thrift2 doesn't support create table yet):
    $HBASE_FOLDER/bin/hbase shell
    hbase(main):001:0> create 'ups', 'ui'
  7. Download erlang client file and save it to gen-erl folder.
  8. Go to gen-erl folder, compile erlang files and start erlang shell:
    cd gen-erl
    erlc *.erl
    erl 
    1>hbase_thrift2_client:put("localhost", 9090).
    {{tclient,tHBaseService_thrift,
              {protocol,thrift_binary_protocol,
                        {binary_protocol,{transport,thrift_buffered_transport,
                                                    {buffered_transport,{transport,thrift_socket_transport,
                                                                                   {data,#Port<0.716>,infinity}},
                                                                        []}},
                                         true,true}},
              0},
     {ok,ok}}

    2>hbase_thrift2_client:get("localhost", 9090).
    {{tclient,tHBaseService_thrift,
              {protocol,thrift_binary_protocol,
                        {binary_protocol,{transport,thrift_buffered_transport,
                                                    {buffered_transport,{transport,thrift_socket_transport,
                                                                                   {data,#Port<0.717>,infinity}},
                                                                        []}},
                                         true,true}},
              0},
     {ok,{tResult,<<"zhentao-key1">>,
                  [{tColumnValue,<<"ui">>,<<"test">>,<<"xyz1">>,1411075681134,
                                 undefined},
                   {tColumnValue,<<"ui">>,<<"test1">>,<<"abc1">>,1411075681134,
                                 undefined}]}}}
The detailed erlang project can be found here.

Friday, August 30, 2013

Spring AMQP and RabbitMQ High Availability (HA)

Recently, I finished a project that uses RabbitMQ as the message broker.  It is pretty easy to set up a RabbitMQ cluster with High Availability (HA); RabbitMQ has pretty good documentation on how to set up the cluster, and I also documented how to set up a 2-node cluster on a single Mac machine here.  In this post, I would like to show how we can leverage spring-amqp to connect to a RabbitMQ cluster.  I have a cluster running with 2 nodes: localhost:5673 and localhost:5672.

It is pretty easy to use spring-amqp.  I used maven to manage the dependencies:


jackson-jaxrs-json-provider is used to serialize java object to json, and deserialize json back to java object.
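Roughly, the relevant dependencies are spring-rabbit and the Jackson JAX-RS JSON provider (the version properties are placeholders):

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring.rabbit.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.jaxrs</groupId>
    <artifactId>jackson-jaxrs-json-provider</artifactId>
    <version>${jackson.version}</version>
</dependency>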

When creating the ConnectionFactory, the addresses should be used instead of the host and port:


The addresses are comma-separated host:port pairs that make up the cluster.
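A minimal sketch with Spring AMQP's CachingConnectionFactory (the credentials are illustrative):

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// Comma-separated host:port pairs, one per cluster node
connectionFactory.setAddresses("localhost:5672,localhost:5673");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");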

For the producer, we use rabbitTemplate to send messages:
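A rough sketch of the producer side (the exchange and routing key names are made up, and a JSON message converter is assumed to be set on the template):

import org.springframework.amqp.rabbit.core.RabbitTemplate;

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// The Employee object is converted to JSON by the configured message converter
rabbitTemplate.convertAndSend("employee.exchange", "employee.key", employee);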



For the consumer, a MessageListenerContainer is created to consume messages asynchronously:
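A sketch of the consumer wiring (the queue name is made up; a JSON message converter would also be set on the MessageListenerAdapter, not shown):

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("employee.queue");
// Each message is delegated to MessageHandler.handleMessage
container.setMessageListener(new MessageListenerAdapter(new MessageHandler()));
container.start();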



The MessageHandler code is as follows:
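A minimal sketch (the body is illustrative):

public class MessageHandler {
    // Invoked by MessageListenerAdapter; handleMessage is the default listener method name
    public void handleMessage(Employee employee) {
        System.out.println("Received employee: " + employee);
    }
}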


This class can be called anything you like, but the method must be called handleMessage and have the correct signature (here it takes an Employee to match the producer).  If you want to change the method name, you have to call:

          MessageListenerAdapter.setDefaultListenerMethod

The source code can be downloaded from github.


Tuesday, June 11, 2013

Skip cobertura coverage check when skip unit test

After I introduced embedded MySQL for unit testing to my project, the build takes longer now.  For my local development, I sometimes want to disable the unit tests:

      mvn clean package -DskipTests

Note that you can even skip compiling the test classes by using

     mvn clean package -Dmaven.test.skip

However, if you use cobertura for code coverage checks, you will get the following error:

[ERROR] Project failed check. Total branch coverage rate of 0.0% is below 85.0%
Project failed check. Total line coverage rate of 0.0% is below 85.0%


So you need to skip cobertura as well.  If you google "skip cobertura", most answers suggest using haltOnFailure.  However, cobertura already provides skip functionality:

     mvn clean package -DskipTests -Dcobertura.skip

With -Dcobertura.skip, the cobertura check is skipped:

[INFO] --- cobertura-maven-plugin:2.5.2:instrument (default) @ tag-service ---
[INFO] Skipping cobertura execution