# -*- coding: utf-8 -*-
import logging
import sys
import inspect
import threading
import time
import datetime
class OptimizedFormatter(logging.Formatter):
    """Formatter that appends a rendering of the record's arguments to its message.

    The default layout is ``[HH:MM:SS] [LEVEL] module:lineno ➤ message``.
    When a record carries arguments, the emitted message becomes
    ``<message> | <args>``: mapping arguments are rendered as
    ``key=repr(value)``, positional ones as ``repr(value)``, all joined by
    `` | ``.
    """

    def __init__(self, fmt=None, datefmt=None):
        """Create the formatter.

        Args:
            fmt: %-style log format string; a compact default is used when falsy.
            datefmt: ``strftime`` pattern for ``%(asctime)s``; defaults to
                ``%H:%M:%S`` when falsy.
        """
        default_fmt = u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ %(message)s'
        default_datefmt = '%H:%M:%S'
        super(OptimizedFormatter, self).__init__(
            fmt=fmt or default_fmt,
            datefmt=datefmt or default_datefmt
        )

    def format(self, record):
        """Return the formatted text for ``record``.

        The record's ``msg`` and ``args`` are restored before returning, so
        other handlers (or a second call) see the record unchanged and the
        argument suffix is never appended twice.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            The formatted line, or a ``Format Error`` line describing the
            failure if rendering raised.
        """
        saved_msg, saved_args = record.msg, record.args
        try:
            try:
                # LogRecord always defines funcName, but it is None when the
                # record was built without a function name; fill it in then.
                if not getattr(record, 'funcName', None):
                    frame = self._find_caller_frame()
                    if frame is not None:
                        record.funcName = frame.f_code.co_name
                args_str = self._process_args(record)
                if args_str:
                    record.msg = u"%s | %s" % (self._render_message(record), args_str)
                    # The text is final now; clearing args stops the base
                    # class from %-interpolating it a second time.
                    record.args = None
                return super(OptimizedFormatter, self).format(record)
            finally:
                record.msg, record.args = saved_msg, saved_args
        except Exception as e:
            # A formatter must not take the application down; emit a
            # diagnostic line instead of propagating.
            try:
                raw = self._render_message(record)
            except Exception:
                raw = self._safe_repr(saved_msg)
            return u"⚠️ Format Error: %s | Raw: %s" % (e, raw)

    def _render_message(self, record):
        """Return ``msg % args``, or the raw message when the two do not match."""
        try:
            return record.getMessage()
        except (TypeError, ValueError, KeyError):
            # Placeholders and arguments do not line up (for example a
            # message without placeholders, or a literal '%'): keep the text.
            return u"%s" % (record.msg,)

    def _find_caller_frame(self):
        """Return the nearest stack frame accepted by ``_is_user_frame``, or None."""
        frame = inspect.currentframe()
        try:
            # Walk f_back links directly; inspect.stack() would also load
            # source context for every frame.
            while frame is not None:
                if self._is_user_frame(frame):
                    return frame
                frame = frame.f_back
            return None
        finally:
            del frame

    def _is_user_frame(self, frame):
        """Return True unless ``frame`` runs in ``logging`` or in this module."""
        module_name = frame.f_globals.get('__name__')
        if not module_name:
            return False
        if module_name == __name__:
            return False
        # Match the logging package itself, not unrelated modules whose names
        # merely start with "logging".
        return not (module_name == 'logging' or module_name.startswith('logging.'))

    def _safe_repr(self, value):
        """Return ``repr(value)``, or a placeholder if ``__repr__`` raises."""
        try:
            return repr(value)
        except Exception:
            return u"<unrepresentable %s>" % type(value).__name__

    def _process_args(self, record):
        """Render ``record.args`` as a `` | ``-joined string ('' when there are none)."""
        args = getattr(record, 'args', None)
        if not args:
            return u""
        if callable(getattr(args, 'items', None)):
            # Mapping arguments: key=value pairs. items() exists on both
            # Python 2 and 3, unlike iteritems().
            parts = [u"%s=%s" % (key, self._safe_repr(value))
                     for key, value in args.items()]
        else:
            parts = [self._safe_repr(arg) for arg in args]
        return u" | ".join(parts)
