mirror of
https://github.com/TecharoHQ/anubis.git
synced 2026-04-25 09:32:43 +00:00
feat(expressions): add CEL log filtering calls
Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
@@ -0,0 +1,120 @@
|
|||||||
|
package expressions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/cel-go/cel"
|
||||||
|
"github.com/google/cel-go/common/types"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
timestamp "google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
filterInvocations = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Namespace: "techaro",
|
||||||
|
Subsystem: "anubis",
|
||||||
|
Name: "slog_filter_invocations",
|
||||||
|
}, []string{"name"})
|
||||||
|
|
||||||
|
filterExecutionTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Namespace: "techaro",
|
||||||
|
Subsystem: "anubis",
|
||||||
|
Name: "slog_filter_execution_time_nanoseconds",
|
||||||
|
Buckets: []float64{10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 200000, 500000, 1000000, 2000000, 5000000, 10000000}, // 10 nanoseconds to 10 milliseconds
|
||||||
|
}, []string{"name"})
|
||||||
|
)
|
||||||
|
|
||||||
|
func LogFilter(opts ...cel.EnvOption) (*cel.Env, error) {
|
||||||
|
return New(
|
||||||
|
// Slog record metadata
|
||||||
|
cel.Variable("time", cel.TimestampType),
|
||||||
|
cel.Variable("msg", cel.StringType),
|
||||||
|
cel.Variable("level", cel.StringType),
|
||||||
|
cel.Variable("attrs", cel.MapType(cel.StringType, cel.StringType)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFilter(lg *slog.Logger, name, src string) (*Filter, error) {
|
||||||
|
env, err := LogFilter()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("logging: can't create CEL env: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
program, err := Compile(env, src)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("logging: can't compile expression: Compile(%q): %w", src, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Filter{
|
||||||
|
program: program,
|
||||||
|
name: name,
|
||||||
|
src: src,
|
||||||
|
log: lg.With("filter", name),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type Filter struct {
|
||||||
|
program cel.Program
|
||||||
|
name string
|
||||||
|
src string
|
||||||
|
log *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f Filter) Filter(ctx context.Context, r slog.Record) bool {
|
||||||
|
t0 := time.Now()
|
||||||
|
|
||||||
|
result, _, err := f.program.ContextEval(ctx, &Record{
|
||||||
|
Record: r,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
f.log.Error("error executing log filter", "err", err, "src", f.src)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
dur := time.Since(t0)
|
||||||
|
filterExecutionTime.WithLabelValues(f.name).Observe(float64(dur.Nanoseconds()))
|
||||||
|
filterInvocations.WithLabelValues(f.name).Inc()
|
||||||
|
//f.log.Debug("filter execution", "dur", dur.Nanoseconds())
|
||||||
|
|
||||||
|
if val, ok := result.(types.Bool); ok {
|
||||||
|
return !bool(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type Record struct {
|
||||||
|
slog.Record
|
||||||
|
attrs map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Record) Parent() cel.Activation { return nil }
|
||||||
|
|
||||||
|
func (r *Record) ResolveName(name string) (any, bool) {
|
||||||
|
switch name {
|
||||||
|
case "time":
|
||||||
|
return ×tamp.Timestamp{Seconds: r.Time.Unix()}, true
|
||||||
|
case "msg":
|
||||||
|
return r.Message, true
|
||||||
|
case "level":
|
||||||
|
return r.Level.String(), true
|
||||||
|
case "attrs":
|
||||||
|
if r.attrs == nil {
|
||||||
|
attrs := map[string]string{}
|
||||||
|
|
||||||
|
r.Attrs(func(attr slog.Attr) bool {
|
||||||
|
attrs[attr.Key] = attr.Value.String()
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
r.attrs = attrs
|
||||||
|
return attrs, true
|
||||||
|
}
|
||||||
|
return r.attrs, true
|
||||||
|
default:
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,65 @@
|
|||||||
|
package expressions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkFilter(b *testing.B) {
|
||||||
|
log := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||||
|
filter, err := NewFilter(log, "benchmark", `msg == "hello"`)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("NewFilter() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
record := slog.NewRecord(time.Now(), slog.LevelInfo, "hello", 0)
|
||||||
|
record.AddAttrs(slog.String("foo", "bar"))
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
for b.Loop() {
|
||||||
|
filter.Filter(ctx, record)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkFilterAttributes(b *testing.B) {
|
||||||
|
for _, numAttrs := range []int{1, 2, 4, 8, 16, 32} {
|
||||||
|
b.Run(fmt.Sprintf("%d_attributes", numAttrs), func(b *testing.B) {
|
||||||
|
log := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||||
|
|
||||||
|
var sb strings.Builder
|
||||||
|
sb.WriteString(`msg == "hello" && "foo" in attrs`)
|
||||||
|
|
||||||
|
attrs := make([]slog.Attr, numAttrs)
|
||||||
|
for i := range numAttrs {
|
||||||
|
key := fmt.Sprintf("foo%d", i)
|
||||||
|
val := "bar"
|
||||||
|
attrs[i] = slog.String(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
filter, err := NewFilter(log, "benchmark", sb.String())
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("NewFilter() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
record := slog.NewRecord(time.Now(), slog.LevelInfo, "hello", 0)
|
||||||
|
record.AddAttrs(attrs...)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
for b.Loop() {
|
||||||
|
filter.Filter(ctx, record)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user