You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
3.9 KiB
217 lines
3.9 KiB
package main |
|
|
|
import ( |
|
"bytes" |
|
"encoding/gob" |
|
"encoding/json" |
|
"hash/fnv" |
|
"io" |
|
"log" |
|
|
|
"github.com/bytemine/go-icinga2/event" |
|
bolt "github.com/etcd-io/bbolt" // bbolt is the continuation of bolt and for now is usable as drop in replacement |
|
) |
|
|
|
const eventBucketName = "events" |
|
const pendingBucketName = "pendingEvents" |
|
|
|
// eventID generates an internal id to prevent using nested maps |
|
func eventID(e *event.Notification) []byte { |
|
h := fnv.New64a() |
|
|
|
// the fvn hash always returns nil error, so we can ignore it here |
|
h.Write([]byte(e.Host)) |
|
h.Write([]byte(e.Service)) |
|
|
|
return h.Sum(nil) |
|
} |
|
|
|
type cache struct { |
|
*bolt.DB |
|
debug bool |
|
} |
|
|
|
func openCache(path string) (*cache, error) { |
|
db, err := bolt.Open(path, 0600, nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return &cache{DB: db}, nil |
|
} |
|
|
|
// eventTicket is a helper struct for saving to bolt |
|
type eventTicket struct { |
|
Event *event.Notification |
|
TicketID int |
|
} |
|
|
|
func decodeEventTicket(x []byte) (*eventTicket, error) { |
|
var et eventTicket |
|
buf := bytes.NewBuffer(x) |
|
d := gob.NewDecoder(buf) |
|
|
|
if err := d.Decode(&et); err != nil { |
|
return nil, err |
|
} |
|
|
|
return &et, nil |
|
} |
|
|
|
func encodeEventTicket(et *eventTicket) ([]byte, error) { |
|
var x bytes.Buffer |
|
|
|
e := gob.NewEncoder(&x) |
|
err := e.Encode(et) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return x.Bytes(), nil |
|
} |
|
|
|
func (c *cache) getEventTicket(e *event.Notification) (*event.Notification, int, error) { |
|
if *debug { |
|
log.Printf("%x cache: get event", eventID(e)) |
|
} |
|
|
|
eID := eventID(e) |
|
|
|
var et *eventTicket |
|
err := c.DB.View(func(tx *bolt.Tx) error { |
|
var err error // declare it here so we can use = instead of := to prevent shadowing |
|
eventBucket := tx.Bucket([]byte(eventBucketName)) |
|
if eventBucket == nil { |
|
return nil |
|
} |
|
|
|
x := eventBucket.Get(eID) |
|
// if we don't have a saved event just return nil |
|
if x == nil { |
|
return nil |
|
} |
|
|
|
et, err = decodeEventTicket(x) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
return nil |
|
}) |
|
|
|
if err != nil { |
|
return nil, -1, err |
|
} |
|
|
|
if et == nil { |
|
return nil, -1, nil |
|
} |
|
|
|
return et.Event, et.TicketID, nil |
|
} |
|
|
|
func (c *cache) updateEventTicket(e *event.Notification, ticketID int) error { |
|
if *debug { |
|
log.Printf("%x cache: update event", eventID(e)) |
|
} |
|
|
|
eID := eventID(e) |
|
|
|
err := c.DB.Update(func(tx *bolt.Tx) error { |
|
hostBucket, err := tx.CreateBucketIfNotExists([]byte(eventBucketName)) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
x, err := encodeEventTicket(&eventTicket{Event: e, TicketID: ticketID}) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
return hostBucket.Put(eID, x) |
|
}) |
|
|
|
return err |
|
} |
|
|
|
func (c *cache) deleteEventTicket(e *event.Notification) error { |
|
if *debug { |
|
log.Printf("%x cache: delete event", eventID(e)) |
|
} |
|
|
|
eID := eventID(e) |
|
|
|
err := c.DB.Update(func(tx *bolt.Tx) error { |
|
hostBucket, err := tx.CreateBucketIfNotExists([]byte(eventBucketName)) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
return hostBucket.Delete(eID) |
|
}) |
|
|
|
return err |
|
} |
|
|
|
func (c *cache) WriteTo(w io.Writer) (int64, error) { |
|
err := c.DB.View(func(tx *bolt.Tx) error { |
|
eventBucket := tx.Bucket([]byte(eventBucketName)) |
|
if eventBucket == nil { |
|
return nil |
|
} |
|
|
|
enc := json.NewEncoder(w) |
|
|
|
c := eventBucket.Cursor() |
|
for k, v := c.First(); k != nil; k, v = c.Next() { |
|
et, err := decodeEventTicket(v) |
|
if err != nil { |
|
return err |
|
} |
|
err = enc.Encode(et) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
}) |
|
|
|
return 0, err |
|
} |
|
|
|
func (c *cache) ReadFrom(r io.Reader) (int64, error) { |
|
err := c.DB.Update(func(tx *bolt.Tx) error { |
|
eventBucket, err := tx.CreateBucketIfNotExists([]byte(eventBucketName)) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
et := &eventTicket{} |
|
dec := json.NewDecoder(r) |
|
for { |
|
err := dec.Decode(et) |
|
if err != nil { |
|
if err == io.EOF { |
|
return nil |
|
} |
|
return err |
|
} |
|
|
|
log.Printf("%#v", et) |
|
|
|
x, err := encodeEventTicket(et) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
eID := eventID(et.Event) |
|
err = eventBucket.Put(eID, x) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
}) |
|
|
|
return 0, err |
|
}
|
|
|