class SimpleLog(object):
    """Class-level logging facade that attributes records to the calling code.

    Positional and keyword arguments passed to the public methods are merged
    into a single dict that becomes the record's ``args``, together with the
    caller's function name under the ``'Func'`` key.
    """

    _initialized = False
    _lock = threading.Lock()

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger once.

        Sets the root level to DEBUG and, only if the root logger has no
        handlers yet, attaches a ``StreamHandler`` using ``OptimizedFormatter``.
        """
        if cls._initialized:
            # Already configured: skip taking the lock on every log call.
            return
        with cls._lock:
            if cls._initialized:
                return
            root = logging.getLogger()
            root.setLevel(logging.DEBUG)
            if not root.handlers:
                handler = logging.StreamHandler()
                handler.setFormatter(OptimizedFormatter())
                root.addHandler(handler)
            cls._initialized = True

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log ``msg`` at INFO level with the given arguments attached."""
        cls._log(logging.INFO, msg, args, kwargs)

    @classmethod
    def _log(cls, level, msg, args, kwargs):
        """Build and dispatch a record on behalf of the caller of the public method.

        Must be called from exactly one wrapper (such as ``info``): the caller
        is located two frames above this one.

        Args:
            level: Numeric logging level.
            msg: Log message.
            args: Tuple of positional arguments; stored under
                ``'__positional_args__'`` when non-empty.
            kwargs: Dict of keyword arguments; copied into the record's args.

        Failures are reported on stderr and never raised to the caller.
        """
        cls._ensure_init()
        caller = None
        try:
            here = inspect.currentframe()
            if here is not None and here.f_back is not None:
                caller = here.f_back.f_back
            del here

            if caller is not None:
                module_name = caller.f_globals.get('__name__') or 'unknown'
                func_name = caller.f_code.co_name
                filename = caller.f_code.co_filename
                lineno = caller.f_lineno
            else:
                module_name = func_name = 'unknown'
                filename, lineno = '(unknown file)', 0

            logger = logging.getLogger(module_name)
            if not logger.isEnabledFor(level):
                return

            # Merge all arguments into one dict and record the function name.
            merged_args = dict(kwargs)
            if args:
                merged_args['__positional_args__'] = args
            merged_args['Func'] = func_name

            # Logger.log() accepts neither an ``args=`` keyword nor a
            # ``funcName`` key in ``extra`` (it is a reserved LogRecord
            # attribute), so build the record explicitly. This also makes the
            # file, line and function name point at the real call site.
            # A single mapping in the args tuple becomes ``record.args``.
            record = logger.makeRecord(
                logger.name, level, filename, lineno,
                msg, (merged_args,), None, func=func_name
            )
            logger.handle(record)
        except Exception as e:
            sys.stderr.write(u"Log Error: %s\n" % (e,))
        finally:
            # Drop the frame reference to avoid a reference cycle.
            del caller
def get_startend_stamp():
    """Return the Unix timestamp for the top of the current hour, two days ago.

    The current local time is truncated to the hour, moved back two days and
    converted with ``time.mktime``. The value is also logged through
    ``SimpleLog.info`` before being returned.

    Returns:
        int: Seconds since the epoch.
    """
    top_of_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    window_start = top_of_hour - datetime.timedelta(days=2)
    stamp = int(time.mktime(window_start.timetuple()))

    source_info = {"source": "get_startend_stamp"}
    SimpleLog.info("zheshishuchu", tt=stamp, additional_info=source_info)
    return stamp
# Script entry point: compute and log the timestamp when run directly.
if __name__ == "__main__":
    get_startend_stamp()