# -*- coding: utf-8 -*-
import logging
import sys
import inspect
# from functools import lru_cache
from datetime import datetime
class OptimizedFormatter(logging.Formatter):
    """Formatter that appends structured call arguments to the log message.

    A record may carry two optional attributes (typically supplied through
    the ``extra`` argument of a logging call):

    * ``positional_args`` -- an iterable of additional positional values
    * ``extra_kwargs`` -- a mapping of additional keyword values

    When either is present and non-empty, its items are rendered with
    :meth:`_safe_repr` and joined onto the formatted message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to :class:`logging.Formatter`."""
        super(OptimizedFormatter, self).__init__(*args, **kwargs)
        # Ids of frames that _find_caller_frame must skip.
        # NOTE(review): this records the id of the __init__ frame, which no
        # longer exists once __init__ returns -- confirm it is still needed.
        self._frame_blacklist = {id(inspect.currentframe())}

    def format(self, record):
        """Format ``record``, appending any structured arguments it carries.

        Args:
            record: The ``logging.LogRecord`` (or compatible object) to render.

        Returns:
            The formatted log line.  If formatting fails, a one-line
            "Format Error" description is returned instead of raising.
        """
        try:
            # Fallback for record-like objects that lack caller metadata.
            if not all(hasattr(record, attr)
                       for attr in ('module', 'lineno', 'funcName')):
                frame = self._find_caller_frame()
                if frame:
                    self._populate_record(record, frame)

            args_str = self._process_arguments(record)
            if not args_str:
                return super(OptimizedFormatter, self).format(record)

            # logging.Formatter.format() recomputes record.message from
            # record.msg / record.args, so the combined text has to be
            # injected there; the originals are restored afterwards so that
            # other handlers see an unmodified record.
            combined = u"%s ⎸ %s" % (record.getMessage(), args_str)
            saved_msg, saved_args = record.msg, record.args
            record.msg, record.args = combined, None
            try:
                return super(OptimizedFormatter, self).format(record)
            finally:
                record.msg, record.args = saved_msg, saved_args
        except Exception as e:
            # getMessage() may be the very call that failed, so guard it.
            try:
                raw = record.getMessage()
            except Exception:
                raw = self._safe_str(getattr(record, 'msg', u''))
            return u"⚠️ Format Error | Err: %s | Raw: %s" % (e, raw)

    def _find_caller_frame(self):
        """Return the outermost stack frame that is not part of this module.

        The stack is scanned from the outermost frame inwards.  Frames whose
        id is blacklisted, or whose module is this module, are skipped; a
        frame whose module cannot be resolved is accepted.  No caching is
        performed.  Falls back to the current frame when nothing qualifies.
        """
        for frame_info in reversed(inspect.stack()):
            frame = frame_info[0]
            if id(frame) in self._frame_blacklist:
                continue
            module = inspect.getmodule(frame)
            # getmodule() returns None for frames without a known module.
            if module is not None and module.__name__ == __name__:
                continue
            return frame
        return inspect.currentframe()

    def _populate_record(self, record, frame):
        """Copy module name, line number and function name from ``frame``."""
        module = inspect.getmodule(frame)
        record.module = module.__name__ if module else 'unknown'
        record.lineno = frame.f_lineno
        record.funcName = frame.f_code.co_name

    def _process_arguments(self, record):
        """Render ``positional_args`` and ``extra_kwargs`` of ``record``.

        Returns:
            The rendered items joined by the separator, or an empty string
            when the record carries no extra arguments.
        """
        parts = []

        positional = getattr(record, 'positional_args', None)
        if positional:
            parts.extend(self._safe_repr(a) for a in positional)

        keywords = getattr(record, 'extra_kwargs', None)
        if keywords:
            parts.extend(u"%s=%s" % (k, self._safe_repr(v))
                         for k, v in keywords.items())

        return u" ⎸ ".join(parts) if parts else u""

    def _safe_repr(self, value):
        """Return a compact text form of ``value`` without ever raising.

        Dicts render as ``{k:v,...}``, lists as ``[a,b]`` and tuples as
        ``(a,b)``, recursively; anything else goes through :meth:`_safe_str`.
        """
        try:
            if isinstance(value, dict):
                return u"{%s}" % u",".join(
                    u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
                    for k, v in value.items())
            if isinstance(value, (list, tuple)):
                seq_type = u"[]" if isinstance(value, list) else u"()"
                return u"%s%s%s" % (
                    seq_type[0],
                    u",".join(self._safe_repr(x) for x in value),
                    seq_type[1])
            return self._safe_str(value)
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate.
            return u"<Unrepresentable>"

    @staticmethod
    def _safe_str(obj):
        """Convert ``obj`` to text; bytes are decoded as UTF-8 with replacement."""
        try:
            if isinstance(obj, bytes):
                return obj.decode('utf-8', 'replace')
            return str(obj)
        except UnicodeDecodeError:
            return u"<Non-UTF8 Binary Data>"
        except Exception:
            return u"<Unconvertible Object>"
