feat: add deploy to jcloud journey (#20)

* feat: add deploy to jcloud journey

* feat: push to hubble

* feat: complete deploy method code

* feat: provide CLI in main

Signed-off-by: Joan Fontanals Martinez <joan.martinez@jina.ai>

* ci: add docker building steps

* docs: adapt readme to new CLI

---------

Signed-off-by: Joan Fontanals Martinez <joan.martinez@jina.ai>
Joan Fontanals, 2023-06-15 17:53:59 +02:00, committed by GitHub
parent 5c38ae719f
commit 6591f20fab
19 changed files with 502 additions and 286 deletions


@@ -21,118 +21,28 @@ jobs:
           touch SUCCESS
         if: inputs.release_token == env.release_token
         env:
-          release_token: ${{ secrets.JINA_CORE_RELEASE_TOKEN }}
+          release_token: ${{ secrets.VECTORDB_RELEASE_TOKEN }}
       - name: Fail release token
         run: |
           [[ -f SUCCESS ]]
   regular-release:
     needs: token-check
     runs-on: ubuntu-latest
-    strategy:
-      fail-fast: false
-      matrix:
-        pip_tag: [ "", "perf", "standard", "devel"] # default: "" = core
-        py_version: [ "3.7", "3.8", "3.9", "3.10", "3.11"] # default "" = 3.7
     steps:
       - uses: actions/checkout@v2.5.0
         with:
           fetch-depth: 100
-      - name: Set envs and versions
+      - name: Set up Python 3.9
+        uses: actions/setup-python@v1
+        with:
+          python-version: 3.9
+      - name: Get vectordb version
         run: |
+          pip install -e .
+          echo "VECTORDB_VERSION=$(python -c 'import vectordb; print(vectordb.__version__)')" >> $GITHUB_ENV
-          DEFAULT_PY_VERSION="3.8"
-          VCS_REF=${{ github.ref }}
echo "VCS_REF=$VCS_REF" >> $GITHUB_ENV
echo "Will build $VCS_REF"
echo "BUILD_DATE=$(date -u +'%Y-%m-%dT%H:%M:%SZ')" >> $GITHUB_ENV
echo "BUILD_TARGET=jina" >> $GITHUB_ENV
if [[ "${{ matrix.pip_tag }}" == "perf" ]]; then
echo "JINA_PIP_INSTALL_PERF=1" >> $GITHUB_ENV
fi
if [[ "${{ matrix.pip_tag }}" == "" ]]; then
echo "JINA_PIP_INSTALL_CORE=1" >> $GITHUB_ENV
fi
JINA_VERSION=$(sed -n '/^__version__/p' ./jina/__init__.py | cut -d \' -f2)
V_JINA_VERSION=v${JINA_VERSION}
JINA_MINOR_VERSION=${JINA_VERSION%.*}
JINA_MAJOR_VERSION=${JINA_MINOR_VERSION%.*}
PY_TAG=${{matrix.py_version}}
if [ -n "${PY_TAG}" ]; then
PY_TAG=-py${PY_TAG//./}
fi
PIP_TAG=${{ matrix.pip_tag }}
if [ -n "${PIP_TAG}" ]; then
PIP_TAG=-${PIP_TAG}
fi
git fetch --depth=1 origin +refs/tags/*:refs/tags/*
LAST_VER_TAG=$(git tag -l | sort -V | tail -n1)
PRE_VERSION=-dev$(git rev-list $LAST_VER_TAG..HEAD --count)
if [[ "${{ github.event.inputs.triggered_by }}" == "CD" ]]; then
if [[ "${{ matrix.py_version }}" == "$DEFAULT_PY_VERSION" ]]; then
echo "TAG_ALIAS=\
jinaai/jina:master${PY_TAG}${PIP_TAG}, \
jinaai/jina:master${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PRE_VERSION}${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PRE_VERSION}${PY_TAG}${PIP_TAG}" \
>> $GITHUB_ENV
else
# on every CD
echo "TAG_ALIAS=\
jinaai/jina:master${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PRE_VERSION}${PY_TAG}${PIP_TAG}" \
>> $GITHUB_ENV
fi
elif [[ "${{ github.event.inputs.triggered_by }}" == "TAG" ]]; then
# on every tag release
if [[ "${{ matrix.py_version }}" == "$DEFAULT_PY_VERSION" ]]; then
echo "TAG_ALIAS=\
jinaai/jina:latest${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_MINOR_VERSION}${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_MAJOR_VERSION}${PY_TAG}${PIP_TAG}, \
jinaai/jina:latest${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PIP_TAG}, \
jinaai/jina:${JINA_MINOR_VERSION}${PIP_TAG}, \
jinaai/jina:${JINA_MAJOR_VERSION}${PIP_TAG} \
" >> $GITHUB_ENV
else
echo "TAG_ALIAS=\
jinaai/jina:latest${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_MINOR_VERSION}${PY_TAG}${PIP_TAG}, \
jinaai/jina:${JINA_MAJOR_VERSION}${PY_TAG}${PIP_TAG} \
" >> $GITHUB_ENV
fi
elif [[ "${{ github.event.inputs.triggered_by }}" == "MANUAL" ]]; then
# on every manual release
if [[ "${{ matrix.py_version }}" == "$DEFAULT_PY_VERSION" ]]; then
echo "TAG_ALIAS=\
jinaai/jina:${JINA_VERSION}${PIP_TAG}, \
jinaai/jina:${JINA_VERSION}${PY_TAG}${PIP_TAG} \
" >> $GITHUB_ENV
else
echo "TAG_ALIAS=\
jinaai/jina:${JINA_VERSION}${PY_TAG}${PIP_TAG} \
" >> $GITHUB_ENV
fi
else
echo "Bad triggered_by: ${{ github.event.inputs.triggered_by }}!"
exit 1
fi
echo "JINA_VERSION=${JINA_VERSION}" >> $GITHUB_ENV
     - name: Set up Docker Buildx
       id: buildx
       uses: docker/setup-buildx-action@v1
@@ -141,27 +51,12 @@ jobs:
     - name: Login to DockerHub
       uses: docker/login-action@v1
       with:
-        username: ${{ secrets.DOCKERHUB_DEVBOT_USER }}
-        password: ${{ secrets.DOCKERHUB_DEVBOT_TOKEN }}
+        username: ${{ secrets.DOCKERHUB_JINAVECTORDB_USER }}
+        password: ${{ secrets.DOCKERHUB_JINAVECTORDB_TOKEN }}
-    - run: |
-        # https://github.com/docker/buildx/issues/464#issuecomment-741507760
-        # https://github.com/kubernetes-sigs/azuredisk-csi-driver/pull/808/files
-        docker run --privileged --rm tonistiigi/binfmt --uninstall qemu-aarch64
-        docker run --rm --privileged tonistiigi/binfmt --install all
     - name: Build and push
       uses: docker/build-push-action@v2
       with:
-        context: .
-        file: Dockerfiles/debianx.Dockerfile
-        platforms: linux/amd64,linux/arm64
         push: true
-        tags: ${{env.TAG_ALIAS}}
-        build-args: |
-          BUILD_DATE=${{env.BUILD_DATE}}
-          JINA_VERSION=${{env.JINA_VERSION}}
-          VCS_REF=${{env.VCS_REF}}
-          PIP_INSTALL_CORE=${{env.JINA_PIP_INSTALL_CORE}}
-          PIP_INSTALL_PERF=${{env.JINA_PIP_INSTALL_PERF}}
-          PY_VERSION=${{matrix.py_version}}
-          PIP_TAG=${{matrix.pip_tag}}
-        target: ${{env.BUILD_TARGET}}
+        context: .
+        file: Dockerfiles/vectordb.Dockerfile
+        tags: jinaai/vectordb:latest, jinaai/vectordb:${{ env.VECTORDB_VERSION }}


@@ -1,113 +0,0 @@
name: Manual Docs Build
on:
workflow_dispatch:
inputs:
release_token:
description: 'Your release token'
required: true
triggered_by:
description: 'CD | TAG | MANUAL'
required: false
default: MANUAL
build_old_docs:
description: 'Whether to build old docs (TRUE | FALSE)'
type: string
default: 'FALSE'
package:
description: The name of the repo to build documentation for.
type: string
default: jina
repo_owner:
description: The owner of the repo to build documentation for. Defaults to 'jina-ai'.
type: string
default: jina-ai
pages_branch:
description: Branch that Github Pages observes
type: string
default: gh-pages
git_config_name:
type: string
default: Jina Dev Bot
git_config_email:
type: string
default: dev-bot@jina.ai
jobs:
token-check:
runs-on: ubuntu-latest
steps:
- name: Check release token
id: token-check
run: |
touch SUCCESS
if: inputs.release_token == env.release_token
env:
release_token: ${{ secrets.JINA_CORE_RELEASE_TOKEN }}
- name: Fail release token
run: |
[[ -f SUCCESS ]]
build-and-push-latest-docs:
needs: token-check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 1
- uses: actions/setup-python@v4
with:
python-version: '3.7'
- name: Install Dependencies
run: |
pip install .[devel]
cd docs
pip install -r requirements.txt
pip install --pre -U furo
pip install sphinx-markdown-tables==0.0.17
- name: Sphinx Build
run: |
cd docs
bash makedoc.sh local-only
mv ./_build/dirhtml /tmp/gen-html
cd ..
- name: Checkout to GH pages branch (${{ inputs.pages_branch }})
run: |
git fetch origin ${{ inputs.pages_branch }}:${{ inputs.pages_branch }} --depth 1
git checkout -f ${{ inputs.pages_branch }}
git reset --hard HEAD
- name: Small config stuff
run: |
touch /tmp/gen-html/.nojekyll
cp ./docs/_versions.json /tmp/gen-html/_versions.json
cp ./docs/CNAME /tmp/gen-html/CNAME
cp /tmp/gen-html/404/index.html /tmp/gen-html/404.html
sed -i 's/href="\.\./href="/' /tmp/gen-html/404.html # fix asset urls that needs to be updated in 404.html
- name: Moving old doc versions
run: |
cd docs
for i in $(cat _versions.json | jq '.[].version' | tr -d '"'); do if [ -d "$i" ]; then mv "$i" /tmp/gen-html; fi; done
- name: Swap in new docs
run: |
rm -rf ./docs
mv /tmp/gen-html ./docs
- name: Push it up!
run: |
git config --local user.email "${{ inputs.git_config_email }}"
git config --local user.name "${{ inputs.git_config_name }}"
git show --summary
git add ./docs && git commit -m "chore(docs): update docs due to ${{github.event_name}} on ${{github.repository}}"
git push origin ${{ inputs.pages_branch }}
build-old-docs:
needs: build-and-push-latest-docs
runs-on: ubuntu-latest
if: inputs.build_old_docs == 'TRUE'
steps:
- uses: benc-uk/workflow-dispatch@v1
with:
workflow: Build old docs
token: ${{ secrets.JINA_DEV_BOT }}
inputs: '{ "release_token": "${{ env.release_token }}", "triggered_by": "TAG"}'
env:
release_token: ${{ secrets.JINA_CORE_RELEASE_TOKEN }}

.gitignore (1 addition)

@@ -1,5 +1,6 @@
 # Byte-compiled / optimized / DLL files
 __pycache__/
+*__pycache__/
 *.py[cod]
 *$py.class


@@ -0,0 +1,13 @@
ARG PY_VERSION=3.9
FROM python:${PY_VERSION}-slim
RUN apt-get update && apt-get install --no-install-recommends -y gcc libc6-dev pkg-config wget build-essential
COPY . /vectordb/
RUN cd /vectordb && pip install -U pip && pip install .
RUN pip install -U "docarray[hnswlib]>=0.33"
ENTRYPOINT ["vectordb"]


@@ -1,4 +1,9 @@
# Vector Database for Python Developers

Vector databases store embeddings that represent data, so that semantically similar objects can be found by comparing their vectors. They are used for similarity search across multimodal data such as text, images, audio or video, and they increasingly power LLM applications by providing the context and memory that improve generation quality and help prevent hallucinations.

`vectordb` is a simple, user-friendly solution for Python developers looking to create their own vector database with CRUD support. Vector databases are a key component of the stack needed to use LLMs, as they give models access to context and memory. Many of the solutions out there are more complex than most projects need. With `vectordb`, you can easily create your own vector database solution that works locally and can still be deployed and served with scalability features such as sharding and replication.

Start with your solution as a local library and seamlessly transition into a served database with all the needed capability. No more complexity than you need.

@@ -122,6 +127,38 @@ When serving or deploying your Vector Databases you can set 2 scaling parameters

** When deployed to JCloud, the number of replicas will be set to 1. We are working to enable replication in the cloud.

## 💻 `vectordb` CLI

`vectordb` ships with a simple CLI that lets you serve and deploy your database.

First, define your database instance or class in a Python file.

```python
# example.py
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
from vectordb import InMemoryExactNNVectorDB


class MyDoc(BaseDoc):
    text: str
    embedding: NdArray[128]


db = InMemoryExactNNVectorDB[MyDoc](workspace='./vectordb')  # notice how `db` is the instance that we want to serve

if __name__ == '__main__':
    # make sure to protect this part of the code
    with db.serve() as service:
        service.block()
```

| Description | Command |
| --- | ---: |
| Serve your app locally | `vectordb serve --db example:db` |
| Deploy your app on JCloud | `vectordb deploy --db example:db` |
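Both commands also accept the options defined in `vectordb/__main__.py` later in this commit (port, protocol, replicas, shards and workspace for `serve`; protocol and shards for `deploy`). A hedged sketch combining some of them, with illustrative values only:

```bash
# illustrative values; option names come from the CLI definition in vectordb/__main__.py
vectordb serve --db example:db --protocol grpc,http --port 8081,8082 --workspace ./vectordb_data
vectordb deploy --db example:db --protocol grpc --shards 2
```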
## :cloud: Deploy it to the cloud

@@ -134,17 +171,43 @@ When serving or deploying your Vector Databases you can set 2 scaling parameters

```jc login```

3. Deploy:

```bash
vectordb deploy --db example:db
```

<details>
<summary>Show command output</summary>

```text
╭──────────────┬────────────────────────────╮
│ App ID       │ <id>                       │
├──────────────┼────────────────────────────┤
│ Phase        │ Serving                    │
├──────────────┼────────────────────────────┤
│ Endpoint     │ grpc://<id>.wolf.jina.ai   │
├──────────────┼────────────────────────────┤
│ App logs     │ dashboards.wolf.jina.ai    │
╰──────────────┴────────────────────────────╯
```

</details>

4. Connect from the Client

Once deployed, you can use the `vectordb` Client to access the given endpoint.

```python
-HNSWLibDB[MyTextDoc].deploy(config={'data_path'= './hnswlib_path'}, replicas=1, shards=1)
+from vectordb import Client
+c = Client(address='grpc://<id>.wolf.jina.ai')
```
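For illustration only (not part of the committed README), the typed client exposes the same `index` and `search` calls as the local database; the `inputs` and `limit` parameter names below are assumptions, and `MyDoc` is the schema from `example.py` above:

```python
# hypothetical sketch: connect to the deployed endpoint and run index/search
import numpy as np
from docarray import DocList

from example import MyDoc  # assumes example.py from the CLI section is importable
from vectordb import Client

client = Client[MyDoc](address='grpc://<id>.wolf.jina.ai')

# index a handful of documents, then query; `inputs` and `limit` are assumed names
docs = DocList[MyDoc]([MyDoc(text=f'doc {i}', embedding=np.random.rand(128)) for i in range(5)])
client.index(inputs=docs)

query = DocList[MyDoc]([MyDoc(text='query', embedding=np.random.rand(128))])
results = client.search(inputs=query, limit=3)
```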
5. Manage your deployed instances using [jcloud](https://github.com/jina-ai/jcloud):

You can then list and delete your deployed DBs with the `jc` command:

```jc list <>```

```jc delete <>```

## 🛣️ Roadmap

We have big plans for the future of Vector Database! Here are some of the features we have in the works:


@@ -1,2 +1,3 @@
 jina>=3.17.0
+click
 #docarray[hnswlib]>=0.33.0


@@ -0,0 +1,6 @@
FROM jinaai/vectordb:latest
COPY . /workdir/
WORKDIR /workdir
ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]


@@ -0,0 +1,6 @@
jtype: VectorDBExecutor
metas:
name: vectordb_executor
py_modules:
- vectordb_app.py
- executor.py


@@ -0,0 +1,5 @@
from vectordb_app import db
class VectorDBExecutor(db._executor_cls):
pass


@@ -32,13 +32,17 @@ setup(
         'License :: OSI Approved :: MIT License',
         'Programming Language :: Python',
         'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.7',
         'Programming Language :: Python :: 3.8',
         'Programming Language :: Python :: 3.9',
         'Programming Language :: Python :: 3.10',
         'Programming Language :: Python :: 3.11',
     ],
     python_requires='>=3.7',
+    entry_points={
+        'console_scripts': [
+            'vectordb=vectordb.__main__:deploy',
+        ],
+    },
     extras_require={
         'test': [
             'pytest',

vectordb/__main__.py (new file, 125 additions)

@@ -0,0 +1,125 @@
import click
from vectordb.db.base import VectorDB
from vectordb import __version__
@click.group()
@click.version_option(__version__, '-v', '--version', prog_name='vectordb')
@click.help_option('-h', '--help')
def vectordb():
pass
@vectordb.command(help='Deploy a vectorDB app to Jina AI Cloud')
@click.option(
'--db',
'--app',
type=str,
required=True,
help='VectorDB to deploy, in the format "<module>:<attribute>"',
)
@click.option(
'--protocol',
'--protocols',
type=str,
default='grpc',
help='Protocol to use to communicate with the VectorDB. It can be a single one or a list of multiple protocols to use. Options are grpc, http and websocket',
required=False,
show_default=True,
)
@click.option(
'--shards',
'-s',
type=int,
default=1,
help='Number of shards to use for the deployed VectorDB',
required=False,
show_default=True,
)
def deploy(db, protocol, shards):
definition_file, _, obj_name = db.partition(":")
protocol = protocol.split(',')
VectorDB.deploy(protocol=protocol,
shards=shards,
definition_file=definition_file,
obj_name=obj_name)
@vectordb.command(help='Serve a vectorDB app locally')
@click.option(
'--db',
'--app',
type=str,
required=True,
help='VectorDB to serve, in the format "<module>:<attribute>"',
)
@click.option(
'--port',
'-p',
type=str,
default='8081',
help='Port to use to access the VectorDB. It can be a single one or a list corresponding to the protocols',
required=False,
show_default=True,
)
@click.option(
'--protocol',
'--protocols',
type=str,
default='grpc',
help='Protocol to use to communicate with the VectorDB. It can be a single one or a list of multiple protocols to use. Options are grpc, http and websocket',
required=False,
show_default=True,
)
@click.option(
'--replicas',
'-r',
type=int,
default=1,
help='Number of replicas to use for the serving VectorDB',
required=False,
show_default=True,
)
@click.option(
'--shards',
'-s',
type=int,
default=1,
help='Number of shards to use for the served VectorDB',
required=False,
show_default=True,
)
@click.option(
'--workspace',
'-w',
type=str,
default='.',
help='Workspace for the VectorDB to persist its data',
required=False,
show_default=True,
)
def serve(db, port, protocol, replicas, shards, workspace):
import importlib
definition_file, _, obj_name = db.partition(":")
port = port.split(',')
if len(port) == 1:
port = int(port[0])
else:
port = [int(p) for p in port]
protocol = protocol.split(',')
if definition_file.endswith('.py'):
definition_file = definition_file[:-3]
module = importlib.import_module(definition_file)
db = getattr(module, obj_name)
service = db.serve(protocol=protocol,
port=port,
shards=shards,
replicas=replicas,
workspace=workspace)
with service:
service.block()
if __name__ == '__main__':
vectordb()
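Because the module guards the Click group behind `if __name__ == '__main__'`, the same commands can also be run without the console script; a small sketch (assuming the package is installed and `example.py` from the README sits in the working directory):

```bash
# invoke the Click group directly as a module
python -m vectordb serve --db example:db --protocol grpc --port 8081
python -m vectordb deploy --db example:db --shards 1
```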


@@ -2,6 +2,7 @@ from typing import TypeVar, Generic, Type, Optional, TYPE_CHECKING
 from vectordb.utils.unify_input_output import unify_input_output
 from vectordb.utils.pass_parameters import pass_kwargs_as_params
 from vectordb.utils.create_doc_type import create_output_doc_type
+from vectordb.utils.sort_matches_by_score import sort_matches_by_scores

 if TYPE_CHECKING:
@@ -43,8 +44,9 @@ class Client(Generic[TSchema]):
         return ClientTyped

-    def __init__(self, address):
+    def __init__(self, address, reverse_order=False):
         from jina import Client as jClient
+        self.reverse_score_order = reverse_order
         self._client = jClient(host=address)

     @unify_input_output
@@ -52,6 +54,7 @@ class Client(Generic[TSchema]):
     def index(self, *args, **kwargs):
         return self._client.index(*args, **kwargs)

+    @sort_matches_by_scores
     @unify_input_output
     @pass_kwargs_as_params
     def search(self, *args, **kwargs):


@@ -1,10 +1,10 @@
 from typing import TypeVar, Generic, Type, Optional, Union, List, Dict, TYPE_CHECKING
 from vectordb.db.executors.typed_executor import TypedExecutor
 from vectordb.db.service import Service
-from vectordb.utils.create_doc_type import create_output_doc_type
 from vectordb.utils.unify_input_output import unify_input_output
 from vectordb.utils.pass_parameters import pass_kwargs_as_params
 from vectordb.utils.sort_matches_by_score import sort_matches_by_scores
+from vectordb.utils.push_to_hubble import push_vectordb_to_hubble

 if TYPE_CHECKING:
     from jina import Deployment, Flow
@@ -19,7 +19,6 @@ class VectorDB(Generic[TSchema]):
     # the BaseDoc that defines the schema of the store
     # for subclasses this is filled automatically
     _input_schema: Optional[Type['BaseDoc']] = None
-    _output_schema: Optional[Type['BaseDoc']] = None
     _executor_type: Optional[Type[TypedExecutor]] = None
     _executor_cls: Type[TypedExecutor]
@@ -38,12 +37,9 @@ class VectorDB(Generic[TSchema]):
                 f'{cls.__name__}[item] `item` should be a Document not a {item} '
             )
-        out_item = create_output_doc_type(item)
         class VectorDBTyped(cls):  # type: ignore
             _input_schema: Type[TSchema] = item
-            _output_schema: Type[TSchema] = out_item
-            _executor_cls: Type[TypedExecutor] = cls._executor_type[item, out_item]
+            _executor_cls: Type[TypedExecutor] = cls._executor_type[item]

         VectorDBTyped.__name__ = f'{cls.__name__}[{item.__name__}]'
         VectorDBTyped.__qualname__ = f'{cls.__qualname__}[{item.__name__}]'
@@ -61,23 +57,43 @@ class VectorDB(Generic[TSchema]):
         self._executor = self._executor_cls(*args, **kwargs)

     @classmethod
-    def serve(cls,
-              *,
-              port: Optional[Union[str, List[str]]] = 8081,
-              workspace: Optional[str] = None,
-              protocol: Optional[Union[str, List[str]]] = None,
-              shards: Optional[int] = None,
-              replicas: Optional[int] = None,
-              peer_ports: Optional[Union[Dict[str, List], List]] = None,
-              **kwargs):
+    def _get_jina_object(cls,
+                         *,
+                         to_deploy: bool = False,
+                         port: Optional[Union[str, List[str]]] = 8081,
+                         protocol: Optional[Union[str, List[str]]] = None,
+                         workspace: Optional[str] = None,
+                         shards: Optional[int] = None,
+                         replicas: Optional[int] = None,
+                         peer_ports: Optional[Union[Dict[str, List], List]] = None,
+                         definition_file: Optional[str] = None,
+                         obj_name: Optional[str] = None,
+                         **kwargs):
         from jina import Deployment, Flow
+        is_instance = False
+        if isinstance(cls, VectorDB):
+            is_instance = True
+        if is_instance:
+            workspace = workspace or cls._workspace
+        replicas = replicas or 1
+        shards = shards or 1
         protocol = protocol or 'grpc'
         protocol_list = [p.lower() for p in protocol] if isinstance(protocol, list) else [protocol.lower()]
         stateful = replicas is not None and replicas > 1
-        executor_cls_name = ''.join(cls._executor_cls.__name__.split('[')[0:2])
-        ServedExecutor = type(f'{executor_cls_name.replace("[", "").replace("]", "")}', (cls._executor_cls,),
-                              {})
+        if not to_deploy:
+            executor_cls_name = ''.join(cls._executor_cls.__name__.split('[')[0:2])
+            ServedExecutor = type(f'{executor_cls_name.replace("[", "").replace("]", "")}', (cls._executor_cls,),
+                                  {})
+            uses = ServedExecutor
         polling = {'/index': 'ANY', '/search': 'ALL', '/update': 'ALL', '/delete': 'ALL'}
+        if to_deploy and replicas > 1:
+            import warnings
+            warnings.warn(
+                'Deployment with replicas > 1 is not currently available. The deployment will have 1 replica per each '
+                'shard')
+            replicas = 1
         if 1 < replicas < 3:
             raise Exception(f'In order for consensus to properly work, at least 3 replicas need to be provided.')
@@ -102,6 +118,12 @@ class VectorDB(Generic[TSchema]):
             kwargs.pop('stateful')
             use_deployment = True
+        if to_deploy:
+            # push the executor to Hubble and change the `uses` argument accordingly
+            assert definition_file is not None, 'Trying to create a Jina Object for Deployment without the file where the vectordb object/class is defined'
+            assert obj_name is not None, 'Trying to create a Jina Object for Deployment without the name of the vectordb object/class to deploy'
+            uses = f'jinaai+docker://{push_vectordb_to_hubble(vectordb_name=obj_name, definition_file_path=definition_file)}'
+            use_deployment = False
         if 'websocket' in protocol_list:  # websocket not supported for Deployment
             use_deployment = False
@@ -110,26 +132,92 @@ class VectorDB(Generic[TSchema]):
             use_deployment = False

         if use_deployment:
-            ctxt_manager = Deployment(uses=ServedExecutor,
-                                      port=port,
-                                      protocol=protocol,
-                                      shards=shards,
-                                      replicas=replicas,
-                                      stateful=stateful,
-                                      peer_ports=peer_ports,
-                                      workspace=workspace,
-                                      polling=polling,
-                                      **kwargs)
+            jina_object = Deployment(name='indexer',
+                                     uses=uses,
+                                     port=port,
+                                     protocol=protocol,
+                                     shards=shards,
+                                     replicas=replicas,
+                                     stateful=stateful,
+                                     peer_ports=peer_ports,
+                                     workspace=workspace,
+                                     polling=polling, **kwargs)
         else:
-            ctxt_manager = Flow(port=port, protocol=protocol, **kwargs).add(uses=ServedExecutor,
-                                                                            shards=shards,
-                                                                            replicas=replicas,
-                                                                            stateful=stateful,
-                                                                            peer_ports=peer_ports,
-                                                                            polling=polling,
-                                                                            workspace=workspace)
+            jina_object = Flow(port=port, protocol=protocol, **kwargs).add(name='indexer',
+                                                                           uses=uses,
+                                                                           shards=shards,
+                                                                           replicas=replicas,
+                                                                           stateful=stateful,
+                                                                           peer_ports=peer_ports,
+                                                                           polling=polling,
+                                                                           workspace=workspace)

-        return Service(ctxt_manager, address=f'{protocol_list[0]}://0.0.0.0:{port}', schema=cls._input_schema, reverse_order=cls.reverse_score_order)
+        return jina_object
@classmethod
def serve(cls,
*,
port: Optional[Union[str, List[str]]] = 8081,
protocol: Optional[Union[str, List[str]]] = None,
**kwargs):
protocol = protocol or 'grpc'
protocol_list = [p.lower() for p in protocol] if isinstance(protocol, list) else [protocol.lower()]
ctxt_manager = cls._get_jina_object(to_deploy=False, port=port, protocol=protocol, **kwargs)
port = port[0] if isinstance(port, list) else port
return Service(ctxt_manager, address=f'{protocol_list[0]}://0.0.0.0:{port}', schema=cls._input_schema,
reverse_order=cls.reverse_score_order)
@classmethod
def deploy(cls,
**kwargs):
from tempfile import mkdtemp
import os
import yaml
from yaml.loader import SafeLoader
jina_obj = cls._get_jina_object(to_deploy=True, **kwargs)
tmpdir = mkdtemp()
jina_obj.save_config(os.path.join(tmpdir, 'flow.yml'))
with open(os.path.join(tmpdir, 'flow.yml'), 'r') as f:
flow_dict = yaml.load(f, SafeLoader)
executor_jcloud_config = {'resources': {'instance': 'C5', 'autoscale': {'min': 0, 'max': 1},
'storage': {'kind': 'ebs', 'size': '10G', 'retain': True}}}
for executor in flow_dict['executors']:
executor['jcloud'] = executor_jcloud_config
global_jcloud_config = {
'labels': {
'app': 'vectordb',
},
'monitor': {
'traces': {
'enable': True,
},
'metrics': {
'enable': True,
'host': 'http://opentelemetry-collector.monitor.svc.cluster.local',
'port': 4317,
},
},
}
flow_dict['jcloud'] = global_jcloud_config
import tempfile
from jcloud.flow import CloudFlow
with tempfile.TemporaryDirectory() as tmpdir:
flow_path = os.path.join(tmpdir, 'flow.yml')
with open(flow_path, 'w') as f:
yaml.safe_dump(flow_dict, f, sort_keys=False)
cloud_flow = CloudFlow(path=flow_path)
async def _deploy():
await cloud_flow.__aenter__()
import asyncio
ret = asyncio.run(_deploy())
return ret
    @pass_kwargs_as_params
    @unify_input_output


@@ -85,3 +85,6 @@ class InMemoryExactNNIndexer(TypedExecutor):
     def close(self):
         if self._index_file_path is not None:
             self._indexer.persist(self._index_file_path)


@@ -1,7 +1,9 @@
 from jina import Executor
 from jina.serve.executors import _FunctionWithSchema
-from typing import TypeVar, Generic, Type, Optional, Tuple, TYPE_CHECKING
+from typing import TypeVar, Generic, Type, Optional, TYPE_CHECKING
+from vectordb.utils.create_doc_type import create_output_doc_type

 if TYPE_CHECKING:
     from docarray import BaseDoc, DocList
@@ -40,18 +42,21 @@ class TypedExecutor(Executor, Generic[InputSchema, OutputSchema]):
     # Behind-the-scenes magic #
     # Subclasses should not need to implement these #
     ##################################################
-    def __class_getitem__(cls, item: Tuple[Type[InputSchema], Type[OutputSchema]]):
+    def __class_getitem__(cls, item: Type[InputSchema]):
         from docarray import BaseDoc
-        input_schema, output_schema = item
-        if not issubclass(input_schema, BaseDoc):
-            raise ValueError(
-                f'{cls.__name__}[item, ...] `item` should be a Document not a {input_schema} '
-            )
-        if not issubclass(output_schema, BaseDoc):
-            raise ValueError(
-                f'{cls.__name__}[..., item] `item` should be a Document not a {output_schema} '
-            )
+        if not isinstance(item, type):
+            # do nothing
+            # enables use in static contexts with type vars, e.g. as type annotation
+            return Generic.__class_getitem__.__func__(cls, item)
+        if not issubclass(item, BaseDoc):
+            raise ValueError(
+                f'{cls.__name__}[item] `item` should be a Document not a {item} '
+            )
+        input_schema = item
+        output_schema = create_output_doc_type(input_schema)

         class ExecutorTyped(cls):  # type: ignore
             _input_schema: Type[InputSchema] = input_schema
             _output_schema: Type[OutputSchema] = output_schema


@@ -1,7 +1,6 @@
 from vectordb.client.client import Client
 from vectordb.utils.unify_input_output import unify_input_output
 from vectordb.utils.pass_parameters import pass_kwargs_as_params
-from vectordb.utils.sort_matches_by_score import sort_matches_by_scores


 class Service:
@@ -9,7 +8,7 @@ class Service:
     def __init__(self, ctxt_manager, schema, address, reverse_order=False):
         self.ctxt_manager = ctxt_manager
         self._reverse_order = reverse_order
-        self._client = Client[schema](address)
+        self._client = Client[schema](address, reverse_order=reverse_order)

     def __enter__(self):
         self.ctxt_manager.__enter__()
@@ -33,7 +32,6 @@ class Service:
     def index(self, *args, **kwargs):
         return self._client.index(*args, **kwargs)

-    @sort_matches_by_scores
     @pass_kwargs_as_params
     @unify_input_output
     def search(self, *args, **kwargs):


@@ -0,0 +1,113 @@
import uuid
import os
from tempfile import mktemp
import shutil
import sys
import secrets
from shutil import copytree
from pathlib import Path
__resources_path__ = os.path.join(
Path(os.path.dirname(sys.modules['vectordb'].__file__)).parent.absolute(), 'resources'
)
class EnvironmentVarCtxtManager:
"""a class to wrap env vars"""
def __init__(self, envs):
"""
:param envs: a dictionary of environment variables
"""
self._env_keys_added = envs
self._env_keys_old = {}
def __enter__(self):
for key, val in self._env_keys_added.items():
# Store the old value, if it exists
if key in os.environ:
self._env_keys_old[key] = os.environ[key]
# Update the environment variable with the new value
os.environ[key] = str(val)
def __exit__(self, exc_type, exc_val, exc_tb):
# Restore the old values of updated environment variables
for key, val in self._env_keys_old.items():
os.environ[key] = str(val)
# Remove any newly added environment variables
for key in self._env_keys_added.keys():
os.unsetenv(key)
def get_random_tag():
return 't-' + uuid.uuid4().hex[:5]
def get_random_name():
return 'n-' + uuid.uuid4().hex[:5]
def _push_to_hubble(
tmpdir: str, name: str, tag: str, verbose: bool, public: bool
) -> str:
from hubble.executor.hubio import HubIO
from hubble.executor.parsers import set_hub_push_parser
secret = secrets.token_hex(8)
args_list = [
tmpdir,
'--tag',
tag,
'--no-usage',
'--no-cache',
]
if verbose:
args_list.remove('--no-usage')
args_list.append('--verbose')
if not public:
args_list.append('--secret')
args_list.append(secret)
args_list.append('--private')
args = set_hub_push_parser().parse_args(args_list)
push_envs = (
{'JINA_HUBBLE_HIDE_EXECUTOR_PUSH_SUCCESS_MSG': 'true'} if not verbose else {}
)
# return 'a' + ':' + tag
with EnvironmentVarCtxtManager(push_envs):
executor_id = HubIO(args).push().get('id')
return executor_id + ':' + tag
def push_vectordb_to_hubble(
vectordb_name,
definition_file_path,
) -> str:
tmpdir = mktemp()
image_name = get_random_name()
tag = get_random_name()
copytree(os.path.join(__resources_path__, 'jcloud_exec_template'), tmpdir)
shutil.copy(definition_file_path, os.path.join(tmpdir, 'vectordb_app.py'))
# replace `vectordb_name` in `vectordb_app`
with open(os.path.join(tmpdir, 'executor.py'), encoding='utf-8') as f:
content = f.read()
content = content.replace('from vectordb_app import db', f'from vectordb_app import {vectordb_name}')
content = content.replace('class VectorDBExecutor(db._executor_cls):',
f'class VectorDBExecutor({vectordb_name}._executor_cls):')
with open(os.path.join(tmpdir, 'executor.py'), mode='w', encoding='utf-8') as f:
f.write(content)
with open(os.path.join(tmpdir, 'config.yml'), encoding='utf-8') as f:
content = f.read()
content = content.replace('vectordb_executor', f'vectordb_executor-{image_name}')
with open(os.path.join(tmpdir, 'config.yml'), mode='w', encoding='utf-8') as f:
f.write(content)
return _push_to_hubble(tmpdir, image_name, tag, True, False)