diff --git a/config/templates.go b/config/templates.go index 1573952be1661f2ffdaddaaed41eddc574007020..e97097cdff3ff0a4b1c09b7349a02c30d789c06e 100644 --- a/config/templates.go +++ b/config/templates.go @@ -299,7 +299,8 @@ const sectionLoggingHeader = ` ## ## Log messages are sent to one of two 'channels': info or trace ## -## They are delivered on two independent streams: 'info' or 'info and trace' +## They are delivered on two independent streams: 'info' or 'info and trace'. +## Each of these streams has a root ## ################################################################################ diff --git a/logging/config/sinks.go b/logging/config/sinks.go index 89fbace405c93823f3bc4c8e8c6e95daaef0a5dc..5db2e9128d2cabe8030fe069a5417cc23325988c 100644 --- a/logging/config/sinks.go +++ b/logging/config/sinks.go @@ -12,6 +12,9 @@ import ( kitlog "github.com/go-kit/kit/log" ) +// This file contains definitions for a configurable output graph for the +// logging system. + type source string type outputType string type transformType string @@ -32,7 +35,7 @@ const ( Filter transformType = "filter" // Remove key-val pairs from each log line Prune transformType = "prune" - // Add key value pairs to each log linel + // Add key value pairs to each log line Label transformType = "label" Capture transformType = "capture" // TODO [Silas]: add 'flush on exit' transform which flushes the buffer of @@ -229,6 +232,15 @@ func LabelTransform(prefix bool, labelKeyvals ...string) *TransformConfig { } } +func PruneTransform(keys ...string) *TransformConfig { + return &TransformConfig{ + TransformType: Prune, + PruneConfig: &PruneConfig{ + Keys: keys, + }, + } +} + func FilterTransform(fmode filterMode, keyvalueRegexes ...string) *TransformConfig { length := len(keyvalueRegexes) / 2 predicates := make([]*KeyValuePredicateConfig, length) @@ -249,7 +261,6 @@ func FilterTransform(fmode filterMode, keyvalueRegexes ...string) *TransformConf } // Logger formation - func (sinkConfig *SinkConfig) BuildLogger() (kitlog.Logger, map[string]*loggers.CaptureLogger, error) { return BuildLoggerFromSinkConfig(sinkConfig, make(map[string]*loggers.CaptureLogger)) } @@ -261,6 +272,8 @@ func BuildLoggerFromSinkConfig(sinkConfig *SinkConfig, } numSinks := len(sinkConfig.Sinks) outputLoggers := make([]kitlog.Logger, numSinks, numSinks+1) + // We need a depth-first post-order over the output loggers so we'll keep + // recurring into children sinks we reach a terminal sink (with no children) for i, sc := range sinkConfig.Sinks { l, captures, err := BuildLoggerFromSinkConfig(sc, captures) if err != nil { @@ -269,6 +282,7 @@ func BuildLoggerFromSinkConfig(sinkConfig *SinkConfig, outputLoggers[i] = l } + // Grab the outputs after we have terminated any children sinks above if sinkConfig.Output != nil && sinkConfig.Output.OutputType != NoOutput { l, err := BuildOutputLogger(sinkConfig.Output) if err != nil { @@ -333,9 +347,12 @@ func BuildTransformLogger(transformConfig *TransformConfig, return kitlog.NewContext(outputLogger).With(keyvals...), captures, nil } case Prune: + keys := make([]interface{}, len(transformConfig.PruneConfig.Keys)) + for i, k := range transformConfig.PruneConfig.Keys { + keys[i] = k + } return kitlog.LoggerFunc(func(keyvals ...interface{}) error { - return outputLogger.Log(structure.RemoveKeys(keyvals, - transformConfig.PruneConfig.Keys)) + return outputLogger.Log(structure.RemoveKeys(keyvals, keys...)...) }), captures, nil case Capture: diff --git a/logging/config/sinks_test.go b/logging/config/sinks_test.go index e0aa4f2025866459635971366e864f8932bbeaaa..fbe035bb410310ba7b3b13e993ff924973f4121a 100644 --- a/logging/config/sinks_test.go +++ b/logging/config/sinks_test.go @@ -80,18 +80,21 @@ func TestFilterSinks(t *testing.T) { assert.Equal(t, ll, excluded.BufferLogger().FlushLogLines()) } -func TestSyslogOutput(t *testing.T) { - _, _, err := Sink().SetOutput(RemoteSyslogOutput("Foo", - "tcp://logging.example.com:6514")).BuildLogger() - assert.Error(t, err) - assert.Contains(t, err.Error(), "no such host") +func TestPruneTransform(t *testing.T) { + sinkConfig := Sink(). + SetTransform(PruneTransform("Trace")). + AddSinks(Sink(). + SetTransform(CaptureTransform("cap", 100, false))) - logger, _, err := Sink().SetOutput(SyslogOutput("Foo")).BuildLogger() - if err != nil { - assert.Contains(t, err.Error(), "syslog delivery error") - } else { - logger.Log("LogTo", "Syslog") - } + logger, captures, err := sinkConfig.BuildLogger() + assert.NoError(t, err) + logger.Log("msg", "Hello with a trace", + "Trace", []string{"logger:32, state:23"}) + logger.Log("msg", "Goodbye with a trace", + "Trace", []string{"logger:32, state:14"}) + assert.Equal(t, logLines("msg", "Hello with a trace", "", + "msg", "Goodbye with a trace"), + captures["cap"].FlushLogLines()) } // Takes a variadic argument of log lines as a list of key value pairs delimited diff --git a/logging/loggers/capture_logger.go b/logging/loggers/capture_logger.go index e95be66149615efcc2ac07c2c6bda11d389a3941..e195c3a454c24fdd9d8056cf937197a27642b7a5 100644 --- a/logging/loggers/capture_logger.go +++ b/logging/loggers/capture_logger.go @@ -1,9 +1,10 @@ package loggers import ( + "sync" + "github.com/eapache/channels" kitlog "github.com/go-kit/kit/log" - "sync" ) type CaptureLogger struct { @@ -18,8 +19,8 @@ var _ kitlog.Logger = (*CaptureLogger)(nil) // Capture logger captures output set to it into a buffer logger and retains // a reference to an output logger (the logger whose input it is capturing). // It can optionally passthrough logs to the output logger. -// Because it holds a refereence to its output it can also be used to coordinate -// Flushing of the buffer to the output logger only in exceptional circumstances +// Because it holds a reference to its output it can also be used to coordinate +// Flushing of the buffer to the output logger in exceptional circumstances only func NewCaptureLogger(outputLogger kitlog.Logger, bufferCap channels.BufferCap, passthrough bool) *CaptureLogger { return &CaptureLogger{ @@ -31,18 +32,22 @@ func NewCaptureLogger(outputLogger kitlog.Logger, bufferCap channels.BufferCap, func (cl *CaptureLogger) Log(keyvals ...interface{}) error { err := cl.bufferLogger.Log(keyvals...) - if cl.passthrough { + if cl.Passthrough() { err = cl.outputLogger.Log(keyvals...) } return err } +// Sets whether the CaptureLogger is forwarding log lines sent to it through +// to its output logger. Concurrently safe. func (cl *CaptureLogger) SetPassthrough(passthrough bool) { cl.RWMutex.Lock() cl.passthrough = passthrough cl.RWMutex.Unlock() } +// Gets whether the CaptureLogger is forwarding log lines sent to through to its +// OutputLogger. Concurrently Safe. func (cl *CaptureLogger) Passthrough() bool { cl.RWMutex.RLock() passthrough := cl.passthrough @@ -52,14 +57,29 @@ func (cl *CaptureLogger) Passthrough() bool { // Flushes every log line available in the buffer at the time of calling // to the OutputLogger and returns. Does not block indefinitely. +// +// Note: will remove log lines from buffer so they will not be produced on any +// subsequent flush of buffer func (cl *CaptureLogger) Flush() { cl.bufferLogger.Flush(cl.outputLogger) } +// Flushes every log line available in the buffer at the time of calling +// to a slice and returns it. Does not block indefinitely. +// +// Note: will remove log lines from buffer so they will not be produced on any +// subsequent flush of buffer +func (cl *CaptureLogger) FlushLogLines() [][]interface{} { + return cl.bufferLogger.FlushLogLines() +} + +// The OutputLogger whose input this CaptureLogger is capturing func (cl *CaptureLogger) OutputLogger() kitlog.Logger { return cl.outputLogger } +// The BufferLogger where the input into these CaptureLogger is stored in a ring +// buffer of log lines. func (cl *CaptureLogger) BufferLogger() *ChannelLogger { return cl.bufferLogger } diff --git a/logging/loggers/capture_logger_test.go b/logging/loggers/capture_logger_test.go index 26741accf9d9d2c4e20c72bab1d3c84d4fa30321..d6caea72cadaf5ee23665d0a7e32f4a0387adbe4 100644 --- a/logging/loggers/capture_logger_test.go +++ b/logging/loggers/capture_logger_test.go @@ -17,8 +17,8 @@ func TestFlushCaptureLogger(t *testing.T) { // Flush the ones we bufferred cl.Flush() - ll := outputLogger.logLines() - assert.Equal(t, buffered, len(ll)) + _, err := outputLogger.logLines(buffered) + assert.NoError(t, err) } func TestTeeCaptureLogger(t *testing.T) { @@ -29,8 +29,8 @@ func TestTeeCaptureLogger(t *testing.T) { cl.Log("Foo", "Bar", "Index", i) } // Check passthrough to output - ll := outputLogger.logLines() - assert.Equal(t, buffered, len(ll)) + ll, err := outputLogger.logLines(buffered) + assert.NoError(t, err) assert.Equal(t, ll, cl.BufferLogger().FlushLogLines()) cl.SetPassthrough(false) @@ -41,5 +41,9 @@ func TestTeeCaptureLogger(t *testing.T) { assert.True(t, outputLogger.empty()) cl.Flush() - assert.Equal(t, 100, len(outputLogger.logLines())) -} \ No newline at end of file + _, err = outputLogger.logLines(100) + assert.NoError(t, err) + _, err = outputLogger.logLines(1) + // Expect timeout + assert.Error(t, err) +} diff --git a/logging/loggers/channel_logger.go b/logging/loggers/channel_logger.go index 0e978020fbcdc1872980e9b4c3857b94e1fe6c9a..da26a81ba6415da11fb4753a4eb4254e89871dc0 100644 --- a/logging/loggers/channel_logger.go +++ b/logging/loggers/channel_logger.go @@ -95,17 +95,18 @@ func readLogLine(logLine interface{}, ok bool) []interface{} { // // Exits if the channel is closed. func (cl *ChannelLogger) DrainForever(logger kitlog.Logger) { - logLine := cl.WaitReadLogLine() // logLine could be nil if channel was closed while waiting for next line - if logLine != nil { + for logLine := cl.WaitReadLogLine(); logLine != nil; logLine = cl.WaitReadLogLine() { logger.Log(logLine...) } } // Drains everything that is available at the time of calling func (cl *ChannelLogger) Flush(logger kitlog.Logger) { - bufferSize := cl.ch.Len() - for i := 0; i < bufferSize; i++ { + // Grab the buffer at the here rather than within loop condition so that we + // do not drain the buffer forever + bufferLength := cl.BufferLength() + for i := 0; i < bufferLength; i++ { logLine := cl.WaitReadLogLine() if logLine != nil { logger.Log(logLine...) @@ -117,14 +118,13 @@ func (cl *ChannelLogger) Flush(logger kitlog.Logger) { // for at least one line func (cl *ChannelLogger) FlushLogLines() [][]interface{} { logLines := make([][]interface{}, 0, cl.ch.Len()) - cl.Flush(kitlog.LoggerFunc(func(keyvals... interface{}) error { + cl.Flush(kitlog.LoggerFunc(func(keyvals ...interface{}) error { logLines = append(logLines, keyvals) return nil })) return logLines } - // Close the existing channel halting goroutines that are draining the channel // and create a new channel to buffer into. Should not cause any log lines // arriving concurrently to be lost, but any that have not been drained from @@ -136,25 +136,10 @@ func (cl *ChannelLogger) Reset() { cl.RWMutex.Unlock() } -func (cl *ChannelLogger) WaitLogLines() [][]interface{} { - logLines := make([][]interface{}, 0, cl.ch.Len()) - // Wait for first line - logLines = append(logLines,cl.WaitReadLogLine()) - cl.Flush(kitlog.LoggerFunc(func(keyvals... interface{}) error { - logLines = append(logLines, keyvals) - return nil - })) - return logLines -} - -// Wraps an underlying Logger baseLogger to provide a Logger that is -// is non-blocking on calls to Log. -func NonBlockingLogger(logger kitlog.Logger) *ChannelLogger { +// Returns a Logger that wraps the outputLogger passed and does not block on +// calls to Log. +func NonBlockingLogger(outputLogger kitlog.Logger) *ChannelLogger { cl := NewChannelLogger(DefaultLoggingRingBufferCap) - go cl.DrainForever(logger) + go cl.DrainForever(outputLogger) return cl } - -func lessThanCap(i int, cap channels.BufferCap) bool { - return cap == channels.Infinity || i < int(cap) -} diff --git a/logging/loggers/channel_logger_test.go b/logging/loggers/channel_logger_test.go index b54f906a9b04cdbc74e2725b08d578715cf3e34b..5c1dc4c7658ea14387bbcaad921d86e876d6851e 100644 --- a/logging/loggers/channel_logger_test.go +++ b/logging/loggers/channel_logger_test.go @@ -19,6 +19,7 @@ import ( "github.com/eapache/channels" "github.com/stretchr/testify/assert" + "time" ) func TestChannelLogger(t *testing.T) { @@ -55,5 +56,20 @@ func TestChannelLogger_Reset(t *testing.T) { assert.Equal(t, i, ll[1]) } assert.Nil(t, cl.ReadLogLine(), "Since we have drained the buffer there "+ - "should be no more log lines.") -} \ No newline at end of file + "should be no more log lines.") +} + +func TestNonBlockingLogger(t *testing.T) { + tl := newTestLogger() + nbl := NonBlockingLogger(tl) + nbl.Log("Foo", "Bar") + nbl.Log("Baz", "Bur") + nbl.Log("Badger", "Romeo") + time.Sleep(time.Second) + + lls, err := tl.logLines(3) + assert.NoError(t, err) + assert.Equal(t, logLines("Foo", "Bar", "", + "Baz", "Bur", "", + "Badger", "Romeo"), lls) +} diff --git a/logging/loggers/multiple_output_logger_test.go b/logging/loggers/multiple_output_logger_test.go index d282a90d3db5965a0a9e39ecf564c6eedd25e46a..d310328bd0546ebb48cb10d4f14bd8a3b0cb64f0 100644 --- a/logging/loggers/multiple_output_logger_test.go +++ b/logging/loggers/multiple_output_logger_test.go @@ -24,9 +24,13 @@ func TestNewMultipleOutputLogger(t *testing.T) { a, b := newErrorLogger("error a"), newErrorLogger("error b") mol := NewMultipleOutputLogger(a, b) logLine := []interface{}{"msg", "hello"} - err := mol.Log(logLine...) + errLog := mol.Log(logLine...) expected := [][]interface{}{logLine} - assert.Equal(t, expected, a.logLines()) - assert.Equal(t, expected, b.logLines()) - assert.IsType(t, multipleErrors{}, err) + logLineA, err := a.logLines(1) + assert.NoError(t, err) + logLineB, err := b.logLines(1) + assert.NoError(t, err) + assert.Equal(t, expected, logLineA) + assert.Equal(t, expected, logLineB) + assert.IsType(t, multipleErrors{}, errLog) } diff --git a/logging/loggers/shared_test.go b/logging/loggers/shared_test.go index 0ea6ae407739ea68ebbbd1dc22b981dbb29c309c..56445b6ed01298ee5f89fc0ee0c53af347e97578 100644 --- a/logging/loggers/shared_test.go +++ b/logging/loggers/shared_test.go @@ -1,35 +1,81 @@ package loggers -import "errors" +import ( + "errors" + "fmt" + "time" + + "github.com/go-kit/kit/log" +) + +const logLineTimeout time.Duration = time.Second type testLogger struct { - cl *ChannelLogger - err error + channelLogger *ChannelLogger + logLineCh chan ([]interface{}) + err error } -func (el *testLogger) empty() bool { - return el.cl.BufferLength() == 0 +func (tl *testLogger) empty() bool { + return tl.channelLogger.BufferLength() == 0 } -func (el *testLogger) logLines() [][]interface{} { - return el.cl.WaitLogLines() +func (tl *testLogger) logLines(numberOfLines int) ([][]interface{}, error) { + logLines := make([][]interface{}, numberOfLines) + for i := 0; i < numberOfLines; i++ { + select { + case logLine := <-tl.logLineCh: + logLines[i] = logLine + case <-time.After(logLineTimeout): + return logLines, fmt.Errorf("Timed out waiting for log line "+ + "(waited %s)", logLineTimeout) + } + } + return logLines, nil } -func (el *testLogger) Log(keyvals ...interface{}) error { - el.cl.Log(keyvals...) - return el.err +func (tl *testLogger) Log(keyvals ...interface{}) error { + tl.channelLogger.Log(keyvals...) + return tl.err } func newErrorLogger(errMessage string) *testLogger { - return &testLogger{ - cl: NewChannelLogger(100), - err: errors.New(errMessage), - } + return makeTestLogger(errors.New(errMessage)) } func newTestLogger() *testLogger { + return makeTestLogger(nil) +} + +func makeTestLogger(err error) *testLogger { + cl := NewChannelLogger(100) + logLineCh := make(chan ([]interface{})) + go cl.DrainForever(log.LoggerFunc(func(keyvals ...interface{}) error { + logLineCh <- keyvals + return nil + })) return &testLogger{ - cl: NewChannelLogger(100), - err: nil, + channelLogger: cl, + logLineCh: logLineCh, + err: err, + } +} + +// Takes a variadic argument of log lines as a list of key value pairs delimited +// by the empty string +func logLines(keyvals ...string) [][]interface{} { + llines := make([][]interface{}, 0) + line := make([]interface{}, 0) + for _, kv := range keyvals { + if kv == "" { + llines = append(llines, line) + line = make([]interface{}, 0) + } else { + line = append(line, kv) + } + } + if len(line) > 0 { + llines = append(llines, line) } + return llines } diff --git a/logging/loggers/vector_valued_logger_test.go b/logging/loggers/vector_valued_logger_test.go index 33a0c5d8980a1f39ed90106bc3108f1424dee964..6cd940644504b9ba159520608807b2092e84637b 100644 --- a/logging/loggers/vector_valued_logger_test.go +++ b/logging/loggers/vector_valued_logger_test.go @@ -25,7 +25,8 @@ func TestVectorValuedLogger(t *testing.T) { logger := newTestLogger() vvl := VectorValuedLogger(logger) vvl.Log("foo", "bar", "seen", 1, "seen", 3, "seen", 2) - + lls, err := logger.logLines(1) + assert.NoError(t, err) assert.Equal(t, Slice("foo", "bar", "seen", Slice(1, 3, 2)), - logger.logLines()[0]) + lls[0]) }