sysom1/environment/1_sdk/gcache_base/url.py

67 lines
2.1 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/3/17 19:43
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File url.py
Description:
"""
import urllib.parse
from .exceptions import GCacheNotValidGCacheUrlException
class GCacheUrl:
"""GCacheUrl definition
GCacheUrl URL format definition, which consists of three parts
(proto, netloc, params)
Args:
proto(str): Protocol identifier (e.g., redis)
netloc(str): Connection address, mainly used to connect to low-level
messaging middleware (e.g., localhost:6379)
params(dict): Connection parameters (e.g., {"password": "123456"})
Attributes:
proto(str): Protocol identifier (e.g., redis)
netloc(str): Connection address, mainly used to connect to low-level
messaging middleware (e.g., localhost:6379)
params(dict): Connection parameters (e.g., {"password": "123456"})
"""
def __init__(self, proto: str, netloc: str, params: dict):
self.proto = proto
self.netloc = netloc
self.params = params
def __str__(self):
query = "&".join([f'{k}={v}' for k, v in self.params.items()])
return f"{self.proto}://{self.netloc}?" \
f"{query}"
@staticmethod
def parse(url: str):
"""Parses a string into a GCacheUrl object
Args:
url(str)
Returns:
GCacheUrl
"""
parse_result = urllib.parse.urlparse(url, allow_fragments=False)
proto, netloc = parse_result.scheme, parse_result.netloc
query_str, params = parse_result.query, {}
if proto == '' or netloc == '':
raise GCacheNotValidGCacheUrlException(url)
for param in query_str.split('&'):
if param.strip() == '':
continue
param_split = param.split('=')
if len(param_split) != 2:
raise GCacheNotValidGCacheUrlException(
f"params error: {param}, url: {url}")
params[param_split[0]] = param_split[1]
return GCacheUrl(proto, netloc, params)