1. Your First Program
SampleClean is a scalable data cleaning library built on the AMPLab Berkeley Data Analytics Stack (BDAS). The library uses Apache SparkSQL 1.2.0 and Apache Hive to support distributed data cleaning operations and query processing on dirty data. In this programming guide, we will introduce the SampleClean software, its API, and how to build programs using the library.
To get started, first import Spark and SampleClean.
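A minimal sketch of the imports, assuming the standard Spark and SampleClean package layout (the exact import paths may differ in your build):

```scala
// Spark core and the SampleClean API (package names assumed from the
// standard SampleClean source layout; adjust to match your build).
import org.apache.spark.{SparkConf, SparkContext}
import sampleclean.api.SampleCleanContext
```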
To begin using SampleClean, you first need to create an object called a SampleCleanContext. This context manages all of the information for your session and provides the API primitives for interacting with the data. The SampleCleanContext is constructed with a SparkContext.
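A sketch of session setup, using a local Spark master purely for illustration:

```scala
// Construct a SparkContext, then wrap it in a SampleCleanContext.
val conf = new SparkConf().setAppName("SampleClean Example").setMaster("local[4]")
val sc = new SparkContext(conf)
val scc = new SampleCleanContext(sc)
```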
2. Transferring Data Into Hive
SampleClean uses Apache Hive as its persistent data storage layer, and uses Hive tables to manage schema and lineage information.
SampleClean assumes that your dirty data is already stored in an Apache Hive Table.
If not, SampleClean provides an interface through the SampleCleanContext to directly query Hive using HiveQL.
val hiveContext = scc.getHiveContext()
Refer to the Hive documentation for details on writing these queries.
An example Hive loading query is (the restaurant table's column list is elided here):
val myQuery1 = """CREATE TABLE IF NOT EXISTS restaurant(...)
                  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
                  LINES TERMINATED BY '\n'"""
val myQuery2 = """LOAD DATA LOCAL INPATH 'restaurant.csv'
                  OVERWRITE INTO TABLE restaurant"""
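These statements can then be run through the Hive context. A sketch, assuming the SparkSQL 1.2.0 HiveContext's sql method (older releases used the deprecated hql method):

```scala
// Run the DDL and load statements against Hive.
val hiveContext = scc.getHiveContext()
hiveContext.sql(myQuery1)
hiveContext.sql(myQuery2)
```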
3. Initializing Tables With SampleClean
Once your data is in a Hive table, the first step is to register it with SampleClean. The SampleCleanContext provides an initialize function that takes as input the name of a base table in Hive, the name of what you will call your working set (the data you will be cleaning), and a sampling ratio.
initialize(baseTableName: String, workingSetName: String, [samplingRatio: double = 1.0])
scc.initialize(baseTableName, workingSetName, samplingRatio)
For some applications, it may be better to sample with a consistent hash, e.g., for debugging or to be able to join sampled tables. For these applications, SampleClean provides the initializeConsistent function, which takes as input the name of a base table in Hive, the name of what you will call your working set (the data you will be cleaning), a column to hash, and a sampling ratio that determines the hashing modulus.
initializeConsistent(baseTableName: String, workingSetName: String, columnName: String, [samplingRatio: double = 1.0])
scc.initializeConsistent(baseTableName, workingSetName, columnName, samplingRatio)
For details, please refer to this article. A simple example illustrates the benefits of hashing. Consider two tables with a foreign key relationship:
Restaurant(restaurant_id, restaurant_name, city_id)
City(city_id, city_name, city_population)
If we apply initializeConsistent on the attribute "city_id", then our sample contains all restaurants from a random sample of cities, as opposed to a random sample of restaurants. If we then apply the same initializeConsistent function to City, we can join the two random samples, since every city sampled from City is also a city in Restaurant.
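The two-table setup described above might look like the following sketch (the working-set names and sampling ratio are illustrative):

```scala
// Consistently hash both tables on city_id so the samples are joinable.
scc.initializeConsistent("Restaurant", "restaurant_sample", "city_id", 0.1)
scc.initializeConsistent("City", "city_sample", "city_id", 0.1)
```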
SampleClean creates a logical table for your defined workingSetName. You apply our data cleaning algorithms to these logical tables, and the base data is never modified. Of course, these logical tables require additional storage, so SampleClean provides functions to clear temporary tables after you finish. You must use SampleClean's APIs to query the data in this table.
You can fetch the data as an RDD and also get the sampling ratio:
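A sketch, assuming accessor names getCleanSample and getSamplingRatio on the context (check the JAVADOCS for the exact method names in your version):

```scala
// Fetch the working set as a SchemaRDD and look up its sampling ratio.
val sampleRDD = scc.getCleanSample(workingSetName)
val ratio = scc.getSamplingRatio(workingSetName)
```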
As the data is a SchemaRDD, any of the functionality of Apache SparkSQL can be used. Please refer to the SparkSQL documentation for more details.
These working sets are logical tables with the same columns as your initial table, plus two extra metadata columns: hash and dup. The hash column is a unique identifier for each row, and the dup column counts the number of records in the entire dataset that refer to the same entity as the record; before any cleaning, this is 1.
SampleClean has special support for aggregate queries; please refer to our research paper for more details.
Since you are potentially querying a sample, the results will be approximate. Therefore, you need to use the SampleCleanAQP
API to interpret the probabilistic results.
This gives you an estimate of the query result on the full data, as well as a confidence interval quantifying the distance to the true result (as if you queried the full data).
In our research, we have shown how to make these query results consistent before and after cleaning.
This class interprets SQL queries and rewrites/re-scales them in a way that you can estimate confidence intervals and get unbiased results.
We provide a wrapper interface for SQL Queries to avoid having to manipulate SQL query strings programmatically:
SampleCleanQuery(scc:SampleCleanContext, aqp:SampleCleanAQP, workingSetName:String, attribute:String, aggregate:String, predicate:String, groupBy:String, algorithm:Boolean)
val query = new SampleCleanQuery(scc,
                                 aqp, //SampleCleanAQP instance
                                 workingSetName, //working set to query
                                 attribute, //attribute to aggregate
                                 aggregate, //aggregate function
                                 predicate, //SQL predicate
                                 groupBy, //group by attributes
                                 algorithm) //see below
SampleClean has two result estimation algorithms, RawSC and NormalizedSC; please refer to our research paper for more details. The default algorithm option is true, which uses RawSC.
To execute the query simply run:
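A sketch of the execution call (the method name execute is assumed; see the JAVADOCS):

```scala
// Execute the rewritten, re-scaled query against the sample.
val (estimate, standardError) = query.execute()
```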
The result is a tuple (estimate, standardError)
To calculate a confidence interval using this tuple, refer to a standard normal probability table.
E.g., 1.96*standardError gives a 95% confidence interval, 2.57*standardError gives a 99% confidence interval.
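As a worked example of turning the tuple into an interval (the numbers here are hypothetical):

```scala
// Hypothetical query result: estimate = 120.5, standardError = 2.3.
val (estimate, standardError) = (120.5, 2.3)
// 95% confidence interval around the estimate.
val ci95 = (estimate - 1.96 * standardError, estimate + 1.96 * standardError)
```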
Interpreting Results on Samples
SampleClean result estimation gives an estimate of the result as if the entire dataset was cleaned.
That is, your confidence interval measures the distance to the fully cleaned table.
4. Entity Resolution
SampleClean provides an easy-to-use interface to basic EntityResolution tasks. We provide the EntityResolution class, which wraps common deduplication programming patterns into a single class.
Here is the basic EntityResolution workflow: (1) I have a column of categorical attributes that are inconsistent, (2) I first link together similar attributes, (3) I choose a single canonical representation of the linked attributes, and (4) I apply the changes to the database.
Short String Comparison
Suppose we have a column of short strings, such as country names, that are inconsistently represented (e.g., United States, Uinted States).
We can apply our framework to deduplicate these representations.
The function EntityResolution.shortAttributeCanonicalize takes as input the current context, the name of the working set you want to clean, the column to fix, and a threshold in [0,1] (0 merges all, 1 merges only exact matches).
The default similarity metric used in this operation is EditDistance, see below for customizing this process.
val algorithm = EntityResolution.shortAttributeCanonicalize(scc,workingSetName,columnName,threshold)
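The constructed algorithm is then run against the working set; a sketch, assuming an exec method on the algorithm object (see the JAVADOCS for the exact name):

```scala
// Run the configured canonicalization over the working set;
// matched values are merged in the logical table, not the base data.
algorithm.exec()
```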
Long String Comparison
Suppose we have a column of long strings, such as addresses, that are close but not exact. Our entity resolution class supports similarity metrics that are better suited to such strings.
The basic strategy is to tokenize these strings and compare the set of words rather than the whole string.
This class by default tokenizes on whitespace, see below for details for customization.
By default it uses the WeightedJaccard similarity metric.
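By analogy with the short-string case, the long-string entry point might look like the following sketch (the function name longAttributeCanonicalize is assumed; check the JAVADOCS):

```scala
// Tokenizes on whitespace and compares token sets with
// WeightedJaccard by default.
val algorithm = EntityResolution.longAttributeCanonicalize(scc, workingSetName, columnName, threshold)
```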
A more advanced deduplication task arises when entire records (rather than individual columns) are inconsistent; that is, there are multiple records in the dataset that refer to the same real-world entity. We provide an analogous API for this problem in the class RecordDeduplication.
By default, RecordDeduplication uses the "Long Attribute" similarity metrics and tokenization described above.
RecordDeduplication.deduplication(scc, workingSetName, columnProjection, threshold)
Sometimes it may be hard to tune automated entity resolution to the desired quality. In such cases, the user can use crowdsourcing (by paying human workers on platforms such as Amazon’s Mechanical Turk) to improve the quality of deduplication. Since crowdsourcing scales poorly to very large datasets, we ask the crowd to deduplicate only a sample of the data, then train predictive models to generalize the crowd’s work to the entire dataset. In particular, we use techniques from Active Learning to sample points that will lead to a good model as quickly as possible.
Configuring the Crowd
To clean data using crowd workers, we use the open source AMPCrowd service in order to support multiple crowd platforms and provide automated quality control. Thus, in order to use the crowd-based operators described below, the user must first have a running installation of AMPCrowd (see the Installation instructions). In addition, crowd operators must be configured to point to the AMPCrowd server by passing CrowdConfiguration objects (see the JAVADOCS for all parameters and their defaults).
val crowdConfig = CrowdConfiguration(crowdName="amt")
Using the Crowd
We currently provide one main crowd operator: the ActiveLearningMatcher. This is an add-on step to an existing EntityResolution algorithm that trains a crowd-supervised model to predict duplicates.
createCrowdMatcher(scc:SampleCleanContext, attribute:String, workingSetName:String)
val crowdMatcher = EntityResolution.createCrowdMatcher(scc,attribute,workingSetName)
Make sure to configure the matcher:
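A sketch, assuming a setCrowdParameters method on the matcher (see the JAVADOCS for the exact name):

```scala
// Point the matcher at the running AMPCrowd server.
crowdMatcher.setCrowdParameters(crowdConfig)
```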
To add this matcher to your algorithm, use the following function:
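A sketch, assuming the algorithm exposes its blocker/matcher pipeline as components with an addMatcher method (names assumed; see the JAVADOCS):

```scala
// Append the crowd matcher after the automated matcher in the pipeline.
algorithm.components.addMatcher(crowdMatcher)
```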
Tuning the Crowd
The crowd matcher calls out to AMPCrowd with reasonable defaults for individual tasks, but these can also be customized, using the CrowdTaskConfiguration object. See the JAVADOCS for the full list of parameters. The custom task configuration can be assigned to a crowd matcher with the setTaskParameters(crowdTaskParams) method.
val taskParams = CrowdTaskConfiguration(votesPerPoint=3, maxPointsPerTask=10)
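The custom parameters are then attached to the matcher via the method named above:

```scala
// Apply the custom task settings to the crowd matcher.
crowdMatcher.setTaskParameters(taskParams)
```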
SampleClean provides a composable class hierarchy that allows you to customize both automatic and crowdsourced operators. In our initial release, we provide the API to customize EntityResolution.
Similarity And Tokenization
The first thing you may want to customize is the similarity metrics and tokenization strategies used to deduplicate.
SampleClean makes this process easy: users can choose between similarity metrics such as EditDistance and WeightedOverlap. Likewise, you can change the tokenization strategy, e.g., Whitespace tokenization.
To write custom similarity metrics, please refer to our JAVADOCS on AnnotatedSimilarityFeaturizer
To write custom tokenizers, please refer to our JAVADOCS on Tokenizer
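As an illustration, assuming the algorithm's component sequence exposes changeSimilarity and changeTokenization setters (method names assumed; see the JAVADOCS):

```scala
// Swap the similarity metric and tokenizer on an existing algorithm
// before running it.
algorithm.components.changeSimilarity("EditDistance")
algorithm.components.changeTokenization("WhiteSpace")
```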
You may also want to set a canonicalization strategy.
When you group together values to deduplicate, you need a strategy to pick the "canonical" representation. We provide two strategies: mostFrequent and mostConcise. mostFrequent sets all duplicate representations to the most frequently occurring representation, and mostConcise sets all representations to the shortest representation.
By default, we use the mostFrequent strategy.
Completely Custom Pipelines
algorithm.components is an object we call a BlockerMatcherSelfJoinSequence, which decouples two stages of entity resolution: blocking and matching. These components are composable and can be modified with different internals, such as join implementations and caching.
Please refer to our JAVADOCS for more information.