
SynerScope empowers Apache Spark on IBM Power8 to truly deliver deep analytics

Author: Jorik Blaas

Let’s start by introducing the three key components:

  1. SynerScope is a deeply interactive any-data visual analytics platform for Big Data sense-making.
  2. Apache Spark is a lightning fast framework for in-memory analytics on Big Data.
  3. IBM Power8 is a high-bandwidth, low-latency, scalable hardware architecture for diverse workloads.

In a world where the speed and volume of data are increasing by the day, the ability to scale is an increasingly stringent demand. Scale is not only about storing a large amount of data: as data grows, it also becomes progressively harder to move. In classic architectures, running analytics was something you did in your analytic data warehouse, and moving an aggregated, filtered, or sampled dataset from your main storage into that warehouse was an acceptable solution. Now that analytics touches a growing number of data sources, each of ever-increasing size, moving the data is less of an option.

To provide fast turnaround time in deep analytics, the computation has to be moved close to the data, not the other way around. Hadoop brought this approach to general availability with MapReduce over the past half decade, but MapReduce has always remained a programming model that is difficult to grasp, as its concepts originated in High Performance Computing.

Apache Spark is the game changer currently moving at incredible speed in this space, as it offers an unprecedented open toolkit for machine learning, graph analytics, streaming and SQL.

While most of the world is running Spark on Intel hardware, Spark as a technology is platform independent, which opens the doors for alternative platforms, such as OpenPOWER. IBM is heavily committed to developing Spark, as announced last June.

After building Apache Spark on our Power8 machine, we were able to instantly run our existing Python and Scala code. We noticed that the Power8 architecture is especially favorable towards jobs with a high memory bandwidth demand. Using a dataset covering a five-year history of GitHub (100GB of gzipped JSON files), we were able to churn through the entire set in under an hour, processing over 100 million events. After processing, we can load the resulting dataset into SynerScope for a deeper inspection.
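
As a minimal sketch of this kind of job (not the exact code we ran; the input path is a placeholder), the following PySpark snippet counts GitHub events by type, relying on the fact that Spark reads gzip-compressed text files transparently:

 

#!/usr/bin/python
# Sketch only: count GitHub Archive events by type.
# The input path is a placeholder; point it at your own copy of the data.
import json
import pyspark

sc = pyspark.SparkContext()

# Spark decompresses .gz text files on the fly
events = sc.textFile("hdfs:///data/githubarchive/*.json.gz")

# each line holds one JSON-encoded GitHub event
counts = (events.map(json.loads)
                .map(lambda e: (e.get("type"), 1))
                .reduceByKey(lambda a, b: a + b))

for event_type, n in counts.collect():
    print("%s\t%d" % (event_type, n))

 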

The image below shows the top 100,000 most active projects, grouped by co-committers: projects that share committers are placed close to each other. Interestingly, this type of involvement-based grouping shows very clearly how different programmer communities are separated. The island of iPhone development (in orange) is clearly isolated from the island of Android developers.

With Spark on Power8, we were able to handle a huge dataset, reduce it to its key characteristics, and make sense of complex mixed sources.

 

Analyzing patents with Google Dataproc and SynerScope

Author: Jorik Blaas

Google Cloud Dataproc is the latest publicly accessible beta product in the Google Cloud Platform portfolio, giving users access to managed Hadoop and Apache Spark for at-scale analytics.

In real life, many datasets come in a format that you cannot easily work with directly. Patents are a typical example: they mix textual documents with semi-structured data. The US Patent Office maintains a publicly available database of all patents, and this post shows how Apache Spark, hosted on Cloud Dataproc, can be used to process this huge collection of data.

Setup

First of all, get gcloud up and running. You may need to update the binary to support the gcloud beta dataproc command by running gcloud components update.

Within Dataproc, you will have your own cluster to run Spark jobs on. To provision this cluster, you can simply run:

 

gcloud beta dataproc clusters create test-cluster

 

This will create a Cloud Dataproc cluster with 1 master node and 2 workers, each with 4 virtual CPUs. You can also use the web interface in the Developers Console (under Big Data -> Dataproc).
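
If the defaults do not fit your workload, the cluster shape can be set at creation time, for example (check gcloud beta dataproc clusters create --help for the exact flags available in your SDK version):

 

gcloud beta dataproc clusters create test-cluster --num-workers 4 --worker-machine-type n1-standard-4

 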

Basic setup

Let's first submit a simple hello world to check that everything works. Create a file hello.py with the following content:

 

#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
print sc.version

 

Then submit the job to your cluster:

 

gcloud beta dataproc jobs submit pyspark --cluster test-cluster hello.py

 

If all is well, you will see your Python file being submitted to the cluster, and after a while the output will show the Apache Spark version number.

Crunching the USPTO patent database

The uspto-pair repository contains a huge number of patent applications and is publicly available in a Google Cloud Storage bucket (gs://uspto-pair/). However, each patent is stored as a separate .zip file, and in order to transform this data into a tabular format, each file needs to be individually decompressed and read. This is a typical match for Spark's bulk data-processing capabilities.

The good news is that Dataproc integrates with Google Cloud Storage transparently, so you do not need to worry about transfers from Cloud Storage to your cluster’s HDFS filesystem.

Each file in the USPTO database is just a flat zip file, like so:

 

 1610 2013-06-11T13:19:36Z gs://uspto-pair/applications/05900088.zip

Each of these ZIP files contains a number of tab-separated files:

 

Archive: 05900088.zip
Zip file size: 1610 bytes, number of entries: 4
?rw------- 2.0 unx 518 b- defN 13-Jun-11 13:19 05900088/README.txt
?rw------- 2.0 unx 139 b- defN 13-Jun-11 13:19 05900088/05900088-address_and_attorney_agent.tsv
?rw------- 2.0 unx 598 b- defN 13-Jun-11 13:19 05900088/05900088-application_data.tsv
?rw------- 2.0 unx 227 b- defN 13-Jun-11 13:19 05900088/05900088-transaction_history.tsv
4 files, 1482 bytes uncompressed, 992 bytes compressed: 33.1%
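
For reference, you can copy a single application and inspect it locally, for example with gsutil and zipinfo:

 

gsutil ls -l gs://uspto-pair/applications/05900088.zip
gsutil cp gs://uspto-pair/applications/05900088.zip .
zipinfo 05900088.zip

 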

 

The main file of interest is the -application_data.tsv file; it contains tab-delimited key/value pairs with all the key information for the patent:

 

Application Number              05/900,088
Filing or 371 (c) Date          04-26-1978
Application Type                Utility
Examiner Name
Group Art Unit                  2504
Confirmation Number             3077
Attorney Docket Number
Class / Subclass                250/211.00J
First Named Inventor            JOSEPH BOREL (FR)
Entity Status                   Undiscounted
Customer Number
Status                          Patented Case
Status Date                     03-06-1979
Location                        FILE REPOSITORY (FRANCONIA)
Location Date                   04-14-1995
Earliest Publication No
Earliest Publication Date
Patent Number                   4,143,266
Issue Date of Patent            03-06-1979
AIA (First Inventor to File)    No
Title of Invention              METHOD AND DEVICE FOR DETECTING RADIATIONS

 

We will iterate over all of the zip files using Spark's binaryFiles primitive and, for each file, use Python's zipfile module to get to the contents of the TSV file inside. Each TSV is then converted into a Python dictionary with key/value pairs mirroring the original structure.

We have written a script that maps the zip files into structured records, essentially defining the transformation step from raw to structured data. Thanks to the brevity of PySpark code, we can list it here in full:

import pyspark
import zipfile
import cStringIO

sc = pyspark.SparkContext()

# utility function to process a binary zip file and extract
# the content of -application_data.tsv from it
def getApplicationTSVFromZip(x):
  fn, content = x
  # open the zip file from memory
  zip = zipfile.ZipFile(cStringIO.StringIO(content))
  for f in zip.filelist:
    # return the first file of interest
    if f.filename.endswith("-application_data.tsv"):
      return zip.open(f).read()
  # no application data in this zip: return an empty string so that
  # TSVToRecords simply produces an empty dictionary
  return ""

# given the -application_data.tsv contents, return a dictionary
# with key/value pairs for the data contained in it
def TSVToRecords(x):
  # tab-separated records, two columns, one record per line;
  # split on the first tab only so values containing tabs survive
  lines = [_.split("\t", 1) for _ in x.split("\n")]
  oklines = filter(lambda x: len(x) == 2, lines)
  # turn them into a key/value dictionary, skipping missing values ('-')
  d = {a: b for (a, b) in oklines if b != '-'}
  return d

# load directly from Google Cloud Storage;
# take all of the patents starting with number 0600....
q = sc.binaryFiles("gs://uspto-pair/applications/0600*")
d = q.map(getApplicationTSVFromZip).map(TSVToRecords).repartition(64)
# UUID is a placeholder for your cluster's Dataproc staging bucket
d.saveAsPickleFile("gs://dataproc-UUID/uspto-out-0600")
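
As with hello.py earlier, the script can be submitted to the cluster with the pyspark job type; assuming it was saved as extract_uspto.py:

 

gcloud beta dataproc jobs submit pyspark --cluster test-cluster extract_uspto.py

 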

 

We can then convert the intermediate file (containing all the pickled dictionaries) into a CSV file:

 

import pyspark
import csv
import cStringIO

sc = pyspark.SparkContext()

# serialize a list of values as a single CSV-formatted line
def csvline2string(one_line_of_data):
  si = cStringIO.StringIO()
  cw = csv.writer(si)
  cw.writerow(one_line_of_data)
  return si.getvalue().strip('\r\n')

# read back the pickled dictionaries from the previous step
q = sc.pickleFile("gs://dataproc-33c9ac81-ff09-4b42-be89-57e56c695739-eu/uspto-out-0600-2")
# collect the set of distinct keys to use as CSV columns
keys = q.flatMap(lambda x: x.keys()).distinct().collect()
print keys
# turn each dictionary into a list of column values, in key order
records = q.map(lambda x: [str(x.get(k) or '') for k in keys])
csvrecords = records.map(csvline2string)
csvrecords.saveAsTextFile("gs://dataproc-33c9ac81-ff09-4b42-be89-57e56c695739-eu/uspto-out-0600-csv")

 

The resulting CSV file will contain one row for each patent:

 

Confirmation Number,Patent Number,Attorney Docket Number,Location,Class / Subclass,First Named Inventor,Group Art Unit,Status Date,Status,Issue Date of Patent,Filing or 371 (c) Date,Title of Invention,Entity Status,Location Date,Application Type,AIA (First Inventor to File),Application Number,Examiner Name
7981,"4,320,273",I0105991,FILE REPOSITORY (FRANCONIA),219/999.999,MITSUYUKI KIUCHI (JP),2103,03-16-1982,Patented Case,03-16-1982,01-22-1979,APPARATUS FOR HEATING AN ELECTRICALLY CONDUCTIVE COOKING UTENSILE BY MAGNETIC INDUCTION,Undiscounted,06-10-1999,Utility,No,"06/005,574","LEUNG, PHILIP H"
7994,"4,245,042",NONE,FILE REPOSITORY (FRANCONIA),435/030,,1302,09-27-1980,Patented Case,01-13-1981,01-22-1979,DEVICE FOR HARVESTING CELL CULTURES,Undiscounted,06-10-1999,Utility,No,"06/005,587",

 

Now that all the patent metadata has been extracted from the zip files, we can load it straight into SynerScope Marcato for further exploration. The set contains a bit over 400,000 patents, covering the period from 1979 to 1988.

By selecting a few key fields, and by pointing Marcato to the on-line USPTO database, we can quickly set up a dashboard that shows the relation between patent attributes (such as status, class and location) and patent titles, and explore these all by filing or approval date.

With SynerScope Marcato we can also quickly identify temporal trends and explore the activity of certain groups of patents over time. When selecting a group of patents, keywords that identify the subset pop up directly, truly combining analytics and search.

Patents on resin and polymer compositions, filed in categories 260/029, 260/042 and some others in the same range, seem to have dropped off sharply after 1980.

Deleting your Cloud Dataproc cluster

You only need to run Cloud Dataproc clusters when you need them. When you are done with your cluster, you can delete it with the following command:

 

gcloud beta dataproc clusters delete test-cluster

 

You can also use the Google Developers Console to delete the cluster.