294 lines
9.8 KiB
Python
294 lines
9.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
import logging
|
|
import string
|
|
from multiprocessing import Pool
|
|
|
|
import h5py
|
|
import joblib
|
|
import numpy as np
|
|
import pandas as pd
|
|
from tqdm import tqdm
|
|
|
|
logger = logging.getLogger('cisco_logger')
|
|
|
|
# Character -> index table over lowercase letters, punctuation, digits and space.
# Indices start at 1; 0 is never assigned here and is what encode_char returns
# for characters that are not in the table.
char2idx = {
    char: idx
    for idx, char in enumerate(
        string.ascii_lowercase + string.punctuation + string.digits + " ", start=1
    )
}

# Reverse lookup: index -> character.
idx2char = dict(zip(char2idx.values(), char2idx.keys()))
|
|
|
|
|
|
def get_character_dict():
    """Return the module-level character -> index mapping (the dict itself, not a copy)."""
    return char2idx
|
|
|
|
|
|
def get_vocab_size():
    """Return the number of distinct codes: one per known character plus one for code 0."""
    return 1 + len(char2idx)
|
|
|
|
|
|
def encode_char(c):
    """Return the index of character ``c``, or 0 when ``c`` is not in ``char2idx``."""
    try:
        return char2idx[c]
    except KeyError:
        return 0
|
|
|
|
|
|
def decode_char(i):
    """Return the character stored for index ``i``, or an empty string for unknown indices."""
    try:
        return idx2char[i]
    except KeyError:
        return ""
|
|
|
|
|
|
# Rebind both helpers to element-wise versions that accept scalars or arrays.
# Explicit otypes keep the output dtype fixed and allow zero-length inputs;
# without them np.vectorize has to call the function once to infer the dtype
# and raises ValueError when there is nothing to call it on.
encode_char = np.vectorize(encode_char, otypes=[int])
decode_char = np.vectorize(decode_char, otypes=[str])
|
|
|
|
|
|
def encode_domain(domain: str):
    """
    encodes every character of a domain name with encode_char

    :param domain: domain name to encode
    :return: integer array with one code per character (empty array for an empty domain)
    """
    chars = list(domain)
    if not chars:
        # A vectorized function without otypes cannot be called on zero
        # elements, so the empty case is answered directly.
        return np.zeros((0,), dtype=int)
    return encode_char(chars)
|
|
|
|
|
|
def decode_domain(domain):
    """Turn a sequence of character codes back into a string; codes unknown to decode_char contribute nothing."""
    decoded_chars = decode_char(domain)
    return "".join(decoded_chars)
|
|
|
|
|
|
def get_user_chunks(user_flow, window=10):
    """
    splits the flows of one user into consecutive, non-overlapping windows

    :param user_flow: object supporting ``len()`` and positional ``.iloc`` slicing
    :param window: number of rows per window
    :return: list of ``.iloc`` slices of exactly ``window`` rows each; rows left
             over after the last full window are not returned
    """
    n_windows = len(user_flow) // window
    # Floor division means every slice below is complete, so there is never a
    # shorter trailing window to discard.
    return [
        user_flow.iloc[start:start + window]
        for start in range(0, n_windows * window, window)
    ]
|
|
|
|
|
|
def get_domain_features(domain: str, max_length=40):
    """
    encodes the last ``max_length`` characters of a domain, right-aligned

    :param domain: domain name to encode
    :param max_length: length of the returned vector
    :return: float array of length ``max_length``; the end of the domain sits at
             the end of the array and unused leading positions stay 0
    """
    encoding = np.zeros((max_length,))
    # domain[-0:] would return the whole string, so a zero-length vector is
    # handled by taking no characters at all.
    tail = domain[-max_length:] if max_length > 0 else domain[:0]
    if len(tail) > 0:
        # One vectorized call for the whole tail instead of one per character.
        encoding[max_length - len(tail):] = encode_char(list(tail))
    return encoding
|
|
|
|
|
|
def get_all_flow_features(features):
    """
    stacks the numeric flow columns of all windows and log-scales them

    :param features: iterable of windows, each indexable by the column list
                     ``["duration", "bytes_up", "bytes_down"]``
    :return: ``np.log1p`` of the stacked columns, one entry along the first axis per window
    """
    columns = ["duration", "bytes_up", "bytes_down"]
    # np.stack needs a real sequence; handing it a lazy map object is rejected
    # by current NumPy releases.
    flows = np.stack([window[columns] for window in features])
    return np.log1p(flows)
