Sqrrl Blog

Jun 30, 2014 12:00:00 PM

Bulk Loading in Sqrrl Pt.1: The Basics

This post is by Sqrrl's Director of Data Science and Co-Founder, Chris McCubbin. It covers the basic design and functionality behind the Bulk Loading API on Sqrrl's ingest pipeline. In a future post, Chris will cover using the advanced Transformer interface of the API to construct graph objects.

Overview

Why Bulk Ingest

Accumulo stores its files in the RFile format, which is essentially a sorted list of key-value pairs with some extra metadata. MapReduce, the processing framework that ships with Hadoop, was originally designed around sorting large amounts of data, so it is a natural fit for processing data and producing these sorted RFiles. Once the files are created, Accumulo can simply start serving them up, perhaps later merging them with other RFiles in a process called major compaction. We call the process of using MapReduce to create Accumulo RFiles "bulk ingest".

MapReduce jobs can be tricky to write correctly, however. Why bother when Accumulo has a perfectly good streaming ingest framework? The answer is speed: by sorting all of the entries at once instead of one at a time, and by taking advantage of the batch nature of MapReduce, bulk ingest can gain a significant speed advantage over the streaming API. Bulk ingest is also convenient for importing large amounts of data that are already distributed throughout the cluster in HDFS, for example from legacy big data systems.

At Sqrrl, we strive to give you the speed of bulk ingest while retaining the simplicity of streaming ingest. We have created some powerful wrappers around the bulk ingest API that let you ingest standard bulk data very simply, and custom bulk data without too much trouble either. Sqrrl supports two formats out of the box: JSON and CSV. For other file types, and for some edge-case variants of JSON and CSV files, you can create your own parser to assist Sqrrl in bulk ingesting data. We call these parsers Transformers, and they will be the subject of a later post. In this post, I'm going to describe how to ingest JSON and CSV files using Sqrrl's native Transformer.
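
For context, here's roughly what a hand-rolled bulk ingest job looks like against the stock Accumulo 1.x MapReduce APIs: a job writes sorted RFiles through AccumuloFileOutputFormat, and the finished directory is handed to Accumulo with importDirectory. This sketch is illustrative only; the table name, paths, connection details, and the one-key-per-line input layout are all hypothetical, and Sqrrl's bulk load wrappers exist precisely so you don't have to write this yourself.

// A minimal sketch (not Sqrrl code) of a hand-rolled Accumulo 1.x bulk ingest job.
// The table name, paths, connection details, and "rowId,payload" input layout are
// all hypothetical; Sqrrl's bulk load wrappers generate this machinery for you.
import java.io.IOException;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BulkIngestSketch {

  // Turn each "rowId,payload" input line into one Accumulo key-value pair.
  public static class LineMapper extends Mapper<LongWritable, Text, Key, Value> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split(",", 2);
      if (parts.length < 2) {
        return; // skip malformed lines
      }
      Key key = new Key(new Text(parts[0]), new Text("cf"), new Text("payload"));
      context.write(key, new Value(parts[1].getBytes("UTF-8")));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "bulk-ingest-sketch");
    job.setJarByClass(BulkIngestSketch.class);
    job.setMapperClass(LineMapper.class);
    // The identity reducer is enough: the MapReduce shuffle sorts the keys,
    // which is exactly what the RFile writer requires.
    job.setReducerClass(Reducer.class);
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(Value.class);
    job.setOutputKeyClass(Key.class);
    job.setOutputValueClass(Value.class);

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/bulk-input"));

    // AccumuloFileOutputFormat writes the sorted RFiles that Accumulo serves directly.
    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
    AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-rfiles"));

    if (!job.waitForCompletion(true)) {
      System.exit(1);
    }

    // Hand the finished RFiles to Accumulo; the failures directory must already exist.
    // From this point the data is queryable, and major compactions may later merge
    // these files with others.
    Connector connector = new ZooKeeperInstance("accumulo", "zookeeper:2181")
        .getConnector("root", new PasswordToken("secret"));
    connector.tableOperations().importDirectory(
        "bulk_table", "/tmp/bulk-rfiles", "/tmp/bulk-failures", true);
  }
}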

JSON

JSON is a popular data storage format that was first introduced as a human-readable format for JavaScript client-server communication. It can contain hierarchical field representations and has a few predefined types that are implicit in the formatting of the data: strings, arbitrary-precision numbers, booleans, and so on. JSON is what we call a "self-describing" format, in that each field has a name, a position in the object hierarchy, and a value whose type can be inferred from its format.

While Sqrrl Enterprise is built on top of the BigTable-style Apache Accumulo database, we abstract away this data model and natively provide a "JSON in, same JSON out" semantic, while also providing indexed search over all of the JSON documents and fields stored. Sqrrl's bulk loader can ingest files that have a single JSON document per line. The file or files can be of arbitrary size. Each line is parsed and ingested into Sqrrl as a single document, either with a unique generated ID or with a configurable UUID based on the fields of the document. We've included an example of custom UUID generation below.
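
For example, a staged input file might look like the following, with one complete document per line (the second record is purely illustrative):
{"user": {"first":"John","last":"Doe"},"dates":["Apr",1,1977]}
{"user": {"first":"Jane","last":"Roe"},"dates":["May",2,1980]}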

CSV

CSV is probably more accurately called a class of file formats, as there is a wide variety of valid CSV styles. Usually, records are held one per line with columns of each record separated by a comma. Escape characters are common when one wishes to represent a quote character within the field itself, and custom separators may be used when there are many commas in the dataset. Unlike JSON, CSV is not self-describing, so in order to ingest it well, we will need to create a description of the data contained in the file. In Sqrrl, we call this file a "field description file", or FDF. The native Transformer is capable of making one document per CSV record using the types outlined in the format description.

Examples

JSON Bulk load from Sqrrl Shell

Currently, in order for JSON files to be bulk ingested, you must stage the data in a special directory within HDFS, configured by the Sqrrl property sqrrl.hadoop.dfs.loadDirs. Let's say you have a file data.json in hdfs:///tmp/staging. Once the data is staged, we can create a dataset and begin an ingest of the JSON quite simply:
sqrrl@SQRRL> createdataset myDataset
Created dataset myDataset
sqrrl@SQRRL myDataset> startload --json /tmp/staging/data.json
Started load: job_201401141132_0001
  Depending on the size of the file and the speed of your cluster, ingest times will vary. To get an idea of the progress of the job, use the checkload command:
sqrrl@SQRRL myDataset> checkload job_201401141132_0001 \
--all --time-format "yyyy-MM-dd HH:mm:ss"
Checking on load: job_201401141132_0001
job_201401141132_0001 : {
state: MR_RUNNING
start: 2014-01-14 16:35:56
progress: 41.77%
finish: N/A
successes: 387209
failures: 0
}
  If the startload or checkload commands are given the watch flag (--watch, -w), progress will be streamed to the shell until it is complete.
sqrrl@SQRRL myDataset> checkload --watch job_201401141132_0001
[ Press any key to return to prompt ]
job_201401141132_0001 : LOAD_SUCCEEDED : 100.0%
Load 'job_201401141132_0001' is complete.
job_201401141132_0001 : {
state: LOAD_SUCCEEDED
start: 2014-01-14T16:35:56.350+0000
progress: 100.0%
finish: 2014-01-14T16:35:56.350+0000
successes: 743087
failures: 0
}
If you want randomly assigned UUIDs, that's it: your data will be loaded into the system and ready for analysis. The most useful option with the JSON load is to assign your own UUIDs based on document fields, using the --do-updates, --uuid-fields, and --uuid-delimiter options. This example uses the user/last and user/first fields and the third element of the dates array as the parts of the UUID:
sqrrl@SQRRL myDataset> startload --json /tmp/staging/data.json --do-updates --uuid-fields user/last,user/first,dates[2] --uuid-delimiter _
Applying the above options to an ingest of the JSON document {"user": {"first":"John","last":"Doe"},"dates":["Apr",1,1977]} results in a UUID of Doe_John_1977.
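
To make the UUID construction concrete, here is a small illustrative sketch of that lookup-and-join logic. This is not Sqrrl's implementation; it simply uses the Jackson library to resolve the same field paths and join the values with the delimiter:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UuidExample {
  public static void main(String[] args) throws Exception {
    String json = "{\"user\": {\"first\":\"John\",\"last\":\"Doe\"},\"dates\":[\"Apr\",1,1977]}";
    JsonNode doc = new ObjectMapper().readTree(json);

    // Resolve each --uuid-fields entry (user/last, user/first, dates[2]) to its value...
    String[] parts = {
        doc.at("/user/last").asText(),   // "Doe"
        doc.at("/user/first").asText(),  // "John"
        doc.at("/dates/2").asText()      // "1977"
    };

    // ...and join the values with the --uuid-delimiter character.
    System.out.println(String.join("_", parts));  // prints Doe_John_1977
  }
}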

CSV/FDF bulk load from shell

Here's an example of a CSV line:
2014-01-08,John,Doe,367
What are these things, though? Is the first column a date? What should we call the second column? As mentioned earlier, if we want to ingest this line as CSV input, we need to tell Sqrrl a bit about the format of the columns. For the previous line, our FDF might look like this:
0:date:DATETIME:{"DATETIME_FORMAT":"yyyy-MM-dd","DATETIME_TIME_ZONE":"UTC"}
1:first:STRING
2:last:STRING
3:messages:INTEGER
We need to tell the loader the index, name, and type of each column; missing indices are ignored. The type can be any Sqrrl type, and DATETIME columns additionally require a datetime format (in Java SimpleDateFormat syntax) and a time zone. Create that file and then start the load as before:
sqrrl@SQRRL myDataset> startload --csv /data.csv --do-updates --uuid-fields 1,0,3 --uuid-delimiter _
Here again we are creating the UUID from columns 1, 0, and 3 (in that order), joined with the delimiter character "_". For our example input line, the UUID will be:
John_2014-01-08_367
 

Java API

All of the above actions can be performed through the Thrift API as well as in the shell. Our Java API also includes helper classes that assist with creating load jobs. For example, to start a JSON load with the API we do something like this (here dataset and stagedFilePath are placeholders for the target dataset and the staged input file):
LoadOptions loadOptions = new LoadOptions(dataset, SqrrlServerConstants.MIME_TYPE_JSON, Collections.singletonList(stagedFilePath));
String loadId = client.jobOperations().startLoad(loadOptions);
  We can then check the status of the job using the return values of the checkLoad call:
LoadStatus status = client.jobOperations().checkLoad(loadId);
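
For long-running loads, a client can poll checkLoad until the job finishes, much like the shell's --watch flag. Here is a rough sketch; note that getState() and getProgress() are assumed accessor names on LoadStatus, not confirmed API:

// Rough polling sketch; getState()/getProgress() are assumed accessor names.
// A production client would also check for a failed terminal state.
LoadStatus status = client.jobOperations().checkLoad(loadId);
while (!"LOAD_SUCCEEDED".equals(String.valueOf(status.getState()))) {
    System.out.println(loadId + " : " + status.getState() + " : " + status.getProgress() + "%");
    Thread.sleep(5000); // wait five seconds between checks
    status = client.jobOperations().checkLoad(loadId);
}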
 

Security

JSON

Visibilities in JSON documents are handled the same way as with Sqrrl streaming ingest: there is a special field-name convention for JSON fields that carry security labels. Any field name with a suffix of the form @[<visibility_label>] will have <visibility_label> applied as the Apache Accumulo visibility for that element and all of its contents. For example, if we have the following JSON:
{
  "children@[FAM|IRS]": {
    "current": [
      { "name": "Johnny" }
    ],
    "expecting@[FAM]": [
      { "name": "Baby Girl" }
    ]
  }
}
We will expect the following fields to appear in Sqrrl:

# | Path | Type | Visibility | Effective Visibility | Data
0 | <empty> | OBJECT | | | <unset>
1 | children@[(FAM|IRS)] | OBJECT | (FAM|IRS) | (FAM|IRS) | <unset>
2 | children@[(FAM|IRS)]/current | ARRAY | | (FAM|IRS) | <unset>
3 | children@[(FAM|IRS)]/current[0] | OBJECT | | (FAM|IRS) | <unset>
4 | children@[(FAM|IRS)]/current[0]/name | STRING | | (FAM|IRS) | "Johnny".getBytes("UTF-8")
5 | children@[(FAM|IRS)]/expecting@[(FAM)] | ARRAY | (FAM) | (FAM|IRS)&(FAM) | <unset>
6 | children@[(FAM|IRS)]/expecting@[(FAM)][0] | OBJECT | | (FAM|IRS)&(FAM) | <unset>
7 | children@[(FAM|IRS)]/expecting@[(FAM)][0]/name | STRING | | (FAM|IRS)&(FAM) | "Baby Girl".getBytes("UTF-8")
 

CSV

Using the default CSV ingest job, visibilities can only be applied to an entire column of input. To do this, add a visibility label to the field name in the FDF, using the same @[<visibility_label>] syntax as in the JSON case above. For example, if we used the following FDF:
0:date:DATETIME:{"DATETIME_FORMAT":"yyyy-MM-dd","DATETIME_TIME_ZONE":"UTC"}
1:first@[FAM]:STRING
2:last:STRING
3:messages:INTEGER
Then each field produced from the first-name column of the CSV (index 1, named first@[FAM] above) will have the FAM label applied to it.

Staging Directories

Some applications (including Sqrrl) may put things into HDFS that should not be read by arbitrary code, for instance sensitive encryption keys. These sensitive pieces of data would ordinarily be protected by the ACLs that HDFS enforces. However, since bulk load jobs can be configured to execute arbitrary Java code (such as Transformers), some system administrators may wish to narrow the set of directories that the bulk load subsystem may access. They can do this via the sqrrl.hadoop.dfs.loadDirs configuration setting mentioned earlier.
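
For example, a deployment might restrict the bulk loader to a single dedicated staging directory. The exact configuration mechanism depends on your installation; in property form it would look something like this:
sqrrl.hadoop.dfs.loadDirs=/tmp/staging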

Error Handling

When a load job fails, errors appear in the Hadoop MapReduce logs, with stack traces similar to the following:

2014-01-16 15:28:12,988 ERROR com.sqrrl.analytics.server.load.LoadMapper:
failure in record at bad_1000.json:1701429
com.sqrrl.analytics.data.tokenizer.TokenizationException:
could not parse JSON @ line 1 : column 32

In this case, the error occurred while parsing the file bad_1000.json at byte offset 1701429. The file can be inspected at that offset with a command like the following:

> OFFSET=1701429
> hadoop fs -cat /data/bad_1000.json | tail -c +$(echo $OFFSET+1 | bc) | head -n 1
{"thisField":"hasARudeNeighbor",rud3ne!gh8Or}
 

The bulk loader assumes that the operator using it has full administrator-level access to Sqrrl Enterprise. Because it does not use Sqrrl Enterprise's authorization framework, it can write any and all security labels that the Accumulo administrator can (that is, it operates at system high). The bulk loader should therefore be used with some care in security-sensitive applications.

Summing Up

Hopefully these examples begin to show the power of our bulk ingest API and demonstrate how you can apply some simple customizations to our basic JSON and CSV features. In a follow-up post, I'll describe how you can use our Transformer API to create custom bulk ingest jobs and directly emit graph structures into Sqrrl that can be navigated and analyzed. If you want to try these examples hands-on, sign up for our Test Drive VM or reach out to us for a demo at info@sqrrl.com. Thanks for reading!

Topics: Accumulo, Big Data, Blog Post, Sqrrl Enterprise