extras.logstash — Logstash Server Emulation


An emulated, lightweight server that resembles Logstash.

Example Usage
import socket
import time
# Assumes the module is importable as yellowbox.extras.logstash.
from yellowbox.extras.logstash import FakeLogstashService
ls = FakeLogstashService()
ls.start()
# A Logstash handler would normally create this connection and send the log.
s = socket.create_connection((ls.local_host, ls.port))
s.sendall(b'{"record": "value", "level": "ERROR"}\n')
s.close()
time.sleep(0.01)  # Give the service time to process the message
ls.stop()
ls.assert_logs("ERROR")
assert ls.records[0]["record"] == "value"
class extras.logstash.FakeLogstashService(port: int = 0)

Implements a fake logging service that closely resembles Logstash.

Accepts external TCP connections, with logs received in the “json_lines” format.

Inherits from YellowService.

Parameters:

port – Port to bind the server to. Defaults to 0, which selects a random free port.

records: list[dict[str, ...]]

A list of all records received.

Each record is a dictionary with the keys that Logstash expects, as generated by the various Python Logstash packages.

All records have at least the following keys:

  • '@timestamp': Log timestamp in ISO-8601 format (str).

  • '@version': Logstash format version (str, always "1").

  • 'message': Log message (str).

  • 'host': Host sending the message (str).

  • 'path': Path to the module writing the log (str).

  • 'tags': List of tags (list[str]).

  • 'type': Always "Logstash".

  • 'level': An all upper-case name of the log level (str).

  • 'logger_name': Logger name (str).

  • 'stack_info': Formatted stacktrace if one exists, otherwise None (str | None).

More keys may be added by the specific software sending the logs.

This list may be modified, cleared, or iterated over at runtime. New logs are appended in the order they are received.
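As a rough illustration of the record shape described above, the following sketch builds a dictionary with those keys from a standard logging.LogRecord. The helper name to_logstash_dict is hypothetical and not part of this package; real Logstash handler packages may populate the fields differently.

```python
import datetime
import logging
import socket


def to_logstash_dict(record: logging.LogRecord) -> dict:
    """Hypothetical helper: map a LogRecord onto the keys listed above."""
    timestamp = datetime.datetime.fromtimestamp(
        record.created, tz=datetime.timezone.utc
    )
    return {
        "@timestamp": timestamp.isoformat(),  # ISO-8601 string
        "@version": "1",
        "message": record.getMessage(),
        "host": socket.gethostname(),
        "path": record.pathname,
        "tags": [],
        "type": "Logstash",
        "level": record.levelname,  # upper-case level name
        "logger_name": record.name,
        "stack_info": record.stack_info,  # None when no stack was captured
    }


example = to_logstash_dict(
    logging.LogRecord(
        name="my.logger",
        level=logging.ERROR,
        pathname="app.py",
        lineno=1,
        msg="disk %s is full",
        args=("sda",),
        exc_info=None,
    )
)
```

A dictionary like example, serialized as one JSON line, is the kind of entry that would end up in records.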

port: int

Port that external services should connect to.

FakeLogstashService automatically binds a free port during initialization unless a specific port is given.
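For context, binding a free port is commonly done by asking the operating system for port 0. The sketch below shows that general technique with the standard socket module; it is an assumption about the approach, not a copy of what FakeLogstashService does internally.

```python
import socket

# Bind to port 0 so the operating system picks any free port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen()

# The port actually assigned can be read back from the bound socket.
assigned_port = server.getsockname()[1]
server.close()
```

Reading the port back after binding is why the attribute should be consulted after construction rather than assumed in advance.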

local_host: str

Host to connect to from the local machine (i.e. “localhost”).

container_host: str

Host to connect to from inside containers.

encoding: str

Encoding of the JSON lines received. Defaults to "utf-8" per specification.

Note

This attribute will be ignored if it is changed after the service is started.

delimiter: str

Delimiter separating consecutive JSON objects. Defaults to b'\n' per specification.

Note

This attribute will be ignored if it is changed after the service is started.
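To make the roles of encoding and delimiter concrete, here is a minimal sketch of how a received byte buffer could be split into JSON records. The function split_json_lines is hypothetical and only illustrates the “json_lines” idea; the service's actual parsing may differ.

```python
import json


def split_json_lines(buffer: bytes, delimiter: bytes = b"\n", encoding: str = "utf-8"):
    """Hypothetical helper: return (decoded records, unconsumed remainder)."""
    # Everything before the last delimiter is complete; the tail may be partial.
    *complete, remainder = buffer.split(delimiter)
    records = [
        json.loads(chunk.decode(encoding)) for chunk in complete if chunk.strip()
    ]
    return records, remainder


parsed, rest = split_json_lines(b'{"level": "ERROR"}\n{"level": "INFO"}\n{"lev')
```

The incomplete tail is kept as rest so it can be prepended to the next chunk read from the socket.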

Has the following additional methods:

filter_records(level: str | int) → Iterator[dict[str, ...]]

Iterate over records at the given level or above.

Parameters:

level – Log level to filter by.

Returns:

An iterator over the valid records.
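The “given level or above” rule can be sketched with the standard logging module, which maps level names to numbers. The helpers to_levelno and filter_by_level below are hypothetical stand-ins that operate on a plain list of dictionaries; they are not the library's implementation.

```python
import logging
from typing import Any, Dict, Iterable, Iterator, Union


def to_levelno(level: Union[str, int]) -> int:
    """Convert a level name such as "ERROR" to its numeric value."""
    if isinstance(level, int):
        return level
    value = logging.getLevelName(level.upper())
    if not isinstance(value, int):
        raise ValueError(f"unknown log level: {level!r}")
    return value


def filter_by_level(
    records: Iterable[Dict[str, Any]], level: Union[str, int]
) -> Iterator[Dict[str, Any]]:
    """Yield records whose 'level' key is at the given level or above."""
    threshold = to_levelno(level)
    for record in records:
        if to_levelno(record["level"]) >= threshold:
            yield record


sample = [
    {"level": "INFO", "message": "started"},
    {"level": "ERROR", "message": "failed"},
    {"level": "CRITICAL", "message": "crashed"},
]
errors_and_above = list(filter_by_level(sample, "ERROR"))
```

Accepting both names and numbers mirrors the str | int parameter type in the signature above.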

assert_logs(level: str | int)

Asserts that log messages were received at the given level or above.

Parameters:

level – Log level to check.

Raises:

AssertionError if no logs of at least the given level were received.

Resembles unittest’s assertLogs().

assert_no_logs(level: str | int)

Asserts that no log messages were received at the given level or above.

Parameters:

level – Log level to check.

Raises:

AssertionError if any logs of at least the given level were received.

Resembles unittest’s assertNoLogs().
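The two assertions are complements of each other. The following sketch expresses that relationship over a plain list of record dictionaries; has_logs is a hypothetical helper used only for illustration, and it assumes every record carries a standard upper-case level name.

```python
import logging


def has_logs(records, level: int) -> bool:
    """Hypothetical helper: True if any record is at `level` or above."""
    return any(logging.getLevelName(r["level"]) >= level for r in records)


received = [{"level": "WARNING"}, {"level": "ERROR"}]

# Roughly what assert_logs("ERROR") checks: at least one matching record.
assert has_logs(received, logging.ERROR)

# Roughly what assert_no_logs("CRITICAL") checks: no matching record.
assert not has_logs(received, logging.CRITICAL)
```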