import warnings
import random
from functools import partial

-from recorder import record_single
+from recorder import record_single, record_single_fp16
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)
    import numpy as np
    import tensorflow as tf
    from tensorflow.python import keras as K
# @brief inspect if the file is created correctly
# @note this only checks that the offset is correctly set; the values
# themselves have to be inspected manually
-def inspect_file(file_name):
+def inspect_file(file_name, _dtype="float32"):
    with open(file_name, "rb") as f:
        while True:
-            sz = int.from_bytes(f.read(4), byteorder="little")
+            # float32 goldens use a 4-byte size header, float16 goldens a 2-byte one
+            if _dtype == "float32":
+                sz = int.from_bytes(f.read(4), byteorder="little")
+            elif _dtype == "float16":
+                sz = int.from_bytes(f.read(2), byteorder="little")
+            else:
+                raise ValueError("unsupported dtype: " + _dtype)
            if not sz:
                break
            print("size: ", sz)
-            print(np.fromfile(f, dtype="float32", count=sz))
+            print(np.fromfile(f, dtype=_dtype, count=sz))
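+
+# A minimal sketch of the golden-file layout inspect_file expects: each record
+# is a little-endian element count (4 bytes for float32 goldens, 2 bytes for
+# float16 goldens) followed by that many raw values. The helper and file name
+# below are hypothetical, for illustration only.
+def _write_demo_golden(file_name, values, _dtype="float32"):
+    data = np.asarray(values, dtype=_dtype)
+    header_bytes = 4 if _dtype == "float32" else 2
+    with open(file_name, "wb") as f:
+        f.write(int(data.size).to_bytes(header_bytes, byteorder="little"))
+        data.tofile(f)
+
+# _write_demo_golden("demo_fp16.nnlayergolden", [1.0, 2.0, 3.0], _dtype="float16")
+# inspect_file("demo_fp16.nnlayergolden", _dtype="float16")  # prints size: 3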
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model):
fc = K.layers.Dense(4)
record_single(fc, (1, 1, 1, 10), "fc_single_batch")
+fc1616 = K.layers.Dense(5)
+record_single_fp16(fc1616, (3, 1, 1, 10), "fc_plain_fp16fp16", input_type='float16')
+fc1616 = K.layers.Dense(4)
+record_single_fp16(fc1616, (1, 1, 1, 10), "fc_single_batch_fp16fp16", input_type='float16')
+
bn = K.layers.BatchNormalization()
record_single(bn, (2, 4, 2, 3), "bn_channels_training", {"training": True})
record_single(bn, (2, 4, 2, 3), "bn_channels_inference", {"training": False})
record_single(positional_encoding, [(3, 1, 10, 6)], "positional_encoding")
inspect_file("dropout_20_training.nnlayergolden")
-
+inspect_file("fc_plain_fp16fp16.nnlayergolden", _dtype="float16")
write_tensor(weights)
write_tensor(derivatives)
+
+def record_single_fp16(layer, input_shape, test_name, call_args=None, input_type='int'):
+    call_args = {} if call_args is None else call_args
+    layer = attach_trans_layer(layer)
+    layer.build(input_shape)
+    if isinstance(input_shape, list):
+        inputs = [_rand_like(in_shape, 1, input_type) for in_shape in input_shape]
+    else:
+        inputs = _rand_like(input_shape, 1, input_type)
+
+    initial_weights = [tf.Variable(i) for i in layer.weights]
+
+    for _ in range(4):
+        layer.call(inputs, **call_args)  # warm up the layer with multiple calls
+
+    with tf.GradientTape(persistent=True) as tape:
+        if isinstance(inputs, list):
+            for inp in inputs:
+                tape.watch(inp)
+        else:
+            tape.watch(inputs)
+        outputs = layer.call(inputs, **call_args)
+        dy_constant = outputs * 2  # set the incoming derivative to 2 instead of 1
+
+    weights = layer.weights.copy()
+    gradients = tape.gradient(dy_constant, layer.trainable_weights)
+    derivatives = tape.gradient(dy_constant, inputs)
+
+    try:
+        gradients = layer.to_nntr_trainable_weights(gradients)
+    except AttributeError:
+        pass
+
+    with open(test_name + ".nnlayergolden", "wb") as f:
+        writer = _get_writer(f)
+
+        def write_tensor_fp16(tensors):
+            if not isinstance(tensors, list):
+                tensors = [tensors]
+            for tensor in tensors:
+                tensor = tf.cast(tensor, tf.float16)
+                # fp16 goldens use a 16-bit size header, which caps a single
+                # tensor at 32767 elements
+                writer(tf.size(tensor, out_type=tf.int16), tensor)
+
+        ## @todo inputs outputs derivatives can be more than one
+        ## @note please update genLayerTests.py comments when updating below
+        write_tensor_fp16(initial_weights)
+        write_tensor_fp16(inputs)
+        write_tensor_fp16(outputs)
+        write_tensor_fp16(gradients)
+        write_tensor_fp16(weights)
+        write_tensor_fp16(derivatives)
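+
+# A hedged round-trip reader for the fp16 goldens written above, assuming each
+# record is a little-endian int16 element count followed by that many float16
+# values (mirroring write_tensor_fp16). The helper name is illustrative only.
+def _read_fp16_golden(file_name):
+    tensors = []
+    with open(file_name, "rb") as f:
+        while True:
+            raw = f.read(2)
+            if len(raw) < 2:
+                break
+            sz = int.from_bytes(raw, byteorder="little")
+            tensors.append(np.fromfile(f, dtype="float16", count=sz))
+    # records appear in write order: initial weights, inputs, outputs,
+    # gradients, weights, derivatives
+    return tensors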
void sizeCheckedReadTensor(nntrainer::Tensor &t, std::ifstream &file,
                           const std::string &error_msg) {
  unsigned int sz = 0;
-  nntrainer::checkedRead(file, (char *)&sz, sizeof(unsigned));
+
+  // fp16 goldens store the element count in a 16-bit header (see
+  // write_tensor_fp16), so only sizeof(_FP16) == 2 bytes are read into the
+  // zero-initialized 32-bit sz; this assumes a little-endian host.
+  if (t.getDataType() == ml::train::TensorDim::DataType::FP32) {
+    nntrainer::checkedRead(file, (char *)&sz, sizeof(unsigned));
+  } else if (t.getDataType() == ml::train::TensorDim::DataType::FP16) {
+    nntrainer::checkedRead(file, (char *)&sz, sizeof(_FP16));
+  }
+
  NNTR_THROW_IF(t.getDim().getDataLen() != sz, std::invalid_argument)
    << "[ReadFail] dimension does not match at " << error_msg << " sz: " << sz
    << " dimsize: " << t.getDim().getDataLen() << '\n';