# Simulate `num_processes` distributed ranks: each rank gets its own
# DistributedSampler shard and scans it through a DataLoader. After all
# ranks have been scanned, every collected sample must be unique — i.e.
# the sampler partitioned the dataset with no overlap between ranks.
# NOTE(review): leftover diff markers (-/+) resolved to the `+` version;
# the uniqueness assertion is placed after the rank loop, since it checks
# the union of all ranks' data — confirm against the original test.
for i in range(num_processes):
    s = DistributedSampler(data_set, num_processes, i)
    d_loader = DataLoader(
        data_set,
        batch_size=int(num_batches / num_processes),
        drop_last=True,  # equal-sized shards: drop the ragged tail batch
        sampler=s,
    )
    for data in d_loader:
        scanned_data = torch.cat((scanned_data, data), 0)
self.assertEqual(scanned_data.size(), scanned_data.unique().size())
# Grab references to the loader's worker processes (and pin-memory thread
# when enabled) before iterating, then consume only the first 11 batches
# and break out early. The trailing assert documents that the loop really
# stopped at index 10 (i.e. early exit, not exhaustion).
# NOTE(review): leftover diff markers (-/+) resolved to the `+` version;
# `workers` / `pin_memory_thread` are presumably checked for shutdown by
# code below this fragment — confirm against the enclosing test.
workers = loader.workers
if pin_memory:
    pin_memory_thread = loader.pin_memory_thread
for i, _ in enumerate(loader):
    if i == 10:
        break
assert i == 10
def check_len(dl, expected):
    """Assert that len(dl) and the number of items it actually yields
    both equal `expected`.

    `self` is captured from the enclosing test method's closure.
    """
    # NOTE(review): leftover diff markers (-/+) resolved to the `+` version.
    self.assertEqual(len(dl), expected)
    n = 0
    for _ in dl:
        n += 1
    self.assertEqual(n, expected)

check_len(self.dataset, 100)
@unittest.skipIf(not TEST_CUDA, "CUDA unavailable")
def test_shuffle_pin_memory(self):
    """Shuffled, multi-worker loading with pin_memory=True: every batch's
    tensor component must arrive in pinned (page-locked) host memory, and
    the string component must survive collation unchanged."""
    # NOTE(review): leftover diff markers (-/+) resolved — the batch index
    # from enumerate() was unused, so the plain `for` form is kept.
    loader = DataLoader(self.dataset, batch_size=2, shuffle=True,
                        num_workers=4, pin_memory=True)
    for s, n in loader:
        self.assertIsInstance(s[0], str)
        self.assertTrue(n.is_pinned())
@unittest.skipIf(not TEST_CUDA, "CUDA unavailable")
def test_pin_memory(self):
    """pin_memory=True must pin tensors nested inside dict batches,
    including tensors inside a nested dict."""
    # NOTE(review): leftover diff markers (-/+) resolved — the batch index
    # from enumerate() was unused, so the plain `for` form is kept.
    loader = DataLoader(self.dataset, batch_size=2, pin_memory=True)
    for sample in loader:
        self.assertTrue(sample['a_tensor'].is_pinned())
        self.assertTrue(sample['another_dict']['a_number'].is_pinned())
def test_custom_batch_pin(self):
    """A custom collate_fn returning a custom batch type: pin_memory=True
    must still pin the tensors the custom batch exposes (via the batch
    type's own pin_memory hook)."""
    # NOTE(review): leftover diff markers (-/+) resolved — the batch index
    # from enumerate() was unused, so the plain `for` form is kept.
    loader = DataLoader(self.dataset, batch_size=2,
                        collate_fn=collate_wrapper, pin_memory=True)
    for sample in loader:
        self.assertTrue(sample.inp.is_pinned())
        self.assertTrue(sample.tgt.is_pinned())
def test_custom_batch_pin_worker(self):
    """Same as test_custom_batch_pin, but with num_workers=1 so the custom
    batch is built in a worker process and pinned by the pin-memory thread
    on its way back to the main process."""
    # NOTE(review): leftover diff markers (-/+) resolved — the batch index
    # from enumerate() was unused, so the plain `for` form is kept.
    loader = DataLoader(self.dataset, batch_size=2,
                        collate_fn=collate_wrapper, pin_memory=True,
                        num_workers=1)
    for sample in loader:
        self.assertTrue(sample.inp.is_pinned())
        self.assertTrue(sample.tgt.is_pinned())