You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

518 lines
14 KiB

package main
import (
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io/ioutil"
	"math"
	"net/http"
	"net/http/httputil"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/gorilla/mux"
	"github.com/influxdata/influxdb-client-go/v2"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)
// Configuration is the head node's configuration, decoded from the JSON
// config file by parseConfig (viper, using the mapstructure tags below). On a
// satellite parseConfig is never called, so only the flag-driven Debug and
// Privileged fields are set (by main).
type Configuration struct {
// Authorization is the admin secret: main registers POST /config only for
// requests whose X-Authorization header matches it, and HealthRequest only
// includes failure details for requests presenting it.
Authorization string `mapstructure:"authorization"`
// Debug is set by the -debug flag; a "debug" key in the file is decoded over
// it. When true, parseConfig also sets viper's "Verbose" key.
Debug bool `mapstructure:"debug"`
// Database holds the InfluxDB connection settings used by the head.
Database InfluxConfiguration `mapstructure:"database"`
// Satellites is keyed by satellite name; parseConfig copies each key into
// the entry's Name.
Satellites map[string]Satellite `mapstructure:"satellites"`
// Privileged is set by the -privileged flag. It is not read in this part of
// the file (presumably consumed by the probe code elsewhere — confirm).
Privileged bool `mapstructure:"privileged"`
// Targets is keyed by target name; Satellite.Targets lists these keys.
// parseConfig copies each key into the entry's Name.
Targets map[string]Target `mapstructure:"targets"`
}
// InfluxConfiguration holds the InfluxDB settings used by the head node.
type InfluxConfiguration struct {
// Host is passed to influxdb2.NewClient in main as the server URL.
Host string `mapstructure:"host"`
// Token is passed to influxdb2.NewClient in main as the auth token.
Token string `mapstructure:"token"`
// Org is the organisation SubmitTarget writes probe results to.
Org string `mapstructure:"org"`
// Bucket is the bucket SubmitTarget writes probe results to.
Bucket string `mapstructure:"bucket"`
}
// ErrorResponse is the JSON:API-style error document that handleError sends:
// a top-level "errors" array of ErrorPacket objects.
type ErrorResponse struct {
Errors []*ErrorPacket `json:"errors"`
}
// ErrorPacket is one entry of ErrorResponse.Errors.
type ErrorPacket struct {
// Status is the HTTP status code rendered as a decimal string.
Status string `json:"status"`
// Source identifies where the error arose; handlers pass the request URI
// (or "/healthz").
Source string `json:"source"`
// Title is a short human-readable summary of the problem.
Title string `json:"title"`
// Detail carries the text of the underlying Go error, filled by handleError.
Detail string `json:"detail"`
}
// Satellite is the head's record of one remote probe node. GetSatellite
// serves it as JSON; only Secret carries a json tag, so the other keys are
// the Go field names.
type Satellite struct {
// Active: inactive satellites are refused targets (GetTargets) and result
// submissions (SubmitTarget), and are skipped by HealthRequest.
Active bool `mapstructure:"active"`
// Name is copied from the Configuration.Satellites map key by parseConfig.
Name string `mapstructure:"name"`
// Secret is the value the satellite must send in X-Authorization; json:"-"
// keeps it out of JSON responses.
Secret string `mapstructure:"secret" json:"-"`
// Targets lists keys of Configuration.Targets assigned to this satellite.
Targets []string `mapstructure:"targets"`
// LastData is set to time.Now() whenever SubmitTarget accepts results;
// HealthRequest flags the satellite when it is too old.
LastData time.Time `mapstructure:"last_data"`
// Health is not referenced in this part of the file.
Health bool `mapstructure:"health"`
}
// ResponsePacket is the body a satellite POSTs to /targets/{name}: a batch of
// measurements for one target. SubmitTarget decodes it with encoding/json,
// which ignores the mapstructure tags, so on the wire the keys are the Go
// field names (matched case-insensitively).
type ResponsePacket struct {
// SatelliteName selects the Satellite whose secret authenticates the request
// and is stored as the "probe" tag in InfluxDB.
SatelliteName string `mapstructure:"satellite_name"`
// TargetName and ProbeType form the "target" tag: "<name> (<type>)".
TargetName string `mapstructure:"target_name"`
ProbeType string `mapstructure:"probe_type"`
// Probes holds the individual measurements; each becomes one InfluxDB point.
Probes []Probe `mapstructure:"probes"`
}
// Probe is one aggregated measurement. SubmitTarget writes MinRTT, MaxRTT,
// Median and Loss as the fields min, max, median and loss of a "stat" point
// tagged unit=milliseconds (so the RTT values are presumably milliseconds —
// confirm with the probe code), timestamped with Timestamp.
type Probe struct {
MinRTT float64 `mapstructure:"min_rtt"`
MaxRTT float64 `mapstructure:"max_rtt"`
Median float64 `mapstructure:"median"`
// Loss: the scale (fraction vs. percent) is not established in this file.
Loss float64 `mapstructure:"loss"`
// NumProbes is not written to InfluxDB by SubmitTarget.
NumProbes int `mapstructure:"num_probes"`
Timestamp time.Time `mapstructure:"timestamp"`
}
// Target describes one thing to probe. GetTargets serves these to satellites
// as JSON; only mapstructure tags are declared, so the JSON keys are the Go
// field names.
type Target struct {
// Name is copied from the Configuration.Targets map key by parseConfig.
Name string `mapstructure:"name"`
// Host is the address to probe (exact format depends on ProbeType — TODO
// confirm against the probe implementation).
Host string `mapstructure:"host"`
// ProbeType selects the kind of probe; main logs it as the worker "type".
ProbeType string `mapstructure:"probe_type"`
// Probes is not read in this part of the file; presumably the number of
// probes per measurement — TODO confirm.
Probes int `mapstructure:"probes"`
// Interval: HealthRequest multiplies it by time.Minute, i.e. treats it as
// minutes when deciding whether a satellite is overdue.
// NOTE(review): confirm the probe worker uses the same unit.
Interval int `mapstructure:"interval"`
// BatchSize is not read in this part of the file; presumably the number of
// measurements collected before a submission — TODO confirm.
BatchSize int `mapstructure:"batch_size"`
}
// Worker is the state of one probe goroutine on a satellite; main creates one
// per received target and restarts it whenever it reports back on the worker
// channel.
type Worker struct {
// Target is the target this worker probes.
Target Target
// HeadUrl is the head node's base URL: scheme, host and a trailing "/".
HeadUrl string
// ProbeName is this satellite's name (the -name flag).
ProbeName string
// Id is the worker's index in the received target list, used in log fields.
Id int
// Err is the error the worker stopped with (presumably set by HandleProbe,
// which is defined outside this view); main logs it and resets it to nil
// before restarting the worker.
Err error
}
// Process-wide state. Config holds the head's configuration (on a satellite,
// only the flag-derived fields), ConfigFile remembers the path it was loaded
// from so ConfigReload can re-read it, and Client is the InfluxDB client used
// by the head's handlers. log is the shared logrus logger configured in main;
// note that it shadows the standard library "log" package name in this file.
var (
	Config     Configuration
	ConfigFile string
	Client     influxdb2.Client
	log        *logrus.Logger
)

