Hello, I’m trying create my own base_handler by Documentation, but my skills are low to understand how correct write for my task. (I’m Junior DS )
Very need any help!
I have wrote script which just processes the data, feeds it to the Binary Classification model and outputs the result
I a lot tried describe correctly by example serve/base_handler.py at master · pytorch/serve · GitHub and other examples but I can’t understand if I correct write all code
curl:
curl -F data=@test.csv http://127.0.0.1:8080/predictions/binclaff
{
"code": 503,
"type": "InternalServerException",
"message": "Prediction failed"
}
Log:
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - Invoking custom service failed.
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - Traceback (most recent call last):
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/ts/service.py", line 100, in predict
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - ret = self._entry_point(input_batch, self.context)
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/tmp/models/705a8b53edde46d384eaf6b6f46d6802/my_handler.py", line 191, in handle
2021-05-24 15:01:30,963 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - data = _service.preprocess(data)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/tmp/models/705a8b53edde46d384eaf6b6f46d6802/my_handler.py", line 89, in preprocess
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - df = read_csv(filename, dtype={'SessionId': uint64})
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 610, in read_csv
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - return _read(filepath_or_buffer, kwds)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 462, in _read
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - parser = TextFileReader(filepath_or_buffer, **kwds)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 819, in __init__
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - self._engine = self._make_engine(self.engine)
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1050, in _make_engine
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1 org.pytorch.serve.wlm.WorkerThread - Backend response time: 4
2021-05-24 15:01:30,964 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - return mapping[engine](self.f, **self.options) # type: ignore[call-arg]
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1867, in __init__
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - self._open_handles(src, kwds)
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1362, in _open_handles
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1 ACCESS_LOG - /127.0.0.1:42598 "POST /predictions/binclaff HTTP/1.1" 503 32
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1 TS_METRICS - Requests5XX.Count:1|#Level:Host|#hostname:alexey-GB-BKi5-H-A-7200,timestamp:null
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - self.handles = get_handle(
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/common.py", line 558, in get_handle
2021-05-24 15:01:30,965 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - ioargs = _get_filepath_or_buffer(
2021-05-24 15:01:30,966 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - File "/home/alexey/.local/lib/python3.8/site-packages/pandas/io/common.py", line 371, in _get_filepath_or_buffer
2021-05-24 15:01:30,966 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - raise ValueError(msg)
2021-05-24 15:01:30,966 [INFO ] W-9000-binclaff_0.1-stdout org.pytorch.serve.wlm.WorkerLifeCycle - ValueError: Invalid file path or buffer object type: <class 'list'>
2021-05-24 15:01:30,965 [DEBUG] W-9000-binclaff_0.1 org.pytorch.serve.job.Job - Waiting time ns: 123775, Inference time ns: 5403468
My Script
import pathlib
from numpy import uint64
from pandas import read_csv
from pandas import DataFrame
from pandas import SparseDtype
from pandas import concat
from sys import argv
import joblib
import torch
from torch.utils.data import Dataset, DataLoader
from mlxtend.preprocessing import TransactionEncoder
script, filename = argv
absolutePAth = pathlib.Path(__file__).parent.absolute()
model = "%s/jit_model_best.pt" % absolutePAth
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Prepocessing(DataLoader):
# load the dataset
def __init__(self, path):
# load the csv file as a dataframe
self.df = read_csv(path, dtype={'SessionId': uint64})
def data_tranform(self):
self.df = self.df.drop({'IdentityId', 'SessionId', 'IsBot', 'ClientHash', 'ClientIP', 'UserAgent', 'AccountId',
'is_bot'},
axis=1, errors='ignore')
self.df.external_referer_domains = self.df.external_referer_domains.str.replace(r'\]\[\'', '', regex=True)
self.df['external_referer_domains'] = self.df.external_referer_domains.str.findall(
'(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+')
self.df['count_external_referer_domains'] = self.df['external_referer_domains'].apply(lambda x: len(x))
# нормальный массив из значений ивента и роутов
# подсчет колличество используеммых 1 пользователем ивентов и маршрутов
self.df['events'] = self.df.events.str.findall(r"\d+")
self.df['unique_routes'] = self.df.unique_routes.str.findall(r"\d+")
self.df['count_events'] = self.df['events'].apply(lambda x: len(x))
self.df['count_unique_routes'] = self.df['unique_routes'].apply(lambda x: len(x))
self.df['101'] = self.df.unique_routes.apply(lambda x: int('101' in x))
self.df['282'] = self.df.unique_routes.apply(lambda x: int('282' in x))
self.df['281'] = self.df.unique_routes.apply(lambda x: int('281' in x))
self.df['266'] = self.df.unique_routes.apply(lambda x: int('266' in x))
self.df['271'] = self.df.unique_routes.apply(lambda x: int('271' in x))
self.df['279'] = self.df.unique_routes.apply(lambda x: int('279' in x))
self.df['1'] = self.df.unique_routes.apply(lambda x: int('1' in x))
self.df['5'] = self.df.unique_routes.apply(lambda x: int('5' in x))
self.df['203'] = self.df.unique_routes.apply(lambda x: int('203' in x))
self.df = self.df.fillna(0)
# transaction encoder for events
te = TransactionEncoder()
te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(48))}
te_ary = te.fit(self.df.events).transform(self.df.events, sparse=True)
if te_ary.shape[1] < 48:
te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(te_ary.shape[1]))}
self.df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
a = 48 - te_ary.shape[1]
b = 48 - a
for i in list(range(b, 48)):
self.df_events[i] = int(0)
self.df_events = self.df_events.astype(SparseDtype(int))
else:
self.df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
self.df_events = self.df_events.astype(SparseDtype(int))
self.df_events = self.df_events.add_prefix('event_')
# drop events and routes bec we have spartse matrix
self.df = self.df.drop({'events', 'unique_routes', 'external_referer_domains'}, axis=1)
self.df = concat([self.df, self.df_events], axis=1, join="inner")
# load StandardScaler
scaler = joblib.load("%s/std_scaler_new.bin" % absolutePAth)
self.df = scaler.transform(self.df.values)
test_loader = DataLoader(dataset=torch.FloatTensor(self.df), batch_size=1)
return test_loader
def load_data(path):
df = read_csv(path, dtype={'SessionId': uint64})
return df
test_loader = Prepocessing(filename).data_tranform()
the_model = torch.jit.load(model, map_location=device)
the_model.eval()
y_pred_list = []
with torch.no_grad():
for X_batch in test_loader:
X_batch = X_batch.cpu()
y_test_pred = the_model.cpu()(X_batch)
y_test_pred = torch.sigmoid(y_test_pred)
y_pred_tag = torch.round(y_test_pred)
y_pred_list.append(y_pred_tag.cpu().numpy())
y_pred_list = [a.squeeze().tolist() for a in y_pred_list]
df = Prepocessing.load_data(filename)
df = DataFrame(
{"SessionId": df.SessionId,
"IsBot": y_pred_list})
My own fail handler
import logging
import os
import io
import re
import abc
from sys import argv
import joblib
import telegram
import csv
from numpy import uint64
from pandas import read_csv
from pandas import DataFrame
from pandas import SparseDtype
from pandas import concat
from mlxtend.preprocessing import TransactionEncoder
from ts.torch_handler.base_handler import BaseHandler
import torch
from torch.utils.data import Dataset, DataLoader
logger = logging.getLogger(__name__)
def tlog2(message):
token = "1809630354:AAGAYquyq6Aev5obJJ_7W8H8xCqX36AVX2o"
chat_id = "-557190848"
bot = telegram.Bot(token=token)
bot.sendMessage(chat_id=chat_id, text=message)
class MyHandler(BaseHandler):
"""
Base default handler to load torchscript or eager mode [state_dict] models
Also, provides handle method per torch serve custom model specification
"""
def __init__(self):
super(MyHandler, self).__init__()
self.model = None
self.mapping = None
self.device = None
self.initialized = False
self.context = None
self.manifest = None
self.map_location = None
self.explain = False
self.target = 0
def initialize(self, context):
"""Initialize function loads the model.pt file and initialized the model object.
First try to load torchscript else load eager mode state_dict based model.
Args:
context (context): It is a JSON Object containing information
pertaining to the model artifacts parameters.
Raises:
RuntimeError: Raises the Runtime error when the model.py is missing
"""
properties = context.system_properties
# на чем будет работать GPU or CPU
self.map_location = "cuda" if torch.cuda.is_available() and properties.get("gpu_id") is not None else "cpu"
self.device = torch.device(
self.map_location + ":" + str(properties.get("gpu_id"))
if torch.cuda.is_available() and properties.get("gpu_id") is not None
else self.map_location
)
self.manifest = context.manifest
model_dir = properties.get("model_dir")
model_pt_path = None
# определяем где находиться файл модель
if "serializedFile" in self.manifest["model"]:
serialized_file = self.manifest["model"]["serializedFile"]
model_pt_path = os.path.join(model_dir, serialized_file)
self.model = torch.jit.load(model_pt_path, map_location=self.device)
self.model.eval()
logger.debug('Model file %s loaded successfully', model_pt_path)
self.initialized = True
tlog2('initialize прошла успешно')
def preprocess(self, filename):
"""
Вставляем наш файл csv, который в дальнейшем открывается через Pandas и делает все обработки
:param filename:
:return:
На выходе получаем загружанные данные в PyTorch DataLoader
"""
df = read_csv(filename, dtype={'SessionId': uint64})
tlog2('файл открылся успешно прошла успешно')
df = df.drop({'IdentityId', 'SessionId', 'IsBot', 'ClientHash', 'ClientIP', 'UserAgent', 'AccountId',
'is_bot', 'is_robot'},
axis=1, errors='ignore')
df.external_referer_domains = df.external_referer_domains.str.replace(r'\]\[\'', '', regex=True)
df['external_referer_domains'] = df.external_referer_domains.str.findall(
'(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+')
df['count_external_referer_domains'] = df['external_referer_domains'].apply(lambda x: len(x))
# нормальный массив из значений ивента и роутов
# подсчет колличество используеммых 1 пользователем ивентов и маршрутов
df['events'] = df.events.str.findall(r"\d+")
df['unique_routes'] = df.unique_routes.str.findall(r"\d+")
df['count_events'] = df['events'].apply(lambda x: len(x))
df['count_unique_routes'] = df['unique_routes'].apply(lambda x: len(x))
df['101'] = df.unique_routes.apply(lambda x: int('101' in x))
df['282'] = df.unique_routes.apply(lambda x: int('282' in x))
df['281'] = df.unique_routes.apply(lambda x: int('281' in x))
df['266'] = df.unique_routes.apply(lambda x: int('266' in x))
df['271'] = df.unique_routes.apply(lambda x: int('271' in x))
df['279'] = df.unique_routes.apply(lambda x: int('279' in x))
df['1'] = df.unique_routes.apply(lambda x: int('1' in x))
df['5'] = df.unique_routes.apply(lambda x: int('5' in x))
df['203'] = df.unique_routes.apply(lambda x: int('203' in x))
df['275'] = df.unique_routes.apply(lambda x: int('275' in x))
df = df.fillna(0)
# transaction encoder for events
te = TransactionEncoder()
te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(48))}
te_ary = te.fit(df.events).transform(df.events, sparse=True)
if te_ary.shape[1] < 48:
te.__dict__['columns_mapping_'] = {str(i): i for i in list(range(te_ary.shape[1]))}
df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
a = 48 - te_ary.shape[1]
b = 48 - a
for i in list(range(b, 48)):
df_events[i] = int(0)
df_events = df_events.astype(SparseDtype(int))
else:
df_events = DataFrame.sparse.from_spmatrix(te_ary, columns=te.__dict__['columns_mapping_'].keys())
df_events = df_events.astype(SparseDtype(int))
df_events = df_events.add_prefix('event_')
# drop events and routes bec we have spartse matrix
df = df.drop({'events', 'unique_routes', 'external_referer_domains'}, axis=1)
df = concat([df, df_events], axis=1, join="inner")
# load StandardScaler
scaler = joblib.load("scaler.bin")
df = scaler.transform(df.values)
df = torch.FloatTensor(df)
test_loader = DataLoader(dataset=df, batch_size=1)
return test_loader
def inference(self, data):
y_pred_list = []
with torch.no_grad():
for X_batch in data:
X_batch = X_batch.cpu()
y_test_pred = self.model.cpu()(X_batch)
y_test_pred = torch.sigmoid(y_test_pred)
y_pred_tag = torch.round(y_test_pred)
y_pred_list.append(y_pred_tag.cpu().numpy())
y_pred_list = [a.squeeze().tolist() for a in y_pred_list]
return y_pred_list
def postprocess(self, preds):
# TODO: Add any needed post-processing of the model predictions here
res = []
# pres has size [BATCH_SIZE, 1]
# convert it to list
preds = preds.cpu().tolist()
for pred in preds:
label = self.mapping[str(pred)][1]
res.append({'label': label, 'index': pred})
return res
_service = MyHandler()
def handle(data, context):
if not _service.initialized:
_service.initialize(context)
if data is None:
return None
data = _service.preprocess(data)
data = _service.inference(data)
data = _service.postprocess(data)
return data
my model mar in DropBox: Dropbox - binclaff.mar - Simplify your life
I will be very grateful if someone can help me correctly describe my script in the handler and show my mistakes.