Time Series Forecasting and Simulations: Python Signature Transformation Method

About the author on Time Series Forecasting: Pavel Zapolskii, Data Science and ML specialist, is now a Senior Trading Researcher at Exness. With over 6 years of experience in data science, trading, and business growth, he is a seasoned mathematician aiming to specialize in the FinTech and GameDev industries. For the last few years, he has been advising top companies in their industries regarding ML and Data Science.

Friends, I am very excited to share with you a truly amazing Stochastic Process Math invention—the Signature of Time Series! I’m dedicating this article on time series forecasting to this extremely complex issue with a lot of details to cover.

First, I will try to provide a clear definition of Time Series Signature and explain its application in classification and time series forecasting. Having said that, I will try to shield you as much as possible from the complex math underlying this technology.

Second, we will practice data augmentation. We will enrich a dataset with a large number of vectors distributed in the same way as the original ones, for which I will employ the evolutionary method of recovery, which has no restrictions on the original dataset’s dimensionality.

Without further ado, let me tell you about the two main ways Signatures are currently used with great success:

  • Feature Engineering for n-dimensional Time Series, which replaces the commonly used mean(), std(), and sum() aggregations [Part 1].
  • Monte Carlo simulation of similarly distributed n-dimensional processes to calculate probabilities, confidence intervals, and so on [Part 2].

The signature approach is a nonparametric, robust method for extracting characteristic features from data that can then be used to build machine learning models.

Article plan. Image by author.

We will also look at the left side of my plan, which includes two distinct tasks: inversion and sampling.

The results of this work can be used for:

  1. Calculating sample statistics across a dataset (confidence intervals, probabilities).
  2. Enriching a small dataset for further training complex machine learning models
  3. Anonymizing data while preserving the overall properties of time series

How does it work?

To begin, let us define the transformation process — that is, what we do with the data — rather than the signature.

Signature Transformation takes an n-dimensional Time Series of length t (an n×t table of values) and maps it to a vector of length m.

Almost always, m will be larger than n*t because we want to build “long” Signatures to extract as much information as possible from the Time Series. 

As an experienced reader can attest, this kind of feature generation is fairly generic. It is far more convenient to make predictions or classifications from the resulting vectors, and the signature vector carries much cleaner information than the original Time Series and its basic derivatives.

Before model cooking comes hours of feature engineering reckoning.

(Mr. Zapolskii)

Example: Classification of profitable/non-profitable clients

Assume we have some profitable customers and some who aren’t. We are looking at data gathered from these customers over time, such as timed metrics of the processes they go through (trajectories). This data is complex and messy, and there is no simple formula for determining whether a customer is profitable. Still, hidden in all that noise are patterns (Stochastic Processes) that we want to be able to identify.

The diagram shows that when signatures are used instead of the raw metric data (trajectories), the distinction between profitable and non-profitable becomes clearer. There is less noise interference, and we get more useful information: signatures are closer to the “true pattern” of profitable/non-profitable clients. 

Segmentation TS with signatures. Image by author

Spoilers

Because there will be a lot of math in this article, here is a spoiler of the MASSIVE potential that Signatures hold:

Here’s how a simple Random Forest differentiates Brownian motion (scale_step=0.10) from (scale_step=0.12). 

Time Series. Image by author

VS how the same simple Random Forest distinguishes Brownian motion signatures (scale_step=0.10) from (scale_step=0.12).

Signatures. Image by author

That piqued your interest, right? In the following chapter, I’ll go into greater detail about the actual definition.

Image generated using Simplified [1].

In case you are wondering whether the Signature exists in reality or I made it up: a team combining signatures with a gradient-boosting regression model won the PhysioNet 2019 cardiology computing challenge, which you can read about here [2]. Another team took first place in the ICDAR 2013 competition on isolated online Chinese character recognition by combining signatures with convolutional neural networks [3].

Some math to make it work

With the signature approach, incoming data presented as a parameterized path (time × value) can be turned into features that generalize well and describe the data fully, because signatures contain complete information about the path’s analytical and geometric properties.

Let us define the first-order signature as an integral over a parameterized path X, which is essentially our piecewise linear Time Series on a fixed time interval, where i indexes the coordinate (dimension): 

First order signature. Image by author
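In standard notation, this first-order term is simply the increment of the i-th coordinate:

S(X)^{i}_{a,b} = \int_{a < t < b} dX^{i}_{t} = X^{i}_{b} - X^{i}_{a}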

Similarly, we can define the signature of order k as a multiple integral:

K order signature. Image by author
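Written out, the order-k term is the iterated integral over the ordered simplex:

S(X)^{i_1, \dots, i_k}_{a,b} = \int_{a < t_1 < \dots < t_k < b} dX^{i_1}_{t_1} \cdots dX^{i_k}_{t_k}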

Definition. The signature is an infinite sequence of all possible signatures in varying orders across all linear sections of our piecewise linear Time Series:
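In symbols:

S(X)_{a,b} = \left(1,\; S(X)^{1}_{a,b}, \dots, S(X)^{n}_{a,b},\; S(X)^{1,1}_{a,b}, S(X)^{1,2}_{a,b}, \dots \right)

(the terms are ordered by increasing order k and collected over all multi-indices of the n coordinates).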

For example, consider signatures of different orders with the same repeated index, computed over a single linear section (a, b).

Repeated signatures. Image by author
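For a repeated index i, these iterated integrals collapse to powers of the increment:

S(X)^{\overbrace{i, \dots, i}^{k}}_{a,b} = \frac{\left(X^{i}_{b} - X^{i}_{a}\right)^{k}}{k!}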

Does this remind you of anything? These are almost identical to the coefficients of a function’s Taylor series expansion.

To be frank, thinking about it in terms of Fourier transformations is useful. Basically, if you decompose your Time Series into an infinite set of basis vectors, the coefficients you obtain tell you everything you need to know about the pattern that shapes your data across time.

If you are interested in the topic and want to understand the Signature and its properties in more detail, I recommend this article [4]. 

Geometric interpretation

To help you understand what Signatures can represent, consider a geometric example: draw a 2-dimensional curve and the chord (diagonal) connecting its endpoints, as in the figure below. The signed area enclosed between them is known as the Lévy area.

Lévy area A. Image by author.

And it can be expressed in terms of 2nd-order Signatures!

Lévy area through 2nd-order signatures
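Concretely, for a two-dimensional path the Lévy area is:

A = \frac{1}{2}\left( S(X)^{1,2}_{a,b} - S(X)^{2,1}_{a,b} \right)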

If we carefully integrate the curves according to the rule, we get the following expressions in X:

Signature product decomposition

A key feature of the signature is that the product of two elements can always be represented as the sum of other elements.

Let’s define a multi-index I = (i1, i2, …, ik), and similarly J; let K run over all shuffles (interleavings) of I and J. The product of the signatures indexed by I and J can then be represented as follows: 

Shuffle product definition. Image by author

This property demonstrates that the signature terms are not algebraically independent, and it also allows you to work with linear objects rather than products.

This is a simple example of a signature product: 

Shuffle product example. Image by author
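The simplest instance is the product of two first-order terms:

S(X)^{(1)}_{a,b} \cdot S(X)^{(2)}_{a,b} = S(X)^{(1,2)}_{a,b} + S(X)^{(2,1)}_{a,b}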

We want the components of our Signature vector to be algebraically independent so that it is as informative as possible. For this, mathematicians came up with a modification of the signature called the log-signature:

Logarithmic signature. Image by author
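Formally, the log-signature is the logarithm of the signature taken in the tensor algebra (the products below are tensor products):

\log S = \sum_{k \ge 1} \frac{(-1)^{k+1}}{k} \, (S - 1)^{\otimes k}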

I won’t go into too much detail, but the idea is that the Signature can be “encoded” as an element of a power series space (i.e. there is a one-to-one correspondence between signatures and formal power series). This article [4] provides more information on the subject.

Splitting signatures into sections of paths

Last but not least, an elegant property of the Signature is that the signature over [a, c] can be expressed through the signatures over [a, b] and [b, c]:

Chen’s Identity. Image by author

This property, known as the Chen identity, allows us to view the signatures of piecewise linear paths as the sum and product of the signatures of their linear sections.
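Written term by term, Chen’s identity reads (with the empty multi-index contributing 1):

S(X)^{i_1, \dots, i_k}_{a,c} = \sum_{j=0}^{k} S(X)^{i_1, \dots, i_j}_{a,b} \cdot S(X)^{i_{j+1}, \dots, i_k}_{b,c}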

Image generated using Simplified [1]

Congratulations! You survived.

Okay, now let’s get into the code and testing. I just want to remind everyone what is important about the Signatures in case anyone skipped all the math (no way!):

  • The Signature is an infinite sequence of coefficients obtained from iterated integrals of our curve (Time Series).
  • We can calculate signatures along large paths by splitting them into smaller sections.
  • To ensure the maximum purity of information, we require algebraic independence, which is where log-signature comes in. 

Core transformation. Python Code

As usual, let’s start with the imports:

import numpy as np
import pandas as pd
import iisignature
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.stats import norm
from tqdm.auto import tqdm
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

The most important library here is iisignature, which calculates logarithmic signatures correctly. I looked into open libraries and discovered that not all frameworks are correctly coded: the well-known esig library makes mistakes with multidimensional Time Series.

We will use a hard-coded transformation of a Time Series, the lead-lag algorithm (see [5], p. 20), which is a simple one-step forward and backward in time relative to our original series.

  • In the case of one-dimensional data, it aids in increasing the dimension of the path, which is required for the correct operation of signature methods.
  • This transformation is associated with sample variance (see [5], p. 25) and is very important in time series analysis.
def leadlag(X, use_time = False, print_arrays = False):
   if (not use_time):

       lead = []
       lag = []

       for val_lag, val_lead in zip(X[:-1], X[1:]):

           lead.append(val_lag)
           lag.append(val_lag)

           lead.append(val_lead)
           lag.append(val_lag)

       lead.append(X[-1])
       lag.append(X[-1])

       if (print_arrays):
           print(lead)
           print(lag)

       return np.c_[lead, lag]
   else:
       lead = []
       lag = []
       t = []

       time = 0

       t.append(time)
       lag.append(X[0])
       lead.append(X[0])

       for val_lag, val_lead in zip(X[:-1], X[1:]):

           time += 1

           t.append(time)
           lead.append(val_lag)
           lag.append(val_lag)

           t.append(time)
           lead.append(val_lead)
           lag.append(val_lag)

           t.append(time)
           lead.append(val_lead)
           lag.append(val_lead)

       if (print_arrays):
           print(lead)
           print(lag)
           print(t)

       return np.c_[lead, lag, t]
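As a quick illustration (a toy series of my own, not from the original article), the transform interleaves the series with a one-step-lagged copy of itself:

# Toy example: lead-lag of a 3-point series
X = np.array([1.0, 2.0, 3.0])
print(leadlag(X))
# Expected output, one (lead, lag) pair per row:
# [[1. 1.]
#  [2. 1.]
#  [2. 2.]
#  [3. 2.]
#  [3. 3.]]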

You can use your client data to solve a classification or time series forecasting problem, but for a model example, we’ll use the Poisson and Brownian series: 

def generatePoisson(n):
   lam = 0.25
   events = np.random.poisson(lam, n) 
   counts = np.cumsum(events)
   return counts

def generateBrownian(n, delta=0.1):
   x = 0
   u = []
   for k in range(n):
       x = x + norm.rvs(scale=delta**2)
       u.append([x])
   return np.array(u)

Let us visualize our two-dimensional time series for the transformation:

def plot3D(path, recovered_path=None):
   fig = plt.figure()
   ax = fig.add_subplot(111, projection='3d')  # Create a 3D subplot

   # Plot the path
   ax.plot(path[:, 0], path[:, 1], path[:, 2], label='Original Path')

   # Optionally, plot the recovered path
   if recovered_path is not None:
       ax.plot(recovered_path[:, 0], recovered_path[:, 1], recovered_path[:, 2], label='Recovered Path')

   ax.set_xlabel('X')
   ax.set_ylabel('Y')
   ax.set_zlabel('Z')
   ax.set_title('3D Path')
   ax.legend()

   plt.show()

def add_time(X):
   return np.c_[X, list(range(len(X)))]
n_points = 7
n_dims = 2
x = generateBrownian(n_points)
y = generatePoisson(n_points)
path = np.c_[x, y]
plot3D(add_time(path))

Brownian*Poisson process. Image by Author

 

N = 2, T = 70 — the total length of the data is 140; with lead-lag transformation, it is 4 times longer, or 560. We now estimate the size of the log-signature vector for various orders: 

def stream_to_logsig(X, order):
   X = np.array(X)
   s = iisignature.prepare(X.shape[1], order)
   return iisignature.logsig(X, s)

def data_transform(data):
   leadlag_path = leadlag(data)
   return leadlag_path

transformed_path = data_transform(path)
logsig = stream_to_logsig(transformed_path, 5)
print(len(logsig))
logsig = stream_to_logsig(transformed_path, 7)
print(len(logsig))
logsig = stream_to_logsig(transformed_path, 10)
print(len(logsig))

The answers are 294, 3304, and 145338.

Interestingly, the order-5 basis (294 values) is smaller than the total amount of transformed data, while order 10 already requires a significant amount of time to calculate.
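If you only need these sizes, iisignature can report the basis length directly without computing a full log-signature; a small sketch (the 4 here is the path dimension after the lead-lag transform doubles our 2 dimensions):

# Log-signature basis sizes for a 4-dimensional path
for order in (5, 7, 10):
    print(order, iisignature.logsiglength(4, order))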

The rest is a matter of technique; we take a table of users represented as vectors of length 145338 and forecast their next metric values, churn, and whatever else you can think of if there is a training dataset. 

Efficiency with 1-dim and Random Forest

The simplest way to see the informational value of the Signature transformation is a classification problem: compare how a Random Forest separates Brownian motions whose step scales differ only slightly. Running the code below reproduces the result from the beginning of the article: 

path = pd.DataFrame()
pathSign = pd.DataFrame()

# Simulations (pd.concat replaces the deprecated DataFrame.append)
order = 10
for i in range(0, 500):
   transformed_path = data_transform(generateBrownian(70, 0.1))
   logsig = stream_to_logsig(transformed_path, order)
   pathSign = pd.concat([pathSign, pd.DataFrame(logsig.flatten(), columns=[i]).T])

   path = pd.concat([path, pd.DataFrame(generateBrownian(70, 0.1).flatten(), columns=[i]).T])

for i in range(501, 1000):
   transformed_path = data_transform(generateBrownian(70, 0.12))
   logsig = stream_to_logsig(transformed_path, order)
   pathSign = pd.concat([pathSign, pd.DataFrame(logsig.flatten(), columns=[i]).T])

   path = pd.concat([path, pd.DataFrame(generateBrownian(70, 0.12).flatten(), columns=[i]).T])

# Fitting (swap in pathSign instead of path to train on signatures)
path['target'] = 0
path.loc[501:, 'target'] = 1

X = path.drop('target', axis=1)
y = path['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

# Results
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:")
print(classification_report(y_test, y_pred))

This is where we get an impressive result of improving Accuracy by 28%!

Photo by Austin Schmid on Unsplash

 

Data augmentation practice

Now I will demonstrate the implementation using financial market data: the Gold ETF (GLD) and the mining company Barrick Gold (GOLD).

We will: 

  1. Recover the path from the signature.
  2. Simulate a new Time Series sample to improve the robustness of the outputs.
  3. Calculate some statistics for the final extended dataset.

Data collection

As usual, let’s start with imports.

import yfinance as yf
import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
import copy
import tensorflow as tf
import iisignature

For the sake of simplicity, let us assume that we are tasked with evaluating a monthly option for the concurrent growth of two assets at the end of the month.

To reformulate this task in ML language, we must determine the probability that the event will occur; the more accurate the probability estimate, the better the option price we can offer.

We will upload the data via yfinance: 

ticker = "GlD"

start = datetime.date(2022, 1, 1)
end = datetime.date(2024, 1, 1)

data_gld = yf.download(ticker, start, end)["Close"]

plt.plot(data_gld / data_gld[0])
plt.xticks([datetime.date(2022, 1, 1), datetime.date(2023, 1, 1), datetime.date(2024, 1, 1)])

plt.title('GLD')

Normalise and check the Time Series:

 

Gold ETF: 2-year dynamics

The same applies to the second asset. 

ticker = "GOLD"

start = datetime.date(2022, 1, 1)
end = datetime.date(2024, 1, 1)

data_gold =  yf.download(ticker, start, end)["Close"]

plt.plot(data_gold / data_gold[0])

plt.xticks([datetime.date(2022, 1, 1), datetime.date(2023, 1, 1), datetime.date(2024, 1, 1)])

plt.title('GOLD')

Barrick Gold: 2-year dynamics

For the sake of convenience, let us create a class for data extraction and processing. Later, we will add a few replication-related methods to it. 

class MarketGenerator:
   def __init__(self, n_dims, tickers_array, start, end, freq, signature_order, data_transform):
       self.n_dims = n_dims
       self.tickers_array = tickers_array
       self.start = start
       self.end = end
       self.freq = freq
       self.order = signature_order
       self.data_transform = data_transform

       self.load_data()
       self.build_dataset()
       self.transform_dataset()
       self.calculate_signatures()
       self.scale_dataset()

   def load_data(self):
       self.data_array = []
       for i in range(self.n_dims):
           data = yf.download(self.tickers_array[i],  self.start, self.end)["Close"]#.resample('M').last()
           self.data_array.append(data)
       self.windows_array = []

       for i in range(self.n_dims):
           windows = []
           for _, window in self.data_array[i].resample(self.freq):
               windows.append(window)
           self.windows_array.append(windows)

   def build_dataset(self):
       self.paths = []

       for i in range(len(self.windows_array[0])):
           if (self.n_dims > 1):
               df = pd.merge(self.windows_array[0][i], self.windows_array[1][i], left_index=True, right_index=True)

               for n in range (2, self.n_dims):
                   df = pd.merge(df, self.windows_array[n][i], left_index=True, right_index=True)
               path = np.c_[df.iloc[:, [0]].values, df.iloc[:, [1]].values]
               for n in range(2, self.n_dims):
                   path = np.c_[path, df.iloc[:, [n]].values]
               self.paths.append(path/path[0])
           else:
               self.paths.append(self.windows_array[0][i].values / self.windows_array[0][i].values[0])

   def transform_dataset(self):
       self.transformed_paths = []
       for path in self.paths:
           self.transformed_paths.append(self.data_transform(path))

   def calculate_signatures(self):
       self.orig_logsig = np.array([stream_to_logsig(path, self.order) for path in tqdm(self.transformed_paths)])

   def scale_dataset(self):
       self.scaler = MinMaxScaler(feature_range=(0.00001, 0.99999))
       scaled_logsig = self.scaler.fit_transform(self.orig_logsig)

       self.logsigs = scaled_logsig[1:]
       self.conditions = scaled_logsig[:-1] # needed later for CVAE

In this class, we also define the Signatures calculation for our Time Series’ paths (assets).

n_dims = 2
tickers_array = ["GLD", "GOLD"]
start = datetime.date(2022, 1, 1)
end = datetime.date(2024, 1, 1)
freq = "M"
order = 5
def data_transform(data):
   leadlag_path = leadlag(data, use_time = True)
   return leadlag_path
MG = MarketGenerator(n_dims, tickers_array, start, end, freq, order, data_transform)
paths = MG.paths

We have prepared the data for cooking and are about to perform our first magic trick. 

Recovering Time Series from Signature

The algorithm for recovering a path from its signature, in four steps:

  1. We move randomly through a given distribution (Organism).
  2. We do this several times (Population).
  3. We combine existing organisms to create a new generation (self.evolve).
  4. We compare the error to determine how much the random Time Series’ Signature differs from the original (self.loss).

In short, we “intelligently” iterate over the values until our time series corresponds in terms of Signatures.

Valid question: why does this even work? It sounds like a kindergarten game. Where are the tough inverse functions, differentials, and so on?

In fact, all the magic is contained in one mathematical theorem, which can be expressed as follows: 

The Return of the King. Generated by Supermeme

The goal is to find a piecewise linear path that minimizes the metric, i.e. best approximates the initial path (Time Series) X.

Approximation X with X hat

It is important to note that the trade-off is governed by the step width: the wider the step, the faster the convergence but the lower the quality; the narrower the step, the slower the convergence and the higher the quality.

The primary statement here is:

The “Approximation of piecewise paths” theorem

In other words, if we generate increments for each coordinate of the path from a discrete distribution on the corresponding grid of steps and iterate long enough, then with probability one we will eventually obtain the optimal path.

With that in mind, we create the ‘Organism’ class, which represents an instance of a distribution that, through evolution, could become a potential original Time Series.

The main method here is self.mutate, which allows an instance of Organism to navigate the distribution. 

class Organism:
   def __init__(self, n_points, n_dims, data_transform, distribution_array):
       self.n_points = n_points
       self.n_dims = n_dims
       self.data_transform = data_transform

       self.distribution_array = distribution_array

       self.init_path()

   def __add__(self, other):
       increments = []
       for increment1, increment2 in zip(self.increments, other.increments):
           if np.random.random() < 0.5:
               increments.append(increment1)
           else:
               increments.append(increment2)

       path = np.cumsum(increments, axis = 0)
       transformed_path = self.data_transform(path)

       child = Organism(self.n_points, self.n_dims, self.data_transform, self.distribution_array)
       child.increments = increments
       child.path = path
       child.transformed_path = transformed_path

       return child

   def random_increment(self):
       r = []

       for dim in range(self.n_dims):
           r.append(self.distribution_array[dim]())

       return np.array(r)

   def init_path(self):
       self.increments = np.array([self.random_increment() for _ in range(self.n_points)])

       path = np.cumsum(self.increments, axis = 0)
       transformed_path = self.data_transform(path)

       self.path = path
       self.transformed_path = transformed_path

   def mutate(self, prob=0.1):
       for i in range(len(self.increments)):
           if np.random.random() < prob:
               self.increments[i] = self.random_increment()

       path = np.cumsum(self.increments, axis = 0)
       transformed_path = self.data_transform(path)

       self.path = path
       self.transformed_path = transformed_path

   def logsignature(self, order):
       return stream_to_logsig(self.transformed_path, order)

   def loss(self, logsig, order):
       diff = np.abs((logsig - self.logsignature(order)) / logsig)
       diff /= 1 + np.arange(len(logsig))

       return np.mean(diff)

On the other hand, the ‘Population’ class is in charge of managing organisms and breeding, as well as determining the best organism based on error minimization.

The main idea is to crossbreed organisms in order to improve the convergence of the organism’s random increment to the original Time Series. 

class Population:
   def __init__(self, n_organisms, n_points, n_dims, data_transform, distribution_array):
       self.n_organisms = n_organisms
       self.n_points = n_points
       self.n_dims = n_dims
       self.data_transform = data_transform

       self.distribution_array = distribution_array

       self.organisms = [Organism(n_points, n_dims, data_transform, distribution_array) for _ in range(n_organisms)]

   def fittest(self, logsig, p, order):
       n = int(len(self.organisms) * p)
       return sorted(self.organisms, key=lambda o: o.loss(logsig, order))[:n]

   def evolve(self, logsig, p, order, mutation_prob=0.1):
       parents = self.fittest(logsig, p, order)
       new_generation = copy.deepcopy(parents)

       while len(new_generation) != self.n_organisms:
           i = j = 0
           while i == j:
               i, j = np.random.choice(range(len(parents)), size=2)
               parent1, parent2 = parents[i], parents[j]

            child = parent1 + parent2 # uses Organism.__add__
           child.mutate(prob=mutation_prob)

           new_generation.append(copy.deepcopy(child))

       self.organisms = new_generation

       return new_generation[0].loss(logsig, order)

# Module-level helper: evolve a fresh population until its best organism reproduces the target log-signature
def train(logsig, order, n_iterations, n_organisms, n_points, n_dims, data_transform, distribution_array, top_p=0.1, mutation_prob=0.1):
    population = Population(n_organisms, n_points, n_dims, data_transform, distribution_array)
    pbar = tqdm(range(n_iterations))

    for _ in pbar:
        loss = population.evolve(logsig, p=top_p, order=order, mutation_prob=mutation_prob)
        pbar.set_description(f"Loss: {loss}")
        pbar.refresh()

        if loss == 0.:
            break

    return population.fittest(logsig, p=top_p, order=order)[0].path, loss

I won’t keep you waiting, so let’s quickly reconstruct one month of trajectories (n=21 points, which equals the number of working days in a month) and visualize our ‘twin’ series. 

n_dims = 2
n_points = 21

def distribution_x():
   pip = 0.001
   n_pips = 1/pip * 0.025
   return pip * np.random.randint(-n_pips, n_pips)

def distribution_y():
   pip = 0.001
   n_pips = 1/pip * 0.025
   return pip * np.random.randint(-n_pips, n_pips)

disribution_array = [distribution_x, distribution_y]

n_organisms = 250
n_iterations = 80

transformed_path = data_transform(paths[1])
logsig = stream_to_logsig(transformed_path, order)
recovered_path, loss = train(logsig, order, n_iterations,
n_organisms, n_points, n_dims, data_transform, disribution_array)

Next, we’ll use three projections of our three-dimensional plots to help us analyze the results. A two-dimensional time series was chosen for the article because it is easy to interpret, but keep in mind that this method is not dimensionally limited!

label_x = "GLD"
label_y = "GOLD"

def plotAllProjections3D(filename, path, label_x, label_y, recovered_path=None):
   filename = filename
   plot3D(filename + "_" + label_x + ".png", [True, False], path, 0, -91, label_x, label_y, recovered_path)
   plot3D(filename + "_" + label_y + ".png", [False, True], path, 0, 0, label_x, label_y, recovered_path)
   plot3D(filename + "_" + label_x + "_" + label_y + ".png", [True, True], path, 10, -45, label_x, label_y, recovered_path)

def plot3D(filename, draw_tics, path, elev, azim, label_x, label_y, recovered_path=None):
   fig = plt.figure(figsize = (8, 8))
   ax = fig.add_subplot(111, projection='3d')
   ax.view_init(elev, azim)

   draw_path = path - path[0]
   ax.plot(draw_path[:, 0], draw_path[:, 1], draw_path[:, 2], label='original')

   if (recovered_path is not None):
       draw_recovered_path = recovered_path - recovered_path[0]
       ax.plot(draw_recovered_path[:, 0], draw_recovered_path[:, 1], draw_recovered_path[:, 2], label='recovered')

   if (draw_tics[0]):
       ax.set_xlabel(label_x)
   else:
       ax.set_xticks([])

   if (draw_tics[1]):
       ax.set_ylabel(label_y)
   else:
       ax.set_yticks([])

   ax.set_zlabel("time")

   ax.legend()
   fig.savefig(filename, bbox_inches='tight')

plotAllProjections3D("recover_path", add_time(paths[1]), label_x, label_y, add_time(recovered_path))

GLD&GOLD 1 month Time Series Recovering

Signatures sampling

To replicate our signatures, we’ll use a conditional variational autoencoder (CVAE). We’ll add a few more key methods to our market simulation class:

# Methods added to MarketGenerator: previous months' signatures serve as conditions
def train(self, n_epochs=100):
   self.cvae = CVAE(n_latent=8, input_dim=self.logsigs.shape[1], cond_dim=self.conditions.shape[1], n_hidden=100, alpha=0.4)
   self.cvae.compile(optimizer='adam')
   self.cvae.fit(x=[self.logsigs, self.conditions], epochs=n_epochs)

def generate_logsig(self, cond):
   generated = self.cvae.generate(cond)
   return self.scaler.inverse_transform(generated.reshape(1, -1))[0]

The first function explains to the autoencoder what Signatures are (trains the model), while the second generates similar instances.

The concept of generating signatures is similar to that of generating images: once the neural network understands the definition of a signature, it can easily replicate it. However, in machine learning, as in life, you cannot gain anything without giving something in return.

As a result, we feed white noise into the neural network, and the autoencoder determines how to make it resemble our signature.

White noise: the ultimate diet for overthinking AI brains — feeds them nothing but keeps them full!

(Mr. Zapolskii)

Autoencoder implementations can vary: experiment on your own to see how Signature cloning differs!

class CVAE(tf.keras.Model):
   def __init__(self, n_latent, input_dim, cond_dim, n_hidden=50, alpha=0.2):
       super(CVAE, self).__init__()
       self.n_latent = n_latent
       self.input_dim = input_dim
       self.cond_dim = cond_dim
       self.n_hidden = n_hidden
       self.alpha = alpha
       self.encoder_net = self.build_encoder()
       self.decoder_net = self.build_decoder()

   def build_encoder(self):
       inputs = tf.keras.Input(shape=(self.input_dim,))
       cond = tf.keras.Input(shape=(self.cond_dim,))
       x = tf.keras.layers.Concatenate()([inputs, cond])
       x = tf.keras.layers.Dense(self.n_hidden, activation='relu')(x)
       mn = tf.keras.layers.Dense(self.n_latent, activation='relu')(x)
       sd = tf.keras.layers.Dense(self.n_latent, activation='relu')(x)
       return tf.keras.Model(inputs=[inputs, cond], outputs=[mn, sd])

   def build_decoder(self):
       latent = tf.keras.Input(shape=(self.n_latent,))
       cond = tf.keras.Input(shape=(self.cond_dim,))
       x = tf.keras.layers.Concatenate()([latent, cond])
       x = tf.keras.layers.Dense(self.n_hidden, activation='relu')(x)
       outputs = tf.keras.layers.Dense(self.input_dim, activation='sigmoid')(x)
       return tf.keras.Model(inputs=[latent, cond], outputs=outputs)

   def encode(self, x, cond):
       mn, sd = self.encoder_net([x, cond])
       batch_size = tf.shape(x)[0]
       epsilon = tf.random.normal(shape=(batch_size, self.n_latent))
       z = mn + epsilon * tf.exp(sd * 0.5)
       return z

   def decode(self, z, cond):
       return self.decoder_net([z, cond])

   def call(self, inputs):
       x, cond = inputs
       z = self.encode(x, cond)
       reconstructed = self.decode(z, cond)
       return reconstructed

   def compute_loss(self, x, reconstructed, mn, sd):
       cross_ent = tf.keras.losses.binary_crossentropy(x, reconstructed)
       kl_div = -0.5 * tf.reduce_sum(1. + sd - tf.square(mn) - tf.exp(sd), axis=-1)
       return tf.reduce_mean(cross_ent + self.alpha * kl_div)

   def train_step(self, data):
       x, cond = data[0]
       with tf.GradientTape() as tape:
           z = self.encode(x, cond)
           reconstructed = self.decode(z, cond)
           mn, sd = self.encoder_net([x, cond])
           loss = self.compute_loss(x, reconstructed, mn, sd)
       gradients = tape.gradient(loss, self.trainable_variables)
       self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
       return {'loss': loss}

   def generate(self, cond, n_samples=None):
       cond = np.asarray(cond, dtype=np.float32)

       if n_samples is not None:
           randoms = np.random.normal(0, 1, size=(n_samples, self.n_latent))
           cond = np.repeat(cond[np.newaxis, :], n_samples, axis=0)
       else:
           randoms = np.random.normal(0, 1, size=(1, self.n_latent))
           cond = cond[np.newaxis, :]

       z = self.decode(randoms, cond)
       return z.numpy()

In theory, we’re good, but we need to perform a sanity check to see if the generated signatures have similar statistics:

conditions = MG.conditions

MG.train(1000)

array = []
for cond in conditions:
   array.append(MG.generate_logsig(cond))

array = np.array(array)

print(np.mean(array), np.mean(MG.orig_logsig))
print(np.max(array), np.max(MG.orig_logsig))
print(np.min(array), np.min(MG.orig_logsig))
print(np.median(array), np.median(MG.orig_logsig))
print(np.std(array), np.std(MG.orig_logsig))
print(np.var(array), np.var(MG.orig_logsig))

Now everything is fine and we can start the experiments. 

Results: sampling and recovering

To begin, we will simulate data for a single randomly selected month. Here, we select the fourth month, i.e. paths[4] and conditions[3]. 

generated_paths = []

for _ in range(10):
   logsig = MG.generate_logsig(conditions[3])
   recovered_path, loss = train(logsig, order, n_iterations, n_organisms, n_points, n_dims, data_transform, disribution_array)

   generated_paths.append(recovered_path)

Let’s look at various projections to visually validate the results. 

Blue — original 4th month. Red — recovered 4th month 10 times

 

Let’s go back to the original goal: we want to estimate the likelihood that by the end of the month, both assets will have either a positive or negative change.

Also, assume that we do not want to look back more than 6 months due to events that have changed the configuration.

generate_count = 6

generated_paths = []
for cond in tqdm(conditions[:generate_count]):
   for _ in range(10):
       logsig = MG.generate_logsig(cond)
       recovered_path, loss = train(logsig, order, n_iterations, n_organisms, n_points, n_dims, data_transform, disribution_array)

       generated_paths.append(recovered_path)

Finally, we turn the 6 observed monthly paths into 66 trajectories (6 originals plus 60 generated) to estimate the probability. 

Blue — the original 6 paths. Red — the recovered 60 paths

To calculate the probability, we only need to perform a few basic arithmetic operations: 

cnt_pos = 0
cnt_neg = 0

for i in range(len(generated_paths)):
   if (generated_paths[i][-1][0] > 0) and (generated_paths[i][-1][1] > 0):
       cnt_pos += 1
   if (generated_paths[i][-1][0] < 0) and (generated_paths[i][-1][1] < 0):
       cnt_neg += 1             

print(cnt_neg/len(generated_paths), cnt_pos/len(generated_paths))

Both Positive probability: 47.6%.
Both Negative probability: 28.6%.

We can also estimate confidence intervals and variances, not just at the end of the month, but on any date.
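For instance, a minimal sketch of an empirical interval over the simulated end-of-month changes (reusing the generated_paths list from above; the 5th and 95th percentiles are an arbitrary choice):

# Empirical 90% interval for the end-of-month change of the first asset (GLD)
final_changes = np.array([p[-1][0] for p in generated_paths])
low, high = np.percentile(final_changes, [5, 95])
print(f"GLD end-of-month change, 90% interval: [{low:.4f}, {high:.4f}]")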

You may rightfully wonder why this is considered a good approximation. The answer is quite extensive; we can improve our understanding of a well-chosen trajectory generator using various metrics, such as entropy, and improve its performance with hyperparameter selection, autoencoders, and data preprocessing. The code presented in this article is a starting point, and you can and should modify it to meet your specific use case requirements.

Conclusion

First and foremost, thank you for taking the time to read my first challenging paper on interpreting complex mathematical constructions through empirical data.

Signature transformation is a valuable tool that can perform some truly amazing things with Time Series. The purpose of this article is to highlight interesting math and explore data analysis from various perspectives, whether it is machine learning or applied statistics. Often, research and study of new methods in probability theory and stochastic processes can yield significant benefits. 

The code in this article is not the final solution; you can experiment with the data, restoration process, and autoencoder to get the best results for your applications.

Please leave comments and share your thoughts.

Glad to see everyone, best wishes 🚀 

Links for time series forecasting

[1] AI-generated image from Simplified

[2] J. Morrill, A. Kormilitzin, A. Nevado-Holgado, S. Swaminathan, S. Howison, T.J. Lyons (2019). The signature-based model for early detection of sepsis from electronic health records in the intensive care unit. IEEE Conference 2019 Computing in Cardiology.

[3] B. Graham (2013). Sparse arrays of signatures for online character recognition. arXiv:1308.0371.

[4] H. Buhler, B. Horvath, T. Lyons, I. P. Arribas, B. Wood (2020). A data-driven market simulator for small data environments. arXiv:2006.14498.

[5] I. Chevyrev, A. Kormilitzin (2016). A Primer on the Signature Method in Machine Learning. arXiv:1603.03788.

ODSC Community

The Open Data Science community is passionate and diverse, and we always welcome contributions from data science professionals! All of the articles under this profile are from our community, with individual authors mentioned in the text itself.
