import sys
import logging
import random
import numpy as np
class ALU_Dataset:
"""Generate training data for all ALU operations
The ALU takes two integers and applies one of the supported
model_ops. E.g. `op1=123, op2=100, op='-' -> result 23`
The net is supposed to learn to 'calculate' the results for
arbitrary op1, op2 (positive integers, `0..2**bit_count - 1`) and
the twelve supported ops:
`["+", "-", "*", "/", "%", "AND", "OR", "XOR", ">", "<", "=", "!="]`
:param bit_count: number of bits for each of the two operands, default 31 (mult uses 15 bits)
:param pre_weight: if True, the model_dis will be reweighted to generate samples for 'difficult' ops
"""
def __init__(self, bit_count=31, pre_weight=False):
self.log = logging.getLogger("Datasets")
self.model_ops = [
"+",
"-",
"*",
"/",
"%",
"AND",
"OR",
"XOR",
">",
"<",
"=",
"!=",
]
self.model_is_boolean = [
False,
False,
False,
False,
False,
False,
False,
False,
True,
True,
True,
True,
]
        # Probabilities for creating a sample for each of the ops (will be
        # reweighted by check_results() to generate more samples for 'difficult' ops):
self.model_dis = [10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
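        # Pre-weighted variant favoring 'difficult' ops, used when pre_weight=True: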
model_dis_w = [19, 12, 110, 15, 36, 10, 10, 10, 10, 10, 10, 10]
self.model_funcs = [
self._add_smpl,
self._diff_smpl,
self._mult_smpl,
self._div_smpl,
self._mod_smpl,
self._and_smpl,
self._bor_smpl,
self._xor_smpl,
self._greater_smpl,
self._lesser_smpl,
self._eq_smpl,
self._neq_smpl,
]
self.bit_count = bit_count
self.op_count = len(self.model_ops)
        self.embedding_size = max(self.bit_count + 1, self.op_count)
self.pre_weight = pre_weight
self.all_bits_one = 2**self.bit_count - 1
self.true_vect = self.all_bits_one
self.false_vect = 0
self.input_size = (self.bit_count + 1) * 2 + len(self.model_ops)
self.output_size = 32
if pre_weight is True:
self.model_dis = model_dis_w
@staticmethod
def _int_to_binary_vect(num_int, num_bits=8):
"""get a binary encoded vector of n of bit-lenght nm
:param num_int: integer to encoded
:param num_bits: number of bits to use for positional_encoding
:return: binary vector of length num_bits
"""
num_vect = np.zeros(num_bits, dtype=np.float32)
for i in range(0, num_bits):
if num_int & (2**i) != 0:
num_vect[i] = 1.0
return num_vect
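    # Example (illustrative): _int_to_binary_vect(5, num_bits=8) yields
    # [1., 0., 1., 0., 0., 0., 0., 0.], the LSB-first binary encoding of 5 (0b101).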
@staticmethod
def _int_to_onehot_vect(num_int, num_bits):
"""get a one-hot encoded vector of n of bit-lenght nm
:param num_int: integer to encoded
:param num_bits: number of bits to use for positional_encoding
:return: one-hot vector of length num_bits
"""
num_vect = np.zeros(num_bits, dtype=np.float32)
num_vect[num_int] = 1.0
return num_vect
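    # Example (illustrative): _int_to_onehot_vect(2, num_bits=12) yields a
    # 12-element vector with a single 1.0 at index 2 (the '*' entry of model_ops).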
@staticmethod
def _get_random_bits(bits):
"""get bits random int 0...2**bits-1
:param bits: number of bits to uses
:return: random int `0...2**bits-1`
"""
return random.randint(0, 2**bits - 1)
def op_string_to_index(self, op_string):
"""transform op_string (e.g. '+' -> 0) into corresponding index
:param op_string: string of op to transform
:return: index of op_string
"""
for i in range(0, len(self.model_ops)):
if self.model_ops[i] == op_string:
return i
return -1
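    # Example (illustrative): op_string_to_index("*") returns 2;
    # op_string_to_index("**") returns -1 (unknown operation).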
def get_data_point(
self,
equal_distrib=False,
valid_ops=None,
vector=False,
positional_encoding=False,
):
"""Get a random example for on ALU operation for training
:param equal_distrib: if False, more 'difficult' ops will be generated more often.
:param valid_ops: if not None, only the ops in valid_ops will be used
:param vector: if True, the result will be returned as an embedded encoded vector
:param positional_encoding: if True, the result will be returned as an embedded encoded vector with additional bits for positional positional_encoding
"""
op1 = self._get_random_bits(self.bit_count)
op2 = self._get_random_bits(self.bit_count)
if valid_ops is not None and len(valid_ops) == 0:
valid_ops = None
if valid_ops is not None:
if equal_distrib is False:
self.log.warning(
"Op restriction via valid_ops forces equal_distrib=True"
)
equal_distrib = True
for op in valid_ops:
if op not in self.model_ops:
self.log.warning(
f"Cannot restrict valid_ops to {op}, unknown operation, ignoring all valid_ops"
)
valid_ops = None
break
        if equal_distrib or valid_ops is not None:
            if valid_ops is None:
                op_index = random.randint(0, len(self.model_ops) - 1)
            else:
                # randint(0, 0) == 0, so a single-entry valid_ops needs no special case:
                op_index = self.model_ops.index(
                    valid_ops[random.randint(0, len(valid_ops) - 1)]
                )
        else:  # make 'difficult' ops more present in training samples:
            # weighted pick: op i is chosen with probability
            # model_dis[i] / sum(model_dis) (upper bound exclusive to keep
            # the weights exact):
            rrx = random.randint(0, sum(self.model_dis) - 1)
            rx = 0
            op_index = 0
            for op_index in range(0, len(self.model_ops)):
                rx += self.model_dis[op_index]
                if rx > rrx:
                    break
return self._encode_op(
op1, op2, op_index, vector=vector, positional_suffix=positional_encoding
)
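    # Example (illustrative, `alu` being an ALU_Dataset instance):
    #     x, y, result, op_index, sym = alu.get_data_point(equal_distrib=True)
    #     # x: flat input vector, y: 32-bit binary-encoded result,
    #     # sym: human-readable string such as '123 - 100 = 23'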
    def _generator(self, samples=20000, equal_distrib=False, valid_ops=None):
        """Endless generator yielding batches of training data"""
while True:
x, Y = self.create_training_data(
samples=samples,
valid_ops=valid_ops,
equal_distrib=equal_distrib,
title=None,
)
yield x, Y
def _encode_op(self, op1, op2, op_index, vector=False, positional_suffix=False):
"""turn two ints and operation into training data"""
op1, op2, result = self.model_funcs[op_index](op1, op2)
if self.model_is_boolean[op_index] is True:
if result == self.false_vect:
str_result = "True"
else:
str_result = "undefined"
else:
str_result = result
sym = f"{op1} {self.model_ops[op_index]} {op2} = {str_result}"
if vector is True:
if positional_suffix is True:
sz = self.embedding_size + 3
else:
sz = self.embedding_size
v1 = self._int_to_binary_vect(op1, num_bits=sz)
v2 = self._int_to_onehot_vect(op_index, num_bits=sz)
v3 = self._int_to_binary_vect(op2, num_bits=sz)
if positional_suffix is True:
v1[-3] = 1.0
v2[-2] = 1.0
v3[-1] = 1.0
inp = np.array([v1, v2, v3], dtype=np.float32)
else:
inp = np.concatenate(
[
self._int_to_binary_vect(op1, num_bits=self.bit_count + 1),
self._int_to_onehot_vect(op_index, num_bits=len(self.model_ops)),
self._int_to_binary_vect(op2, num_bits=self.bit_count + 1),
]
)
oup = self._int_to_binary_vect(result, num_bits=self.output_size)
return inp, oup, result, op_index, sym
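    # Layout of the flat (vector=False) input built above: [op1 binary,
    # bit_count + 1 bits | op one-hot, 12 bits | op2 binary, bit_count + 1 bits];
    # with vector=True, a (3, embedding_size[+3]) array of three embedded
    # vectors is returned instead. The output is always the result,
    # binary-encoded in output_size (32) bits.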
@staticmethod
def _add_smpl(op1, op2):
"""addition training example"""
result = op1 + op2
return op1, op2, result
@staticmethod
def _diff_smpl(op1, op2):
"""subtraction training example"""
if op2 > op1:
op2, op1 = op1, op2
result = op1 - op2
return op1, op2, result
def _mult_smpl(self, op1, op2):
"""multiplication training example"""
modul = 2 ** (self.bit_count // 2) - 1
op1 = op1 % modul
op2 = op2 % modul
result = op1 * op2
return op1, op2, result
def _div_smpl(self, op1, op2):
"""integer division training example"""
while op2 == 0:
op2 = self._get_random_bits(self.bit_count)
if op1 < op2 and random.randint(0, 2) != 0:
if op1 != 0:
op1, op2 = op2, op1
result = op1 // op2
return op1, op2, result
def _mod_smpl(self, op1, op2):
"""modulo (remainder) training example"""
while op2 == 0:
op2 = self._get_random_bits(self.bit_count)
if op1 < op2 and random.randint(0, 2) != 0:
if op1 != 0:
op1, op2 = op2, op1
result = op1 % op2
return op1, op2, result
@staticmethod
def _and_smpl(op1, op2):
"""bitwise AND training example"""
result = op1 & op2
return op1, op2, result
@staticmethod
def _bor_smpl(op1, op2):
"""bitwise OR training example"""
result = op1 | op2
return op1, op2, result
@staticmethod
def _xor_smpl(op1, op2):
"""bitwise XOR training example"""
result = op1 ^ op2
return op1, op2, result
def _greater_smpl(self, op1, op2):
"""integer comparisation > training example"""
if op1 > op2:
result = self.true_vect
else:
result = self.false_vect
return op1, op2, result
def _lesser_smpl(self, op1, op2):
"""integer comparisation < training example"""
if op1 < op2:
result = self.true_vect
else:
result = self.false_vect
return op1, op2, result
def _eq_smpl(self, op1, op2):
"""integer comparisation == training example"""
if random.randint(0, 1) == 0: # create more cases
op2 = op1
if op1 == op2:
result = self.true_vect
else:
result = self.false_vect
return op1, op2, result
def _neq_smpl(self, op1, op2):
"""integer comparisation != training example"""
if random.randint(0, 1) == 0: # create more cases
op2 = op1
if op1 != op2:
result = self.true_vect
else:
result = self.false_vect
return op1, op2, result
def create_data_point(
self, op1, op2, op_string, vector=False, positional_suffix=False
):
"""create training data from given ints op1, op2 and op_string"""
op_index = self.op_string_to_index(op_string)
if op_index == -1:
print(f"Invalid operation {op_string}")
return np.array([]), np.array([]), -1, -1, None
return self._encode_op(op1, op2, op_index, vector, positional_suffix)
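    # Example (illustrative): create_data_point(123, 100, "-") encodes the
    # sample '123 - 100 = 23' and returns (input, output, 23, 1, '123 - 100 = 23').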
def create_training_data(
self,
samples=10000,
valid_ops=None,
equal_distrib=False,
verbose=True,
title=None,
):
"""create a number of training samples"""
x, y, _, _, _ = self.get_data_point()
dpx = np.zeros((samples, len(x)), dtype=np.float32)
dpy = np.zeros((samples, len(y)), dtype=np.float32)
        if verbose is True:
            if title is None:
                print(f"Creating {samples} data points (one '.' per 1000 samples)")
            else:
                print(f"{title}: Creating {samples} data points (one '.' per 1000 samples)")
        for i in range(0, samples):
            if verbose is True:
                if i % 100000 == 0:
                    print(f"{i:>10} ", end="")
                if (i + 1) % 1000 == 0:
                    print(".", end="")
                    sys.stdout.flush()
                    if (i + 1) % 100000 == 0:
                        print()
if valid_ops is None:
x, y, _, _, _ = self.get_data_point(equal_distrib=equal_distrib)
else:
x, y, _, _, _ = self.get_data_point(
equal_distrib=True, valid_ops=valid_ops
)
dpx[i, :] = x
dpy[i, :] = y
if verbose is True:
print()
return dpx, dpy
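    # Example (illustrative, `alu` being an ALU_Dataset instance):
    #     X, Y = alu.create_training_data(samples=10000, valid_ops=["+", "-"])
    #     # X.shape == (10000, (bit_count + 1) * 2 + 12), Y.shape == (10000, 32)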
def create_vector_training_data(
self,
samples=10000,
valid_ops=None,
equal_distrib=False,
verbose=True,
title=None,
positional_encoding=True,
):
"""create a number of training samples"""
x, y, _, _, _ = self.get_data_point()
if positional_encoding is True:
sz = self.embedding_size + 3
else:
sz = self.embedding_size
dpx = np.zeros((samples, 3, sz), dtype=np.float32)
dpy = np.zeros((samples, len(y)), dtype=np.float32)
        if verbose is True:
            if title is None:
                print(f"Creating {samples} data points (one '.' per 1000 samples)")
            else:
                print(f"{title}: Creating {samples} data points (one '.' per 1000 samples)")
        for i in range(0, samples):
            if verbose is True:
                if i % 100000 == 0:
                    print(f"{i:>10} ", end="")
                if (i + 1) % 1000 == 0:
                    print(".", end="")
                    sys.stdout.flush()
                    if (i + 1) % 100000 == 0:
                        print()
if valid_ops is None:
x, y, _, _, _ = self.get_data_point(
equal_distrib=equal_distrib,
vector=True,
positional_encoding=positional_encoding,
)
else:
x, y, _, _, _ = self.get_data_point(
equal_distrib=True,
valid_ops=valid_ops,
vector=True,
positional_encoding=positional_encoding,
)
dpx[i, :, :] = x
dpy[i, :] = y
if verbose is True:
print()
return dpx, dpy
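    # Example (illustrative, `alu` being an ALU_Dataset instance):
    #     X, Y = alu.create_vector_training_data(samples=10000)
    #     # X.shape == (10000, 3, embedding_size + 3) with the default
    #     # positional_encoding=True; Y.shape == (10000, 32)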
def decode_results(self, result_int_vects):
"""take an array of 32-float results from neural net and convert to ints"""
result_vect_ints = []
for vect in result_int_vects:
if len(vect) != self.output_size:
print(f"Ignoring unexpected vector of length {len(vect)}")
else:
int_result = 0
for i in range(0, self.output_size):
if vect[i] > 0.5:
int_result += 2**i
result_vect_ints.append(int_result)
return result_vect_ints
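    # Example (illustrative): a 32-element prediction with values > 0.5 only
    # at indices 0 and 2 decodes to the integer 5 (LSB-first binary).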
def check_results(
self,
model,
samples=1000,
vector=False,
positional_encoding=True,
valid_ops=None,
verbose=False,
):
"""Run a number of tests on trained model"""
ok = 0
err = 0
operr = [0] * len(self.model_ops)
opok = [0] * len(self.model_ops)
for _ in range(0, samples):
x, _, z, op, s = self.get_data_point(
equal_distrib=True,
vector=vector,
positional_encoding=positional_encoding,
valid_ops=valid_ops,
)
res = self.decode_results(model.predict(np.array([x])))
if res[0] == z:
ok += 1
opok[op] += 1
r = "OK"
else:
err += 1
operr[op] += 1
r = "Error"
if verbose is True:
if self.model_is_boolean[op] is True:
if res[0] == self.false_vect:
str_result = "False"
elif res[0] == self.true_vect:
str_result = "True"
else:
str_result = "undefined"
else:
str_result = res[0]
if res[0] == z:
print(f"{s} == {str_result}: {r}")
else:
print(f"{s} != {str_result}: {r}")
if self.model_is_boolean[op] is False:
print(bin(res[0]))
print(bin(z))
opsum = ok + err
if opsum == 0:
opsum = 1
print(f"Ok: {ok}, Error: {err} -> {ok/opsum*100.0}%")
print("")
for i in range(0, len(self.model_ops)):
opsumi = opok[i] + operr[i]
if opsumi == 0:
continue
# modify the distribution of training-data generated to favour
# ops with bad test results, so that more training data is
# generated on difficult cases:
self.model_dis[i] = int(operr[i] / opsumi * 100) + 10
print(f"OP{self.model_ops[i]}: Ok: {opok[i]}, Error: {operr[i]}", end="")
print(f" -> {opok[i]/opsumi*100.0}%")
if valid_ops is None:
print("Change probability for ops in new training data:")
print(f"Ops: {self.model_ops}")
print(f"Weights: {self.model_dis}")
return ok / opsum
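
# Minimal self-contained usage sketch (illustrative only; check_results()
# additionally needs a trained model with a Keras-style predict() method,
# which is outside the scope of this module):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    alu = ALU_Dataset(bit_count=31)
    X, Y = alu.create_training_data(samples=1000, verbose=False)
    # With bit_count=31 each input has (31 + 1) * 2 + 12 = 76 bits and each
    # output is the 32-bit binary-encoded result:
    print(X.shape, Y.shape)  # (1000, 76) (1000, 32)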