class SimpleLogger(object):
    """Class-level logging facade: ``SimpleLogger.info(msg, *args, **kwargs)``.

    Extra positional and keyword arguments are not used for %-formatting;
    they are attached to the log record as ``positional_args`` and
    ``extra_kwargs`` so that the formatter can render them.
    """

    # Set once the root logger has been configured by _ensure_init().
    _initialized = False

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger on first use.

        Sets the root level to DEBUG and, only when the root logger has no
        handlers yet, installs a stream handler using ``OptimizedFormatter``.
        """
        if cls._initialized:
            return
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        if not root.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(OptimizedFormatter(
                fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ [%(funcName)s] %(message)s',
                datefmt='%H:%M:%S'
            ))
            root.addHandler(handler)
        cls._initialized = True

    @classmethod
    def _log(cls, level, msg, *args, **kwargs):
        """Emit ``msg`` at ``level`` on the logger named after the caller's module.

        Must be called through one of the public level methods: the caller
        is taken to be two frames up (level method -> user code).  Any
        exception is reported on stderr instead of being raised.
        """
        cls._ensure_init()
        try:
            msg = cls._preprocess_message(msg)

            # Resolve the module of the code that called debug()/info()/...
            current = inspect.currentframe()
            caller = None
            if current is not None and current.f_back is not None:
                caller = current.f_back.f_back
            module = inspect.getmodule(caller) if caller is not None else None
            logger = logging.getLogger(module.__name__ if module else 'unknown')

            options = {
                'extra': {
                    'positional_args': args,
                    'extra_kwargs': kwargs
                }
            }
            # Without stacklevel the record's module/lineno/funcName describe
            # this method rather than the real call site.  3 = skip _log and
            # the public level method.  The parameter exists from Python 3.8.
            if sys.version_info >= (3, 8):
                options['stacklevel'] = 3

            logger.log(level, msg, **options)
        except Exception as e:
            sys.stderr.write(u"Log Error: %s\n" % str(e))

    @staticmethod
    def _preprocess_message(msg):
        """Return ``msg`` as text.

        Bytes are decoded as UTF-8, falling back to Latin-1 when that fails;
        every other object is converted with ``str``.
        """
        if isinstance(msg, bytes):
            try:
                return msg.decode('utf-8')
            except UnicodeDecodeError:
                return msg.decode('latin-1', 'replace')
        return str(msg)

    @classmethod
    def debug(cls, msg, *args, **kwargs):
        """Log ``msg`` at DEBUG level."""
        cls._log(logging.DEBUG, msg, *args, **kwargs)

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log ``msg`` at INFO level."""
        cls._log(logging.INFO, msg, *args, **kwargs)

    @classmethod
    def warning(cls, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level."""
        cls._log(logging.WARNING, msg, *args, **kwargs)

    @classmethod
    def error(cls, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level."""
        cls._log(logging.ERROR, msg, *args, **kwargs)
# Demo / smoke test: run this file directly to see sample output.
if __name__ == "__main__":
    # Chinese-language message with one positional and one keyword argument.
    SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")

    # Mixed arguments, including a nested dict as a keyword value.
    SimpleLogger.error("配置错误", "database", code=500, detail={"line": 42, "file": "app.conf"})

    # Binary payload: these three bytes are the UTF-8 encoding of "无".
    # (The escapes must be written without inner spaces to be valid.)
    bad_data = b'\xe6\x97\xa0'
    SimpleLogger.warning("无效数据流", bad_data, sector=[0x12, 0xff, 0x7f])

    # Nested container passed as a positional argument.
    SimpleLogger.debug("调试信息", {"key": [1, 2, 3]}, timeout=30.5)
# -*- coding: utf-8 -*-
import logging
import sys
import inspect
# from functools import lru_cache
from datetime import datetime


class OptimizedFormatter(logging.Formatter):
    """Formatter that appends structured call arguments to the log message.

    A record may carry two optional attributes (typically supplied through
    the ``extra`` argument of a logging call):

    * ``positional_args`` -- an iterable of additional positional values
    * ``extra_kwargs`` -- a mapping of additional keyword values

    When either is present and non-empty, its items are rendered with
    :meth:`_safe_repr` and joined onto the formatted message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to :class:`logging.Formatter`."""
        super(OptimizedFormatter, self).__init__(*args, **kwargs)
        # Ids of frames that _find_caller_frame must skip.
        # NOTE(review): this records the id of the __init__ frame, which no
        # longer exists once __init__ returns -- confirm it is still needed.
        self._frame_blacklist = {id(inspect.currentframe())}

    def format(self, record):
        """Format ``record``, appending any structured arguments it carries.

        Args:
            record: The ``logging.LogRecord`` (or compatible object) to render.

        Returns:
            The formatted log line.  If formatting fails, a one-line
            "Format Error" description is returned instead of raising.
        """
        try:
            # Fallback for record-like objects that lack caller metadata.
            if not all(hasattr(record, attr)
                       for attr in ('module', 'lineno', 'funcName')):
                frame = self._find_caller_frame()
                if frame:
                    self._populate_record(record, frame)

            args_str = self._process_arguments(record)
            if not args_str:
                return super(OptimizedFormatter, self).format(record)

            # logging.Formatter.format() recomputes record.message from
            # record.msg / record.args, so the combined text has to be
            # injected there; the originals are restored afterwards so that
            # other handlers see an unmodified record.
            combined = u"%s ⎸ %s" % (record.getMessage(), args_str)
            saved_msg, saved_args = record.msg, record.args
            record.msg, record.args = combined, None
            try:
                return super(OptimizedFormatter, self).format(record)
            finally:
                record.msg, record.args = saved_msg, saved_args
        except Exception as e:
            # getMessage() may be the very call that failed, so guard it.
            try:
                raw = record.getMessage()
            except Exception:
                raw = self._safe_str(getattr(record, 'msg', u''))
            return u"⚠️ Format Error | Err: %s | Raw: %s" % (e, raw)

    def _find_caller_frame(self):
        """Return the outermost stack frame that is not part of this module.

        The stack is scanned from the outermost frame inwards.  Frames whose
        id is blacklisted, or whose module is this module, are skipped; a
        frame whose module cannot be resolved is accepted.  No caching is
        performed.  Falls back to the current frame when nothing qualifies.
        """
        for frame_info in reversed(inspect.stack()):
            frame = frame_info[0]
            if id(frame) in self._frame_blacklist:
                continue
            module = inspect.getmodule(frame)
            # getmodule() returns None for frames without a known module.
            if module is not None and module.__name__ == __name__:
                continue
            return frame
        return inspect.currentframe()

    def _populate_record(self, record, frame):
        """Copy module name, line number and function name from ``frame``."""
        module = inspect.getmodule(frame)
        record.module = module.__name__ if module else 'unknown'
        record.lineno = frame.f_lineno
        record.funcName = frame.f_code.co_name

    def _process_arguments(self, record):
        """Render ``positional_args`` and ``extra_kwargs`` of ``record``.

        Returns:
            The rendered items joined by the separator, or an empty string
            when the record carries no extra arguments.
        """
        parts = []

        positional = getattr(record, 'positional_args', None)
        if positional:
            parts.extend(self._safe_repr(a) for a in positional)

        keywords = getattr(record, 'extra_kwargs', None)
        if keywords:
            parts.extend(u"%s=%s" % (k, self._safe_repr(v))
                         for k, v in keywords.items())

        return u" ⎸ ".join(parts) if parts else u""

    def _safe_repr(self, value):
        """Return a compact text form of ``value`` without ever raising.

        Dicts render as ``{k:v,...}``, lists as ``[a,b]`` and tuples as
        ``(a,b)``, recursively; anything else goes through :meth:`_safe_str`.
        """
        try:
            if isinstance(value, dict):
                return u"{%s}" % u",".join(
                    u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
                    for k, v in value.items())
            if isinstance(value, (list, tuple)):
                seq_type = u"[]" if isinstance(value, list) else u"()"
                return u"%s%s%s" % (
                    seq_type[0],
                    u",".join(self._safe_repr(x) for x in value),
                    seq_type[1])
            return self._safe_str(value)
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate.
            return u"<Unrepresentable>"

    @staticmethod
    def _safe_str(obj):
        """Convert ``obj`` to text; bytes are decoded as UTF-8 with replacement."""
        try:
            if isinstance(obj, bytes):
                return obj.decode('utf-8', 'replace')
            return str(obj)
        except UnicodeDecodeError:
            return u"<Non-UTF8 Binary Data>"
        except Exception:
            return u"<Unconvertible Object>"


class SimpleLogger(object):
    """Class-level logging facade: ``SimpleLogger.info(msg, *args, **kwargs)``.

    Extra positional and keyword arguments are not used for %-formatting;
    they are attached to the log record as ``positional_args`` and
    ``extra_kwargs`` so that the formatter can render them.
    """

    # Set once the root logger has been configured by _ensure_init().
    _initialized = False

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger on first use.

        Sets the root level to DEBUG and, only when the root logger has no
        handlers yet, installs a stream handler using ``OptimizedFormatter``.
        """
        if cls._initialized:
            return
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        if not root.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(OptimizedFormatter(
                fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ [%(funcName)s] %(message)s',
                datefmt='%H:%M:%S'
            ))
            root.addHandler(handler)
        cls._initialized = True

    @classmethod
    def _log(cls, level, msg, *args, **kwargs):
        """Emit ``msg`` at ``level`` on the logger named after the caller's module.

        Must be called through one of the public level methods: the caller
        is taken to be two frames up (level method -> user code).  Any
        exception is reported on stderr instead of being raised.
        """
        cls._ensure_init()
        try:
            msg = cls._preprocess_message(msg)

            # Resolve the module of the code that called debug()/info()/...
            current = inspect.currentframe()
            caller = None
            if current is not None and current.f_back is not None:
                caller = current.f_back.f_back
            module = inspect.getmodule(caller) if caller is not None else None
            logger = logging.getLogger(module.__name__ if module else 'unknown')

            options = {
                'extra': {
                    'positional_args': args,
                    'extra_kwargs': kwargs
                }
            }
            # Without stacklevel the record's module/lineno/funcName describe
            # this method rather than the real call site.  3 = skip _log and
            # the public level method.  The parameter exists from Python 3.8.
            if sys.version_info >= (3, 8):
                options['stacklevel'] = 3

            logger.log(level, msg, **options)
        except Exception as e:
            sys.stderr.write(u"Log Error: %s\n" % str(e))

    @staticmethod
    def _preprocess_message(msg):
        """Return ``msg`` as text.

        Bytes are decoded as UTF-8, falling back to Latin-1 when that fails;
        every other object is converted with ``str``.
        """
        if isinstance(msg, bytes):
            try:
                return msg.decode('utf-8')
            except UnicodeDecodeError:
                return msg.decode('latin-1', 'replace')
        return str(msg)

    @classmethod
    def debug(cls, msg, *args, **kwargs):
        """Log ``msg`` at DEBUG level."""
        cls._log(logging.DEBUG, msg, *args, **kwargs)

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log ``msg`` at INFO level."""
        cls._log(logging.INFO, msg, *args, **kwargs)

    @classmethod
    def warning(cls, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level."""
        cls._log(logging.WARNING, msg, *args, **kwargs)

    @classmethod
    def error(cls, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level."""
        cls._log(logging.ERROR, msg, *args, **kwargs)


# Demo / smoke test: run this file directly to see sample output.
if __name__ == "__main__":
    # Chinese-language message with one positional and one keyword argument.
    SimpleLogger.info(
        u"用户登录",
        u"张三",
        ip="192.168.1.100",
    )

    # Mixed arguments, including a nested dict as a keyword value.
    SimpleLogger.error(
        "配置错误",
        "database",
        code=500,
        detail={"line": 42, "file": "app.conf"},
    )

    # Binary payload: these three bytes are the UTF-8 encoding of "无".
    bad_data = b'\xe6\x97\xa0'
    SimpleLogger.warning(
        "无效数据流",
        bad_data,
        sector=[0x12, 0xff, 0x7f],
    )

    # Nested container passed as a positional argument.
    SimpleLogger.debug(
        "调试信息",
        {"key": [1, 2, 3]},
        timeout=30.5,
    )