Friday, 19 September 2014

Radius diameter server python

A small example of RADIUS and Diameter servers in Python.

To reproduce different testing scenarios, it is sometimes necessary to test Diameter applications behind RADIUS authentication. For example, a RADIUS server can send Diameter Credit-Control Initial (or Update) requests to a PCRF, an HSS, or an online charging server (OCS) and receive policy-based rules or user quota information (session-based or event-based charging).

To create small RADIUS and Diameter servers and clients for testing purposes, consider using the pyprotosim software (http://sourceforge.net/p/pyprotosim).

In this practical scenario we will build a RADIUS client and a RADIUS server that listens for client authentication requests on UDP port 1812 and is connected to a Diameter-based OCS server listening on TCP port 3868. The OCS application uses an internal database (a Python dictionary keyed by the user's identity). Each user's profile in the database stores the user's quota for event-based and session-based charging, for example:

users_db = {
  '123456789012345':{'maxbytes':'10000000', 'sessionid':'none', 'usedbytes':'0','cctime':'0'},
  '123456789012346':{'maxbytes':'20000000', 'sessionid':'none', 'usedbytes':'0','cctime':'0'},
  '123456789012347':{'maxbytes':'30000000', 'sessionid':'none', 'usedbytes':'0','cctime':'0'}}
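As a minimal sketch of how such a profile could be used for quota checks, the snippet below computes the bytes still available to a user. The helper name `remaining_bytes` is hypothetical and is not part of pyprotosim or of the scripts in this post; the sketch is written for Python 3 and assumes the quota fields are stored as decimal strings, as in the dictionary above.

```python
# In-memory profile store, same layout as the users_db shown above.
users_db = {
    '123456789012345': {'maxbytes': '10000000', 'sessionid': 'none', 'usedbytes': '0', 'cctime': '0'},
    '123456789012346': {'maxbytes': '20000000', 'sessionid': 'none', 'usedbytes': '0', 'cctime': '0'},
    '123456789012347': {'maxbytes': '30000000', 'sessionid': 'none', 'usedbytes': '0', 'cctime': '0'},
}


def remaining_bytes(identity):
    """Return the unused quota in bytes, or None if the user is unknown.

    Hypothetical helper for illustration only.
    """
    profile = users_db.get(identity)
    if profile is None:
        return None
    # Values are stored as strings, so convert before doing arithmetic.
    left = int(profile['maxbytes']) - int(profile['usedbytes'])
    return max(left, 0)
```

Keeping the values as strings mirrors the original dictionary; a real database would more likely store integers.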


Test flow:

The RADIUS client sends an "Access-Request" message with the following AVPs to the RADIUS server:


"Calling-Station-Id", "123456789012345" (identity of the user)
"User-Name", "test"
"User-Password", "password" (encrypted with the shared secret)

On receiving this information, the RADIUS server first checks that the User-Name and the User-Password (decrypted with the shared secret) match the expected values, then extracts the user identity from the Calling-Station-Id AVP. With the identity in hand, the RADIUS server sends a Credit-Control Initial Request to the OCS server. The OCS server returns a Credit-Control Answer with result code 2001 (DIAMETER_SUCCESS) if the user is found in its internal database, or 5003 (DIAMETER_AUTHORIZATION_REJECTED, that is, not authorized) if the user is not found.

On receiving the Diameter 2001 response, the RADIUS server answers the client with "Access-Accept". It answers with "Access-Reject" when the OCS returns a negative response or when the user name or password does not match.
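For readers curious what the User-Password "encryption" involves, here is a minimal sketch of the hiding scheme described in RFC 2865, section 5.2: the password is padded to a multiple of 16 bytes and XORed with a chain of MD5 digests derived from the shared secret and the Request Authenticator. The function names `hide_password` and `recover_password` are hypothetical; the scripts below use pyprotosim's own `PwCrypt` and `PwDecrypt`, whose internals are not shown in this post. The sketch is written for Python 3.

```python
import hashlib


def hide_password(password, authenticator, secret):
    """Hide a User-Password value as described in RFC 2865, section 5.2."""
    # Pad with NUL bytes to a multiple of 16 (at least one block).
    padded = password + b"\x00" * (-len(password) % 16)
    if not padded:
        padded = b"\x00" * 16
    hidden = b""
    previous = authenticator
    for i in range(0, len(padded), 16):
        digest = hashlib.md5(secret + previous).digest()
        block = bytes(p ^ d for p, d in zip(padded[i:i + 16], digest))
        hidden += block
        previous = block  # the next digest is chained on the ciphertext block
    return hidden


def recover_password(hidden, authenticator, secret):
    """Reverse hide_password and strip the NUL padding."""
    clear = b""
    previous = authenticator
    for i in range(0, len(hidden), 16):
        digest = hashlib.md5(secret + previous).digest()
        block = hidden[i:i + 16]
        clear += bytes(c ^ d for c, d in zip(block, digest))
        previous = block
    return clear.rstrip(b"\x00")
```

Both sides must use the same shared secret and the 16-byte Request Authenticator taken from the Access-Request header, otherwise the recovered password is garbage and the check fails.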
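The decision logic of the flow described above can be condensed into a few lines. This is only a sketch with the network layers removed: `ocs_result_code` stands in for the Diameter exchange with the OCS, and both function names are hypothetical. The credentials and identities are the ones used in this post; the code is written for Python 3.

```python
# Identities provisioned in the OCS dictionary in this example.
PROVISIONED = {"123456789012345", "123456789012346", "123456789012347"}


def ocs_result_code(identity):
    """Stand-in for the CCR-I / CCA-I exchange: 2001 if known, else 5003."""
    return 2001 if identity in PROVISIONED else 5003


def radius_decision(user_name, password, identity):
    """Return the RADIUS answer the server would send for this request."""
    # Step 1: local credential check, no Diameter traffic if it fails.
    if user_name != "test" or password != "password":
        return "Access-Reject"
    # Step 2: ask the OCS and map the Diameter result code to RADIUS.
    if ocs_result_code(identity) == 2001:
        return "Access-Accept"
    return "Access-Reject"
```

Note that a failed credential check never reaches the OCS, which matches the server script: the Credit-Control request is only sent once the user name and password have been accepted.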


Scripts for testing scenario:

1) demo_radius_access_client.py
2) demo_radius_diameter_server.py
3) demo_OCS_server.py

User 123456789012345 is provisioned in OCS
User 123456789012348 is not provisioned in OCS

Installation (Ubuntu):

Download and unzip pyprotosim into some directory. Create your own directory under it, for example ocs_tests.

Copy the three scripts into this directory and (optionally) replace the IP addresses in the scripts with your own. By default 127.0.0.1 is used.

Testing:

Start the scripts demo_radius_diameter_server.py and demo_OCS_server.py in separate terminals.

1) Edit demo_radius_access_client.py and set the Calling-Station-Id attribute value to user 123456789012345. Run the script; it sends an Access-Request to demo_radius_diameter_server.py. The RADIUS server sends a Diameter request to the OCS server, which finds the user in the dictionary DB, and the RADIUS server returns an Access-Accept response.

Results from tcpdump capture:

tshark -r /tmp/diam.cap -R 'radius or diameter'

  1   0.000000    127.0.0.1 -> 127.0.0.1    RADIUS 174 Access-Request(1) (id=1, l=130)
  5   0.040622    127.0.0.1 -> 127.0.0.1    DIAMETER 384 cmd=Credit-ControlRequest(272) flags=R--- appl=Diameter Common Messages(0) h2h=541c3666 e2e=1b318003
  7   0.163230    127.0.0.1 -> 127.0.0.1    DIAMETER 280 cmd=Credit-ControlAnswer(272) flags=---- appl=Diameter Common Messages(0) h2h=541c3666 e2e=1b318003
 12   0.166757    127.0.0.1 -> 127.0.0.1    RADIUS 76 Access-Accept(2) (id=1, l=32)

2) Edit demo_radius_access_client.py and set the Calling-Station-Id attribute value to user 123456789012348. Run the script; it sends an Access-Request to demo_radius_diameter_server.py. The RADIUS server sends a Diameter request to the OCS server, which does not find the user in the dictionary DB, and the RADIUS server returns an Access-Reject response.

Results from tcpdump capture:

tshark -r /tmp/diam.cap -R 'radius or diameter'
  1   0.000000    127.0.0.1 -> 127.0.0.1    RADIUS 174 Access-Request(1) (id=1, l=130)
  5   0.041789    127.0.0.1 -> 127.0.0.1    DIAMETER 384 cmd=Credit-ControlRequest(272) flags=R--- appl=Diameter Common Messages(0) h2h=541c3667 e2e=1b318004
  7   0.156535    127.0.0.1 -> 127.0.0.1    DIAMETER 244 cmd=Credit-ControlAnswer(272) flags=---- appl=Diameter Common Messages(0) h2h=541c3667 e2e=1b318004
 10   0.159009    127.0.0.1 -> 127.0.0.1    RADIUS 64 Access-Reject(3) (id=1, l=20)
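The fields tshark prints in the captures above (RADIUS code, id and length; Diameter command code, request flag, hop-by-hop and end-to-end identifiers) all come from fixed 20-byte headers. The sketch below parses both headers with the standard `struct` module, following the layouts in RFC 2865 and RFC 6733. The function names are hypothetical and independent of pyprotosim's `stripHdr`; the Diameter message length in the sample is a made-up value, since the capture does not show it. Written for Python 3.

```python
import struct


def parse_radius_header(data):
    """Split the fixed RADIUS header: Code, Identifier, Length, Authenticator."""
    if len(data) < 20:
        raise ValueError("RADIUS header needs 20 bytes")
    code, identifier, length, authenticator = struct.unpack("!BBH16s", data[:20])
    return {"code": code, "id": identifier, "length": length,
            "authenticator": authenticator}


def parse_diameter_header(data):
    """Split the fixed Diameter header into its fields."""
    if len(data) < 20:
        raise ValueError("Diameter header needs 20 bytes")
    ver_len, flags_cmd, app_id, hop_by_hop, end_to_end = struct.unpack("!5I", data[:20])
    flags = flags_cmd >> 24
    return {
        "version": ver_len >> 24,          # 1 byte
        "length": ver_len & 0xFFFFFF,      # 3 bytes
        "request": bool(flags & 0x80),     # the R bit
        "command": flags_cmd & 0xFFFFFF,   # 3 bytes, 272 = Credit-Control
        "application_id": app_id,
        "hop_by_hop": hop_by_hop,
        "end_to_end": end_to_end,
    }


# Sample headers using values from the first capture (Diameter length is invented).
radius_hdr = struct.pack("!BBH16s", 1, 1, 130, b"\x00" * 16)
diameter_hdr = struct.pack("!5I", (1 << 24) | 20, (0x80 << 24) | 272,
                           0, 0x541C3666, 0x1B318003)
```

An answer carries the same hop-by-hop and end-to-end identifiers as its request with the R bit cleared, which is how the request and answer lines in the capture can be paired.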


The example scripts are listed below:

1) demo_OCS_server.py

 #!/usr/bin/python  
 #############################################################################  
 # Example of diameter server listening client requests on port 3868   
 # This script is using pyprotosim software  
 # Read the terms of BSD license at pyprotosim website:  
 # http://sourceforge.net/projects/pyprotosim/    
 ############################################################################  
 import sys  
 sys.path.append("../")  
 # Remove them if everything is in the same dir  
 import socket  
 import select  
 import logging  
 from libDiameter import *  
 ###########################################################################################  
 # users_db is dictionary with key identity, can be replaced connection to ldap db or sql db  
 #  
 ###########################################################################################  
 users_db = {'123456789012345':{'maxbytes':'10000000', 'sessionid':'value_1', 'usedbytes':'0','cctime':'0'},  
      '123456789012346':{'maxbytes':'20000000', 'sessionid':'value_2','usedbytes':'0','cctime':'0' },  
      '123456789012347':{'maxbytes':'30000000', 'sessionid':'value_3', 'usedbytes':'0','cctime':'0'}}  
 # Functions to check if user is valid in users_db database and fetch profile settings  
 # Also upon receiving Credit-Control Initial request, update new session id for this user if exists  
 def check_valid_user(identity):  
  id = identity  
  if id in users_db:  
   return True  
  else:  
   return False  
 def update_sessionid(identity,sessionid):  
   id = identity  
   sessionid = sessionid  
   if id in users_db:  
    sessionid_updated = users_db[id]['sessionid']=sessionid  
    return sessionid_updated  
   else:  
    return False  
 def check_profile(identity):  
  id=identity  
  if id in users_db:  
   maxbytes = users_db[id]['maxbytes']  
   cctime = users_db[id]['cctime']  
   return maxbytes,cctime  
  else:  
   return None  
 ############END OF USER FUNCTIONS##############################  
 # Starting tcp server on python listening requests on port 3868  
 ##############################################################  
 SKIP=0  
 def handle_OCS(conn):  
   global sock_list  
   # conn is the TCP socket connected to the client  
   dbg="Connection:",conn.getpeername(),'to',conn.getsockname()  
   logging.info(dbg)  
   #get input ,wait if no data  
   data=conn.recv(BUFFER_SIZE)  
   #suspect more data (try to get it all without stopping if no data)  
   if (len(data)==BUFFER_SIZE):  
     while 1:  
       try:  
         data+=conn.recv(BUFFER_SIZE, socket.MSG_DONTWAIT)
       except:  
         #error means no more data  
         break  
   if (data != ""):   
     #processing input  
     dbg="Incoming message",data.encode("hex")
     logging.info(dbg)  
     ret=process_request(data.encode("hex"))   
     if ret==ERROR:  
       dbg="Error responding",ret  
       logging.error(dbg)  
     else:  
       if ret==SKIP:  
         dbg="Skipping response",ret  
         logging.info(dbg)          
       else:  
         dbg="Sending response",ret  
         logging.info(dbg)  
         conn.send(ret.decode("hex"))    
   else:  
     #no data found exit loop (posible closed socket)      
     # remove it from sock_list  
     sock_list.remove(conn)  
     conn.close()  
 # Create CEA response to CER request  
 # Just answering with 2001 OK      
 def create_CEA(H):  
   global DEST_REALM  
   CER_avps=splitMsgAVPs(H.msg)  
   DEST_REALM=findAVP("Origin-Realm",CER_avps)    
   # Let's build Capabilites-Exchange Answer  
   CEA_avps=[]  
   CEA_avps.append(encodeAVP("Origin-Host", ORIGIN_HOST))  
   CEA_avps.append(encodeAVP("Origin-Realm", ORIGIN_REALM))  
   CEA_avps.append(encodeAVP("Product-Name", "OCS-SIM"))  
   CEA_avps.append(encodeAVP('Auth-Application-Id', 4))  
   CEA_avps.append(encodeAVP("Supported-Vendor-Id", 10415))  
   CEA_avps.append(encodeAVP("Result-Code", 2001))  #DIAMETER_SUCCESS 2001  
   # Create message header (empty)  
   CEA=HDRItem()  
   # Set command code  
   CEA.cmd=H.cmd  
   # Set Application-id  
   CEA.appId=H.appId  
   # Set Hop-by-Hop and End-to-End from request  
   CEA.HopByHop=H.HopByHop  
   CEA.EndToEnd=H.EndToEnd  
   # Add AVPs to header and calculate remaining fields  
   ret=createRes(CEA,CEA_avps)  
   # ret now contains CEA Response as hex string  
   return ret  
 # Create Watchdog response in reply to Watchdog request . We reply with 2001 OK  
 def create_DWA(H):  
   # Let's build Diameter-WatchdogAnswer   
   DWA_avps=[]  
   DWA_avps.append(encodeAVP("Origin-Host", ORIGIN_HOST))  
   DWA_avps.append(encodeAVP("Origin-Realm", ORIGIN_REALM))  
   DWA_avps.append(encodeAVP("Result-Code", 2001)) #DIAMETER_SUCCESS 2001  
   # Create message header (empty)  
   DWA=HDRItem()  
   # Set command code  
   DWA.cmd=H.cmd  
   # Set Application-id  
   DWA.appId=H.appId  
   # Set Hop-by-Hop and End-to-End from request  
   DWA.HopByHop=H.HopByHop  
   DWA.EndToEnd=H.EndToEnd  
   # Add AVPs to header and calculate remaining fields  
   ret=createRes(DWA,DWA_avps)  
   # ret now contains DWA Response as hex string  
   return ret  
 # Create Disconnect_Peer response in reply to Disconnect_Peer request. We just reply with 2001 OK for testing purposes  
 def create_DPA(H):  
   # Let's build Diameter-Disconnect Peer Answer  
   DPA_avps=[]  
   DPA_avps.append(encodeAVP("Origin-Host", ORIGIN_HOST))  
   DPA_avps.append(encodeAVP("Origin-Realm", ORIGIN_REALM))  
   DPA_avps.append(encodeAVP("Result-Code", 2001)) #DIAMETER_SUCCESS 2001  
   # Create message header (empty)  
   DPA=HDRItem()  
   # Set command code  
   DPA.cmd=H.cmd  
   # Set Application-id  
   DPA.appId=H.appId  
   # Set Hop-by-Hop and End-to-End from request  
   DPA.HopByHop=H.HopByHop  
   DPA.EndToEnd=H.EndToEnd  
   # Add AVPs to header and calculate remaining fields  
   ret=createRes(DPA,DPA_avps)  
   # ret now contains DPA Response as hex string  
   return ret  
 # Create Unable To Comply response in reply to request which is not understood. We reply with 5012 result-code AVP  
 def create_UTC(H,msg):  
   # Let's build Unable to comply packet  
   DWA_avps=[]  
   DWA_avps.append(encodeAVP("Origin-Host", ORIGIN_HOST))  
   DWA_avps.append(encodeAVP("Origin-Realm", ORIGIN_REALM))  
   DWA_avps.append(encodeAVP("Result-Code", 5012)) #UNABLE TO COMPLY 5012  
   DWA_avps.append(encodeAVP("Error-Message", msg))  
   # Create message header (empty)  
   DWA=HDRItem()  
   # Set command code  
   DWA.cmd=H.cmd  
   # Set Application-id  
   DWA.appId=H.appId  
   # Set Hop-by-Hop and End-to-End from request  
   DWA.HopByHop=H.HopByHop  
   DWA.EndToEnd=H.EndToEnd  
   # Add AVPs to header and calculate remaining fields  
   ret=createRes(DWA,DWA_avps)  
   # ret now contains DWA Response as hex string  
   return ret  
 # And here we create CCA-I responses in reply to CCR-I requests:
 def create_CCA(H):
   CCR_avps=splitMsgAVPs(H.msg)
   try:
     CCA_SESSION=findAVP("Session-Id",CCR_avps)
     CCA_SSID=findAVP("Subscription-Id",CCR_avps)
     CCA_IMSI=findAVP("Subscription-Id-Data",CCA_SSID)
     CCA_REQUEST_TYPE=findAVP("CC-Request-Type",CCR_avps)
     CCA_REQUEST_NUMBER=findAVP("CC-Request-Number",CCR_avps)
     CCA_Unit=findAVP("Used-Service-Unit",CCR_avps)
   except:
     # Without these AVPs no answer can be built, so reply with 5012
     return create_UTC(H,"Missing or malformed AVP in CCR")
   if CCA_REQUEST_TYPE not in [1]:
     # Only CCR-Initial (CC-Request-Type 1) is handled in this example
     return create_UTC(H,"Unsupported CC-Request-Type")
   # Checking if user is valid in users_db:
   valid_user=check_valid_user(CCA_IMSI)
   if valid_user is True:
     # If user is valid in users_db, check profile settings and extract CC-Time and CC-Total-Octets
     max_bytes,cctime=check_profile(CCA_IMSI)
     # Update session id received from CCR request in users_db for this user
     updated_sessionid=update_sessionid(CCA_IMSI,CCA_SESSION)
     print "Updated sessionid in DB is now: ",updated_sessionid
     print "User ",CCA_IMSI," has limit of: ",max_bytes,"bytes"
     # Answer with CCA response:
     CCA_avps=[ ]
     CCA_avps.append(encodeAVP('Result-Code', '2001'))
     CCA_avps.append(encodeAVP('Session-Id', CCA_SESSION))
     CCA_avps.append(encodeAVP('Origin-Host', ORIGIN_HOST))
     CCA_avps.append(encodeAVP('Origin-Realm', ORIGIN_REALM))
     CCA_avps.append(encodeAVP('CC-Request-Type', CCA_REQUEST_TYPE))
     CCA_avps.append(encodeAVP('CC-Request-Number', CCA_REQUEST_NUMBER+1))
     CCA_avps.append(encodeAVP('Auth-Application-Id', 4))
     CCA_avps.append(encodeAVP('Supported-Vendor-Id', 10415))
     # Grant units only when both profile values are present:
     if max_bytes is not None and cctime is not None:
       CCA_avps.append(encodeAVP('Granted-Service-Unit',[
         encodeAVP("CC-Total-Octets",int(max_bytes)),encodeAVP("CC-Time",int(cctime))
       ]))
     # Create message header (empty)
     CCA=HDRItem()
     # Set command code
     CCA.cmd=H.cmd
     # Set Application-id
     CCA.appId=H.appId
     # Set Hop-by-Hop and End-to-End from request
     CCA.HopByHop=H.HopByHop
     CCA.EndToEnd=H.EndToEnd
     # Add AVPs to header and calculate remaining fields
     ret=createRes(CCA,CCA_avps)
     # ret now contains CCA Response as hex string
     return ret
   else:
     # If user is not found in DB, return 5003 - not authorized
     print "No such user ",CCA_IMSI," is found in DB:"
     CCA_avps=[ ]
     CCA_avps.append(encodeAVP('Result-Code', '5003'))
     CCA_avps.append(encodeAVP('Session-Id', CCA_SESSION))
     CCA_avps.append(encodeAVP('Origin-Host', ORIGIN_HOST))
     CCA_avps.append(encodeAVP('Origin-Realm', ORIGIN_REALM))
     CCA_avps.append(encodeAVP('CC-Request-Type', CCA_REQUEST_TYPE))
     CCA_avps.append(encodeAVP('CC-Request-Number', CCA_REQUEST_NUMBER+1))
     CCA_avps.append(encodeAVP('Auth-Application-Id', 4))
     CCA_avps.append(encodeAVP('Supported-Vendor-Id', 10415))
     # Create message header (empty)
     CCA=HDRItem()
     # Set command code
     CCA.cmd=H.cmd
     # Set Application-id
     CCA.appId=H.appId
     # Set Hop-by-Hop and End-to-End from request
     CCA.HopByHop=H.HopByHop
     CCA.EndToEnd=H.EndToEnd
     # Add AVPs to header and calculate remaining fields
     ret=createRes(CCA,CCA_avps)
     # ret now contains CCA Response as hex string
     return ret
 def process_request(rawdata):  
   H=HDRItem()  
   stripHdr(H,rawdata)  
   dbg="Processing",dictCOMMANDcode2name(H.flags,H.cmd)  
   logging.info(dbg)  
   if H.flags & DIAMETER_HDR_REQUEST==0:  
     # If Answer no need to do anything  
     return SKIP  
   if H.cmd==257: # Capabilities-Exchange  
     return create_CEA(H)  
   if H.cmd==280: # Device-Watchdog  
     return create_DWA(H)  
   if H.cmd==272: # Credit-Control  
     return create_CCA(H)      
   if H.cmd==282: # Disconnect-Request-Peer  
     return create_DPA(H)  
   return create_UTC(H,"Unknown command code")  
 def Quit():  
   for conn in sock_list:  
     conn.close()  
   sys.exit(0)  
 if __name__ == "__main__":  
   # level for decoding are: DEBUG, INFO, WARNING, ERROR, CRITICAL  
   #logging.basicConfig(level=logging.INFO)  
   # Define server_host:port to use  
   HOST = "127.0.0.1"  
   DIAM_PORT = 3868  
   ORIGIN_HOST = "diameter.3gpp.org"  
   ORIGIN_REALM = "realm.3gpp.org"  
   DEST_REALM = "" # Leave it empty  
   LoadDictionary("../dictDiameter.xml")  
   BUFFER_SIZE=1024    
   MAX_CLIENTS=5  
   sock_list=[]  
   # Create the server, binding to HOST:DIAM_PORT  
   OCS_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
   # fix "Address already in use" error upon restart  
   OCS_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
   OCS_server.bind((HOST, DIAM_PORT))   
   OCS_server.listen(MAX_CLIENTS)  
   sock_list.append(OCS_server)  
   logging.info("Server started")  
   # Activate the server; this will keep running until you  
   # interrupt the program with Ctrl-C  
   while True:  
     try:  
       read, write, error = select.select(sock_list,[],[],1)  
     except:  
       break  
     for r in read:  
       logging.info("Incoming data")  
       # Is it new or existing connection  
       if r==OCS_server:  
         # New connections: accept on new socket  
         conn,addr=OCS_server.accept()  
         sock_list.append(conn)  
         if handle_OCS(conn)==ERROR:  
           Quit()  
       else:  
         if handle_OCS(r)==ERROR:  
           Quit()  
   Quit()  

2) demo_radius_diameter_server.py

 #!/usr/bin/python  
 #################################################################################################  
 # Example of radius server sending diameter Credit-Control requests to IP:port of Diameter server   
 # This script is using pyprotosim software  
 # Read the terms of BSD license at pyprotosim website:  
 # http://sourceforge.net/projects/pyprotosim/    
 ################################################################################################  
 import sys  
 #Next line is to include parent directory in PATH where libraries are  
 sys.path.append("../")  
 from libRadius import *  
 import libDiameter  
 import datetime  
 # Function to create diameter CCR request and parse received CCA response from Diameter server  
 def send_ccr_i(identity):  
  IDENTITY=identity  
  CCR_avps=[ ]  
  CCR_avps.append(libDiameter.encodeAVP('Origin-Host', ORIGIN_HOST))   
  CCR_avps.append(libDiameter.encodeAVP('Session-Id', GY_SESSIONID))  
  CCR_avps.append(libDiameter.encodeAVP('Called-Station-Id', 'test.apn'))  
  CCR_avps.append(libDiameter.encodeAVP('Origin-Realm', ORIGIN_REALM))  
  CCR_avps.append(libDiameter.encodeAVP('Destination-Realm', DEST_REALM))  
  CCR_avps.append(libDiameter.encodeAVP('Destination-Host', DEST_HOST))  
  CCR_avps.append(libDiameter.encodeAVP('Auth-Application-Id', 4))  
  CCR_avps.append(libDiameter.encodeAVP('CC-Request-Type', 1))  
  CCR_avps.append(libDiameter.encodeAVP('CC-Request-Number', 0))  
  CCR_avps.append(libDiameter.encodeAVP('Subscription-Id',[libDiameter.encodeAVP('Subscription-Id-Data', IDENTITY ), libDiameter.encodeAVP('Subscription-Id-Type', 1)]))  
  CCR_avps.append(libDiameter.encodeAVP('3GPP-SGSN-Address', '192.168.0.2'))  
  CCR_avps.append(libDiameter.encodeAVP('3GPP-MS-TimeZone', 'GMT'))  
  CCR_avps.append(libDiameter.encodeAVP('Access-Network-Charging-Address', '192.168.0.2'))  
  # Create message header (empty)  
  CCR=libDiameter.HDRItem()  
  # Set command code  
  CCR.cmd=libDiameter.dictCOMMANDname2code('Credit-Control')  
  # Set Hop-by-Hop and End-to-End  
  libDiameter.initializeHops(CCR)  
  # Add AVPs to header and calculate remaining fields  
  msg1=libDiameter.createReq(CCR,CCR_avps)  
  # msg now contains CCR Request as hex string  
  # send data  
  Conn=libDiameter.Connect(OCS_HOST,OCS_PORT)  
  Conn.send(msg1.decode('hex'))  
  # Receive response  
  received1 = Conn.recv(1024)  
  # Close the connection, otherwise every Access-Request leaves one TCP socket open  
  Conn.close()  
  # Parse and display received ANSWER  
  #print "="*30  
  #print "THE CCA - I ANSWER IS:"  
  msg=received1.encode('hex')  
  #print "="*30  
  H=libDiameter.HDRItem()  
  libDiameter.stripHdr(H,msg)  
  CCA_avps=libDiameter.splitMsgAVPs(H.msg)  
  cmd=libDiameter.dictCOMMANDcode2name(H.flags,H.cmd)  
  # We need get STATE 2001 or 5003 from Response  
  CCA_status=libDiameter.findAVP("Result-Code",CCA_avps)  
  if str(CCA_status) in ['2001']:   
   return True  
  elif str(CCA_status) in ['5003','5002']:   
   return False  
  else:  
   return False  
 # Function that creates radius access response to the client
 def create_Access_Response():
  # Create message header (empty)
  RES=HDRItem()
  stripHdr(RES,data.encode("hex"))
  RID=RES.Identifier
  RES.Code=dictCOMMANDname2code("Access-Request")
  RES.Identifier=RID
  REQ_avps=splitMsgAVPs(RES.msg)
  # Defaults, so that a missing AVP leads to Access-Reject instead of a NameError
  IDENTITY=None
  USER_NAME=None
  USER_PASS_DECODED=None
  try:
   IDENTITY=findAVP("Calling-Station-Id",REQ_avps)
   USER_NAME=findAVP("User-Name",REQ_avps)
   enc_password=findAVP("User-Password",REQ_avps)
   USER_PASS_DECODED=PwDecrypt(enc_password,RES.Authenticator.decode("hex"),SECRET)
  except:
   pass
  print "Found user identity is", str(IDENTITY)
  print "Received Access-Request from User" + ":"+str(USER_NAME) +","+ "and Password is " + str(USER_PASS_DECODED)
  valid_user=False
  if USER_NAME == 'test' and USER_PASS_DECODED == 'password':
   #Call DIAMETER with IDENTITY as CCR-I
   valid_user=send_ccr_i(IDENTITY)
  RES_avps=[]
  auth=createAuthenticator()
  if valid_user is True:
   #This will be Access-Accept returned to client
   RES.Code=dictCOMMANDname2code("Access-Accept")
   RES_avps.append(encodeAVP('Session-Timeout',600 ))
   RES_avps.append(encodeAVP('Acct-Interim-Interval', 60))
  else:
   #Wrong credentials or negative OCS answer: Access-Reject returned to client
   RES.Code=dictCOMMANDname2code("Access-Reject")
  msg=createWithAuthenticator(RES,auth,RES_avps,SECRET)
  return msg
 def create_Session_Id():  
   #The Session-Id MUST be globally and eternally unique  
   #<DiameterIdentity>;<high 32 bits>;<low 32 bits>[;<optional value>]  
   now=datetime.datetime.now()  
   ret=ORIGIN_HOST+";"  
   ret=ret+str(now.year)[2:4]+"%02d"%now.month+"%02d"%now.day  
   ret=ret+"%02d"%now.hour+"%02d"%now.minute+";"  
   ret=ret+"%02d"%now.second+str(now.microsecond)  
   return ret  
 if __name__ == "__main__":  
  #logging.basicConfig(level=logging.DEBUG)  
  #logging.basicConfig(level=logging.INFO)  
  LoadDictionary("../dictRadius.xml")  
  libDiameter.LoadDictionary("../dictDiameter.xml")  
  # Set up here IP and Port for your RADIUS authentication server  
  RADIUS_IP = "127.0.0.1"  
  RADIUS_PORT = 1812  
  BUFFER_SIZE=4096  
  # Set up shared secret here  
  SECRET="SOMEPASSWORD"  
  # FOR OCS SERVER:  
  OCS_HOST="127.0.0.1"  
  OCS_PORT=3868  
  ORIGIN_HOST="radius.3gpp.org"  
  ORIGIN_REALM="realm.3gpp.org"  
  DEST_REALM="realm.3gpp.org"  
  DEST_HOST="diameter.3gpp.org"  
  GY_SESSIONID=create_Session_Id()  
  # Now creating simple udp socket   
  RADIUS_server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)  
  RADIUS_server.bind((RADIUS_IP, RADIUS_PORT))  
  # Starting udp server in a loop  
  # Looping server until user sends CTRL+C or kill to stop it.  
  while True:
   data, addr = RADIUS_server.recvfrom(BUFFER_SIZE)
   if (data != ""):
    msg=create_Access_Response()
    dbg="Sending Access Response"
    logging.info(dbg)
    RADIUS_server.sendto(msg.decode("hex"),addr)
  #End of code

3) demo_radius_access_client.py

 #!/usr/bin/python  
 #############################################################################  
 # Example of radius client sending Access-Request to IP:port of Radius server   
 # This script is using pyprotosim software  
 # Read the terms of BSD license at pyprotosim website:  
 # http://sourceforge.net/projects/pyprotosim/    
 ############################################################################  
 #Next two lines are to include parent directory for testing  
 import sys  
 sys.path.append("../")  
 # Remove them normally  
 # Radius client  
 from libRadius import *  
 import datetime  
 import time  
 def create_Request():  
   # Create message header (empty)  
   REQ=HDRItem()  
   # Set command code  
   REQ.Code=dictCOMMANDname2code("Access-Request")  
   REQ.Identifier=1  
   REQ.Authenticator=createAuthenticator()  
   # Let's build Request   
   REQ_avps=[]  
   REQ_avps.append(encodeAVP("Calling-Station-Id", "123456789012348"))  
   REQ_avps.append(encodeAVP("Called-Station-Id", "test"))  
   REQ_avps.append(encodeAVP("User-Name", "test"))  
   REQ_avps.append(encodeAVP("User-Password", PwCrypt(USER_PASSWORD,REQ.Authenticator,SECRET)))  
   REQ_avps.append(encodeAVP("NAS-Identifier", "GGSN"))  
   REQ_avps.append(encodeAVP("NAS-IP-Address", "1.2.3.4"))  
   REQ_avps.append(encodeAVP("NAS-Port-Type", 5))  
   REQ_avps.append(encodeAVP("NAS-Port", 6000))  
   REQ_avps.append(encodeAVP("Acct-Session-Id", "sessionID"))  
   REQ_avps.append(encodeAVP("Acct-Multi-Session-Id", "multisessionID"))  
   REQ_avps.append(encodeAVP("Service-Type", 2))  
   REQ_avps.append(encodeAVP("Framed-Protocol", 1))  
   # Add AVPs to header and calculate remaining fields  
   msg=createReq(REQ,REQ_avps)  
   # msg now contains Access-Request as hex string  
   return msg  
 if __name__ == "__main__":  
   #logging.basicConfig(level=logging.DEBUG)  
   logging.basicConfig(level=logging.INFO)  
   LoadDictionary("../dictRadius.xml")  
   HOST="127.0.0.1"  
   PORT=1812  
   USER_PASSWORD='password'  
   SECRET='SOMEPASSWORD'  
   # Let's assume that my Radius messages will fit into 4k  
   MSG_SIZE=4096  
   ###########################################################  
   Conn=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  
   # socket is in blocking mode, so let's add a timeout  
   Conn.settimeout(5)  
   ###########################################################   
   # Create Access-Request    
   msg=create_Request()  
   # msg now contains Access-Request as hex string  
   logging.debug("+"*30)  
   #print "Access-Request",msg  
   # send data  
   Conn.sendto(msg.decode("hex"),(HOST,PORT))  
   # Receive response  
   received = Conn.recv(MSG_SIZE)  
   # Process response  
   RES=HDRItem()  
   stripHdr(RES,received.encode("hex"))  
   radius_avps=splitMsgAVPs(RES.msg)  
   for avps in radius_avps:  
     print decodeAVP(avps)  
   print radius_avps  
   # Normally - this is the end.  
   ###########################################################  
   # And close the connection  
   Conn.close()  

Tuesday, 16 September 2014

convert IP address hex to decimal python


A Python script to convert the hexadecimal value of an IPv4 address into dotted-decimal notation:

#!/usr/bin/python
import sys
# Checking options, must be (0xXXXXXXXX)
if len(sys.argv) < 2:
  print("Usage: %s (HEX FORMAT, for example 0xC0A80001)" % sys.argv[0])
  sys.exit(1)
user_option = sys.argv[1]
# Strip the 0x prefix if it is present
if user_option[:2].lower() == '0x':
  hex_data = user_option[2:]
else:
  hex_data = user_option
# Length must be 8 hex digits; leading zeros matter, so pad on the left
hex_data = hex_data.zfill(8)
if len(hex_data) != 8:
  print("Expected at most 8 hex digits, got: %s" % user_option)
  sys.exit(1)
def hex_to_ip_decimal(hex_data):
  # Each pair of hex digits is one octet of the address
  ipaddr = "%i.%i.%i.%i" % (int(hex_data[0:2],16),int(hex_data[2:4],16),int(hex_data[4:6],16),int(hex_data[6:8],16))
  return ipaddr
result=hex_to_ip_decimal(hex_data)
print(result)


Testing:

./decode_hex_to_ip.py 0xC0A80001
192.168.0.1
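For comparison, the same conversion can be sketched with the standard library's `ipaddress` module, which is part of Python 3. The function name `hex_to_ipv4` is hypothetical; `int(text, 16)` accepts the value with or without the `0x` prefix, and `ipaddress.IPv4Address` raises an error for values that do not fit into 32 bits.

```python
import ipaddress


def hex_to_ipv4(text):
    """Convert a hex string such as '0xC0A80001' to dotted-decimal notation."""
    value = int(text, 16)  # leading zeros and the 0x prefix are both handled here
    return str(ipaddress.IPv4Address(value))
```

Because the value is converted to an integer first, a short input such as `A000001` is treated as `0x0A000001`, so no manual zero padding is needed.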

Tuesday, 2 September 2014

Small Radius Accounting server on python with LDAP connectivity


I was looking for a specific testing scenario and needed a simple RADIUS accounting server connected to LDAP. The server should do the following:

 Upon receiving a RADIUS accounting Start packet from a client, the server parses Framed-IP-Address, Calling-Station-Id and many other 3GPP vendor-specific and custom attributes, updates this information in the LDAP DB and responds with an ACK.

At first I experimented with pyrad, but I spent some time creating and attaching a customized dictionary, including the 3GPP attributes, and then had difficulties parsing those attributes. So I chose the pyprotosim software (http://sourceforge.net/projects/pyprotosim/) and the documentation for Python sockets to build a small UDP server and clients. The pyprotosim dictionary is stored in XML format, so it is easy to export and to extend with your own attributes when needed.
Here is an example of a simple RADIUS accounting server which uses the python-ldap module for LDAP connectivity.
On Ubuntu, install the module with:
sudo apt-get install python-ldap
Then unpack pyprotosim and create a directory under it for the script.

$demo_radius_acct_server.py
#!/usr/bin/python  
# Simple example of a small Radius Accounting Server on the basis of pyprotosim software
# which parses the incoming message and searches Calling-Station-Id and Framed-IP-Address AVPs
# (which are IDENTITY AND IPADDRESS of user in LDAP).
# If the accounting packet is of type = Start, then it connects to LDAP DB, where
# this user is stored, and calls a function to update the new IP address received in the
# radius packet in LDAP DB for that user (identity).
# Please refer to BSD license and copyright information for pyprotosim software at (http://sourceforge.net/projects/pyprotosim/)
##############################################  
import sys  
import ldap  
#Next line is to include parent directory in PATH where libraries are  
sys.path.append("../")  
# Remove it normally  
from libRadius import *  
######################################################################   
# Function to update new IP address for this subscriber in LDAP DB  
######################################################################  
def update_user(identity,ipaddress):
  print "UPDATING IP ADDRESS in LDAP DB"
  IDENTITY=identity
  IPADDRESS=ipaddress
  baseDN = "cn=users,o=mycompany,o=org"
  try:
   l = ldap.initialize(LDAP_HOST_URL)
   # simple_bind_s is synchronous, so connection and bind errors are raised here
   l.simple_bind_s(LDAP_USERNAME, LDAP_PASSWORD)
  except ldap.LDAPError, error_message:
   # NO LDAP CONNECTIVITY - report the error and give up
   print "THERE IS NO LDAP CONNECTIVITY!!!! PLEASE CHECK LDAP CONNECTION"
   print error_message
   return
  try:
   # The DN of the entry is built from the user's identity
   dn='identity=' + str(IDENTITY) + "," + baseDN
   mod_attrs = [( ldap.MOD_REPLACE, 'ipaddress', str(IPADDRESS) )]
   l.modify_s(dn,mod_attrs)
   print "successfully modified ipaddress for user:", IDENTITY, "in LDAB DB"
  except ldap.LDAPError, error_message:
   print error_message
  # It is nice to the server to disconnect and free resources when done
  l.unbind_s()
##############################################################################
# Main function: creates the Accounting-Response and, for an accounting Start,
# updates the IP address value in LDAP DB for the given user identity
##############################################################################
def create_Acct_Response():  
  # Create message header (empty)  
  RES=HDRItem()  
  stripHdr(RES,data.encode("hex"))  
  RID=RES.Identifier  
  RES.Code=dictCOMMANDname2code("Accounting-Response")  
  RES.Identifier=RID  
  REQ_avps=splitMsgAVPs(RES.msg)  
  STATE=findAVP("Acct-Status-Type",REQ_avps)  
  try:  
   IDENTITY=findAVP("Calling-Station-Id",REQ_avps)  
   IPADDR=findAVP("Framed-IP-Address",REQ_avps)  
  except:  
   pass  
  print "Found IDENTITY/IP:", str(IDENTITY) + ", " + str(IPADDR)  
  STATE=STATE.encode("hex")  
  if STATE == '00000001':  
   print "THIS IS ACCOUNTING START"  
   # updating new IP address in LDAP DB for given user:  
   update_user(IDENTITY,IPADDR)  
  elif STATE == '00000002':  
   print "THIS IS ACCOUNTING STOP"  
   # do some code here  
  elif STATE == '00000003':  
   print "THIS IS ACCOUNTING INTERIM"  
   # do some code here  
  elif STATE == '00000007':  
   print "THIS IS ACCOUNTING ON"  
   # add code here if needed  
  elif STATE == '00000008':  
   print "THIS IS ACCOUNTING OFF"  
   # add code here if needed  
  else:  
   print "UNKNOWN PACKET RECEIVED"  
  # createWithAuthenticator must be called with a list of AVPs, but according to
  # the RADIUS accounting RFC an Accounting-Response does not need to carry any
  # attributes, so RES_avps is left as an empty list
  RES_avps=[]
  auth=createZeroAuthenticator()
  msg=createWithAuthenticator(RES,auth,RES_avps,SECRET)  
  # msg now contains Accounting-Response as hex string  
  return msg  
if __name__ == "__main__":  
  #logging.basicConfig(level=logging.DEBUG)  
  #logging.basicConfig(level=logging.INFO)  
  LoadDictionary("../dictRadius.xml")  
  #Change your LDAP IP and Port here:  
  LDAP_HOST_URL='ldap://127.0.0.1:389'  
  #Change to your ldap user  
  LDAP_USERNAME="cn=admin,o=mycompany,o=org"  
  LDAP_PASSWORD="secret"  
  # Set the IP and port of the RADIUS accounting server here
  RADIUS_IP = "127.0.0.1"  
  RADIUS_PORT = 1813  
  BUFFER_SIZE=4096  
  # Set up shared secret here  
  SECRET="SOMEPASSWORD"  
  # Now creating simple udp socket  
  RADIUS_server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)  
  RADIUS_server.bind((RADIUS_IP, RADIUS_PORT))  
  # Looping server until user sends CTRL+C or kill to stop it.  
  while True:  
   data, addr = RADIUS_server.recvfrom(BUFFER_SIZE)  
   if (data != ""):  
    msg=create_Acct_Response()  
    dbg="Sending response"  
    logging.info(dbg)  
    RADIUS_server.sendto(msg.decode("hex"),addr)  
   #End of code  
So, upon receiving a RADIUS accounting packet, the value of the IP address in the LDAP DB is updated successfully:

./demo_radius_acct_server.py

Found IDENTITY/IP: 123456789012346, 192.168.0.220
THIS IS ACCOUNTING START
UPDATING IP ADDRESS in LDAP DB
successfully modified ipaddress for user: 123456789012346 in LDAB DB
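
For readers without pyprotosim at hand, the accounting exchange can also be sketched with the standard library only. The following is a minimal Python 3 sketch, not the pyprotosim implementation: it decodes Acct-Status-Type (attribute 40) from a raw Accounting-Request and builds an attribute-less Accounting-Response whose authenticator is computed as described in RFC 2866. The function names and the demo packet are made up for illustration.

```python
import hashlib
import struct

# Values from RFC 2866
ACCOUNTING_REQUEST = 4
ACCOUNTING_RESPONSE = 5
ACCT_STATUS_TYPE = 40
ACCT_STATUS_NAMES = {1: "Start", 2: "Stop", 3: "Interim-Update",
                     7: "Accounting-On", 8: "Accounting-Off"}


def parse_attributes(packet):
    """Return {attribute type: raw value bytes} for a raw RADIUS packet."""
    attrs = {}
    length = struct.unpack("!H", packet[2:4])[0]
    pos = 20  # header: code(1) + identifier(1) + length(2) + authenticator(16)
    while pos + 2 <= length:
        attr_type, attr_len = packet[pos], packet[pos + 1]
        if attr_len < 2:
            break  # malformed attribute, stop parsing
        attrs[attr_type] = packet[pos + 2:pos + attr_len]
        pos += attr_len
    return attrs


def acct_status_name(packet):
    """Map the Acct-Status-Type value of a request to a readable name."""
    raw = parse_attributes(packet).get(ACCT_STATUS_TYPE)
    if raw is None or len(raw) != 4:
        return "Unknown"
    return ACCT_STATUS_NAMES.get(struct.unpack("!I", raw)[0], "Unknown")


def build_acct_response(request, secret):
    """Build an Accounting-Response without attributes for the given request."""
    identifier = request[1]
    request_auth = request[4:20]
    header = struct.pack("!BBH", ACCOUNTING_RESPONSE, identifier, 20)
    # Response authenticator: MD5(code + id + length + request authenticator
    # + response attributes (none here) + shared secret)
    response_auth = hashlib.md5(header + request_auth + secret).digest()
    return header + response_auth


# Hypothetical Accounting-Request: identifier 7, zeroed authenticator,
# one attribute Acct-Status-Type = 1 (Start)
demo_request = (struct.pack("!BBH", ACCOUNTING_REQUEST, 7, 26)
                + bytes(16)
                + struct.pack("!BBI", ACCT_STATUS_TYPE, 6, 1))
```

Calling acct_status_name(demo_request) gives the status name to branch on, and build_acct_response(demo_request, b"SOMEPASSWORD") gives the 20 bytes that would be sent back with sendto(). The demo authenticator is zeroed only to keep the example short; a real client fills it according to the RFC.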


Tuesday, 6 May 2014

python ldap


When LDAP is used to authenticate users, load tests sometimes require generating a certain number of user ids within a defined IP range and loading them into the LDAP database.

This can be done with Python's ldap module (python-ldap).

The following example creates users for the IP address range 192.168.0.1 to 192.168.0.5 (5 users), with corresponding uids from 100 to 104 and login ids user100 to user104, and loads them into the LDAP database.

#!/usr/bin/python
# create_ldap_users.py
# You need to install the python-ldap module
import ldap
from ldap import modlist  
# Global values, should be defined here  
LDAP_URL='ldap://127.0.0.1:389'  
# First and last ip in range  
STARTING_IP="192.168.0.1"  
ENDING_IP="192.168.0.5"  
# From this uid users will be incremented  
STARTING_UID="100"  
# End of global values  
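# This function was taken from an example on the internet; it returns the
# list of all IP addresses from start_ip to end_ip (inclusive)
def ipRange(start_ip, end_ip):
  start = list(map(int, start_ip.split(".")))
  end = list(map(int, end_ip.split(".")))
  # temp refers to the same list as start, so incrementing start[3] advances temp
  temp = start
  ip_range = []
  ip_range.append(start_ip)
  while temp != end:
   start[3] += 1
   # carry an overflowing octet into the next higher octet
   for i in (3, 2, 1):
    if temp[i] == 256:
     temp[i] = 0
     temp[i-1] += 1
   ip_range.append(".".join(map(str, temp)))
  return ip_range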
if __name__== '__main__':
 # Connect to ldap (simple_bind_s is synchronous, so bind errors are raised here)
 l = ldap.initialize(LDAP_URL)
 ldap_username = "cn=admin,o=mycompany, o=org"
 ldap_password = "password"
 l.simple_bind_s(ldap_username, ldap_password)
 # Creating two text files, one with ip addresses and one with uids:
 ip_addr_file = open('ip_addresses.txt', 'w')
 # Defining range from global values
 ip_range = ipRange(STARTING_IP, ENDING_IP)
 number_uids=len(ip_range)
 for ip in ip_range:
  ip_addr_file.write("%s\n" % str(ip))
 # Close the file only after all addresses have been written
 ip_addr_file.close()
 uid_file = open('uids.txt',"w")
 first_uid=int(STARTING_UID)
 for uid in range(number_uids):
  uid_to_ip_map=first_uid + uid
  uid_file.write("%s\n" % str(uid_to_ip_map))
 uid_file.close()
 # After the files were created, read them back and pair each uid with its ip address.
 ip_addr_file = open('ip_addresses.txt','r')
 ip_addr_file_splitted = ip_addr_file.read().split()
 ip_addr_file.close()
 uid_file = open('uids.txt','r')
 uid_file_splitted = uid_file.read().split()
 uid_file.close()
 # Here is a list with ips and uids from files
 ldap_users = [ip_addr_file_splitted,uid_file_splitted]
 # Loop over range of ip_addresses:
 for numbers in range(0,len(ip_addr_file_splitted)):
  # merged[0] is the ip address and merged[1] is the uid of the same user
  merged = [user[numbers] for user in ldap_users]
  dn = "uid=" + str(merged[1])+",cn=users,o=mycompany,o=org"
  # Creating empty dictionary with attributes
  attrs = {}
  # Filling attributes according to our schema
  attrs['uid'] = str(merged[1]) # second element of merged list: the uid
  # A single value is stored here; a list such as ['top', 'login'] is needed for several object classes
  attrs['objectClass'] = 'login'
  attrs['login'] = 'user' + str(merged[1]) # login id built from the uid
  attrs['ipaddress'] = str(merged[0]) # first element of merged list: the ip address
  attrs['telephone'] = '123-456-78'
  attrs['department'] = 'IT'
  # Creating the modlist (list of attribute tuples) for the new entry
  ldif = modlist.addModlist(attrs)
  print ldif
  # Loading into ldap
  l.add_s(dn,ldif)
 # disconnecting and freeing resources when done
 l.unbind_s()
 # Print how many users were created in ldap
 print "NUMBER OF USER_UIDs WERE CREATED: ", number_uids


Example output:

# ./create_ldap_users.py

[('uid', '100'), ('objectClass', 'login'), ('telephone', '123-456-78'), ('department', 'IT'), ('login', 'user100'), ('ipaddress', '192.168.0.1')]

[('uid', '101'), ('objectClass', 'login'), ('telephone', '123-456-78'), ('department', 'IT'), ('login', 'user101'), ('ipaddress', '192.168.0.2')]
 
[('uid', '102'), ('objectClass', 'login'), ('telephone', '123-456-78'), ('department', 'IT'), ('login', 'user102'), ('ipaddress', '192.168.0.3')]
 
[('uid', '103'), ('objectClass', 'login'), ('telephone', '123-456-78'), ('department', 'IT'), ('login', 'user103'), ('ipaddress', '192.168.0.4')]
 
[('uid', '104'), ('objectClass', 'login'), ('telephone', '123-456-78'), ('department', 'IT'), ('login', 'user104'), ('ipaddress', '192.168.0.5')]
 
NUMBER OF USER_UIDs WERE CREATED:  5
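
The intermediate text files are not strictly needed to pair uids with IP addresses. As a rough alternative, here is a minimal sketch that assumes Python 3 and uses only the standard library ipaddress module to build the same dn and attribute pairs in memory. The function name build_users and its parameters are hypothetical, and the sketch deliberately stops before any LDAP call.

```python
import ipaddress


def build_users(start_ip, end_ip, starting_uid,
                base_dn="cn=users,o=mycompany,o=org"):
    """Return a list of (dn, attrs) pairs, one per IP address in the range."""
    first = ipaddress.IPv4Address(start_ip)
    last = ipaddress.IPv4Address(end_ip)
    if last < first:
        raise ValueError("end_ip must not be lower than start_ip")
    users = []
    for offset in range(int(last) - int(first) + 1):
        uid = str(starting_uid + offset)
        dn = "uid=" + uid + "," + base_dn
        attrs = {
            "uid": uid,
            "objectClass": "login",
            "login": "user" + uid,
            # IPv4Address supports integer addition, including octet rollover
            "ipaddress": str(first + offset),
            "telephone": "123-456-78",
            "department": "IT",
        }
        users.append((dn, attrs))
    return users


users = build_users("192.168.0.1", "192.168.0.5", 100)
for dn, attrs in users:
    print(dn, attrs["ipaddress"])
```

Each pair could then be handed to modlist.addModlist and add_s in the same way as in the script above.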