Source code for clarifai.client.input
import csv
import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from typing import List, Union
from clarifai_grpc.grpc.api import resources_pb2, service_pb2 # noqa: F401
from clarifai_grpc.grpc.api.resources_pb2 import Annotation, Audio, Image, Input, Text, Video
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct
from tqdm import tqdm
from clarifai.client.base import BaseClient
from clarifai.client.lister import Lister
from clarifai.errors import UserError
from clarifai.utils.logging import get_logger
from clarifai.utils.misc import BackoffIterator, Chunker
[docs]class Inputs(Lister, BaseClient):
"""Inputs is a class that provides access to Clarifai API endpoints related to Input information."""
[docs] def __init__(self, user_id: str = "", app_id: str = "", logger_level: str = "INFO", **kwargs):
"""Initializes an Input object.
Args:
user_id (str): A user ID for authentication.
app_id (str): An app ID for the application to interact with.
**kwargs: Additional keyword arguments to be passed to the Input
"""
self.user_id = user_id
self.app_id = app_id
self.kwargs = {**kwargs}
self.input_info = resources_pb2.Input(**self.kwargs)
self.logger = get_logger(logger_level=logger_level, name=__name__)
BaseClient.__init__(self, user_id=self.user_id, app_id=self.app_id)
Lister.__init__(self)
def _get_proto(self,
input_id: str,
dataset_id: Union[str, None],
imagepb: Image = None,
video_pb: Video = None,
audio_pb: Audio = None,
text_pb: Text = None,
geo_info: List = None,
labels: List = None,
metadata: Struct = None) -> Input:
"""Create input proto for image data type.
Args:
input_id (str): The input ID for the input to create.
dataset_id (str): The dataset ID for the dataset to add the input to.
imagepb (Image): The image proto to be used for the input.
video_pb (Video): The video proto to be used for the input.
audio_pb (Audio): The audio proto to be used for the input.
text_pb (Text): The text proto to be used for the input.
geo_info (list): A list of longitude and latitude for the geo point.
labels (list): A list of labels for the input.
metadata (Struct): A Struct of metadata for the input.
Returns:
Input: An Input object for the specified input ID.
"""
assert geo_info is None or isinstance(
geo_info, list), "geo_info must be a list of longitude and latitude"
assert labels is None or isinstance(labels, list), "labels must be a list of strings"
assert metadata is None or isinstance(metadata, Struct), "metadata must be a Struct"
geo_pb = resources_pb2.Geo(geo_point=resources_pb2.GeoPoint(
longitude=geo_info[0], latitude=geo_info[1])) if geo_info else None
concepts=[
resources_pb2.Concept(
id=f"id-{''.join(_label.split(' '))}", name=_label, value=1.)\
for _label in labels
]if labels else None
if dataset_id:
return resources_pb2.Input(
id=input_id,
dataset_ids=[dataset_id],
data=resources_pb2.Data(
image=imagepb,
video=video_pb,
audio=audio_pb,
text=text_pb,
geo=geo_pb,
concepts=concepts,
metadata=metadata))
return resources_pb2.Input(
id=input_id,
data=resources_pb2.Data(
image=imagepb,
video=video_pb,
audio=audio_pb,
text=text_pb,
geo=geo_pb,
concepts=concepts,
metadata=metadata))
[docs] def get_input_from_url(self,
input_id: str,
image_url: str = None,
video_url: str = None,
audio_url: str = None,
text_url: str = None,
dataset_id: str = None,
**kwargs) -> Input:
"""Create input proto from url.
Args:
input_id (str): The input ID for the input to create.
image_url (str): The url for the image.
video_url (str): The url for the video.
audio_url (str): The url for the audio.
text_url (str): The url for the text.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
Input: An Input object for the specified input ID.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_proto = input_obj.get_input_from_url(input_id = 'demo', image_url='https://samples.clarifai.com/metro-north.jpg')
"""
if not any((image_url, video_url, audio_url, text_url)):
raise ValueError(
"At least one of image_url, video_url, audio_url, text_url must be provided.")
image_pb = resources_pb2.Image(url=image_url) if image_url else None
video_pb = resources_pb2.Video(url=video_url) if video_url else None
audio_pb = resources_pb2.Audio(url=audio_url) if audio_url else None
text_pb = resources_pb2.Text(url=text_url) if text_url else None
return self._get_proto(
input_id=input_id,
dataset_id=dataset_id,
imagepb=image_pb,
video_pb=video_pb,
audio_pb=audio_pb,
text_pb=text_pb,
**kwargs)
[docs] def get_input_from_file(self,
input_id: str,
image_file: str = None,
video_file: str = None,
audio_file: str = None,
dataset_id: str = None,
**kwargs) -> Input:
"""Create input proto from files.
Args:
input_id (str): The input ID for the input to create.
image_file (str): The url for the image.
video_file (str): The url for the video.
audio_file (str): The url for the audio.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
Input: An Input object for the specified input ID.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_proto = input_obj.get_input_from_file(input_id = 'demo', video_file='file_path')
"""
if not any((image_file, video_file, audio_file)):
raise ValueError("At least one of image_file, video_file, audio_file, must be provided.")
image_pb = resources_pb2.Image(base64=open(image_file, 'rb').read()) if image_file else None
video_pb = resources_pb2.Video(base64=open(video_file, 'rb').read()) if video_file else None
audio_pb = resources_pb2.Audio(base64=open(audio_file, 'rb').read()) if audio_file else None
return self._get_proto(
input_id=input_id,
dataset_id=dataset_id,
imagepb=image_pb,
video_pb=video_pb,
audio_pb=audio_pb,
**kwargs)
[docs] def get_input_from_bytes(self,
input_id: str,
image_bytes: bytes = None,
video_bytes: bytes = None,
audio_bytes: bytes = None,
dataset_id: str = None,
**kwargs) -> Input:
"""Create input proto from bytes.
Args:
input_id (str): The input ID for the input to create.
image_bytes (str): The bytes for the image.
video_bytes (str): The bytes for the video.
audio_bytes (str): The bytes for the audio.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
Input: An Input object for the specified input ID.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> image = open('demo.jpg', 'rb').read()
>>> video = open('demo.mp4', 'rb').read()
>>> input_proto = input_obj.get_input_from_bytes(input_id = 'demo',image_bytes =image, video_bytes=video)
"""
if not any((image_bytes, video_bytes, audio_bytes)):
raise ValueError("At least one of image_bytes, video_bytes, audio_bytes, must be provided.")
image_pb = resources_pb2.Image(base64=image_bytes) if image_bytes else None
video_pb = resources_pb2.Video(base64=video_bytes) if video_bytes else None
audio_pb = resources_pb2.Audio(base64=audio_bytes) if audio_bytes else None
return self._get_proto(
input_id=input_id,
dataset_id=dataset_id,
imagepb=image_pb,
video_pb=video_pb,
audio_pb=audio_pb,
**kwargs)
[docs] def get_image_inputs_from_folder(self,
folder_path: str,
dataset_id: str = None,
labels: bool = False) -> List[Input]: #image specific
"""Create input protos for image data type from folder.
Args:
folder_path (str): Path to the folder containing images.
Returns:
list of Input: A list of Input objects for the specified folder.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_protos = input_obj.get_image_inputs_from_folder(folder_path='demo_folder')
"""
input_protos = []
labels = [folder_path.split('/')[-1]] if labels else None
for filename in os.listdir(folder_path):
if filename.split('.')[-1] not in ['jpg', 'jpeg', 'png', 'tiff', 'webp']:
continue
input_id = filename.split('.')[0]
image_pb = resources_pb2.Image(base64=open(os.path.join(folder_path, filename), 'rb').read())
input_protos.append(
self._get_proto(
input_id=input_id, dataset_id=dataset_id, imagepb=image_pb, labels=labels))
return input_protos
[docs] def get_text_input(self, input_id: str, raw_text: str, dataset_id: str = None,
**kwargs) -> Text: #text specific
"""Create input proto for text data type from rawtext.
Args:
input_id (str): The input ID for the input to create.
raw_text (str): The raw text input.
dataset_id (str): The dataset ID for the dataset to add the input to.
**kwargs: Additional keyword arguments to be passed to the Input
Returns:
Text: An Input object for the specified input ID.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_protos = input_obj.get_text_input(input_id = 'demo', raw_text = 'This is a test')
"""
text_pb = resources_pb2.Text(raw=raw_text)
return self._get_proto(input_id=input_id, dataset_id=dataset_id, text_pb=text_pb, **kwargs)
[docs] def get_text_input_from_csv(self, csv_path: str, dataset_id: str = None,
labels: str = True) -> List[Text]: #text specific
"""Create input proto for text data type from cscv.
Args:
csv_path (str): Path to the csv file.
dataset_id (str): The dataset ID for the dataset to add the input to.
labels (str): True if csv file has labels column.
Returns:
inputs: List of inputs
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_protos = input_obj.get_text_input_from_csv(csv_path = 'filepath')
"""
input_protos = []
with open(csv_path) as _file:
reader = csv.reader(_file)
next(reader, None) # skip header
for id, input in enumerate(reader):
text = input[0]
if labels:
assert len(input) == 2, "csv file should have two columns(input, labels)"
labels = input[1] if isinstance(input[1], list) else [input[1]]
else:
labels = None
input_id = f"{dataset_id}-{id}"
input_protos.append(
self.get_text_input(
input_id=input_id, raw_text=text, dataset_id=dataset_id, labels=labels))
return input_protos
[docs] def get_text_inputs_from_folder(self,
folder_path: str,
dataset_id: str = None,
labels: bool = False) -> List[Text]: #text specific
"""Create input protos for text data type from folder.
Args:
folder_path (str): Path to the folder containing text.
Returns:
list of Input: A list of Input objects for the specified folder.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_protos = input_obj.get_text_inputs_from_folder(folder_path='demo_folder')
"""
input_protos = []
labels = [folder_path.split('/')[-1]] if labels else None
for filename in os.listdir(folder_path):
if filename.split('.')[-1] != 'txt':
continue
input_id = filename.split('.')[0]
text_pb = resources_pb2.Text(raw=open(os.path.join(folder_path, filename), 'rb').read())
input_protos.append(
self._get_proto(
input_id=input_id, dataset_id=dataset_id, text_pb=text_pb, labels=labels))
return input_protos
[docs] def get_annotation_proto(self, input_id: str, label: str, annotations: List) -> Annotation:
"""Create an annotation proto for each bounding box, label input pair.
Args:
input_id (str): The input ID for the annotation to create.
label (str): annotation label
annotations (List): a list of a single bbox's coordinates. # Annotations ordering: [xmin, ymin, xmax, ymax]
Returns:
An annotation object for the specified input ID.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_obj.get_annotation_proto(input_id='demo', label='demo', annotations=[x_min, y_min, x_max, y_max])
"""
if not isinstance(annotations, list):
raise UserError("annotations must be a list of bbox cooridnates")
input_annot_proto = resources_pb2.Annotation(
input_id=input_id,
data=resources_pb2.Data(regions=[
resources_pb2.Region(
region_info=resources_pb2.RegionInfo(bounding_box=resources_pb2.BoundingBox(
# Annotations ordering: [xmin, ymin, xmax, ymax]
# top_row must be less than bottom row
# left_col must be less than right col
top_row=annotations[1], #y_min
left_col=annotations[0], #x_min
bottom_row=annotations[3], #y_max
right_col=annotations[2] #x_max
)),
data=resources_pb2.Data(concepts=[
resources_pb2.Concept(
id=f"id-{''.join(label.split(' '))}", name=label, value=1.)
]))
]))
return input_annot_proto
[docs] def get_mask_proto(self, input_id: str, label: str, polygons: List[List[float]]) -> Annotation:
"""Create an annotation proto for each polygon box, label input pair.
Args:
input_id (str): The input ID for the annotation to create.
label (str): annotation label
polygons (List): Polygon x,y points iterable
Returns:
An annotation object for the specified input ID.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input()
>>> input_obj.get_mask_proto(input_id='demo', label='demo', polygons=[[[x,y],...,[x,y]],...])
"""
if not isinstance(polygons, list):
raise UserError("polygons must be a list of points")
input_mask_proto = resources_pb2.Annotation(
input_id=input_id,
data=resources_pb2.Data(regions=[
resources_pb2.Region(
region_info=resources_pb2.RegionInfo(polygon=resources_pb2.Polygon(
points=[
resources_pb2.Point(
row=_point[1], # row is y point
col=_point[0], # col is x point
visibility="VISIBLE") for _point in polygons
])),
data=resources_pb2.Data(concepts=[
resources_pb2.Concept(
id=f"id-{''.join(label.split(' '))}", name=label, value=1.)
]))
]))
return input_mask_proto
[docs] def upload_from_url(self,
input_id: str,
image_url: str = None,
video_url: str = None,
audio_url: str = None,
text_url: str = None,
dataset_id: str = None,
**kwargs) -> str:
"""Upload input from url.
Args:
input_id (str): The input ID for the input to create.
image_url (str): The url for the image.
video_url (str): The url for the video.
audio_url (str): The url for the audio.
text_url (str): The url for the text.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
input_job_id: job id for the upload request.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input(user_id = 'user_id', app_id = 'demo_app')
>>> input_obj.upload_from_url(input_id='demo', image_url='https://samples.clarifai.com/metro-north.jpg')
"""
input_pb = self.get_input_from_url(input_id, image_url, video_url, audio_url, text_url,
dataset_id, **kwargs)
return self.upload_inputs([input_pb])
[docs] def upload_from_file(self,
input_id: str,
image_file: str = None,
video_file: str = None,
audio_file: str = None,
dataset_id: str = None,
**kwargs) -> str:
"""Upload input from file.
Args:
input_id (str): The input ID for the input to create.
image_file (str): The file for the image.
video_file (str): The file for the video.
audio_file (str): The file for the audio.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
input_job_id: job id for the upload request.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input(user_id = 'user_id', app_id = 'demo_app')
>>> input_obj.upload_from_file(input_id='demo', audio_file='demo.mp3')
"""
input_pb = self.get_input_from_file(input_id, image_file, video_file, audio_file, dataset_id,
**kwargs)
return self.upload_inputs([input_pb])
[docs] def upload_from_bytes(self,
input_id: str,
image_bytes: bytes = None,
video_bytes: bytes = None,
audio_bytes: bytes = None,
dataset_id: str = None,
**kwargs) -> str:
"""Upload input from bytes.
Args:
input_id (str): The input ID for the input to create.
image_bytes (str): The bytes for the image.
video_bytes (str): The bytes for the video.
audio_bytes (str): The bytes for the audio.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
input_job_id: job id for the upload request.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input(user_id = 'user_id', app_id = 'demo_app')
>>> image = open('demo.jpg', 'rb').read()
>>> input_obj.upload_from_bytes(input_id='demo', image_bytes=image)
"""
input_pb = self.get_input_from_bytes(input_id, image_bytes, video_bytes, audio_bytes,
dataset_id, **kwargs)
return self.upload_inputs([input_pb])
[docs] def upload_text(self, input_id: str, raw_text: str, dataset_id: str = None,
**kwargs) -> str: #text specific
"""Upload text from raw text.
Args:
input_id (str): The input ID for the input to create.
raw_text (str): The raw text.
dataset_id (str): The dataset ID for the dataset to add the input to.
Returns:
input_job_id (str): job id for the upload request.
Example:
>>> from clarifai.client.input import Input
>>> input_obj = Input(user_id = 'user_id', app_id = 'demo_app')
>>> input_obj.upload_text(input_id = 'demo', raw_text = 'This is a test')
"""
input_pb = self._get_proto(
input_id=input_id,
dataset_id=dataset_id,
text_pb=resources_pb2.Text(raw=raw_text),
**kwargs)
return self.upload_inputs([input_pb])
[docs] def upload_inputs(self, inputs: List[Input], show_log: bool = True) -> str:
"""Upload list of input objects to the app.
Args:
inputs (list): List of input objects to upload.
show_log (bool): Show upload status log.
Returns:
input_job_id: job id for the upload request.
"""
if not isinstance(inputs, list):
raise UserError("inputs must be a list of Input objects")
input_job_id = uuid.uuid4().hex # generate a unique id for this job
request = service_pb2.PostInputsRequest(
user_app_id=self.user_app_id, inputs=inputs, inputs_add_job_id=input_job_id)
response = self._grpc_request(self.STUB.PostInputs, request)
if response.status.code != status_code_pb2.SUCCESS:
try:
self.logger.warning(response.inputs[0].status)
except IndexError:
self.logger.warning(response.status)
else:
if show_log:
self.logger.info("\nInputs Uploaded\n%s", response.status)
return input_job_id
[docs] def upload_annotations(self, batch_annot: List[resources_pb2.Annotation], show_log: bool = True
) -> Union[List[resources_pb2.Annotation], List[None]]:
"""Upload image annotations to app.
Args:
batch_annot: annot batch protos
Returns:
retry_upload: failed annot upload
"""
retry_upload = [] # those that fail to upload are stored for retries
request = service_pb2.PostAnnotationsRequest(
user_app_id=self.user_app_id, annotations=batch_annot)
response = self._grpc_request(self.STUB.PostAnnotations, request)
if response.status.code != status_code_pb2.SUCCESS:
try:
self.logger.warning(
f"Post annotations failed, status: {response.annotations[0].status.details}")
except:
self.logger.warning(f"Post annotations failed, status: {response.status.details}")
finally:
retry_upload.extend(batch_annot)
else:
if show_log:
self.logger.info("\nAnnotations Uploaded\n%s", response.status)
return retry_upload
def _upload_batch(self, inputs: List[Input]) -> List[Input]:
"""Upload a batch of input objects to the app.
Args:
inputs (List[Input]): List of input objects to upload.
Returns:
input_job_id: job id for the upload request.
"""
input_job_id = self.upload_inputs(inputs, False)
self._wait_for_inputs(input_job_id)
failed_inputs = self._delete_failed_inputs(inputs)
return failed_inputs
[docs] def delete_inputs(self, inputs: List[Input]) -> None:
"""Delete list of input objects from the app.
Args:
input_ids (Input): List of input objects to delete.
Example:
>>> from clarifai.client.user import User
>>> input_obj = User(user_id="user_id").app(app_id="app_id").inputs()
>>> input_obj.delete_inputs(input_obj.list_inputs())
"""
if not isinstance(inputs, list):
raise UserError("input_ids must be a list of input ids")
inputs_ids = [input.id for input in inputs]
request = service_pb2.DeleteInputsRequest(user_app_id=self.user_app_id, ids=inputs_ids)
response = self._grpc_request(self.STUB.DeleteInputs, request)
if response.status.code != status_code_pb2.SUCCESS:
raise Exception(response.status)
self.logger.info("\nInputs Deleted\n%s", response.status)
[docs] def list_inputs(self) -> List[Input]: # TODO: update lister
"""Lists all the inputs for the app.
Returns:
list of Input: A list of Input objects for the app.
Example:
>>> from clarifai.client.user import User
>>> input_obj = User(user_id="user_id").app(app_id="app_id").inputs()
>>> input_obj.list_inputs()
"""
request_data = dict(user_app_id=self.user_app_id, per_page=self.default_page_size)
all_inputs_info = list(
self.list_all_pages_generator(self.STUB.ListInputs, service_pb2.ListInputsRequest,
request_data))
for input_info in all_inputs_info:
input_info['id'] = input_info.pop('input_id')
return [resources_pb2.Input(**input_info) for input_info in all_inputs_info]
def _bulk_upload(self, inputs: List[Input], chunk_size: int = 128) -> None:
"""Uploads process for large number of inputs.
Args:
inputs (List[Input]): input protos
chunk_size (int): chunk size for each request
"""
num_workers: int = min(10, cpu_count()) # limit max workers to 10
chunk_size = min(128, chunk_size) # limit max protos in a req
chunked_inputs = Chunker(inputs, chunk_size).chunk()
with ThreadPoolExecutor(max_workers=num_workers) as executor:
with tqdm(total=len(chunked_inputs), desc='Uploading inputs') as progress:
# Submit all jobs to the executor and store the returned futures
futures = [
executor.submit(self._upload_batch, batch_input_ids)
for batch_input_ids in chunked_inputs
]
for job in as_completed(futures):
retry_input_proto = job.result()
self._retry_uploads(retry_input_proto)
progress.update()
def _wait_for_inputs(self, input_job_id: str) -> bool:
"""Wait for inputs to be processed. Cancel Job if timeout > 30 minutes.
Args:
input_job_id (str): Upload Input Job ID
Returns:
True if inputs are processed, False otherwise
"""
backoff_iterator = BackoffIterator()
max_retries = 10
start_time = time.time()
while True:
request = service_pb2.GetInputsAddJobRequest(user_app_id=self.user_app_id, id=input_job_id)
response = self._grpc_request(self.STUB.GetInputsAddJob, request)
if time.time() - start_time > 60 * 30 or max_retries == 0: # 30 minutes timeout
self._grpc_request(self.STUB.CancelInputsAddJob,
service_pb2.CancelInputsAddJobRequest(
user_app_id=self.user_app_id, id=input_job_id)) #Cancel Job
return False
if response.status.code != status_code_pb2.SUCCESS:
max_retries -= 1
self.logger.warning(f"Get input job failed, status: {response.status.details}\n")
continue
if response.inputs_add_job.progress.in_progress_count == 0 and response.inputs_add_job.progress.pending_count == 0:
return True
else:
time.sleep(next(backoff_iterator))
def _retry_uploads(self, failed_inputs: List[Input]) -> None:
"""Retry failed uploads.
Args:
failed_inputs (List[Input]): failed input prots
"""
if failed_inputs:
self._upload_batch(failed_inputs)
def _delete_failed_inputs(self, inputs: List[Input]) -> List[Input]:
"""Delete failed input ids from clarifai platform dataset.
Args:
inputs (List[Input]): batch input protos
Returns:
failed_inputs: failed inputs
"""
input_ids = [input.id for input in inputs]
success_status = status_pb2.Status(code=status_code_pb2.INPUT_DOWNLOAD_SUCCESS)
request = service_pb2.ListInputsRequest(
ids=input_ids,
per_page=len(input_ids),
user_app_id=self.user_app_id,
status=success_status)
response = self._grpc_request(self.STUB.ListInputs, request)
response_dict = MessageToDict(response)
success_inputs = response_dict.get('inputs', [])
success_input_ids = [input.get('id') for input in success_inputs]
failed_inputs = [input for input in inputs if input.id not in success_input_ids]
#delete failed inputs
self._grpc_request(self.STUB.DeleteInputs,
service_pb2.DeleteInputsRequest(
user_app_id=self.user_app_id, ids=[input.id
for input in failed_inputs]))
return failed_inputs
def __getattr__(self, name):
return getattr(self.input_info, name)
def __str__(self):
init_params = [param for param in self.kwargs.keys()]
attribute_strings = [
f"{param}={getattr(self.input_info, param)}" for param in init_params
if hasattr(self.input_info, param)
]
return f"Input Details: \n{', '.join(attribute_strings)}\n"