Checking your Stocks with Alphavantage and Python

SynchroDynamic Software
10 min readDec 8, 2020

If you have not yet heard of Alphavantage, it is a Stock market API that gives you many stock charts in JSON. You can obtain a Key for Free, and so long as you keep your API calls limited to 5 API requests per minute; 500 API requests per day, you can use their API. Let’s learn how to use Alphavantage and Python to build a sock analysis application.

Getting Started

  1. Obtain a Key From Alphvantage.
  2. Open a new project in Pycharm.
  3. Open the provided project in another Pycharm window. (Optional)

Handling Alphavantage

First lets build out a stock handler that will allow us to make easy calls to Alphavantage and handle the JSON Data.

StockHandler.py

import numpy as np import json import requests key = 'Put your alphavantage key here' #Alphavantage specific variables interval = {1:'1min', 5:'5min', 15:'15min', 30:'30min', 60:'60min', 'daily':'Daily', 'month': 'Monthly Time Series'} #min allowed: 1, 5, 15, 30, 60 > Daily: 'Daily' function = {'intra' :'TIME_SERIES_INTRADAY', 'daily': 'TIME_SERIES_DAILY', 'month': 'TIME_SERIES_MONTHLY'} i_type = {'intra': 0, 'none': 1} #END Alphavantage specific variables # A simple function that allows for easier transition between AlphaVantage's "intraday" 1,5,15,30,60 minute time series, # and their "Daily" series def alphavantage_time_series_url(function, ticker, interval, i_type): url = 'https://www.alphavantage.co/query?function=' + function switcher = { 0: url + '&symbol=' + ticker + '&interval=' + interval + '&outputsize=full&apikey=' + key, 1: url + '&symbol=' + ticker + '&apikey=' + key, 2: url + '&symbol=' + ticker + '&interval=1min&apikey=' + key } return switcher.get(i_type, "Invalid type") def load_data(function, ticker, interval, i_type): url = alphavantage_time_series_url(function, ticker, interval, i_type) params = {"retina_name": "en_associative","start_index": 0, "max_results": 1, "sparsity": 1.0, "get_fingerprint": False} r = requests.get(url=url, params=params) data_str = r.json() ds = json.dumps(data_str) json_obj = json.loads(ds) try: if interval == 'Monthly Time Series' or interval == 'Technical Analysis: VWAP': dataset = json_obj[interval] else: dataset = json_obj["Time Series (" + interval + ")"] except: return None, None x = [] ox = [] y = [] z = [] count = 0 for key in dataset: x.append(count) count += 1 ox.append(key) try: y.append(average([float(dataset[key]["2. high"]), float(dataset[key]["3. low"])])) z.append(float(dataset[key]["5. volume"])) except: y.append(float(dataset[key]["VWAP"])) if len(y) % 2 != 0: y.pop(0) x.pop(0) try: z.pop(0) except: z = None #deviation = Reverse(cut_mean_from_elements(y)) #zdev = Reverse(cut_mean_from_elements(z)) deviation = Reverse(y) try: zdev = Reverse(z) np_z = np.array(zdev).reshape(len(z), 1) except: np_z = None np_x = np.array(x).reshape(len(x), 1) np_y = np.array(deviation).reshape(len(y), 1) #print(ox) return {'x': np_x, 'y': np_y, 'z': np_z}, {'ox': Reverse(ox), 'y': y} def get_quotes(symbols): url = 'https://www.alphavantage.co/query?function=BATCH_STOCK_QUOTES&apikey=' + key + '&symbols=' + symbols params = {"retina_name": "en_associative", "start_index": 0, "max_results": 1, "sparsity": 1.0, "get_fingerprint": False} r = requests.get(url=url, params=params) print(r) data_str = r.json() ds = json.dumps(data_str) json_obj = json.loads(ds) dataset = json_obj["Stock Quotes"] return dataset def Reverse(lst): return [ele for ele in reversed(lst)] def average(set): count = len(set) total = 0 for part in set: total += part return total / count def cut_mean_from_elements(set): mean = average(set) deviation = [] for element in set: deviation.append(element - mean) return deviation

