Python Notes

  

These are my personal notes that I use as a quick help in my work.
You are welcome to read them.



Introduction

  

See separate file with Python code snippets

Command Line

py --version      Windows
python3 --version      Linux, outside of a virtual environment
python --version      in a virtual environment
python -V      all environments

python -c command [arg] ... quote command with single quotes.
Exit with ^z or quit() or exit(). ^d on *nix

python -m module_name [arg]
Run a module that is somewhere in the system path. Note: no ".py" because this is a module name, not a file name
Sort of equivalent to:
python
import module_name

python -m searches in sys.path for the named module (without .py) and executes it as the main module. You can also import (import to do "help" and see doc)
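For example, the standard library http.server module can be run this way:
python -m http.server 8080    # serves the current directory over HTTP on port 8080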

python module_name.py [arg]
Run a script file. Note: the ".py" extension is not mandatory. It just has to be a valid file name

Command line documentation

Syntax Basics

Multi-line\
command \
with a backslash

# This is a comment

Complex type summary:

Type    Literal                    Mut/Imm  Ordered  Access                                     Empty
String  '...'  "..."  """..."""    imm      Y        s[0] is 1st char                           ""
List    [a,] or [a,b]              mut      Y        access by offset: l[0] is 1st element      []
Tuple   (a,) or (a, b)             imm      Y        access by offset: t[0] is 1st element      ()
Dict    {"a":b, "c":d}             mut      N        access by keys: d["k"]; keys are unique    {}
Set     {a, b}                     mut      N        access by iteration; elements are unique   set()
File    n/a

Note:
A tuple is immutable. But items it contains, such as lists, can be mutable.
User-defined classes are mutable.
Never pass a mutable object as a default value for a parameter.

  

Modules and Packages

A module is basically a file with a .py extension, for example a_module.py.
Import into another module with import a_module. Access the objects in the module with a_module.obj. The module should be in the current directory or in the PYTHONPATH environment variable (see with sys.path).
Add a path: sys.path.append("full path")
See where the module was found: a_module.__file__
Provide alias for the module: import a_module as the_module_alias

Import another module's objects into the current module's namespace/symbol table with from a_module import name1, name2.

A module can be executed as a script: python a_module.py. In this case, the "__name__" dunder variable is "__main__". The following skips code when the module is imported, meaning when it is not a stand-alone script:
if __name__ == "__main__":

In the file that I am executing: __name__ == "__main__"
In imported modules: __name__ == "module_name" # module name is the file name
In functions: the_function_name.__name__ == "the_function_name"
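Minimal sketch of the pattern (reusing the a_module.py example name):

# file a_module.py
def main():
    print("running as a script, __name__ is", __name__)

if __name__ == "__main__":
    main()    # executed only with: python a_module.py
# "import a_module" does not call main(), because __name__ is then "a_module"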

A package is basically a sub-directory, which we will call a_package.
Be sure to put a file called "__init__.py" in the sub-directory a_package.
Import a module from the package with one of the two following lines:
from a_package import a_module
or
import a_package.a_module as a_mod
With __all__ = ["module1", "module2"] in the __init__.py, the listed modules are automatically loaded when doing from a_package import *
It is considered bad practice to use from a_package import *

Python adds the current directory to sys.path when running a script. See about PYTHONPATH below

It looks like you have to think in terms of the Python path sys.path, which is a list of directories. When running a script, the local directory is automatically one of the directories in the list sys.path. To import modules from another directory, here are the options:

Execute from root. This works without __init__.py in the root.


File a.py  ------------
import b.bmod
print(b.bmod.b_fctn())    # bmod module imports c.cmod
end  a.py  ------------

File b/__init__.py

File b/bmod.py  ------------
import c.cmod
def b_fctn():
    return "calling: " + c.cmod.c_fctn()
end  b/bmod.py  ------------

File c/__init__.py

File c/cmod.py  ------------
def c_fctn():
    return "Executing " + c_fctn.__name__
end  c/cmod.py  ------------

Alternative for file a.py above:


File a.py  ------------
from b import bmod
print(bmod.b_fctn())    # Notice: no 'b.'
end  a.py  ------------

If you are in the b sub-directory, and if the PYTHONPATH is not set to the parent directory, then add the parent to the sys.path.
The b/bmod.py file stays the same


File b/b.py  ------------
import sys
sys.path.append("..")
import bmod
print(bmod.b_fctn())    # bmod module imports c.cmod
end  b/b.py  ------------

Variable sys.path

The variable sys.path is a list of strings that determines the interpreter's search path for modules. It is initialized from these locations:
the directory containing the input script (or the current directory when running interactively)
the PYTHONPATH environment variable (same syntax as the shell PATH)
the installation-dependent default directories

Add directories to the sys.path with the following code:

import sys
sys.path.append('/some/thing/python')

The best is to set the PYTHONPATH to the root of the project: export PYTHONPATH=.
With this, import the modules with the dot notation:
import folder.module
If you are in a sub-folder of the project's root folder, do: export PYTHONPATH=..
In pipenv, set it in the .env file:
### .env file
export PYTHONPATH=.

See details on package installation: https://packaging.python.org/tutorials/installing-packages/

 

Suggested structure

README.md
.gitignore
LICENSE
Pipfile
app/
    __init__.py
    app.py
docs/
    conf.py
tests/
    test_basic.py
    test_advanced.py

Another structure:

Root:
  .git
  README.md
  .gitignore

  Pipfile
  Pipfile.lock

  abc  # dir with source
      common # folder
      transformers # folder
  tests  # folder
      common # folder for unit tests
      transformers # folder for unit tests
      integration_test  # folder
  configs # folder

  

Help and Troubleshooting

  

print(" ", end='') # end supresses the end of line

dir(module name) -> sorted list of strings with the names defined in the module
dir() --> current defined names
__builtin__

Start interactive python shell:
import module
help() ???
help(module) Shows information on the module (import first). Put this in the .py file too, helps a lot
help(object) Shows information on the object
help(object()) Shows information on what the object returns
object.__dict__ Also shows information on the object (note: __dict__ not __dir__).
object().__dict__ Information on what the object returns
In interactive shell, the _ is the last value

Style

My thoughts on styling:

 

 


Install

Install Pip

If pip is not installed, do:
python3 -m ensurepip --default-pip
py -m ensurepip --default-pip

If necessary:
sudo apt install python3-pip
or
python3 -m pip --user

Install a Package

On Windows
py -m pip install ...

On Linux and Mac, do pip3:
pip3 install ...

sudo apt install python-numpy
sudo apt install python-matplotlib

c:\Python34\Scripts\pip.exe install matplotlib
c:\Python27\Scripts\pip.exe install matplotlib
Do NOT run pip with sudo.

Preferably install in a virtual environment.

Environment variables:
PYTHONHOME: location of the standard Python libraries (default: prefix/lib/pythonversion and exec_prefix/lib/pythonversion)
Set PYTHONHOME to prefix:exec_prefix

Install from a requirements file:
python3 -m pip install -r requirements.txt

Behind a firewall, you may have to add "--trusted-host pypi.org --trusted-host files.pythonhosted.org" as follows:
pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org package...

Upgrade

Upgrade pip:
python3 -m pip install --upgrade pip setuptools wheel

Upgrade any package (notice same syntax as upgrade of pip):
python3 -m pip install --upgrade SomeProject

Replace python3 with python in a virtual environment

Typical Paths for Python Installations

Add these to the PATH environment variable:
C:\Users\..user..\AppData\Local\Programs\Python\Python38-32
C:\Users\..user..\AppData\Local\Packages\PythonSoftwareFoundation.Python. ....\LocalCache\local-packages\Python310\Scripts

If you set the PYTHONPATH env var as the root of the project, all imports will find the modules in the subdirectories

set PYTHONSTARTUP=PYTHONSTARTUP.txt
this is a file executed before first prompt
See https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHOME

  

Installing a version that is not current

Go to python.org, and look for the list of versions.
Choose a version with an installer, otherwise you will have to run the install scripts.

Or, install with the regular installer, with one of the options:
sudo apt install python3.8
sudo yum install python38
sudo amazon-linux-extras install python3.8
It seems that apt uses a dot in the version ("python3.8") while yum does not ("python38").

  

Arguments

import sys
sys.argv[0] # this is the command
sys.argv[1] # first argument
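A small illustration (the script name show_args.py is just an example):

# file show_args.py
import sys
print("command:", sys.argv[0])
for i, arg in enumerate(sys.argv[1:], start=1):
    print(f"argument {i}: {arg}")

# python show_args.py one two
#   command: show_args.py
#   argument 1: one
#   argument 2: two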

Return to OS

s.py:
import sys

sys.exit(0)
# 0 is successful, 1 or more means error

s.py another option:
raise SystemExit(1)

python3 s.py
ret=$? # get the return code now (or it is lost when another command is issued)
if [ ${ret} -ne 0 ]
then
  # handle error
fi

 

Virtual Environments

Am I in a virtual environment? Run where python in Windows, or which python in Linux.

Online coding space: colab.research.google.com

 

pipenv

Start by installing:
pip install pipenv
(on linux: pip3)

On Windows, add the following to the PATH environment variable (this assumes installation of python from python.org):
C:\Users\<username>\AppData\Roaming\Python\Python38\Site-Packages
C:\Users\<username>\AppData\Roaming\Python\Python38\Scripts

pip3 list # list of installed packages

cd directory
pipenv shell --python .....python.exe
pipenv shell --python /usr/bin/python3
#This creates a new file "Pipfile"
#and creates the virtual env: see pipenv --venv
# in case of error, try re-installing: pip install pipenv
pipenv --venv  # see where virtual env is stored
pipenv install pandas
pip list    # shows all installed packages. Notice "pip" not "pipenv"
deactivate  # deactivate the venv

Edit the Pipfile if needed
Move dev packages from [packages] to a section called [dev-packages], such as pytest, pylint, jupyter, ...
Pipfile.lock contains the exact versions of what was installed
pipenv install --ignore-pipfile   # this installs the software from the pipfile.lock file instead
                                  # in this way, I can reproduce the environment exactly as I tested it
pipenv install --dev  # load with the dev-packages

# delete by removing the directory that is given by 'pipenv --venv'
# or:
cd project_directory_where_the_Pipenv_file_is_located
pipenv --rm

# existing installation:
# After first time, activate simply with :
pipenv shell    # restart shell
pipenv install  # installs everything in the Pipfile
deactivate      # or exit

pipenv graph  # shows the installed packages and the dependencies

If necessary, do python3 -m pipenv ...

Exit pipenv: exit or deactivate

For a different version of Python: edit the Pipfile and set the version to 3.7, or run pipenv --python 3.7.
Or better: use virtualenv

Doc: https://pipenv.pypa.io/en/latest/basics/

If I get "Shell for UNKNOWN_VIRTUAL_ENVIRONMENT already activated" then do "exit" because I am still in a virtual environment

Run a file without opening a shell
pipenv run python a-py.py

Run in the virtual environment, but without having to do "exit" when done
pipenv run python

 

venv

python -m venv name_of_virtual_env
cd name_of_virtual_env
Scripts\activate.bat
Scripts\pip install . . .
Scripts\python  # to start shell
deactivate      # when done

(Set the slashes appropriate for the operating system)
venv module is standard, meaning that no installation is needed
Don't put your scripts inside the virtual environment directory. Add the directory to .gitignore
Delete the env by deleting the directory

pip list   # see all installed packages

pip freeze > requirements.txt
In another installation, use
pip install -r requirements.txt

 

virtualenv

Seems better for alternate version of Python

pip install virtualenv

Create environment
virtualenv env1
cd env1
source bin/activate (Linux/Mac)
Scripts\activate (Windows)

Exit:
deactivate

Virtual env with specific version of python. This requires installation of the specific version (good luck: I have had varying degrees of success).
Initialize the virtual environment:
virtualenv -p path/python.exe a_dir
virtualenv -p C:\path\python.exe a_dir
virtualenv -p /Library/Frameworks/Python.framework/Versions/3.8/bin/python3 the_env

Use req file
pip install -r requirements.txt

List packages:
pip list

 

.env file

pipenv shell and pipenv run automatically load .env file.
By default, it is at the root of the project. Set another location with PIPENV_DOTENV_LOCATION.


######
# file .env
THE_ENV_VAR=abc
THE_PATH=${HOME}/...:/etc/another/path
######

file aaa.py
import os
env_var = os.environ['THE_ENV_VAR']
the_path = os.environ['THE_PATH']

View all:

for k,v in os.environ.items():
    print(f"{k}: {v}")

 

Development Environment

pipenv run pytest
pipenv run lint
pipenv run tidy

or
pipenv run pytest
pylint file.py
pipenv run tidy

Be sure to install with pipenv install pytest
pipenv install pylint

 

Dependency Management

https://packaging.python.org/guides/tool-recommendations/
https://realpython.com/pipenv-guide

applications:
  pip install -r requirements.txt
package:
  setup.py

vi

Configuration for vi:
:set syntax=python

or set the following (to be verified):
syntax enable
set tabstop=4
set shiftwidth=4
set expandtab
filetype indent on
set autoindent

or:
set sw=4 et ts=4 ai
set smartindent cinwords=if,elif,else,for,while,try,except,finally,def,class

Eggs

Go to the directory where I want to create the egg:
python.exe setup.py bdist_egg
The second to last line of the output has the location of the resulting file
Open a .egg with 7zip to view

  

Make ready for production

Options for passwords, access keys, and secrets

Improve code:

Documentation, being intuitive

Logging to std_out: use lib logging

Options for exception handling:

Orchestration tool: calls the run.py, which has configuration, initialization, and execution

Configuration in config.yaml. What is executed is the tuple (config, executable)

Meta file for job control: the source files that were successfully loaded

install pyyaml

  

  

Additional notes

Error with certificates in Mac

Execute /Applications/Python 3.7/Install Certificates.command
this command replaces the root certificates of the default Python installation with the ones shipped through the certifi package

  

  


Variables

  

No declaration needed or even possible
Assignment: a_var = 3
Note: _ (underscore) is a variable. Generally used to put a throw-away value
Names:

type(x) # gives type of variable x
if type(x) is str: # tests if the type is string, list, ...
id() # identity function

However, object type comparisons should always use isinstance() instead of comparing types directly:
Correct:
if isinstance(obj, int):
Wrong:
if type(obj) is type(1):

The "==" operator tells us if the objects have the same value:
a == b
The "is" keyword tells us if the underlying objects are the same:
a is b
The preceding line is equivalent to
id(a) == id(b)

0o (zero o in lower or upper case) is octal
0x (zero X) is hexadecimal
0b (zero B) is binary
convert with hex, bin, oct
a + bj # complex number

Naming conventions:

global xyz # makes variable global. Generally considered sloppy programming

Basic types


Boolean

Subclass of int
Literals True and False (initial cap, then lower case)
True equiv to int 1, False to int 0


Strings and Bytes

  

Special characters:

a_str.startswith("start_to_look_for")
a_str.startswith("start_to_look_for", n) # comparison starts at position n

Unicode:
"\u0394" # Using a 16-bit hex value
"\U00000394" # Using a 32-bit hex value

str is the type for text (unicode)
chr(i) returns the character with code i. "\u0234" is a unicode escape (4 hex digits)
ord(c) with c as a Unicode character: returns an integer
hex(i) returns a hexadecimal string

 

int(s) # convert string to integer
ord(c) # return the character code

a_string[0:2] # first 2 characters. The slice starts at the first index (counting from 0) and stops just before the second index
print("12345"[0:1])  # --> "1" (first element)
print("12345"[0])  # --> "1" (first element)
print("12345"[0:3])  # --> "123"
print("12345"[0:30]) # --> "12345"
print("12345"[1:-1]) # --> "234"
print("12345"[3:])   # --> "45"
print("12345"[:3])   # --> "123"
print("12345"[:-2])  # --> "123"
print("12345"[2])  # --> "3"
print("12345"[-1])  # --> "5" (last element)
print("12345"[0:0])  # --> empty
print("1234567890"[::3])  # --> "1470" (first in slice, then every third)
print("1234567890"[::-1])  # --> "0987654321" (reverse)
print("1234567890"[4:9:3])  # --> "58" (first in slice, which is "56789", then every third )
print("1234567890"[:])  # --> Copy

"""multi-
line
string"""

len("asd") # gives length
str1 is str2 # true if both identifiers refer to the same location
str1 == str2 # true if the content is the same. str1 is str2 implies str1 == str2, but not the other way

s = s1 + s2 # concatenate with "+"
s = 'string' 'string2' # or string literals side by side
s = 'string' \
    'string2' # or one literal per line, joined with a backslash line continuation
3*'string' # -->repeats
See operators below

In py3, if s is a string:
s.split(',') # if argument is null, then splits based on whitespace. Consecutive white spaces counted as one.
s.find('asdf') # find the 'asdf' in the string. -1 if not found
"dog" in "the quick dog." # determine if a a string contains a string
In py2, import the string module and do the following:
string.split(s, ',') # if null, then splits based on whitespace. Consecutive white spaces counted as one.
str.split() # alternate
string.find(s, 'asdf') # find the 'asdf' in the string. -1 if not found

escape: (need followup)
repr(s)
triple quotes
"\"" contains one char: "
r"\"" contains two chars: \ and " (this is a raw string)
b"\"" bytes, see above
f"text {expr} text" formatting, see below
str.replace('"', '\\"').replace("'", "\\'")
str.rstrip('\r\n') # remove combinations of trailing line feeds and carriage returns
str.startswith(begin_str) # return true if str starts with beginstr
str.startswith(begin_str, n) # comparison starts at position n

Prepare map for patterns:
mp = str.maketrans('abcdefghijklmnopqrstuvwxyz' + 'abcdefghijklmnopqrstuvwxyz'.upper() + '0123456789', 'a'*26 + 'A'*26 + '9'*10)
Translate:
a_string.translate(mp)
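For example, with the map mp defined above:
print("Sample123".translate(mp))   # --> "Aaaaaa999"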

 

Show character code instead of character for special characters:
''.join([c if ord(c)<255 else "[" + str(ord(c)) + "]" for c in list(the_string)])

 

Methods for bytes and strings

try out and complete documentation

 

Formatting (f strings)

f"text {var} or {expression}"

f"text {numeric:8.2f}" # 2 digits after the decimal point, and 8 characters in all
If the output does not fit, it is expanded.

f"text {expression=}" # with "=" before the colon ":", the expression precedes the value.

Some options:

Literal curly brackets: {{ and }}.

Note that single quotes have to be used inside the curly brackets when the f-string is surrounded by double quotes, and vice versa

Note that you cannot use a backslash inside the curly brackets (restriction lifted in Python 3.12)
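A few format examples (the values are just illustrations):

n = 1234.5678
print(f"{n:10.2f}")   # '   1234.57'  width 10, 2 decimals
print(f"{n:,.2f}")    # '1,234.57'    thousands separator
print(f"{255:08b}")   # '11111111'    binary, zero-padded to 8 characters
print(f"{255:#x}")    # '0xff'        hexadecimal with prefix
print(f"{0.25:.1%}")  # '25.0%'       percentage
print(f"{n=}")        # 'n=1234.5678' expression followed by its value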

 

Bytes

bytes is the type for data or 8-bit strings; a literal is b"asdf". They can be considered lists of small integers (0-255).
"Déjà vu".encode('utf-8') gives b'D\xc3\xa9j\xc3\xa0 vu'

UTF-8 is the de-facto standard: it is generally safe to assume encoding="utf-8".

Convert string to bytes (both options are equivalent):
byts = bytes("abc", "utf-8")
byts = "abc".encode("utf-8")

To bytes (two hexadecimal digits per byte; ignores white space):
bytes.fromhex('2Ef0 F1f2 ')

From bytes to string (separator is optional, -2 keeps every two hexadecimal digits, starting from the left):
b'2ef0 f1f2'.hex(" ", -2)

bytearray objects are a mutable counterpart to bytes objects.

Write bytes to and read bytes from files:
open(name, "rb")
open(name, "wb")
You cannot specify the encoding in binary mode.

 

 


Operators

  

+ for concatenation
"abc" * 3 # repeat 3 times, referencing the same element. Leads to odd cases resembling pointer issues in C

// # floor division (division of two integers with result as integer)
Note: in py2, dividing two integers always performs integer division. Therefore, start files with:
from __future__ import division

x > 10 and x < 20 equivalent to 10 < x < 20
+= -= # a += b is a = a + b

Operator precedence (low to high)

Operators                                                               Comments
lambda
x if condition else y                                                   Conditional expression
or
and
not x
in, not in, is, is not, <, <=, >, >=, <>, !=, ==
|                                                                       Bitwise OR
^                                                                       Bitwise XOR
&                                                                       Bitwise AND
<<, >>                                                                  Shift operators
+, -                                                                    Addition and subtraction
*, /, //, %                                                             Multiplication, division, remainder
+x, -x, ~x                                                              Unary plus, unary minus, bitwise NOT
**                                                                      Exponentiation
x[index], x[index:index], x(arguments...), x.attribute                  Subscription, slicing, call, attribute reference
(expressions...), [expressions...], {key: value...}, 'expressions...'   Binding or tuple display, list display, dictionary display, string conversion

lambda [parameter_list]: expression
lambda a,b,c: a*b+c
is equivalent to:
def anonymous_fctn(a,b,c):
    return a*b+c
In event handler binding: lambda a=1,b=20,c=-2: self.evt_handler(event, a, b, c) # add event parameter for event handlers, remove for command handlers
By the way, assigning a lambda to a variable is like doing def and is not recommended for readability

Note:
x = f   # assigns function f to x
x = f() # evaluates function f and assigns the result to x

# ---------- code for class: curry (begin) ---------------------
class curry:
    """from Scott David Daniels' recipe
    "curry -- associating parameters with a function"
    in the "Python Cookbook"
    http://aspn.activestate.com/ASPN/Python/Cookbook/
    """
    def __init__(self, fun, *args, **kwargs):
        self.fun = fun
        self.pending = args[:]
        self.kwargs = kwargs.copy()

    def __call__(self, *args, **kwargs):
        if kwargs and self.kwargs:
            kw = self.kwargs.copy()
            kw.update(kwargs)
        else:
            kw = kwargs or self.kwargs
        return self.fun(*(self.pending + args), **kw)
# ---------- code for class: curry (end) ---------------------
# ---------- code for function: event_lambda (begin) --------
def event_lambda(f, *args, **kwds):
    """A helper function that wraps lambda in a prettier interface.
    Thanks to Chad Netzer for the code."""
    return lambda event, f=f, args=args, kwds=kwds: f(*args, **kwds)
# ---------- code for function: event_lambda (end) -----------
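Possible usage of the curry class above (the add function is just an illustration):

def add(a, b, c):
    return a + b + c

add_10_20 = curry(add, 10, 20)   # pre-bind the first two arguments
print(add_10_20(5))              # --> 35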


Lists

  

A list is an ordered group of items or elements, not necessarily of the same type
As for all sequential types


[1,3,4,3,43] # a list
a = [[a, b, c], [x, y]] # a list of lists
["asd", "asfee"] # a list of strings
[] # empty list
list(something) # makes a list
i in [1,2,3] # True if i in the list

a_list[0:1] # returns a list with just the first element: [e]
See also under strings

a_list.append(new_element) # adds one element to list (if the new element is a list, then it is added as one element)
lst += [elmt_to_append] is equivalent to lst.append(elmt_to_append) (although a hair slower); lst += another_list is equivalent to lst.extend(another_list)
y_list = [a,b,c] + [e,d,f] # concatenates lists. Note: append and extend modify the list, the + operator creates a new list
lst = lst + lst2 # concatenates. Slow, because the list is copied to a new object
a_list.extend(list) # concatenates list to list
', '.join(list) # concatenates elements of list with ', ' in between each element
', '.join(filter(None, list)) # filter(None, ...) drops None and empty elements, which would otherwise make join fail (on None) or add empty entries
lst.index(a) returns the first index of the value a.
a_list.sort(key=None, reverse=False) # Sorts the list, changing the original list
sorted(a_list) # Sorts the list and creates a new list
sorted(a_list, key=f, reverse=TrueOrFalse) # f is a function (lambda): abs, ...
del a_list[i] # delete an element

The methods insert, remove and sort only modify the list and have no return value. If you want to keep the sorted result in a new object, do: newobj = sorted(obj)

enumerate(a_list) returns tuples with (index, element)
Instead of:
for i in range(len(a_list)):
    do_something(i, a_list[i])

Do this:
for i, a in enumerate(a_list):
    do_something(i, a)

list(zip(a, b)) creates a list of tuples with elements from each list. The zipping stops when the end of the shortest list is reached
Unzip with the following trick:
col1, col2 = zip (*a_list_with_tuples_with_2_elements)
dct.items() returns a view object (dict_items); list(dct.items()) returns a list of (key, value) tuples

len(a) gives length of the list

for i in [1,2,3,4,5]:
    print(i)

text to list: need to understand this:
data = data.split('\n')
x = [row.split(' ')[0] for row in data]
y = [row.split(' ')[1] for row in data]

map(f, a) # applies f to all elements of a
map(lambda v: v/2, a) # with lambda
reduce(f, a) # applies f to first two elements of a, then applies f to the result and third element... (py3: from functools import reduce)
# example: reduce(lambda x,y: x if x<y else y, a)
filter(f, a) # returns a list with elements for which f(element) is true
filter(lambda v: v/2 > 1, a) # with lambda
enumerate(a) # returns (index, element) tuples (see above)
# The above assumes the following:
a = list(range(5))
def f(x):
    return x/2
def fi(x):
    return f(x)>1

all(a_list) # --> True if all elements are true, different from zero, or not empty
any(a_list) # --> True if any element is true, different from zero, or not empty

Comprehensions

List comprehension:
a_list = [ ((float(9)/5)*x + 32) for x in range(-40,101,5) ] # convert -40 to 100 deg C to F

This is how it works:

a = []
for i in range(10):
    if i%2==0:
        a.append(i**2)

Equivalent to:
a = [i**2 for i in range(10) if i%2==0]

Generator comprehension with round brackets:
gc = ( ((float(9)/5)*x + 32) for x in range(-40,101,5) )
A generator does not store the data, only the functionality to produce it

Set comprehension:
{ } instead of [ ].
from math import sqrt
n = 100
no_primes = {j for i in range(2,int(sqrt(n))) for j in range(i*2, n, i)}
primes = {i for i in range(n) if i not in no_primes}

List comprehension with two loops:
[(x, y) for x in [1,2,3] for y in [3,1,4] if x != y]
[(1, 3), (1, 4), (2, 3), (2, 1), (2, 4), (3, 1), (3, 4)]

Unpacking

See under tuples

 

Renaming, Shallow Copies, and Deep Copies

Summary:

When you do list_copy = a_list, you have two pointers to the same memory location.
Create a copy with list_copy = a_list.copy() (shallow copy)
If lists are nested, and the nested lists also need to be copied, then do:
from copy import deepcopy
list_copy = deepcopy(a_list)

Explanation:

Python assigns variables to locations of lists, meaning that after "a=b", "a" and "b" point to the same memory location until a new assignment is made on one of the variables (see print(id(a), id(b))).

When a and b are a list — pointing to the same list in memory — and I assign a new value to one of the elements, as in "a[0]="new value"", then both "a" and "b" see the change.
To ensure that "b" points to a different memory location than "a", do the following:
b=a.copy()

Although "b=a.copy()" is "deeper" than with just the equal sign, it is not deep enough. If one of the elements is itself a list, that nested list is not copied, and ... re-belote ... any change to the nested element will show in both outer lists.

A deep copy is done as follows (thanks to https://www.python-course.eu/python3_deep_copy.php):
from copy import deepcopy
b = deepcopy(a)

Note that the deep copy is only necessary when the lists are nested.
Note that the deep copy goes into all nested lists, not just the first nesting.
Be aware of recursive objects and cases where data is intended to be shared between copies.

Simple illustration:

from copy import deepcopy

a = [1, 2, ["a", "b"]]
print("a, original", a)
b_rename  = a           # new variable, same memory location
b_shallow = a.copy()    # shallow copy
b_deep    = deepcopy(a) # deep copy

# make changes
a[1]    = 321
a[2][1] = "changed"

# show results
print("a, changed", a)
print("b, renamed", b_rename)
print("b, shallow", b_shallow)
print("b, deep   ", b_deep)

  

Data Structures

Stacks

Use a_list.append(aaa) and a_list.pop() to use a list as a last-in-first-out stack.

Queues

For queues, lists are not efficient. Use collections.deque, which is efficient for appending and popping at both ends:

from collections import deque
aqueue = deque([1, 2 ,3])
aqueue.append(4)
firstin = aqueue.popleft()

  

  


Tuples

Tuples are immutable lists
Most of what is done with lists can be done with tuples
t = ("one", "another", "third")
t = "one", "another", "third" # also works without brackets
() # empty tuple
(3,) # one-item tuple
t = tuple(something) # creates a tuple

t[1] # second item (first has index 0)

x, y = y, x # this swaps values (see "unpacking" under Lists)

Tuples vs lists:

  

Unpacking

Unpack a list (you have to know how many items it has):
x, y = [1, 4] # --> x is 1 and y is 4 (works with any sequence)
The targets x, y on the left-hand side form a tuple

Similar to unpacking, do multiple assignments on the same line:
a, b, c = 3, 6, 2

Similar to unpacking, swap values:
a, b = b, a

Unpacking works with list and other collections, including strings.

a,b,c = "abc"
print(a,b,c)

If the full length of the tuple is not known:
a variable preceded by an asterisk takes on the list of all remaining values, and is empty if there are none.
a, b, *c, d = (1,2,3,4,5,6)
Or optionally if I want to not use the values (good practice, not required):
a, b, *_ = (1,2,3,4)

  


Dictionaries

Dictionaries are collections in which elements are accessed by keys (since Python 3.7 they preserve insertion order). They are composed of key:value pairs: {'key name': a value, ....}
Keys must be immutable data types, including tuples (which are immutable), but not lists (which are mutable)

dic = {"a":234, "bas":4322}
dic["a"] # gives 234
dic["c"] = 42332 # add an element
{} # empty dictionary

Operators:
len(dic)
del dic[k] # delete a key and value
k in dic
k not in dic
dic.pop(k) # returns the value for k and TAKES IT OUT of the dictionary
dic.popitem() # returns a key-value pair as a tuple and removes it (the last inserted pair since Python 3.7, arbitrary before)
dic.get(k) # returns the value (keeping it in the dictionary). No error if key not in dic
dic.get(k, a_deflt) # returns the value and returns a_deflt if not in dic
dic.get(k).get(k2) if k in dic else "" # Although .get() does not throw an error if the key is not in the dictionary, check for a non-existent key before going into a sub-dictionary
copied_dic = dic.copy() # shallow copy
dic.clear() # points to empty dictionary
dic1.update(dic2) # add elements of dic2 into dic1. When keys are the same, assign new values from dic2

dic.keys() # Acts like a list of keys (actual type is dict_keys)
dic.values() # Acts like a list of values (actual type is dict_values)
dic.items() # Acts like a list of (key, value) tuples (actual type is dict_items)

for k in dic: # iterate over the keys of the dictionary
    print("key:",k)
    print("value:",a_dict[k])

for k,v in dic.items(): # better practice: use ".items()"
    print("key:",k)
    print("value:",v)

if x is not None: # test for null object

Beware of:
if x
when you really mean
if x is not None
The following values evaluate to False, but they are not None:

try:
    do_something_with( a_dict["xyz"] )
except KeyError:
    print('"xyz" not in a_dict')
Or:
if ("xyz" in a_dict):
    do_something_with( a_dict["xyz"] )
else:
    print('"xyz" not in a_dict')

 

Dictionary comprehension:
a_text = "WHEN in the Course of human Events"
dct_of_char_counts = {c: a_text.count(c) for c in set(a_text)}

 

Advanced sort:
a_list.sort(key=lambda x: x[1]) # sort on the second element
a_list.sort(key=lambda x: x[3] + "|" + x[2]) # sort on something else

def a_sorting_function(x):
. . .
a_list.sort(key=a_sorting_function) # sort on something else (notice no "()")


  


Sets

  

A set is an unordered collection of unique and immutable objects. Similar to a key-only dictionary
Create set from "string", (tuple), [list], but not list of lists
The set is mutable, the elements are not: attempting to add a mutable element raises "TypeError: unhashable type"
Useful for finding memberships because the "in" operation is fast, and for having distinct collections
set(sequence or other iterable object) # create set, removing the duplicates
{sequence or other iterable object} # alternate notation
frozenset(...) # creates an immutable set.
set() # empty set

Operations:
s.add(element)
s.clear() # remove all elements
s.copy() # shallow copy
s1.difference(s2) # elements in s1 that are not in s2
s1 - s2 # elements in s1 that are not in s2, alternate notation
s1.difference_update(s2) # remove from s1 the elements in s2
s1 = s1 - s2 # alternate notation
s.discard(el) # remove el from set; no error if el not in set
s.remove(el) # remove el from set; raise error if el not in set
a in s # True if a in set s (note: this is a fast operation)
s1.union(s2) #
s1 | s2 # alternate
s1.intersection(s2) #
s1 & s2 # alternate
s1.isdisjoint(s2) # true if no intersection
s1 <= s2 s1.issubset(s2) # true if s1 is included in s2
s1 < s2 # proper subset (subset but not equal)
s1 >= s2 s1.issuperset(s2) #
s1 > s2 # proper superset
s.pop() # remove an element

Use a set for testing membership, because sets use hash tables under the covers. Searching for membership in a set is much faster than searching in a list
Therefore, make a set, then test for "if xyz in the_set: . . ."
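Small illustration:
a_list = ["red", "blue", "red", "green", "blue"]
distinct = set(a_list)             # {'red', 'blue', 'green'} (order not guaranteed)
print("blue" in distinct)          # True, fast membership test
print(set("abc") & set("cde"))     # {'c'}  intersection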

  


Read

a_var = input("prompt: ") # get user input
a_var = eval(input(...)) # evaluate the returned value to a number

N = input("Please enter name: ")
print "Name is", N
# At keyboard, enclose with quotes!!! (at least in Py2)
#instead, use raw_input in py 2

N = raw_input("Please enter name: ")
print "Name is", N

some_input = raw_input().upper()

  


Numbers

the following are evaluated as false:
zero
False
""
[] # empty list
() # empty tuple
None

n = input("Maximal Number? ")

The "float" data type stores a number for which the decimal portion is close, but may not be exactly equal to the string representation: print(f"{0.1:.20f}") does NOT show 0.100000... as there is a small difference.
If precision is required, use:
from decimal import Decimal
a = Decimal("0.1") # be sure to declare as a string

Random numbers
import random
random.random() # Generate random number between 0 and 1
[random.random() for _ in range(4)] # Four random numbers between 0 and 1
random.seed(n) # n is a number
random.randrange(m) # Randomly choose between 0 and m-1
random.randrange(m, n) # Randomly choose between m and n-1
random.shuffle(a_list) # Shuffle the list
random.choice(a_list) # Choose one
random.sample(a_list, n) # Choose n in the list
[random.choice(a_list) for _ in range(n)] # Choose n in the list, allowing duplicates (replacements)

  


Files

  

f = open(. . .)
open(filename, mode, encoding=None)
f.name gives the name of the file
f.mode gives the mode

f.read() # loads the whole file
f.readlines() # loads all the lines into a list
f.readline() # reads the next line (returns "" at the end of the file)
for line in f: # line by line; the file object is its own iterator, no need to call readline


f.read(size) returns a string of size characters in text mode
f.read(size) returns a bytes object of size bytes in binary mode
If size is omitted, then the whole file is read.
If the end of the file has been reached, f.read() will return an empty string.

c = f.read(1)
while len(c)>0:
    print(c)
    c = f.read(1)


f.tell() #gives position of pointer of the reader
f.seek(offset, whence) where whence=0 means the start of the file, whence=1 current position, whence=2 end of file. Works best in binary mode.
print(" ", end='') # this suppresses the EOL
print(" ", end='[EOL]')


  

f.readline() reads a line. It returns the "\n" character too. When it returns an empty string, it reached the end.

Efficient way of reading:

for line in f:
    # do something with line

Or use:
list(f)
f.readlines()

  

Read:
f = open(fname, "r")
while True:
    l = f.readline()
    if not l: # reached EOF
        break
    one_line = l.strip() # note: if you strip before testing for the EOF, empty lines will make it think it reached the EOF
    ...
f.close()

Set context (better practice):
If the module errors out, the file connector is automatically closed
with open(file_with_list_of_usernames, "r") as f:
    for l in f:
        one_line = l.strip()
        ...

a_fh = open("data.txt","w", encoding="utf-8") # r for read, w for write (overwrite), a for append to the end. Optional encoding="utf-8" if necessary
print("asdf", file=a_fh)
a_fh.read(n)
a_fh.readline(n)
for l in a_fh:
    ....
a_fh.close()

f.write("abc") returns the number of characters written

a_fh.write("text\n")
a_fh.close()

Always close files explicitly, or better still, use a context

with open("hello.txt", mode="w") as file:
    file.write("Hello, World!")

formatting: see
http://www.python-course.eu/python3_formatted_output.php

files, system, ...

import os
os.listdir()
for a_path, dirs_in_path, files_in_path in os.walk("a directory"):
    print(a_path, dirs_in_path, files_in_path)

os.chdir(new_path)
os.getcwd() # get current directory
os.startfile(file in windows with an extension)
os.system(command)
os.mkdir("dir-to-be-created")
os.makedirs("path-and-dir-to-be-created") # Creates nested directories in one command
os.remove() remove a file.
os.rmdir("dir-to-be-removed") # remove an empty directory.
os.stat(file-name) # stats on a file
datetime.fromtimestamp(os.stat(file-name).st_mtime) # Date and time of last modification
shutil.rmtree() delete a directory and all its contents.

os.environ['ONE_ENV_VAR'] = '1'
a_var = os.environ['ONE_ENV_VAR']

import os
os.path.basename(" ...") # file name with extension but without the directory and without a slash
os.path.splitext(" ...")[0] # first part of the file name (including directory)
os.path.splitext(" ...")[1] # extension (including the ".")
os.path.join("path", "file") # concatenates and ensures that there is one and only one separator
os.path.dirname(" ...") # the directory without trailing slash
# file name with extension but without the directory
os.path.splitext(os.path.basename(" ..."))[0] # first part of the file name without the directory
The following is always true:
f ==os.path.dirname(f) + (os.sep if len(os.path.dirname(f))>0 else "") + os.path.splitext(os.path.basename(f))[0] + os.path.splitext(f)[1]

shutil.copy(src, dst)

import sys
sys.stdin
sys.stdout
sys.stderr

#read from stdin and allow pipe into Py:
for line in sys.stdin.readlines():
   ....
#then, in command line:
something.sh | the_py_script.py
#write out
sys.stdout.write(...)

Show default directory:
import sys
print (sys.prefix) #default directory
print (sys.path) #path to executables
print (sys.getwindowsversion())
print (sys.platform)

import os
print (os.environ)
os.environ.get('PYTHONPATH')
os.environ.get('WINDIR')

print(value1, ..., sep=', ', end='[eol]', file=sys.stdout, flush=False)
# the default sep is blank (space)
# the default end is \n

Always close files

with open("hello.txt", mode="w") as file:
    file.write("Hello, World!")
# or, without a context manager:
try:
    file = open("hello.txt", mode="w")
    file.write("Hello, World!")
finally:
    file.close()
Line Endings

When reading in text mode, the platform-specific line endings are converted behind the scenes to \n. The opposite is done when writing.
Platform specific: \n on *nix, \r\n on Windows.

Be careful not to corrupt binary files with this behavior.

  

StringIO

import io

With io.StringIO(), if you get "TypeError: string argument expected, got 'bytes'", then use io.BytesIO instead.
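Minimal example of an in-memory text file and its binary counterpart:

import io

buf = io.StringIO()
buf.write("line 1\n")
buf.write("line 2\n")
print(buf.getvalue())        # the whole content as one string

bbuf = io.BytesIO(b"binary data")
print(bbuf.read())           # b'binary data'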

  


Structures

  


for a in sequence:    <-----\     # seq is list, tuple, string, key of dictionary
     ...                     \
     continue   # quit current iteration and goes to next
     ...
     pass       # do nothing, go to next line|
     ...                                 <---|
     ...
     break      # premature termination, skips the "else" part|
     ...                                                      |
else:  # executed at normal exit of the loop, also when the loop was not entered
       # not executed in case of break                        |
     ...                                                 <----|
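Typical use of the else clause: search with break, and detect that nothing was found:

for x in [1, 3, 5]:
    if x % 2 == 0:
        print("found an even number:", x)
        break
else:
    print("no even number found")   # reached because the loop ended without break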


if (x == 0): # ( ) not mandatory
   ...
elif cond:
   ...
else:
   ...

a if (cond) else b # an expression that returns a if cond is true, else b


while cond:           <-----\
     ...                     \
     continue   # quit current iteration and goes to next
     ...
     break      # premature termination, skips the "else" part|
     ...                                                      |
else:  # executed when exiting loop if cond is false, even if did not enter loop
       # not executed in case of break                        |
     ...                                                 <----|


def fctn_name(param, param, optional-param=value):   # mandatory parameters go first
    """
    doc string, accessible with fctn_name.__doc__
    """
    ...
    return     # returns None, execution ends here
    ...
    return a_val  # execution ends here
    ...
    return (a,b,d)  # return a tuple
    ...

 


def fctn_name():
    pass   # empty statement

 

fctn_name(3,2,7,g=value) # param g as keyword

variables are local to the function where they are first assigned
global v # set the variable as global

 

New since version 3.10:

Not tried yet
match an_expr: 
    case val1: 
        do_something 
    case val2: 
        do_something 
    case alt1 | alt2:  
        do_something  # if matches alt1 or alt2 
    case [a, b, c]: 
        do_something  # if structure of "an_expr" is a list of three elements 
                      # the elements are then available generically 
    case [a, b, c, *rest]: 
        do_something  # if structure of "an_expr" is a list of three OR MORE elements 
                      # all additional elements are available in variable rest 
    case other:       # catches everything else and binds it to the name "other"
        do_something
    case _:           # equivalent to "else"; use either "case other:" or "case _:" as the final case, not both
        do_something
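A small runnable illustration (requires Python 3.10 or later):

def describe(point):
    match point:
        case (0, 0):
            return "origin"
        case (0, y):
            return f"on the y axis at {y}"
        case (x, 0):
            return f"on the x axis at {x}"
        case (x, y):
            return f"at {x}, {y}"
        case _:
            return "not a point"

print(describe((0, 5)))   # on the y axis at 5
print(describe((2, 3)))   # at 2, 3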

 

Arguments

 

Python passes arguments by object reference (sometimes called "pass by assignment"): a mutable argument can be modified inside the function, but rebinding the parameter name does not affect the caller.

 

Call a function with a tuple; the elements are unpacked into separate positional arguments (argument unpacking):
fctn_name(*a_tuple)
Call a function with a dictionary; the key:value pairs are passed as keyword arguments:
fctn_name(**a_dic)

def fctn_name(*a, **k):
   a... # tuple of the unnamed arguments
   k... # dictionary of the named arguments
   ...

def a_fctn(**kwargs):
   arg1 = kwargs.pop("arg_1", "default value")
   print(kwargs) # note that kwargs no longer has "arg_1" because it was popped out

Hints for types (see annotations):
def fctn_name(a: str) -> str:

Catching Exceptions

Simplest, but bad practice (called a "bare except"):
try:
   ...
except:
   ...
When using the bare except, the clause will catch SystemExit and KeyboardInterrupt exceptions. I may not be able to interrupt the program with ctrl-C (the program will keep on going). And the bare except can disguise other problems.

No bare excepts. If I have to, do:
except Exception:
With "except Exception", ctrl-C (KeyboardInterrupt) is not caught, so we can still stop execution. (With a bare except, a ctrl-C does not stop execution.)

Add the traceback:
option 1: traceback.print_exc()
option 2: s = traceback.format_exc() then print(s) or put it in a log

 

try:
   ...
except Exc1:
   ...
except (Exc2, Exc3):
   ...
except Exception as e:
   print(f"Unexpected {e=}, {type(e)=}")
   e.add_note('Add some information')   # add_note() requires Python 3.11+
   e.add_note('Add some more information')
   raise # re-raise the most recent error
else:
   # If there is no exception then execute this block.
finally:
   # This is always executed, even when an exception occurs

 

Custom exception:

class PasswordNotFound(Exception): 
    """
    Raised when no password was found
    """
    def __init__(self, username):
        self.message = f"Password not found for username '{username}'"
        super().__init__(self.message)

if len(the_pw)==0:
    raise PasswordNotFound(the_username)

 

continue with http://www.python-course.eu/python3_recursive_functions.php

 

 

 

Annotations

# function annotations:
def fctn(a: "annotation here", b: "here too") -> "annotation about returned type":
# Typically, the annotations are the types (as far as I know, nothing is enforced)
# But they can be any text
# Example:
def fctn(a: int,  b: str) -> list:

# Variable annotations:
var1: "annotation here"
# Again, typically, it is a type, but it can be a text:
var1: int = 123


Decorators


Closures

A closure is a function together with the data (free variables) it captured from its enclosing scope. Typically, it is an inner function that still has access to the outer function's variables after the outer function has finished executing.
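Small illustration of data kept alive by the closure:

def make_counter():
    count = 0                  # captured by the inner function
    def counter():
        nonlocal count
        count += 1
        return count
    return counter

c = make_counter()
print(c(), c(), c())           # 1 2 3 : count survives between calls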


First-class functions:

Functions can be treated like any other object, including being assigned to a variable and passed as a parameter. In Python, and also in JavaScript.

def f1():
    return "something"

print(f1())  # prints something
print(f1  )    # shows "function f1 at ..." 


Returning a Function

Reminder to help understand decorators:

def outer_fctn():
    def inner_fctn():
        print("a")
    return inner_fctn()    # notice parentheses here
outer_fctn()  # this prints "a"

def outer_fctn():
    def inner_fctn():
        print("a")
    return inner_fctn      # notice NO parentheses here
outer_fctn()    # this prints something like "<function outer_fctn.<locals>.inner_fctn at 0x7f53beb02200>"
outer_fctn()()  # this prints "a"  (notice two sets of parentheses)
f = outer_fctn()
f()             # this prints "a"


Decorators

A decorator is a function that takes another function as an argument, and returns a function (function in, function out)

from functools import wraps                # optional: provides correct naming (see below)

def decorator_fctn(input_fctn):            # input: a function
    @wraps(input_fctn)                     # optional:
                                           # the wrapped function shows with name "input_fctn"
                                           # instead of the name "the_wrapper"
    def the_wrapper(*args, **kwargs):
        ...
        return input_fctn(*args, **kwargs) # this executes the function (remember to put parentheses)
    return the_wrapper                     # output is a function (no parentheses)

@a_decorator
def a_fctn_with_decorator():
    return "this is 'a_fctn_with_decorator()'"

# is the same as:
def xfctn():
    return "this is 'a_fctn_with_decorator()'"
a_fctn_with_decorator = a_decorator(xfctn)

Try with this code:

def dec(f):
    print("dec A")
    def w():
        print(f"Wrapper for {f.__name__}")
        return f()
    print("dec B")
    return w

@dec
def decorated_f1():
    print("in some_fctn")
    return " executed"

decorated_f1()

def some_fctn():
    print("in some_fctn")
    return " executed"

decorated_f2 = dec(some_fctn)
decorated_f2()



# now, with parameters:
def dec(f):
    print("dec A")
    def w(*a, **k):        # added *a, **k here, otherwise no difference
        print(f"Wrapper for {f.__name__}")
        return f(*a, **k)        # added *a, **k here, otherwise no difference
    print("dec B")
    return w



def some_fctn(msg):
    print("in some_fctn, msg=",msg)
    return msg + " executed"

decorated_f = dec(some_fctn)
decorated_f("asdf")


@dec
def decorated_f2(msg):
    print("in some_fctn, msg=",msg)
    return msg + " executed"

decorated_f2("asdf")

(End of sample code to try)

Sample of Logger Decorator

import logging
logging.basicConfig(...)

def mylog(orig_fctn):
    def w(*args, **kwargs):
        logging.info(f"Ran with args: {str(args)} and {str(kwargs)}")
        return orig_fctn(*args, **kwargs)
    return w

Sample of Timing Decorator

import time
def time_exec(orig_fctn):
    def w(*a, **k):
        t = time.perf_counter()
        result = orig_fctn(*a, **k)
        elapsed = time.perf_counter() - t
        print(f"{orig_fctn.__name__} took {elapsed:.6f} s")
        return result
    return w

When stacking decorators, the last gets executed first.


Enumerations

As enumerations are not real classes, I prefer the following creation syntax:
from enum import Enum
my_enum = Enum("Color", ["RED", "BLUE", "GREEN", "YELLOW"])

And, frankly, I am not interested in what Python assigns as values. If I really want to see the values, do list(my_enum), or my_enum.RED.value (and my_enum.RED.name for the name).


Generators:

In a generator function, "yield" sort of replaces "return" in regular functions.
Note however that execution continues after the "yield", but does not after the "return".

def a_generator(var_that_is_an_enum):
    for i in var_that_is_an_enum:
       yield the_calculation(i)   # yield is the key
v = a_generator(a_list)
print(next(v))  # it knows to show first
print(next(v))  # it knows to show second

range():
print([i for i in range(5)]) --> [0, 1, 2, 3, 4]
print([i for i in range(0, 5)]) --> [0, 1, 2, 3, 4] 0 is default start
print([i for i in range(1, 5)]) --> [1, 2, 3, 4]
print([i for i in range(0, 5, 1)]) --> [0, 1, 2, 3, 4] 1 is default step
print([i for i in range(0, 5, 2)]) --> [0, 2, 4]
print([i for i in range(1, 5, 2)]) --> [1, 3]



Misc

x = 5
# can be written as:
(x := 5)  # valid, but not recommended!
# the parentheses are crucial
After Py 3.8
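A more realistic use, reading a file in chunks (data.bin and process() are placeholders):

with open("data.bin", "rb") as f:
    while (chunk := f.read(1024)):   # assign and test in one expression
        process(chunk)               # process() is a hypothetical handler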

 

  


Processes

formatting: see
http://www.python-course.eu/python3_formatted_output.php

files, system, ...

import os
os.listdir()
os.chdir(new_path)
os.getcwd()
os.startfile(file in windows with an extension)
os.system(command)

os.remove() remove a file.
os.rmdir() remove an empty directory.
shutil.rmtree() delete a directory and all its contents.

os.environ['ONE_ENV_VAR'] = '1'
a_var = os.environ['ONE_ENV_VAR']

shutil.copy(src, dst)

https://docs.python.org/3.4/library/subprocess.html

Forking:
(only on linux/unix?)
newpid = os.fork() # creates fork
# the fork copies memory and code to a new process; both processes continue from here:
# the child sees newpid == 0, the parent sees the child's process id
os.getpid() # the process id, as shown in ps -aux
os._exit(0) # exit the child process

Threading:
# works in Windows and in Linux (py 2, did not try py 3)
from threading import Thread
def a_function():
    pass
t = Thread(target=a_function, args=(...,))
t.start()
t.join() # wait until the thread finishes

Generators / Iterators
use lazy generation of lists so as not to fill memory. range is lazy in Python3

  


Classes

class ClassName: # by convention PascalCase
     ...

class MyClass:
    class_attr = 123  # usually before the __init__

    def __init__(self, a, b):
        self.a = a   # instance attribute
        self.b = b

    def __str__(self):
        return f"attributes are {self.a} , {self.b} "
        # Overwrite this to provide a non-default value for display

    def method_nm(self):
        # instance method
        self.attrib = ... # value for the instance
        MyClass.attrib = ... # value for the class

    # parameter self points to an instance of the class when the method is called
    # and modify the class through the self.__class__ attribute
    # call in two ways:
    #obj.method_nm()
    #or
    #MyClass.method_nm(obj)

    @classmethod
    def classmethod_nm(cls, ...):
        # class method
        # cls parameter points to the class
        # can call without any instance
        cls.class_attribute = ...
        ...
        cls(args_for_constructor)    # Create a new object inside a class method


    @staticmethod
    def staticmethod_nm(...):
        # static method
        # Can call without any instance
        # Static methods do not pass an object by default.  The first argument is neither self nor cls.
        # Typically, if you do not access the "self" or the "cls", then you should probably make a static method

c = MyClass(1,2) # instantiate
class_instance = Class_name()   # remember the parentheses
Alias_of_class = Class_name     # without parentheses, it is just an alias

# A call to an instance method "adds" the instance as the first argument. It is represented by convention as self in the method definition.
# A call to class  method "adds" the class as the first argument. It is represented by convention as cls in the method definition.
# A call to static  method "adds" nothing.

# When calling a method, the following two syntaxes are equivalent
class_instance.method_name(a, b)
Class_name.method_name(class_instance, a, b)    # this is what is actually happening, hence the argument "self"

print(c.a, c.class_attr)  # see the attributes (but not best practice)
c.a = new_value  # alter attributes (not best practice: modify through a method)

help(Classname)        # shows information about class
print(dir(Classname))  # lists methods

A dunder method starts and ends with double underline: __dunder_method__.
__init__(): the constructor
__str__() # string representation
__repr__() # representation, for developers. Often shows what would re-create the object

isinstance(var, class) ==> True if var is an instance of class class, of a subclass of class

No private instance variables. By convention, _something with a preceding underscore is considered protected and should not be accessed from the outside
Protected means that subclasses can access
Double underscore for private

 

When you forget the "self", Python throws the error "takes 0 positional arguments but 1 was given". To resolve, add "self" as the first argument in the method.
If it is static, then add the "@staticmethod" decorator (and remove "self" or "cls").

 

 


class Complex:
     sh = ...  # shared variable

     def __init__(self, re, im, v=None):    # Constructor
         self.r = re
         self.i = im
         self.s = set()
         # s, r and i are instance variables
         if v is not None:
             for vv in v:
                 self.s.add(vv)

     def a(self):    # By convention, all functions take self as first parameter
         ...

     def __repr__(self):    # Representation
         return "..." #
Put meaningful data into string format

To see the namespace of an instance or a class:
abc.__dict__
namespaces:
   __builtin__
   __main__ the main, top script
   namespaces of various functions
   innermost scope: the namespace local to the function, where local names are directly accessible, meaning unqualified

Data attributes override method attributes with the same name;
--> causes hard-to-find bugs
Suggestions:
capitalize method names
prefix data attribute names with something
verbs for methods and nouns for data attributes.

 

Setting and getting attributes indirect key


the_key = "a"
the_value = "a value"
setattr(c, the_key, the_value)
# is same as:
c.a = "a value"

v = getattr(c, the_key)
# is same as:
v = c.a

 

 

Context Manager

Class with a context manager:


class ClassWithContextManager():

    def __init__(self, username, password, connect_string):
        self.username = username
        self.password = password
        self.connect_string = connect_string

    def __enter__(self):
        print("Entering the context...")
        self.connection = whatever_package.connect(self.username, self.password, self.connect_string)
        return self.connection

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.connection.close()
        print("Closing the context...")

 

 

Object Oriented Programming OOP

Terms:

Design Principles:

A class has

 

Decrease coupling:



 

Functional Programming

Functional programming allows parallel execution; Functional code is much easier to parallelize.
Less physical memory and CPU restrictions given that execution is spread out
Distribute functional code across an cluster of computers
Data should be manipulated by functions without maintaining any external state
Code should avoid global variables and always return new data instead of manipulating the data in-place.

 

Architecture Design

Infrastructure layer: databases, src and tgt
Adapter layer: access the infrastructure. Put the code for connecting to the data
Application layer: features of the application, in three parts: extract, transform, load
a main fctn as entry point with parameters, initialization, and run
Domain layer: entities, objects

 

Inheritance

class DerivedClassName(BaseClassName):
    def method_with_same_name_overwrites(self, param):
        return super().method_with_same_name_overwrites(param)

The method resolution order means that we go up the inheritance chain until we find the definition we are looking for


isinstance(instance, subclass)
isinstance(instance, superclass)
issubclass(subclass, superclass)

 

Abstract Class

NOT TRIED

from abc import ABC, abstractmethod
# ABC = Abstract Base Class
# An abstract method has a declaration but no implementation,
# and needs an implementation in each inherited class

class abs_cls(ABC):
    @abstractmethod
    def abs_method(self):
        pass

 

Protocol

(since version 3.8)

NOT TRIED


from typing import Protocol
# The protocol basically defines the interface

class prot_cls(Protocol):
    def prot_method(self):
        ...   # yes, three dots

Then classes that implement this protocol have to have the methods. But they do not inherit from the protocol class. The protocols are like templates.
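A small sketch (class and function names are mine): the class below never inherits from the protocol, but a type checker accepts it wherever the protocol is expected because the method signatures match.

from typing import Protocol

class Greeter(Protocol):
    def greet(self) -> str:
        ...

class FrenchGreeter:              # no inheritance from Greeter
    def greet(self) -> str:
        return "bonjour"

def welcome(g: Greeter) -> str:   # any object with a matching greet() is accepted
    return g.greet()

print(welcome(FrenchGreeter()))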

 

Getters, setters, deleters


# Property decorator (getter method)

@property
def the_method_that_looks_like_an_attribute(self):
    return self.a
# I can call this method without parentheses:
print(obj_instance.the_method_that_looks_like_an_attribute)

# Setter method

@the_method_that_looks_like_an_attribute.setter
def the_method_that_looks_like_an_attribute(self, param):
    self.a = ... param ...
# I can use "the_method_that_looks_like_an_attribute" as if it were an attribute:
obj_instance.the_method_that_looks_like_an_attribute = 'asdfa'

# Deleter method (to delete a property)

@the_method_that_looks_like_an_attribute.deleter
def the_method_that_looks_like_an_attribute(self):
    self.a = None
del obj_instance.the_method_that_looks_like_an_attribute
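A self-contained sketch putting the three decorators together (class and attribute names are mine):

class Person:
    def __init__(self, name):
        self._name = name

    @property
    def name(self):             # getter: read without parentheses
        return self._name

    @name.setter
    def name(self, value):      # setter: plain assignment calls this
        self._name = value.title()

    @name.deleter
    def name(self):             # deleter: "del" calls this
        self._name = None

p = Person("Ada")
p.name = "grace hopper"
print(p.name)                   # Grace Hopper
del p.name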


 

Sneaky behavior to be aware of.
The call_another() for the child object calls the the_other() method from the CHILD class, not from the parent class where the call_another() function is found.
Notice the self in self.the_other() call
To help understand this when doing a diagram, "pull over" the un-overwritten methods into the child classes. In particular, look for methods in the parent class containing "self.method...()".

Let's call these virtual child methods "ghosts".

When analyzing, concentrate on the child methods that overwrite a parent method. Be sure to understand which object they are called by: child or parent.

main.py | child_kls.py | generic_kls.py


import child_kls          <============

if __name__ == '__main__':
    chld = child_kls.ChildKls() ======>
    prnt = generic_kls.GenericKls() ==>






    rslt = chld.call_another() =======>
    # call_another() 
    # is NOT in the child object

    print(rslt)
    # rslt shows "From child class"
   





    rslt2 = prnt.call_another() =======>

    print(rslt2)
    # rslt2 shows "From parent class"

    # Behavior depends 
    # on which class is called
   

import generic_kls    <===============

class ChildKls(generic_kls.GenericKls):


    def __init__(self):
        super().__init__() ================>


    def the_other(self): == OVERWRITES ==>
        return "From child class"
                             /\
                             ||
==> def call_another(self):  ||
        return self.the_other()
    # No call_another() method here
    # but the code acts 
    # as if it were copied here

    # The preceding method is in light gray 
    # to indicate that it is not coded 
    # as part of the child object, 
    # but is virtually present.
    # This is the ghost method.

=============================================>

class GenericKls:





    def __init__(self):
        pass

    def the_other(self):
        return "From parent class"
                            /\
<= VIRTUAL ==               ||
    def call_another(self): ||
     /\ return self.the_other()
     || # if self is the child object
     || # then this calls the 
     || # the_other() method
     || # from the child object!
     ||
     ||
     ||
     ||
     ||
     ||
=======

ghost calls ghost: not an issue:

main.py | child_kls.py | generic_kls.py


import child_kls          <============

if __name__ == '__main__':
    chld = child_kls.ChildKls() ======>
    prnt = generic_kls.GenericKls() ==>






    rslt = chld.call_another() =======>

    print(rslt)
    # rslt shows "From parent class"
   

    rslt2 = prnt.call_another() ======>

    print(rslt2)
    # rslt2 shows "From parent class"

    # No difference in behavior
   

import generic_kls    <===============

class ChildKls(generic_kls.GenericKls):


    def __init__(self):
        super().__init__() ================>


    def the_other(self):
        return "From parent class"
                            /\
                            ||
==> def call_another(self): ||
        return self.the_other() 




===========================================>

class GenericKls:





    def __init__(self):
        pass

    def the_other(self):
        return "From parent class"
                            /\
                            ||
    def call_another(self): ||
        return self.the_other()
      /\
      ||
      ||
      ||
========

non-ghost calls ghost: not an issue:

main.py | child_kls.py | generic_kls.py


import child_kls          <============

if __name__ == '__main__':
    chld = child_kls.ChildKls() ======>
    prnt = generic_kls.GenericKls() ==>






    rslt = chld.call_another() =======>

    print(rslt)
    # rslt shows "From parent class"
   

    rslt2 = prnt.call_another() ======>

    print(rslt2)
    # rslt2 shows "From parent class"

    # No difference in behavior
   

import generic_kls    <===============

class ChildKls(generic_kls.GenericKls):


    def __init__(self):
        super().__init__() ================>


    def the_other(self):
        return "From parent class"
                            /\
                            ||
==> def call_another(self): ||
        return self.the_other()




===========================================>

class GenericKls:





    def __init__(self):
        pass

    def the_other(self):
        return "From parent class"
                            /\
                            ||
    def call_another(self): ||
        return self.the_other()
      /\
      ||
      ||
      ||
========

non-ghost calls non-ghost: different results, but not sneaky

main.py | child_kls.py | generic_kls.py


import child_kls          <============

if __name__ == '__main__':
    chld = child_kls.ChildKls() ======>
    prnt = generic_kls.GenericKls() ==>






    rslt = chld.call_another() =======>

    print(rslt)
    # rslt shows "From child class"
   

    rslt2 = prnt.call_another() ======>

    print(rslt2)
    # rslt2 shows "From parent class"

    # Difference in behavior
    # but it is not sneaky, because
    # I know I am calling the child
    # and I am getting results
    # consistent with that class
   

import generic_kls    <===============

class ChildKls(generic_kls.GenericKls):


    def __init__(self):
        super().__init__() ================>


    def the_other(self):
        return "From child class"
                            /\
                            ||
==> def call_another(self): ||
        return self.the_other()




===========================================>

class GenericKls:





    def __init__(self):
        pass

    def the_other(self):
        return "From parent class"
                            /\
                            ||
    def call_another(self): ||
        return self.the_other()
      /\
      ||
      ||
      ||
========

ghost calls method not in parent: not an issue:

main.py | child_kls.py | generic_kls.py


import child_kls          <============

if __name__ == '__main__':
    chld = child_kls.ChildKls() ======>
    prnt = generic_kls.GenericKls() ==>






    rslt = chld.call_another() =======>

    print(rslt)
    # rslt shows "From parent class"
   







   

import generic_kls    <===============

class ChildKls(generic_kls.GenericKls):


    def __init__(self):
        super().__init__() ================>


    def the_other(self):
        return "From parent class"
                            /\
                            ||
==> def call_another(self): ||
        return self.the_other() 






class GenericKls:





    def __init__(self):
        pass





    def call_another(self):
        return self.the_other()
        # undefined, can't do it, 
        # unless you do things dynamically




non-ghost calls method not in parent: not an issue. The different names make it clear.

main.py | child_kls.py | generic_kls.py


import child_kls          <============

if __name__ == '__main__':
    chld = child_kls.ChildKls() ======>
    prnt = generic_kls.GenericKls() ==>









    rslt = chld.call_another() =======>

    print(rslt)
    # rslt shows "From child class"
   


    rslt2 = prnt.call_another() ======>

    print(rslt2)
    # rslt2 shows "From parent class"

    # No difference in behavior
   

import generic_kls    <===============

class ChildKls(generic_kls.GenericKls):


    def __init__(self):
        super().__init__() ================>


    def the_other(self):
        return "From child class"
                            /\
                            ||
                            ||
                            ||
                            ||
==> def call_another(self): ||
        return self.the_other()





===========================================>

class GenericKls:





    def __init__(self):
        pass




    def something_else(self):
        return "From parent class"
                            /\
                            ||
    def call_another(self): ||
        return self.something_else()
      /\
      ||
      ||
      ||
      ||
========

 

 


Date and Time

  

TO DO:
Group by:
Convert from date or time or datetime to string, including timezone
Display a time or a date (what does not show in above)
Convert from string to date or time, including tz
Add, subtract days, hours
Display delta
Convert delta to int: first to seconds with timedelta_var.total_seconds()
Then to minutes, hours, etc.

Current date or time:

import datetime as dt
dt.datetime.now()

import time
time.strftime("%Y-%m-%d", time.localtime())

Naive means the object has no timezone information; aware means it does
A datetime object d is aware if both of the following hold:
   d.tzinfo is not None
   d.tzinfo.utcoffset(d) does not return None. Notice "d" as parameter
A time object t is aware if both of the following hold:
   t.tzinfo is not None
   t.tzinfo.utcoffset(None) does not return None. Notice "None" as parameter
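A small sketch of naive vs aware objects (the +01:00 offset is just an example):

from datetime import datetime, timezone, timedelta

aware = datetime.now(timezone.utc)                  # aware: tzinfo is set
naive = datetime.now()                              # naive: tzinfo is None
print(aware.tzinfo, aware.tzinfo.utcoffset(aware))  # UTC 0:00:00
print(naive.tzinfo)                                 # None
local = aware.astimezone()                          # convert to the system timezone
plus1 = aware.astimezone(timezone(timedelta(hours=1)))   # convert to a fixed +01:00 offset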

import time
curr_dttm = time.strftime("%Y%m%d") + "_" + time.strftime("%H%M%S")
Inverse is strptime(date_string, format)
I think this works better:
import datetime
curr_dttm = datetime.datetime.now().strftime("%Y%m%d") + "_" + datetime.datetime.now().strftime("%H%M%S")
datetime.datetime.now().isoformat(sep='T', timespec='microseconds') # timespec in ['auto', 'hours', 'minutes', 'seconds', 'milliseconds', 'microseconds']. Do not use utcnow() because it returns a naive object

datetime.fromtimestamp(timestamp, tz=timezone.utc) returns datetime from a POSIX timestamp
datetime.timestamp() is inverse

datetime.fromisoformat(date_string) and datetime.isoformat() are inverse operations
ISO format: YYYY-MM-DD[*HH[:MM[:SS[.fff[fff]]]][+HH:MM[:SS[.ffffff]]]] where * can match any single character.
datetime.astimezone(tz=None) # if tz is None, then return in the system timezone
import time; time.time() # current epoch time

from module datetime:
class datetime.date # Naive date
class datetime.time # time independent of date, includes attribute tzinfo
class datetime.datetime # date and time, includes attribute tzinfo. In the constructor, the year, month, and day are mandatory, hour, sec etc default to 0
class datetime.timedelta # Constructor: datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0). Arguments may be <0 or > 0
class datetime.tzinfo

from datetime import datetime # datetime objects

datetime.date(year, month, day) returns a date
datetime.time(hour, minute, second, microsecond, tzinfo)
datetime.datetime(year, month, day, hour, minute, second, microsecond, tzinfo)

note: Objects of the date type are always naive, meaning that they are not aware of the time zone

datetime.today() # class method
datetime.now([tz])
datetime.utcnow()
datetime.combine(date, time)

dt of type datetime:

dt.strftime(format)
dt.year # class attribute
dt.month
dt.day
dt.hour
dt.minute
dt.second
dt.microsecond
dt.tzinfo
dt.date() # instance method
dt.time()
dt.timetz()

import time
t1=time.perf_counter()
.......
t2=time.perf_counter()
print (t2-t1) # in seconds

dt + timedelta
dt - timedelta
dt2 - dt1 --> timedelta
dt2 < dt1

from datetime import timedelta

timedelta([days[, seconds[, microseconds[, milliseconds[, minutes[, hours[, weeks]]]]]]])
All arguments optional / default to 0. Arguments may be ints, longs, or floats, and may be positive or negative. Down to microsecond resolution.
example: timedelta(weeks=40, days=84, hours=23)

Operations: + -
* integer or long
t2 // integer
+t
-t
abs(t)
str(t), repr(t)

from datetime import date

date.today() == date.fromtimestamp(time.time())

dt of type date:
dt.year dt.month dt.day
dt.weekday : 0=Mon, 6=Sun
dt.isoweekday : 1=Mon, 7=Sun
dt.isocalendar() --> 3-tuple, (ISO year, ISO week number, ISO weekday)
dt.isoformat() --> date in ISO 8601 format, 'YYYY-MM-DD'
dt.strftime(format)

dt + timedelta
dt - timedelta
dt2 - dt1 --> timedelta
dt2 < dt1

import time
time.sleep(5) # sleep 5 seconds

Convert delta to a number: first convert to seconds with timedelta_var.total_seconds(), then to minutes, hours, etc.
Or
Divide directly by a unit timedelta:
timedelta_var / timedelta(days=1)
timedelta_var / timedelta(hours=1)
timedelta_var / timedelta(minutes=1)
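For example, converting a delta of 1 day and 6 hours:

from datetime import timedelta

td = timedelta(days=1, hours=6)
print(td.total_seconds())        # 108000.0
print(td / timedelta(hours=1))   # 30.0 hours
print(td / timedelta(days=1))    # 1.25 days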

documentation on datetime

datetime

datetime.strptime(a_date, "%Y-%m-%d").date()
# .date() to get just the date, excluding time

# from date object to string:
date_obj.strftime("%Y-%m-%d")

# set a date object:
datetime(2022, 1, 1).date()

 

(dt.datetime.combine(dt.datetime.now().date(), dt.time(0,0)) + td).strftime("%H:%M")  # td is a timedelta

 

Use time.perf_counter() (not time.time()) for time stamps when analyzing performance.

 


CSV Format

  

import csv
#https://docs.python.org/3.4/library/csv.html
fn_in = "test_csv_in.csv"
fn_out = "test_csv_out.csv"
with open(fn_in, newline='') as fi:
    with open(fn_out, 'w', newline='') as fo:
        lines_in = csv.reader(fi, delimiter=',', quotechar='"')
        lines_out = csv.writer(fo, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        for row_in in lines_in:
            lines_out.writerow(row_in)


  

Excel File


SQL Output and Parameters

  

import sqlite3

db_name = "test_sql2cvs.db"
conn = sqlite3.connect(db_name)
cr = conn.cursor()
p = (3,6,)
cr.execute('select * from a where i in (?,?) ', p);
print([col_desc[0] for col_desc in cr.description]) # column descriptions
for one_row in cr:
    print(one_row)

Use connection.executemany("insert into a_table (a,b,c) values (?,?,?)", list_of_tuples); for bulk insert

# # Larger example
# for t in [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
#         ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
#         ('2006-04-06', 'SELL', 'IBM', 500, 53.00),
#         ]:
#     c.execute('insert into stocks values (?,?,?,?,?)', t)

MySQL

import pymysql, pandas as pd
db_conn = pymysql.connect(host=db_name, user=mysql_username, password=mysql_password, database=db_schema,charset="utf8")
the_params = {"abc":var1, "defg": var_b + "%"}
the_sql = "select ... where col1 = %(abc)s and col_b like %(defg)s ;"
just_assets = pd.read_sql_query(the_sql , db_conn, params=the_params)
# or (not tested yet) the_params = (var1, var_b + "%")
the_sql = "select ... where col1 = %s and col_b like %s ;"
Question: how to escape the regular "%"? "%%" does not seem to work.

An error occurs using the connector from mysql.connector

Note that "--," throws an error in MySQL. Put a space after the dash-dash


Matplotlib

  

http://matplotlib.org/users/pyplot_tutorial.html
c:\Python27\Scripts\pip.exe install matplotlib
sudo yum install python-matplotlib

import matplotlib.pyplot as plt
...
plt.plot(x, y) # line plot
plt.scatter(x, y) # scatter plot
plt.hist(x) # histogram
plt.hist(x, bins=20) # histogram with 20 buckets (default is 10)
plt.imshow(a_matrix) # plots a matrix as an image
plt.imshow(a_matrix, cmap='gray') # plots a matrix as an image in gray scale
plt.xlabel("...")
plt.ylabel("...")
plt.title("...")
plt.show()

r=np.random.randn(10000,2)
plt.scatter(r[:,0],r[:,1])

 

plt.savefig("filename.png") # write the figure to a file
plt.subplot(nrows, ncols, plot_number)

MNIST dataset: https://kaggle.com/c/digit-recognizer

 


tkinter

  

http://effbot.org/tkinterbook/tkinter-index.htm
https://wiki.python.org/moin/TkInter
http://www.tcl.tk/man/tcl8.4/TkCmd/contents.htm
import Tkinter
#capital T in py 2, lowercase in py 3

import tkMessageBox
tkMessageBox.showinfo(title, text)
# instead of showinfo: showwarning, showerror, askquestion, askokcancel, askyesno, and askretrycancel

#### simple GUI
from Tkinter import *
root = Tk() # top level window is called root by convention
my_frame1 = Frame(root)
my_frame1.pack() # 3 geometry managers: pack, grid, place
#add this after packing the container:
b1 = Button(my_frame1)
b1["text"]= "Hi there"
b1["background"] = "green"
b1.pack()
root.mainloop() # waiting

#### same GUI, but with a class
from Tkinter import *

class App:
    def __init__(self, the_parent):
        self.parent = the_parent # remember the parent
        self.my_frame1 = Frame(self.parent)
        self.my_frame1.pack()

        self.b1 = Button(self.my_frame1)
        self.b1["text"]= "Hello, World!"
        self.b1["background"] = "green"
        self.b1.pack()
        self.b1.bind("<Button-1>", self.b1Click)

        self.b2 = Button(self.my_frame1)
        self.b2.configure(text= "Hello, World!", background = "green")
        self.b2.pack()
        self.b2.bind("<Button-1>", self.b2Click)

        self.b3 = Button(self.my_frame1, text= "Hello, World!", background = "green")
        self.b3.pack()
        self.b3.bind("<Button-1>", self.b3Click)

root = Tk()
app = App(root)
root.mainloop()

# binding
widget.bind(event_type_name, event_handler_name)

# Buttons
width attribute is in characters
command handler expects button press AND button release
padx,pady: string with unit. "2m" is 2 mm

# Frame
parameters
borderwidth=5
relief=RIDGE
height=50 width=50 # Note: often ignored when widgets are added
background="white"

# Frame packing
frames have internal and external padding: ipadx,ipady and padx,pady
The padding is specified when packing
Internal is around the widgets
side=TOP | BOTTOM | LEFT | RIGHT
fill=BOTH
expand=YES

Note:
when packing, there is a cavity
As widgets are added, they claim area in the cavity, but do not use it all.
With option expand=YES, it claims the whole area, but will not use all the area
With option fill, it will also grow to use the whole area. fill=X, fill=Y, or fill=BOTH. fill=NONE does not let it grow.
With option anchor="N", the widget's position will be at top. Other values: NE (north east), CENTER, ...
See tt100.py in the thinking_in_tkinter directory
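A small sketch of the packing options above (Python 3 names; the layout values are just examples):

from tkinter import *

root = Tk()
f = Frame(root, borderwidth=5, relief=RIDGE, background="white")
f.pack(side=TOP, fill=BOTH, expand=YES, padx="2m", pady="2m", ipadx="1m", ipady="1m")
Button(f, text="Hi", background="green").pack(anchor="n")
root.mainloop()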


Database Connections

  

http://www.python-course.eu/sql_python.php
import sqlite3
cn = sqlite3.connect("a_db.db")
cr = cn.cursor()
cr.execute(sql_cmd)
cn.commit()
cn.close()

odbc:

import pyodbc
sources = pyodbc.dataSources()
print( sources.keys() ) # show the defined ODBC sources
# I had installed Python 32 bit and was trying to connect via ODBC 64 bit.
# I re-installed Python, but the 64 bit version. It is working.

C:\Python37\scripts\pip.exe install cx_oracle
import cx_Oracle # Upper case "O"

Note that the odbc or oracle connection objects must have the same architecture (32 bit or 64 bit) as the version of Python
For some reason, the 32-bit version of Python was first installed (surprising in this day and age - 2019). I forced the installation of 64 bit Python and it worked
def is32or64():
  import sys
  if sys.maxsize==2147483647:
    print("Python 32bit version")
  elif sys.maxsize==9223372036854775807:
    print("Python 64bit version")
  else:
    print("sys.maxsize=",sys.maxsize)

Sample Query Call

Python code:


import cx_Oracle

def call_myquery(qry: str, params: dict):
    # params is a dictionary in the form {"bind1": data1, "bind2": data2}
    # The SQL has the bind variables as follows
    # select ... from ... where col1 = :bind1 and col2 = :bind2
    # Do not put quotes around the bind variables
    # No ';' at the end of the query
    with cx_Oracle.connect(username, password, "host:port/database") as connection:
        with connection.cursor() as cursor:
            try:
                results = cursor.execute(qry, params)
                cols = [fld[0] for fld in cursor.description]
                # cursor.description contains
                # name, type, display_size, internal_size, precision, scale, null_ok
                # for each column
                # do fld[0] to get just the name
                rslt_lst = [{c: r for c,r in zip(cols, row)} for row in results]
            except Exception as e:
                print(f"Error on executing {qry} due to {e}")
                rslt_lst = []    # so that the return below does not fail
        # cursor.close() not needed with the "with" statement
        # connection.close() not needed with the "with" statement
    return rslt_lst


qry = "select ... where a like :the_bind_var|| '%' "
params = {"the_bind_var": "a value"}
rslt = call_myquery(qry, params)


  

  

Sample Call to Stored Procedure

Stored Procedure:


CREATE OR REPLACE PROCEDURE myproc (a IN VARCHAR2, c IN OUT SYS_REFCURSOR) AS
BEGIN
  OPEN c FOR SELECT * FROM myschema.mytable WHERE aa = a;
END;
/

Python code:


import cx_Oracle

def call_myproc(a):
    with cx_Oracle.connect("username", "password", "hostname:port/service_name") as connection:
        with connection.cursor() as cursor:

            # Declare a cursor variable for the OUT parameter
            result_set = cursor.var(cx_Oracle.CURSOR)

            try:
                cursor.callproc('myproc', [a, result_set])
                result_cursor = result_set.getvalue()

                # Iterate over the result set and print the values
                for row in result_cursor:
                    print(row)
                # or use result_cursor.fetchall()
            except Exception as e:
                print(f"Error calling myproc due to {e}")


call_myproc('value_for_a')

  

  


Numpy

  

import numpy as np
a = np.array([asdf...])       # Note: Cannot do .append() with np array. Access first element with a[0]
a.shape gives the shape
Just a few reminders: a vector has one dimension, a matrix has two.

Element-wise operations: + * ** np.sqrt(a) np.log(a) np.exp(a)
Element-wise product requires that operands have the same shape.
a*b # list where each element is ai * bi This is an element-wise product.

A+4 # this is "broadcasting" where 4 is added to all the elements of A

np.dot(a,b) = sum(ai * bi) Dot product, or inner product
This is a dot product of two vectors and assumes both have the same length.
Dot product of two matrices A*B requires that the inner dimensions match.
np.dot(a,b) or a.dot(b) or b.dot(a) or a@b # note that a * b is element-wise multiplication, not the dot product
np.inner(a,b) is the same as np.dot(a,b)
np.sum(array) or a.sum() are the sum of the elements
np.outer(a,b) # outer product (like a cartesian product)
np.linalg.norm(a) # the magnitude of a: |a| or sqrt(sum(a * a))
np.linalg.inv(a) # inverse of a a.dot(np.linalg.inv(a)) gives the identity matrix (a must be square). The identity matrix has 1 on the diagonal, 0 elsewhere.
a.T is transpose. Note: vector.T == vector
np.linalg.det(a) # determinant of a
np.diag(2D_array) # diagonal: a vector of the elements on the diagonal
np.diag(1D_array) # a 2-D array of the elements on the diagonal, with 0 elsewhere
np.trace(a) # sum of the diag : np.diag(a).sum()
np.allclose(a,b) # the difference between the two parameters is small (but not necessarily zero)

two dimensional array (using matrix() is not recommended, use array() instead)
m = np.array([ [...] , [...] ])
Access with m[i][j] or m[i,j] By convention, the first index is row, second is column
m[:,1])  # --> all rows, second column (column 1). Read ":" as "all"
m[:,:8])  # --> all rows, first 9 columns
m[:,:-2])  # --> all rows, all but last 2 columns
m[:,-2:])  # --> all rows, last 2 columns

Generate arrays:
np.array(a list)
np.zeros(n) or np.zeros((n,m)) # argument is a tuple
np.ones(n) or np.ones((n,m))
np.eye(n) identity matrix
np.random.random((n,m)) # argument is a tuple
np.random.randn(n,m) # gaussian distribution. Note that argument is NOT a tuple as above
.mean() .var()
np.random.multivariate_normal(mean=np.array([1,2]), cov=np.array(2x2array), size=1000)
np.linspace(start_point, end_point, number_of_points) # create a list of points. Example: np.linspace(0,1,11)
nparray.reshape(n,m) # with n,m as new dimensions

eigenvalue/vector (http://setosa.io/ev/eigenvectors-and-eigenvalues/)
Convention: rows are the samples, columns are the features. E.g. np.random.randn(100,3) : 100 samples, 3 features
np.cov(a) gives the co-variance (you might have to do np.cov(a.T))
Reminder: symmetric means a=a.T; hermitian means a=aH which is the conjugate transpose of A
eigenvalues,eigenvectors=np.linalg.eig(a) # returns a tuple
eigenvalues,eigenvectors=np.linalg.eigh(a) # for symmetric and Hermitian matrices

Solve linear system Ax=b
np.linalg.inv(A).dot(b) or np.linalg.solve(A,b)

read file
X = []
for line in open("the_file.csv"):
    row = line.split(',')
    sample = list(map(float, row))   # list() needed in Python 3, where map returns an iterator
    X.append(sample)
X = np.array(X)

Other

Convert to a numpy array: import pandas as pd
df=pd.read_csv("the_file.csv", header=None, sep=",").to_numpy()
to_numpy() is optional; as_matrix() was removed in recent pandas versions

 

Y=np.fft.fft(y)

Scipy

nltk.download() # manage resources for nltk package

  

  

  


Pandas and DataFrames

  

import pandas as pd
import numpy as np

 

 

 

 

Load Data

Read to a data frame, with a previously opened connection
df = pd.read_sql("SELECT top "+str(NBR_ROWS)+" * from the_table ;", cn)

Read and write CSV:
df = pd.read_csv("the_file.csv", header=None, sep=",")
df.to_csv("filename.csv", index = False, header=True, sep="\t", compression="gzip")
Parameter header is the row with the column headers: 0 for first row, None for no headers. If 1 or more, the previous rows are ignored
Parameter sep is the column separation
index=False removes the added index column
Note: engine="python" (default is C) to get other options
df.to_excel('filename.xlsx') # Export data frame to Excel. Note that Excel does not handle special characters very well.

df = pd.json_normalize(something_in_json_format) # Normalize semi-structured JSON data into a flat table (gives structure.element notation)

In the department of useless error messages:
ValueError: only single character unicode strings can be converted to Py_UCS4, got length 0
It seems that this happens when loading a csv into Excel with sep="" (empty separator)!

 

 

Data frames and SQLite

import sqlite3
import pandas as pd
connlite = sqlite3.connect(the_sqlite_db_name)
r = pd.read_sql_query(sql_cmd, connlite, params=sql_params) # sql_params is a tuple like (1234,)
connlite.close()

 

 

 

Create dataframes

d = pd.DataFrame([['a','r','t',3,5],['a','r','t',-1,-3],['a','r','s',2,4]])

# or

df = pd.DataFrame([[1,2.2,"abcd"],[2,3.5,"efgh"],[3,6.3,"ij"]])
df.columns=["a","b","c"]

 

 

Show

df
df.info()    # gives some basics about the data
df.head(10)  # shows first 10 rows
df.tail(10)  # shows last 10 rows
df.columns  # the columns
df.shape  # see the number of rows and columns
df.columns.values
df.index.values
len(df.index) # ==> number of rows

df.values.tolist() # Transform into a list of lists, in the form of a list of rows (as opposed to dataframes which are organized as a list of columns). I think this can be used for bulk SQL insert (but I have to test this)
df.columns.values.tolist() # List of column headers
[df.columns.values.tolist()]+df.values.tolist() # Make it look like a table (verify this snippet, not sure it works)
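Untested sketch of the bulk-insert idea mentioned above, combining df.values.tolist() with sqlite3 executemany (table and column names are mine; the column order must match the INSERT statement):

import sqlite3
import pandas as pd

df = pd.DataFrame([[1, 2.2, "abcd"], [2, 3.5, "efgh"]], columns=["a", "b", "c"])
cn = sqlite3.connect(":memory:")
cn.execute("create table t (a int, b real, c text)")
cn.executemany("insert into t (a, b, c) values (?, ?, ?)", df.values.tolist())
cn.commit()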

 

 

Examples

d = pd.DataFrame({ 'n': [1,2,3,4,5,6,7,8,9,10]
               , 'a': ['boat','boat','boat','plane', 'plane','boat','boat','boat','plane', 'plane',]
               , 'b': ['red','red','red','blue','blue','red','red','red','blue','blue']
               , 'c': ['big','big','small','small','small','big','big','small','small','small']
               , 'x': [3,4,-1,5,6,-1,5,6,3,4]
               , 'y': [2,-3,-2,2,1,2,2,1,0,4]})
d.groupby(by=['a','b','c'])[['x','y']].sum()
d['x'].rolling(3).sum()
d.sort_values(by=['n'], ascending=False)
d.sort_values(by=['n'], ascending=False).groupby('a')[['x','y']].rolling(3).sum()
d.sort_values(by=['n'], ascending=False).groupby(['a','b'])[['x','y']].rolling(3).sum()

 

 

Get Columns and Elements

Get the element on the third row of a column 'AA'. Notice square brackets.
Notice index starts at 0. This assumes reset_index or numbering 0,1,2, ...
df['AA'].iloc[2]
df.iloc[:,0] # returns the first column. Notice the "i" in "iloc"
df.iloc[0,:] # returns the first row. Also with "iloc"

The following leads to "chained indexing":
df = df [ ['ISIN', 'Date']]
Do this instead:
df = df.loc[:,['ISIN', 'Date']]
Note:

df[["col2", "col5", "col1", "col2"]] # Reorder columns, repeats are allowed (notice double square brackets)

Copy only the selected columns
col_lst = ["a", "b", "c", "src"]
df_copy = df[col_lst].copy()
or
df_copy = df[["a", "b", "c", "src"]].copy()

Rename columns:
df.rename(columns={'old_col1':'new_col1', 'old_col2':'new_col2'})

 

 

Append

Append one to another (same structure)
df = pd.concat([df1,df2])
df = pd.concat([df, df1.loc[:,non_null_raw_columns_hs]])
Do NOT do this: df = df.append(df1[non_null_raw_columns_hs])

 

 

Updates

In place, vs assign to new dataframe (however, not all functions allow "inplace")
df = df.something(...)
df.something(..., inplace=True)

Assign to all rows
df['Country'] = "aaa"

Changes to a single column
df["str_col"] = df["str_col"].str.title()

Update Existing vs New Column:
df['c'] = df['c'].str.upper() # modify existing column as upper case of existing column
df['cc'] = df['c'].str.upper() # add a new column as upper case of existing column

Add a field
df['newcol'] = "the_source"
df['newcol'] = df['firstcol'] + df['secondcol']
df["meas_ratio"] = df["meas1"] / df["meas2"] # no error if null

Replace nulls with something, but only for certain rows:
columns_for_which_null_should_be_na = {"A": 0, "B": 'N.A.'} # "A" ... are the column headers
df.fillna(value=columns_for_which_null_should_be_na)

DataFrame.fillna(value=None, method=None, axis=None, inplace=False, limit=None, downcast=None)

df.isna() equiv to df.isnull()

 

 

Apply

.apply(f, axis=1) # f is a function that should have one parameter which is a row
Note: according to one person on web, apply is slower than a loop
df['newcol'] = df.apply(lambda row: row['firstcol'] * row['secondcol'], axis=1)
df['Discounted_Price'] = df.apply(lambda row: row.Cost - (row.Cost * 0.1), axis=1)
Equivalent without "apply":
df['Discounted_Price'] = df['Cost'] - (0.1 * df['Cost'])

 

 

Join, Merge, and Update

Update

Update columns in df based on indices in df_other. DOES NOT JOIN, it just copies data in order of the index
df.update(df_other)
By default: join='left', overwrite=True
If I want to only overwrite the NA, then set overwrite=False
A filter_func allows further filtering

DataFrame.update(other, join='left', overwrite=True, filter_func=None, errors='ignore')
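A minimal sketch of update (data made up): values are matched by index position, and NaN in the other frame does not overwrite.

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})
df_other = pd.DataFrame({"b": [99, None, 77]})   # aligned on index 0, 1, 2
df.update(df_other)                              # in place; the None does not overwrite 20
print(df)                                        # b is now 99, 20, 77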

Join two dataframes by index

Parameter "on" (type str or list of str): column(s) to join. If None, then joins index-on-index
Other parameters are : lsuffix='', rsuffix='', sort=False
Options for "how" are: 'left' (default), 'right', 'outer', 'inner'
When not joined, values show as NaN
df.join(df_other, on=None, how='left')
Force a key
df.join(df_other.set_index('key'), on='key', how='left')
Uses df_other's index but any column in df: this preserves the original DataFrame's index in the result.
df.join(df_other, on="original_index", rsuffix="_joined", how="left")
df.join(df_other.set_index('col1'), on='col1', how='left')

Merge

Join two dataframes by specified columns (as opposed to join that joins with indexes)
df.merge(df_other, how="inner", left_on=["a", "b"], right_on=["aa", "bb"], suffixes=("_left", "_right"), copy=False, indicator=True)
Parameter copy=True by default, but I am guessing that False leads to fewer issues, based on the doc
The indicator adds a "_merge" column showing the source of each row.

df.merge( df_other \
        , left_on  = ["cola", "colb", "colc"] \
        , right_on = ["colA", "colB", "colC"] \
        , suffixes = ("", "_right") \
        , how = "left" \
        , copy = True  \
        )

 

 

Index

 

 

Set the index to one of the columns:

df.set_index("col") # The column is removed
df.set_index(["col1", "col2"]) # The columns are removed
df.set_index("col", drop=False) # The column is kept as a column too
df.set_index(pd.Index([..list of values..])) # Create the index from a list of values (needs intermediate step of making it an Index object)

Keep the old index, and re-apply when needed (to be confirmed):
original_indx = df.index
df.reset_index(inplace = True)
df.set_index("original_indx")

Redo the index

df.reset_index(inplace=True)
For example, after concatenating dataframes, the original indexes are kept.
Otherwise the indices from each original df will show 0 multiple times, 1 multiple times, ...
Do reset_index after append so as to have a unique index
And, as a reminder because it is not intuitive, the indexes identify the rows in Python
df.reset_index(drop=False) # The old index becomes a column

 

 

Functions

 

 

 

Filter, Sort, and Group By

Filter

df[df["col"]=="aa"] # returns rows with "aa" in col
df[df["a_col"].str.len() < 4]
df[df["col"]=='a_value']
df[df["adate"]<date_limit] # I formatted as a string yyyy-mm-dd

SQL-like queries of the dataframe
df = df.query("col < 4.5 & col > 4")

df.mask is similar to df.where but careful: it affects the elements for which the condition is false

df.isin(("val1", "val2", "val3", ...)) # ==> true where element in the values

Drop rows based on filter

df.drop(some labels)
df.drop(df[<some boolean condition>].index)
df.drop(df[df["col"]=="aa"].index) # drops rows with aa in col
df.drop(df[..see examples under 'filter' above..].index)

Aggregation

df.sort_values(by=["cold"], ascending=True).groupby(by=["cola", "colb", "colc"])[["meas1", "meas2"]].sum()

.groupby sorts by default; therefore sorting before grouping does not make sense.
Group by does three things: split the table into groups, apply some operation to each of those smaller tables (i.e. the aggregation function), and combine the results.
In "for g, f in df.groupby(...):", each iteration yields two parts: the group key and the individual frame for that group.

df.sort_values(by=["cold"], ascending=True).groupby(by="col")["meas"].sum()
Aggregations include:

It may be necessary to add .reset_index() because the columns of the groupby become the index.
reset_index() # makes them columns again
df.groupby(by=["a", "b"])[["m1", "m2"]].sum().reset_index()

Same, but without group by, in other words across all rows:
df[["col1", "col2"]].agg('max')
df["col"].max()

Split

Split a dataframe into multiple dataframes based on the value of a column:

for g,d in df.groupby('col1'):
    print(g,":\n",d) # g has one of the group-by values, d has the corresponding values
    # note that the original index is kept

Unstack

df=dtframe.set_index(['col1', 'col2'])
df.unstack(level=n) # where n corresponds to the index elements above (0 to index length - 1). Default -1.
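A small sketch (data made up): the values of the last index level become columns.

import pandas as pd

df = pd.DataFrame({"col1": ["x", "x", "y"], "col2": ["a", "b", "a"], "v": [1, 2, 3]})
wide = df.set_index(["col1", "col2"]).unstack(level=-1)   # col2 values 'a' and 'b' become columns
print(wide)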

Sort

df.sort_values(by="the_index", axis="index")
df.sort_values(by=["a", "b"], axis="index")

 

 

Series

 

 

Misc. Operators

String methods

df["str_col"].str.lower()
Note that .str is a method for series: works when the data type of a column is string
df["col"].str.title() # title case
df["col"].str.lower()
.str.contains("a-str") # Returns a true or false for each row

 

 

Logical operators

import numpy as np
np.logical_and(arr1, arr2)
np.logical_or(arr1, arr2)

df.pivot cannot handle aggregation
df.pivot_table has to have numerics
df.unstack is similar

 

 

Date time

import datetime
print(datetime.datetime.isoformat(datetime.datetime.now(), sep='T', timespec='microseconds'))

 

 

Numpy

import numpy as np
np.select([cond1, cond2, ...], [exp1, exp2, ...], default=exp3)
np.where(cond1, exp1, np.where(cond2, exp2, ...))
# use numpy.where:
df[col3] = np.where((df[col1]-df[col2])>0, df[col1]-df[col2], 0)
# equiv to if col1>col2 then return col1-col2 else return 0
df["aaaaaa"] = np.select(
        [ df["col1"] == 'a value'
                , np.logical_and(df["col2"] == 'a value',  df["col3"].isin(('a','b')))
                , np.logical_and(df["col2"] == 'a value', ~df["col3"].isin(('a','b')))  # not in
                ]
                ,
                [ df["cola"]
                , df["colb"]
                , df["colc"]
                ]
                , "default"
                )

Fast Fourier Transform:
Y=np.fft.fft(y)

 

 

Other Snippets

import time

def show_progress(last_ts=None, label=None):
    ts=time.perf_counter()
    if last_ts:
        print("Elapsed seconds:",ts-last_ts," Last time: ",str(last_ts), " Current time: ",str(ts), " - ", label)
    else:
        print("Current time: ", str(ts), " - ", label)
    return ts

 

 

Connect to SQL Server

import pyodbc
cn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+sqls_svr+';DATABASE='+sqls_db+';Trusted_Connection=yes')
csr=cn.cursor()
csr.execute("SELECT @@version as version;")
cn.close()
df = pd.read_parquet("file_or_directory")
df.to_csv("output_file", index=False, sep="\t", compression="gzip")   # index=False suppresses the added index column
# compression to .gz

df['new_date'] = df.year.astype(str) + '-' + df.month.astype(str).str.zfill(2) + '-' + df.day.astype(str).str.zfill(2)

df.dtypes : returns the types

 

  

  


PySpark

  

Doc:

 

Unless shown otherwise, all expressions shown below return a dataframe

 

Behind the Scenes

Dataset: distributed collection of items
Collection of rows
Called DataFrame, but it is not a Pandas DataFrame
A DataFrame is a Dataset organized into named columns. Conceptually equivalent to a table or a data frame in Python

Data frames are implemented on top of RDDs. RDDs are immutable and their operations are lazy.
Transformations on RDDs return new RDDs; however, nothing is calculated at that point.
Actions, such as collect(), trigger the actual computation. Otherwise, the computation is not done.
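A small sketch of the lazy behavior, here with the DataFrame API (same principle; data made up):

df = spark.createDataFrame([[1, "aa"], [2, "bb"]], schema="n long, col string")
filtered = df.filter(df["col"] == "aa")   # transformation: nothing runs yet
filtered.show()                           # action: the filter actually executes
rows = filtered.collect()                 # action: brings the rows back to the driver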

Shared variables:

Clusters:
High concurrency
Standard (recommended for single users)
Single node, just for exploring

Cluster mgr: connects to all nodes
Driver: has the SparkContext
Worker Node: has an executor. The executor performs the tasks

pool: allows clusters just released to be reused

driver node can be of a different type than the worker nodes. Generally, keep the same

select the latest runtime possible (LTS)
high concurrency for shared clusters, and standard for single users
enable auto-termination
use AWS spot instances if possible
Generally, prefer fewer but larger nodes over many smaller nodes, because many operations cannot be run in parallel

DBU=processing capability per hour
jobs compute: lowest rate, for automated jobs
sql compute: idem, but not always available
all-purpose: higher rate, high concurrency
spot: lower costs , but can be terminated when prices go up

Compare Spark to MapReduce (not to HDFS=Hadoop Distributed File System)
Spark can also use HDFS
Most of the improvement of Spark over MapReduce is the fact that Spark does not have to write to disk at the end of every operation

 

 

Installation

pipenv install pyspark
sudo apt install default-jre
(sudo apt install scala)
(sudo apt install py4j)

Note:
Set SPARK_LOCAL_IP if you need to bind to another address
Set export SPARK_LOCAL_IP=127.0.0.1
127.0.0.1 did not seem to work
hostname resolves to a loopback address: 127.0.1.1; used 192.168.0.100 instead (IP in local network)

 

 

Start

Initialize with SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.appName('a-name').getOrCreate()
In the pyspark shell, this is not necessary as the session is automatically stored in the spark variable
Both lines above initialize the spark session. The second line associates it with an application. Only one of the two lines is necessary.

Get functions:
import pyspark.sql.functions as F

 

As with anywhere else in Python, do dir(obj) to see the methods and help(obj) to get more details (if available).
Get documentation on parameters: print(an_object.explainParams().replace("\n", "\n\n"))

 

Create Dataframe

df2= spark.createDataFrame([ [1, 2., 'abcd']
                           , [2, 3., 'efgh']
                           , [3, 5., 'ij']
                           ]
                           , schema='a long, b double, c string'
                          )

from pyspark.sql import Row
df = spark.createDataFrame([ Row(a=1, b=2., c='abcd')
                           , Row(a=2, b=3., c='efgh')
                           , Row(a=3, b=5., c='ij')
                           ]
                           , schema='a long, b double, c string'
                          )
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

When all else fails, take apart a data frame:
for one_row in tokenized.select(["words",]).collect():
    print(one_row["words"])

Extract column values:
for one_col in one_row:
    print("Column in one row:",type(one_col),one_col)

 

 

Out of the Box Functions

df.count()
df.show(truncate=False)
df.filter(df.value.contains("Sp")) # Column.contains() also exists on PySpark columns
df.first()

.cache() # cache the data for each re-use

 

 

Work with schemas

See the structure with: df.printSchema()

from pyspark.sql.types import StructField,StringType,IntegerType,StructType
data_schema = [StructField('age', IntegerType(),True)    # name, type, boolean as nullable (True means nullable)
              ,StructField('name', StringType(), True)
              ]
final_struct = StructType(fields=data_schema)
df = spark.read.json('....', schema=final_struct)
# summary
df = spark.read.json('....', schema=StructType(fields=[StructField('age', IntegerType(),True)    # name, type, boolean as nullable
                                                      ,StructField('name', StringType(), True)
                                                      ]))

Note that letting pyspark infer the schema usually works.

 

 

Look at Structure

df.printSchema() # also df.schema, but this is for passing as an argument
df.columns # show columns (this is an attribute, not a method). See also under "Columns"
df.describe().show() # See also below
df.head() # See also under "Get Rows"
df.show(truncate=False)

Profiling

df.select(df.columns).describe().show()
or just:
df.describe().show() (this may only show numeric columns)

Format the numbers and the headers:

dscr = df.describe()
dscr.select( dscr["summary"]
           , f.format_number(dscr["Open"].cast("float"), 2).alias("open")
           , f.format_number(dscr["Close"].cast("float"), 2).alias("close")
           , f.format_number(dscr["vol"].cast("int"), 2).alias("vol")
           ).show()

Count values (profiling)

for c in df.columns:
    dfc = df.groupBy(c).count()
    # print(c, ": ", df.select(c).distinct().count(), sep="")   this is the same as next
    print(c, ": ", dfc.count(), sep="")
    dfc.orderBy(dfc["COUNT"].desc()).show(5)

Count distinct values for all columns:

for c in df.columns:
    di = df.select(c).distinct()
    if di.count() < 6:
        # if not too many distinct values, then show them
        print(c, ":") #, str([v for v in di.values()]))
        df.groupBy(c).count().show()
    else:
        # otherwise, show just the count
        print(c, ": ", df.select(c).distinct().count(), sep="")

Count all rows:
print(df.count())

 

Distinct and Counts

Equivalent to SELECT DISTINCT
df.select(["CNTRY_NM", "CNTRY_CD"]).distinct()

Equivalent to SELECT COUNT(DISTINCT ...)
df.select(["CNTRY_NM", "CNTRY_CD"]).distinct().count() # distinct().count() returns a number

 

 

 

Get Rows

df.show()
df.show(1) # top 1 row
df.show(1, vertical=True) # Display vertically. Useful for long values
df.head(n) --> row object
df.head(n)[0] --> row object
df.head(n)[0][0] --> First value
for row in df.head(n):.... # in case a loop is really necessary...
df.take(1) # number is optional
df.tail(1) # number is optional

result = df.filter(...).collect() # use this to keep the data for future use. List of row objects.
result[0] # First row as row object
result[0].asDict() -> dictionary for one row (where result = ....collect() as shown above)

 

 

Columns

df['col'] --> column object
df.col --> column object
df.select('col') --> dataframe with a single column (and I can do .show())
df.select(['col','col2']) --> dataframe, here with two columns
df.select(df.col) --> dataframe with column selected via its column object
df.select(df.col1, df.col2) --> dataframe with two column objects

Column objects don't really give you data. If you want to see the data for a column object "col", put it back into the dataframe like this: "df.select(col).show()"

Add columns:
df2 = df.withColumn('cc', upper(df.c)) # 'cc' is new col name, second argument is a column object (not a dataframe object)
df2 = df.withColumn("Ratio", df["A"]/df["B"]).select(f.format_number("Ratio",2)) # Add column and format

Rename columns:
df2 = df.withColumnRenamed('old_name', 'new_name') # just rename

Alias:
df.select(avg('col').alias('alias-to-display'))

 

 

Types

import pyspark.sql.types as T
df.a.cast(T.StringType())

 

 

Updates

Unless shown otherwise, all expressions shown below return a dataframe

Functions

import pyspark.sql.functions as F
F.upper(...) or str.upper()
upper, lower, length, ascii, base64, unbase64, trim,l,r, instr(str,substr), substring(str,pos,len), split(str,pattern,limit=-1)
concat, concat_ws(sep,*cols)
format_number(col,d) # where d is the number of decimals

format_string(format,*cols)

Dates

from pyspark.sql.functions import dayofmonth,month,year,weekofyear,dayofweek
from pyspark.sql.functions import hour,dayofyear
from pyspark.sql.functions import format_number,date_format
df.select(['Date', dayofmonth(df['Date'])]).show()
Date stored as : datetime.datetime(2022,4,25,0,0)
df.withColumn("Year",year(df['aDate'])).show() # add a column

Strings

size
split
df.filter(df.value.contains("Sp")) # Column.contains() also exists on PySpark columns

Missing data

Keep as nulls

Or drop the rows
df.na.drop() # drop any row with any null
df.na.drop(thresh=2) # drop if 2 or less non-null values
df.na.drop(how='all') # drop if all are null
df.na.drop(subset=['col']) # drop if values in col are null

Or fill in with values
df.na.fill(0) # replace null with 0 in numeric columns
df.na.fill('') # replace null with '' in string columns
df.na.fill('n/a', subset=['col'])

Example of filling the missing values with the average value of the column:
import pyspark.sql.functions as f
avg_value = df.select(f.mean(df['the_column'])).collect()[0][0]
df.na.fill(avg_value, subset=['the_column'])

Equivalent to NVL (See "CASE WHEN ELSE" for replacing with a value from another column)
df_countries = df.select(["CNTRY_NM", "CNTRY_CD", "CRCD"]).distinct().na.fill("NULL", subset=["CNTRY_CD"])

 

 

Join

one_df.join(other_df, on=None, how=None)
on: str, list, or Column a string for the join column name, a list of column names, a join expression (Column), or a list of Columns
how (string): inner (default), cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti
examples for "on":
name # column must be on both sides
one_df.name == other_df.name
[one_df.name == other_df.name, one_df.dob == other_df.dob]

Equivalent to JOIN
df_crcd.join(df_avg_funding_rcvd, on=["CRCD", "YEAR"], how="left").join(df_avg_tgt_peo, on=["CRCD", "YEAR"], how="left")

 

 

Filter, Sort, and Aggregate

Unless shown otherwise, all expressions shown below return a dataframe

Filter

df.filter(df.a == 1).show() # column object in the filter
df.filter("a = 1").show() # condition in sql syntax
When combining conditions, use "&" "|" "~" for "and", "or", and "not", and surround each individual condition with "(...)"

These two are equivalent:
df.filter("Close < 60").count()
df.filter(df["Close"] < 60).count()

And these two:
100.0 * df.filter("High > 80").count()/df.count()
100.0 * df.filter(df["High"] > 80).count()/df.count()

Equivalent to CASE WHEN ELSE. Use multiple .when if needed.
from pyspark.sql.functions import when
df = df.withColumn("CRCD", when(df["CD"].isNull(), df["NM"]).otherwise(df["CD"]))

Another option, not tried:
from pyspark.sql.functions import coalesce
df.withColumn("B",coalesce(df.B,df.A))

Equivalent to WHERE
df.filter(df["CNTRY_NM"] == "Mali").show()

Remove rows where value is 0
df.filter(df["AVG_FUNDING"] != 0).filter(df["AVG_PEOPLE"] != 0)

 

Sort

df.orderBy('col').show() # ascending
df.orderBy(df['col'].desc()).show() # descending

df.orderBy(df["High"].desc()).head(1)[0][0]

 

Aggregate

df.groupby('col_dim').avg('meas1','meas2').show()
The .groupby() returns a GroupedData object, and the aggregation methods return dataframes:
i.e. a.groupby("col").mean() returns a dataframe.
Other aggregates: avg(), max(), count(), sum() (all with ())

Aggregate without group by: The argument is a dictionary, of which the keys are the column names and values are the desired aggregation
df.agg({'col': 'sum'}).show()
Note that the doc says df.agg(...) "shorthand for" df.groupby().agg(...) i.e. using an empty groupby method

Combine the two methods described above (groupby and agg):
df.groupby('col_dim').agg({'col': 'sum'}).show()

Another option:
from pyspark.sql.functions import countDistinct,avg,stddev
df.select(avg('Sales').alias('Avg Sales')).show()

Pyspark.sql.GroupedData functions: agg, apply, appliInPandas, count, pivot. And avg, mean, max, min, sum

df.select(f.max("Volume"), f.min("Volume")).show()
# this does aggregation with the functions max and min

Get the max (or other aggregate value):
df.agg({"col_name": "max"}).head(1)[0][0]

Pivot:
df.groupBy(...).pivot("pivot-column", [list of values that will become columns]).aggr_fctn
If the list of values is not provided, the process first determines the list. It is therefore more efficient to provide the list if it is known
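Sketch with the color/fruit dataframe created earlier; passing the list of values skips the extra pass over the data:

df.groupBy("fruit").pivot("color", ["red", "blue", "black"]).sum("v1").show()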
Unpivot:
from pyspark.sql import functions as F
stack_expr = "stack(number-of-columns, 'col1', col1, 'col2', col2, ... , 'coln', coln) as (col-where-cols-will-go, col-where-values-will-go)"
un_pivot_df = df.select("non-pivoted-col", F.expr(stack_expr)).where("col-where-values-will-go is not null")

The ".where(...)" removes the rows with null and can be ommitted

All Together

df.filter(...).select(['c1', 'c2']).groupby('col_dim').avg('meas1','meas2').show()
???: df.orderBy(...).filter(...).select(['c1', 'c2']).groupby('col_dim').avg('meas1','meas2').show()
df.select(['c1', 'c2']).where("`c1` like 'abc%'") (notice the backticks around the column name in the where)

df.filter(df["High"] == df.agg({"High": "max"}).head(1)[0][0]).select("Date").head(1)[0][0]

df.withColumn("Year", f.year(df["Date"])).groupby("Year").max("High").orderBy("Year").show()
df.withColumn("Month", f.month(df["Date"])).groupby("Month").avg("Close").orderBy("Month").show()

Select the date where a column has the maximum value:
df.filter(df["High"] == df.agg({"High": "max"}).head(1)[0][0]).select("Date").show()

Equivalent to WHERE, GROUP BY, AVG, AS (renamed columns, alias)
Notice parentheses and operands of or ("|"), and ("&"), and not ("~")
df_avg_tgt_peo = df.filter((df["METRIC"] == "People in need") | (df["METRIC"] == "People targeted")).groupBy(["YEAR", "CRCD"]).avg("VALUE").withColumnRenamed("AVG(VALUE)", "AVG_PEOPLE")

 

 

 

Spark SQL

Make the data frame look like a table:
df.createOrReplaceTempView("tablea") ("registers" the dataframe as a table)
Now, leverage the SQL syntax to perform queries.
Remember that, as with other spark operations, the data is not evaluated until a .show() or other action is performed.
spark.sql("SELECT count(*) from tablea").show()
dfnew = spark.sql("SELECT count(*) from tablea")

Some Equivalents

PySpark: .groupBy("prediction").count().show()
spark.sql(...): select count(*), prediction from tb group by prediction

 

 

Input / output

Generally, no additional imports needed

Text file:
textFile = spark.read.text("zen.txt")
df.withColumn('o', F.concat_ws('|', df.aa, df.b)).select('o').write.text('abc.txt')

CSV file:
df.write.csv('ff.csv', header=True)
spark.read.csv('ff.csv', inferSchema=True, header=True).show()

Parquet file:
df.write.parquet('ff.parquet')
spark.read.parquet('ff.parquet').show()

 

 

Machine Learning - Tools

Split between training and test data

trn_dta, tst_dta = dta.randomSplit([0.7, 0.3])

Patterns:

from .. import ..
the_class_inst = TheClass(...)
the_class_out = the_class_inst.transform(input_data)
from .. import ..
the_class_inst = TheClass(...)
the_class_model = the_class_inst.fit(input_data)
the_class_out = the_class_model.transform(input_or_test_data)
# or, on one line:
the_class_out = the_class_inst.fit(input_data).transform(input_data)

Assemble Vector column from multiple columns

from pyspark.ml.feature import VectorAssembler
assembler_inst = VectorAssembler(inputCols=["...", "...", ], outputCol = "nm")
# notice plural in inputCols
assembler_out = assembler_inst.transform(input_data)

Transform strings into indexes, because pyspark ML cannot take strings

from pyspark.ml.feature import StringIndexer
indexer_inst = StringIndexer(inputCol="nm", outputCol="nm")
indexer_model = indexer_inst.fit(input_data)
indexer_out = indexer_model.transform(input_data)

Scale data so that large values to not skew

from pyspark.ml.feature import StandardScaler
scaler_inst = StandardScaler(inputCol="nm", outputCol="nm", withMean=True, withStd=True)
scaler_model = scaler_inst.fit(input_data)
scaler_out = scaler_model.transform(input_data)  # notice that the same input data is used for fitting and transforming

Tokenize

from pyspark.ml.feature import Tokenizer
t = Tokenizer(inputCol="nm", outputCol="nm")
output_data = t.transform(input_data)

Tokenize with regex

from pyspark.ml.feature import RegexTokenizer
t = RegexTokenizer(inputCol="nm", outputCol="nm", pattern="\\W")
output_data = t.transform(input_data)

Remove Stop Words

from pyspark.ml.feature import StopWordsRemover
r = StopWordsRemover(inputCol="nm", outputCol="nm")
output_data = r.transform(input_data)

N-Grams

from pyspark.ml.feature import NGram
n = NGram(n=2, inputCol="nm", outputCol="nm")
output_data = n.transform(input_data)

Term Frequency, Inverse Document Frequency

from pyspark.ml.feature import HashingTF, IDF
tf = HashingTF(inputCol="nm", outputCol="nm_out1")
output1_data = tf.transform(input_data)
idf = IDF(inputCol="nm_out1", outputCol="nm")
idf_model = idf.fit(output1_data)
output2_data = idf_model.transform(output1_data)

Count Vectorizer

from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="nm", outputCol="nm", vocabSize=20, minDF=2)
# vocabSize is vocabulary size, approx 20 (I did not count)
# minDF is the minimum frequency for the word to be taken into account.  Here, with 2, each word has to be in at least 2 documents(???)
model = cv.fit(input_data)
output_data = model.transform(input_data)

Pipeline

from pyspark.ml import Pipeline
pipeline_inst = Pipeline(stages=[inst1, inst2, ])
pipeline_model = pipeline_inst.fit(input_data)
output_data = pipeline_model.transform(input_data)

Machine Leaning - Linear Regression

from pyspark.ml.regression import LinearRegression
lr_inst = LinearRegression(maxIter=10, featuresCol="nm", labelCol="nm", predictionCol="nm", regParam=0.3, elasticNetParam=0.8)
lr_model = lr_inst.fit(trn_dta)
lr_model.coefficients
lr_model.intercept
pred = lr_model.transform(dta)

Evaluate linear regression

lr_model.summary is the same as trn_rslt with trn_rslt = lr_model.evaluate(trn_dta) (?)

tst_rslt = lr_model.evaluate(tst_dta)
tst_rslt.r2
tst_rslt.meanAbsoluteError
tst_rslt.rootMeanSquaredError
tst_rslt.meanSquaredError

Machine Learning - Classifiers

Logistic Regression

from pyspark.ml.classification import LogisticRegression
lr_inst = LogisticRegression(featuresCol="nm", labelCol="nm", predictionCol="nm")
lr_model = lr_inst.fit(trn_dta)
lr_model.coefficients
tst_rslt = lr_model.transform(tst_dta)
pred = lr_model.transform(dta)

Naive Bayes

from pyspark.ml.classification import NaiveBayes
nb_inst = NaiveBayes(featuresCol="feature", labelCol="label", predictionCol="prediction")
# same pattern with model = inst.fit(); rslt = model.transform()

Decision Trees

from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(featuresCol="nm", labelCol="nm", predictionCol="nm", maxDepth=5, maxBins=32)
# same pattern: dtc_model = dtc.fit(trn_dta); rslt = dtc_model.transform(tst_dta)
dtc_model.numNodes
dtc_model.featureImportances

Random Forest

from pyspark.ml.classification import RandomForestClassifier
rtc = RandomForestClassifier(featuresCol="nm", labelCol="nm", predictionCol="nm", numTrees=20)
# same pattern with model = inst.fit(); rslt = model.transform()

GBT Classifier

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(featuresCol="nm", labelCol="nm", predictionCol="nm")
# same pattern with model = inst.fit(); rslt = model.transform()

Evaluators for Classification

Binary Classifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator
bin_clsf_inst = BinaryClassificationEvaluator(rawPredictionCol="nm", labelCol = "nm")
bin_clsf_inst.evaluate(tst_rslt)

Multi-class classifier (offers more metrics)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
precision_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precisionByLabel")
recall_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="recallByLabel")
true_pos_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="truePositiveRateByLabel")
false_pos_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="falsePositiveRateByLabel")
acc = acc_eval.evaluate(test_results)
precision = precision_eval.evaluate(test_results)
recall = recall_eval.evaluate(test_results)
true_pos_rate = true_pos_eval.evaluate(test_results)
false_pos_rate = false_pos_eval.evaluate(test_results)

Code for confusion matrix

test_results.createOrReplaceTempView("test_result")
the_sql = """
select count(*)
     , label
     , prediction 
  from test_result 
  group by label, prediction 
  order by label, prediction
"""
pred_val = spark.sql(the_sql)
pred_val.show()

for one_row in pred_val.collect():
    if one_row["label"] == 0 and one_row["prediction"] == 0:
        true_neg = one_row["count(1)"]
    if one_row["label"] == 0 and one_row["prediction"] == 1:
        false_pos = one_row["count(1)"]
    if one_row["label"] == 1 and one_row["prediction"] == 0:
        false_neg = one_row["count(1)"]
    if one_row["label"] == 1 and one_row["prediction"] == 1:
        true_pos = one_row["count(1)"]
print(f"|{true_neg+true_pos+false_neg+false_pos:8d}|pred pos|pred neg|")
print(f"|actu pos|{true_pos:8d}|{false_neg:8d}| (true pos |false neg)")
print(f"|actu neg|{false_pos:8d}|{true_neg:8d}| (false pos|true neg )")
print(f" ")
print(f"True positive rate, recall, sensitivity: {true_pos/(true_pos+false_neg)}")
print(f"False negative rate: {false_neg/(true_pos+false_neg)}")
print(f"False positive rate, probability of false alarm, fall-out: {false_pos/(false_pos+true_neg)}")
print(f"True negative rate, specificity, selectivity: {true_neg/(false_pos+true_neg)}")
print(f" ")
print(f"Positive predictive value, precision: {true_pos/(true_pos+false_pos)}")
print(f"False omission rate: {false_neg/(false_neg+true_neg)}")
print(f"False discovery rate: {false_pos/(true_pos+false_pos)}")
print(f"Negative predictive value: {true_neg/(false_neg+true_neg)}")
print(f" ")
print(f"Accuracy: {(true_pos + true_neg)/(true_neg+true_pos+false_neg+false_pos)}")
See https://en.wikipedia.org/wiki/Confusion_matrix

Machine Learning - K Means

K Means

from pyspark.ml.clustering import KMeans
kmeans_inst = KMeans(featuresCol="nm", k=2, seed=1) # seed is optional
kmeans_model = kmeans_inst.fit(input_data)
kmeans_model.clusterCenters()
pred = kmeans_model.transform(input_data)   # no labels, so no training vs test
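
To get a quality measure without labels, the silhouette can be computed with ClusteringEvaluator; a minimal sketch, assuming the same feature column name as above:

from pyspark.ml.evaluation import ClusteringEvaluator
clus_eval = ClusteringEvaluator(featuresCol="nm")   # default metric is the silhouette score
silhouette = clus_eval.evaluate(pred)
print(silhouette)   # closer to 1 means better-separated clusters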

 

 

Pandas

df.toPandas() # this can cause an out-of-memory error

Note that Pandas DataFrames are "eagerly evaluated", which means that all the data has to fit in memory

import pyspark.pandas as ps
psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
psdf.spark.explain()
This is like an explain plan
"Exchange" means that the nodes swap data. Ideally, we do not want this.
"Exchange SinglePartition" means it is using only one partition. Ideally, we want to use all.

def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

https://www.udemy.com/course/spark-and-python-for-big-data-with-pyspark/learn/lecture/5856256?src=sac&kw=spar#overview
Spark and Python for Big Data with PySpark

 

 

Databricks

Databricks Installation

https://databricks.com/try-databricks
Training: https://academy.databricks.com/
user guide: https://docs.databricks.com/user-guide/index.html
Community edition, personal
Create a cluster, then create a notebook. Start with "import pyspark"
upload a file
df = sqlContext.sql("select * from the_table")

 

Using Databricks

New workspace

Basic Notebook:

import pyspark

df = sqlContext.sql("select * from . . .;")
# Databricks creates the context automatically

 

PySpark on AWS EC2

Create EC2 instance, all traffic (restricted to my IP)

Run these on the instance:

sudo apt update
sudo apt install python3-pip
sudo apt install default-jre
java -version
sudo apt install scala
scala -version
pip install pyspark
pip install jupyter

If the pip install pyspark does not work, try these:

pip install py4j

go to spark.apache.org then downloads.
wget https://www.apache.org/dyn/closer.lua/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

on EC2 instance:

jupyter notebook --generate-config
sudo openssl req -x509 -nodes -days 365 -newkey rsa:1024 -keyout ~/.ssh/forjupyterkey.pem -out ~/.ssh/forjupytercert.pem
vi ~/.jupyter/. . .. .py   # the config file
# add the following:
c = get_config()
c.NotebookApp.certfile = u'/home/ubuntu/.ssh/forjupytercert.pem'
c.NotebookApp.keyfile = u'/home/ubuntu/.ssh/forjupyterkey.pem'
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888

# start the jupyter notebook
# and modify the link to put the IP address of the EC2 instance

I encountered the following issues:

 

  

  


Python with AWS Lambda

  

Minimal lambda function:

import json

def lambda_handler(event, context):
    print("event::", event)
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
 

The PATH / PYTHONPATH of a Lambda function includes specific folders in the /opt directory, where layer contents are extracted.
Layer paths for the Python runtime: python and python/lib/python3.x/site-packages (both under /opt).
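
A quick way to see what the layers actually expose at run time; a minimal sketch of a handler that prints the /opt entries on the import path:

import sys

def lambda_handler(event, context):
    # /opt/python (and /opt/python/lib/python3.x/site-packages) come from layers
    opt_paths = [p for p in sys.path if p.startswith("/opt")]
    print("layer paths on sys.path:", opt_paths)
    return {"statusCode": 200, "body": str(opt_paths)}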

Example of a deployment:

Zip into a zip file:

python.zip
   lambda_function.py
   other_module.py
   python/abc.py
   python/def.py

The zip file can have any name, but it must have a python sub-directory.

The main executable is lambda_function.py. Other files can exist at the same level.
Import any additional module as import another_module

Example of a layer:

Zip into a zip file:

python.zip
   python/abc.py
   python/def.py
   python/requests      # all files for "requests" package
   python/another_package

The zip file can have any name, but it must have a python sub-directory.

Upload to a custom layer.

Then, in the lambda function, import the layer modules directly, e.g. import abc (note: "def" is a reserved word, so that file name would not actually be importable)
Note: abc.py can call other libraries, such as pandas, as long as pandas is in a layer

Create a layer for packages

Remember to update the version in the lambda functions using the layer.

How to create a layer with downloaded packages:

The packages have to be downloaded in the same environment as AWS Linux. I successfully did this by spinning up an AWS Linux instance.

Use the "--target" option so as to isolate the desired package and its dependencies. Installing in the default location (lib/python3.8/site-packages/ and lib64/python3.8/site-packages/) will include the setuptools and the wheel and possibly other packages, and it will split the dependencies betweebn lib and lib64.

If I am creating the main deployment package, then add the lambda_function.py function to the root of the zip.

The zip files should be less than 50MB in size. Otherwise, upload to the function from an S3 bucket.

 

S3

List objects

s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
rsp = bucket.objects.all()
# or
s3 = boto3.client("s3")
rsp = s3.list_objects(Bucket=bucket_name)
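
To see the keys, iterate over the result; a small sketch (the resource API yields ObjectSummary items, the client API returns a dict with a "Contents" list):

# resource API
for obj in bucket.objects.all():
    print(obj.key)
# client API
for item in rsp.get("Contents", []):
    print(item["Key"])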

Upload a string to a file

s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
object = bucket.put_object(
        Body=data_string_containing_the_data.encode("utf-8"),
        Key=the_key_meaning_the_file_name
    )

 

Upload a file

s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
rsp = bucket.upload_file(file_name, object_name)
# or
s3 = boto3.client("s3")
rsp = s3.upload_file(file_name, bucket_name, object_name)

 

Download an object to a string

s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
returned_data = io.BytesIO()
bucket.download_fileobj(Key=the_key_meaning_the_file_name, Fileobj=returned_data)
print("returned data:\n", returned_data.getvalue())
returned_data.close()

 

Required permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::the_bucket_name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3::: the_bucket_name/*"
            ]
        }
    ]
}

Notice: ListBucket on bucket, PutObject and GetObject on objects, meaning bucket followed by asterisk.

 

See documentation on boto3

  

  


Python with AWS Glue

  

Boilerplate Code

Top of file

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame # needed for fromDF()

Show status: put in cell
%status

Initialize Context

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

sc = SparkContext()
glueContext = GlueContext(sc)

Dynamic Frames

Read via catalog:
glue_dyfr = glueContext.create_dynamic_frame.from_catalog(database="..", table_name="..")
glue_dyfr.printSchema()

Count rows:
glue_dyfr.count()

Drop a field and rename another:
glue_dyfr = glue_dyfr.drop_fields(['col2', 'col3']).rename_field('col0', 'new_name')

Map columns:

col_mappings=[
    ('a_col', "string", 'a_col', "string"),
    ("col1", "string", "Country Code", "string"),
    ("col2", "string", "Indicator Name", "string"),
    ]
glue_dyfr = ApplyMapping.apply(frame = glue_dyfr, mappings = col_mappings)

Write to csv file:

glueContext.write_dynamic_frame.from_options(frame = glue_dyfr,
      connection_type = "s3",
      connection_options = {"path": "s3://chrisbucket41/gluej-exercise/tgt/"},
      format = "csv")

 

PySpark DataFrames

Select a subset of columns and show data:
glue_dyfr.select_fields(['a_col']).toDF().distinct().show()

Conversion

Convert to PySpark DataFrame:
pyspark_df = glue_dyfr.toDF()

Convert back to Glue DynamicFrame:
from awsglue.dynamicframe import DynamicFrame
glue_dyfr = DynamicFrame.fromDF(pyspark_df, glueContext, "a-name")

Examples of Transformations

See also PySpark section for more details

Filter with SQL-like syntax:
pyspark_df = pyspark_df.where("`a_col` != 'Not classified'") # note back ticks around the column name
pyspark_df2 = pyspark_df2.filter((pyspark_df2['a_col'] != 'Country Name') & (pyspark_df2['a_col'] != 'Not classified'))

Unpivot:
from pyspark.sql import functions as F
unpiv_df = pyspark_df.select('a_col', F.expr("stack(num, 'col1', col1, 'col2', col2, ...) as (nm, val)"))  # stack() unpivots: num is the number of column pairs

Aggregation:
c = unpiv_df.groupby('a_col').avg("pop")

Rename a column:
c = c.withColumnRenamed('avg(pop)', 'avg_pop')

Join:
pyspark_df = pyspark_df.join(c,'a_col',"left")

Register a temporary view and run SQL:
pyspark_df.createOrReplaceTempView("countries")
spark.sql("select count(*) from countries").show()

Collect to the driver (collect() returns a list of Row objects; [0] is the first row):
pyspark_df.select(yr_cols).collect()[0]

Start jobs

In Python

job_name = "job_from_workbook"
import boto3
glue = boto3.client(service_name='glue', region_name='us-east-1', endpoint_url='https://glue.us-east-1.amazonaws.com')
myNewJobRun = glue.start_job_run(JobName=job_name) # This starts the job
print(glue.get_job_run(JobName=job_name, RunId=myNewJobRun['JobRunId'])['JobRun']['JobRunState'])

With AWS CLI

Upload script file with UI or with:
aws s3 cp job.py s3://bucket/folder/

Start job in UI or with:
aws glue start-job-run --job-name "job_name"

Get job progress in UI or with:
aws glue get-job-run --job-name "job_name" --run-id "j..."
Get the run-id from the return when the job was started

Jupyter

Put this in the first cell:
%region us-east-1
%iam_role arn:aws:iam::aws-account-id:role/AWSGlueServiceRoleDev
%worker_type G.1X
%number_of_workers 2
%idle_timeout 30

  

  


Snowflake and Python

  

Anaconda is natively available inside Snowflake.

https://developers.snowflake.com

test query:
select current_version()

The account is the first part of the URL provided at sign-up (the part before ".snowflakecomputing.com")


 

Snowflake SQL Syntax

select current_version() as v;

CREATE WAREHOUSE IF NOT EXISTS whname;    -- X-Small by default (this is the smallest)
USE WAREHOUSE whname;
CREATE DATABASE IF NOT EXISTS dbname;
USE DATABASE dbname;
CREATE SCHEMA IF NOT EXISTS schname;
CREATE OR REPLACE TABLE               schname.tbname(. . .);
CREATE            TABLE IF NOT EXISTS schname.tbname(. . .);
ALTER WAREHOUSE IF EXISTS whname RESUME IF SUSPENDED;  -- To start up the warehouse
ALTER WAREHOUSE           whname SUSPEND;              -- To suspend the warehouse

CREATE OR REPLACE FILE FORMAT a_format_name
  TYPE = 'CSV'
  FIELD_DELIMITER = ';'
  SKIP_HEADER = 1
-- or
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
  ;

-- temporary json table
CREATE OR REPLACE TEMPORARY TABLE a_json_table (json_data VARIANT);

-- Create stage (recommended if loading often from the same location)
-- It is probably best to put the schema name too
CREATE OR REPLACE STAGE sch_nm.stage_name FILE_FORMAT = a_format_name;

For PUT and LIST, see Snowpark below.

 


Snowflake Connector

installed python version 3.9
installed virtualenv

ran python -m pip install -r requirements_39.reqs
requirements taken from page:
https://github.com/snowflakedb/snowflake-connector-python/tree/main/tested_requirements

Snowflake Snippet

import snowflake.connector as sc
with sc.connect(
           account = "abc"
         , user = un
         , password = pw
         , warehouse = "WH"
         , database = "DB"
         ) as cn:

    sql = "select current_version()"
    with cn.cursor() as cr:
        cr.execute(sql)
        r = cr.fetchone()
        print(r[0])

Alternate

import snowflake.connector
params = {"account": . . ., "user": . . ., "password": . . .,
          "warehouse": . . ., "database": . . .}
with snowflake.connector.connect(**params) as cn:
    with cn.cursor() as cr:
        cr.execute("select * from . . .")
        for r in cr.fetchall():
            print(r)

Write to a table:
df.write.mode("overwrite").save_as_table("table1")

Create a view:
df.create_or_replace_view(f"{database}.{schema}.{view_name}")

Select json data elements:
select col_name:json_base_element.elmt.elmt :: float, . . . . from . . .;
Types float, int, . . .

Pandas:
cr.execute(sql)
df = cr.fetch_pandas_all()

 

Snowpark Installation

Use version 3.8 of Python (as of Nov 2022)
This page said 3.9 was OK: https://docs.snowflake.com/en/user-guide/python-connector-install.html
I installed, but then the pip install "snowflake-snowpark-python[pandas]" command said that only 3.8 was possible.

In Linux, downloaded the .gz from python.org and extracted it to /usr/local/lib
sudo apt install libssl-dev libffi-dev
(If needed, do sudo dpkg-reconfigure pkg-name ????? and maybe sudo apt install libdvd-pkg)
pip install "snowflake-snowpark-python[pandas]"

See more details in "Virtual Environments", under Installation.

 

Snowpark

from snowflake.snowpark import Session
from snowflake.snowpark.functions import avg  # or other function
from snowflake.snowpark.functions import when
from snowflake.snowpark.functions import col
# alternate:
import snowflake.snowpark.functions as f

params = { "account":   sf_account
         , "user":      sf_user
         , "password":  sf_pw
         , "warehouse": "wsmall"
         , "database":  "first"
         }
s = Session.builder.configs(params).create()
df = s.table("tb_nm")   # PySpark df???
df.write.mode("overwrite").save_as_table("new tb nm")
df = s.sql(sql)   # no ";" !
df.show()
s.close()

Upload from local (two forward slashes after "file:"). Returns a dataframe:
s.sql("PUT file://C:\. . .\basefilename*.csv @sch_nm.stage_name AUTO_COMPRESS=TRUE;").show() -- windows
s.sql("PUT file:///. . ./basefilename*.csv @sch_nm.stage_name AUTO_COMPRESS=TRUE;").show() -- linux
These return a dataframe with the list of files.
Look at the "status" column. "UPLOADED" means successful.
You can put again (overwrite), in which case the status shows "SKIPPED"

Show the files in the stage (returns a dataframe)
s.sql("list @sch_nm.stage_name ;").show()

Remove all the files in the stage. Notice the trailing slash (returns a dataframe).
s.sql("remove @sch_nm.stage_name/ ;").show()

Copy single file from stage to table

COPY INTO a_table
  FROM @stage_name/file_name.ext.gz -- put file name here for single file
                                    -- get the file name from the list
  FILE_FORMAT = (FORMAT_NAME = a_format_name)
  ON_ERROR = 'skip_file'            -- or leave out if I want to fail on error
  ;

Copy multiple files from stage to a single table

COPY INTO a_table
  FROM @stage_name                     -- put just stage name for multiple files
  FILE_FORMAT = (FORMAT_NAME = a_format_name)
  PATTERN='.*basefilename[1-5].csv.gz' -- put a pattern for the files
                                       -- pattern appears to always start with "."
  ON_ERROR = 'skip_file'               -- or leave out if I want to fail on error
  ;

 

Write dataframe to a table
df.write.mode("overwrite").save_as_table("schema_name.table_name") # returns None

Snowflake does not support indexes
Options for parameter if_exists: 'fail', 'replace', 'append'

See snippet python_snippets.html#snowpark1

 

Snowpark:

Look at later:

continue https://python-course.eu/python-tutorial/packages.php and https://docs.python.org/3/tutorial/modules.html

look at this: https://towardsdatascience.com/6-new-features-in-python-3-8-for-python-newbies-dc2e7b804acc

do this: https://docs.snowflake.com/en/user-guide/data-load-internal-tutorial.html

  

  


Misc. Packages

  

Other packages described below:
airflow
boto3 (AWS)
configparser
diagrams
django
faker
flask
graph-tool
itertools
json
jupyter
logging
networkx and pyvis
parquet
pylint
pytest
random
re (regular expressions)
requests
smtp
yaml
zeep

Some Useful Libraries

Zen of Python: import this

imageMagick:
pip3 install Wand

for python3, use pip3

pipenv install pyarrow

import antigravity   # levity in Python (opens the xkcd comic in a browser)

 

To format code with black:
pipenv install black
# if needed: pipenv install black --pre

 

Some Useful Libraries


Web development:

Data science:

ML / AI:

other:


 
Back to list of packages
 

Random Numbers

import random
random.random() # Generate random number between 0 and 1
[random.random() for _ in range(4)] # Four random numbers between 0 and 1
random.seed(n) # n is a number
random.randrange(m) # Randomly choose between 0 and m-1
random.randrange(m, n) # Randomly choose between m and n-1
random.shuffle(a_list) # Shuffle the list
random.choice(a_list) # Choose one
random.sample(a_list, n) # Choose n in the list
[random.choice(a_list) for _ in range(n)] # Choose n in the list, allowing duplicates (replacements)

  

 
Back to list of packages
 

Regular Expressions

import re
re.match("s", "long string") #
re.search("s", "long string") #
re.split("[ ;.,]", "long string") # splits based on separators [ ;,.]
re.sub("[1-9]", "0", "long string") #Substitute number with zeros

  

 
Back to list of packages
 

Airflow

Shell script:

export AIRFLOW_HOME=~/code/py/airflow
alias python='python3'

AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

airflow standalone
python -m airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.
# Enable the example_bash_operator dag in the home page

 

 
Back to list of packages
 

boto3 (AWS)

See code snippets in separate file

pipenv install awscli 
pipenv install boto3 # use this to connect from py to AWS

import boto3
import pandas as pd
from io import StringIO

s3_rec = boto3.resource('s3')
bucket_obj = s3_rec.Bucket(bucket_name)
csv_obj = bucket_obj.Object(key=o.key).get().get('Body').read().decode('utf-8')  # o is one item of bucket_obj.objects.all()
df = pd.read_csv(StringIO(csv_obj), delimiter=',')

# write
out_buffer = StringIO()
df.to_csv(out_buffer, index=False)
bucket_obj.put_object(Body=out_buffer.getvalue(), Key=filename)

 

 
Back to list of packages
 

configparser

Ready-to-use sample:

########
# sample file:
[a_group]
param1=a_value
db_name=the_db.{env.id.upper}
########

import configparser
config_file = "/home/. . ./.aws/credentials"
config = configparser.ConfigParser()
config.read(config_file)
database_name=config.get("a_group","db_name")   # see sample above
param_one=config.get("a_group","param1")         # see sample above
aws_access_key=config.get("default","aws_access_key_id")      # based on standard .aws
aws_secret_key=config.get("default","aws_secret_access_key")  # based on standard .aws

See under snippets python_snippets.html for a more detailed example

 

Note that spaces before and after the equal sign are ignored. Spaces after the value are ignored. Spaces in the value are kept.
A colon ":" can be used instead of an equal sign "="
The following are equivalent:
item=a value with spaces[EOL]
item: a value with spaces[EOL]
item = a value with spaces [EOL]

 

Hard-code in code:

config = configparser.ConfigParser()
#config.read(cfg_fn)
config.read_string("""
#put the contents of the config file
[a_group]
param1=a_value
db_name=the_db.abc
""")

 

 

See snippets:

 

 

 
Back to list of packages
 

Diagrams

Diagrams needs graphviz


# pipenv install diagrams
# pipenv install graphviz
# Also download graphviz separately
from diagrams import Diagram, Cluster
import os
os.environ["PATH"] += os.pathsep + r"C:\progfile\Graphviz\bin"
import graphviz

######################################


from diagrams.aws.compute import EC2
from diagrams.aws.network import ELB
from diagrams.aws.network import Route53

with Diagram("descr", show=True, direction="TB") as diag:
    # TB = towards bottom
    dns = Route53("dns")
    load_balancer = ELB("Load Balancer")
    with Cluster("Webservers"):
        svc_group = [EC2("wb 1"),
                     EC2("wb 2"),
                     EC2("wb 3")]
    dns >> load_balancer >> svc_group >> dns2

diag


######################################

from diagrams.generic.database import SQL
from diagrams.generic.storage import Storage
from diagrams.programming.flowchart import Document
from diagrams.programming.flowchart import Database
from diagrams.programming.flowchart import StoredData


with Diagram("stored proc") as diag:
    src = Storage("the table")
    sp = SQL("stored prod")
    tgt = Storage("the tgt table")
    st = StoredData("sd")

    src >> sp >> tgt

diag



https://diagrams.mingrammer.com/

 
Back to list of packages
 

JSON

json.dumps(something_complex) # serializes the object
json.dump(x, f) # serializes and writes to file f
x = json.load(f) # reads it back

import json
json.dump(obj, fp, ensure_ascii=True, # fp is a file pointer
     indent=None, # None gives compact, 4 gives a prettier output
     separators=None, #(item_separator, key_separator), such as (',', ':')
    )
json.dumps(obj, ensure_ascii=True) # dumps to string
print(json.dumps(obj))

a_dict=json.load(fp,
     parse_float=None,
     parse_int=None,
     parse_constant=None,
   )
a_dict=json.loads(a_string)

  

json to python conversion (json --> python)

Dates as "YYYY-MM-DDTHH:MM:SS.sss"
If I get the error 'Object of type datetime is not JSON serializable', then provide a default function for serializing: json.dumps(the_object, default=str).
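
A small illustration of the default=str workaround (values are made up):

import json, datetime
obj = {"when": datetime.datetime(2023, 1, 2, 3, 4, 5), "n": 1}
print(json.dumps(obj, default=str))   # {"when": "2023-01-02 03:04:05", "n": 1}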

  

 
Back to list of packages
 

Requests

See also Flask

Doc: https://2.python-requests.org//en/latest/user/quickstart/

#pip install requests
import requests
import json

response = requests.get("http://api.open-notify.org/astros.json")
print("Status=",response.status_code)
print("Response=",response.json())

# the response has three parts: message, request, response

import json
json.dumps(obj, sort_keys=True, indent=4)

 

See also zeep

Parquet

Doc:

import pandas as pd
import pyarrow   # this is a parquet engine   Alternative: fastparquet

pd.read_parquet(parquet_file, engine='auto')

 

 
Back to list of packages
 

Flask

See also requests

Shell Script:

#!/bin/bash
export FLASK_APP=min_flask
python3 -m flask run

File min_flask.py

from flask import Flask, redirect, url_for

app = Flask(__name__)

@app.route("/")
def hello_world():
    return "<p>Namou</p>"

# export FLASK_APP=min_flask
# python3 -m flask run

@app.route("/h/")
def help_r():
    return """<p>/h help<br />
            /hello/a-name<br />
            /param/help  --&gt; see /h<br />
            /param/helloxyz  --&gt; see hello<br />
            /param/something<br />
            /id/an-integer</p>"""
# end with / to be canonical


@app.route("/hello/<the_name>/")
def show_name(the_name):  # parameter has to be what is between < and >
    if type(the_name) is str:
        return "<h1>Hello "+the_name+"! str</h1>"
    else:
        return "<h1>Hello "+the_name+"!xnot a strx</h1>"


@app.route("/id/<int:the_id>/")
def get_id(the_id):
    return "<h1>The ID =  "+str(the_id)+"</h1>"


@app.route("/param/<the_param>/")
def get_param(the_param):
    if the_param=="help":
        return redirect(url_for('help_r'))
    elif the_param.startswith("hello"):
        return redirect(url_for('show_name', the_name=the_param[5:]))
    else:
        return "<p>Unknown parameter: '"+the_param+"'</p>"
# needs: from flask import redirect, url_for

Another example:

pipenv shell
pipenv install flask

file:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/')
def test():
    return "Hello World"

@app.route('/pt', methods=['GET'])
# http://127.0.0.1:8080/pt?pm=a-value
def test_param():
    param = request.args.get("pm")
    return jsonify({"found": param})

if __name__ == '__main__':
    app.run(port=8080)

https://flask.palletsprojects.com/en/2.0.x/tutorial/layout/

/yourapplication
    /yourapplication
        __init__.py
        /static
            style.css
        /templates
            layout.html
            index.html
            login.html
            ...

 

 
Back to list of packages
 

Jupyter

pipenv install jupyter
Type: jupyter notebook
In the upper right, click on "new".

Shortcuts:

Clear output: Cell > all output > clear

Markdown: tables
|Header|Cells|
|---|---|
|Body of|the table|

 

 
Back to list of packages
 

Networkx and pyvis

Installation:
pip3 install networkx
pip3 install pyvis # For visualization

 

Top of file

import networkx as nx    # skip if just visualizing
from pyvis.network import Network
G = nx.Graph()
# G = nx.DiGraph()     # directed graph

 

Add nodes and edges

G.add_nodes_from([
    (4, {"color": "red", "label": "4"}),
    (5, {"color": "green", "label": "5"}),
    (6, {"color": "purple", "label": "6"}),
    (7, {"intensity": "strong", "label": "7"}),
])

# An edge with a node not entered earlier is added automatically (here 1)
G.add_edges_from([
    (1,4,{'weight': 3, 'label': "1-4", "color": "yellow"}),
    (4,5),
    (6,4,{'weight': 2}),
    (7,4,{'hidden': True})
])
G.add_edges_from([(5,5)])

G.remove_node(103) # removes its edges too

# add labels to all nodes
for n in G.nodes:
    G.nodes[n]['label'] = "n" + str(n)

 

Work with nodes and edges

G.number_of_nodes()
G.number_of_edges()
G.nodes
G.edges
for n in G.nodes:
    print("node: ",n)
    print("adj:",G.adj[n])
    print("deg: ",G.degree[n])

 

Show

print(G)

 

Visualize with pyvis

g = Network(directed=True)      # directed=True to see the arrows
g = Network(height="1500px", width="100%", bgcolor="#222222", font_color="white", directed=True)
g.from_nx(G)                    # if the graph was built with networkx
g.width="75%"                   # best for viewing with buttons, otherwise 100%
g.show_buttons(filter_=['physics']) # remember the underscore
g.set_edge_smooth("dynamic") # show multiple arrows

# select one of the following:
g.barnes_hut()  # the default
g.force_atlas_2based()
g.hrepulsion()

g.show("give_a_file_name.html")

 

Build network with pyvis

g.add_node("...", color="#...", shape="box")
g.add_edge("from...","to...", color="#...")

Node shapes:

Text has no shape, just the text

 




https://networkx.org/documentation/stable/tutorial.html

graph types:
Graph, DiGraph, MultiGraph, and MultiDiGraph


For edges, if there is only one numeric attribute, then use the 'weight' keyword for the attribute



list(nx.connected_components(G))
nx.clustering(G)

sp = dict(nx.all_pairs_shortest_path(G))



for e in list(G.edges):



DAG (directed acyclic graph)

list(nx.topological_sort(graph)) # => ['root', 'a', 'b', 'd', 'e', 'c']

nx.is_directed_acyclic_graph(graph) # => True

https://networkx.org/documentation/stable/reference/algorithms/dag.html#






from matplotlib import pyplot as plt
g1 = nx.DiGraph()
g1.add_edges_from([("root", "a"), ("a", "b"), ("a", "e"), ("b", "c"), ("b", "d"), ("d", "e")])
plt.tight_layout()
nx.draw_networkx(g1, arrows=True)
plt.savefig("g1.png", format="PNG")
# tell matplotlib you're done with the plot: https://stackoverflow.com/questions/741877/how-do-i-tell-matplotlib-that-i-am-done-with-a-plot
plt.clf()


g2 = nx.DiGraph()
g2.add_edges_from([(1, 2), (2, 3), (3, 4), (4, 1)])
plt.tight_layout()
nx.draw_networkx(g2, arrows=True)
plt.savefig("g2.png", format="PNG")
plt.clf()

try this

https://towardsdatascience.com/graph-visualisation-basics-with-python-part-ii-directed-graph-with-networkx-5c1cd5564daa
https://networkx.org/documentation/stable/auto_examples/drawing/plot_directed.html



Further documentation:

 

Some notes about graphs:
See https://en.wikipedia.org/wiki/Flowchart
Some key words: IDEF1X, Data flow, Yourdon, DeMarco

 

 
Back to list of packages
 

Graph-tool

 

import graph_tool.all
https://graph-tool.skewed.de/

 

 
Back to list of packages
 

Logging

Basic outputs to console:

import logging
logging.basicConfig(level=logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, or CRITICAL
logging.info('something')   # notice lower case here, and upper case above when selecting the level

levels, in decreasing level of verbosity:
DEBUG, INFO, WARNING, ERROR, CRITICAL
Each level displays messages of its level and those to the right

Formatted output to file

import logging
logging.basicConfig( filename=__name__ + ".log"
                   , level=logging.DEBUG
                   , format="%(asctime)s:%(levelname)s:%(name)s:%(filename)s:%(module)s:%(funcName)s:%(lineno)s:%(msg)s"
                   )

%(asctime)s date and time
%(filename)s Python script file name (lower case "n")
%(module)s name of module (often filename without the ".py")
%(funcName)s name of function (upper case "N"), "" if not in function
%(levelname)s shows DEBUG, INFO, . . . (lower case "n")
%(msg)s
See more at https://docs.python.org/3/library/logging.html

Only one basicConfiguration is allowed per logger.
The default logger is the root logger.
To add loggers, do this:
import logging # in every module
log_obj = logging.getLogger(__name__) # __name__ by convention, can be anything
log_obj.setLevel(logging.DEBUG) # I can set the level on the object (shown here) or on the handler (see below)
file_handler = logging.FileHandler("a file name")
formatter = logging.Formatter("%(asctime)s:%(name)s. . . .")
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG) # I can set the level on the object (see above), or on the handler (shown here)
log_obj.addHandler(file_handler)
# then call the logging with the object, not logging:
log_obj.debug("the msg")

Add another handler:
stream_handler = logging.StreamHandler()
streaming_formatter = logging.Formatter("%(asctime)s:%(name)s. . . .")
stream_handler.setFormatter(streaming_formatter) # I can set a different formatter if I need
stream_handler.setLevel(logging.DEBUG) # I can set a different level, other than the logger level
log_obj.addHandler(stream_handler)

Question: if I do not set anything on the logger objects, do I get the default logger level configuration?

To show the traceback in case of an error, do:
logging.exception("the msg")
instead of
logging.error("the msg")
Or use the log_obj

The following two are equivalent:
logging.exception("") # better because I can put a message
and
logging.error(traceback.format_exc())

This results in an error:
logging.error("Error:", str(e))
logging.exception("Error:", str(e))

Suppress excessive debug messages in other packages:
logging.getLogger("module").setLevel(logging.INFO)
The "module" can be "package.sub.module"

https://docs.python.org/3/library/logging.html


 

See snippet: python_snippets.html#logging_and_config

 

 
Back to list of packages
 

pylint

linter: pylint

 

 
Back to list of packages
 

YAML

  

first-line begins with ---
End of the document with ...
each line of text contains key and value pairs like a map.
key and values are separated by a colon (:) and space
use spaces instead of the tab for indentation
# Comments , can be in middle of line

# list, with list (brackets) and with bullets. Both are equivalent
key1:
  - value1
  - value2
  - value3
  - value4
  - value5


or
key1: [value1,value2,
    value3,value4,value5]

# indent for nesting the arrays

# associative array (here list of two associative arrays with id and name)
 - id: 234
   name: abc
 - id: 567
   name: fgh

or
[{id: 234, name: abc}, {id: 567, name: fgh}]

Strings do not need quotes, but I guess it is better with quotes
With double quotes, use \ backslash to escape
With single quotes, the only escape is double single quotes

In multiline strings, | preserves the newlines, > folds the newlines
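
A small check of | versus > from Python; a sketch assuming PyYAML is installed:

import yaml
doc = """
literal: |
  line one
  line two
folded: >
  line one
  line two
"""
d = yaml.safe_load(doc)
print(repr(d["literal"]))   # 'line one\nline two\n' : newlines preserved
print(repr(d["folded"]))    # 'line one line two\n'  : newlines folded into spaces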

A question mark can be used in front of a key, in the form "?key: value" to allow the key to contain leading dashes, square brackets, etc., without quotes.

Separate documents in the same stream with --- (triple dash). Optionally end a document with triple period

&anchor01 # define anchor label "anchor01"
*anchor01 # references the anchor01. It allows re-use of the data

Explicitly define a data type:
Key: !!str a string
Key2: !!float
Options: !!float, !!str, !!binary

parsers:
http://yaml-online-parser.appspot.com/
http://www.yamllint.com/

Good intro:
https://en.wikipedia.org/wiki/YAML
Official spec: https://yaml.org/spec/1.2.2/

YAML Python Package

import logging
import logging.config

import yaml

def main():
    """
        entry point to run the job.
    """
    # Parsing YAML file
    config = '...config.yml'
    config = yaml.safe_load(open(config))
    # configure logging
    log_config = config['logging']
    logging.config.dictConfig(log_config)
    logger = logging.getLogger(__name__)
    logger.info("This is a test.")   


if __name__ == '__main__':
    main()

yaml config file (needs more work):

# Logging configuration
logging:
  version: 1
  formatters:
    the_app:
      format: "The Name- %(asctime)s - %(levelname)s - %(message)s"
  handlers:
    console:
      class: logging.StreamHandler
      formatter: the_app
      level: DEBUG
  root:
    level: DEBUG
    handlers: [ console ]

  

  

 
Back to list of packages
 

Pytest

  

pytest has more features than the unittest package. The unittest package comes with python.

Documentation in https://docs.pytest.org/en/stable/

A test has four steps:

  1. Arrange (fixtures specifically help with this)
  2. Act
  3. Assert
  4. Cleanup


Basic commands:
pipenv shell
pipenv install pytest
pytest .                    # run the tests
pytest code/asdf.py -v      # increase verbosity
pytest code/asdf.py -s      # show output of prints
pytest -k "something"       # k is keyword flag. Executes only test functions with name "test_.....something...."
pytest -m a_marker          # Run only the tests with marker a_marker
pytest -m "not a_marker"    # Exclude tests with marker a_marker
pytest -v                   # Verbose output: show more information
pytest -s                   # Show printed output (the result of 'print(...)' statements
pytest --durations=0        # Track the time of execution


Some basic code:
import pytest

@pytest.mark.skip(reason="optional reason")          # marker for skipping a test:
def test_should_be_skipped() -> None:
    assert 1==2

@pytest.mark.skipif(3>1, reason="...")
def test_should_be_skipped() -> None:
    assert 1==2

@pytest.mark.xfail
def test_shows_as_xfail_and_not_fail() -> None:
    assert 1 == 2   # shows xfail (or xpass if it unexpectedly passes) in the results

@pytest.mark.any_marker
def ...
# then call with
# pytest .  -m any_marker
# only the marked functions are tested

@pytest.mark.django_db
def ...
# Makes the following function spin up a database and a transaction will be created just for that test, then rolled back when the test is completed


Fixtures

Fixtures are basically objects that appear in the code without me defining them.

Use for setup and teardown
The use of fixtures explicitly declares the dependencies (makes code more maintainable).
A fixture can use another fixture.
A fixture can have a function scope, class scope (i.e. run once per class), module scope (run once per module), or session scope (run once per session)

Fixture Without Arguments

@pytest.fixture
def the_fixture():
    # do something
    return ...

def test_that_uses_fixture(the_fixture):   # the fixture is one of the parameters, without parentheses
    # pytest looks at all fixtures and sees one called "the_fixture"
    # it runs "the_fixture" and puts the result in the argument
    print(f"Printing {the_fixture} from fixture")

Fixture With Arguments

@pytest.fixture
def the_fixture():
    def _fixture_name(arg1, arg2):
        ...
        return ...
    return _fixture_name  # Here is the magic: return the inner function.  No ()

def fctn_that_uses_fixture(the_fixture):   # the fixture is one of the parameters, without parentheses
    # here, the_fixture is a function that takes an argument
    a = the_fixture(a1, a2)

You may want to put all fixtures in one file.
Place in the directory most appropriate for the scope of the fixtures.


Parametrize:

Allows running a test multiple times with differing data

@pytest.mark.parametrize(
        "the_input_param",
        ["A...", "B...", "C..."],
        )
def test_paramtrzed(the_input_param: str) -> None:
    print(f"\ntest with {the_input_param}")
# run multiple times with the different values

Indirect:
Look at the documentation.
Basically, when indirect=True, the parameters are passed to the "request" object from where they are extracted.
The parameters go to the fixture. Inside the fixture, the request object holds the parameters. Extract the parameters from the "request" object. The fixture object then returns the required data.
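
A minimal sketch of indirect parametrization (fixture and test names here are illustrative, not from my code):

import pytest

@pytest.fixture
def prepared_value(request):
    return request.param * 10   # request.param holds the value from parametrize when indirect=True

@pytest.mark.parametrize("prepared_value", [1, 2], indirect=True)
def test_indirect(prepared_value):
    assert prepared_value in (10, 20)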


Assert Exception Raising
def fctn_that_raises_an_exception() -> None:
    raise ValueError("an excep")

def test_raise_an_exception_should_pass() -> None:
    with pytest.raises(ValueError):
        fctn_that_raises_an_exception()

def test_raise_an_exception_should_pass_and_test_message() -> None:
    with pytest.raises(ValueError) as e:
        fctn_that_raises_an_exception()
    # this tests the text of the message
    assert "an excep" == str(e.value)
    # Note that we need "str()" of the e.value\

Test the test by doing a "pass" instead of "raise" in the called function fctn_that_raises_an_exception()


pytest.ini:

Add options in the pytest.ini file instead of typing pytest -v each time:
[pytest]
addopts = -v -s
addopts = -v -s --durations=0

Markers have to be registered in pytest.ini. The pytest.ini file is at the root of the project.

[pytest]
markers =
    this_is_a_marker: and this is the comment.  Use this in test...py file: @pytest.mark.this_is_a_marker


Test logging:

Initialize the logger (as always)
Pass something called "caplog" as a parameter to the test function.
In the test function, simply assert that a given string is in caplog.text
Note that by default only warning, error, or critical logs are captured
See sample code for testing "info" level. Put "with caplog.at_level(logging.INFO):" and do the testing inside the section (this is a context manager)
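
A minimal sketch along those lines (function names are illustrative):

import logging

def do_something():
    logging.getLogger(__name__).warning("something went wrong")

def test_logs_warning(caplog):
    do_something()
    assert "something went wrong" in caplog.text

def test_logs_info(caplog):
    with caplog.at_level(logging.INFO):   # context manager needed to capture below WARNING
        logging.getLogger(__name__).info("just info")
    assert "just info" in caplog.text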


pytest-xdist

Allows running tests in parallel across multiple CPUs

pipenv install pytest-xdist
pytest -n NUMCPUS
pytest -n auto   # uses max number of CPUs


Allure
pip install allure-pytest
pytest --alluredir=/tmp/test_rslts
allure serve /tmp/test_rslts


pytest-sugar

pip install pytest-sugar
Gives a nicer display of test results

 
Back to list of packages
 

Mocking

Goal: concentrate on one set of functions without getting exceptions from other functions that are out of scope for my development and testing.


Mocking Requests

This means we do not send requests to the third party, but we build responses that look like (mock) the real responses. However, remember to also test that the mocked responses are the same as the real responses.


Patching

Replaces an object with another. If there is a function I do not want to run, "I can patch it out".

The patch function mocks the functionality of a given function. The parameter is the function that we want to mock. Mocking is creating an object that has the same behavior, but with simpler dependencies.


MagicMock
from unittest.mock import Mock, MagicMock
a_mock_obj = MagicMock()
object_that_returns_a_value = MagicMock(return_value="my value")
object_that_raises_an_error = MagicMock(side_effect=ValueError("aaa"))


Sample Code for Mocking
file mma.py
def f2mock():
    print("in f2mock")
    return 1234

file mmm.py
import mma
def f():
    print("f start")
    x = mma.f2mock()
    return x

from unittest.mock import patch, MagicMock
import mmm    # the module that has f() and calls the function to be mocked out

# patching: first param is a string with target, second param is object to use instead
# patch out "mma.f2mock" because that is what shows in the code
# patch where the function is used, not where it is defined
@patch("mma.f2mock", MagicMock(return_value=13))    
def test_mock_out_db_write():
    assert mmm.f() == 13

# alternate: use a context manager:
def test_mock_out_db_write():
    assert mmm.f() == 1234
    with patch("mma.f2mock", MagicMock(return_value=13)) as mock_dbwri:
       assert mmm.f() == 13

Option #1 for designating the function to be mocked:

file mmm.py
import mma

def f():
    return mma.f2mock()

file test_mmm.py
from unittest.mock import patch, MagicMock
import mmm

def test_mock_out_f2mock():
    assert mmm.f() == 1234
    with patch("mma.f2mock", MagicMock(return_value=13)) as mock_dbwri:
       assert mmm.f() == 13

Option #2 for designating the function to be mocked:

file mmm.py
import mma as abc      # as abc here

def f():
    return abc.f2mock()   # call abc.f2mock() not mma.f2mock()

file test_mmm.py
from unittest.mock import patch, MagicMock
import mmm

def test_mock_out_f2mock():
    assert mmm.f() == 1234
    with patch("mma.f2mock", MagicMock(return_value=13)) as mock_dbwri:  # notice mma here, and not abc
       assert mmm.f() == 13

Option #3 for designating the function to be mocked:

file mmm.py
from mma import f2mock as ddd   # import just the function, and rename

def f():
    return ddd()     # calling with just ddd()

file test_mmm.py
from unittest.mock import patch, MagicMock
import mmm

def test_mock_out_f2mock():
    assert mmm.f() == 1234
    with patch("mmm.ddd", MagicMock(return_value=13)) as mock_dbwri:  # notice mmm.ddd here
       assert mmm.f() == 13

responses package
pipenv install responses

@responses.activate
def test_where_i_want_to_mock():
    responses.add(method=responses.GET, url=anyurl, json={what I want to simulate}, status=200)  # can be any status
    rsp = requests.get(anyurl)  # note: same url as above
    assert rsp.json() == {what I want to simulate}

 

 

 

 
Back to list of packages
 

Django

 

See https://www.djangoproject.com/

pipenv install djangorestframework

django-admin startproject the-name                     # this creates the service
cd the-name
python manage.py runserver   # .../first-sub-dir-where-manage.py-is-located
python manage.py migrate
python manage.py createsuperuser
#go to http://127.0.0.1:8000/admin
python manage.py startapp  application-name            # create a new application inside the service
python manage.py makemigrations application-name       # then migrate after each change

You may have to add:
export PYTHONPATH=/..../root_directory_of_project

In models.py file: create class, inherit from models.Model. Add attributes
In the admin.py file, register the application.
Create a serializer, and inherit from models serializer.
Create view sets in the views.py file.
Put URLs in urls.py in the app directory. This file maps the routes (urls) to the functions
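
A minimal sketch of the pieces listed above, with names chosen to match the router example below (assumes djangorestframework is installed and the app is in INSTALLED_APPS):

file models.py
from django.db import models

class Company(models.Model):
    name = models.CharField(max_length=100)

file serializers.py
from rest_framework import serializers
from .models import Company

class CompanySerializer(serializers.ModelSerializer):
    class Meta:
        model = Company
        fields = "__all__"

file views.py
from rest_framework import viewsets
from .models import Company
from .serializers import CompanySerializer

class CompanyViewSet(viewsets.ModelViewSet):
    queryset = Company.objects.all()
    serializer_class = CompanySerializer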

from rest_framework import routers
from .views import CompanyViewSet

app_router = routers.DefaultRouter()
app_router.register("app_prefix", viewset=CompanyViewSet, basename="companies")   # CompanyViewSet is defined in the views.py file

In the urls.py for the whole server, add:
from api.service_name.app_name.urls import app_router
add this line:
path("", include(app_router.urls))

Send email automatically when posting to ..../send_email

For testing, import the Django TestCase class from django.test.
Use the "in-memory backend"
Test by looking at the outbox. Assert that it is empty before sending and contains 1 email after sending.
We do NOT want it to fail silently
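
A hedged sketch of such a test; the Django test runner switches to the in-memory "locmem" email backend automatically, so mail.outbox can be inspected:

from django.core import mail
from django.test import TestCase

class SendEmailTest(TestCase):
    def test_send_email(self):
        self.assertEqual(len(mail.outbox), 0)   # nothing sent yet
        mail.send_mail("subject", "body", "from@example.com", ["to@example.com"], fail_silently=False)
        self.assertEqual(len(mail.outbox), 1)   # exactly one message in the outbox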


 
Back to list of packages
 

Faker

https://pypi.org/project/Faker/

from faker import Faker
fake = Faker()   # default "en_US"
# other options: 'it_IT', 'fr_FR', ['en_US', 'ja_JP']

fake.name()
fake.address()
fake.text()

Also, command line:
faker --version


 

 
Back to list of packages
 

itertools

import itertools (standard library)

Something is iterable if it has a method called "__iter__".
This allows the iterator to remember its state.
Take any iterable object (list, tuple, . . .) and get its iterator:
it = obj.__iter__()
it = iter(obj) # alternative syntax that does the same thing
Then, do next(it) as needed to get the successive values, or of course do for a in it:
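
For example, with a plain list:

lst = [10, 20, 30]
it = iter(lst)
print(next(it))   # 10
print(next(it))   # 20
for a in it:      # the loop continues where next() left off
    print(a)      # 30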

a_counter = itertools.count() # optional parameters start and step (decimals possible)
print(next(a_counter))
print(next(a_counter))
print(next(a_counter))
Note: itertools.count() has no end. For a finite iterator, going past the end raises StopIteration

Iterators can only go forward.

c = itertools.cycle(a_iterator) # cycle thru forever

c = itertools.repeat(a_value) # repeat with an optional parameter times=

itertools.combinations(lst, n) # (order does not matter)
itertools.permutations(lst, n) # (order matters: (1,2) and (2,1) both listed)
itertools.combinations_with_replacement(lst, n) # combinations that allow repeats
itertools.product(lst, repeat=n) # cartesian product
itertools.product([0, 1], repeat=4) # all possible values of 4 bits

Get part of a generator, and make a new generator (remember, nothing is yet calculated):
itertools.islice(gn, 7) # The first 7
itertools.islice(gn, 2, 7) # Skips 2, then returns the next 7-2=5
itertools.islice(gn, 2, 7, 2) # start at index 2, stop before 7, step by 2 (indexes 2, 4, 6)
Note that these three arguments are the same as for range().


 

 
Back to list of packages
 

Send Emails with smtp

import smtplib (standard library)

I have not tried this yet.

with smtplib.SMTP('server.com', 587) as ms:
    ms.ehlo()
    ms.starttls()
    ms.ehlo()
    ms.login(u, pw)
msg = "subject: . . .\n\n. . . ."
smtp.sendmail(my_email, rcver_email, msg)

Run a local mail server. Note that it has no tls and no login:
python -m smtpd -c DebuggingServer -n localhost:a_port_num

Alternative:

with smtplib.SMTP_SSL('server.com', 465) as ms:
    ms.login(u, pw)
msg = "subject: . . .\n\n. . . ."
smtp.sendmail(my_email, rcver_email, msg)

Make handling of message part easier:

from email.message import EmailMessage
msg = EmailMessage()
msg["Subject"] = ". . ."
msg["From"] = ". . ."
msg["To"] = ". . ."   # for multiple, do a list, or a string with comma separated emails
msg.set_content(". . .")
msg.add_alternative(html_str, subtype="html")   # optional HTML version of the body (method of EmailMessage)
ms.send_message(msg)   # send with the smtplib connection object

Attach file

with open("file name", "rb") as f:
    d = f.read()
    img_type = "jpeg"   # or use package imghdr: imghdr.what("file name")
    fn = f.name
msg.add_attachment(d, maintype="image", subtype=img_type, filename=fn)  # or result of the "what()"

Generic: maintype="application", subtype="octet-stream"


 
Back to list of packages
 

zeep

See also requests

 
Back to list of packages
 

  

  


Tutorials

  

http://www.python-course.eu/python3_deep_copy.php
continue with http://www.python-course.eu/python3_recursive_functions.php
http://www.sthurlow.com/python/lesson01/
http://wiki.python.org/moin/BeginnersGuide/Programmers
Brent Welch's "Practical Programming in Tcl and Tk"

 

Read later:

Documentation:

 


Tips

 

Context Manager:

When opening files, use a context manager. This forces the file to close if the code throws an error.
Instead of:
f = open(...)
f.close()
Do this:
with open(...) as f:

 

No bare "except" clause.

With a bare "except" clause, a ctrl-C triggers an exception, yet I want to stop the execution.
Instead, do "except ValueError:" or whatever error type. Otherwise, do "except Exception e:"
It is better to not handle the exception than to do a "pass". Remove useless exception clauses.
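
Small illustration (made-up input):

try:
    x = int("abc")
except ValueError as e:      # catch the specific error, not a bare except
    print("bad input:", e)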

 

Type Checking

Instead of checking for a type this way:
if type(a) == a_type:
do
if isinstance(a, a_type):

 

The is keyword:
if x is None:
if x is True:   or if x
if x is False:  or if not x

 

Zipping

Note: the result has the length of the shortest input. zip([1,2],[]) gives []

a = ["a", "b", "c", "d"]
b = [1,2,3]
c = ["α", "β", "γ"]
z = zip (a,b,c)
[zz for zz in z]
# returns: [('a', 1, 'α'), ('b', 2, 'β'), ('c', 3, 'γ')]

 

Loop of Dictionary Keys

To loop over keys of a dictionary, do for k in d not for k in d.keys() as .keys() is not necessary here.

Note: if I am to modify the keys, then iterate over a copy: for k in list(d)

 

Tuple unpacking

Instead of
a = mytuple[0]
b = mytuple[1]
do
a, b = mytuple

If I do not know the full length of the tuple:
a variable preceded by an asterisk takes on the list of all remaining values, and is empty if there are none.
a, b, *c, d = (1,2,3,4,5,6)
Or optionally if I want to not use the values (good practice, not required):
a, b, *_ = (1,2,3,4)

 

No Echo on Screen

For inputing password on the screen:

import getpass
u = input("username: ")
p = getpass.getpass("password: ")

 

Enumerate Lists

Instead of for i in range(len(lst)) do

lst = [3,4,5]
for i,a in enumerate(lst, start=1):

or, when there are two lists:

a = [3,4,5]
b = [5,6,7]
for aa,bb in zip(a,b):

When counting in the two lists:

a = [3,4,5]
b = [5,6,7]
for i, (aa,bb) in enumerate(zip(a,b)):

Note, enumerate starts at 0 by default; add start=... if needed.

 

Counter inside a loop

See also "enumerate".

Instead of

i=0
for a in lst:
    ...
    i += 1

Do

for i,a in enumerate(lst):
    ...

At the end, i is the number of loops minus one, since enumerate starts at 0 (pass start=1 to make i the count)
Of course, this does not work if I have a conditional count

 

pep8

Pep8 is a style guide. See https://peps.python.org/pep-0008/

 

Do not use mutable default arguments

Never pass a mutable object as a default value for parameter. Instead of

def asdf(lst=[]):
    # lst is an empty list at first call, but this picks up the previous value in the subsequent calls

Do this:

def asdf(lst=None):
    # instead, set lst=None and create a new list inside the function when it is None

This is because Python evaluates the inputs when creating the function. Set the default to None instead.

Watch this unexpected result: (sneaky behavior to be aware of):

import time
import datetime
def show_curr_time(t=datetime.datetime.now()):   # default value is the current time when the function is created!
    print(t)

show_curr_time()
time.sleep(1)
show_curr_time()  # same value as above

 

Last Element of List or String

The last element of a list or string is at the position len(...)-1.
The following code will throw an error: a_string[len(a_string)].
Instead, do this a_string[len(a_string)-1], or better still a_string[-1].

Related to this, the following will throw an error:
the_string[the_pos] if len(the_string) >= the_pos else '' when the_pos == len(the_string).
Use the following code instead (notice ">" not ">="):
the_string[the_pos] if len(the_string) > the_pos else ''

 

Separators

Display large numbers with separators "_" (underscore):
x = 32_439_318
Print with a thousands separator: print(f"{x:,}") (comma) or print(f"{x:_}") (underscore)

 

 

Editors:

  


py web

  

https://docs.python.org/2/howto/webservers.html
http://fragments.turtlemeat.com/pythonwebserver.php
http://www.linuxjournal.com/content/tech-tip-really-simple-http-server-python
https://wiki.python.org/moin/WebProgramming
http://docs.python-guide.org/en/latest/scenarios/web/
mod_wsgi (Apache) (Embedding Python)
modwsgi allows you to run Python WSGI applications on Apache HTTP Server.
https://pypi.python.org/pypi/mod_wsgi
http://modpython.org/
http://www.onlamp.com/pub/a/python/2003/10/02/mod_python.html

 


Some Tools

  

See details on package installation: https://packaging.python.org/tutorials/installing-packages/

The Classical Language Toolkit https://github.com/cltk/cltk

natural language pipeline that supports massive multilingual applications https://pypi.python.org/pypi/polyglot/

Text fabric, includes graph-like approach https://pypi.python.org/pypi/text-fabric/
See also (collection of richly annotated data sources, including HB and GNT) https://github.com/ETCBC/text-fabric-data

Library for working with neo4j and graphs https://github.com/bruth/graphlib/

Another library https://github.com/svasilev94/GraphLibrary

Graph visualization https://pypi.python.org/pypi/graphistry/0.9.51

High performance graph data structures and algorithms https://pypi.python.org/pypi/python-igraph/0.7.1.post6
See also http://igraph.org/python/doc/tutorial/tutorial.html

Graphyne is a smart graph - a property graph capable of actively reacting to changes and incorporating decision making logic, written in Python https://github.com/davidhstocker/Graphyne

 