|
|
|
|
|
|
def filter_window_dataset_by_hits(domain, flow, name, hits, trusted_hits, server):
    """
    keeps only clearly positive and clearly negative samples and derives client labels

    A sample is positive when ``hits == 1.0`` or ``trusted_hits >= 1.0`` and
    negative when ``hits == 0.0`` and it is not already positive. Everything
    else is dropped.

    :param domain: array indexed along its first axis by sample
    :param flow: array indexed along its first axis by sample
    :param name: array indexed along its first axis by sample
    :param hits: 1-d array of discrete hit labels per sample
    :param trusted_hits: 1-d array of trusted hit counts per sample
    :param server: array indexed along its first axis by sample
    :return: ``(domain, flow, name, client, server)`` restricted to the selected
             samples, positives first; ``client`` is 1.0 for positives and 0.0
             for negatives
    """
    is_positive = np.logical_or(hits == 1.0, trusted_hits >= 1.0)
    # Excluding positives here keeps a sample with hits == 0 but trusted hits
    # from being selected twice with contradicting labels.
    is_negative = np.logical_and(hits == 0.0, np.logical_not(is_positive))
    pos_idx = np.where(is_positive)[0]
    neg_idx = np.where(is_negative)[0]
    idx = np.concatenate((pos_idx, neg_idx))

    # Positives occupy the front of idx, so their labels are the leading block.
    client = np.zeros(idx.shape, dtype=float)
    client[:pos_idx.shape[-1]] = 1.0

    return domain[idx], flow[idx], name[idx], client, server[idx]
|
|
|
|
|
|
def create_raw_dataset_from_flows(user_flow_df, max_len, window_size=10):
    """
    builds the unfiltered window dataset from a flow data frame

    :param user_flow_df: data frame with a ``user_hash`` column plus the columns
                         read by create_dataset_from_windows
    :param max_len: passed to create_dataset_from_windows as ``max_len``
    :param window_size: number of flows per window
    :return: ``(domain, flow, name, hits, trusted_hits, server)``
    """
    logger.info("get chunks from user data frames")
    with Pool() as pool:
        n_users = len(user_flow_df['user_hash'].unique().tolist())
        pending = [
            pool.apply_async(get_user_chunks, (user_flow, window_size))
            for user_flow in tqdm(get_flow_per_user(user_flow_df), total=n_users)
        ]
        # Results are fetched while the pool is still alive.
        windows = []
        for job in pending:
            windows.extend(job.get())

    logger.info("create training dataset")
    domain, flow, hits, name, server, trusted_hits = create_dataset_from_windows(
        chunks=windows, max_len=max_len)

    # make client labels discrete with 4 different values
    def discretize(column):
        return make_label_discrete(column, 3)

    hits = np.apply_along_axis(discretize, 0, np.atleast_2d(hits))

    return domain, flow, name, hits, trusted_hits, server
|
|
|
|
|
|
def store_h5dataset(path, data: dict):
    """
    writes every entry of ``data`` as a dataset into ``<path>.h5``

    :param path: file path without the ``.h5`` extension; the file is opened in
                 ``"w"`` mode
    :param data: mapping of dataset name to the value stored under that name
    """
    # The context manager closes the file even if create_dataset raises.
    with h5py.File(path + ".h5", "w") as f:
        for key, val in data.items():
            f.create_dataset(key, data=val)
|
|
|
|
|
|
def check_h5dataset(path):
    """
    checks that ``<path>.h5`` can be opened for reading

    :param path: file path without the ``.h5`` extension
    :return: the (already closed) file object, so the probe does not leak a handle
    :raises FileNotFoundError: if the file does not exist
    """
    with open(path + ".h5", "r") as handle:
        return handle
|
|
|
|
|
|
def load_h5dataset(path):
    """
    opens ``<path>.h5`` read-only and returns its top-level entries

    :param path: file path without the ``.h5`` extension
    :return: dict mapping every top-level key to the object stored under it
    """
    # The file is deliberately not closed: the returned objects are handles
    # into it rather than in-memory copies.
    h5file = h5py.File(path + ".h5", "r")
    return {key: h5file[key] for key in h5file.keys()}
