From 10f05938ac284cf6bc9adb7c05a0d5713c44e4f2 Mon Sep 17 00:00:00 2001 From: Xe Iaso Date: Tue, 5 Aug 2025 01:25:47 +0000 Subject: [PATCH] feat(expressions): add CEL log filtering calls Signed-off-by: Xe Iaso --- lib/policy/expressions/logging.go | 120 +++++++++++++++++++++++++ lib/policy/expressions/logging_test.go | 65 ++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 lib/policy/expressions/logging.go create mode 100644 lib/policy/expressions/logging_test.go diff --git a/lib/policy/expressions/logging.go b/lib/policy/expressions/logging.go new file mode 100644 index 00000000..ea0e8021 --- /dev/null +++ b/lib/policy/expressions/logging.go @@ -0,0 +1,120 @@ +package expressions + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + timestamp "google.golang.org/protobuf/types/known/timestamppb" +) + +var ( + filterInvocations = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "techaro", + Subsystem: "anubis", + Name: "slog_filter_invocations", + }, []string{"name"}) + + filterExecutionTime = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "techaro", + Subsystem: "anubis", + Name: "slog_filter_execution_time_nanoseconds", + Buckets: []float64{10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000, 200000, 500000, 1000000, 2000000, 5000000, 10000000}, // 10 nanoseconds to 10 milliseconds + }, []string{"name"}) +) + +func LogFilter(opts ...cel.EnvOption) (*cel.Env, error) { + return New( + // Slog record metadata + cel.Variable("time", cel.TimestampType), + cel.Variable("msg", cel.StringType), + cel.Variable("level", cel.StringType), + cel.Variable("attrs", cel.MapType(cel.StringType, cel.StringType)), + ) +} + +func NewFilter(lg *slog.Logger, name, src string) (*Filter, error) { + env, err := LogFilter() + if err != nil { + return nil, fmt.Errorf("logging: can't create CEL env: %w", err) + } + + program, err := Compile(env, src) + if err != nil { + return nil, fmt.Errorf("logging: can't compile expression: Compile(%q): %w", src, err) + } + + return &Filter{ + program: program, + name: name, + src: src, + log: lg.With("filter", name), + }, nil +} + +type Filter struct { + program cel.Program + name string + src string + log *slog.Logger +} + +func (f Filter) Filter(ctx context.Context, r slog.Record) bool { + t0 := time.Now() + + result, _, err := f.program.ContextEval(ctx, &Record{ + Record: r, + }) + if err != nil { + f.log.Error("error executing log filter", "err", err, "src", f.src) + return false + } + dur := time.Since(t0) + filterExecutionTime.WithLabelValues(f.name).Observe(float64(dur.Nanoseconds())) + filterInvocations.WithLabelValues(f.name).Inc() + //f.log.Debug("filter execution", "dur", dur.Nanoseconds()) + + if val, ok := result.(types.Bool); ok { + return !bool(val) + } + + return false +} + +type Record struct { + slog.Record + attrs map[string]string +} + +func (r *Record) Parent() cel.Activation { return nil } + +func (r *Record) ResolveName(name string) (any, bool) { + switch name { + case "time": + return ×tamp.Timestamp{Seconds: r.Time.Unix()}, true + case "msg": + return r.Message, true + case "level": + return r.Level.String(), true + case "attrs": + if r.attrs == nil { + attrs := map[string]string{} + + r.Attrs(func(attr slog.Attr) bool { + attrs[attr.Key] = attr.Value.String() + return true + }) + + r.attrs = attrs + return attrs, true + } + return r.attrs, true + default: + return nil, false + } +} diff --git a/lib/policy/expressions/logging_test.go b/lib/policy/expressions/logging_test.go new file mode 100644 index 00000000..f13afd5e --- /dev/null +++ b/lib/policy/expressions/logging_test.go @@ -0,0 +1,65 @@ +package expressions + +import ( + "context" + "fmt" + "io" + "log/slog" + "strings" + "testing" + "time" +) + +func BenchmarkFilter(b *testing.B) { + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + filter, err := NewFilter(log, "benchmark", `msg == "hello"`) + if err != nil { + b.Fatalf("NewFilter() error = %v", err) + } + + record := slog.NewRecord(time.Now(), slog.LevelInfo, "hello", 0) + record.AddAttrs(slog.String("foo", "bar")) + + ctx := context.Background() + + b.ReportAllocs() + + for b.Loop() { + filter.Filter(ctx, record) + } +} + +func BenchmarkFilterAttributes(b *testing.B) { + for _, numAttrs := range []int{1, 2, 4, 8, 16, 32} { + b.Run(fmt.Sprintf("%d_attributes", numAttrs), func(b *testing.B) { + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + + var sb strings.Builder + sb.WriteString(`msg == "hello" && "foo" in attrs`) + + attrs := make([]slog.Attr, numAttrs) + for i := range numAttrs { + key := fmt.Sprintf("foo%d", i) + val := "bar" + attrs[i] = slog.String(key, val) + } + + filter, err := NewFilter(log, "benchmark", sb.String()) + if err != nil { + b.Fatalf("NewFilter() error = %v", err) + } + + record := slog.NewRecord(time.Now(), slog.LevelInfo, "hello", 0) + record.AddAttrs(attrs...) + + ctx := context.Background() + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + filter.Filter(ctx, record) + } + }) + } +}