时间:2023-01-26 09:45:23 | 栏目:Python代码 | 点击:次
Python logging 模块定义了为应用程序和库实现灵活的事件日志记录的函数和类。
程序开发过程中,很多程序都有记录日志的需求,并且日志包含的信息有正常的程序访问日志还可能有错误、警告等信息输出,Python 的 logging 模块提供了标准的日志接口,可以通过它存储各种格式的日志,日志记录提供了一组便利功能,用于简单的日志记录用法。
使用 Python Logging 模块的主要好处是所有 Python 模块都可以参与日志记录Logging 模块提供了大量具有灵活性的功能。
简单且方便的帮助我们输出需要的日志信息:
使用 Python 来写程序或者脚本的话,常常遇到的问题就是需要对日志进行删除。一方面可以帮助我们在程序出问题的时候排除问题,二来可以帮助我们记录需要关注的信息。
但是,使用自带自带的 logging 模块的话,则需要我们进行不同的初始化等相关工作。对应不熟悉该模块的同学来说,还是有些费劲的,比如需要配置 Handler/Formatter 等。 随着业务的复杂度提升, 对日志收集有着更高的要求, 例如: 日志分类, 文件存储, 异步写入, 自定义类型等等
loguru 是一个 Python 简易且强大的第三方日志记录库,该库旨在通过添加一系列有用的功能来解决标准记录器的注意事项,从而减少 Python 日志记录的痛苦。
pip install loguru
有很多优点,以下列举了其中比较重要的几点:
from loguru import logger logger.debug("That's it, beautiful and simple logging!")
无需初始化,导入函数即可使用, 那么你肯定要问, 如何解决一下问题?
# add logger.add(sys.stderr, \ format="{time} {level} {message}",\ filter="my_module",\ level="INFO")
是不是很easy~
# 日志文件记录 logger.add("file_{time}.log") # 日志文件转存 logger.add("file_{time}.log", rotation="500 MB") logger.add("file_{time}.log", rotation="12:00") logger.add("file_{time}.log", rotation="1 week") # 多次时间之后清理 logger.add("file_X.log", retention="10 days") # 使用zip文件格式保存 logger.add("file_Y.log", compression="zip")
logger.info( "If you're using Python {}, prefer {feature} of course!", 3.10, feature="f-strings")
@logger.catch def my_function(x, y, z): # An error? It's caught anyway! return 1 / (x + y + z) my_function(0, 0, 0)
Loguru 会自动为不同的日志级别,添加不同的颜色进行区分, 也支持自定义颜色哦~
logger.add(sys.stdout, colorize=True, format="<green>{time}</green> <level>{message}</level>") logger.add('logs/z_{time}.log', level='DEBUG', format='{time:YYYY-MM-DD :mm:ss} - {level} - {file} - {line} - {message}', rotation="10 MB")
# 异步写入 logger.add("some_file.log", enqueue=True)
你没有看错, 只需要enqueue=True
即可异步执行
用于记录代码中发生的异常的 bug 跟踪,Loguru 通过允许显示整个堆栈跟踪(包括变量值)来帮助您识别问题
logger.add("out.log", backtrace=True, diagnose=True) def func(a, b): return a / b def nested(c): try: func(5, c) except ZeroDivisionError: logger.exception("What?!") nested(0)
# 序列化为json格式 logger.add(custom_sink_function, serialize=True) # bind方法的用处 logger.add("file.log", format="{extra[ip]} {extra[user]} {message}") context_logger = logger.bind(ip="192.168.2.174", user="someone") context_logger.info("Contextualize your logger easily") context_logger.bind(user="someone_else").info("Inline binding of extra attribute") context_logger.info("Use kwargs to add context during formatting: {user}", user="anybody") # 粒度控制 logger.add("special.log", filter=lambda record: "special" in record["extra"]) logger.debug("This message is not logged to the file") logger.bind(special=True).info("This message, though, is logged to the file!") # patch()方法的用处 logger.add(sys.stderr, format="{extra[utc]} {message}") loggerlogger = logger.patch(lambda record: record["extra"].update(utc=datetime.utcnow()))
有时希望在生产环境中记录详细信息而不会影响性能,可以使用 opt() 方法来实现这一点。
logger.opt(lazy=True).debug("If sink level <= DEBUG: {x}", x=lambda: expensive_function(2**64)) # By the way, "opt()" serves many usages logger.opt(exception=True).info("Error stacktrace added to the log message (tuple accepted too)") logger.opt(colors=True).info("Per message <blue>colors</blue>") logger.opt(record=True).info("Display values from the record (eg. {record[thread]})") logger.opt(raw=True).info("Bypass sink formatting\n") logger.opt(depth=1).info("Use parent stack context (useful within wrapped functions)") logger.opt(capture=False).info("Keyword arguments not added to {dest} dict", dest="extra")
new_level = logger.level("SNAKY", no=38, color="<yellow>", icon="?") logger.log("SNAKY", "Here we go!")
# For scripts config = { "handlers": [ {"sink": sys.stdout, "format": "{time} - {message}"}, {"sink": "file.log", "serialize": True}, ], "extra": {"user": "someone"} } logger.configure(**config) # For libraries logger.disable("my_library") logger.info("No matter added sinks, this message is not displayed") logger.enable("my_library") logger.info("This message however is propagated to the sinks")
handler = logging.handlers.SysLogHandler(address=('localhost', 514)) logger.add(handler) class PropagateHandler(logging.Handler): def emit(self, record): logging.getLogger(record.name).handle(record) logger.add(PropagateHandler(), format="{message}") class InterceptHandler(logging.Handler): def emit(self, record): # Get corresponding Loguru level if it exists try: level = logger.level(record.levelname).name except ValueError: level = record.levelno # Find caller from where originated the logged message frame, depth = logging.currentframe(), 2 while frame.f_code.co_filename == logging.__file__: frameframe = frame.f_back depth += 1 logger.opt(depthdepth=depth, exception=record.exc_info).log(level, record.getMessage()) logging.basicConfig(handlers=[InterceptHandler()], level=0)
从生成的日志中提取特定的信息通常很有用,这就是为什么 Loguru 提供了一个 parse() 方法来帮助处理日志和正则表达式。
pattern = r"(?P<time>.*) - (?P<level>[0-9]+) - (?P<message>.*)" # Regex with named groups caster_dict = dict(time=dateutil.parser.parse, level=int) # Transform matching groups for groups in logger.parse("file.log", pattern, cast=caster_dict): print("Parsed:", groups) # {"level": 30, "message": "Log example", "time": datetime(2018, 12, 09, 11, 23, 55)}
import notifiers params = { "username": "you@gmail.com", "password": "abc123", "to": "dest@gmail.com" } # Send a single notification notifier = notifiers.get_notifier("gmail") notifier.notify(message="The application is running!", **params) # Be alerted on each error message from notifiers.logging import NotificationHandler handler = NotificationHandler("gmail", defaults=params) logger.add(handler, level="ERROR")
import logging import sys from pathlib import Path from flask import Flask from loguru import logger app = Flask(__name__) class InterceptHandler(logging.Handler): def emit(self, record): loggerlogger_opt = logger.opt(depth=6, exception=record.exc_info) logger_opt.log(record.levelname, record.getMessage()) def configure_logging(flask_app: Flask): """配置日志""" path = Path(flask_app.config['LOG_PATH']) if not path.exists(): path.mkdir(parents=True) log_name = Path(path, 'sips.log') logging.basicConfig(handlers=[InterceptHandler(level='INFO')], level='INFO') # 配置日志到标准输出流 logger.configure(handlers=[{"sink": sys.stderr, "level": 'INFO'}]) # 配置日志到输出到文件 logger.add(log_name, rotation="500 MB", encoding='utf-8', colorize=False, level='INFO')
介绍,主要函数的使用方法和细节 - add()的创建和删除
def add(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.LOGURU_FORMAT, filter=_defaults.LOGURU_FILTER, colorize=_defaults.LOGURU_COLORIZE, serialize=_defaults.LOGURU_SERIALIZE, backtrace=_defaults.LOGURU_BACKTRACE, diagnose=_defaults.LOGURU_DIAGNOSE, enqueue=_defaults.LOGURU_ENQUEUE, catch=_defaults.LOGURU_CATCH, **kwargs ):
另外添加 sink 之后我们也可以对其进行删除,相当于重新刷新并写入新的内容。删除的时候根据刚刚 add 方法返回的 id 进行删除即可。可以发现,在调用 remove 方法之后,确实将历史 log 删除了。但实际上这并不是删除,只不过是将 sink 对象移除之后,在这之前的内容不会再输出到日志中,这样我们就可以实现日志的刷新重新写入操作
from loguru import logger trace = logger.add('runtime.log') logger.debug('this is a debug message') logger.remove(trace) logger.debug('this is another debug message')
我们在开发流程中, 通过日志快速定位问题, 高效率解决问题, 我认为 loguru 能帮你解决不少麻烦, 赶快试试吧~
当然, 使用各种也有不少麻烦, 例如:
--- Logging error in Loguru Handler #3 ---
Record was: None
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/loguru/_handler.py", line 272, in _queued_writer
message = queue.get()
File "/usr/local/lib/python3.9/multiprocessing/queues.py", line 366, in get
res = self._reader.recv_bytes()
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
OSError: [Errno 9] Bad file descriptor
--- End of logging error ---
解决办法:
尝试将logs文件夹忽略git提交, 避免和服务器文件冲突即可;
当然也不止这个原因引起这个问题, 也可能是三方库(ciscoconfparse)冲突所致.解决办法: https://github.com/Delgan/loguru/issues/534
File "/home/ronaldinho/xxx/xxx/venv/lib/python3.9/site-packages/loguru/_logger.py", line 939, in add
handler = Handler(
File "/home/ronaldinho/xxx/xxx/venv/lib/python3.9/site-packages/loguru/_handler.py", line 86, in __init__
self._queue = multiprocessing.SimpleQueue()
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/context.py", line 113, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/queues.py", line 342, in __init__
self._rlock = ctx.Lock()
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/context.py", line 68, in Lock
return Lock(ctx=self.get_context())
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__
File "/home/ronaldinho/.pyenv/versions/3.9.4/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__
OSError: [Errno 24] Too many open files
你可以 remove()添加的处理程序,它应该释放文件句柄。
总之, 诸如此类的问题都能找到解决方法, 总体来说这个库是非常值得应用的, 白看不如一试, 快去coding吧~