Tuesday, February 18, 2020

Shared Code Analysis: Finding similar code in malware samples


What's covered?

  1. Preparing the samples
  2. Jaccard? Isn't that the captain of the US Space Force?
  3. Graph Similarities


In my last blog post I hunted down some APT28 malware samples and sorted them by malware family name. Now we're going to use those samples and find code similarities between them.


OK, I've loaded the samples into my Ubuntu VM. Now what?


1 - Preparing the samples


I'm a total noob at this data science business, but so far I'm loving what I'm learning, which is why I'm applying it here.


What can we compare in all the samples that could show similarities in the code?

Strings
During malware analysis, strings are used a lot for IOCs and to look for various keywords in a file to see if anything jumps out. Usually these are strings defined by the programmer. This could be a good option.

Assembly Code
We could take a more complex approach and use Capstone or radare2 to get the assembly code and compare that, but that could take a while to figure out. Compilers also do fun stuff, so very similar code can look very different depending on the flags it was compiled with. So let's not go there for now.

Import Address Table
What's something almost all malware has that the author probably doesn't think about, even if they go crazy with obfuscation, anti-analysis tricks and all that jazz? The imported DLLs and function calls. They usually remain the same because, you know... programming. This is a pretty good option to compare. The Import Address Table is where we find a mapping of imports.

Dynamic API Call
What about API calls? These have the advantage of giving a generic picture of the actions taken by the malware, and they usually happen in a particular order. To do this we'd probably need to actually run the malware samples, and that requires some planning. Could be a fun project for later. Cuckoo Sandbox might be a good option for that, but... have you ever set up a full Cuckoo environment? It's time consuming and requires a good amount of computing resources.

How are we going to compare these samples?  

Strings would work pretty well, but after some research it looks like the Import Address Table is a more reliable option. I'm tempted to use both, just in case we run into malware samples without an IAT.
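
To make the strings option a bit more concrete, here is a minimal sketch that pulls printable ASCII runs out of raw bytes, similar in spirit to the strings utility. The function name extract_strings and the minimum length of 4 are my own assumptions for this example, and it ignores wide (UTF-16) strings entirely.

```python
import re


def extract_strings(data, min_len=4):
    """Return the set of printable ASCII runs of at least min_len bytes.

    Hypothetical helper for illustration only; wide (UTF-16) strings
    are not handled.
    """
    # Printable ASCII is the byte range 0x20 (space) to 0x7e (~)
    pattern = re.compile(rb"[\x20-\x7e]{%d,}" % min_len)
    return {match.decode("ascii") for match in pattern.findall(data)}


# Fake binary content standing in for a malware sample
blob = b"\x00\x01MZ\x90\x00kernel32.dll\x00\xffCreateFileA\x00ab\x00"
print(sorted(extract_strings(blob)))
```

Returning a set rather than a list is deliberate: duplicates don't matter for the comparison later on, only which strings show up at all.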

IAT? Wait, what's that?

I'd suggest reading this blog post: https://tech-zealots.com/malware-analysis/journey-towards-import-address-table-of-an-executable-file/
As soon as the Windows loader loads an executable it does certain things in the background. First, it reads the files of a PE structure and loads an executable image into the memory. The other thing it does is to scan the Import Address Table (IAT) of an executable to locate the DLLs and functions that the executable uses and loads all these DLLs and maps them into the process address space.
[......] 
Within any executable file, we would see an array of data structures which is one per imported DLL. Each of these structures gives the name of the imported DLL and points to an array of function pointers. Import Address Table (IAT) is an array of these function pointers where the address of the imported function is written by the Windows loader.

OK, so how are we going to extract and store this info?

After some google fu and reading, it looks like we can accomplish this pretty easily with pefile.

Once pefile is installed, naturally we have to test it with a little code.
Pseudo code:

import pefile

path = '/path/to/malware'  # our PE file
pe = pefile.PE(path)
pe.parse_data_directories()
# Prints a list of import descriptor objects, not readable import names
print(pe.DIRECTORY_ENTRY_IMPORT)

Nope, that doesn't quite work... OK, back to the docs...


https://github.com/erocarrera/pefile/blob/wiki/UsageExamples.md#listing-the-imported-symbols

Found it. Looks like we can loop through pe.DIRECTORY_ENTRY_IMPORT and pull them out.


Not so pseudo code

import pefile

path = '/path/to/malware'


def getIAT(path):
    pe = pefile.PE(path)
    # Only needed if the PE file was loaded using the fast_load=True argument
    pe.parse_data_directories()
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        print(entry.dll)
        for imp in entry.imports:
            iat = set(hex(imp.address))
            print(iat)


if __name__ == '__main__':
    getIAT(path)
 

Oh... bad code.


Let's add some better functionality:


import argparse

import pefile


def getIAT(path):
    pe = pefile.PE(path)
    # Only needed if the PE file was loaded using the fast_load=True argument
    pe.parse_data_directories()
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        print(entry.dll)
        for imp in entry.imports:
            iat = set(hex(imp.address))
            print(iat)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="GetIAT of file."
    )
    parser.add_argument(
        "target_file",
        help="malware file"
    )
    args = parser.parse_args()
    getIAT(args.target_file)


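And it gets something...

Not quite what we're looking for. set(hex(imp.address)) builds a brand new set out of the individual characters of each hex string, rather than one set of addresses for the whole file.

Changing the code a little more: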

import argparse

import pefile


def getIAT(path):
    pe = pefile.PE(path)
    # One set per file, filled with the address of every import
    iat = set()
    # Only needed if the PE file was loaded using the fast_load=True argument
    pe.parse_data_directories()
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        print(entry.dll)
        for imp in entry.imports:
            iat.add(hex(imp.address))
            print(iat)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="GetIAT of file."
    )
    parser.add_argument(
        "target_file",
        help="malware file"
    )
    args = parser.parse_args()
    getIAT(args.target_file)






OK... It's a start.

Now how are we going to compare this for each sample?

Well, we can take the IAT data for each pair of samples, count how many entries they share, and divide that by the total number of distinct entries across both. To do this we are going to use what's called the Jaccard Index.

2 - Jaccard Index

What is the Jaccard Index?
According to deepai.org, the Jaccard Index is:
The Jaccard Index, also known as the Jaccard similarity coefficient, is a statistic used in understanding the similarities between sample sets. The measurement emphasizes similarity between finite sample sets, and is formally defined as the size of the intersection divided by the size of the union of the sample sets.

Sounds like what we need to use.


Code:

def jaccard(set1, set2):
    """
    Calculate the Jaccard index (similarity) between the attribute sets of two malware samples.
    Uses what is shared and how many distinct attributes there are in total.
    """
    intersection = set1.intersection(set2)
    intersection_length = float(len(intersection))
    union = set1.union(set2)
    union_length = float(len(union))
    # Two empty sets have nothing to compare; avoid dividing by zero
    if union_length == 0:
        return 0.0
    return intersection_length / union_length
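
To get a feel for the numbers, here is a small worked example using the same formula. The import names and the three sample sets are made up for illustration; they are not taken from the APT28 samples. Treating two empty sets as 0.0 is also just an assumption so the example never divides by zero.

```python
def jaccard(set1, set2):
    """Jaccard index: size of the intersection over size of the union."""
    union = set1 | set2
    if not union:
        return 0.0  # assumption: two empty sets count as "not similar"
    return len(set1 & set2) / float(len(union))


# Hypothetical import sets for three samples
sample_a = {"CreateFileA", "WriteFile", "RegSetValueExA", "InternetOpenA"}
sample_b = {"CreateFileA", "WriteFile", "RegSetValueExA", "Sleep"}
sample_c = {"MessageBoxA"}

print(jaccard(sample_a, sample_b))  # 3 shared out of 5 distinct -> 0.6
print(jaccard(sample_a, sample_c))  # nothing shared -> 0.0
```

A value of 1.0 means the two sets are identical and 0.0 means they have nothing in common, so a higher number means more similar.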

Now how do we visualize these connections in the data?


3 - Graphing similarities


Networkx to the rescue.

What does networkx do? It allows us to visualize and graph the connections in the data by creating nodes and connections between them. In other words, each sample is a node, and if two samples share similar DLL imports we can connect them and say that they are similar.
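
The node and edge idea can be sketched without any graph library at all. This is only an illustration: the sample names, the feature sets and the 0.5 threshold are invented, and build_similarity_edges is a hypothetical helper of mine, not a networkx function.

```python
import itertools


def jaccard(set1, set2):
    """Jaccard index: size of the intersection over size of the union."""
    union = set1 | set2
    return len(set1 & set2) / float(len(union)) if union else 0.0


def build_similarity_edges(features, threshold):
    """Return (node_a, node_b, score) for every pair scoring above threshold."""
    edges = []
    # Every key in `features` is a node; compare each unordered pair once
    for a, b in itertools.combinations(sorted(features), 2):
        score = jaccard(features[a], features[b])
        if score > threshold:
            edges.append((a, b, score))
    return edges


# Hypothetical samples mapped to their imported functions
features = {
    "sample1": {"CreateFileA", "WriteFile", "Sleep"},
    "sample2": {"CreateFileA", "WriteFile", "Sleep", "ExitProcess"},
    "sample3": {"MessageBoxA"},
}

for a, b, score in build_similarity_edges(features, threshold=0.5):
    print("{0} -- {1} (jaccard={2:.2f})".format(a, b, score))
```

Here sample1 and sample2 end up connected, while sample3 stays a lonely node with no edges. A graph library does the same bookkeeping and adds the drawing and export on top.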

[Figure: APT28 Similarity Graph]

Let's try to create a simple graph:




import matplotlib.pyplot as plt
import networkx as nx

G = nx.Graph()

G.add_node("a")
G.add_nodes_from(["b", "c"])

G.add_edge(1, 2)
edge = ("d", "e")
G.add_edge(*edge)
edge = ("a", "b")
G.add_edge(*edge)

print("Nodes of graph: ")
print(G.nodes())
print("Edges of graph: ")
print(G.edges())

# nx.draw renders through matplotlib, so show the figure afterwards
nx.draw(G)
plt.show()
















Putting it all together


OK, so what steps do we need to take now to get this whole thing working?


  1. Get user input of malware samples directory
  2. Get each file and check if it's a PE
  3. Extract the IAT from the PE
  4. Add the malware sample as a node on the graph  [ graph.add_node(path, label) ]
  5. Iterate over pairs of samples and calculate the Jaccard index
  6. If the Jaccard index is above the threshold, add a connection ("edge")  [ graph.add_edge(mal1, mal2, how similar) ]
  7. Write the graph to disk
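
Steps 1 and 2 above can be sketched with the standard library alone. The check here goes a little further than just looking for 'MZ': it also follows the e_lfanew field at offset 0x3C of the DOS header to the 'PE\0\0' signature. The function names looks_like_pe and find_pe_files are assumptions for this example, and this is a quick sanity check, not a full PE validator.

```python
import os
import struct


def looks_like_pe(path):
    """Return True if the file has an MZ header that points at a PE signature."""
    with open(path, "rb") as handle:
        header = handle.read(0x40)
        if len(header) < 0x40 or header[:2] != b"MZ":
            return False
        # e_lfanew: little-endian 32-bit offset of the PE header, stored at 0x3C
        (pe_offset,) = struct.unpack_from("<I", header, 0x3C)
        handle.seek(pe_offset)
        return handle.read(4) == b"PE\x00\x00"


def find_pe_files(directory):
    """Walk a directory tree and return the paths that look like PE files."""
    found = []
    for root, _dirs, names in os.walk(directory):
        for name in names:
            full_path = os.path.join(root, name)
            if looks_like_pe(full_path):
                found.append(full_path)
    return sorted(found)
```

Calling find_pe_files on the samples directory would give the list of paths to feed into the IAT extraction step. Opening the file in binary mode matters here, since malware samples are not text.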




Pseudo code:

Imports

def getIAT(path):
    """
    Extract IAT
    """

def jaccard(set1, set2):
    """
    Calculate the Jaccard index between the attribute sets of two malware samples.
    Uses what is shared and how many attributes there are in total.
    """

def check_pe(path):
    """
    Check if it's a PE file
    """

__main__

Get arguments
directory =

for each file in directory
    check if PE
    if PE file
        Extract IAT from file
        Store data
        Add file as node to graph

Iterate through pairs of malware files
    calculate jaccard for the 2 malware files
    if the jaccard index is above the threshold, add a connection ("edge") between nodes

write graph to disk
    


Let's code this up now.


#!/usr/bin/env python3

import argparse
import itertools
import os
import subprocess

import networkx
import pefile
from networkx.drawing.nx_pydot import write_dot


def getIAT(fullpath):
    """
    Extract the Import Address Table entries from the binary
    """
    pe = pefile.PE(fullpath)
    # Only needed if the PE file was loaded using the fast_load=True argument
    pe.parse_data_directories()
    iat_list = set()
    try:
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                iat_list.add(hex(imp.address))
    except AttributeError:
        # pefile only sets DIRECTORY_ENTRY_IMPORT when the file has imports
        print("ERROR! No imports in sample. Falling back to strings method..")
        iat_list = getstrings(fullpath)
    return iat_list


def getstrings(fullpath):
    """
    Extract strings from the binary using the `strings` utility.
    Really doesn't do much unless there's a large number of PE files without imports... packed warez?
    """
    # Pass the path as a separate argument so odd file names can't break the command
    output = subprocess.check_output(["strings", fullpath])
    return set(output.split(b"\n"))


def pecheck(fullpath):
    """
    Checks for 'MZ' to see if binary is PE
    """
    # Read in binary mode; samples are not text
    with open(fullpath, "rb") as f:
        return f.read(2) == b"MZ"


def jaccard(set1, set2):
    """
    Calculate the Jaccard index (similarity) between the attribute sets of two malware samples.
    Uses what is shared and how many distinct attributes there are in total.
    """
    intersection = set1.intersection(set2)
    intersection_length = float(len(intersection))
    union = set1.union(set2)
    union_length = float(len(union))
    # Two empty sets have nothing to compare; avoid dividing by zero
    if union_length == 0:
        return 0.0
    return intersection_length / union_length


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Find similarity between malware and graph it."
    )

    parser.add_argument(
        "target_directory",
        help="Directory containing malware"
    )

    parser.add_argument(
        "output_dot_file",
        help="Where to save the output graph DOT file"
    )

    parser.add_argument(
        "--jaccard_index_threshold", "-j", dest="threshold", type=float,
        default=0.8, help="Threshold above which to create an 'edge' between samples"
    )
    # Not implemented yet: a switch to choose the comparison method
    # parser.add_argument(
    #     "--method", "-m", dest="method"
    # )
    args = parser.parse_args()
    malware_paths = []  # stores malware file paths
    malware_attributes = dict()  # stores the malware Import Address Table
    graph = networkx.Graph()  # similarity graph

    for root, dirs, paths in os.walk(args.target_directory):
        # iterate through directory to find malware paths
        for path in paths:
            full_path = os.path.join(root, path)
            malware_paths.append(full_path)

    # check if PE file (keep a list: it is iterated more than once below)
    malware_paths = [path for path in malware_paths if pecheck(path)]

    # get the IAT for malware and store it
    for path in malware_paths:
        attributes = getIAT(path)
        print("Extracted {0} attributes from {1} ...".format(len(attributes), path))
        malware_attributes[path] = attributes

        # add each malware file to the graph
        graph.add_node(path, label=os.path.basename(path)[:10])

    # iterate through all pairs of malware
    for malware1, malware2 in itertools.combinations(malware_paths, 2):

        # calculate the Jaccard index for the current malware samples
        jaccard_index = jaccard(malware_attributes[malware1], malware_attributes[malware2])

        # Check if the Jaccard index is above the threshold.. if so add an edge
        if jaccard_index > args.threshold:
            print(malware1, malware2, jaccard_index)
            graph.add_edge(malware1, malware2, penwidth=1 + (jaccard_index - args.threshold) * 10)

    # Output the graph
    write_dot(graph, args.output_dot_file)


Full Code: https://github.com/es0/MalwareSimilarityGraph

1 comment:

  1. any way you could implement this but with the assembly code?