// version is the nprobe release (startup log, X-Nprobe-Version header and the
// /version endpoint); apiVersion is advertised in the X-Api-Version header.
const (
	version    = "0.0.2"
	apiVersion = "0.0.1"
)
// main parses the command-line flags and runs nprobe in one of two modes:
//
//   - head: loads the JSON config, connects to InfluxDB and serves the HTTP
//     API on :8000. Selected with -mode=head, or implicitly when -head is
//     empty.
//   - satellite: fetches its target list from the head node given by -head
//     (authenticating with the NPROBE_SECRET environment variable), starts one
//     probe worker per target and restarts any worker that reports back.
//
// TLS certificate verification towards the head is on by default and is only
// disabled by -insecure-tls; -notls switches to plain HTTP.
func main() {
	log = logrus.New()
	log.SetLevel(logrus.InfoLevel)
	log.SetFormatter(&logrus.TextFormatter{
		DisableColors: true,
		FullTimestamp: true,
	})
	hostname, err := os.Hostname()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	configFile := flag.String("config", "config/config.json", "config file")
	debug := flag.Bool("debug", false, "enable debug mode")
	headNode := flag.String("head", "", "fqdn / ip of head node")
	insecureTLS := flag.Bool("insecure-tls", false, "disable use of tls cert checking")
	mode := flag.String("mode", "satellite", "head / satellite")
	noTLS := flag.Bool("notls", false, "disable use of tls")
	privileged := flag.Bool("privileged", false, "enable privileged mode")
	probeName := flag.String("name", hostname, "name of probe")
	flag.Parse()
	if *debug {
		log.SetLevel(logrus.DebugLevel)
		Config.Debug = true
	}
	if *privileged {
		Config.Privileged = true
	}
	// Without a head node to report to, this instance has to be the head.
	if *headNode == "" {
		*mode = "head"
	}
	log.WithFields(logrus.Fields{
		"host":    *probeName,
		"version": version,
		"mode":    *mode,
	}).Info("nprobe is starting")

	if *mode == "head" {
		parseConfig(configFile)
		ConfigFile = *configFile
		Client = influxdb2.NewClient(Config.Database.Host, Config.Database.Token)
		router := mux.NewRouter()
		// make use of our middleware to set content type and such
		router.Use(commonMiddleware)
		router.HandleFunc("/config", ConfigReload).Headers("X-Authorization", Config.Authorization).Methods("POST")
		router.HandleFunc("/healthz", HealthRequest).Methods("GET")
		router.HandleFunc("/satellites/{name}", GetSatellite).Methods("GET")
		router.HandleFunc("/satellites/{name}/targets", GetTargets).Methods("GET")
		router.HandleFunc("/targets/{name}", SubmitTarget).Methods("POST")
		router.HandleFunc("/version", VersionRequest).Methods("GET")
		srv := &http.Server{
			Addr:    ":8000",
			Handler: router,
			// Bound how long a client may take to send its request headers.
			ReadHeaderTimeout: 10 * time.Second,
		}
		err = srv.ListenAndServe()
		// log.Fatal exits without running deferred calls, so close the
		// InfluxDB client explicitly to flush pending asynchronous writes.
		Client.Close()
		log.Fatal(err) // does not return
	}

	scheme := "https://"
	if *noTLS {
		scheme = "http://"
	}
	headURL := scheme + *headNode + "/"
	request, err := http.NewRequest(http.MethodGet, headURL+"satellites/"+*probeName+"/targets", nil)
	if err != nil {
		log.WithFields(logrus.Fields{"error": err}).Fatal("Error building configuration request")
	}
	request.Header.Set("X-Authorization", os.Getenv("NPROBE_SECRET"))
	transport := &http.Transport{}
	// Skip certificate verification only when explicitly requested.
	if *insecureTLS {
		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	}
	client := &http.Client{Transport: transport, Timeout: 15 * time.Second}
	response, err := client.Do(request)
	if err != nil {
		log.WithFields(logrus.Fields{"error": err}).Fatal("Error retrieving configuration from head")
	}
	// Read and close the body right away: the worker loop below never
	// returns, so a deferred Close would never run.
	data, readErr := ioutil.ReadAll(response.Body)
	_ = response.Body.Close()
	if response.StatusCode != http.StatusOK {
		fields := logrus.Fields{"Response Status": response.StatusCode}
		switch response.StatusCode {
		case http.StatusForbidden:
			log.WithFields(fields).Error("Error talking to head - validate that your authorization is correct")
		case http.StatusNotFound:
			log.WithFields(fields).Error("Error talking to head - validate that your satellite name is correct")
		default:
			log.WithFields(fields).Error("Error talking to head")
		}
		// The error body is best-effort diagnostics; readErr is irrelevant here.
		log.WithFields(logrus.Fields{"Raw Error Message": string(data)}).
			Debug("Error talking to head")
		log.Fatal("Abort - critical error")
	}
	if readErr != nil {
		log.WithFields(logrus.Fields{"error": readErr}).Fatal("Error while processing configuration")
	}
	var targets []Target
	if err := json.Unmarshal(data, &targets); err != nil {
		log.WithFields(logrus.Fields{"error": err}).Fatal("Error while processing configuration")
	}
	log.WithFields(logrus.Fields{
		"targets": targets,
	}).Info("Targets received")
	log.WithFields(logrus.Fields{"configuration": string(data)}).Debug("Configuration received")
	if len(targets) == 0 {
		log.Warn("No targets assigned to this satellite - nothing to probe")
	}

	workerChan := make(chan *Worker, len(targets))
	for i, target := range targets {
		wk := &Worker{
			Target:    target,
			HeadUrl:   headURL,
			ProbeName: *probeName,
			Id:        i,
		}
		log.WithFields(logrus.Fields{
			"worker id": i,
			"target":    wk.Target.Name,
			"type":      wk.Target.ProbeType,
		}).Info("Launching worker")
		go wk.HandleProbe(workerChan)
		// Stagger worker start-up by a few seconds.
		time.Sleep(5 * time.Second)
	}
	// A worker that stops is received here (blocking until one does); log its
	// error, clear it and restart the worker. This loop runs for the lifetime
	// of the process.
	for wk := range workerChan {
		log.WithFields(logrus.Fields{
			"worker id": wk.Id,
			"target":    wk.Target.Name,
			"error":     wk.Err,
		}).Error()
		wk.Err = nil
		go wk.HandleProbe(workerChan)
	}
}
// ConfigReload handles POST /config by re-reading the configuration file the
// head node was started with (ConfigFile) into the global Config. main only
// routes requests whose X-Authorization header matches the admin secret here.
// A broken config file terminates the process, because parseConfig uses
// log.Fatal. On success the response is an empty 200.
func ConfigReload(w http.ResponseWriter, r *http.Request) {
	log.Infof("Config Reload triggered")
	path := ConfigFile
	parseConfig(&path)
}
// GetSatellite handles GET /satellites/{name} and returns that satellite's
// record as JSON (the secret is excluded by its json:"-" tag).
//
// Responses: 404 when the name is unknown, 403 when X-Authorization does not
// equal the satellite's secret, otherwise 200 with the encoded Satellite.
func GetSatellite(w http.ResponseWriter, r *http.Request) {
	name := mux.Vars(r)["name"]
	sat, ok := Config.Satellites[name]
	switch {
	case !ok:
		handleError(w, http.StatusNotFound, r.RequestURI, "Requested item not found", nil)
	case r.Header.Get("X-Authorization") != sat.Secret:
		handleError(w, http.StatusForbidden, r.RequestURI, "You're not allowed here", nil)
	default:
		if encErr := json.NewEncoder(w).Encode(sat); encErr != nil {
			log.WithFields(logrus.Fields{"error": encErr}).Error()
		}
	}
}
// GetTargets handles GET /satellites/{name}/targets and returns, as a JSON
// array, the Target definitions assigned to that satellite.
//
// Responses: 404 for an unknown satellite, 403 when X-Authorization does not
// equal the satellite's secret or the satellite is inactive, 503 if encoding
// fails, otherwise 200 with the array (an empty assignment encodes as []).
//
// Target names listed by the satellite but missing from Config.Targets are
// logged and skipped. They are not sent as zero-value targets, which would
// make the satellite start workers with no host or probe type.
func GetTargets(w http.ResponseWriter, r *http.Request) {
	params := mux.Vars(r)
	satellite, found := Config.Satellites[params["name"]]
	if !found {
		handleError(w, http.StatusNotFound, r.RequestURI, "Requested item not found", nil)
		return
	}
	if r.Header.Get("X-Authorization") != satellite.Secret {
		handleError(w, http.StatusForbidden, r.RequestURI, "You're not allowed here", nil)
		return
	}
	if !satellite.Active {
		handleError(w, http.StatusForbidden, r.RequestURI, "You're not allowed here",
			errors.New("satellite marked inactive"))
		return
	}
	// Non-nil on purpose, so that an empty result encodes as [] not null.
	targets := make([]Target, 0, len(satellite.Targets))
	for _, name := range satellite.Targets {
		target, ok := Config.Targets[name]
		if !ok {
			log.WithFields(logrus.Fields{
				"satellite": satellite.Name,
				"target":    name,
			}).Warn("Satellite references unknown target - skipping")
			continue
		}
		targets = append(targets, target)
	}
	log.WithFields(logrus.Fields{
		"satellite": satellite.Name,
		"targets":   targets,
	}).Debug("Satellite is receiving targets")
	if err := json.NewEncoder(w).Encode(targets); err != nil {
		handleError(w, http.StatusServiceUnavailable, r.RequestURI, "Error while encoding targets", err)
	}
}
func SubmitTarget(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
log.Debugf("%+v", params)
var responsePacket ResponsePacket
_ = json.NewDecoder(r.Body).Decode(&responsePacket)
log.WithFields(logrus.Fields{"responsePacket": responsePacket}).Debug()
satellite := Config.Satellites[responsePacket.SatelliteName]
if r.Header.Get("X-Authorization") != satellite.Secret {
handleError(w, http.StatusForbidden, r.RequestURI, "You're not allowed here", nil)
return
}
if !satellite.Active {
handleError(w, http.StatusForbidden, r.RequestURI, "You're not allowed here - satellite is marked inactive", nil)
return
}
// user blocking write client for writes to desired bucket
writeAPI := Client.WriteAPI(Config.Database.Org, Config.Database.Bucket)
// create point using fluent style
for _, probe := range responsePacket.Probes {
p := influxdb2.NewPointWithMeasurement("stat").
AddTag("unit", "milliseconds").
AddTag("target", responsePacket.TargetName+" ("+responsePacket.ProbeType+")").
AddTag("probe", responsePacket.SatelliteName).
AddField("median", probe.Median).
AddField("max", probe.MaxRTT).
AddField("min", probe.MinRTT).
AddField("loss", probe.Loss).
SetTime(probe.Timestamp)
writeAPI.WritePoint(p)
}
s := Config.Satellites[responsePacket.SatelliteName]
s.LastData = time.Now()
Config.Satellites[responsePacket.SatelliteName] = s
log.WithFields(logrus.Fields{"data": s}).Debug()
}
// handleError logs a failed request and answers it with a JSON:API-style
// error document:
//
//	{"errors":[{"status":"<status>","source":"<source>","title":"<title>","detail":"<err>"}]}
//
// status is the HTTP status code to send. source identifies where the error
// arose (handlers pass the request URI), and title is a short summary. err is
// the underlying cause and may be nil, in which case detail is empty.
func handleError(w http.ResponseWriter, status int, source string, title string, err error) {
	log.WithFields(logrus.Fields{
		"status": status,
		"source": source,
		"title":  title,
		"error":  err,
	}).Error("Request failed")

	detail := ""
	if err != nil {
		detail = err.Error()
	}
	body, marshalErr := json.Marshal(ErrorResponse{Errors: []*ErrorPacket{{
		Status: strconv.Itoa(status),
		Source: source,
		Title:  title,
		Detail: detail,
	}}})
	if marshalErr != nil {
		// Not expected for these string-only types; degrade to plain text.
		http.Error(w, http.StatusText(status), status)
		return
	}
	// Write the document directly: http.Error would reset Content-Type to
	// text/plain, contradicting the application/vnd.api+json type the API
	// advertises.
	h := w.Header()
	h.Set("Content-Type", "application/vnd.api+json")
	h.Set("X-Content-Type-Options", "nosniff")
	w.WriteHeader(status)
	if _, writeErr := w.Write(body); writeErr != nil {
		log.WithFields(logrus.Fields{"error": writeErr}).Error("Error writing error response")
	}
}
// commonMiddleware wraps every routed handler. It dumps the request when debug
// logging is on, then stamps the response with the JSON:API content type and
// the API/nprobe version headers before delegating to next.
func commonMiddleware(next http.Handler) http.Handler {
	wrapped := func(w http.ResponseWriter, r *http.Request) {
		dumpRequest(r)
		h := w.Header()
		h.Add("Content-Type", "application/vnd.api+json")
		h.Add("X-Api-Version", apiVersion)
		h.Add("X-Nprobe-Version", version)
		h.Add("X-Powered-By", "nprobe")
		next.ServeHTTP(w, r)
	}
	return http.HandlerFunc(wrapped)
}
// HealthRequest handles GET /healthz. It answers 503 when InfluxDB is not
// healthy, or when an active satellite has not submitted data within its
// shortest target interval. Target.Interval is treated as minutes here. A
// healthy head answers 200 with an empty body.
//
// Failure details are only included for requests whose X-Authorization
// header matches Config.Authorization; other callers get a generic error.
// Satellites whose targets are all unknown or have no positive interval are
// skipped, since there is no interval to judge staleness against.
func HealthRequest(w http.ResponseWriter, r *http.Request) {
	log.Info("Running Health-Check")
	msg := "Health Check not ok"
	authedRequest := r.Header.Get("X-Authorization") == Config.Authorization

	// Don't let a hung InfluxDB stall the health endpoint indefinitely.
	ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
	defer cancel()
	health, err := Client.Health(ctx)
	if err != nil {
		log.WithFields(logrus.Fields{"error": err}).Error()
		if !authedRequest {
			// for unauthed requests to /health we don't want to leak the actual error
			err = nil
		} else if health != nil && health.Message != nil {
			msg = fmt.Sprintf("Influx Error: %s", *health.Message)
		}
		handleError(w, http.StatusServiceUnavailable, "/healthz", msg, err)
		return
	}

	for _, sat := range Config.Satellites {
		if !sat.Active {
			continue
		}
		// The satellite must report at least once per its shortest interval.
		interval := math.MaxInt
		for _, name := range sat.Targets {
			target, ok := Config.Targets[name]
			if !ok || target.Interval <= 0 {
				continue
			}
			if target.Interval < interval {
				interval = target.Interval
			}
		}
		// No usable interval: nothing to measure against, and
		// time.Minute*math.MaxInt would overflow into a bogus deadline.
		if interval == math.MaxInt {
			continue
		}
		deadline := time.Now().Add(-time.Minute * time.Duration(interval))
		if sat.LastData.Before(deadline) {
			if authedRequest {
				msg = fmt.Sprintf("Probe '%s' has not come back in time. Last message from '%s'", sat.Name, sat.LastData)
			} else {
				msg = ""
			}
			handleError(w, http.StatusServiceUnavailable, "/healthz", msg, nil)
			return
		}
	}
	log.Info("Health-Check completed OK")
}
func VersionRequest(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(fmt.Sprintf("{ \"Version:\" \"%s\" }", version)))
if err != nil {
log.WithFields(logrus.Fields{"error": err}).Error()
}
}
// dumpRequest logs the full incoming request (headers and body) when debug
// (or more verbose) logging is enabled. The X-Authorization header is masked
// so secrets do not end up in logs. httputil.DumpRequest buffers the body,
// and the buffered copy is handed back to r so the handler can still read it.
func dumpRequest(r *http.Request) {
	if !log.IsLevelEnabled(logrus.DebugLevel) {
		return
	}
	// Dump a clone so the secret can be masked without altering r's headers.
	dumpReq := r.Clone(r.Context())
	if dumpReq.Header.Get("X-Authorization") != "" {
		dumpReq.Header.Set("X-Authorization", "[REDACTED]")
	}
	requestDump, err := httputil.DumpRequest(dumpReq, true)
	// Clone shares r.Body; DumpRequest replaced the clone's Body with a
	// re-readable copy, which r must use from now on.
	r.Body = dumpReq.Body
	if err != nil {
		log.WithFields(logrus.Fields{"error": err}).Error()
		return
	}
	log.Debugf("%s", requestDump)
}
// parseConfig (re)loads the JSON configuration file at *configPtr into the
// global Config via viper. It then fills each satellite's and target's Name
// from its map key. It runs at head start-up and from ConfigReload. Any read
// or decode error is fatal (log.Fatal exits the process).
//
// Satellites and Targets are rebuilt on every call, so entries removed from
// the file disappear on reload. The runtime LastData of satellites that remain
// is carried over. Scalar fields are decoded on top of the existing Config, so
// values set beforehand (the -debug/-privileged flags) survive when the file
// does not mention them.
func parseConfig(configPtr *string) {
	if Config.Debug {
		viper.Set("Verbose", true)
	}
	// SetConfigFile takes the full path, extension included.
	viper.SetConfigFile(*configPtr)
	viper.SetConfigType("json")
	if err := viper.ReadInConfig(); err != nil {
		log.WithFields(logrus.Fields{"error": err}).Fatal("Error while processing configuration")
	}
	log.Infof("Using config file: %s", viper.ConfigFileUsed())

	// viper's Unmarshal (mapstructure) merges into non-nil maps instead of
	// replacing them, which would keep deleted entries alive across reloads.
	// Start from nil maps and remember the previous satellites' runtime state.
	previous := Config.Satellites
	Config.Satellites = nil
	Config.Targets = nil
	if err := viper.Unmarshal(&Config); err != nil {
		log.WithFields(logrus.Fields{"error": err}).Fatal("Error while unmarshalling")
	}
	// inject name from map names
	for name, s := range Config.Satellites {
		s.Name = name
		if old, ok := previous[name]; ok && s.LastData.IsZero() {
			s.LastData = old.LastData
		}
		Config.Satellites[name] = s
	}
	for name, t := range Config.Targets {
		t.Name = name
		Config.Targets[name] = t
	}
	// Config holds credentials (Authorization, database token, satellite
	// secrets), so log only a non-sensitive summary.
	log.WithFields(logrus.Fields{
		"satellites": len(Config.Satellites),
		"targets":    Config.Targets,
	}).Debug("Configuration loaded")
}