Apache Hivemall, a collection of machine-learning-related Hive user-defined functions (UDFs), offers Spark integration as documented here. Now, we will see how it works in PySpark.
Note that Hivemall requires Spark 2.1+. This article particularly uses Spark 2.3 and Hivemall 0.5.2, and the entire contents are available at this Google Colabo notebook.
Installation
We do need to set up Spark and Hadoop environment first of all. For example, if you are using Colabo, follow instructions as:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://mirror.reverse.net/pub/apache/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!tar xf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"
import findspark
findspark.init()
Next, download hivemall-spark2.x-0.y.z-incubating-with-dependencies.jar
corresponding to your Spark version from the ASF repository:
wget -q http://mirror.reverse.net/pub/apache/incubator/hivemall/0.5.2-incubating/hivemall-spark2.3-0.5.2-incubating-with-dependencies.jar
Create Spark session
Connect to the Spark instance and start a new session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').config('spark.jars', 'hivemall-spark2.3-0.5.2-incubating-with-dependencies.jar').enableHiveSupport().getOrCreate()
The Hivemall .jar
file is explicitly loaded from jars
option, and Hive connection and their UDF support are enabled by enableHiveSupport()
.
Register Hive(mall) UDF to Spark
If a Spark session is instantiated with enableHiveSupport()
as the above example, we can use Hive UDFs in Spark. This GitHub repository gives more explanation and examples.
Basically, the only thing we have to do is to load a Hive function from CREATE TEMPORARY FUNCTION
statement with its appropriate class path:
spark.sql("CREATE TEMPORARY FUNCTION hivemall_version AS 'hivemall.HivemallVersionUDF'")
Eventually, Spark SQL allows us to use the UDF just like HiveQL:
spark.sql("SELECT hivemall_version()").show()
+------------------+
|hivemall_version()|
+------------------+
| 0.5.2-incubating|
+------------------+
Example: Binary classification
To give a practical example, let's solve a customer churn prediction problem with simple binary classifier.
Register UDFs
Below is a minimal list of Hivemall functions to tackle the problem:
# preprocessing
spark.sql("CREATE TEMPORARY FUNCTION categorical_features AS 'hivemall.ftvec.trans.CategoricalFeaturesUDF'")
spark.sql("CREATE TEMPORARY FUNCTION quantitative_features AS 'hivemall.ftvec.trans.QuantitativeFeaturesUDF'")
spark.sql("CREATE TEMPORARY FUNCTION array_concat AS 'hivemall.tools.array.ArrayConcatUDF'")
# training
spark.sql("CREATE TEMPORARY FUNCTION train_classifier AS 'hivemall.classifier.GeneralClassifierUDTF'")
# prediction and evaluation
spark.sql("CREATE TEMPORARY FUNCTION sigmoid AS 'hivemall.tools.math.SigmoidGenericUDF'")
spark.sql("CREATE TEMPORARY FUNCTION extract_feature AS 'hivemall.ftvec.ExtractFeatureUDFWrapper'")
spark.sql("CREATE TEMPORARY FUNCTION extract_weight AS 'hivemall.ftvec.ExtractWeightUDFWrapper'")
spark.sql("CREATE TEMPORARY FUNCTION logloss AS 'hivemall.evaluation.LogarithmicLossUDAF'")
spark.sql("CREATE TEMPORARY FUNCTION auc AS 'hivemall.evaluation.AUCUDAF'")
Data preparation
Dataset is a small CSV file having 3,333 records:
wget -q http://dataminingconsultant.com/DKD2e_data_sets.zip
unzip -j DKD2e_data_sets.zip "**/churn.txt"
Create a Spark DataFrame from the CSV file:
import re
import pandas as pd
df = spark.createDataFrame(
pd.read_csv('churn.txt').rename(lambda c: re.sub(r'[^a-zA-Z0-9 ]', '', str(c)).lower().replace(' ', '_'), axis='columns'))
df.printSchema()
root
|-- state: string (nullable = true)
|-- account_length: long (nullable = true)
|-- area_code: long (nullable = true)
|-- phone: string (nullable = true)
|-- intl_plan: string (nullable = true)
|-- vmail_plan: string (nullable = true)
|-- vmail_message: long (nullable = true)
|-- day_mins: double (nullable = true)
|-- day_calls: long (nullable = true)
|-- day_charge: double (nullable = true)
|-- eve_mins: double (nullable = true)
|-- eve_calls: long (nullable = true)
|-- eve_charge: double (nullable = true)
|-- night_mins: double (nullable = true)
|-- night_calls: long (nullable = true)
|-- night_charge: double (nullable = true)
|-- intl_mins: double (nullable = true)
|-- intl_calls: long (nullable = true)
|-- intl_charge: double (nullable = true)
|-- custserv_calls: long (nullable = true)
|-- churn: string (nullable = true)
Notice that the column names are normalized just in case.
SparkSession.read
can be an alternative option, while the example uses pandas.read_csv()
:
df = spark.read.option('header', True).schema(schema).csv('churn.txt')
Here, schema
needs to be explicitly specified as follows, otherwise all columns are simply recognized as string:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
Finally, split the records into 80% training and 20% validation samples:
df_train, df_test = df.randomSplit([0.8, 0.2], seed=31)
Training
It's time to learn how to do machine learning with Hivemall; every single step of machine learning workflow can be implemented in the form of SQL-like query as shown in here. In case of using Hivemall with PySpark, createOrReplaceTempView('table_name')
enables those queries to access to Spark DataFrames:
df_train.createOrReplaceTempView('train')
Replace the train
table with vectorized training samples:
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW train AS
SELECT
array_concat(
categorical_features(
array('intl_plan', 'vmail_plan'),
intl_plan, vmail_plan
),
quantitative_features(
array('custserv_calls', 'account_length'),
custserv_calls, account_length
)
) as features,
if(churn = 'True.', 1, 0) as label
FROM
train
""")
For the sake of simplicity, I randomly choose four attributes, intl_plan
and vmail_plan
for categorical features, and custserve_calls
and account_length
for quantitative features. Of course we can incorporate more attributes and/or apply more aggressive feature engineering techniques such as standardization in reality.
Building a logistic regression model is done by just 10 lines of query:
df_model = spark.sql("""
SELECT
feature, avg(weight) as weight
FROM (
SELECT
train_classifier(features, label) as (feature, weight)
FROM
train
) t
GROUP BY 1
""")
df_model.show()
+--------------+-------------------+
| feature| weight|
+--------------+-------------------+
|custserv_calls| 9.037338733673096|
| intl_plan#no| -7.765866041183472|
| vmail_plan#no| 3.730261445045471|
|account_length|0.20337164402008057|
|vmail_plan#yes| -5.873621702194214|
| intl_plan#yes| 11.210701942443848|
+--------------+-------------------+
Yes, Hivemall represents machine learning models in the form of table.
Prediction
Convert the 20% test set in the same way as the training samples, and explode
them for prediction:
df_test.createOrReplaceTempView('test')
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW test AS
SELECT
phone,
label,
extract_feature(fv) AS feature,
extract_weight(fv) AS value
FROM (
SELECT
phone,
array_concat(
categorical_features(
array('intl_plan', 'vmail_plan'),
intl_plan, vmail_plan
),
quantitative_features(
array('custserv_calls', 'account_length'),
custserv_calls, account_length
)
) as features,
if(churn = 'True.', 1, 0) as label
FROM
test
) t1
LATERAL VIEW explode(features) t2 AS fv
""")
Join the logistic regression model with test samples and their feature set, and take sigmoid of weighted sum:
df_model.createOrReplaceTempView('model')
df_prediction = spark.sql("""
SELECT
phone,
label as expected,
sigmoid(sum(weight * value)) as prob
FROM
test t LEFT OUTER JOIN model m
ON t.feature = m.feature
GROUP BY 1, 2
""")
df_prediction.show()
+--------+--------+----------+
| phone|expected| prob|
+--------+--------+----------+
|414-9054| 0| 1.0|
|372-1493| 0| 1.0|
|339-7541| 1| 1.0|
|400-3150| 0| 1.0|
|365-3562| 0| 1.0|
|356-2992| 0| 0.9999807|
...
Since we did nothing special to achieve a better prediction model, prediction results are obviously poor.
Evaluation
Anyway, once prediction results are obtained, we can evaluate the accuracy of prediction. For instance, Hivemall supports Area Under the ROC Curve (AUC) and Log Loss metric for binary classification:
df_prediction.createOrReplaceTempView('prediction')
spark.sql("""
SELECT
sum(IF(IF(prob >= 0.5, 1, 0) = expected, 1.0, 0.0)) / count(1) AS accuracy,
auc(prob, expected) AS auc,
logloss(prob, expected) AS logloss
FROM (
SELECT prob, expected
FROM prediction
ORDER BY prob DESC
) t
""").show()
+--------------------+------------------+------------------+
| accuracy| auc| logloss|
+--------------------+------------------+------------------+
|0.157037037037037...|0.6012885662431942|24.322495101659264|
+--------------------+------------------+------------------+
Again, the result is just an example, and we do need to tweak the model to make accurate prediction.
Conclusion
As the example above shows, we have successfully used Hivemall in combination with PySpark. That is, we can directly access to the Hivemall capabilities from Python code for each of preprocessing, training, prediction, and evaluation phase.
In practice, I can easily imagine jointly using the other Python packages e.g., scikit-learn for training, Airflow for workflow management, Flask for providing REST APIs. This fact definitely expands the potential uses of Hivemall.
Share
Categories
See also
- 2019-10-26
- ApacheCon 2019 North America #ACNA19 & Europe #ACEU19
- 2018-10-26
- Apache Hivemall at #ODSCEurope, #RecSys2018, and #MbedConnect
- 2015-10-13
- PyCon JP 2015 #pyconjp
Last updated: 2022-08-06
Author: Takuya Kitazawa
Takuya Kitazawa is a freelance software developer based in British Columbia, Canada. As a technologist specializing in AI and data-driven solutions, he has worked globally at Big Tech and start-up companies for a decade. At the intersection of tech and society, he is passionate about promoting the ethical use of information technologies through his mentoring, business consultation, and public engagement activities. See CV for more information, or contact at [email protected].
Now Gift a cup of coffeeDisclaimer
- Opinions are my own and do not represent the views of organizations I am/was belonging to.
- I am doing my best to ensure the accuracy and fair use of the information. However, there might be some errors, outdated information, or biased subjective statements because the main purpose of this blog is to jot down my personal thoughts as soon as possible before conducting an extensive investigation. Visitors understand the limitations and rely on any information at their own risk.
- That said, if there is any issue with the content, please contact me so I can take the necessary action.