fork download
  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import sys
  4. import inspect
  5. from datetime import datetime
  6.  
  7.  
  8. class OptimizedFormatter(logging.Formatter):
  9. def format(self, record):
  10. try:
  11. # 获取调用者信息
  12. if not hasattr(record, 'module') or not hasattr(record, 'funcName'):
  13. frame = self._cached_caller_frame()
  14. module = inspect.getmodule(frame)
  15. record.module = module.__name__ if module else 'unknown'
  16. record.funcName = frame.f_code.co_name # 添加获取函数名的逻辑
  17.  
  18. # 参数统一处理
  19. args_str = self._process_args(record)
  20. if args_str:
  21. record.msg = u"%s ⎸ %s" % (record.msg, args_str)
  22.  
  23. return super(OptimizedFormatter, self).format(record)
  24. except Exception as e:
  25. return u"⚠️ Format Error | Err: %s | Raw: %s" % (e, record.getMessage())
  26.  
  27. def _cached_caller_frame(self):
  28. """带缓存优化的帧查找"""
  29. stack = inspect.stack()
  30. for frame_info in reversed(stack):
  31. frame = frame_info[0]
  32. if inspect.getmodule(frame).__name__ != __name__:
  33. return frame
  34. return inspect.currentframe()
  35.  
  36. def _process_args(self, record):
  37. """处理所有参数类型"""
  38. parts = []
  39.  
  40. if hasattr(record, 'positional_args') and record.positional_args:
  41. parts.extend(self._safe_repr(a) for a in record.positional_args)
  42.  
  43. if hasattr(record, 'extra') and record.extra:
  44. parts.extend(u"%s=%s" % (k, self._safe_repr(v))
  45. for k, v in record.extra.iteritems())
  46.  
  47. return u" ⎸ ".join(parts) if parts else u""
  48.  
  49. def _safe_repr(self, value):
  50. """安全类型转换(增强编码处理)"""
  51. try:
  52. if isinstance(value, dict):
  53. return u"{%s}" % u",".join(u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
  54. for k, v in value.iteritems())
  55. elif isinstance(value, (list, tuple)):
  56. return u"[%s]" % u",".join(self._safe_repr(x) for x in value)
  57. return self._safe_str(value)
  58. except:
  59. return u""
  60.  
  61. @staticmethod
  62. def _safe_str(obj):
  63. """安全字符串转换(解决编码问题)"""
  64. try:
  65. if isinstance(obj, str):
  66. return obj.decode('utf-8', 'replace')
  67. return unicode(obj)
  68. except UnicodeDecodeError:
  69. return u"<非UTF-8字节数据>"
  70. except:
  71. return u"<无法转换对象>"
  72.  
  73.  
  74. class SimpleLogger(object):
  75. _initialized = False
  76.  
  77. @classmethod
  78. def _ensure_init(cls):
  79. if not cls._initialized:
  80. root = logging.getLogger()
  81. root.setLevel(logging.DEBUG)
  82. if not root.handlers:
  83. handler = logging.StreamHandler()
  84. handler.setFormatter(OptimizedFormatter(
  85. fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(funcName)s:%(lineno)d ➤ %(message)s',
  86. datefmt='%H:%M:%S'
  87. ))
  88. root.addHandler(handler)
  89. cls._initialized = True
  90.  
  91. @classmethod
  92. def _log(cls, level, msg, *args, **kwargs):
  93. cls._ensure_init()
  94. try:
  95. msg = cls._preprocess_message(msg)
  96. formatted_msg = cls._format_message(msg, args)
  97.  
  98. logger = logging.getLogger()
  99. logger.log(
  100. level,
  101. formatted_msg,
  102. extra={'extra': kwargs, 'positional_args': args}
  103. )
  104. except Exception as e:
  105. sys.stderr.write(u"Log Error: %s\n" % unicode(e))
  106.  
  107. @staticmethod
  108. def _preprocess_message(msg):
  109. if isinstance(msg, str):
  110. try:
  111. return msg.decode('utf-8')
  112. except UnicodeDecodeError:
  113. return msg.decode('latin-1', 'replace')
  114. return unicode(msg)
  115.  
  116. @classmethod
  117. def _format_message(cls, msg, args):
  118. try:
  119. if args and '%' in msg:
  120. return msg % args
  121. except (TypeError, ValueError):
  122. pass
  123.  
  124. if args:
  125. return u" ".join([msg] + [OptimizedFormatter._safe_str(a) for a in args])
  126. return msg
  127.  
  128. @classmethod
  129. def debug(cls, msg, *args, **kwargs):
  130. cls._log(logging.DEBUG, msg, *args, **kwargs)
  131.  
  132. @classmethod
  133. def info(cls, msg, *args, **kwargs):
  134. cls._log(logging.INFO, msg, *args, **kwargs)
  135.  
  136. @classmethod
  137. def warning(cls, msg, *args, **kwargs):
  138. cls._log(logging.WARNING, msg, *args, **kwargs)
  139.  
  140. @classmethod
  141. def error(cls, msg, *args, **kwargs):
  142. cls._log(logging.ERROR, msg, *args, **kwargs)
  143.  
  144.  
  145. if __name__ == "__main__":
  146. # 测试中文日志
  147. SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")
  148.  
  149. # 测试混合参数
  150. SimpleLogger.error("错误发生在 %s", u"配置文件", code=500, detail={"line": 42})
  151.  
  152. # 测试二进制数据
  153. bad_data = '\xe6\x97\xa0' # UTF-8编码的"无"
  154. SimpleLogger.warning("无效数据", bad_data)
Success #stdin #stdout #stderr 0.08s 67648KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
[07:22:12] [INFO] prog:_log:102 ➤ 用户登录 张三 ⎸ 张三 ⎸ ip=192.168.1.100
[07:22:12] [ERROR] prog:_log:102 ➤ 错误发生在 配置文件 ⎸ 配置文件 ⎸ code=500 ⎸ detail={line:42}
[07:22:12] [WARNING] prog:_log:102 ➤ 无效数据 无 ⎸ 无