We backtest a simple pairs-trading strategy on the well-known GLD / GDX pair using Zipline, Quantopian's open-source algo-trading and backtesting framework.
Note: This code does not yet factor in funding costs for short positions.
# Import Zipline, the open-source backtester, and a few other libraries that we will use
from zipline import TradingAlgorithm
from zipline.api import order_target, order_value, record, symbol, history, add_history, get_order
from datetime import datetime
from itertools import chain
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numpy as np
import pandas.io.data as web
from pandas.stats.api import ols
import pandas as pd
import pytz
import statsmodels.tsa.stattools as ts
from trading.strats.util import plot_price_series, plot_scatter_series, plot_residuals, print_cadf
from IPython.core.display import display, HTML
% matplotlib inline
# In-sample window: prices from this range are downloaded in initialize()
# and used to fit the regression between the two symbols.
train_start = datetime(2013, 1, 1, tzinfo=pytz.utc)
train_end = datetime(2013, 12, 31, tzinfo=pytz.utc)

# Out-of-sample window: prices from this range are fed to the backtest.
test_start = datetime(2014, 1, 1, tzinfo=pytz.utc)
test_end = datetime(2014, 12, 31, tzinfo=pytz.utc)

# Entry / exit bands, expressed as multiples of the standard deviation of
# the training regression residuals.
std_dev_entry_threshold = 1.0
std_dev_exit_threshold = 0.1

# symbol1 is the regression's dependent variable (y), symbol2 the regressor (x).
symbol1 = 'GDX'
symbol2 = 'GLD'
def handle_data(context, data):
    """Open or close the pair position for the current bar.

    The residual is ``price(symbol1) - beta.x * price(symbol2) - intercept``
    using the regression stored on ``context.ols_res``.

    * With no open position (``context.orders`` empty), a residual whose
      magnitude exceeds ``std_dev_entry_threshold`` residual standard
      deviations opens a position: symbol1 is shorted and symbol2 bought
      when the residual is positive, and the reverse when it is negative.
      The pair of order ids is stored in ``context.orders``.
    * With a position open, a residual within ``std_dev_exit_threshold``
      standard deviations targets zero shares for every stored order's
      asset and empties ``context.orders``.

    The action taken (``'short'``, ``'long'`` or ``'closeout'``) is recorded
    only on bars where one occurs; both prices and the residual are
    recorded on every bar.
    """
    leg1 = data[context.symbol1]
    leg2 = data[context.symbol2]

    fit = context.ols_res
    coeffs = fit.beta
    hedge_ratio = coeffs.x
    # A fit without a constant term has no 'intercept' entry; treat it as 0.
    offset = getattr(coeffs, 'intercept', 0)

    spread = leg1.price - hedge_ratio * leg2.price
    residual = spread - offset
    deviation = abs(residual)
    sigma = fit.resid.std()
    shares = 5000.0

    if context.orders:
        # A position is open: unwind once the residual has reverted.
        if deviation <= sigma * std_dev_exit_threshold:
            while context.orders:
                long_id, short_id = context.orders.pop()
                long_order = get_order(long_id)
                short_order = get_order(short_id)
                order_target(long_order.sid, 0)
                order_target(short_order.sid, 0)
            record(action='closeout')
    elif deviation > sigma * std_dev_entry_threshold:
        # Flat and the residual has left the entry band: trade against it.
        if residual > 0.0:
            short_id = order_target(context.symbol1, -shares)
            long_id = order_target(context.symbol2, shares * hedge_ratio)
            side = 'short'
        elif residual < 0.0:
            long_id = order_target(context.symbol1, shares)
            short_id = order_target(context.symbol2, -shares * hedge_ratio)
            side = 'long'
        # Remember the order ids so the position can be closed later.
        context.orders.append((long_id, short_id))
        record(action=side)

    # Per-bar values for later inspection.
    record(
        context.symbol1.symbol, leg1.price,
        context.symbol2.symbol, leg2.price,
        cointegration_factor=residual,
    )