|
|
|
|
|
|
def create_dataset_from_windows(chunks, max_len):
    """
    combines domain and feature windows to sequential training data

    :param chunks: list of flow feature windows; each must expose ``domain``,
                   ``virusTotalHits``, ``user_hash``, ``serverLabel`` and
                   ``trustedHits`` plus the columns read by get_all_flow_features
    :param max_len: number of trailing domain characters encoded per flow
    :return: tuple ``(domain_features, flow_features, hits, names, servers,
             trusted_hits)``; ``hits`` and ``trusted_hits`` are the per-window
             maxima and ``names`` holds one user per window
    """

    def get_domain_features_reduced(d):
        # apply_along_axis hands over a length-1 slice holding one domain.
        return get_domain_features(d[0], max_len)

    logger.info(" compute domain features")
    domain_features = [
        np.apply_along_axis(get_domain_features_reduced, 2, np.atleast_3d(chunk.domain))
        for chunk in tqdm(chunks)
    ]
    domain_features = np.concatenate(domain_features, 0)

    logger.info(" compute flow features")
    flow_features = get_all_flow_features(chunks)

    # np.stack requires a sequence, so the per-window columns are collected in
    # lists instead of lazy map objects.
    logger.info(" select hits")
    hits = np.max(np.stack([chunk.virusTotalHits for chunk in chunks]), axis=1)

    logger.info(" select names")
    names = np.stack([chunk.user_hash for chunk in chunks])
    # Every flow of a window must belong to the same user; broadcasting the
    # first column makes the check independent of the window size.
    assert (names[:, :1] == names).all()
    names = names[:, 0]

    logger.info(" select servers")
    servers = np.stack([chunk.serverLabel for chunk in chunks])

    logger.info(" select trusted hits")
    trusted_hits = np.max(np.stack([chunk.trustedHits for chunk in chunks]), axis=1)

    return (domain_features, flow_features,
            hits, names, servers, trusted_hits)
|
|
|
|
|
|
def make_label_discrete(values, threshold):
    """
    maps the maximum of ``values`` onto one of four discrete labels

    :param values: values whose maximum is classified
    :param threshold: smallest maximum that counts as a hit
    :return: ``1.0`` if the maximum reaches ``threshold``, ``-1.0`` if it equals
             -1, ``-2.0`` if it lies strictly between 0 and ``threshold``,
             otherwise ``0.0``
    """
    peak = np.max(values)
    if peak >= threshold:
        return 1.0
    if peak == -1:
        return -1.0
    if 0 < peak < threshold:
        return -2.0
    return 0.0
|
|
|
|
|
|
def get_user_flow_data(csv_file):
    """
    reads the flow csv and keeps only the columns used by this module

    :param csv_file: path or buffer accepted by ``pd.read_csv``
    :return: data frame restricted to the columns below, in this order
    """
    # The type next to each column is the expected one; it is listed for
    # reference only and is not enforced when reading.
    columns = [
        "duration",        # int
        "bytes_down",      # int
        "bytes_up",        # int
        "domain",          # object
        "timeStamp",       # float
        "http_method",     # object
        "server_ip",       # object
        "user_hash",       # float
        "virusTotalHits",  # int
        "serverLabel",     # int
        "trustedHits",     # int
    ]
    frame = pd.read_csv(csv_file, index_col=False)
    return frame[columns]
|
|
|
|
|
|
def get_flow_per_user(df):
    """
    yields the flows of each user, one data frame per distinct ``user_hash``

    :param df: data frame with a ``user_hash`` column
    :return: generator of per-user frames in order of first appearance, with
             every row that contains a missing value removed
    """
    user_column = df['user_hash']
    for user in user_column.unique().tolist():
        belongs_to_user = user_column == user
        yield df.loc[belongs_to_user].dropna(axis=0, how="any")
|
|
|
|
|
|
def load_or_generate_h5data(train_data, domain_length, window_size):
    """
    loads the filtered training dataset, generating and caching it on first use

    If ``<train_data>.h5`` does not exist, the raw dataset is loaded (or
    generated), filtered with filter_window_dataset_by_hits and written to that
    file. The file is then read back in either case.

    :param train_data: base path; used for the ``.h5`` cache and handed on to
                       load_or_generate_raw_h5data
    :param domain_length: passed on to load_or_generate_raw_h5data
    :param window_size: passed on to load_or_generate_raw_h5data
    :return: ``(domain, flow, name, client, server)`` as stored in the h5 file
    """
    logger.info(f"check for h5data {train_data}")
    try:
        check_h5dataset(train_data)
    except FileNotFoundError:
        logger.info("load raw training dataset")
        domain, flow, name, hits, trusted_hits, server = load_or_generate_raw_h5data(train_data, domain_length,
                                                                                     window_size)
        logger.info("filter training dataset")
        # dataset[()] reads the full array; the older Dataset.value attribute
        # no longer exists in h5py 3.
        domain, flow, name, client, server = filter_window_dataset_by_hits(domain[()], flow[()],
                                                                           name[()], hits[()],
                                                                           trusted_hits[()], server[()])
        logger.info("store training dataset as h5 file")
        data = {
            "domain": domain.astype(np.int8),
            "flow": flow,
            "name": name,
            # builtin bool: the np.bool alias was removed from NumPy
            "client": client.astype(bool),
            "server": server.astype(bool)
        }
        store_h5dataset(train_data, data)
    logger.info("load h5 dataset")
    data = load_h5dataset(train_data)
    return data["domain"], data["flow"], data["name"], data["client"], data["server"]
