From eaa298274eefb0fedb3f12669a343ac1aa30db63 Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Fri, 9 Jun 2023 19:04:07 +0200 Subject: [PATCH] feat: complete deploy method code --- vectordb/db/base.py | 83 ++++++++++++++++++++++++++------ vectordb/utils/push_to_hubble.py | 2 + 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/vectordb/db/base.py b/vectordb/db/base.py index 432df6e..ed883c8 100644 --- a/vectordb/db/base.py +++ b/vectordb/db/base.py @@ -46,6 +46,16 @@ class VectorDB(Generic[TSchema]): return VectorDBTyped + def __init__(self, *args, **kwargs): + if 'work_dir' in kwargs: + self._workspace = kwargs['work_dir'] + if 'workspace' in kwargs: + self._workspace = kwargs['workspace'] + self._uses_with = kwargs + kwargs['requests'] = REQUESTS_MAP + kwargs['runtime_args'] = {'workspace': self._workspace} + self._executor = self._executor_cls(*args, **kwargs) + @classmethod def _get_jina_object(cls, *, @@ -60,6 +70,12 @@ class VectorDB(Generic[TSchema]): obj_name: Optional[str] = None, **kwargs): from jina import Deployment, Flow + is_instance = False + if isinstance(cls, VectorDB): + is_instance = True + + if is_instance: + workspace = workspace or cls._workspace replicas = replicas or 1 shards = shards or 1 protocol = protocol or 'grpc' @@ -101,12 +117,11 @@ class VectorDB(Generic[TSchema]): kwargs.pop('stateful') use_deployment = True - print(f' hey JOAN HERE {to_deploy}') if to_deploy: # here we would need to push the EXECUTOR TO HUBBLE AND CHANGE THE USES assert definition_file is not None, 'Trying to create a Jina Object for Deployment without the file where the vectordb object/class is defined' assert obj_name is not None, 'Trying to create a Jina Object for Deployment without the name of the vectordb object/class to deploy' - push_vectordb_to_hubble(vectordb_name=obj_name, definition_file_path=definition_file) + uses = f'jinaai+docker://{push_vectordb_to_hubble(vectordb_name=obj_name, definition_file_path=definition_file)}' use_deployment = False if 'websocket' in protocol_list: # websocket not supported for Deployment @@ -116,7 +131,8 @@ class VectorDB(Generic[TSchema]): use_deployment = False if use_deployment: - jina_object = Deployment(uses=uses, + jina_object = Deployment(name='indexer', + uses=uses, port=port, protocol=protocol, shards=shards, @@ -127,7 +143,8 @@ class VectorDB(Generic[TSchema]): polling=polling, **kwargs) else: - jina_object = Flow(port=port, protocol=protocol, **kwargs).add(uses=uses, + jina_object = Flow(port=port, protocol=protocol, **kwargs).add(name='indexer', + uses=uses, shards=shards, replicas=replicas, stateful=stateful, @@ -137,16 +154,6 @@ class VectorDB(Generic[TSchema]): return jina_object - def __init__(self, *args, **kwargs): - if 'work_dir' in kwargs: - self._workspace = kwargs['work_dir'] - if 'workspace' in kwargs: - self._workspace = kwargs['workspace'] - self._uses_with = kwargs - kwargs['requests'] = REQUESTS_MAP - kwargs['runtime_args'] = {'workspace': self._workspace} - self._executor = self._executor_cls(*args, **kwargs) - @classmethod def serve(cls, *, @@ -162,8 +169,54 @@ class VectorDB(Generic[TSchema]): @classmethod def deploy(cls, **kwargs): + from tempfile import mkdtemp + import os + import yaml + from yaml.loader import SafeLoader jina_obj = cls._get_jina_object(to_deploy=True, **kwargs) - # here we will need to transform to YAML, change `jcloud` options and deploy + + tmpdir = mkdtemp() + jina_obj.save_config(os.path.join(tmpdir, 'flow.yml')) + with open(os.path.join(tmpdir, 'flow.yml'), 'r') as f: + flow_dict = yaml.load(f, SafeLoader) + + executor_jcloud_config = {'resources': {'instance': 'C5', 'autoscale': {'min': 0, 'max': 1}, + 'storage': {'kind': 'ebs', 'size': '10G', 'retain': True}}} + for executor in flow_dict['executors']: + executor['jcloud'] = executor_jcloud_config + + global_jcloud_config = { + 'labels': { + 'app': 'vectordb', + }, + 'monitor': { + 'traces': { + 'enable': True, + }, + 'metrics': { + 'enable': True, + 'host': 'http://opentelemetry-collector.monitor.svc.cluster.local', + 'port': 4317, + }, + }, + } + flow_dict['jcloud'] = global_jcloud_config + import tempfile + from jcloud.flow import CloudFlow + + with tempfile.TemporaryDirectory() as tmpdir: + flow_path = os.path.join(tmpdir, 'flow.yml') + with open(flow_path, 'w') as f: + yaml.safe_dump(flow_dict, f, sort_keys=False) + + cloud_flow = CloudFlow(path=flow_path) + + async def _deploy(): + await cloud_flow.__aenter__() + + import asyncio + ret = asyncio.run(_deploy()) + return ret @pass_kwargs_as_params @unify_input_output diff --git a/vectordb/utils/push_to_hubble.py b/vectordb/utils/push_to_hubble.py index 3c5a84d..2f3e676 100644 --- a/vectordb/utils/push_to_hubble.py +++ b/vectordb/utils/push_to_hubble.py @@ -75,6 +75,8 @@ def _push_to_hubble( push_envs = ( {'JINA_HUBBLE_HIDE_EXECUTOR_PUSH_SUCCESS_MSG': 'true'} if not verbose else {} ) + # return 'a' + ':' + tag + with EnvironmentVarCtxtManager(push_envs): executor_id = HubIO(args).push().get('id') return executor_id + ':' + tag