fork download
  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import sys
  4. import inspect
  5. import threading
  6. from datetime import datetime
  7.  
  8. class OptimizedFormatter(logging.Formatter):
  9. def __init__(self):
  10. super(OptimizedFormatter, self).__init__()
  11. self._frame_cache = {} # 增加帧缓存优化性能
  12.  
  13. def format(self, record):
  14. try:
  15. # 带缓存的智能帧捕获
  16. if not hasattr(record, 'module'):
  17. cache_key = (record.filename, record.lineno)
  18. if cache_key not in self._frame_cache:
  19. self._frame_cache[cache_key] = self._find_caller_frame()
  20. frame = self._frame_cache[cache_key]
  21.  
  22. if frame:
  23. module = inspect.getmodule(frame)
  24. record.module = module.__name__ if module else 'unknown'
  25. record.lineno = frame.f_lineno
  26.  
  27. # 统一处理参数(修复键名冲突)
  28. args_str = self._process_args(record)
  29. if args_str:
  30. record.msg = u"%s ║ %s" % (record.msg, args_str)
  31.  
  32. # 优化时间戳生成
  33. record.asctime = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
  34. return super(OptimizedFormatter, self).format(record)
  35. except Exception as e:
  36. return u"⚠️ Format Error | Err: %s | Raw: %s" % (e, record.getMessage())
  37.  
  38. def _find_caller_frame(self):
  39. """智能跳过日志类调用栈"""
  40. stack = inspect.stack()
  41. for frame_info in stack:
  42. frame = frame_info[0]
  43. module = inspect.getmodule(frame)
  44. if module and module.__name__ not in (__name__, 'logging'):
  45. return frame
  46. return inspect.currentframe()
  47.  
  48. def _process_args(self, record):
  49. """增强参数处理能力"""
  50. parts = []
  51.  
  52. # 处理位置参数
  53. if hasattr(record, 'pos_args') and record.pos_args:
  54. parts.extend(self._safe_repr(a) for a in record.pos_args)
  55.  
  56. # 处理关键字参数
  57. if hasattr(record, 'kwargs') and record.kwargs:
  58. parts.extend(u"%s=%s" % (k, self._safe_repr(v))
  59. for k, v in record.kwargs.iteritems())
  60.  
  61. return u" ║ ".join(parts) if parts else u""
  62.  
  63. def _safe_repr(self, value):
  64. """增强复杂类型处理"""
  65. try:
  66. if isinstance(value, dict):
  67. return u"{%s}" % u", ".join(u"%s: %s" % (self._safe_repr(k), self._safe_repr(v))
  68. for k, v in value.iteritems())
  69. elif isinstance(value, (list, tuple)):
  70. return u"[%s]" % u", ".join(self._safe_repr(x) for x in value)
  71. return self._safe_str(value)
  72. except:
  73. return u"<Unprintable>"
  74.  
  75. @staticmethod
  76. def _safe_str(obj):
  77. """增强编码安全处理"""
  78. try:
  79. if isinstance(obj, str):
  80. return obj.decode('utf-8', 'replace')
  81. return unicode(obj)
  82. except UnicodeDecodeError:
  83. return obj.decode('latin-1', 'replace')
  84. except:
  85. return u"<Unrepresentable>"
  86.  
  87. class SimpleLogger(object):
  88. _initialized = False
  89. _lock = threading.Lock() # 增加线程锁
  90.  
  91. @classmethod
  92. def _ensure_init(cls):
  93. """线程安全的初始化"""
  94. with cls._lock:
  95. if not cls._initialized:
  96. root = logging.getLogger()
  97. root.setLevel(logging.DEBUG)
  98.  
  99. if not root.handlers:
  100. handler = logging.StreamHandler()
  101. handler.setFormatter(OptimizedFormatter())
  102. root.addHandler(handler)
  103.  
  104. cls._initialized = True
  105.  
  106. @classmethod
  107. def _log(cls, level, msg, *args, **kwargs):
  108. """增强容错处理"""
  109. cls._ensure_init()
  110. try:
  111. # 预处理消息(强制Unicode)
  112. msg = cls._preprocess_message(msg)
  113.  
  114. # 智能参数格式化
  115. formatted_msg = cls._format_message(msg, args)
  116.  
  117. # 获取调用者信息(优化帧获取方式)
  118. frame = inspect.currentframe().f_back.f_back
  119. logger_name = 'unknown'
  120. if frame:
  121. module = inspect.getmodule(frame)
  122. logger_name = module.__name__ if module else 'unknown'
  123.  
  124. # 创建日志记录(分离位置参数和关键字参数)
  125. logger = logging.getLogger(logger_name)
  126. logger.log(
  127. level,
  128. formatted_msg,
  129. extra={'pos_args': args, 'kwargs': kwargs}, # 分离参数类型
  130. exc_info=kwargs.get('exc_info')
  131. )
  132. except Exception as e:
  133. sys.stderr.write(u"Log Error: %s\n" % unicode(e))
  134.  
  135. @staticmethod
  136. def _preprocess_message(msg):
  137. """多编码自动检测"""
  138. if isinstance(msg, str):
  139. for encoding in ('utf-8', 'gbk', 'latin-1'):
  140. try:
  141. return msg.decode(encoding)
  142. except UnicodeDecodeError:
  143. continue
  144. return msg.decode('utf-8', 'replace')
  145. return unicode(msg)
  146.  
  147. @classmethod
  148. def _format_message(cls, msg, args):
  149. """混合参数格式化(增强容错)"""
  150. # 尝试传统格式化
  151. if args and '%' in msg:
  152. try:
  153. return msg % args
  154. except (TypeError, ValueError):
  155. pass
  156.  
  157. # 自动拼接模式
  158. if args:
  159. return u" ".join([msg] + [OptimizedFormatter._safe_str(a) for a in args])
  160. return msg
  161.  
  162. @classmethod
  163. def debug(cls, msg, *args, **kwargs):
  164. cls._log(logging.DEBUG, msg, *args, **kwargs)
  165.  
  166. @classmethod
  167. def info(cls, msg, *args, **kwargs):
  168. cls._log(logging.INFO, msg, *args, **kwargs)
  169.  
  170. @classmethod
  171. def warning(cls, msg, *args, **kwargs):
  172. cls._log(logging.WARNING, msg, *args, **kwargs)
  173.  
  174. @classmethod
  175. def error(cls, msg, *args, **kwargs):
  176. cls._log(logging.ERROR, msg, *args, **kwargs)
  177.  
  178. # 测试用例
  179. if __name__ == "__main__":
  180. # 测试中文日志
  181. SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")
  182.  
  183. # 测试混合参数
  184. SimpleLogger.error("错误发生在 %s", u"配置文件", code=500, detail={"line": 42})
  185.  
  186. # 测试二进制数据
  187. bad_data = '\xe6\x97\xa0' # UTF-8编码的"无"
  188. SimpleLogger.warning("无效数据", bad_data)
  189.  
  190. # 测试多线程
  191. import thread
  192. def log_thread():
  193. SimpleLogger.debug("多线程测试", thread_id=thread.get_ident())
  194.  
  195. for _ in range(3):
  196. thread.start_new_thread(log_thread, ())
Success #stdin #stdout #stderr 0.1s 68584KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
用户登录 张三 ║ 张三 ║ ip=192.168.1.100
错误发生在 配置文件 ║ 配置文件 ║ code=500 ║ detail={line: 42}
无效数据 无 ║ 无
多线程测试 ║ thread_id=22593137530624
多线程测试 ║ thread_id=22593132799744