Feature Engineering with Apache Spark and Optimus

When we talk about Feature Engineering we mean creating new features from your existing ones to improve model performance. Sometimes that is the goal in itself, and sometimes you have to do it because a certain model can't work with the data in its current form, so these transformations are what let you run most Machine and Deep Learning algorithms.

Now, with Optimus, we have made the process of Feature Engineering for Big Data easy.

To install Optimus you just need to do:

$ pip install optimuspyspark

These methods are part of the DataFrameTransformer, and they provide a high-level abstraction over Spark's Feature Engineering methods. You'll see how easy it is to prepare your data for Machine Learning with Optimus.

How to do Feature Engineering with Optimus?

String to index:

This method maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values.

The input_cols argument receives a list of the columns to be indexed.

Let’s start by creating a DataFrame with Optimus.

# Importing Optimus
import optimus as op
#Importing utilities
tools = op.Utilities()

# Creating DF with Optimus
data = [('Japan', 'Tokyo', 37800000), ('USA', 'New York', 19795791),
        ('France', 'Paris', 12341418), ('Spain', 'Madrid', 6489162)]
df = tools.create_data_frame(data, ["country", "city", "population"])

# Instantiating transformer
transformer = op.DataFrameTransformer(df)

# Show DF
transformer.show()
+-------+--------+----------+
|country|    city|population|
+-------+--------+----------+
|  Japan|   Tokyo|  37800000|
|    USA|New York|  19795791|
| France|   Paris|  12341418|
|  Spain|  Madrid|   6489162|
+-------+--------+----------+

To index the strings in the country and city columns we just need to do:

# Indexing columns 'city' and 'country'
transformer.string_to_index(["city", "country"])

# Show indexed DF
transformer.show()
+-------+--------+----------+----------+-------------+
|country|    city|population|city_index|country_index|
+-------+--------+----------+----------+-------------+
|  Japan|   Tokyo|  37800000|       1.0|          1.0|
|    USA|New York|  19795791|       2.0|          3.0|
| France|   Paris|  12341418|       3.0|          2.0|
|  Spain|  Madrid|   6489162|       0.0|          0.0|
+-------+--------+----------+----------+-------------+
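Under the hood this is Spark's StringIndexer. For reference, here is a minimal sketch of the plain PySpark equivalent, fitting one StringIndexer per column on the same df created above (Optimus may implement it differently internally):

# Plain PySpark equivalent (sketch): one StringIndexer per column
from pyspark.ml.feature import StringIndexer

for col in ["city", "country"]:
    indexer = StringIndexer(inputCol=col, outputCol=col + "_index")
    df = indexer.fit(df).transform(df)

df.show()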

Index to String:

This method maps a column of indices back to a new column of corresponding string values. The index-string mapping is either from the ML (Spark) attributes of the input column, or from user-supplied labels (which take precedence over ML attributes).

Let’s go back to strings with the DataFrame we created in the last step.

# Importing Optimus
import optimus as op
#Importing utilities
tools = op.Utilities()

# Instantiating transformer
transformer = op.DataFrameTransformer(df)
# Indexing columns 'city' and 'country'
transformer.string_to_index(["city", "country"])
# Going back to strings from index
transformer.index_to_string(["country_index"])

# Show DF with column "country_index" back to string
transformer.get_data_frame.select("country","country_index", "country_index_string").show()
+-------+-------------+--------------------+
|country|country_index|country_index_string|
+-------+-------------+--------------------+
|  Japan|          1.0|               Japan|
|    USA|          3.0|                 USA|
| France|          2.0|              France|
|  Spain|          0.0|               Spain|
+-------+-------------+--------------------+
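This corresponds to Spark's IndexToString transformer, which reads the original labels from the ML metadata of the indexed column. A minimal sketch of the direct PySpark call, assuming indexed_df is the DataFrame produced by the indexing step above (a hypothetical name, not from the original example):

# Plain PySpark equivalent (sketch): map indices back to their labels
from pyspark.ml.feature import IndexToString

converter = IndexToString(inputCol="country_index", outputCol="country_index_string")
converter.transform(indexed_df).select("country", "country_index", "country_index_string").show()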

One Hot Encoder:

One hot encoding is a process by which categorical variables are converted into a form that ML algorithms can consume so they do a better job at prediction.

This method maps a column of label indices to a column of binary vectors, with at most a single one-value.

Let’s create a sample dataframe to see what OHE does:

# Importing Optimus
import optimus as op
#Importing utilities
tools = op.Utilities()

# Creating DataFrame
data = [
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
]
df = tools.create_data_frame(data,["id", "category"])

# Instantiating the transformer
transformer = op.DataFrameTransformer(df)

# One Hot Encoding
transformer.one_hot_encoder(["id"])

# Show encoded dataframe
transformer.show()
+---+--------+-------------+
| id|category|   id_encoded|
+---+--------+-------------+
|  0|       a|(5,[0],[1.0])|
|  1|       b|(5,[1],[1.0])|
|  2|       c|(5,[2],[1.0])|
|  3|       a|(5,[3],[1.0])|
|  4|       a|(5,[4],[1.0])|
|  5|       c|    (5,[],[])|
+---+--------+-------------+
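The Spark feature behind this is the OneHotEncoder. A minimal sketch of the plain PySpark version on the same df; note that in Spark 3.x OneHotEncoder is an estimator (you fit it before transforming), while older 2.x releases exposed it as a plain transformer:

# Plain PySpark equivalent (sketch): encode non-negative index values into sparse binary vectors
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["id"], outputCols=["id_encoded"])
encoder.fit(df).transform(df).show()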

SQL Transformations:

This method implements the transformations defined by a SQL statement. Spark only supports SQL syntax like “SELECT … FROM __THIS__ …”, where “__THIS__” represents the underlying table of the input dataframe. Thanks, Spark, for this amazing function.

Let’s create a sample DataFrame to test this function.

# Importing Optimus
import optimus as op
#Importing utilities
tools = op.Utilities()

# Creating DataFrame
data = [
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
]

df = tools.create_data_frame(data,["id", "v1", "v2"])

# Instantiating the transformer
transformer = op.DataFrameTransformer(df)

This dataframe looks like this:

+---+---+---+
| id| v1| v2|
+---+---+---+
|  0|1.0|3.0|
|  2|2.0|5.0|
+---+---+---+

Now let’s create two new columns from these. The first will be the sum of columns v1 and v2, and the second will be the product of the two. With the sql() function we just need to pass the SQL expression, ending it with FROM __THIS__, which represents the underlying table of the input dataframe.

transformer.sql("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

And this will output:

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+
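This wraps Spark's SQLTransformer, so the same result can be obtained directly in PySpark. A minimal sketch, where __THIS__ stands for the DataFrame passed to transform():

# Plain PySpark equivalent (sketch): run a SQL statement against the input DataFrame
from pyspark.ml.feature import SQLTransformer

sql_trans = SQLTransformer(statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sql_trans.transform(df).show()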

Vector Assembler:

This method combines a given list of columns into a single vector column.

This is very important because lots of Machine Learning algorithms in Spark need this format to work.

Let’s create a sample dataframe to see what vector assembler does:

# Importing Optimus
import optimus as op
#Importing utilities
tools = op.Utilities()
# Import Vectors
from pyspark.ml.linalg import Vectors

# Creating DataFrame
data = [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)]

df = tools.create_data_frame(data, ["id", "hour", "mobile", "user_features", "clicked"])

# Instantiating the transformer
transformer = op.DataFrameTransformer(df)

# Assemble features
transformer.vector_assembler(["hour", "mobile", "user_features"])

# Show assembled df
print("Assembled columns 'hour', 'mobile', 'user_features' to vector column 'features'")
transformer.get_data_frame.select("features", "clicked").show(truncate=False)

This will output:

+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+
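This is Spark's VectorAssembler under the hood. For reference, a minimal sketch of the plain PySpark call, assuming the same df with the user_features column created above:

# Plain PySpark equivalent (sketch): concatenate numeric and vector columns into one vector column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["hour", "mobile", "user_features"], outputCol="features")
assembler.transform(df).select("features", "clicked").show(truncate=False)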

Now you can compare for yourself how easy it is to do Feature Engineering with Optimus versus other frameworks.

Original Source.

Favio André Vázquez

Physicist and computer engineer, working on Big Data and Computational Cosmology. I have a passion for science, philosophy, programming, and Data Science. Right now I'm working on data science, machine learning and big data at BBVA Data & Analytics in Mexico City. I love new challenges, working with a good team and interesting problems to solve. I'm part of the Apache Spark collaboration, helping in MLlib, Core and the documentation, and I love applying my knowledge and expertise in science, data analysis, visualization and data processing.

Working with Data Science in Apache Spark: since 2015 I've been part of the collaboration, with some minor bug fixes and improvements to the documentation. I also suggested (with others) the jump to Hadoop 2 as the default manager for Spark, and in the end I changed lots of the documentation for Spark and its core. I keep solving problems for end users regarding my changes and updates, and always stay up to date with new Spark features.

Working with Data Science and Physics: I'm part of the CosmoSIS collaboration, a framework for cosmological parameter inference, mixing data science techniques with computational physics. I'm working with Modified Gravity in the BZ parametrization, to find new ways of explaining Dark Energy, and I'm an evangelist for this collaboration in Mexico, and for Data Science and Cosmology. I'm also trying to use Apache Spark to analyze results of cosmological simulations and galaxy catalogs, and I use C++ and Julia to develop scientific programs with parallel programming, mixing Spark with them.