|
|
|
|
|
|
def load_or_generate_raw_h5data(train_data, domain_length, window_size):
    """
    loads the unfiltered window dataset, generating and caching it on first use

    If ``<train_data>_raw.h5`` does not exist, ``train_data`` is read as a csv
    file, converted with create_raw_dataset_from_flows and written to that
    file. The file is then read back in either case.

    :param train_data: path of the csv file; also the base name of the cache
    :param domain_length: passed to create_raw_dataset_from_flows as ``max_len``
    :param window_size: number of flows per window
    :return: ``(domain, flow, name, hits_vt, hits_trusted, server)`` as stored
             in the h5 file
    """
    h5data = train_data + "_raw"
    logger.info(f"check for h5data {h5data}")
    try:
        check_h5dataset(h5data)
    except FileNotFoundError:
        logger.info("h5 data not found - load csv file")
        user_flow_df = get_user_flow_data(train_data)
        logger.info("create raw training dataset")
        domain, flow, name, hits, trusted_hits, server = create_raw_dataset_from_flows(user_flow_df, domain_length,
                                                                                       window_size)
        logger.info("store raw training dataset as h5 file")
        data = {
            "domain": domain.astype(np.int8),
            "flow": flow,
            "name": name,
            "hits_vt": hits.astype(np.int8),
            # trusted hits come from their own array, not from the VT labels
            "hits_trusted": trusted_hits.astype(np.int8),
            # builtin bool: the np.bool alias was removed from NumPy
            "server": server.astype(bool)
        }
        store_h5dataset(h5data, data)
    logger.info("load h5 dataset")
    data = load_h5dataset(h5data)
    return data["domain"], data["flow"], data["name"], data["hits_vt"], data["hits_trusted"], data["server"]
|
|
|
|
|
|
def generate_names(train_data, window_size):
    """
    recomputes the user of every window directly from the csv file

    :param train_data: path of the csv file read by get_user_flow_data
    :param window_size: number of flows per window
    :return: array with the first ``user_hash`` of each window
    """
    user_flow_df = get_user_flow_data(train_data)
    with Pool() as pool:
        results = []
        for user_flow in tqdm(get_flow_per_user(user_flow_df),
                              total=len(user_flow_df['user_hash'].unique().tolist())):
            results.append(pool.apply_async(get_user_chunks, (user_flow, window_size)))
        # Results are fetched while the pool is still alive.
        windows = [window for res in results for window in res.get()]
    # np.stack requires a sequence rather than a lazy map object.
    names = np.stack([window.user_hash for window in windows])
    names = names[:, 0]

    return names
|
|
|
|
|
|
def load_or_generate_domains(train_data, domain_length):
    """
    loads the per-domain label table, generating and caching it on first use

    If ``<train_data>_domains.gz`` does not exist, the csv file is read, the
    label columns are averaged per domain, boolean client and server labels are
    derived and the table is written to that file.

    :param train_data: path of the csv file; also the base name of the cache
    :param domain_length: length of the encoding produced per domain
    :return: ``(domain_encs, domains, labels)`` where ``labels`` is a boolean
             array with the columns ``clientLabel`` and ``serverLabel``
    """
    fn = f"{train_data}_domains.gz"

    try:
        user_flow_df = pd.read_csv(fn)
    except FileNotFoundError:
        user_flow_df = get_user_flow_data(train_data)
        user_flow_df = user_flow_df[["domain", "serverLabel", "trustedHits", "virusTotalHits"]].dropna(axis=0,
                                                                                                       how="any")
        user_flow_df = user_flow_df.groupby(user_flow_df.domain).mean()
        user_flow_df.reset_index(inplace=True)

        # A domain counts as a client hit with any trusted hit or an average of
        # at least 3 VirusTotal hits.
        user_flow_df["clientLabel"] = (user_flow_df.trustedHits > 0) | (user_flow_df.virusTotalHits >= 3)
        user_flow_df[["serverLabel", "clientLabel"]] = user_flow_df[["serverLabel", "clientLabel"]].astype(bool)
        user_flow_df = user_flow_df[["domain", "serverLabel", "clientLabel"]]

        user_flow_df.to_csv(fn, compression="gzip")

    domain_encs = user_flow_df.domain.apply(lambda d: get_domain_features(d, domain_length))
    domain_encs = np.stack(domain_encs)

    # .values replaces DataFrame.as_matrix(), which no longer exists in pandas.
    labels = user_flow_df[["clientLabel", "serverLabel"]].values.astype(bool)
    return domain_encs, user_flow_df.domain, labels
|
|
|
|
|
|
def save_predictions(path, results):
    """Dump ``results`` with joblib (compression level 3) to ``results.joblib`` below ``path``."""
    target = path + "/results.joblib"
    joblib.dump(results, target, compress=3)
|
|
|
|
|
|
def load_predictions(path):
    """Load and return the object stored by joblib in ``results.joblib`` below ``path``."""
    source = path + "/results.joblib"
    return joblib.load(source)
|