Programming Guide

Loading Data

1. Your First Program

SampleClean is a scalable data cleaning library built on the AMPLab Berkeley Data Analytics Stack (BDAS). The library uses Apache Spark SQL 1.2.0 and Apache Hive to support distributed data cleaning operations and query processing on dirty data. In this programming guide, we will introduce the SampleClean software, its API, and how to build programs using the library. To get started, first import Spark and SampleClean.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import sampleclean.api.SampleCleanContext
	  

To begin using SampleClean, you first need to create an object called a SampleCleanContext. This context manages all of the information for your session and provides the API primitives for interacting with the data. SampleCleanContext is constructed with a SparkContext object.

val scc = new SampleCleanContext(sparkContext)
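
Putting these pieces together, a minimal setup sketch looks like the following (the application name and master URL are placeholders; adjust them for your own cluster):

// Configure and create a SparkContext for the application.
val conf = new SparkConf().setAppName("SampleCleanExample").setMaster("local[4]")
val sparkContext = new SparkContext(conf)

// The SampleCleanContext manages all session state for data cleaning.
val scc = new SampleCleanContext(sparkContext)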
	  

2. Transferring Data Into Hive

SampleClean uses Apache Hive as its persistent data storage layer and uses Hive tables to manage schema and lineage information. SampleClean assumes that your dirty data is already stored in an Apache Hive table. If it is not, SampleClean provides an interface through the SampleCleanContext to query Hive directly using HiveQL.

val hiveContext = scc.getHiveContext()
hiveContext.hql(myQuery)
          
Refer to the Hive documentation for writing these queries. An example Hive loading query is:

val myQuery1 = "CREATE TABLE IF NOT EXISTS 
		restaurant(id String, 
			   entity String,
			   name String,
			   city String,
			   category String) 
		ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
		LINES TERMINATED BY '\\n'"

val myQuery2 = "LOAD DATA LOCAL INPATH 'restaurant.csv' 
		OVERWRITE INTO TABLE restaurant"
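
These queries can then be executed through the HiveContext shown above; this sketch assumes restaurant.csv is available on the machine running the driver:

// Create the table and load the CSV file into Hive.
val hiveContext = scc.getHiveContext()
hiveContext.hql(myQuery1)
hiveContext.hql(myQuery2)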
          

3. Initializing Tables With SampleClean

Once your data is in a Hive table, the first step is to register it with SampleClean. The SampleCleanContext provides an initialize function that takes as input the name of a base table in Hive, the name of your working set (the data you will be cleaning), and a sampling ratio.

initialize(baseTableName: String, workingSetName: String, [samplingRatio: double = 1.0])


scc.initialize(baseTableName, workingSetName, samplingRatio)
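
For example, to register the restaurant table from the previous section and clean a 10% sample of it (the working-set name here is only illustrative):

scc.initialize("restaurant", "restaurant_sample", 0.1)
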
          
For some applications, it may be better to sample with a consistent hash, e.g., for debugging or for joining sampled tables. For these applications, SampleClean provides the initializeConsistent function, which takes as input the name of a base table in Hive, the name of your working set (the data you will be cleaning), a column to hash, and a sampling ratio that determines the hashing modulus.

initializeConsistent(baseTableName: String, workingSetName: String, columnName: String, [samplingRatio: double = 1.0])


scc.initializeConsistent(baseTableName, workingSetName, columnName, samplingRatio)
          
For details please refer to this article. A simple example illustrates the benefits of hashing. Consider two tables with a foreign key relationship:

Restaurant(restaurant_id, restaurant_name, city_id)
City(city_id, city_name, city_population)

If we apply initializeConsistent to Restaurant on the attribute "city_id", our sample contains all of the restaurants from a random sample of cities, as opposed to a random sample of individual restaurants. If we then apply the same initializeConsistent function to City, we can join the two random samples, since every city sampled from City is also a city in the sampled Restaurant table.
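
A sketch of this pattern, assuming both tables are already registered in Hive (the working-set names and sampling ratio are illustrative):

// Hash both tables on city_id with the same sampling ratio so the samples align.
scc.initializeConsistent("Restaurant", "restaurant_sample", "city_id", 0.1)
scc.initializeConsistent("City", "city_sample", "city_id", 0.1)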

SampleClean Tables

SampleClean creates a logical table for your defined workingSetName. You apply our data cleaning algorithms to these logical tables, and the base data is never modified. Of course, these logical tables require additional storage. SampleClean provides a function to clear temporary tables after you finish:

closeHiveSession()

Querying Tables

You must use SampleClean's APIs to query the data in these tables. You can fetch the data as an RDD and also get the sampling ratio:

getCleanSample(workingSetName):SchemaRDD
getSamplingRatio(workingSetName):Double
Because the data is a SchemaRDD, any of the functionality of Apache Spark SQL can be used. Please refer to the Spark SQL documentation for more details.
These working sets are logical tables with the same columns as your initial table plus two extra metadata columns: hash and dup. The hash column is a unique identifier for each row, and the dup column counts the number of records in the entire dataset that refer to the same entity as the record; before any cleaning this value is 1.
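
For example, you could pull the working set into Spark and inspect it with ordinary Spark operations (the working-set name is illustrative):

// Fetch the sampled working set and its sampling ratio.
val sample = scc.getCleanSample("restaurant_sample")
val ratio = scc.getSamplingRatio("restaurant_sample")

// The SchemaRDD behaves like any other RDD.
println("Sampled rows: " + sample.count() + ", sampling ratio: " + ratio)
sample.take(5).foreach(println)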

SampleClean has special support for aggregate queries; please refer to our research paper for more details. Since you are potentially querying a sample, the results will be approximate. Therefore, you need to use the SampleCleanAQP API to interpret the probabilistic results. This gives you an estimate of the query result on the full data, as well as a confidence interval quantifying the distance to the true result (the result you would get if you queried the full data). In our research, we have shown how to make these query results consistent before and after cleaning.

val aqp = new SampleCleanAQP()
          

This class interprets SQL queries and rewrites/re-scales them so that you can estimate confidence intervals and get unbiased results. We provide a wrapper interface for SQL queries to avoid having to manipulate SQL query strings programmatically:

SampleCleanQuery(scc:SampleCleanContext, aqp:SampleCleanAQP, workingSetName:String, attribute:String, aggregate:String, predicate:String, groupBy:String, algorithm:Boolean)


val query = new SampleCleanQuery(scc,
				 aqp,
				 workingSetName,
				 attribute, //attribute to aggregate 
				 aggregate, //aggregate function
				 predicate, //SQL predicate
				 groupBy, //group by attributes
				 algorithm) //see below
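
For example, a sketch of a query that counts restaurants per city over the sampled working set; the working-set, column, and aggregate values here follow the earlier restaurant example and are only illustrative:

val query = new SampleCleanQuery(scc,
				 aqp,
				 "restaurant_sample", // working set
				 "id",                // attribute to aggregate
				 "count",             // aggregate function
				 "",                  // no predicate
				 "city",              // group by city
				 true)                // use RawSC (see below)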
          
SampleClean has two result estimation algorithms, RawSC and NormalizedSC; please refer to our research paper for more details. The default algorithm option is true, which selects RawSC.
To execute the query, simply run:

query.execute()
          
The result is a tuple (estimate, standardError). To calculate a confidence interval from this tuple, refer to a standard normal probability table. E.g., ±1.96*standardError gives a 95% confidence interval, and ±2.58*standardError gives a 99% confidence interval.
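
As a sketch, assuming execute() returns the (estimate, standardError) tuple described above:

val (estimate, standardError) = query.execute()

// 95% confidence interval around the estimate.
val lower = estimate - 1.96 * standardError
val upper = estimate + 1.96 * standardError
println("Estimate: " + estimate + ", 95% CI: [" + lower + ", " + upper + "]")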

Interpreting Results on Samples

SampleClean result estimation gives an estimate of the result as if the entire dataset was cleaned. That is, your confidence interval measures the distance to the fully cleaned table.

Entity Resolution

SampleClean provides an easy-to-use interface for basic entity resolution tasks. We provide the EntityResolution class, which wraps common deduplication programming patterns into a single class. Here is the basic EntityResolution workflow: (1) I have a column of categorical attributes that are inconsistent, (2) I first link together similar attributes, (3) I choose a single canonical representation for the linked attributes, and (4) I apply the changes to the database.

Short String Comparison

Suppose we have a column of short strings, such as country names, that are inconsistently represented (e.g., United States, Uinted States). We can apply our framework to deduplicate these representations. The function EntityResolution.shortAttributeCanonicalize takes as input the current context, the name of the working set you want to clean, the column to fix, and a threshold in [0,1] (0 merges everything, 1 merges only exact matches). The default similarity metric used in this operation is EditDistance; see below for customizing this process.

val algorithm = EntityResolution.shortAttributeCanonicalize(scc,workingSetName,columnName,threshold)
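
For example, to canonicalize the city column of the sampled restaurant working set with a fairly strict threshold (the names and threshold are illustrative):

val algorithm = EntityResolution.shortAttributeCanonicalize(scc, "restaurant_sample", "city", 0.9)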

Long String Comparison

Suppose we have a column of long strings, such as addresses, that are close but not exact. Our entity resolution class supports similarity metrics that are better suited to such strings. The basic strategy is to tokenize the strings and compare the sets of words rather than the whole strings. By default, this class tokenizes on whitespace and uses the WeightedJaccard similarity metric; see below for customization details.

val algorithm = EntityResolution.longAttributeCanonicalize(scc,workingSetName,columnName,threshold)
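
For example, applied to a column of longer free-text values with a looser threshold (the column name and threshold are illustrative):

val algorithm = EntityResolution.longAttributeCanonicalize(scc, "restaurant_sample", "entity", 0.7)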

Record Deduplication

A more advanced deduplication task is when entire records (rather than individual columns) are inconsistent. That is, there are multiple records in the dataset that refer to the same real-world entity. We provide an analogous API for this problem in the RecordDeduplication class. By default, RecordDeduplication uses the "Long Attribute" similarity metrics and tokenization described above.

RecordDeduplication.deduplication(scc, workingSetName, columnProjection, threshold)
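
For example, assuming columnProjection is the list of columns on which records are compared (the names and threshold here are illustrative):

// Compare records on a projection of their columns.
val columnProjection = List("name", "city", "category")
val algorithm = RecordDeduplication.deduplication(scc, "restaurant_sample", columnProjection, 0.9)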

Crowdsourced Deduplication

Sometimes it may be hard to tune automated entity resolution to the desired quality. In such cases, the user can use crowdsourcing (by paying human workers on platforms such as Amazon’s Mechanical Turk) to improve the quality of deduplication. Since crowdsourcing scales poorly to very large datasets, we ask the crowd to deduplicate only a sample of the data, then train predictive models to generalize the crowd’s work to the entire dataset. In particular, we use techniques from Active Learning to sample points that will lead to a good model as quickly as possible.

Configuring the Crowd

To clean data using crowd workers, we use the open source AMPCrowd service in order to support multiple crowd platforms and provide automated quality control. Thus, in order to use the crowd-based operators described below, the user must first have a running installation of AMPCrowd (see the Installation instructions). In addition, crowd operators must be configured to point to the AMPCrowd server by passing CrowdConfiguration objects (see the JAVADOCS for all parameters and their defaults).

val crowdConfig = CrowdConfiguration(crowdName="amt", 
     crowdServerHost="127.0.0.1", 
     crowdServerPort="8000")

Using the Crowd

We currently provide one main crowd operator: the ActiveLearningMatcher. This is an add-on step to an existing EntityResolution algorithm that trains a crowd-supervised model to predict duplicates.

createCrowdMatcher(scc:SampleCleanContext, attribute:String, workingSetName:String)

val crowdMatcher = EntityResolution.createCrowdMatcher(scc,attribute,workingSetName)
Make sure to configure the matcher:

crowdMatcher.alstrategy.setCrowdParameters(crowdConfig)
To add this matcher to your algorithm, use the following function: addMatcher(matcher:Matcher)

algorithm.components.addMatcher(crowdMatcher)

Tuning the Crowd

The crowd matcher calls out to AMPCrowd with reasonable defaults for individual tasks, but these can also be customized using the CrowdTaskConfiguration object. See the JAVADOCS for the full list of parameters. The custom task configuration can be assigned to a crowd matcher with the setTaskParameters(crowdTaskParams) method.

val taskParams = CrowdTaskConfiguration(votesPerPoint=3, maxPointsPerTask=10)
crowdMatcher.alstrategy.setTaskParameters(taskParams)

Customizing Operators

SampleClean provides a composable class hierarchy that allows you to customize both automatic and crowdsourced operators. In our initial release, we provide the API to customize EntityResolution operations.

Similarity And Tokenization

The first thing you may want to customize is the similarity metric and tokenization strategy used to deduplicate. SampleClean makes this process easy, and users can choose among EditDistance, WeightedJaccard, WeightedCosine, WeightedDice, and WeightedOverlap.

algorithm.components.changeSimilarity(newSimilarity)
Likewise, if you want to change tokenization strategies between Whitespace and WhitespaceAndPunc:

algorithm.components.changeTokenization(newTokenization)
To write custom similarity metrics, please refer to our JAVADOCS on AnnotatedSimilarityFeaturizer. To write custom tokenizers, please refer to our JAVADOCS on Tokenizer.

Canonicalization Strategy

You may also want to set a canonicalization strategy. When you group together values to deduplicate, you need a strategy to pick the "canonical" representation. We provide two strategies: mostFrequent and mostConcise. mostFrequent sets all duplicate representations to the most frequently occurring representation, and mostConcise sets all representations to the shortest representation. By default, we use the mostFrequent strategy.

algorithm.setStrategy(strategy)

Completely Custom Pipelines

algorithm.components is an object we call a BlockerMatcherSelfJoinSequence; it decouples the two stages of entity resolution: blocking and matching. These components are composable and can be modified with different internals such as join implementations and caching. Please refer to our JAVADOCS for more information.