Skip to content

Commit

Permalink
Completely replace RabbitMQ / AMQP with a PG based job queue server
Browse files Browse the repository at this point in the history
  • Loading branch information
justinclift committed Dec 11, 2023
1 parent f5d069c commit e1cf6d5
Show file tree
Hide file tree
Showing 32 changed files with 1,202 additions and 2,650 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ our API server at https://api.dbhub.io, or run things locally for your own users

### Requirements

* [Golang](https://golang.org) - version 1.17 or above is required.
* [Golang](https://golang.org) - version 1.18 or above is required.
* [Memcached](https://memcached.org) - version 1.4.33 and above are known to work.
* [Minio](https://minio.io) - release 2016-11-26T02:23:47Z and later are known to work.
* [NodeJS](https://nodejs.org) - version 18.x is known to work, others are untested.
* [NodeJS](https://nodejs.org) - version 20 is known to work, others are untested.
* [PostgreSQL](https://www.postgresql.org) - version 13 and above are known to work.
* [RabbitMQ](https://www.rabbitmq.com) - version 3.10.x and above are known to work.
* [Yarn](https://classic.yarnpkg.com) - version 1.22.x. Not Yarn 2.x or greater.

### Subdirectories
Expand Down
62 changes: 20 additions & 42 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"errors"
"fmt"
"log"
"mime/multipart"
Expand Down Expand Up @@ -132,11 +131,11 @@ func columnsHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var cols []sqlite.Column
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -187,7 +186,7 @@ func columnsHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
// Send the columns request to our AMQP backend
// Send the columns request to our job queue backend
cols, _, err = com.LiveColumns(liveNode, loggedInUser, dbOwner, dbName, table)
if err != nil {
jsonErr(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -383,7 +382,7 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) {
}
}

// For a live database, delete it from both Minio and our AMQP backend
// For a live database, delete it from both Minio and our job queue backend
var bucket, id string
if isLive {
// Get the Minio bucket and object names for this database
Expand All @@ -400,7 +399,7 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Delete the database from our AMQP backend
// Delete the database from our job queue backend
err = com.LiveDelete(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -700,11 +699,11 @@ func executeHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// Send the SQL execution request to our AMQP backend
// Send the SQL execution request to our job queue backend
var rowsChanged int
rowsChanged, err = com.LiveExecute(liveNode, loggedInUser, dbOwner, dbName, sql)
if err != nil {
Expand Down Expand Up @@ -752,11 +751,11 @@ func indexesHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var indexes []com.APIJSONIndex
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -811,33 +810,12 @@ func indexesHandler(w http.ResponseWriter, r *http.Request) {
indexes = append(indexes, oneIndex)
}
} else {
// Send the indexes request to our AMQP backend
var rawResponse []byte
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "indexes", loggedInUser, dbOwner, dbName, "")
// Send the indexes request to our job queue backend
indexes, err = com.LiveIndexes(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
return
}

// Decode the response
var resp com.LiveDBIndexesResponse
err = json.Unmarshal(rawResponse, &resp)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
return
}
if resp.Error != "" {
err = errors.New(resp.Error)
jsonErr(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.Node == "" {
log.Printf("In API (Live) indexesHandler(). A node responded, but didn't identify itself.")
return
}
indexes = resp.Indexes
}

// Return the results
Expand Down Expand Up @@ -955,7 +933,7 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

Expand Down Expand Up @@ -1088,11 +1066,11 @@ func tablesHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var tables []string
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -1125,7 +1103,7 @@ func tablesHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
// Send the tables request to our AMQP backend
// Send the tables request to our job queue backend
tables, err = com.LiveTables(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -1361,8 +1339,8 @@ func uploadHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("API Server: Username '%s' uploaded LIVE database '%s/%s', bytes: %v", loggedInUser,
com.SanitiseLogString(dbOwner), com.SanitiseLogString(dbName), numBytes)

// Send a request to the AMQP backend to set up the database there, ready for querying
liveNode, err := com.LiveCreateDB(com.AmqpChan, dbOwner, dbName, objectID)
// Send a request to the job queue to set up the database
liveNode, err := com.LiveCreateDB(dbOwner, dbName, objectID)
if err != nil {
log.Println(err)
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -1448,11 +1426,11 @@ func viewsHandler(w http.ResponseWriter, r *http.Request) {
// If a live database has been uploaded but doesn't have a live node handling its requests, then error out as this
// should never happen
if isLive && liveNode == "" {
jsonErr(w, "No AMQP node available for request", http.StatusInternalServerError)
jsonErr(w, "No job queue node available for request", http.StatusInternalServerError)
return
}

// If it's a standard database, process it locally. Else send the query to our AMQP backend
// If it's a standard database, process it locally. Else send the query to our job queue backend
var views []string
if !isLive {
// Get Minio bucket and object id for the SQLite file
Expand Down Expand Up @@ -1485,7 +1463,7 @@ func viewsHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
// Send the views request to our AMQP backend
// Send the views request to our job queue backend
views, err = com.LiveViews(liveNode, loggedInUser, dbOwner, dbName)
if err != nil {
jsonErr(w, err.Error(), http.StatusInternalServerError)
Expand Down
28 changes: 18 additions & 10 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ func main() {
log.Fatalf("Configuration file problem: '%s'", err)
}

// Set the node name used in various logging strings
com.Conf.Live.Nodename = "API server"

// Open the request log for writing
reqLog, err = os.OpenFile(com.Conf.Api.RequestLog, os.O_CREATE|os.O_APPEND|os.O_WRONLY|os.O_SYNC, 0750)
if err != nil {
log.Fatalf("Error when opening request log: %s", err)
}
defer reqLog.Close()
log.Printf("Request log opened: %s", com.Conf.Api.RequestLog)
log.Printf("%s: request log opened: %s", com.Conf.Live.Nodename, com.Conf.Api.RequestLog)

// Parse our template files
tmpl = template.Must(template.New("templates").Delims("[[", "]]").ParseGlob(
Expand All @@ -70,9 +73,8 @@ func main() {
log.Fatal(err)
}

// Connect to MQ server
com.Conf.Live.Nodename = "API server"
com.AmqpChan, err = com.ConnectMQ()
// Connect to job queue server
err = com.ConnectQueue()
if err != nil {
log.Fatal(err)
}
Expand All @@ -95,6 +97,13 @@ func main() {
log.Fatal(err)
}

// Start background goroutines to handle job queue responses
com.ResponseWaiters = com.NewResponseReceiver()
com.CheckResponsesQueue = make(chan struct{})
com.SubmitterInstance = com.RandomString(3)
go com.ResponseQueueCheck()
go com.ResponseQueueListen()

// Load our self signed CA chain
ourCAPool = x509.NewCertPool()
certFile, err := os.ReadFile(com.Conf.DB4S.CAChain)
Expand Down Expand Up @@ -136,11 +145,10 @@ func main() {

// Load our self signed CA Cert chain, check client certificates if given, and set TLS1.2 as minimum
newTLSConfig := &tls.Config{
ClientAuth: tls.VerifyClientCertIfGiven,
ClientCAs: ourCAPool,
MinVersion: tls.VersionTLS12,
PreferServerCipherSuites: true,
RootCAs: ourCAPool,
ClientAuth: tls.VerifyClientCertIfGiven,
ClientCAs: ourCAPool,
MinVersion: tls.VersionTLS12,
RootCAs: ourCAPool,
}
srv := &http.Server{
Addr: com.Conf.Api.BindAddress,
Expand All @@ -153,7 +161,7 @@ func main() {
server = fmt.Sprintf("https://%s", com.Conf.Api.ServerName)

// Start API server
log.Printf("API server starting on %s", server)
log.Printf("%s: listening on %s", com.Conf.Live.Nodename, server)
err = srv.ListenAndServeTLS(com.Conf.Api.Certificate, com.Conf.Api.CertificateKey)

// Shut down nicely
Expand Down
47 changes: 38 additions & 9 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package common

import (
"crypto/tls"
"errors"
"fmt"
"io/fs"
"log"
"os"
"path/filepath"
"strconv"

"github.com/BurntSushi/toml"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/mitchellh/go-homedir"
)
Expand All @@ -19,26 +22,30 @@ var (

// PostgreSQL configuration info
pgConfig *pgxpool.Config

// Configuration info for the PostgreSQL job queue
listenConfig *pgx.ConnConfig
)

// ReadConfig reads the server configuration file.
func ReadConfig() error {
func ReadConfig() (err error) {
// Override config file location via environment variables
var err error
configFile := os.Getenv("CONFIG_FILE")
if configFile == "" {
// TODO: Might be a good idea to add permission checks of the dir & conf file, to ensure they're not
// world readable. Similar in concept to what ssh does for its config files.
userHome, err := homedir.Dir()
var userHome string
userHome, err = homedir.Dir()
if err != nil {
log.Printf("User home directory couldn't be determined: '%s'", err)
return err
return
}
configFile = filepath.Join(userHome, ".dbhub", "config.toml")
}

// Reads the server configuration from disk
if _, err := toml.DecodeFile(configFile, &Conf); err != nil {
_, err = toml.DecodeFile(configFile, &Conf)
if err != nil {
return fmt.Errorf("Config file couldn't be parsed: %s", err)
}

Expand Down Expand Up @@ -166,14 +173,18 @@ func ReadConfig() error {
}

// Check cache directory exists
if _, err := os.Stat(Conf.DiskCache.Directory); os.IsNotExist(err) {
_, err = os.Stat(Conf.DiskCache.Directory)
if errors.Is(err, fs.ErrNotExist) {
if os.MkdirAll(Conf.DiskCache.Directory, 0775) != nil {
log.Fatal(err)
return
}
}

// Set the PostgreSQL configuration values
// Set the main PostgreSQL database configuration values
pgConfig, err = pgxpool.ParseConfig(fmt.Sprintf("host=%s port=%d user= %s password = %s dbname=%s pool_max_conns=%d connect_timeout=10", Conf.Pg.Server, uint16(Conf.Pg.Port), Conf.Pg.Username, Conf.Pg.Password, Conf.Pg.Database, Conf.Pg.NumConnections))
if err != nil {
return
}
clientTLSConfig := tls.Config{}
if Conf.Environment.Environment == "production" {
clientTLSConfig.ServerName = Conf.Pg.Server
Expand All @@ -187,12 +198,30 @@ func ReadConfig() error {
pgConfig.ConnConfig.TLSConfig = nil
}

// Create the connection string for the dedicated PostgreSQL notification connection
listenConfig, err = pgx.ParseConfig(fmt.Sprintf("host=%s port=%d user= %s password = %s dbname=%s connect_timeout=10", Conf.Pg.Server, uint16(Conf.Pg.Port), Conf.Pg.Username, Conf.Pg.Password, Conf.Pg.Database))
if err != nil {
return
}
listenTLSConfig := tls.Config{}
if Conf.Environment.Environment == "production" {
listenTLSConfig.ServerName = Conf.Pg.Server
listenTLSConfig.InsecureSkipVerify = false
} else {
listenTLSConfig.InsecureSkipVerify = true
}
if Conf.Pg.SSL {
listenConfig.TLSConfig = &listenTLSConfig
} else {
listenConfig.TLSConfig = nil
}

// Environment variable override for non-production logged-in user
tempString = os.Getenv("DBHUB_USERNAME")
if tempString != "" {
Conf.Environment.UserOverride = tempString
}

// The configuration file seems good
return nil
return
}
11 changes: 0 additions & 11 deletions common/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type TomlConfig struct {
Live LiveInfo
Memcache MemcacheInfo
Minio MinioInfo
MQ MQInfo
Pg PGInfo
Sign SigningInfo
UserMgmt UserMgmtInfo
Expand Down Expand Up @@ -93,16 +92,6 @@ type MinioInfo struct {
Server string
}

// MQInfo contains the AMQP backend connection configuration info
type MQInfo struct {
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
Password string `toml:"password"`
Port int `toml:"port"`
Server string `toml:"server"`
Username string `toml:"username"`
}

// PGInfo contains the PostgreSQL connection parameters
type PGInfo struct {
Database string
Expand Down
Loading

0 comments on commit e1cf6d5

Please sign in to comment.