
I’m trying to learn PySpark better. I’m streaming tweets and trying to capture the hashtags from each tweet’s text (I know the Twitter API’s JSON already provides the hashtags; I’m doing this as an exercise).

So, with a PySpark DataFrame named Hashtags:

-------------------------------------------
Batch: 18
-------------------------------------------
+--------------------+--------------------+
|               value|            Hashtags|
+--------------------+--------------------+
|Instead, it has c...|[instead,, it, ha...|
|  #iran #abd #Biden |[#iran, #abd, #bi...|
+--------------------+--------------------+

I take the column "value", convert it to lower case, and split on whitespace/tab/newline, creating an array column named "Hashtags". I then attempt to remove any elements that are just whitespace, and any elements that don’t begin with "#".

Hashtags = Hashtags.withColumn("Hashtags", lower(Hashtags["value"]))
Hashtags = Hashtags.withColumn("Hashtags", split(Hashtags["Hashtags"], r'\s'))
Hashtags = Hashtags.withColumn("Hashtags", F.array_remove(Hashtags["Hashtags"], r'\s'))
Hashtags = Hashtags.withColumn("Hashtags", F.array_remove(Hashtags["Hashtags"], r'^(?!#).+'))

As far as I can tell, the array_remove() does remove elements with the regex r'\s' but it doesn’t remove elements that don’t begin with a "#".
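A quick local illustration of the difference (plain Python, with illustrative sample tokens): `array_remove(col, value)` drops elements *equal* to `value`, so a regex string is compared literally; dropping elements that *match* a pattern requires an element-wise test:

```python
import re

# sample tokens like those produced by the split above (illustrative values)
tokens = ["instead,", "it", "#iran", "#abd"]

# what array_remove(col, r'^(?!#).+') effectively does: literal equality,
# so nothing is removed unless an element equals the pattern string itself
literal_removed = [t for t in tokens if t != r'^(?!#).+']
# literal_removed == ["instead,", "it", "#iran", "#abd"]

# what was intended: drop elements matching the pattern element-wise
pattern_removed = [t for t in tokens if not re.match(r'^(?!#).+', t)]
# pattern_removed == ["#iran", "#abd"]
```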

I know the regex itself works outside of array_remove() because I tested it like this:

RegText = r'^(?!#).+'
print(re.findall(RegText, "#AnandWrites"), re.match(RegText, "#AnandWrites"))
print(re.findall(RegText, "AnandWrites"), re.match(RegText, "AnandWrites"))
print(re.findall(RegText, "with\xe2\x80\xa6"), re.match(RegText, "with\xe2\x80\xa6"))
print(re.findall(RegText, "An#andWrites"), re.match(RegText, "An#andWrites"))

which gives me the following result, indicating that it successfully matches strings that don’t begin with a "#"

[] None
['AnandWrites'] <re.Match object; span=(0, 11), match='AnandWrites'>
['withâ\x80¦'] <re.Match object; span=(0, 7), match='withâ\x80¦'>
['An#andWrites'] <re.Match object; span=(0, 12), match='An#andWrites'>
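As an aside, for non-empty strings the same test can be phrased positively: matching `^#` keeps exactly the elements the negative-lookahead pattern would not match. A small local check (sample strings taken from the tests above):

```python
import re

words = ["#AnandWrites", "AnandWrites", "An#andWrites"]

# keep strings that start with '#'
kept = [w for w in words if re.match(r'^#', w)]

# equivalent for non-empty strings: keep strings NOT matched by the lookahead
dropped_by_lookahead = [w for w in words if not re.match(r'^(?!#).+', w)]

# both give ['#AnandWrites']
```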

2 Answers


  1. array_remove removes elements equal to a literal value, so it cannot be used with a regex. You can consider using filter with rlike instead:

    import pyspark.sql.functions as F

    df2 = df.withColumn(
        'Hashtags',
        F.expr(r"""
            filter(
                split(lower(value), '\\s'),
                x -> x not rlike '\\s' and x not rlike '^(?!#).+'
            )
        """)
    )
    
    df2.show(truncate=False)
    +-----------------+---------------------+
    |value            |Hashtags             |
    +-----------------+---------------------+
    |Instead, it has  |[]                   |
    |#iran #abd #biden|[#iran, #abd, #biden]|
    +-----------------+---------------------+
    
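For reference, here is a plain-Python sketch of what the SQL `filter` lambda above does per element (local code, not Spark; the sample strings mirror the rows shown):

```python
import re

def filter_hashtags(value):
    # split(lower(value), '\\s'), then keep x where
    # x not rlike '\\s' and x not rlike '^(?!#).+'
    tokens = re.split(r'\s', value.lower())
    return [x for x in tokens
            if not re.search(r'\s', x) and not re.match(r'^(?!#).+', x)]

print(filter_hashtags("Instead, it has"))    # []
print(filter_hashtags("#iran #abd #Biden"))  # ['#iran', '#abd', '#biden']
```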
  2. You can use the following udf (user defined function):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType
    
    def keep_hashtags(array):
        return [x for x in array if x.startswith("#")]
    
    keep_hashtags_udf = udf(keep_hashtags, ArrayType(StringType()))
    
    Hashtags = Hashtags.withColumn("Hashtags", keep_hashtags_udf(Hashtags["Hashtags"]))
    

    This should give you the desired result. Also, check my blog: https://byambaa1982.github.io/2023/02/11/pysparkv4.html
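The plain function can also be sanity-checked locally before wrapping it as a udf (sample arrays based on the rows in the question; the exact tokens are illustrative):

```python
def keep_hashtags(array):
    # keep only elements that begin with "#"
    return [x for x in array if x.startswith("#")]

print(keep_hashtags(["#iran", "#abd", "#biden"]))  # ['#iran', '#abd', '#biden']
print(keep_hashtags(["instead,", "it", "has"]))    # []
```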
