# -*- coding: utf-8 -*-
import logging
import sys
import inspect
import threading
from datetime import datetime
class OptimizedFormatter(logging.Formatter):
    """Formatter that appends a record's extra arguments to its message.

    A record may carry two optional attributes (normally supplied through
    ``extra=``): ``pos_args`` (a sequence) and ``kwargs`` (a mapping).  Their
    text renderings are appended to the message, separated by ``" ║ "``.

    ``format`` never raises: any failure is reported as a
    ``"⚠️ Format Error"`` line instead.  The code runs on both Python 2.7
    and Python 3.
    """

    # Text type of the running interpreter: ``unicode`` on Python 2,
    # ``str`` on Python 3 (where the name ``unicode`` does not exist).
    try:
        _text_type = unicode
    except NameError:
        _text_type = str

    def __init__(self):
        # Hand the time format to the base class so that a format string
        # using %(asctime)s renders the same HH:MM:SS value set in format().
        super(OptimizedFormatter, self).__init__(datefmt='%H:%M:%S')
        # Maps (filename, lineno) -> (module name, line number).  Plain
        # values are cached rather than frame objects, because a stored
        # frame keeps all of its local variables alive.
        self._frame_cache = {}

    def format(self, record):
        """Return the formatted text for ``record``.

        The record's ``msg`` and ``args`` are left unchanged, so the same
        record can be formatted repeatedly (for example by several
        handlers) without the extra arguments being appended again.
        """
        try:
            # Only records that lack ``module`` (not the case for records
            # built by ``logging.LogRecord``) need caller discovery.
            if not hasattr(record, 'module'):
                self._attach_caller_info(record)
            record.asctime = datetime.fromtimestamp(
                record.created).strftime('%H:%M:%S')

            args_str = self._process_args(record)
            if not args_str:
                return super(OptimizedFormatter, self).format(record)

            saved_msg, saved_args = record.msg, record.args
            try:
                # Merge msg % args first, then clear args, so a '%' inside
                # the appended text is never treated as a format directive.
                record.msg = u"%s ║ %s" % (
                    self._safe_str(record.getMessage()), args_str)
                record.args = ()
                return super(OptimizedFormatter, self).format(record)
            finally:
                record.msg, record.args = saved_msg, saved_args
        except Exception as e:
            return u"⚠️ Format Error | Err: %s | Raw: %s" % (
                self._safe_str(e), self._raw_message(record))

    def _raw_message(self, record):
        """Return the record's message as text without ever raising."""
        try:
            return self._safe_str(record.getMessage())
        except Exception:
            # getMessage() fails when msg and args do not match; fall back
            # to the unformatted msg.
            return self._safe_str(getattr(record, 'msg', u''))

    def _attach_caller_info(self, record):
        """Set ``record.module`` / ``record.lineno`` from the calling frame."""
        cache_key = (getattr(record, 'filename', None),
                     getattr(record, 'lineno', None))
        cached = self._frame_cache.get(cache_key)
        if cached is None:
            frame = self._find_caller_frame()
            if frame is None:
                return
            module = inspect.getmodule(frame)
            cached = (module.__name__ if module else 'unknown',
                      frame.f_lineno)
            self._frame_cache[cache_key] = cached
            del frame  # do not keep the frame alive longer than needed
        record.module, record.lineno = cached

    def _find_caller_frame(self):
        """Return the innermost frame outside this module and ``logging``.

        Frames are walked through ``f_back`` instead of ``inspect.stack()``,
        which would also collect source context for every frame.  If no
        frame qualifies, this method's own frame is returned.
        """
        own_frame = inspect.currentframe()
        candidate = own_frame
        while candidate is not None:
            module = inspect.getmodule(candidate)
            if module and module.__name__ not in (__name__, 'logging'):
                return candidate
            candidate = candidate.f_back
        return own_frame

    def _process_args(self, record):
        """Render ``record.pos_args`` and ``record.kwargs`` as one string.

        Returns an empty string when the record carries neither attribute
        or both are empty.
        """
        parts = []
        # Positional arguments
        pos_args = getattr(record, 'pos_args', None)
        if pos_args:
            parts.extend(self._safe_repr(a) for a in pos_args)
        # Keyword arguments, rendered as key=value
        kwargs = getattr(record, 'kwargs', None)
        if kwargs:
            parts.extend(u"%s=%s" % (self._safe_str(k), self._safe_repr(v))
                         for k, v in kwargs.items())
        return u" ║ ".join(parts)

    def _safe_repr(self, value):
        """Render ``value`` as text, recursing into dicts, lists and tuples.

        Dicts become ``{k: v, ...}``; lists and tuples become ``[a, b]``.
        Returns ``"<Unprintable>"`` if rendering fails.
        """
        try:
            if isinstance(value, dict):
                return u"{%s}" % u", ".join(
                    u"%s: %s" % (self._safe_repr(k), self._safe_repr(v))
                    for k, v in value.items())
            if isinstance(value, (list, tuple)):
                return u"[%s]" % u", ".join(
                    self._safe_repr(x) for x in value)
            return self._safe_str(value)
        except Exception:
            return u"<Unprintable>"

    @staticmethod
    def _safe_str(obj):
        """Convert ``obj`` to text without raising.

        Byte strings are decoded as UTF-8 with replacement characters.
        Returns ``"<Unrepresentable>"`` when the object cannot be converted.
        """
        try:
            if isinstance(obj, bytes):
                return obj.decode('utf-8', 'replace')
            return OptimizedFormatter._text_type(obj)
        except UnicodeError:
            # On Python 2, unicode(obj) fails when obj's __str__ returns
            # non-ASCII bytes; retry through str() below.
            pass
        except Exception:
            return u"<Unrepresentable>"
        try:
            raw = str(obj)
            if isinstance(raw, bytes):
                return raw.decode('utf-8', 'replace')
            return raw
        except Exception:
            return u"<Unrepresentable>"
