feat: complete deploy method code
This commit is contained in:
parent
d4e417f238
commit
eaa298274e
|
@ -46,6 +46,16 @@ class VectorDB(Generic[TSchema]):
|
|||
|
||||
return VectorDBTyped
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if 'work_dir' in kwargs:
|
||||
self._workspace = kwargs['work_dir']
|
||||
if 'workspace' in kwargs:
|
||||
self._workspace = kwargs['workspace']
|
||||
self._uses_with = kwargs
|
||||
kwargs['requests'] = REQUESTS_MAP
|
||||
kwargs['runtime_args'] = {'workspace': self._workspace}
|
||||
self._executor = self._executor_cls(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def _get_jina_object(cls,
|
||||
*,
|
||||
|
@ -60,6 +70,12 @@ class VectorDB(Generic[TSchema]):
|
|||
obj_name: Optional[str] = None,
|
||||
**kwargs):
|
||||
from jina import Deployment, Flow
|
||||
is_instance = False
|
||||
if isinstance(cls, VectorDB):
|
||||
is_instance = True
|
||||
|
||||
if is_instance:
|
||||
workspace = workspace or cls._workspace
|
||||
replicas = replicas or 1
|
||||
shards = shards or 1
|
||||
protocol = protocol or 'grpc'
|
||||
|
@ -101,12 +117,11 @@ class VectorDB(Generic[TSchema]):
|
|||
kwargs.pop('stateful')
|
||||
|
||||
use_deployment = True
|
||||
print(f' hey JOAN HERE {to_deploy}')
|
||||
if to_deploy:
|
||||
# here we would need to push the EXECUTOR TO HUBBLE AND CHANGE THE USES
|
||||
assert definition_file is not None, 'Trying to create a Jina Object for Deployment without the file where the vectordb object/class is defined'
|
||||
assert obj_name is not None, 'Trying to create a Jina Object for Deployment without the name of the vectordb object/class to deploy'
|
||||
push_vectordb_to_hubble(vectordb_name=obj_name, definition_file_path=definition_file)
|
||||
uses = f'jinaai+docker://{push_vectordb_to_hubble(vectordb_name=obj_name, definition_file_path=definition_file)}'
|
||||
use_deployment = False
|
||||
|
||||
if 'websocket' in protocol_list: # websocket not supported for Deployment
|
||||
|
@ -116,7 +131,8 @@ class VectorDB(Generic[TSchema]):
|
|||
use_deployment = False
|
||||
|
||||
if use_deployment:
|
||||
jina_object = Deployment(uses=uses,
|
||||
jina_object = Deployment(name='indexer',
|
||||
uses=uses,
|
||||
port=port,
|
||||
protocol=protocol,
|
||||
shards=shards,
|
||||
|
@ -127,7 +143,8 @@ class VectorDB(Generic[TSchema]):
|
|||
polling=polling,
|
||||
**kwargs)
|
||||
else:
|
||||
jina_object = Flow(port=port, protocol=protocol, **kwargs).add(uses=uses,
|
||||
jina_object = Flow(port=port, protocol=protocol, **kwargs).add(name='indexer',
|
||||
uses=uses,
|
||||
shards=shards,
|
||||
replicas=replicas,
|
||||
stateful=stateful,
|
||||
|
@ -137,16 +154,6 @@ class VectorDB(Generic[TSchema]):
|
|||
|
||||
return jina_object
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if 'work_dir' in kwargs:
|
||||
self._workspace = kwargs['work_dir']
|
||||
if 'workspace' in kwargs:
|
||||
self._workspace = kwargs['workspace']
|
||||
self._uses_with = kwargs
|
||||
kwargs['requests'] = REQUESTS_MAP
|
||||
kwargs['runtime_args'] = {'workspace': self._workspace}
|
||||
self._executor = self._executor_cls(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def serve(cls,
|
||||
*,
|
||||
|
@ -162,8 +169,54 @@ class VectorDB(Generic[TSchema]):
|
|||
@classmethod
|
||||
def deploy(cls,
|
||||
**kwargs):
|
||||
from tempfile import mkdtemp
|
||||
import os
|
||||
import yaml
|
||||
from yaml.loader import SafeLoader
|
||||
jina_obj = cls._get_jina_object(to_deploy=True, **kwargs)
|
||||
# here we will need to transform to YAML, change `jcloud` options and deploy
|
||||
|
||||
tmpdir = mkdtemp()
|
||||
jina_obj.save_config(os.path.join(tmpdir, 'flow.yml'))
|
||||
with open(os.path.join(tmpdir, 'flow.yml'), 'r') as f:
|
||||
flow_dict = yaml.load(f, SafeLoader)
|
||||
|
||||
executor_jcloud_config = {'resources': {'instance': 'C5', 'autoscale': {'min': 0, 'max': 1},
|
||||
'storage': {'kind': 'ebs', 'size': '10G', 'retain': True}}}
|
||||
for executor in flow_dict['executors']:
|
||||
executor['jcloud'] = executor_jcloud_config
|
||||
|
||||
global_jcloud_config = {
|
||||
'labels': {
|
||||
'app': 'vectordb',
|
||||
},
|
||||
'monitor': {
|
||||
'traces': {
|
||||
'enable': True,
|
||||
},
|
||||
'metrics': {
|
||||
'enable': True,
|
||||
'host': 'http://opentelemetry-collector.monitor.svc.cluster.local',
|
||||
'port': 4317,
|
||||
},
|
||||
},
|
||||
}
|
||||
flow_dict['jcloud'] = global_jcloud_config
|
||||
import tempfile
|
||||
from jcloud.flow import CloudFlow
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
flow_path = os.path.join(tmpdir, 'flow.yml')
|
||||
with open(flow_path, 'w') as f:
|
||||
yaml.safe_dump(flow_dict, f, sort_keys=False)
|
||||
|
||||
cloud_flow = CloudFlow(path=flow_path)
|
||||
|
||||
async def _deploy():
|
||||
await cloud_flow.__aenter__()
|
||||
|
||||
import asyncio
|
||||
ret = asyncio.run(_deploy())
|
||||
return ret
|
||||
|
||||
@pass_kwargs_as_params
|
||||
@unify_input_output
|
||||
|
|
|
@ -75,6 +75,8 @@ def _push_to_hubble(
|
|||
push_envs = (
|
||||
{'JINA_HUBBLE_HIDE_EXECUTOR_PUSH_SUCCESS_MSG': 'true'} if not verbose else {}
|
||||
)
|
||||
# return 'a' + ':' + tag
|
||||
|
||||
with EnvironmentVarCtxtManager(push_envs):
|
||||
executor_id = HubIO(args).push().get('id')
|
||||
return executor_id + ':' + tag
|
||||
|
|
Loading…
Reference in New Issue