Please put your key in the ‘key’ variable.

Building Analyzers

Once we get the data we need to do something with it. Here I have built three different files.

DataManipulate.py

A simple snippet of code to use wherever it is needed. Simply take non-numeric data and make it numeric.

def pieces(l, n): n = max(1, n) return (l[i:i + n] for i in range(0, len(l), n))

LinearRegression.py

The simplest Stock visualization is a simple trend line. We will use Linear Regression to get a trend line.

import numpy as np from sklearn.linear_model import LinearRegression import Plotting.PlotFunctions as plt from sklearn.svm import SVR from sklearn.model_selection import GridSearchCV import sklearn.metrics as metrics class LR: model = None vmodel = None x = None y = None z = None accuracies = [] def __init__(self, data, t): self.x = np.array(data['x']).reshape((-1, 1)) self.y = np.array(data['y']) self.z = np.array(data['z']) self.ticker = t self.model = LinearRegression() self.vmodel = LinearRegression() self.model.fit(self.x, self.y) try: self.vmodel.fit(self.x, self.z) except: self.vmodel = None def get_trend(self): return self.model.coef_, self.model.intercept_ def predict(self, x): return self.model.predict(x) def pieces(self, l, n): n = max(1, n) return (l[i:i + n] for i in range(0, len(l), n)) def regressorOp(self, x, y): """ This will optimize the parameters for the algo """ regr_rbf = SVR(kernel="rbf", verbose=True, cache_size=1024) C = [10000, 6000, 5000, 1000, 10, 1] gamma = [0.005, 0.004, 0.003, 0.002, 0.001, 1, 2, 3, 4] epsilon = [0.1, 0.01, 0.05, 1] parameters = {"C": C, "gamma": gamma, "epsilon": epsilon} gs = GridSearchCV(regr_rbf, parameters, scoring="neg_mean_squared_error", cv=2, iid=True) gs.fit(x, y) #print("Best Estimator:\n", gs.best_estimator_) #print("Type: ", type(gs.best_estimator_)) return gs.best_estimator_ def view_trend_for_subset(self, sets): # Data is deviation, and originalData is true timeseries # (data.x = 0,1,2,...,n | originalData.x = 01-01-2020, ..., 02-01-2020, ..., n xPieces = self.pieces(self.x, int(len(self.x) / sets)) yPieces = self.pieces(self.y, int(len(self.y) / sets)) try: zPieces = self.pieces(self.z, int(len(self.z) / sets)) except: zPieces = [] xPieces = list(xPieces) yPieces = list(yPieces) zPieces = list(zPieces) xParts = [] yParts = [] zParts = [] mParts = [] bParts = [] predictions = [] # print(xPieces[3], yPieces[3], "\n", xPieces[4], yPieces[4]) for sub in range(sets): try: z = zPieces[sub] except: z = -1 subdata = {'x': xPieces[sub], 'y': yPieces[sub], 'z': z} #print(subdata) self.model = LinearRegression() self.model.fit(subdata['x'], subdata['y']) #self.vmodel = SVR(kernel='poly', gamma='auto') #self.vmodel.fit(subdata['x'], subdata['z'].ravel()) if z != -1: clf = self.regressorOp(subdata['x'], subdata['z'].ravel()) clf.fit(subdata['x'], subdata['z'].ravel()) y_pred = clf.predict(subdata['x']) accuracy = clf.score(subdata['x'], subdata['z'].ravel()) variance = metrics.explained_variance_score(subdata['z'], y_pred) self.accuracies.append(accuracy) prediction = (clf.predict(subdata['x'])) predictions.append(prediction) #print("prediction: ", prediction) xParts.append(subdata['x']) yParts.append(subdata['y']) zParts.append(subdata['z']) mParts.append(self.model.coef_) bParts.append(self.model.intercept_) #plt.plot_all(subdata['x'], {1: subdata['y'], 2: subdata['z']}, self.model.coef_, self.model.intercept_, prediction, self.ticker, 0) #print("PARAMS: ", clf.get_params(True)) return mParts, bParts, xParts, yParts, zParts, predictions

