fork download
  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import sys
  4. import inspect
  5. import threading
  6. import time
  7. import datetime
  8.  
  9. class OptimizedFormatter(logging.Formatter):
  10. def __init__(self, fmt=None, datefmt=None):
  11. default_fmt = u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ %(message)s'
  12. default_datefmt = '%H:%M:%S'
  13. super(OptimizedFormatter, self).__init__(
  14. fmt=fmt or default_fmt,
  15. datefmt=datefmt or default_datefmt
  16. )
  17. self._frame_cache = {}
  18.  
  19. def format(self, record):
  20. try:
  21. # 修复函数名获取逻辑
  22. if not hasattr(record, 'funcName'):
  23. frame = self._find_caller_frame()
  24. if frame:
  25. record.funcName = frame.f_code.co_name
  26.  
  27. # 合并参数处理逻辑(修复重复输出)
  28. args_str = self._process_args(record)
  29. if args_str:
  30. record.msg = u"%s | %s" % (record.msg, args_str)
  31.  
  32. return super(OptimizedFormatter, self).format(record)
  33. except Exception as e:
  34. return u"⚠️ Format Error: %s | Raw: %s" % (e, record.getMessage())
  35.  
  36. def _find_caller_frame(self):
  37. """优化调用栈追踪逻辑"""
  38. stack = inspect.stack()
  39. for frame_info in stack:
  40. frame = frame_info[0]
  41. # 跳过日志类自身的调用栈
  42. if self._is_user_frame(frame):
  43. return frame
  44. return None
  45.  
  46. def _is_user_frame(self, frame):
  47. """判断是否为用户代码帧"""
  48. module = inspect.getmodule(frame)
  49. if not module:
  50. return False
  51. return not (
  52. module.__name__.startswith('logging') or
  53. module.__name__ == __name__
  54. )
  55.  
  56. def _process_args(self, record):
  57. """合并参数处理逻辑"""
  58. parts = []
  59. # 统一处理所有参数
  60. if hasattr(record, 'args') and record.args:
  61. if isinstance(record.args, dict):
  62. parts.extend(u"%s=%s" % (k, self._safe_repr(v))
  63. for k, v in record.args.iteritems())
  64. else:
  65. parts.extend(self._safe_repr(a) for a in record.args)
  66. return u" | ".join(parts)
  67.  
  68. class SimpleLog(object):
  69. _initialized = False
  70. _lock = threading.Lock()
  71.  
  72. @classmethod
  73. def _ensure_init(cls):
  74. with cls._lock:
  75. if not cls._initialized:
  76. root = logging.getLogger()
  77. root.setLevel(logging.DEBUG)
  78. if not root.handlers:
  79. handler = logging.StreamHandler()
  80. handler.setFormatter(OptimizedFormatter())
  81. root.addHandler(handler)
  82. cls._initialized = True
  83.  
  84. @classmethod
  85. def info(cls, msg, *args, **kwargs):
  86. cls._log(logging.INFO, msg, args, kwargs)
  87.  
  88. @classmethod
  89. def _log(cls, level, msg, args, kwargs):
  90. cls._ensure_init()
  91. try:
  92. frame = inspect.currentframe().f_back.f_back
  93. logger = logging.getLogger(
  94. inspect.getmodule(frame).__name__
  95. if inspect.getmodule(frame) else 'unknown'
  96. )
  97.  
  98. # 合并参数并记录函数名
  99. merged_args = dict(kwargs)
  100. if args:
  101. merged_args.update({'__positional_args__': args})
  102.  
  103. # 自动捕获函数名
  104. func_name = frame.f_code.co_name if frame else 'unknown'
  105. merged_args['Func'] = func_name
  106.  
  107. logger.log(
  108. level,
  109. msg,
  110. args=merged_args, # 使用统一参数字段
  111. extra={'funcName': func_name}
  112. )
  113. except Exception as e:
  114. sys.stderr.write(u"Log Error: %s\n" % unicode(e))
  115.  
  116. def get_startend_stamp():
  117. current_time = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
  118. two_days_before = current_time - datetime.timedelta(days=2)
  119. two_days_before_timestamp = int(time.mktime(two_days_before.timetuple()))
  120.  
  121. SimpleLog.info(
  122. "zheshishuchu",
  123. tt=two_days_before_timestamp,
  124. additional_info={"source": "get_startend_stamp"}
  125. )
  126. return two_days_before_timestamp
  127.  
  128. if __name__ == "__main__":
  129. get_startend_stamp()
Success #stdin #stdout #stderr 0.13s 68596KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
Log Error: _log() got multiple values for keyword argument 'args'