def main():
    """Download prices, fit the pair regression and run the backtest.

    Adjusted closes for ``symbol1`` and ``symbol2`` over the test window are
    fetched from Yahoo and combined into one UTC-indexed frame. The nested
    ``initialize`` callback fetches the training window, plots it, fits the
    OLS regression of symbol1 on symbol2, prints a CADF test of the
    residuals and stores the fit on the context.

    Returns:
        ``(algo, results)``: the ``TradingAlgorithm`` instance and whatever
        ``algo.run`` returned for the test data.
    """
    from zipline.algorithm import TradingAlgorithm

    def load_test_prices(ticker):
        # One-column frame of adjusted closes, named after the ticker and
        # indexed by UTC timestamps.
        quotes = web.DataReader(ticker, 'yahoo', test_start, test_end)
        frame = quotes['Adj Close'].to_frame()
        frame.index = pd.to_datetime(frame.index).tz_localize(pytz.utc)
        return frame.rename(columns={'Adj Close': ticker})

    leg1_prices = load_test_prices(symbol1)
    leg2_prices = load_test_prices(symbol2)
    full_data = pd.concat([leg1_prices, leg2_prices], axis=1)

    def initialize(context):
        context.symbol1 = symbol(symbol1)
        context.symbol2 = symbol(symbol2)

        display(HTML('<h2>Training Data</h2>'))
        leg1_hist = web.DataReader(symbol1, 'yahoo', train_start, train_end)
        leg2_hist = web.DataReader(symbol2, 'yahoo', train_start, train_end)

        prices = pd.DataFrame(index=leg1_hist.index)
        prices[symbol1] = leg1_hist["Adj Close"]
        prices[symbol2] = leg2_hist["Adj Close"]
        plot_price_series(prices, symbol1, symbol2)
        plot_scatter_series(prices, symbol1, symbol2)

        # Fit symbol1 against symbol2; handle_data reads beta and resid
        # from this result.
        fit = ols(y=leg1_hist['Adj Close'], x=leg2_hist['Adj Close'])
        display(fit.summary_as_matrix)

        # Augmented Dickey-Fuller test on the regression residuals.
        print_cadf(ts.adfuller(fit.resid))

        context.ols_res = fit
        context.orders = []

    # Build the algorithm from the two callbacks and run it on the test data.
    algo = TradingAlgorithm(
        initialize=initialize,
        handle_data=handle_data,
        identifiers=[symbol1, symbol2],
    )
    results = algo.run(full_data)
    return algo, results
# Run the backtest. main() returns the TradingAlgorithm instance first, so
# `context` is that algorithm object; `results` is what algo.run() returned.
context, results = main()
# Note: this function can be removed if running
# this algorithm on quantopian.com
def analyze(context=None, results=None):
    """Plot portfolio value, both price series and the traded residual.

    Three stacked charts are drawn: portfolio value, the recorded prices of
    ``symbol1`` and ``symbol2``, and the recorded ``cointegration_factor``
    with dashed guide lines at -3..3 residual standard deviations (orange)
    and at the entry threshold (red). Bars with transactions are marked on
    the residual chart according to the recorded ``action``: green ``^``
    for ``'long'``, red ``v`` for ``'short'``, black ``.`` for
    ``'closeout'``.

    Args:
        context: object exposing ``ols_res`` (the fitted regression); its
            ``resid.std()`` sets the spacing of the guide lines.
        results: backtest results frame with ``portfolio_value``,
            ``transactions`` and ``cointegration_factor`` columns plus the
            recorded price and ``action`` columns.
    """
    import logbook
    logbook.StderrHandler().push_application()

    display(HTML('<h2>Backtest results on out-of-sample data</h2>'))

    fig = plt.figure(figsize=(12, 16))
    ax1 = fig.add_subplot(311)
    results.portfolio_value.plot(ax=ax1)
    ax1.set_ylabel('Portfolio value (USD)')
    ax2 = fig.add_subplot(312)
    ax2.set_ylabel('Price (USD)')
    ax3 = fig.add_subplot(313)
    ax3.set_ylabel('Cointegration factor')

    if (symbol1 in results) and (symbol2 in results):
        # Label each line with its ticker and attach the legend to the price
        # axes explicitly; plt.legend() would act on the current axes.
        results[symbol1].plot(ax=ax2, label=symbol1)
        results[symbol2].plot(ax=ax2, label=symbol2)
        ax2.legend()

        results.cointegration_factor.plot(ax=ax3)

        # The residual std is constant, so compute it once for all lines.
        resid_std = context.ols_res.resid.std()
        for multiple in range(-3, 4):
            ax3.axhline(resid_std * multiple,
                        linestyle='--', color=colors.cnames['orange'])
        for sign in (1, -1):
            ax3.axhline(sign * resid_std * std_dev_entry_threshold,
                        linestyle='--', color=colors.cnames['red'])

        # Bars on which at least one transaction was filled.
        trans = results.loc[[t != [] for t in results.transactions]]

        # Classify those bars by the recorded 'action' column (looked up by
        # name rather than by column position). The column only exists once
        # an action has been recorded, i.e. once the strategy has traded.
        if 'action' in trans:
            marker_styles = (
                ('long', '^', 10, '#80ff00'),
                ('short', 'v', 10, 'red'),
                ('closeout', '.', 15, 'black'),
            )
            for action, marker, size, colour in marker_styles:
                when = trans.index[(trans['action'] == action).values]
                ax3.plot(when, results.cointegration_factor.loc[when],
                         marker, markersize=size, color=colour)
# Draw the portfolio, price and residual charts for the backtest just run.
analyze(context=context, results=results)
# Keep only the bars on which at least one transaction was filled.
trans = results.loc[[t != [] for t in results.transactions]]

# Flatten the per-bar transaction lists into one row per transaction,
# remembering the bar timestamp each row came from.
data = []
indices = []
for dt, rows in zip(trans.index, trans.transactions):
    for r in rows:
        data.append(r)
        indices.append(dt)

display(HTML('<h2>Transaction data</h2>'))
orders = pd.DataFrame(data, index=indices)
# A bare expression only renders as the last line of a notebook cell;
# display() shows the table wherever this statement sits.
display(orders)

# Bars with positive capital_used, selected with a boolean mask rather than
# positional integer indexing on a datetime-indexed Series.
capital = results.capital_used[results.capital_used > 0]
display(capital.to_frame())