mirror of https://gitee.com/anolis/sysom.git
67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/3/17 19:43
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File url.py
|
|
Description:
|
|
"""
|
|
|
|
import urllib.parse
|
|
from .exceptions import GCacheNotValidGCacheUrlException
|
|
|
|
|
|
class GCacheUrl:
|
|
"""GCacheUrl definition
|
|
|
|
GCacheUrl URL format definition, which consists of three parts
|
|
(proto, netloc, params)
|
|
|
|
Args:
|
|
proto(str): Protocol identifier (e.g., redis)
|
|
netloc(str): Connection address, mainly used to connect to low-level
|
|
messaging middleware (e.g., localhost:6379)
|
|
params(dict): Connection parameters (e.g., {"password": "123456"})
|
|
|
|
Attributes:
|
|
proto(str): Protocol identifier (e.g., redis)
|
|
netloc(str): Connection address, mainly used to connect to low-level
|
|
messaging middleware (e.g., localhost:6379)
|
|
params(dict): Connection parameters (e.g., {"password": "123456"})
|
|
"""
|
|
|
|
def __init__(self, proto: str, netloc: str, params: dict):
|
|
self.proto = proto
|
|
self.netloc = netloc
|
|
self.params = params
|
|
|
|
def __str__(self):
|
|
query = "&".join([f'{k}={v}' for k, v in self.params.items()])
|
|
return f"{self.proto}://{self.netloc}?" \
|
|
f"{query}"
|
|
|
|
@staticmethod
|
|
def parse(url: str):
|
|
"""Parses a string into a GCacheUrl object
|
|
|
|
Args:
|
|
url(str)
|
|
|
|
Returns:
|
|
GCacheUrl
|
|
"""
|
|
parse_result = urllib.parse.urlparse(url, allow_fragments=False)
|
|
proto, netloc = parse_result.scheme, parse_result.netloc
|
|
query_str, params = parse_result.query, {}
|
|
if proto == '' or netloc == '':
|
|
raise GCacheNotValidGCacheUrlException(url)
|
|
for param in query_str.split('&'):
|
|
if param.strip() == '':
|
|
continue
|
|
param_split = param.split('=')
|
|
if len(param_split) != 2:
|
|
raise GCacheNotValidGCacheUrlException(
|
|
f"params error: {param}, url: {url}")
|
|
params[param_split[0]] = param_split[1]
|
|
return GCacheUrl(proto, netloc, params)
|