TorchServe custom handler fails to load a CSV file

Hello, I'm trying to write my own handler based on base_handler from the documentation, but I don't have enough experience to know how to write it correctly for my task (I'm a junior DS).
Any help would be much appreciated!
I have written a script that preprocesses the data, feeds it to a binary classification model, and outputs the result.
I have tried many times to follow serve/base_handler.py in the pytorch/serve GitHub repository and other examples, but I can't tell whether my code is correct.
curl:

curl -F data=@test.csv http://127.0.0.1:8080/predictions/binclaff
{
  "code": 503,
  "type": "InternalServerException",
  "message": "Prediction failed"
}

Log:

2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - Invoking custom service failed.
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - Traceback (most recent call last):
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/ts/service.py", line 100, in predict
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     ret = self._entry_point(input_batch, self.context)
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/tmp/models/705a8b53edde46d384eaf6b6f46d6802/my_handler.py", line 191, in handle
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     data = _service.preprocess(data)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/tmp/models/705a8b53edde46d384eaf6b6f46d6802/my_handler.py", line 89, in preprocess
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     df = read_csv(filename, dtype={'SessionId': uint64})
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 610, in read_csv
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     return _read(filepath_or_buffer, kwds)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 462, in _read
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     parser = TextFileReader(filepath_or_buffer, **kwds)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 819, in __init__
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     self._engine = self._make_engine(self.engine)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1050, in _make_engine
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1 org.pytorch.serve.wlm.WorkerThread - Backend response time: 4
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     return mapping[engine](self.f, **self.options)  # type: ignore[call-arg]
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1867, in __init__
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     self._open_handles(src, kwds)
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1362, in _open_handles
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1 ACCESS_LOG - /127.0.0.1:42598 "POST /predictions/binclaff HTTP/1.1" 503 32
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1 TS_METRICS - Requests5XX.Count:1|#Level:Host|#hostname:alexey-GB-BKi5-H-A-7200,timestamp:null
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     self.handles = get_handle(
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/common.py", line 558, in get_handle
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     ioargs = _get_filepath_or_buffer(
2021-05-24 15:01:30,966 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -   File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/common.py", line 371, in _get_filepath_or_buffer
2021-05-24 15:01:30,966 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle -     raise ValueError(msg)
2021-05-24 15:01:30,966 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - ValueError: Invalid file path or buffer object type: <class 'list'>
2021-05-24 15:01:30,965 [DEBUG] W-9000-binclaff_0.1 org.pytorch.serve.job.Job - Waiting time ns: 123775, Inference time ns: 5403468

My Script

import pathlib

from numpy import uint64

from pandas import read_csv
from pandas import DataFrame
from pandas import SparseDtype
from pandas import concat

from sys import argv
import joblib
import torch
from torch.utils.data import Dataset, DataLoader
from mlxtend.preprocessing import TransactionEncoder

script, filename = argv

absolutePAth = pathlib.Path(__file__).parent.absolute()
model = "%s/jit_model_best.pt" % absolutePAth
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class Prepocessing(DataLoader):
    # load the dataset
    def __init__(self, path):
        # load the csv file as a dataframe
        self.df = read_csv(path, dtype={'SessionId': uint64})

    def data_tranform(self):
        self.df = self.df.drop({'IdentityId', 'SessionId', 'IsBot', 'ClientHash', 'ClientIP', 'UserAgent', 'AccountId',
                                'is_bot'},
                               axis=1, errors='ignore')

        self.df.external_referer_domains = self.df.external_referer_domains.str.replace(r'\]\[\'', '', regex=True)
        self.df['external_referer_domains'] = self.df.external_referer_domains.str.findall(
            r'(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+')
        self.df['count_external_referer_domains'] = self.df['external_referer_domains'].apply(lambda x: len(x))

        # turn the event and route values into proper lists
        # count how many events and routes each user has
        self.df['events'] = self.df.events.str.findall(r"\d+")
        self.df['unique_routes'] = self.df.unique_routes.str.findall(r"\d+")
        self.df['count_events'] = self.df['events'].apply(lambda x: len(x))
        self.df['count_unique_routes'] = self.df['unique_routes'].apply(lambda x: len(x))

        self.df['101'] = self.df.unique_routes.apply(lambda x: int('101' in x))
        self.df['282'] = self.df.unique_routes.apply(lambda x: int('282' in x))
        self.df['281'] = self.df.unique_routes.apply(lambda x: int('281' in x))
        self.df['266'] = self.df.unique_routes.apply(lambda x: int('266' in x))
        self.df['271'] = self.df.unique_routes.apply(lambda x: int('271' in x))
        self.df['279'] = self.df.unique_routes.apply(lambda x: int('279' in x))
        self.df['1'] = self.df.unique_routes.apply(lambda x: int('1' in x))
        self.df['5'] = self.df.unique_routes.apply(lambda x: int('5' in x))
        self.df['203'] = self.df.unique_routes.apply(lambda x: int('203' in x))

        self.df = self.df.fillna(0)

        # transaction encoder for events
        te = TransactionEncoder()
        te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(48))}
        te_ary = te.fit(self.df.events).transform(self.df.events, sparse=True)

        if te_ary.shape[1] < 48:
            te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(te_ary.shape[1]))}
            self.df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
            a = 48 - te_ary.shape[1]
            b = 48 - a
            for i in list(range(b, 48)):
                self.df_events[i] = int(0)
                self.df_events = self.df_events.astype(SparseDtype(int))

        else:
            self.df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
            self.df_events = self.df_events.astype(SparseDtype(int))

        self.df_events = self.df_events.add_prefix('event_')

        # drop events and routes because we now have a sparse matrix
        self.df = self.df.drop({'events', 'unique_routes', 'external_referer_domains'}, axis=1)
        self.df = concat([self.df, self.df_events], axis=1, join="inner")

        # load StandardScaler
        scaler = joblib.load("%s/std_scaler_new.bin" % absolutePAth)
        self.df = scaler.transform(self.df.values)

        test_loader = DataLoader(dataset=torch.FloatTensor(self.df), batch_size=1)

        return test_loader

    def load_data(path):
        df = read_csv(path, dtype={'SessionId': uint64})
        return df




test_loader = Prepocessing(filename).data_tranform()

the_model = torch.jit.load(model, map_location=device)
the_model.eval()

y_pred_list = []
with torch.no_grad():
    for X_batch in test_loader:
        X_batch = X_batch.cpu()
        y_test_pred = the_model.cpu()(X_batch)
        y_test_pred = torch.sigmoid(y_test_pred)
        y_pred_tag = torch.round(y_test_pred)
        y_pred_list.append(y_pred_tag.cpu().numpy())

y_pred_list = [a.squeeze().tolist() for a in y_pred_list]

df = Prepocessing.load_data(filename)
df = DataFrame(
    {"SessionId": df.SessionId,
     "IsBot": y_pred_list})

My own failing handler

import logging
import os
import io
import re
import abc
from sys import argv
import joblib
import telegram
import csv
from numpy import uint64
from pandas import read_csv
from pandas import DataFrame
from pandas import SparseDtype
from pandas import concat
from mlxtend.preprocessing import TransactionEncoder
from ts.torch_handler.base_handler import BaseHandler

import torch
from torch.utils.data import Dataset, DataLoader

logger = logging.getLogger(__name__)

def tlog2(message):
    token = "<TELEGRAM_BOT_TOKEN>"  # real bot token redacted
    chat_id = "-557190848"
    bot = telegram.Bot(token=token)
    bot.sendMessage(chat_id=chat_id, text=message)

class MyHandler(BaseHandler):
    """
    Base default handler to load torchscript or eager mode [state_dict] models
    Also, provides handle method per torch serve custom model specification
    """

    def __init__(self):
        super(MyHandler, self).__init__()
        self.model = None
        self.mapping = None
        self.device = None
        self.initialized = False
        self.context = None
        self.manifest = None
        self.map_location = None
        self.explain = False
        self.target = 0

    def initialize(self, context):
        """Initialize function loads the model.pt file and initializes the model object.
        First try to load torchscript else load eager mode state_dict based model.
        Args:
            context (context): It is a JSON Object containing information
            pertaining to the model artifacts parameters.
        Raises:
            RuntimeError: Raises the Runtime error when the model.py is missing
        """
        properties = context.system_properties
        # choose what to run on: GPU or CPU
        self.map_location = "cuda" if torch.cuda.is_available() and properties.get("gpu_id") is not None else "cpu"
        self.device = torch.device(
            self.map_location + ":" + str(properties.get("gpu_id"))
            if torch.cuda.is_available() and properties.get("gpu_id") is not None
            else self.map_location
        )
        self.manifest = context.manifest

        model_dir = properties.get("model_dir")
        model_pt_path = None
        # work out where the model file is located
        if "serializedFile" in self.manifest["model"]:
            serialized_file = self.manifest["model"]["serializedFile"]
            model_pt_path = os.path.join(model_dir, serialized_file)

        self.model = torch.jit.load(model_pt_path, map_location=self.device)
        self.model.eval()

        logger.debug('Model file %s loaded successfully', model_pt_path)

        self.initialized = True

        tlog2('initialize completed successfully')

    def preprocess(self, filename):
        """
        Takes our CSV file, which is then opened with pandas and run through all the processing steps
        :param filename:
        :return:
        The output is the data loaded into a PyTorch DataLoader
        """
        df = read_csv(filename, dtype={'SessionId': uint64})
        tlog2('file opened successfully')
        df = df.drop({'IdentityId', 'SessionId', 'IsBot', 'ClientHash', 'ClientIP', 'UserAgent', 'AccountId',
                                'is_bot', 'is_robot'},
                               axis=1, errors='ignore')

        df.external_referer_domains = df.external_referer_domains.str.replace(r'\]\[\'', '', regex=True)
        df['external_referer_domains'] = df.external_referer_domains.str.findall(
            r'(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+')
        df['count_external_referer_domains'] = df['external_referer_domains'].apply(lambda x: len(x))

        # turn the event and route values into proper lists
        # count how many events and routes each user has
        df['events'] = df.events.str.findall(r"\d+")
        df['unique_routes'] = df.unique_routes.str.findall(r"\d+")
        df['count_events'] = df['events'].apply(lambda x: len(x))
        df['count_unique_routes'] = df['unique_routes'].apply(lambda x: len(x))

        df['101'] = df.unique_routes.apply(lambda x: int('101' in x))
        df['282'] = df.unique_routes.apply(lambda x: int('282' in x))
        df['281'] = df.unique_routes.apply(lambda x: int('281' in x))
        df['266'] = df.unique_routes.apply(lambda x: int('266' in x))
        df['271'] = df.unique_routes.apply(lambda x: int('271' in x))
        df['279'] = df.unique_routes.apply(lambda x: int('279' in x))
        df['1'] = df.unique_routes.apply(lambda x: int('1' in x))
        df['5'] = df.unique_routes.apply(lambda x: int('5' in x))
        df['203'] = df.unique_routes.apply(lambda x: int('203' in x))
        df['275'] = df.unique_routes.apply(lambda x: int('275' in x))

        df = df.fillna(0)

        # transaction encoder for events
        te = TransactionEncoder()
        te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(48))}
        te_ary = te.fit(df.events).transform(df.events, sparse=True)

        if te_ary.shape[1] < 48:
            te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(te_ary.shape[1]))}
            df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
            a = 48 - te_ary.shape[1]
            b = 48 - a
            for i in list(range(b, 48)):
                df_events[i] = int(0)
                df_events = df_events.astype(SparseDtype(int))

        else:
            df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
            df_events = df_events.astype(SparseDtype(int))

        df_events = df_events.add_prefix('event_')

        # drop events and routes because we now have a sparse matrix
        df = df.drop({'events', 'unique_routes', 'external_referer_domains'}, axis=1)
        df = concat([df, df_events], axis=1, join="inner")

        # load StandardScaler
        scaler = joblib.load("scaler.bin")
        df = scaler.transform(df.values)

        df = torch.FloatTensor(df)
        test_loader = DataLoader(dataset=df, batch_size=1)

        return test_loader


    def inference(self, data):

        y_pred_list = []
        with torch.no_grad():
            for X_batch in data:
                X_batch = X_batch.cpu()
                y_test_pred = self.model.cpu()(X_batch)
                y_test_pred = torch.sigmoid(y_test_pred)
                y_pred_tag = torch.round(y_test_pred)
                y_pred_list.append(y_pred_tag.cpu().numpy())
                y_pred_list = [a.squeeze().tolist() for a in y_pred_list]

        return y_pred_list


    def postprocess(self, preds):
        # TODO: Add any needed post-processing of the model predictions here

        res = []
        # preds has size [BATCH_SIZE, 1]
        # convert it to list
        preds = preds.cpu().tolist()
        for pred in preds:
            label = self.mapping[str(pred)][1]
            res.append({'label': label, 'index': pred})
        return res


_service = MyHandler()

def handle(data, context):
    if not _service.initialized:
        _service.initialize(context)

    if data is None:
        return None

    data = _service.preprocess(data)
    data = _service.inference(data)
    data = _service.postprocess(data)

    return data

My model archive is on Dropbox: binclaff.mar

I would be very grateful if someone could help me port my script into the handler correctly and point out my mistakes.

Based on the stack trace it seems that pandas is failing to load the file:

ValueError: Invalid file path or buffer object type: <class 'list'>

Could you check whether the path is valid in self.df = read_csv(path, dtype={'SessionId': uint64})?
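
For context on that ValueError: TorchServe does not pass a file path to preprocess. As far as I know, the handler receives a list with one dict per request, and the uploaded bytes sit under a key such as "data" or "body" depending on how the request was sent. A minimal sketch of turning that into something read_csv accepts (requests_to_csv_buffer and fake_requests are hypothetical names, and only the first request is read, assuming a batch size of 1):

```python
import io


def requests_to_csv_buffer(requests):
    """Return a text buffer holding the CSV uploaded in the first request.

    Assumption: `requests` is a list with one dict per request, and the
    uploaded payload is stored under "data" or "body".
    """
    if not isinstance(requests, list) or not requests:
        raise ValueError("expected a non-empty list of request dicts")

    row = requests[0]
    payload = row.get("data") or row.get("body")
    if payload is None:
        raise ValueError("request has neither a 'data' nor a 'body' field")

    # Uploaded files usually arrive as raw bytes; decode them to text.
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")

    return io.StringIO(payload)


# Stand-in for what the handler might receive for one uploaded file.
fake_requests = [{"body": b'SessionId,events\n1,"[3, 7]"\n2,"[5]"\n'}]
buffer = requests_to_csv_buffer(fake_requests)
header = buffer.readline().strip()
buffer.seek(0)  # rewind so a CSV reader sees the whole file
```

Inside preprocess this could then be used as read_csv(requests_to_csv_buffer(data), dtype={'SessionId': uint64}).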

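I fixed this issue. The solution for sending a request with a CSV file was to read the file from the request body instead of treating the argument as a path (this needs from io import StringIO):

df = read_csv(StringIO(path[0].get("body").decode()), dtype={'SessionId': uint64, 'IdentityId': uint64})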
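
One more thing that may bite once the CSV loads: the posted postprocess calls preds.cpu() on what inference returns, which is a plain Python list, and it indexes self.mapping, which is never set. As far as I know, TorchServe also expects handle to return a list with one element per request in the batch. A rough sketch of a replacement, assuming a single request and assuming the SessionId column was set aside before it is dropped in preprocess (build_response, predictions and session_ids are hypothetical names):

```python
def build_response(predictions, session_ids):
    """Pack per-row predictions into a response list for a single request."""
    if len(predictions) != len(session_ids):
        raise ValueError("expected one prediction per CSV row")

    rows = [
        {"SessionId": int(session_id), "IsBot": int(prediction)}
        for session_id, prediction in zip(session_ids, predictions)
    ]
    # One uploaded CSV means one request, so the outer list has one element.
    return [rows]


# Made-up values standing in for the output of inference().
predictions = [0.0, 1.0, 0.0]
session_ids = [11, 12, 13]
response = build_response(predictions, session_ids)
```

Returning the rows wrapped in an outer list keeps the response length equal to the number of requests even when the CSV has many rows.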