Engenharia de dados
Joins em PySpark
Unindo dados à esquerda e à direita com PySpark
Quando você começar a trabalhar duro com engenharia ou ciência de dados vai acabar notando que, muitas das vezes, senão todas, seus scripts reunirão inúmeras fontes de dados fazendo do join um método tão necessário e especial.
Este artigo cobrirá a capacidade de que o Spark tem de unir diferentes fontes de dados ou linhas de uma tabela, tratando de como funcionam as junções para que você possa entender como o Spark as executa na integra.
Como bônus, estudaremos o tipo crossJoin e o uso de filtros numa cláusula join.
Por dentro de um join
Um join une dois ou mais conjuntos de dados, à esquerda e à direita, ao avaliar o valor de uma ou mais expressões, determinando assim se um registro deve ser unido ou não a outro:
esquerda.join(direita, expressão, tipo)
A expressão de junção mais comum que há é a de igualdade. Ela compara se as chaves do DataFrame esquerdo equivalem a do DataFrame direto. Se o retorno for verdadeiro o Spark combinará os registros.
É possível usar expressões de junções mais sofisticadas e complexas que a de igualdade. É possível até adicionar os métodos where ou filter a expressão - você só tem que se certificar que a expressão retorne um Booleano.
Os tipos de joins
Se você já teve alguma experiência com banco de dados relacional ou até mesmo com planilhas eletrônicas, o conceito de juntar diferentes conjuntos de dados não lhe será penoso.
Ainda assim, para facilitar o entendimento, vamos seguir com exemplos de cada um dos tipos de join do PySpark. Dito isso, será necessário criar dois DataFrames: um representando a tabela do lado esquerdo e o outro representando a tabela do direito.
- DataFrame representando o lado esquerdo:
from pyspark.sql.session import SparkSessionspark = SparkSession.builder.appName('Joins').getOrCreate()schema = ['id', 'nome', 'gênero', 'dept_id']
data = [
(1, 'Robert', 'M', '10'),
(2, 'Bill', 'M', '20'),
(3, 'Brooke', 'F', '30'),
(4, 'Matei', 'M', '40'),
(5, 'Fulano', 'M', '50')
]left_df = spark.createDataFrame(data=data, schema=schema)
left_df.show()+---+------+------+-------+
| id| nome|gênero|dept_id|
+---+------+------+-------+
| 1|Robert| M| 10|
| 2| Bill| M| 20|
| 3|Brooke| F| 30|
| 4| Matei| M| 40|
| 5|Fulano| M| 50|
+---+------+------+-------+
2. DataFrame representando o lado direito:
schema = ['dept_id', 'dept_nome']
data = [
(10, 'Data Engineer'),
(20, 'Product Manager'),
(30, 'Machine Learning Lead'),
(40, 'Chief Technologist'),
(60, 'Engineering Director')
]right_df = spark.createDataFrame(data=data, schema=schema)
right_df.show(truncate=False)+-------+---------------------+
|dept_id|dept_nome |
+-------+---------------------+
|10 |Data Engineer |
|20 |Product Manager |
|30 |Machine Learning Lead|
|40 |Chief Technologist |
|60 |Engineering Director |
+-------+---------------------+
Inner join
O inner é o join padrão do Spark e, por conta disso, provavelmente o mais usado. Esta junção cria uma interseção ao unir registros correspondentes e descarta em ambos os lados os registros onde as chaves não correspondem:
No seguinte exemplo, unimos o DataFrame esquerdo, left_df, com o DataFrame direito, right_df, a partir da coluna dept_id. O inner avalia as chaves em ambos os DataFrames e une apenas os registros que casam ou são tidos como verdadeiro.
inner_df = (
left_df
.join(right_df, left_df.dept_id == right_df.dept_id, 'inner')
.drop(right_df.dept_id))inner_df.show(truncate=False)+---+------+------+-------+---------------------+
|id |nome |gênero|dept_id|dept_nome |
+---+------+------+-------+---------------------+
|1 |Robert|M |10 |Data Engineer |
|3 |Brooke|F |30 |Machine Learning Lead|
|2 |Bill |M |20 |Product Manager |
|4 |Matei |M |40 |Chief Technologist |
+---+------+------+-------+---------------------+
Como é possível observar, apenas os registros que possuem um valor na coluna dept_id do DataFrame esquerdo equivalentes a coluna dept_id do DataFrame direito foram retornados.
Não há um valor que combine com o dept_id = 50
do DataFrame left_df ou com o dept_id = 60
do DataFrame right_df e, por conta disso, os últimos registros das tabelas esquerda e direita são descartados sumariamente.
Se não houver correspondência entre as chaves, o inner retornará um conjunto vazio.
Eu imagino que você deve ter achado este código um tanto redundante, e com razão, por sorte, para a nossa conveniência, o mesmo pode ser escrito de uma forma muito mais enxuta:
left_df.join(right_df, on=['Key'], how='Type')
Onde:
- left: DataFrame esquerdo,
- right: DataFrame direito,
- on: colunas a se unir — deve haver em ambos os DataFrames,
- how: tipo do join — left, right, outer, inner…
Então:
inner_df = (
left_df.join(right_df, on=['dept_id'])
.orderBy('dept_id'))inner_df.show(truncate=False)+-------+---+------+------+---------------------+
|dept_id|id |nome |gênero|dept_nome |
+-------+---+------+------+---------------------+
|10 |1 |Robert|M |Data Engineer |
|20 |2 |Bill |M |Product Manager |
|30 |3 |Brooke|F |Machine Learning Lead|
|40 |4 |Matei |M |Chief Technologist |
+-------+---+------+------+---------------------+
Outer join
Também conhecida como full
, esta junção combina todos os registros de ambos os conjuntos de dados e onde a expressão de junção não corresponder, um valor nulo será retornado nos respectivos campos.
Deste modo, a tabela final irá conter todos os registros de ambas as tabelas.
No exemplo a seguir, o join do tipo outer une todos os registros de ambos os DataFrames retornando um valor nulo para os campos que não correspondem a expressão de junção.
outer_df = (
left_df.join(right_df, on=['dept_id'], how='outer')
.orderBy('dept_id'))outer_df.show(truncate=False)+-------+----+------+------+---------------------+
|dept_id|id |nome |gênero|dept_nome |
+-------+----+------+------+---------------------+
|10 |1 |Robert|M |Data Engineer |
|20 |2 |Bill |M |Product Manager |
|30 |3 |Brooke|F |Machine Learning Lead|
|40 |4 |Matei |M |Chief Technologist |
|50 |5 |Fulano|M |null |
|60 |null|null |null |Engineering Director |
+-------+----+------+------+---------------------+
O registro dept_id = 60
tem valor nulo para id, nome e gênero por conta de não haver as mesmas colunas no DataFrame right_df. Do mesmo modo, o registro dept_id = 50
tem valor nulo para a coluna dept_nome
.
Left outer join
Também conhecido apenas como left
, retorna todas as linhas do conjunto de dados esquerdo — não importa o que aconteça.
Quando não há correspondência, ele atribui nulo para o registro esquerdo e elimina o direito.
O registro dept_id = 60
tem o valor dept_nome
nulo por não ter um correspondente no DataFrame right_df.
left_df = (
left_df.join(right_df, on=['dept_id'], how='left')
.orderBy('dept_id'))left_df.show(truncate=False)+-------+---+------+------+---------------------+
|dept_id|id |nome |gênero|dept_nome |
+-------+---+------+------+---------------------+
|10 |1 |Robert|M |Data Engineer |
|20 |2 |Bill |M |Product Manager |
|30 |3 |Brooke|F |Machine Learning Lead|
|40 |4 |Matei |M |Chief Technologist |
|50 |5 |Fulano|M |null |
+-------+---+------+------+---------------------+
Right outer
Assim como se espera, a junção right mantém as linhas do DataFrame direito. Isso faz dele justamente o oposto do join left.
Quando a expressão de junção não corresponde, ele atribui nulo para o registro direito e descarta o esquerdo.
Note que, diferentemente do left join, agora o registro dept_id = 60
tem nulo nos campos id, nome e gênero, pois este registro não tem correspondência no DataFrame left_df.
right_df = (
left_df.join(right_df, on=['dept_id'], how='right')
.orderBy('dept_id'))right_df.show(truncate=False)+-------+----+------+------+---------------------+
|dept_id|id |nome |gênero|dept_nome |
+-------+----+------+------+---------------------+
|10 |1 |Robert|M |Data Engineer |
|20 |2 |Bill |M |Product Manager |
|30 |3 |Brooke|F |Machine Learning Lead|
|40 |4 |Matei |M |Chief Technologist |
|60 |null|null |null |Engineering Director |
+-------+----+------+------+---------------------+
Left semi
O join do tipo left semi se parece ao inner, porém apenas os registros do lado esquerdo são retornados e os do lado direito são descartados.
Na realidade, o que acontece aqui é que os joins do tipo semi apenas comparam se o valor do DataFrame A existem no DataFrame B e assim retornam a diferença.
Os registros não combinados na expressão de junção são ignorados nos conjuntos de dados esquerdo e direito e, por conta disso, se não houver correspondência alguma entre as chaves, o left semi retornará um conjunto vazio.
left_semi_df = (
left_df.join(right_df, on=['dept_id'], how='left_semi')
.orderBy('dept_id'))left_semi_df.show(truncate=False)+-------+---+------+------+
|dept_id|id |nome |gênero|
+-------+---+------+------+
|10 |1 |Robert|M |
|20 |2 |Bill |M |
|30 |3 |Brooke|F |
|40 |4 |Matei |M |
+-------+---+------+------+
Left anti
Já o left anti faz exatamente o oposto do left semi. Ele compara se o registro esquerdo existe no DataFrame direito, porém, o seu diferencial é que, ao invés de manter os valores correspondentes no DataFrame direito, ele mantém apenas os valores que não possuem uma chave correspondente.
Em outras palavras, ele vai retornar todos os elementos do conjunto esquerdo se não houver alguma correspondência com o conjunto direito.
Note que o registro dept_id = 50
é o único a ser retornado visto que ele não tem correspondência no DataFrame right_df.
left_anti_df = (
left_df.join(right_df, on=['dept_id'], how='left_anti')
.orderBy('dept_id'))left_anti_df.show(truncate=False)+-------+---+------+------+
|dept_id|id |nome |gênero|
+-------+---+------+------+
|50 |5 |Fulano|M |
+-------+---+------+------+
Bônus
Cross
A junção cruzada ou produto cartesiano não especifica as colunas a se unir e devido a isso ela une cada registro do DataFrame esquerdo a cada registro do DataFrame direito.
Isso significa que, se você tiver 10.000 linhas em ambos os DataFrames, a junção cruzada resultará em um total de 100.000.000 de registros (10.000 x 10.000). Como é fácil de imaginar, esta junção pode facilmente consumir toda a memória do seu sistema computacional. Use-a com cautela.
Para usar a junção cruzada, você deve declará-la explicitamente usando a palavra reservada crossJoin:
cross_df = (
left_df.crossJoin(right_df)
.orderBy('nome'))cross_df.show()+---+------+------+-------+-------+--------------------+
| id| nome|gênero|dept_id|dept_id| dept_nome|
+---+------+------+-------+-------+--------------------+
| 2| Bill| M| 20| 30|Machine Learning ...|
| 2| Bill| M| 20| 10| Data Engineer|
| 2| Bill| M| 20| 20| Product Manager|
| 2| Bill| M| 20| 40| Chief Technologist|
| 2| Bill| M| 20| 60|Engineering Director|
| 3|Brooke| F| 30| 20| Product Manager|
| 3|Brooke| F| 30| 60|Engineering Director|
| 3|Brooke| F| 30| 10| Data Engineer|
| 3|Brooke| F| 30| 40| Chief Technologist|
| 3|Brooke| F| 30| 30|Machine Learning ...|
| 5|Fulano| M| 50| 40| Chief Technologist|
| 5|Fulano| M| 50| 60|Engineering Director|
| 5|Fulano| M| 50| 10| Data Engineer|
| 5|Fulano| M| 50| 20| Product Manager|
| 5|Fulano| M| 50| 30|Machine Learning ...|
| 4| Matei| M| 40| 60|Engineering Director|
| 4| Matei| M| 40| 10| Data Engineer|
| 4| Matei| M| 40| 40| Chief Technologist|
| 4| Matei| M| 40| 20| Product Manager|
| 4| Matei| M| 40| 30|Machine Learning ...|
+---+------+------+-------+-------+--------------------+
only showing top 20 rows
Join com filtro
É possível filtrar facilmente registros diretamente na cláusula join. Neste exemplo usaremos um filtro para trazer apenas o gênero masculino (M) no DataFrame esquerdo, já no direito, usaremos apenas os departamentos de número maiores que o 10.
join_filter_df = (
left_df.filter(col('gênero') == 'M')
.join(right_df.filter(col('dept_id') >= 20),
on=['dept_id'], how='inner'))
join_filter_df.show(truncate=False)
+-------+---+-----+------+------------------+
|dept_id|id |nome |gênero|dept_nome |
+-------+---+-----+------+------------------+
|20 |2 |Bill |M |Product Manager |
|40 |4 |Matei|M |Chief Technologist|
+-------+---+-----+------+------------------+
Neste artigo você viu como funcionam os joins do PySpark, quais são os seus tipos e como eles funcionam na integra.
Se você chegou até aqui, muito obrigado! E se este post foi de alguma forma útil para você considere “bater palma”.