I have posted how to create Solr index
using SolrJ and
DataImportHandler. Importing of more than 6,070,000 address data less than 5 minutes on my local machine using a DataImportHanlder is not too bad. Nevertheless, it will be worth to try a distributed system and I will use Hadoop's MapReduce on this post.
I assume that
Solr server is installed and running. Also, you are able to run Hadoop on your machine as shown on my previous posts:
Install Hadoop,
Configuration and Running Hadoop,
Running MapReduce.
Since Hadoop can feed each line of data in 17 different files to a map method, it seemed easier to send HTTP requests to Solr for data indexing, but I encountered several things to consider.
Things to Consider:
1. How to include third part libraries such as SolrJ libraries to the Hadoop?
I sure can use a Java URL object (without SolrJ libraries) and execute a HTTP request to the Solr after constructing a HTTP request parameter string, but using an existing library is usually beneficial.
2. How to generate an unique 'addressId' value in multiple map tasks, which may run on multiple JVM?
On the schema.xml, the addressId field is an unique field. In the database, I used a database sequence and made sure this value is unique in the database. When I use a client with the SolrJ, the client ran on one JVM and I made sure this value is unique by incrementing a value by 1.
3. New design to break up the process is necessary and consider efficiency of the process between the map and the reduce. During the address indexing, we just process each line (without any elimination) and post the data to the Solr. So, need to think about how many Map tasks and Reduce tasks need to be used.
Approaches:
1. Inside the jar file with the address MapReduce, we may include all necessary jar files in a lib directory. In this case, a size of thee jar file will be larger and each different jar file may contains same depending jar files. Instead, we can put necessary jar file to HDFS and share it if necessary.
Hadoop has a command option '-libjars' followed by a list of jar files separated with a comma:
-libjars jar1, jar2,jar3....
2. Since several map and reduce tasks can run on multiple JVM, we cannot simply use a static count variable. One way to have a unique id, we can use Solr's UUID techniques as shown on the
Solr Wiki page for my example on this post.
Another way, which I will use, is using hadoop's jab and task id. This is a table shown on a book "Hadoop-The Definitive Guide" written by Tom White.
Property Name |
Type |
Description |
Example |
mapreduce.job.id |
String |
The job id |
job_200811201130_0004 |
mapreduce.task.id |
String |
The task id |
task_200811201130_0004_m_000003 |
mapreduce.task.attempt.id |
String |
The task attempt id |
attempt_200811201130_0004_m_000003_0 |
mapreduce.task.partition |
int |
The index of the task within the job |
3 |
mapreduce.task.ismap |
boolean |
Whether this task is a map task |
true |
When you need an each id shown on the table above in your java codes, Java API will be more useful. Context parameter of the map/reduce method provides a way to retrieve ids.
For example,
context.getTaskAttemptID().getJobID().getId() , context.getTaskAttemptID().getTaskID().getId()
3. In our address indexing case, we don't need to drop or combine any line of data. Since each line is processed by a map function, I think each line can populate a data object, which will be sent to Solr, in a map method. Then, use a send and commit method for every certain number of data as we did on the
simple SolrJ example.
Hadoop performs a sorting for the output of the map before the reduce process. When we process each line within a map method and not produce any output for the reducer, we can eliminate data sorting and copying process for the reducer. In fact, I don't need a reducer at all for this, but need a combiner to take care of remaining data and to call a commit method for the data run on a same JVM.
Implementation:
1. Map
public class AddressMap2 extends Mapper<LongWritable, Text, Text, Text>{
static final SolrServer server = new HttpSolrServer(
"http://localhost:8983/solr/addressDB");
static final List<SolrInputDocument> docList =
new ArrayList<SolrInputDocument>();
static boolean isWritten = false;
static int count = 1;
static final MAX_DOC_PER_JVM = 800000;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] lineTerms = value.toString().split("\\|");
try{
Integer.parseInt(lineTerms[0].trim());
}catch(NumberFormatException ne){
//All data starts with a number. When we get here, this data is one
//of the first two comment lines.
return;
}
int countNumber = (context.getTaskAttemptID().getTaskID().getId() *
MAX_DOC_PER_JVM) + count;
AddressInFileParser parser = new AddressInFileParser();
SolrInputDocument doc = parser.parse(value.toString(), countNum);
docList.add(doc);
if(!docList.isEmpty() && (count%1000 == 0)){
try {
server.add(docList);
docList.clear();
} catch (SolrServerException e) {
throw new InterruptedException(e.getMessage());
}
}
count++;
if(!isWritten){
context.write(new Text("dummy"), new Text(Integer.toString(count)));
isWritten = true;
}
}
}
|
- MAX_DOC_PER_JVM is calculated based on a total number of address data (about 6,070,000) and number of mapper we can approximate, which is about 10.
* Why 10?
Size of the input address file shown on the step 6 below is 1226231976 bytes, and Hadoop splits the input file to the size of file block, which is 128 MB (134217728 bytes) by default. Therefore, 1226231976 / 134217728 = 9.14 --> 10
To confirm it, see the Result section below.
- We don't need to generate any output from the mapper, but the combiner won't be called without any output from the mapper. So, I am writing one dummy output per a JVM.
2. AddressInFileParser used in the Map
public class AddressInFileParser {
/**
* @param line. This line is expected to have the following format.
* area_code|state|state_en|city|city_en|sub_city|sub_city_en|
* street_code|street_name|street_name_en|is_basement|building_num|
* building_num_sub|building_mgm_num|bulk_delivery_place_name|
* building_name|legal_dong_code|legal_dong_name|ri_name|admin_dong_name|
* is_mountain|ground_num|dong_seq|ground_num_sub|postal_code|postal_code_seq
*
* @param addressId
* @return
*/
public SolrInputDocument parse(String line, int addressId){
String[] lineTerms = line.split("\\|");
int parseIndex = 0;
SolrInputDocument doc = new SolrInputDocument();
doc.addField("addressId", addressId);
doc.addField("areaCode", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("state", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("state_en", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("city", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("city_en", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("subCity", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("subCity_en", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
parseIndex++; //Skip street_code
doc.addField("streetName", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("streetName_en", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
parseIndex++; //Skip is_basement
String bldNumber = StringUtil.nonNullTrim(lineTerms[parseIndex++]);
if (!StringUtil.isEmpty(lineTerms[parseIndex])) {
bldNumber = bldNumber + "-" + lineTerms[parseIndex].trim();
}
parseIndex++;
doc.addField("buildingNumber", bldNumber);
parseIndex++; //Skip building_mgm_num
doc.addField("bulkDeliveryPlaceName",
StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("buildingName", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
parseIndex++; //Skip legal_dong_code
doc.addField("dongName", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("riName", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
doc.addField("adminDongName", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
parseIndex++; // skip is_mountain
String grdNumber = StringUtil.nonNullTrim(lineTerms[parseIndex++]);
doc.addField("dongSeq", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
if (!StringUtil.isEmpty(lineTerms[parseIndex])) {
grdNumber = grdNumber + "-" + lineTerms[parseIndex];
}
parseIndex++;
doc.addField("groundNumber", grdNumber);
doc.addField("postalCode", StringUtil.nonNullTrim(lineTerms[parseIndex++]));
return doc;
}
}
|
3. Combiner
public class AddressReduce2 extends
Reducer<Text, Text, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException{
try {
if(!AddressMap2.docList.isEmpty()){
AddressMap2.server.add(AddressMap2.docList);
}
AddressMap2.server.commit();
} catch (SolrServerException e) {
e.printStackTrace();
}
}
}
|
4. Client
public class AddressMRClient
extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = Job.getInstance(getConf(), "Solr address");
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(AddressMap2.class);
job.setCombinerClass(AddressReduce2.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
return job.waitForCompletion(true)? 0:1;
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new AddressMRClient(), args);
System.exit(exitCode);
}
}
|
5. pom.xml for the project
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jihwan.learn.solr</groupId>
<artifactId>address-db-client</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>4.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-dataimporthandler</artifactId>
<version>4.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-dataimporthandler-extras</artifactId>
<version>4.10.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.6</version>
</dependency>
<!-- Hadoop main client artifact -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Unit test artifacts -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>
<!-- Hadoop test artifact for running mini clusters -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<outputDirectory>${basedir}</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
|
6. Combining input files.
There are
17 address data files based on region and the size of each files is from 17.4 MB to 175.8 MB. In Hadoop, it is better to have a small number of large files. This is from a book "Hadoop - The Definitive Guide"
"Hadoop works better with a small number of large files than a large
number of small files. One reason for this is that FileInputFormat
generates splits in such a way that each split is all or part of a
single file. If the file is very small (“small” means significantly
smaller than an HDFS block) and there are a lot of them, each map
task will process very little input, and there will be a lot of them
(one per file), each of which imposes extra bookkeeping overhead.
Compare a 1 GB file broken into eight 128 MB blocks with 10,000 or so
100 KB files. The 10,000 files use one map each, and the job time can
be tens or hundreds of times slower than the equivalent one with a
single input file and eight map tasks."
|
To combine the files, I ran the following command on my MacBook.
$ sh -c 'cat *.txt > addressmerge.txt'
7. Running the application.
- Again, need to start hadoop and may need to create directory since I am using a /tmp directory on this post.
$ cd $HADOOP_PREFIX
$ bin/hdfs namenode -format
$ sbin/start-dfs.sh
$ sbin/start-yarn.sh
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>
Now, create a directory for the address data file.
$ bin/hdfs dfs -mkdir /user/<username>/input
$ bin/hdfs dfs -mkdir /user/<username>/input/addressMerge
- Copy the data file on the local to Hadoop file system.
$ cd DIR_OF_addressmerge.txt_YOU_CREATED_ON_STEP6
$ hadoop fs -copyFromLocal ./addressmerge.txt input/addressMerge
- Compile the MapReduce codes.
$ cd DIR_OF_YOUR_PROJECT
$ mvn compile
- Start the Solr server. You may need to delete 'data' directory under $SOLR_INSTALLED_DIR/example/solr/addressDB before the start.
- Run the application. All library dependency is specified using a "-libjars" option
** line shown here is separated, but it should be all one line command **
$ hadoop jar address-db-client-1.0.jar com.jihwan.learn.solr.AddressMRClient
-conf conf/hadoop-localhost.xml -libjars
/Users/jihwan/.m2/repository/org/apache/solr/solr-solrj/4.10.2/solr-solrj-4.10.2.jar,
/Users/jihwan/.m2/repository/commons-io/commons-io/2.3/commons-io-2.3.jar,
/Users/jihwan/.m2/repository/org/apache/httpcomponents/httpclient/4.3.1/httpclient-4.3.1.jar,
/Users/jihwan/.m2/repository/org/apache/httpcomponents/httpcore/4.3/httpcore-4.3.jar,
/Users/jihwan/.m2/repository/org/apache/httpcomponents/httpmime/4.3.1/httpmime-4.3.1.jar,
/Users/jihwan/.m2/repository/org/noggit/noggit/0.5/noggit-0.5.jar,
/Users/jihwan/.m2/repository/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar
input/addressMerge addressOutMerge
8. Result.
This Hadoop MapReduce application created all 6,071,307 address data on the Solr and it took about a little more that 5 minutes. This is slower than the data import from the database shown on a
previous post, but we should consider time to insert data to the postgresql DB from the data files, which was about 10 minutes on my laptop.
Let's study some of output during the application running.
- The application started at 20:49:35.
- Total input paths to process : 1 --> we have only one input file 'addressmerge.txt'
- number of splits:10 --> Hadoop split one input file to 10 smaller files that fit to the Hadoop max block size, which is 128MB by default. Hadoop will also run 10 mappers.
- running in uber mode: false --> This is not a small job that can be run sequentially on one node.
- The job was completed at 20:54:41.
- Launched map tasks=12 --> Earlier I said Hadoop will run 10 mappers, but it indicates 12 map tasks were running.
- Killed map tasks=2 --> Although 12 map tasks were running, 2 of them were killed. Hadoop often runs more than one same task parallely and kills duplicate tasks after another same task was completed first.
- Launched reduce tasks=1 --> Even if we didn't specify a reducer, Hadoop still use a default reducer org.apache.hadoop.mapreduce.Reducer, which simply writes all its input to its output. Since we didn't write anything from the combiner, the reducer didn't do anything.
- Map input records=6071341 --> We know we had total 6071307 address data. Before we merge 17 address files, each file had two comment lines, which we dropped in the map method. So, 6071341 - (17*2) = 6071307.
- Map output records=10 , Combine input records=10 --> In the map method, we only created one output data per a JVM and we had 10 mappers on different JVM. So, total output records from the map is 10, which is same as the combine input records.
Warning:
On this post, I just described how to run MapReduce to create Solr Indexing, but we should also pay attention to what is happening on Solr side. Solr makes a new searcher visible to clients on each commit call and it triggers expensive processes such as autowarming and recreating caches for the new searcher. Therefore, developers need to be careful before calling the commit method from multiple servers with/without some interval.