GradientDescent.py

Another resourceful yet slightly more complex trend would be finding a non-linear trend with Gradient Descent.

import numpy as np import scipy.linalg def gradient_descent(X, Y, w, c, it, L): n = len(X) Y_pred = w * X + c # The current predicted value of Y D_m = (-2 / n) * sum(X * (Y - Y_pred)) # Derivative wrt m D_c = (-2 / n) * sum(Y - Y_pred) # Derivative wrt c m = w - L * D_m # Update m c = c - L * D_c # Update c return m, c def batch_gradient_descent(x, y, w, eta): derivative = np.sum([-(y[d]-np.dot(w.T.copy(),x[d,:]))*(x[d,:]).reshape(np.shape(w)) for d in range(len(x))],axis=0) return eta*(1/len(x))*derivative def mini_batch_gradient_descent(x, y, w, eta, batch): gradient_sum = np.zeros(shape=np.shape(w)) for b in range(batch): choice = np.random.choice(list(range(len(x)))) gradient_sum += -(y[choice]-np.dot(w.T,x[choice,:]))*x[choice,:].reshape(np.shape(w)) return eta*(1/batch)*gradient_sum # initialize variables def trend_for(data, it, L, m, b): x = data['x'] y = data['y'] w = np.random.normal(size=(np.shape(x)[1], 1)) c = 0.0 # Update w w_s = [] Error = [] for i in range(it): # Calculate error error = (1 / 2) * np.sum([(y[i] - (b + np.dot(w.T, x[i, :]))) ** 2 for i in range(len(x))]) #print(error) Error.append(error) if error == float('inf'): print('INFINITY') return x, y, m, b, Error m, b = gradient_descent(x, y, w, c, it, L) w = m c = b w_s.append(w.copy()) return x, y, m, b, Error

Lets Plot Something

We can find all of this data, and build trends for the data, but now we need to see or visualize the data. Lets Plot it

PlotFunctions.py

import numpy as np import matplotlib.pyplot as plt def plot_function(x, y, w,b, Error, st, iterations): if len(Error) == iterations: fig, ax = plt.subplots(nrows=1,ncols=2,figsize=(40,10)) ax[0].plot(x, ((x * w) + b), c='lightgreen', linewidth=3, zorder=0) ax[0].plot(x, y) ax[0].set_title(st) ax[1].scatter(range(iterations), Error) ax[1].set_title('Error') plt.show() else: print('Plotting Error: Iterations and Errors are different sizes') def plot_all(x, set, w,b, vw, st, iterations): fig, ax = plt.subplots(nrows=1,ncols=2,figsize=(40,10)) wb = [((xi * w) + b) for xi in x] print(wb.shape, " >> ", x.shape) wb = np.array(wb).reshape(len(x), 1) print(wb.shape, " >> ", x.shape) ax[0].plot(x, wb, c='lightgreen', linewidth=3, zorder=0) ax[0].plot(x, set[1]) ax[0].set_title(st) try: ax[1].plot(x, set[2]) ax[1].set_title('Volume') ax[1].plot(x, vw) except: '' plt.show()

Building an Alphavantage Service

Now we built a bunch of services, but no one want to recreate all of this every time they want to see a stock quote. Lets build a service that we can easily call one function and get an entire process all in one.

AlphavantageService.py

