SQLite is an embedded SQL database. It's small, fast, and the most widely used database engine. It's also by far my favorite way to exchange large amounts of data.
I prefer to use relational (SQL) databases in general since they provide several features that are very helpful when working with data:
Transactions : You insert data into an SQL database inside a transaction. This means that either all of the data gets in, or none of it. Transactions simplify retry logic in data pipelines by an order of magnitude.
Schema : Data in relational databases has a schema, which means it's easier to check the validity of your data.
SQL : SQL (Structured Query Language) is a language for selecting and changing data. You don't need to invent yet another way to select interesting parts of data. SQL is an established format and there's a lot of knowledge and tooling around it.
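To make the transaction and schema points concrete, here is a minimal sketch using Python's built-in sqlite3 module. The two-column table and the insert_all_or_nothing helper are made up for this illustration and are not part of the code later in the article.

```python
import sqlite3


def insert_all_or_nothing(conn, rows):
    """Insert rows inside one transaction; undo all of them on error."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.executemany(
                "INSERT INTO trades (symbol, price) VALUES (?, ?)", rows
            )
    except sqlite3.Error:
        return False
    return True


conn = sqlite3.connect(":memory:")
# Hypothetical schema: NOT NULL lets the database reject invalid rows.
conn.execute("CREATE TABLE trades (symbol TEXT NOT NULL, price REAL NOT NULL)")

good = [("AAPL", 250.1), ("GOOG", 251.7)]
bad = [("MSFT", 249.9), (None, 1.0)]  # second row violates NOT NULL

ok_good = insert_all_or_nothing(conn, good)
ok_bad = insert_all_or_nothing(conn, bad)
count = conn.execute("SELECT COUNT(*) FROM trades").fetchone()[0]
print(ok_good, ok_bad, count)
```

The second batch fails on its last row, so the MSFT row that preceded it is rolled back as well. A retry can simply resend the whole batch without worrying about duplicates.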
I like SQLite since the database is a single file, which makes it easier to share data. Even though it's a single file, SQLite can handle up to 281 terabytes of data. SQLite also comes with a command line client called sqlite3, which is great for quick prototyping.
We'll write an HTTP server in Go that will get notifications on trades and will store them in an SQLite database. Then we'll write a Python program that will process this data.
In Go, we'll be using github.com/mattn/go-sqlite3 which is a wrapper around the SQLite C library.
Note: Since go-sqlite3 uses cgo, the initial build time will be longer than usual. Using cgo also means that the resulting executable depends on some shared libraries, making distribution slightly more complicated.
In Python, we'll use the built-in sqlite3 module and the pandas read_sql function to load the data.
The Go HTTP Server - trades.go
Listing 1: Trade struct
38 // Trade is a buy/sell trade for symbol
39 type Trade struct {
40     Time   time.Time
41     Symbol string
42     Price  float64
43     IsBuy  bool `json:"buy"`
44 }
Listing 1 shows the Trade data structure. It has a Time field for the trade time, a Symbol field for the stock symbol (e.g. AAPL), a Price field for the price, and a boolean IsBuy flag that tells whether it's a buy or a sell trade. On line 43 we use a field tag to tell the JSON decoder to fill the IsBuy field from the buy field in the incoming JSON object.
Listing 2: Database Schema
25 schemaSQL = `
26 CREATE TABLE IF NOT EXISTS trades (
27     time TIMESTAMP,
28     symbol VARCHAR(32),
29     price FLOAT,
30     buy BOOLEAN
31 );
32
33 CREATE INDEX IF NOT EXISTS trades_time ON trades(time);
34 CREATE INDEX IF NOT EXISTS trades_symbol ON trades(symbol);
`
Listing 2 describes the database schema. On line 26 we create a table called trades. On lines 27-30 we define the table columns that correspond to the Trade struct fields. On lines 33-34 we create indexes on the table to allow fast selection of rows by time and symbol.
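One way to check that an index is actually used is to ask SQLite for its query plan. The sketch below loads the same schema into an in-memory database from Python and runs EXPLAIN QUERY PLAN on a query that filters by symbol. The query itself is only an example, and the exact wording of the plan differs between SQLite versions.

```python
import sqlite3

schema_sql = """
CREATE TABLE IF NOT EXISTS trades (
    time TIMESTAMP,
    symbol VARCHAR(32),
    price FLOAT,
    buy BOOLEAN
);

CREATE INDEX IF NOT EXISTS trades_time ON trades(time);
CREATE INDEX IF NOT EXISTS trades_symbol ON trades(symbol);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(schema_sql)  # executescript runs several statements

# Ask SQLite how it would run a query that filters by symbol.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM trades WHERE symbol = ?", ("AAPL",)
).fetchall()
plan_text = " ".join(row[-1] for row in plan)  # last column is the description
print(plan_text)
```

The printed plan should mention the trades_symbol index rather than a full scan of the table.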
Inserting records one by one is slow. We're going to store records in a buffer and, once it's full, insert all of the buffered records into the database in one go. This has the advantage of being fast (about 60,000 records/sec on my machine), but carries the risk that we'll lose the buffered data if the server crashes.
Listing 3: Insert Record SQL
17 insertSQL = `
18 INSERT INTO trades (
19     time, symbol, price, buy
20 ) VALUES (
21     ?, ?, ?, ?
22 )
23 `
Listing 3 defines the SQL to insert a record into the database. On line 21 we use ? as placeholders for the parameters to this query. Never use fmt.Sprintf to craft an SQL query - you're risking an SQL injection.
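The same rule applies in any language. Here is a small Python sketch of what goes wrong when user input is formatted into the SQL text, next to the placeholder version. The table contents and the hostile input are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trades (symbol TEXT, price REAL)")
conn.executemany(
    "INSERT INTO trades (symbol, price) VALUES (?, ?)",
    [("AAPL", 250.1), ("GOOG", 251.7)],
)

user_input = "nope' OR '1'='1"  # hostile value posing as a stock symbol

# Unsafe: the input becomes part of the SQL text and rewrites the WHERE clause.
unsafe_sql = "SELECT COUNT(*) FROM trades WHERE symbol = '%s'" % user_input
unsafe_count = conn.execute(unsafe_sql).fetchone()[0]

# Safe: the input is passed as a value and is only ever compared to symbol.
safe_sql = "SELECT COUNT(*) FROM trades WHERE symbol = ?"
safe_count = conn.execute(safe_sql, (user_input,)).fetchone()[0]

print(unsafe_count, safe_count)
```

The formatted query matches every row in the table, while the placeholder query matches none, since no trade has that odd string as its symbol.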
Listing 4: TradesDB
46 // TradesDB is a database of stocks
47 type TradesDB struct {
48     db     *sql.DB
49     stmt   *sql.Stmt
50     buffer []Trade
51 }
Listing 4 describes the TradesDB struct. On line 48 we hold the connection to the database. On line 49 we store a prepared (pre-compiled) statement for inserting and on line 50 we have the in-memory buffer of pending trades.
Listing 5: NewTradesDB
53 // NewTradesDB connect to SQLite database in dbFile
54 // Tables will be created if they don't exist
55 func NewTradesDB(dbFile string) (*TradesDB, error) {
56     db, err := sql.Open("sqlite3", dbFile)
57     if err != nil {
58         return nil, err
59     }
60
61     _, err = db.Exec(schemaSQL)
62     if err != nil {
63         return nil, err
64     }
65
66     stmt, err := db.Prepare(insertSQL)
67     if err != nil {
68         return nil, err
69     }
70
71     buffer := make([]Trade, 0, 1024)
72     return &TradesDB{db, stmt, buffer}, nil
73 }
Listing 5 shows the creation of a TradesDB. On line 56 we connect to the database using the "sqlite3" driver. On line 61 we execute the schema SQL to create the trades table if it doesn't already exist. On line 66 we pre-compile the insert SQL statement. On line 71 we create the internal buffer with 0 length and a capacity of 1024.
Listing 6: AddTrade
104 // AddTrade adds a new trade.
105 // The new trade is only added to the internal buffer and will be inserted
106 // to the database later on Flush
107 func (db *TradesDB) AddTrade(t Trade) error {
108     // FIXME: We might grow buffer indefinitely on persistent Flush errors
109     db.buffer = append(db.buffer, t)
110     if len(db.buffer) == cap(db.buffer) {
111         if err := db.Flush(); err != nil {
112             return err
113         }
114     }
115     return nil
116 }
Listing 6 shows the AddTrade method. On line 109 we append the trade to the in-memory buffer. On line 110 we check whether the buffer is full and, if it is, on line 111 we call Flush, which inserts the records from the buffer into the database.
Listing 7: Flush
83 // Flush inserts trades from internal buffer to the database
84 func (db *TradesDB) Flush() error {
85     tx, err := db.db.Begin()
86     if err != nil {
87         return err
88     }
89
90     for _, t := range db.buffer {
91         _, err := tx.Stmt(db.stmt).Exec(t.Time, t.Symbol, t.Price, t.IsBuy)
92         if err != nil {
93             tx.Rollback()
94             return err
95         }
96     }
97     err = tx.Commit()
98     if err == nil {
99         db.buffer = db.buffer[:0]
100     }
101     return err
102 }
Listing 7 shows the Flush method. On line 85 we start a transaction. On line 90 we iterate over the internal buffer and on line 91 we insert each trade. If an insert fails, we issue a rollback on line 93 and return the error. On line 97 we issue a commit. On line 99, if the commit succeeded, we reset the in-memory trades buffer.
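The buffer-then-flush pattern is not specific to Go. As a rough sketch, here is the same idea in Python with the built-in sqlite3 module. The TradeBuffer class, its method names and the tiny capacity are assumptions made for this illustration, not a port of the Go code.

```python
import sqlite3

INSERT_SQL = "INSERT INTO trades (time, symbol, price, buy) VALUES (?, ?, ?, ?)"


class TradeBuffer:
    """Collect trades in memory and insert them in one transaction."""

    def __init__(self, conn, capacity=1024):
        self.conn = conn
        self.capacity = capacity
        self.buffer = []

    def add(self, trade):
        """Queue a (time, symbol, price, buy) tuple; flush when full."""
        self.buffer.append(trade)
        if len(self.buffer) >= self.capacity:
            self.flush()

    def flush(self):
        """Insert all queued trades, or none of them if an insert fails."""
        with self.conn:  # commit on success, roll back on exception
            self.conn.executemany(INSERT_SQL, self.buffer)
        self.buffer.clear()  # reached only after a successful commit


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trades (time TEXT, symbol TEXT, price REAL, buy INTEGER)"
)

trades = TradeBuffer(conn, capacity=2)  # tiny capacity to trigger a flush
for i in range(3):
    trades.add(("2021-03-04T09:30:0%d+00:00" % i, "AAPL", 250.0 + i, i % 2))

stored = conn.execute("SELECT COUNT(*) FROM trades").fetchone()[0]
pending = len(trades.buffer)
print(stored, pending)
```

After three calls to add with a capacity of two, two trades are in the database and one is still waiting in the buffer. The sketch compares with >= instead of ==, so a buffer that grew past its capacity after a failed flush still triggers another attempt.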
Listing 8: Close
75 // Close closes all database related resources
76 func (db *TradesDB) Close() error {
77     // TODO: Check errors from Flush & stmt.Close
78     db.Flush()
79     db.stmt.Close()
80     return db.db.Close()
81 }
Listing 8 shows the Close method. On line 78 we call Flush to insert any remaining trades into the database. On lines 79 and 80 we close the statement and the database. Functions creating a TradesDB should have a defer db.Close() to make sure the database connection is freed. In our case the database is global and the connection is alive for the life of the program, so we don't call Close.
Listing 9: HTTP Handler
118 // tradeHandler handles a new trade notification
119 func tradeHandler(w http.ResponseWriter, r *http.Request) {
120     if r.Method != "POST" {
121         http.Error(w, "only POST", http.StatusMethodNotAllowed)
122         return
123     }
124
125     if db == nil {
126         log.Printf("DB uninitialized")
127         http.Error(w, "Database not initialized", http.StatusInternalServerError)
128         return
129     }
130
131     defer r.Body.Close()
132
133     var tr Trade
134     if err := json.NewDecoder(r.Body).Decode(&tr); err != nil {
135         log.Printf("json decode error: %s", err)
136         http.Error(w, err.Error(), http.StatusBadRequest)
137         return
138     }
139
140     if err := db.AddTrade(tr); err != nil {
141         log.Printf("add error: %s", err)
142         http.Error(w, err.Error(), http.StatusInternalServerError)
143         return
144     }
145 }
Listing 9 shows the HTTP handler for new trades. On line 134 we decode the JSON input into a Trade struct. On line 140 we add the trade to the global TradesDB called db.
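To try the handler, a client needs to POST a JSON object whose keys match the Trade fields. Below is a minimal client sketch in Python. The trade_payload and post_trade helpers are hypothetical names, and the URL in the final comment assumes the default :8080 address and the /trade route used by the server. Go's JSON decoder expects the time in RFC 3339 format, so the sketch uses a timezone-aware datetime.

```python
import json
import urllib.request
from datetime import datetime, timezone


def trade_payload(symbol, price, is_buy, when=None):
    """Encode a trade as the JSON object the handler decodes.

    when must be timezone-aware so the encoded time carries a UTC offset.
    """
    when = when or datetime.now(timezone.utc)
    doc = {
        "time": when.isoformat(),  # e.g. 2021-03-04T09:30:00+00:00
        "symbol": symbol,
        "price": price,
        "buy": is_buy,  # matches the `json:"buy"` tag on IsBuy
    }
    return json.dumps(doc).encode("utf-8")


def post_trade(url, payload):
    """POST the payload and return the HTTP status code."""
    req = urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status


payload = trade_payload(
    "AAPL", 250.3, True, datetime(2021, 3, 4, 9, 30, tzinfo=timezone.utc)
)
print(payload.decode("utf-8"))
# Assumed address; needs the Go server to be running:
# post_trade("http://localhost:8080/trade", payload)
```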
Listing 10: main
147 func main() {
148     dbFile := os.Getenv("DB_FILE")
149     if dbFile == "" {
150         dbFile = "trades.db"
151     }
152
153     var err error
154     db, err = NewTradesDB(dbFile)
155     if err != nil {
156         log.Fatal(err)
157     }
158     log.Printf("connected to %s", dbFile)
159
160     http.HandleFunc("/trade", tradeHandler)
161
162     addr := os.Getenv("HTTPD_ADDR")
163     if addr == "" {
164         addr = ":8080"
165     }
166
167     log.Printf("server listening on %s", addr)
168     if err := http.ListenAndServe(addr, nil); err != nil {
169         log.Fatal(err)
170     }
171 }
Listing 10 shows how we run the server. On line 148 we use the DB_FILE environment variable to know the location of the database file. If the variable is not set, we default to trades.db on lines 149-151. If the database file doesn't exist, SQLite will create it. On line 154 we create the global database db. On lines 160 to 170 we set the HTTP server routing and start the HTTP server.
Listing 11: imports
03 import (
04     "database/sql"
05     "encoding/json"
06     "log"
07     "net/http"
08     "os"
09     "time"
10
11     _ "github.com/mattn/go-sqlite3"
12 )
Listing 11 shows the imports for the file. On line 04 we import database/sql, which defines the API for working with SQL databases. database/sql does not contain any specific database driver. On line 11 we import the github.com/mattn/go-sqlite3 package only for the side effect of registering the "sqlite3" driver. Since unused imports are a compilation error, we use _ in front of the import, telling the Go compiler it's OK that we don't use this package in the code.
The Python Code - analyze_trades.py
Listing 12: imports
02 import sqlite3
03 from contextlib import closing
04 from datetime import datetime
05
06 import pandas as pd
Listing 12 shows the libraries we're using in the Python code. On line 02 we import the built-in sqlite3 module and on line 06 we import the pandas library.
Listing 13: Select SQL
08 select_sql = """
09 SELECT * FROM trades
10 WHERE time >= ? AND time <= ?
11 """
Listing 13 shows the SQL for selecting data. On line 09 we select all the columns from the trades table. On line 10 we add a WHERE clause to select rows in a time range. As in the Go code, we use ? as placeholders for arguments and don't construct the SQL manually.
Listing 14: Loading Trades
14 def load_trades(db_file, start_time, end_time):
15     """Load trades from db_file in given time range."""
16     conn = sqlite3.connect(db_file)
17     with closing(conn) as db:
18         df = pd.read_sql(select_sql, db, params=(start_time, end_time))
19
20     # We can't use detect_types=sqlite3.PARSE_DECLTYPES here since Go is
21     # inserting time zone and Python's sqlite3 doesn't handle it.
22     # See https://bugs.python.org/issue29099
23     df["time"] = pd.to_datetime(df["time"])
24     return df
Listing 14 shows the code for loading trades in a given time range. On line 16 we connect to the database. On line 17 we use a context manager, somewhat like Go's defer, to make sure the database is closed. On line 18 we use the pandas read_sql function to load data from an SQL query into a DataFrame. Python has a standard API for connecting to databases (like Go's database/sql) and pandas can use any compatible driver. On line 23 we convert the time column to pandas Timestamp values. This step is specific to SQLite, which doesn't have built-in support for TIMESTAMP types.
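The following sketch shows what "no built-in TIMESTAMP support" means in practice, using only the standard library. The stored value is an assumed example of a date, time and UTC offset written as text; the exact format the Go driver writes may differ.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trades (time TIMESTAMP, symbol VARCHAR(32))")

# Assumed example of a stored value: date, time and UTC offset as text.
conn.execute(
    "INSERT INTO trades (time, symbol) VALUES (?, ?)",
    ("2021-03-04 09:30:00+00:00", "AAPL"),
)

# typeof reports how SQLite actually stored the value.
raw, storage_type = conn.execute(
    "SELECT time, typeof(time) FROM trades"
).fetchone()

# Explicit conversion, playing the role pd.to_datetime has in load_trades.
parsed = datetime.fromisoformat(raw)
print(storage_type, repr(raw), parsed)
```

Even though the column is declared as TIMESTAMP, SQLite stores the value as text and Python gets back a plain string, so the conversion to a real time value has to happen on the client side.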
Listing 15: Average Price
27 def average_price(df):
28     """Return average price in df grouped by (stock, buy)"""
29     return df.groupby(["symbol", "buy"], as_index=False)["price"].mean()
Listing 15 shows how to calculate the average price per symbol and buy. On line 29 we use the DataFrame groupby method to group by symbol and buy. We use as_index=False to get symbol and buy as columns in the resulting data frame. Then we take the price column and calculate the mean per group.
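For comparison, the same aggregation can be done inside the database with GROUP BY. This is a sketch on a handful of made-up rows, not the dummy data used for the output in this article, and it needs only the built-in sqlite3 module.

```python
import sqlite3

average_sql = """
SELECT symbol, buy, AVG(price) AS price
FROM trades
GROUP BY symbol, buy
ORDER BY symbol, buy
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trades (symbol TEXT, price REAL, buy INTEGER)")
# Made-up rows, chosen so the averages are easy to check by hand.
conn.executemany(
    "INSERT INTO trades (symbol, price, buy) VALUES (?, ?, ?)",
    [
        ("AAPL", 100.0, 1),
        ("AAPL", 200.0, 1),
        ("AAPL", 50.0, 0),
        ("GOOG", 300.0, 0),
    ],
)

averages = conn.execute(average_sql).fetchall()
for row in averages:
    print(row)
```

Doing the grouping in SQL keeps the amount of data loaded into Python small, while the pandas version is handier when you want to keep exploring the raw trades afterwards.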
Listing 16: Output
symbol,buy,price
AAPL,0,250.82925665004535
AAPL,1,248.28277375538832
GOOG,0,250.11537993385295
GOOG,1,252.4726772487683
MSFT,0,250.9214212695317
MSFT,1,248.60187022941685
NVDA,0,250.3844763417279
NVDA,1,249.3578146208962
Listing 16 shows the output of running the Python code on dummy data.
I highly recommend you consider using SQLite in your next project. It's a mature and stable project that can handle huge amounts of data. Many programming languages have drivers for SQLite, which makes it a good storage option.
I've simplified the code as much as I could to show the more interesting points. There are several places where you can try and improve it:
- Add retry logic to Flush
- Do more error checking in Close
- Have the Go HTTP server invoke the Python code every hour
- Run more analysis on the Python side
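As a starting point for the retry suggestion, here is one possible shape for a retry helper, sketched in Python for brevity. The retry function, its parameters and the flaky_flush stand-in are all hypothetical. In the Go server the same loop would wrap the call to Flush.

```python
import time


def retry(func, attempts=3, delay=0.1):
    """Call func until it succeeds, sleeping between failed attempts.

    Re-raises the last error once the attempts are used up.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay * attempt)  # wait a little longer each time


calls = []


def flaky_flush():
    """Stand-in for a flush that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("database is locked")
    return "flushed"


result = retry(flaky_flush, attempts=5, delay=0.01)
print(result, len(calls))
```

Since Flush inserts the whole buffer in one transaction, a failed attempt leaves nothing behind in the database, which is what makes retrying it safe.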
Have fun with the code, and let me know what crazy things you did.