fork download
  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import sys
  4. import inspect
  5. # from functools import lru_cache
  6. from datetime import datetime
  7.  
  8.  
  9. class OptimizedFormatter(logging.Formatter):
  10. def __init__(self, *args, **kwargs):
  11. super(OptimizedFormatter, self).__init__(*args, **kwargs)
  12. self._frame_blacklist = {id(inspect.currentframe())}
  13.  
  14. def format(self, record):
  15. try:
  16. # 智能帧信息捕获
  17. if not all(hasattr(record, attr) for attr in ['module', 'lineno', 'funcName']):
  18. frame = self._find_caller_frame()
  19. if frame:
  20. self._populate_record(record, frame)
  21.  
  22. # 统一参数处理
  23. args_str = self._process_arguments(record)
  24. if args_str:
  25. record.message = u"%s ⎸ %s" % (record.message, args_str)
  26. else:
  27. record.message = record.getMessage()
  28.  
  29. return super(OptimizedFormatter, self).format(record)
  30. except Exception as e:
  31. return u"⚠️ Format Error | Err: %s | Raw: %s" % (e, record.getMessage())
  32.  
  33. # @lru_cache(maxsize=128)
  34. def _find_caller_frame(self):
  35. """带缓存的调用帧查找"""
  36. stack = inspect.stack()
  37. for frame_info in reversed(stack):
  38. frame = frame_info[0]
  39. if id(frame) not in self._frame_blacklist and \
  40. inspect.getmodule(frame).__name__ != __name__:
  41. return frame
  42. return inspect.currentframe()
  43.  
  44. def _populate_record(self, record, frame):
  45. """填充记录元数据"""
  46. module = inspect.getmodule(frame)
  47. record.module = module.__name__ if module else 'unknown'
  48. record.lineno = frame.f_lineno
  49. record.funcName = frame.f_code.co_name
  50.  
  51. def _process_arguments(self, record):
  52. """统一参数处理"""
  53. parts = []
  54.  
  55. # 处理位置参数
  56. if hasattr(record, 'positional_args') and record.positional_args:
  57. parts.extend(self._safe_repr(a) for a in record.positional_args)
  58.  
  59. # 处理关键字参数
  60. if hasattr(record, 'extra_kwargs') and record.extra_kwargs:
  61. parts.extend(u"%s=%s" % (k, self._safe_repr(v))
  62. for k, v in record.extra_kwargs.items())
  63.  
  64. return u" ⎸ ".join(parts) if parts else u""
  65.  
  66. def _safe_repr(self, value):
  67. """安全类型表示"""
  68. try:
  69. if isinstance(value, dict):
  70. return u"{%s}" % u",".join(u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
  71. for k, v in value.items())
  72. if isinstance(value, (list, tuple)):
  73. seq_type = u"[]" if isinstance(value, list) else u"()"
  74. return u"%s%s%s" % (seq_type[0], u",".join(self._safe_repr(x) for x in value), seq_type[1])
  75. return self._safe_str(value)
  76. except:
  77. return u"<Unrepresentable>"
  78.  
  79. @staticmethod
  80. def _safe_str(obj):
  81. """安全字符串转换"""
  82. try:
  83. if isinstance(obj, bytes):
  84. return obj.decode('utf-8', 'replace')
  85. return str(obj)
  86. except UnicodeDecodeError:
  87. return u"<Non-UTF8 Binary Data>"
  88. except:
  89. return u"<Unconvertible Object>"
  90.  
  91.  
  92. class SimpleLogger(object):
  93. _initialized = False
  94.  
  95. @classmethod
  96. def _ensure_init(cls):
  97. if not cls._initialized:
  98. root = logging.getLogger()
  99. root.setLevel(logging.DEBUG)
  100. if not root.handlers:
  101. handler = logging.StreamHandler()
  102. handler.setFormatter(OptimizedFormatter(
  103. fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ [%(funcName)s] %(message)s',
  104. datefmt='%H:%M:%S'
  105. ))
  106. root.addHandler(handler)
  107. cls._initialized = True
  108.  
  109. @classmethod
  110. def _log(cls, level, msg, *args, **kwargs):
  111. cls._ensure_init()
  112. try:
  113. # 消息预处理
  114. msg = cls._preprocess_message(msg)
  115.  
  116. # 获取调用上下文
  117. frame = inspect.currentframe().f_back.f_back
  118. logger = logging.getLogger(
  119. inspect.getmodule(frame).__name__
  120. if inspect.getmodule(frame) else 'unknown'
  121. )
  122.  
  123. # 创建日志记录
  124. logger.log(
  125. level,
  126. msg,
  127. extra={
  128. 'positional_args': args,
  129. 'extra_kwargs': kwargs
  130. },
  131. # stacklevel=3 # Python 3.8+ 支持
  132. )
  133. except Exception as e:
  134. sys.stderr.write(u"Log Error: %s\n" % str(e))
  135.  
  136. @staticmethod
  137. def _preprocess_message(msg):
  138. """消息预处理"""
  139. if isinstance(msg, bytes):
  140. try:
  141. return msg.decode('utf-8')
  142. except UnicodeDecodeError:
  143. return msg.decode('latin-1', 'replace')
  144. return str(msg)
  145.  
  146. @classmethod
  147. def debug(cls, msg, *args, **kwargs):
  148. cls._log(logging.DEBUG, msg, *args, **kwargs)
  149.  
  150. @classmethod
  151. def info(cls, msg, *args, **kwargs):
  152. cls._log(logging.INFO, msg, *args, **kwargs)
  153.  
  154. @classmethod
  155. def warning(cls, msg, *args, **kwargs):
  156. cls._log(logging.WARNING, msg, *args, **kwargs)
  157.  
  158. @classmethod
  159. def error(cls, msg, *args, **kwargs):
  160. cls._log(logging.ERROR, msg, *args, **kwargs)
  161.  
  162.  
  163. # 测试用例
  164. if __name__ == "__main__":
  165. # 测试中文日志
  166. SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")
  167.  
  168. # 测试混合参数
  169. SimpleLogger.error("配置错误", "database", code=500, detail={"line": 42, "file": "app.conf"})
  170.  
  171. # 测试二进制数据
  172. bad_data = b'\xe6\x97\xa0' # UTF-8编码的"无"
  173. SimpleLogger.warning("无效数据流", bad_data, sector=[0x12, 0xff, 0x7f])
  174.  
  175. # 测试嵌套参数
  176. SimpleLogger.debug("调试信息", {"key": [1, 2, 3]}, timeout=30.5)
Success #stdin #stdout #stderr 0.13s 68628KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
Log Error: 'ascii' codec can't encode characters in position 0-3: ordinal not in range(128)
⚠️ Format Error | Err: 'LogRecord' object has no attribute 'message' | Raw: 配置错误
⚠️ Format Error | Err: 'LogRecord' object has no attribute 'message' | Raw: 无效数据流
⚠️ Format Error | Err: 'LogRecord' object has no attribute 'message' | Raw: 调试信息