import GradientDescent.GD as gd import Plotting.PlotFunctions as plt import JsonHandler.StockHandler as sh import Services.DataManipulate as dm import LinearRegression.LR as lr class AlphavantageService: data = None originalData = None ticker = None iterations = None L = None x = None y = None m = 0.0 b = 0.0 Error = None DataSet = True def __init__(self, t, i, l, stype, mins): self.ticker = t self.iterations = i self.L = l if stype == 0: self.data, self.originalData = sh.load_data(sh.function['daily'], t, sh.interval['daily'], sh.i_type['none']) elif stype == 1: try: self.data, self.originalData = sh.load_data(sh.function['intra'], t, sh.interval[mins], sh.i_type['intra']) except: self.DataSet = False elif stype == 2: self.data, self.originalData = sh.load_data(sh.function['month'], t, sh.interval['month'], sh.i_type['none']) elif stype == 3: self.data, self.originalData = sh.load_data('VWAP', t, 'Technical Analysis: VWAP', 2) def get_data(self): return self.data def set_data_switch(self, val): self.DataSet = val def originalX(self): return self.originalData['ox'] def view_trend_for(self, l): self.L = l # Data is deviation, and originalData is true timeseries # (data.x = 0,1,2,...,n | originalData.x = 01-01-2020, ..., 02-01-2020, ..., n self.x, self.y, self.m, self.b, self.Error = gd.trend_for(self.data, self.iterations, self.L, self.m, self.b) return self.m, self.b, self.Error[len(self.Error) - 1] / len(self.x) def gradient_descent(self, l, sets, ticker): self.L = l # Data is deviation, and originalData is true timeseries # (data.x = 0,1,2,...,n | originalData.x = 01-01-2020, ..., 02-01-2020, ..., n xPieces = dm.pieces(self.data['x'], int(len(self.data['x']) / sets)) yPieces = dm.pieces(self.data['y'], int(len(self.data['y']) / sets)) xPieces = list(xPieces) yPieces = list(yPieces) xParts = [] yParts = [] mParts = [] bParts = [] # print(xPieces[3], yPieces[3], "\n", xPieces[4], yPieces[4]) for sub in range(sets): subdata = {'x': xPieces[sub], 'y': yPieces[sub]} xPart, yPart, mPart, bPart, ErrorPart = gd.trend_for(subdata, self.iterations, self.L, 0.0, 0.0) xParts.append(xPart) yParts.append(yPart) mParts.append(mPart) bParts.append(bPart) plt.plot_function(self.originalData['ox'], yPart, mPart, bPart, ErrorPart, self.ticker, self.iterations) return mParts, bParts, 0 def linear_regression(self, sets): return lr.view_trend_for_subset(sets) def plot_trend(self, acceptedError, error): # Plot the predicted function and the Error if error < acceptedError: plt.plot_function(self.x, self.y, self.m, self.b, self.Error, self.ticker, self.iterations) def get_daily_trend_for(ticker, iterations, L): data, originalData = sh.load_data(sh.function['daily'], ticker, sh.interval['daily'], sh.i_type['none']) x, y, m, b, self.Error = gd.trend_for(data, iterations, L) return m, b, Error[len(Error) - 1] def get_trendline_for(ticker, iterations, L): data, originalData = sh.load_data(sh.function['intra'], ticker, sh.interval[60], sh.i_type['intra']) x, y, m, b, Error = gd.trend_for(data, iterations, L) return m, b, Error[len(Error) - 1]

Build for Future Enhancements

This is one API. The next article in this series will take this project and tie in another Free API Service that has different data sets. Lets build a service that we call once, and manages Alphavantage as well as other API’s

The service we build will have two key functions. It will have a deep analysis function, and it will also build a watch list.

Watch List

If you want to scan through a large group of tickers, and find stocks that fit your criteria, we can build a function to do this.

What this means is we can use a flat file with tickers, and a reader to read the tickers.

tickers.txt

AAPL XOM MSFT BAC^I IBM CVX GE WMT T JNJ PG WFC KO PFE JPM GOOG PM VOD ORCL INTC MRK VZ

Then, something to read the file

FlatDataReader.py

def read(file): tickers = [] with open(file) as f: for line in f: line = line.replace("\n", "") tickers.append(line) return tickers

Our Final Service

StockService.py

