Hello community.
I’m trying to load and offload the model’s weight from the host(pinned) and GPU and from the GPU and host(pinned), with some multiple Python threads and CUDA streams.
But, I’ve got different inference results when transferring the model’s weights(actually layers you know) when migrate concurrently and not concurrently.
Also, I presume the problem is on difference between DeviceCachingAllocator 's free()
and the CUDAHostAllocator’s free()
CUDAHostAllocator inserts the event to cuda_events_
when it’s not ready.
But, DeviceCachingAllocator doesn’t insert the event, even if it’s not ready.
Are these behaviors expected?
Here are some codes for simulating the problem. I know it’s…a little bit long…but you can reproduce the problem easily.
and also, I’m using PyTorch 2.0.1
from queue import Queue
import threading
import torch
N_MEMORY = 10
MEMORY_LIMIT = 3 * 1024**3 # 3GB
LAYER_SIZE = 512 * 1024**2 # 512MB
class Object(object):
pass
class FakeLoader(threading.Thread):
def __init__(self, streams):
threading.Thread.__init__(self)
self.streams = streams
self.memory_loaded = 0
self.loaded_layer = Queue()
def can_load(self):
global layers
if self.memory_loaded + LAYER_SIZE < MEMORY_LIMIT:
return True
else:
return False
def load(self, id):
global layers
print("BEGIN LOAD", id)
with torch.cuda.stream(self.streams["upstream"]):
layers[id].tensor = layers[id].tensor.to("cuda:0", non_blocking=True)
layers[id].end_load_event = torch.cuda.Event(enable_timing=True)
layers[id].end_load_event.record(self.streams["upstream"])
self.memory_loaded += LAYER_SIZE
self.loaded_layer.put(id)
def unload(self, id):
global layers
print("BEGIN UNLOAD", id)
with torch.cuda.stream(self.streams["downstream"]):
layers[id].tensor = layers[id].tensor.to("cpu", non_blocking=True)
# !!! UNCOMMENT if you want precise result
# self.streams["downstream"].synchronize()
self.memory_loaded -= LAYER_SIZE
def run(self):
global layers
for id in range(N_MEMORY):
layers[id].begin_load_event = torch.cuda.Event(enable_timing=True)
layers[id].begin_load_event.record(self.streams["upstream"])
while True:
if self.can_load():
self.load(id)
break
else:
self.unload(self.loaded_layer.get())
layers[id].load_lock.set()
if __name__ == "__main__":
global layers
layers = {}
orginal_memorys_cpu = {}
orginal_memorys_gpu = {}
for id in range(N_MEMORY):
layers[id] = Object()
layers[id].tensor = torch.randint(
32000, [1, 64 * 1024**2], dtype=torch.long
) # 512MB
layers[id].tensor = layers[id].tensor.pin_memory()
orginal_memorys_cpu[id] = layers[id].tensor.detach().clone().pin_memory()
orginal_memorys_gpu[id] = layers[id].tensor.detach().clone().to("cuda:0")
streams = {}
streams["upstream"] = torch.cuda.Stream()
streams["downstream"] = torch.cuda.Stream()
# init()
for layer in layers.values():
layer.load_lock = threading.Event()
loader = FakeLoader(streams)
loader.start()
loader.join()
# check_result()
for id in range(N_MEMORY):
if layers[id].tensor.device == torch.device("cpu"):
print("Layer", id, "is in cpu")
if torch.allclose(
layers[id].tensor, orginal_memorys_cpu[id], equal_nan=True
):
print(f"Layer {id} is same")
else:
print(f"Layer {id} is different")
elif layers[id].tensor.device == torch.device("cuda:0"):
print("Layer", id, "is in cuda")
if torch.allclose(
layers[id].tensor, orginal_memorys_gpu[id], equal_nan=True
):
print(f"Layer {id} is same")
else:
print(f"Layer {id} is different")
else:
print("!!! SOMETHING WRONG Device is", layers[id].tensor.device)
Thanks.