class SimpleLogger(object):
    """Class-level logging facade; no instance is needed.

    Each public method (``debug``, ``info``, ``warning``, ``error``) accepts
    a message, optional positional arguments that are merged into the
    message, and optional keyword arguments that are attached to the record
    as ``record.kwargs``.  The keyword ``exc_info`` is forwarded to
    ``logging`` and is not treated as a logged keyword argument.

    The record is emitted on the logger named after the calling module.
    Failures while building or emitting a record are reported on stderr
    and never raised to the caller.
    """

    _initialized = False
    _lock = threading.Lock()  # guards the one-time root logger setup

    # Text type of the running interpreter: ``unicode`` on Python 2,
    # ``str`` on Python 3 (where the name ``unicode`` does not exist).
    try:
        _text_type = unicode
    except NameError:
        _text_type = str

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger once.

        Sets the root level to DEBUG and, only if the root logger has no
        handlers yet, adds a ``StreamHandler`` using ``OptimizedFormatter``.
        """
        if cls._initialized:
            # Already set up: skip the lock on the common path.
            return
        with cls._lock:
            if not cls._initialized:
                root = logging.getLogger()
                root.setLevel(logging.DEBUG)
                if not root.handlers:
                    handler = logging.StreamHandler()
                    handler.setFormatter(OptimizedFormatter())
                    root.addHandler(handler)
                cls._initialized = True

    @classmethod
    def _log(cls, level, msg, *args, **kwargs):
        """Build and emit one record on the calling module's logger.

        Must be called directly from one of the public level methods: the
        caller is located exactly two frames above this one.
        """
        cls._ensure_init()
        try:
            # exc_info is a logging control flag, not user data; remove it
            # so it is not also rendered among the keyword arguments.
            exc_info = kwargs.pop('exc_info', None)

            # Normalize the message to text, then merge positional args.
            formatted_msg = cls._format_message(
                cls._preprocess_message(msg), args)

            # Walk up: _log -> public level method -> caller.
            logger_name = 'unknown'
            frame = inspect.currentframe()
            caller = None
            try:
                if frame is not None and frame.f_back is not None:
                    caller = frame.f_back.f_back
                if caller is not None:
                    logger_name = (caller.f_globals.get('__name__')
                                   or 'unknown')
            finally:
                # Drop frame references promptly to avoid reference cycles.
                del frame, caller

            logging.getLogger(logger_name).log(
                level,
                formatted_msg,
                # Positional args are already part of formatted_msg; pass
                # an empty tuple so a formatter does not print them twice.
                extra={'pos_args': (), 'kwargs': kwargs},
                exc_info=exc_info
            )
        except Exception as e:
            cls._report_failure(e)

    @staticmethod
    def _report_failure(error):
        """Write a logging failure to stderr; never raises."""
        try:
            sys.stderr.write("Log Error: %s\n" % (error,))
        except Exception:
            try:
                # repr() is the fallback when the message text itself
                # cannot be converted or written.
                sys.stderr.write("Log Error: %r\n" % (error,))
            except Exception:
                # Deliberate best effort: logging must not crash callers.
                pass

    @staticmethod
    def _preprocess_message(msg):
        """Return ``msg`` as text.

        Byte strings are decoded by trying UTF-8, then GBK, then Latin-1
        (which accepts any byte sequence).  Other objects are converted
        with the interpreter's text type.
        """
        if isinstance(msg, bytes):
            for encoding in ('utf-8', 'gbk'):
                try:
                    return msg.decode(encoding)
                except UnicodeDecodeError:
                    continue
            return msg.decode('latin-1')
        return SimpleLogger._text_type(msg)

    @classmethod
    def _format_message(cls, msg, args):
        """Merge positional ``args`` into ``msg``.

        Tries %-formatting when ``msg`` contains ``%``.  If that fails, or
        ``msg`` has no ``%``, the arguments are appended to the message
        separated by single spaces.  Returns ``msg`` unchanged when there
        are no arguments.
        """
        if not args:
            return msg
        if '%' in msg:
            try:
                return msg % args
            except (TypeError, ValueError):
                # Placeholders do not match the arguments; append instead.
                pass
        return u" ".join(
            [msg] + [OptimizedFormatter._safe_str(a) for a in args])

    @classmethod
    def debug(cls, msg, *args, **kwargs):
        """Log ``msg`` at DEBUG level."""
        cls._log(logging.DEBUG, msg, *args, **kwargs)

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log ``msg`` at INFO level."""
        cls._log(logging.INFO, msg, *args, **kwargs)

    @classmethod
    def warning(cls, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level."""
        cls._log(logging.WARNING, msg, *args, **kwargs)

    @classmethod
    def error(cls, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level."""
        cls._log(logging.ERROR, msg, *args, **kwargs)
# Demo / smoke test
if __name__ == "__main__":
    # Chinese message with a positional and a keyword argument
    SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")
    # %-style placeholder mixed with keyword arguments
    SimpleLogger.error("错误发生在 %s", u"配置文件", code=500, detail={"line": 42})
    # Raw byte data
    bad_data = b'\xe6\x97\xa0'  # the UTF-8 encoding of "无"
    SimpleLogger.warning("无效数据", bad_data)

    # Multi-threaded logging
    def log_thread():
        SimpleLogger.debug("多线程测试",
                           thread_id=threading.current_thread().ident)

    workers = [threading.Thread(target=log_thread) for _ in range(3)]
    for worker in workers:
        worker.start()
    # Wait for every worker; otherwise the interpreter may exit before
    # the threads have written their log lines.
    for worker in workers:
        worker.join()