import LinearRegression.LR as lr import Services.AlphavantageService as alphavantage import numpy as np from sklearn.linear_model import LinearRegression import Plotting.PlotFunctions as plt import JsonHandler.StockHandler as sh import FlatData.FlatDataReader as fdr import Services.DataManipulate as dm import time #Run: Used to set which tickers you want, some Ai Settings, and what type of dataset you want to work with #Ai settings iterations = 20000 L = 0.1 errortolerance = 20 trials = 1000 #set a trial limit Error = [] #END Ai settings #OK Now lets loop through and find the best Learning rate not_found = True def deep_analysis_of_stock(ticker): # cut data into many components # First lets Analyze longest trend stock_service = alphavantage.AlphavantageService(ticker, 10, 1, 3, 0) lineReg = lr.LR(stock_service.get_data(), ticker) mParts, bParts, xParts, yParts, zParts, prediction = lineReg.view_trend_for_subset(2) for i in range(len(xParts)): print(xParts[i].shape, " >> ", yParts[i].shape) print('ticker: ', ticker, 'm: ', mParts[i], 'b: ', bParts[i]) plt.plot_all(xParts[i], {1: yParts[i], 2: zParts[i]}, mParts, bParts[i], None, ticker, 0) return mParts, bParts, xParts, yParts, zParts, prediction def get_watchlist(min_price, max_price, wait, min_slope): tickers = fdr.read("C:\\Path\\To\\ticker.txt") ticker_batches = dm.pieces(tickers, 750) ticker_batches = list(ticker_batches) print('Batches: ', len(ticker_batches)) in_budget_stocks = [] for i in ticker_batches: print("batchSize: ", len(i)) batchString = ','.join(i) print('batch: ', batchString) data = sh.get_quotes(batchString) for line in data: if max_price > float(line['2. price']) > min_price: print("price: ", line['2. price'], "symbol: ", line['1. symbol']) in_budget_stocks.append(line) time.sleep(wait) # Throttle API Calls stock_data = [] for stock in in_budget_stocks: print('Reviewing Symbol: ', stock['1. symbol']) stock_service = None try: stock_service = alphavantage.AlphavantageService(stock['1. symbol'], iterations, L, 0, 15) except: stock_service.set_data_switch(False) continue if stock_service is None or stock_service.get_data() is None: print('No Data Available for: ', stock['1. symbol']) continue lineReg = lr.LR(stock_service.get_data(), stock['1. symbol'] + " From " + stock_service.originalX()[0] + " TO " + stock_service.originalX()[len(stock_service.originalX()) - 1]) mParts, bParts, xParts, yParts, zParts, prediction = lineReg.view_trend_for_subset(1) analysis = {'ticker': stock['1. symbol'], 'mParts': mParts, 'bParts': bParts, 'xParts': xParts, 'yParts': yParts, 'zParts': zParts, 'pz': prediction} stock_data.append(analysis) time.sleep(wait) watch_list = [] for analysis in stock_data: for i in range(len(analysis['xParts'])): print('ticker: ', analysis['ticker'], 'm: ', analysis['mParts'][i], 'b: ', analysis['bParts'][i]) if analysis['mParts'][i] > min_slope: watch_list.append(analysis) plt.plot_all(analysis['xParts'][i], {1: analysis['yParts'][i], 2: analysis['zParts'][i]}, analysis['mParts'][i], analysis['bParts'][i], analysis['pz'][i], analysis['ticker'], 0) ##print('WatchList: ', watch_list) return watch_list

And Finally, Use your Stock Service

Run.py

import StockService as ss wl = ss.get_watchlist(100, 105, 15, 0.1) print(wl) # Now, once we get are watch list, we can Do further analysis. for analysis in wl: print('Deep Analysis', ss.deep_analysis_of_stock(analysis['ticker']))

Conclusion

This code is intended to show how to use Alphvantage and Python to begin the process of a Stock Analysis Tool. This Tool will almost surely not help you make decisions with your stock picks.

This Application will however give you a great head start on building a Stock service using Alphavantage and Python.

If you would like to view or download this code, it has been placed on Github.

Originally published at https://synchrodynamic.com on December 8, 2020.

--

--

SynchroDynamic Software

SynchroDynamic Software, primarily builds management software for clients, to allow for easy transactions of data, across multiple platforms.